Posted to users@activemq.apache.org by Chris Hofstaedter <ch...@nmwco.com> on 2006/11/03 21:30:24 UTC

Missing destination queue after broker restarted

The following applies to 4.0.1, 4.0.2, and head.

My topology is pretty simple in that I'm trying to set up a queue in
which the producers all use embedded brokers and the consumer does not.


I've done my test both collocated on the same machine and across the
net.  My test only includes a single producer at this point.
"Log4Mobility" is the queue name.

Upon initial startup, everything works well.  Specifically, the debug
log on the broker shows the following (I've replaced what I believe to
be non-pertinent info with "..." for readability):

INFO  TransportConnector - Connector default Started
INFO  BrokerService - ActiveMQ JMS Message Broker (...) started
DEBUG TcpTransport - TCP consumer thread starting
DEBUG WireFormatNegotiator - Sending: WireFormatInfo ...
DEBUG WireFormatNegotiator - Received WireFormat: ...
DEBUG WireFormatNegotiator - tcp:///10.47.20.123:1142 ...
DEBUG WireFormatNegotiator - tcp:///10.47.20.123:1142 ...
DEBUG AbstractRegion - Adding destination: topic://ActiveMQ.Advisory.Connection
DEBUG AbstractRegion - Adding destination: topic://ActiveMQ.Advisory.Topic
DEBUG AbstractRegion - Adding destination: topic://ActiveMQ.Advisory.Consumer.Queue.Log4Mobility
DEBUG WireFormatNegotiator - Sending: WireFormatInfo ...
DEBUG TcpTransport - TCP consumer thread starting
DEBUG WireFormatNegotiator - Received WireFormat: ...
DEBUG WireFormatNegotiator - tcp:///10.47.20.123:1144 ...
DEBUG WireFormatNegotiator - tcp:///10.47.20.123:1144 ...
DEBUG AbstractRegion - Adding destination: queue://Log4Mobility

Then, while leaving the consumer and producer running, I shut down the
broker.  Upon subsequent startup of the broker, the debug log on the
broker indicates:

INFO  TransportConnector - Connector default Started
INFO  BrokerService - ActiveMQ JMS Message Broker (...) started
DEBUG TcpTransport - TCP consumer thread starting
DEBUG WireFormatNegotiator - Sending: WireFormatInfo ...
DEBUG WireFormatNegotiator - Received WireFormat: ...
DEBUG WireFormatNegotiator - tcp:///10.47.20.123:1165 ...
DEBUG WireFormatNegotiator - tcp:///10.47.20.123:1165 ...
DEBUG AbstractRegion - Adding destination: topic://ActiveMQ.Advisory.Connection
DEBUG AbstractRegion - Adding destination: topic://ActiveMQ.Advisory.Topic
DEBUG AbstractRegion - Adding destination: topic://ActiveMQ.Advisory.Consumer.Queue.Log4Mobility
DEBUG WireFormatNegotiator - Sending: WireFormatInfo ...
DEBUG TcpTransport - TCP consumer thread starting
DEBUG WireFormatNegotiator - Received WireFormat: ...
DEBUG WireFormatNegotiator - tcp:///10.47.20.123:1166 ...
DEBUG WireFormatNegotiator - tcp:///10.47.20.123:1166 ...

It's virtually the same except that the message about actually adding
the queue is notably missing.  I've also confirmed through bstat that
the queue is present before the restart but not present after the
restart.
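
For anyone comparing two captures like the ones above, here is a minimal sketch of how the "Adding destination" entries could be diffed mechanically.  The class and method names (DestinationLogDiff, addedDestinations, missingAfterRestart) are made up for illustration and are not part of ActiveMQ; the sketch only assumes the log format shown in this message, and it tolerates entries whose destination name was wrapped onto the following line.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper: compares "Adding destination" entries of two broker logs. */
final class DestinationLogDiff {

    private static final String MARKER = "Adding destination:";

    /** Returns the destinations named in "Adding destination:" entries, in log order. */
    static Set<String> addedDestinations(List<String> logLines) {
        Set<String> found = new LinkedHashSet<>();
        for (int i = 0; i < logLines.size(); i++) {
            String line = logLines.get(i);
            int at = line.indexOf(MARKER);
            if (at < 0) {
                continue; // not an "Adding destination" entry
            }
            String name = line.substring(at + MARKER.length()).trim();
            // A wrapped entry leaves the name on the next line; consume that line too.
            if (name.isEmpty() && i + 1 < logLines.size()) {
                name = logLines.get(++i).trim();
            }
            if (!name.isEmpty()) {
                found.add(name);
            }
        }
        return found;
    }

    /** Destinations added in the first log but never added in the second. */
    static List<String> missingAfterRestart(List<String> before, List<String> after) {
        Set<String> missing = addedDestinations(before);
        missing.removeAll(addedDestinations(after));
        return new ArrayList<>(missing);
    }

    public static void main(String[] args) {
        List<String> before = Arrays.asList(
            "DEBUG AbstractRegion - Adding destination:",
            "topic://ActiveMQ.Advisory.Consumer.Queue.Log4Mobility",
            "DEBUG AbstractRegion - Adding destination: queue://Log4Mobility");
        List<String> after = Arrays.asList(
            "DEBUG AbstractRegion - Adding destination:",
            "topic://ActiveMQ.Advisory.Consumer.Queue.Log4Mobility");
        // Prints the destinations that never reappeared after the restart.
        System.out.println(missingAfterRestart(before, after));
    }
}
```

Fed the two captures from this message, the only entry it should report is queue://Log4Mobility, which matches what I see by eye.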

At this point, no messages flow.  Oddly, the producer thinks it's
forwarding the messages over the bridge as indicated by its debug log
from the point at which it reestablishes its connection:

DEBUG FailoverTransport - Connection established
INFO  DemandForwardingBridge - Outbound transport to Log4Mobility resumed
INFO  DemandForwardingBridge - Network connection between vm://localhost#0 and tcp://10.47.20.123:61616(Log4Mobility) has been established.
DEBUG DemandForwardingBridge - Ignoring sub ConsumerInfo {commandId = 4,
      responseRequired = true, consumerId =
      ID:Hofstaedter-1141-1162581700794-1:6:1:1, destination =
      queue://Log4Mobility, prefetchSize = 1, maximumPendingMessageLimit = 0,
      browser = false, dispatchAsync = false, selector = null, subcriptionName
      = null, noLocal = false, exclusive = false, retroactive = false,
      priority = 0, brokerPath = [ID:Hofstaedter-1158-1162581728012-1:0],
      optimizedAcknowledge = false, noRangeAcks = false, additionalPredicate =
      null} already subscribed to matching destination
DEBUG PrefetchSubscription - Prefetch limit.
DEBUG DemandForwardingBridge - Forwarding messages for static destination: queue://Log4Mobility

The fourth log message in the set immediately above (the "Ignoring sub
... already subscribed to matching destination" entry) is interesting,
though I'm unsure whether it is related to the problem.  I suspect it
is, because everything picks back up again if I restart the producer.

I've seen messages from others on this list about failures on reconnect
- but they are always accompanied by specific exceptions and stack
traces and I see none of those - so I don't think it's the same thing.
In fact, I'm fairly new to AMQ, so it could be as simple as my config or
client code.  The config I'm using is:

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:util="http://www.springframework.org/schema/util">

   <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>

   <broker brokerName="Log4Mobility"
           persistent="true"
           useShutdownHook="false"
           useJmx="true"
           xmlns="http://activemq.org/config/1.0">

      <managementContext>
         <managementContext connectorPort="1099"
                            jmxDomainName="org.apache.activemq"/>
      </managementContext>

      <persistenceAdapter>
         <journaledJDBC journalLogFiles="5"
                        dataDirectory="${activemq.home}/activemq-data"/>
      </persistenceAdapter>

      <transportConnectors>
         <transportConnector name="default"
                             uri="tcp://10.47.20.123:61616"/>
      </transportConnectors>

   </broker>

</beans>

And finally, in case I'm performing some stupid code tricks, following
is my producer and consumer code to establish my connections:

Producer:
   m_broker = new BrokerService();
   m_broker.addConnector("vm://localhost");
   m_broker.setUseLoggingForShutdownErrors(false);
   m_broker.setUseShutdownHook(true);
   m_broker.setUseJmx(false);
   m_broker.setPersistent(m_persistent);
   DiscoveryNetworkConnector nc = new DiscoveryNetworkConnector();
   nc.setUri(new URI("static:" + m_url));
   nc.setFailover(true);
   nc.addStaticallyIncludedDestination(new ActiveMQQueue(m_queueName));
   nc.setUserName(m_userName);
   nc.setPassword(m_password);
   m_broker.addNetworkConnector(nc);
   m_broker.start();
   ActiveMQConnectionFactory factory =
      new ActiveMQConnectionFactory(m_user, m_password, "vm://localhost");
   m_connection = factory.createConnection();
   m_connection.start();
   m_session = m_connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
   m_destination = m_session.createQueue(m_queueName);
   m_producer = m_session.createProducer(m_destination);
   if (m_persistent)
      m_producer.setDeliveryMode(DeliveryMode.PERSISTENT);
   else
      m_producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

Consumer:
   ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
   factory.setBrokerURL(m_url);
   factory.setUserName(m_userName);
   factory.setPassword(m_password);
   m_connection = factory.createConnection();
   m_session = m_connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
   m_destination = m_session.createQueue(m_queueName + "?consumer.prefetchSize=1");
   m_consumer = m_session.createConsumer(m_destination);
   m_connection.setExceptionListener(this);
   m_connection.start();
   m_consumer.setMessageListener(this);

Thanks in advance for any help or insight anyone can provide.


RE: Missing destination queue after broker restarted

Posted by Chris Hofstaedter <ch...@nmwco.com>.
Ok.  Just found the cause.  

It's described in AMQ-776 (sorry I missed this one earlier when I was
trolling around).  I just needed to apply the
DemandForwardingBridgeSupport.patch.  

The subscriptions were not getting cleared on transport interrupted.
Now they are and things seem to be working well.
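
To illustrate the kind of failure this is, here is a deliberately simplified model of stale subscription bookkeeping.  It is not the actual DemandForwardingBridgeSupport code or the AMQ-776 patch: the class BridgeSubscriptionSketch, its fields, and its methods are hypothetical, and it assumes that the working subscription is lost when the transport is interrupted while the bridge's record of it survives unless explicitly cleared.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical, simplified model of a bridge's subscription bookkeeping. */
final class BridgeSubscriptionSketch {

    // Destinations the bridge believes it is already subscribed to.
    private final Set<String> bookkeeping = new HashSet<>();
    // Destinations with a subscription that is actually working right now.
    private final Set<String> live = new HashSet<>();
    private final boolean clearOnInterrupt;

    BridgeSubscriptionSketch(boolean clearOnInterrupt) {
        this.clearOnInterrupt = clearOnInterrupt;
    }

    /** Returns false when the subscription is ignored as a duplicate. */
    boolean addSubscription(String destination) {
        if (!bookkeeping.add(destination)) {
            // Corresponds to "Ignoring sub ... already subscribed to matching destination".
            return false;
        }
        live.add(destination);
        return true;
    }

    /** Assumption: an interrupted transport loses every working subscription. */
    void transportInterrupted() {
        live.clear();
        if (clearOnInterrupt) {
            bookkeeping.clear(); // the behavior described for the patched version
        }
    }

    boolean isForwarding(String destination) {
        return live.contains(destination);
    }

    /** Subscribes, interrupts, resubscribes, and reports whether forwarding resumed. */
    static boolean forwardsAfterReconnect(boolean clearOnInterrupt) {
        BridgeSubscriptionSketch bridge = new BridgeSubscriptionSketch(clearOnInterrupt);
        String queue = "queue://Log4Mobility";
        bridge.addSubscription(queue);
        bridge.transportInterrupted();
        bridge.addSubscription(queue); // consumer is announced again after reconnect
        return bridge.isForwarding(queue);
    }

    public static void main(String[] args) {
        System.out.println("without clearing: " + forwardsAfterReconnect(false));
        System.out.println("with clearing:    " + forwardsAfterReconnect(true));
    }
}
```

In this model the stale record makes the second subscription look like a duplicate, so forwarding never resumes; clearing the record on interruption lets the subscription be re-created.  Restarting the producer has the same effect because it starts with empty bookkeeping.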

By the way, any expectation when this patch will be represented within a
stable release?

