You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by Matt Henkel <gu...@gmail.com> on 2012/12/15 01:32:33 UTC

NetworkConnector and VirtualTopic delivery issues

We've got a cluster of two brokers (5.7.0) running (same 
configuration--puppet deployed--on both boxes except for the brokerName) 
in our production environment and most of the VirtualTopics and queues 
appear to be fine, however one broker (amq00) is having problems.

We have the VirtualTopic.nonBlockingMessageFile which has a few 
Consumer.*.VirtualTopic.nonBlockingMessageFile queues consuming from it. 
If I use the web admin and "Send To" the VirtualTopic or a consuming 
queue, that message is delivered to the queue(s) and processed as 
expected. However, when our producers try to produce to the topic 
VirtualTopic.nonBlockingMessageFile, the topic is created but 
nothing is ever enqueued. This is all on amq00. On amq01 the queue is 
created and messages are enqueued, then delivered to the consuming 
queues. Our producers are using the failover protocol and do switch 
between the brokers as we start and stop them.

The other issue is that if we pin some consumers to a 
Consumer.foo.VirtualTopic.nonBlockingMessageFile queue on amq00 and turn 
off any consumers on the same queue on amq01, I can see messages being 
delivered to the networkConnector; however, I never see them enqueued or 
processed through amq00. (We have a number of networkConnectors 
configured based on the consumer process priority, so we actually exclude 
the VirtualTopic and explicitly include the queues.)

I've let all the queues drain, deleted all the queues (even trashed the 
whole kahadb directory), and bounced the broker only to have this 
problem persist.  Any ideas what's going on? As mentioned before amq01 
is working fine, and we can't reproduce this in our test environment.

Here's our activemq.xml:

    <beans
       xmlns="http://www.springframework.org/schema/beans"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
       http://activemq.apache.org/schema/core
    http://activemq.apache.org/schema/core/activemq-core.xsd">

         <!-- Allows us to use system properties as variables in this
    configuration file -->
         <bean
    class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
             <property name="locations">
    <value>file:${activemq.conf}/credentials.properties</value>
             </property>
         </bean>

         <!--
             The <broker> element is used to configure the ActiveMQ broker.
         -->
         <broker xmlns="http://activemq.apache.org/schema/core"
    brokerName="amq00.lan" dataDirectory="${activemq.data}">

             <!--
                 For better performance, use the VM cursor and a small memory
    limit.
                 For more information, see:

                 http://activemq.apache.org/message-cursors.html

                 Also, if your producer is "hanging", it's probably due
    to producer flow control.
                 For more information, see:
                 http://activemq.apache.org/producer-flow-control.html
             -->

             <destinationPolicy>
                 <policyMap>
                   <policyEntries>
                     <policyEntry topic=">" producerFlowControl="false"
    memoryLimit="128mb">
                       <pendingSubscriberPolicy>
                         <vmCursor />
                       </pendingSubscriberPolicy>
                     </policyEntry>
                     <policyEntry queue=">" producerFlowControl="false"
    memoryLimit="128mb">
                       <!-- Use VM cursor for better latency
                            For more information, see:

    http://activemq.apache.org/message-cursors.html

                       <pendingQueuePolicy>
                         <vmQueueCursor />
                       </pendingQueuePolicy>
                       -->
                     </policyEntry>
                   </policyEntries>
                 </policyMap>
             </destinationPolicy>


             <!--
                 The managementContext is used to configure how ActiveMQ
    is exposed in
                 JMX. By default, ActiveMQ uses the MBean server that is
    started by
                 the JVM. For more information, see:

                 http://activemq.apache.org/jmx.html
             -->
             <managementContext>
                 <managementContext createConnector="true" />
             </managementContext>

             <!--
                 Configure message persistence for the broker. The
    default persistence
                 mechanism is the KahaDB store (identified by the kahaDB
    tag).
                 For more information, see:

                 http://activemq.apache.org/persistence.html
             -->
             <persistenceAdapter>
                            <!--
                 <kahaDB directory="${activemq.data}/kahadb"
                         ignoreMissingJournalfiles="true"
                         checkForCorruptJournalFiles="false"
                         checksumJournalFiles="false" />
                            -->
                            <mKahaDB
    directory="${activemq.base}/data/kahadb">
                                    <filteredPersistenceAdapters>
                                            <!-- kahaDB per destinations -->
                                            <filteredKahaDB
    perDestination="true" >
    <persistenceAdapter>
                                                            <kahaDB
    journalMaxFileLength="32mb" />
    </persistenceAdapter>
                                            </filteredKahaDB>
    </filteredPersistenceAdapters>
                            </mKahaDB>
             </persistenceAdapter>


               <systemUsage>
                 <systemUsage>
                     <memoryUsage>
                         <memoryUsage limit="2 gb" />
                     </memoryUsage>
                     <storeUsage>
                         <storeUsage limit="65 gb" />
                     </storeUsage>
                     <tempUsage>
                         <tempUsage limit="25 gb" />
                     </tempUsage>
                 </systemUsage>
             </systemUsage>

             <!--
                 networkConnectors added for clustering
             -->
             <networkConnectors>
                 <networkConnector
                     uri="multicast://default?group=nonblocking"
                     dynamicOnly="true"
                     networkTTL="3"
                     prefetchSize="1000"
                     decreaseNetworkConsumerPriority="true">

                     <excludedDestinations>
                         <!-- If you have the network connector on both the
                              VirtualTopic and the underlying queues
    then your
                              consumers are likely to get duplicates. -->
                         <topic physicalName="VirtualTopic.>"/>
                         <!-- This queue has its own dedicated
    networkConnector -->
                         <queue
    physicalName="Consumer.Sorter.VirtualTopic.nonBlockingMessageFile" />
                         <queue
    physicalName="Consumer.Aggregator.VirtualTopic.BadForwarderBucket" />
                         <queue
    physicalName="Consumer.Aggregator.VirtualTopic.ConfProblemBucket" />
                         <queue
    physicalName="Consumer.Aggregator.VirtualTopic.GoodForwarderBucket" />
                         <queue
    physicalName="Consumer.Aggregator.VirtualTopic.NoProblemBucket" />
                         <queue
    physicalName="Consumer.Aggregator.VirtualTopic.SuspiciousBucket" />
                         <queue
    physicalName="Consumer.Engine.VirtualTopic.HbaseLoader" />
                     </excludedDestinations>
                 </networkConnector>
                 <networkConnector
                     name="NC_Sorter_nonBlockingMessageFile"
                     uri="multicast://default?group=nonblocking"
                     dynamicOnly="true"
                     networkTTL="3"
                     prefetchSize="10"
                     decreaseNetworkConsumerPriority="true">

                     <dynamicallyIncludedDestinations>
                         <queue
    physicalName="Consumer.Sorter.VirtualTopic.nonBlockingMessageFile" />
                     </dynamicallyIncludedDestinations>
                 </networkConnector>
                 <networkConnector
                     name="NC_Aggregator_Buckets"
                     uri="multicast://default?group=nonblocking"
                     dynamicOnly="true"
                     networkTTL="3"
                     prefetchSize="1000"
                     decreaseNetworkConsumerPriority="true">

                     <dynamicallyIncludedDestinations>
                         <queue
    physicalName="Consumer.Aggregator.VirtualTopic.BadForwarderBucket" />
                         <queue
    physicalName="Consumer.Aggregator.VirtualTopic.ConfProblemBucket" />
                         <queue
    physicalName="Consumer.Aggregator.VirtualTopic.GoodForwarderBucket" />
                         <queue
    physicalName="Consumer.Aggregator.VirtualTopic.NoProblemBucket" />
                         <queue
    physicalName="Consumer.Aggregator.VirtualTopic.SuspiciousBucket" />
                     </dynamicallyIncludedDestinations>
                 </networkConnector>
                 <networkConnector
                     name="NC_HbaseLoader"
                     uri="multicast://default?group=nonblocking"
                     dynamicOnly="true"
                     networkTTL="3"
                     prefetchSize="10000"
                     decreaseNetworkConsumerPriority="true">

                     <dynamicallyIncludedDestinations>
                         <queue
    physicalName="Consumer.Engine.VirtualTopic.HbaseLoader" />
                     </dynamicallyIncludedDestinations>
                 </networkConnector>
             </networkConnectors>


             <!--
                 The transport connectors expose ActiveMQ over a given
    protocol to
                 clients and other brokers. For more information, see:

                 http://activemq.apache.org/configuring-transports.html
             -->
             <transportConnectors>
                 <transportConnector
                     name="openwire"
                     uri="tcp://0.0.0.0:61616"
    discoveryUri="multicast://default?group=nonblocking"
                     updateClusterClients="true"
                     rebalanceClusterClients="true"
                     updateClusterClientsOnRemove="true" />
                 <transportConnector
                     name="stomp"
    uri="stomp://0.0.0.0:61613?transport.closeAsync=false"
    discoveryUri="multicast://default?group=nonblocking"
                     updateClusterClients="true"
                     rebalanceClusterClients="true"
                     updateClusterClientsOnRemove="true" />
             </transportConnectors>

             <plugins>
                 <discardingDLQBrokerPlugin dropAll="true"
    dropTemporaryTopics="true" dropTemporaryQueues="true" />
             </plugins>

         </broker>

         <!--
             Enable web consoles, REST and Ajax APIs and demos

             Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
         -->
         <import resource="jetty.xml" />

    </beans>


Any insight or suggestions as to what might be going on, or how to 
resolve it, would be greatly appreciated.

~/Matt

Re: NetworkConnector and VirtualTopic delivery issues

Posted by Dejan Bosanac <de...@nighttale.net>.
Hi Matt,

I don't see any reason why virtual topics would not work when sending
messages using a standard producer but would work when sent through the
web console. It's a complex environment you have, so I'd suggest you
try to narrow it down if possible.

Try seeing what's happening to the destination in some of those cases
by turning on debug logging and watching the JMX parameters on the
destinations and subscriptions.

Also, if possible try to isolate the broker.

I hope this helps a bit.


Regards
--
Dejan Bosanac
----------------------
Red Hat, Inc.
FuseSource is now part of Red Hat
dbosanac@redhat.com
Twitter: @dejanb
Blog: http://sensatic.net
ActiveMQ in Action: http://www.manning.com/snyder/


On Sat, Dec 15, 2012 at 1:32 AM, Matt Henkel <gu...@gmail.com> wrote:
> We've got a cluster of two brokers (5.7.0) running (same
> configuration--puppet deployed--on both boxes except for the brokerName) in
> our production environment and most of the VirtualTopics and queues appear
> to be fine, however one broker (amq00) is having problems.
>
> We have the VirtualTopic.nonBlockingMessageFile which has a few
> Consumer.*.VirtualTopic.nonBlockingMessageFile queues consuming from it. If
> I use the web admin and "Send To" the VirtualTopic or a consuming queue that
> message is delivered to the queue(s) and processed as expected. However when
> our producers try to produce to the topic
> VirtualTopic.nonBlockingMessageFile the topic will be created, however
> nothing is ever enqueued. This is all on amq00. On amq01 the queue is
> created and messages are enqueued, then delivered to the consuming queues.
> Our producers are using the failover protocol and do switch between the
> brokers as we start and stop them.
>
> The other issue is that if we pin some consumers to a
> Consumer.foo.VirtualTopic.nonBlockingMessageFile queue on amq00 and turn off
> any consumers on the same queue on amq01 I can see messages being delivered
> to the networkConnector, however I never see them enqueued or processed
> through amq00 (we have a number of networkConnectors configured based on the
> consumer process priority so we actually exclude the VirtualTopic and
> explicitly include the queues).
>
> I've let all the queues drain, deleted all the queues (even trashed the
> whole kahadb directory), and bounced the broker only to have this problem
> persist.  Any ideas what's going on? As mentioned before amq01 is working
> fine, and we can't reproduce this in our test environment.
>
> Here's our activemq.xml:
>
>    <beans
>       xmlns="http://www.springframework.org/schema/beans"
>       xmlns:amq="http://activemq.apache.org/schema/core"
>       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>       xsi:schemaLocation="http://www.springframework.org/schema/beans
>    http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
>       http://activemq.apache.org/schema/core
>    http://activemq.apache.org/schema/core/activemq-core.xsd">
>
>         <!-- Allows us to use system properties as variables in this
>    configuration file -->
>         <bean
>
> class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
>             <property name="locations">
>    <value>file:${activemq.conf}/credentials.properties</value>
>             </property>
>         </bean>
>
>         <!--
>             The <broker> element is used to configure the ActiveMQ broker.
>         -->
>         <broker xmlns="http://activemq.apache.org/schema/core"
>    brokerName="amq00.lan" dataDirectory="${activemq.data}">
>
>             <!--
>                 For better performances use VM cursor and small memory
>    limit.
>                 For more information, see:
>
>                 http://activemq.apache.org/message-cursors.html
>
>                 Also, if your producer is "hanging", it's probably due
>    to producer flow control.
>                 For more information, see:
>                 http://activemq.apache.org/producer-flow-control.html
>             -->
>
>             <destinationPolicy>
>                 <policyMap>
>                   <policyEntries>
>                     <policyEntry topic=">" producerFlowControl="false"
>    memoryLimit="128mb">
>                       <pendingSubscriberPolicy>
>                         <vmCursor />
>                       </pendingSubscriberPolicy>
>                     </policyEntry>
>                     <policyEntry queue=">" producerFlowControl="false"
>    memoryLimit="128mb">
>                       <!-- Use VM cursor for better latency
>                            For more information, see:
>
>    http://activemq.apache.org/message-cursors.html
>
>                       <pendingQueuePolicy>
>                         <vmQueueCursor />
>                       </pendingQueuePolicy>
>                       -->
>                     </policyEntry>
>                   </policyEntries>
>                 </policyMap>
>             </destinationPolicy>
>
>
>             <!--
>                 The managementContext is used to configure how ActiveMQ
>    is exposed in
>                 JMX. By default, ActiveMQ uses the MBean server that is
>    started by
>                 the JVM. For more information, see:
>
>                 http://activemq.apache.org/jmx.html
>             -->
>             <managementContext>
>                 <managementContext createConnector="true" />
>             </managementContext>
>
>             <!--
>                 Configure message persistence for the broker. The
>    default persistence
>                 mechanism is the KahaDB store (identified by the kahaDB
>    tag).
>                 For more information, see:
>
>                 http://activemq.apache.org/persistence.html
>             -->
>             <persistenceAdapter>
>                            <!--
>                 <kahaDB directory="${activemq.data}/kahadb"
>                         ignoreMissingJournalfiles="true"
>                         checkForCorruptJournalFiles="false"
>                         checksumJournalFiles="false" />
>                            -->
>                            <mKahaDB
>    directory="${activemq.base}/data/kahadb">
>                                    <filteredPersistenceAdapters>
>                                            <!-- kahaDB per destinations -->
>                                            <filteredKahaDB
>    perDestination="true" >
>    <persistenceAdapter>
>                                                            <kahaDB
>    journalMaxFileLength="32mb" />
>    </persistenceAdapter>
>                                            </filteredKahaDB>
>    </filteredPersistenceAdapters>
>                            </mKahaDB>
>             </persistenceAdapter>
>
>
>               <systemUsage>
>                 <systemUsage>
>                     <memoryUsage>
>                         <memoryUsage limit="2 gb" />
>                     </memoryUsage>
>                     <storeUsage>
>                         <storeUsage limit="65 gb" />
>                     </storeUsage>
>                     <tempUsage>
>                         <tempUsage limit="25 gb" />
>                     </tempUsage>
>                 </systemUsage>
>             </systemUsage>
>
>             <!--
>                 networkConnectors added for clustering
>             -->
>             <networkConnectors>
>                 <networkConnector
>                     uri="multicast://default?group=nonblocking"
>                     dynamicOnly="true"
>                     networkTTL="3"
>                     prefetchSize="1000"
>                     decreaseNetworkConsumerPriority="true">
>
>                     <excludedDestinations>
>                         <!-- If you have the network connector on both the
>                              VirtualTopic and the underlying queues
>    then your
>                              consumers are likely to get duplicates. -->
>                         <topic physicalName="VirtualTopic.>"/>
>                         <!-- This queue has its own dedicated
>    networkConnector -->
>                         <queue
>    physicalName="Consumer.Sorter.VirtualTopic.nonBlockingMessageFile" />
>                         <queue
>    physicalName="Consumer.Aggregator.VirtualTopic.BadForwarderBucket" />
>                         <queue
>    physicalName="Consumer.Aggregator.VirtualTopic.ConfProblemBucket" />
>                         <queue
>    physicalName="Consumer.Aggregator.VirtualTopic.GoodForwarderBucket" />
>                         <queue
>    physicalName="Consumer.Aggregator.VirtualTopic.NoProblemBucket" />
>                         <queue
>    physicalName="Consumer.Aggregator.VirtualTopic.SuspiciousBucket" />
>                         <queue
>    physicalName="Consumer.Engine.VirtualTopic.HbaseLoader" />
>                     </excludedDestinations>
>                 </networkConnector>
>                 <networkConnector
>                     name="NC_Sorter_nonBlockingMessageFile"
>                     uri="multicast://default?group=nonblocking"
>                     dynamicOnly="true"
>                     networkTTL="3"
>                     prefetchSize="10"
>                     decreaseNetworkConsumerPriority="true">
>
>                     <dynamicallyIncludedDestinations>
>                         <queue
>    physicalName="Consumer.Sorter.VirtualTopic.nonBlockingMessageFile" />
>                     </dynamicallyIncludedDestinations>
>                 </networkConnector>
>                 <networkConnector
>                     name="NC_Aggregator_Buckets"
>                     uri="multicast://default?group=nonblocking"
>                     dynamicOnly="true"
>                     networkTTL="3"
>                     prefetchSize="1000"
>                     decreaseNetworkConsumerPriority="true">
>
>                     <dynamicallyIncludedDestinations>
>                         <queue
>    physicalName="Consumer.Aggregator.VirtualTopic.BadForwarderBucket" />
>                         <queue
>    physicalName="Consumer.Aggregator.VirtualTopic.ConfProblemBucket" />
>                         <queue
>    physicalName="Consumer.Aggregator.VirtualTopic.GoodForwarderBucket" />
>                         <queue
>    physicalName="Consumer.Aggregator.VirtualTopic.NoProblemBucket" />
>                         <queue
>    physicalName="Consumer.Aggregator.VirtualTopic.SuspiciousBucket" />
>                     </dynamicallyIncludedDestinations>
>                 </networkConnector>
>                 <networkConnector
>                     name="NC_HbaseLoader"
>                     uri="multicast://default?group=nonblocking"
>                     dynamicOnly="true"
>                     networkTTL="3"
>                     prefetchSize="10000"
>                     decreaseNetworkConsumerPriority="true">
>
>                     <dynamicallyIncludedDestinations>
>                         <queue
>    physicalName="Consumer.Engine.VirtualTopic.HbaseLoader" />
>                     </dynamicallyIncludedDestinations>
>                 </networkConnector>
>             </networkConnectors>
>
>
>             <!--
>                 The transport connectors expose ActiveMQ over a given
>    protocol to
>                 clients and other brokers. For more information, see:
>
>                 http://activemq.apache.org/configuring-transports.html
>             -->
>             <transportConnectors>
>                 <transportConnector
>                     name="openwire"
>                     uri="tcp://0.0.0.0:61616"
>    discoveryUri="multicast://default?group=nonblocking"
>                     updateClusterClients="true"
>                     rebalanceClusterClients="true"
>                     updateClusterClientsOnRemove="true" />
>                 <transportConnector
>                     name="stomp"
>    uri="stomp://0.0.0.0:61613?transport.closeAsync=false"
>    discoveryUri="multicast://default?group=nonblocking"
>                     updateClusterClients="true"
>                     rebalanceClusterClients="true"
>                     updateClusterClientsOnRemove="true" />
>             </transportConnectors>
>
>             <plugins>
>                 <discardingDLQBrokerPlugin dropAll="true"
>    dropTemporaryTopics="true" dropTemporaryQueues="true" />
>             </plugins>
>
>         </broker>
>
>         <!--
>             Enable web consoles, REST and Ajax APIs and demos
>
>             Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
>         -->
>         <import resource="jetty.xml" />
>
>    </beans>
>
>
> Any insight or suggestions as to what might be going on, or how to resolve
> it, would be greatly appreciated.
>
> ~/Matt