You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by Joe Smith <jo...@yahoo.com> on 2011/03/22 20:03:46 UTC

virtual topic with queue consumer on a broker cluster

Hi,

I'm encountering a problem in using VirtualTopics in a cluster of 3 brokers.  
Hope ActiveMQ experts can help.  I attached the configuration at the end.  I 
used most of the networkconnector and transportconnector option, hoping I got it 
right - please help.  Thank you.

OS redhad linux
java 1.6
activemq 5.4.2

3 broker cluster: A, B, C
1 producer, sending to topic VirtualTopic.testtopic
3 consumers subscribing to queue Consumer.group1.VirtualTopic.testtopic
1 consumer subscribing to topic VirtualTopic.testtopic
consumers connect to brokers using the failover:// - so they randomally connect 
to any one broker.  The transport connector has the update cluster client  and 
reblance options.

The consumer on the topic gets the correct number of messages (all of them).  
However, the consumers (on queue for load-balance) on a few occasions got the 
correct number of messages on the 1st batch (2000 msgs) after initial connect, 
but numbers always not add up on subsequent batches.

Problems observed:
1. 3 consumers (load-balanced) are not properly load-balanced.  The number of 
messages does not always add up to the number sent.  Sometimes total is below, 
sometimes it received way more that what's sent.  Sometimes a queue consumer 
would not get any message.

2. Producer test send batch of 2000 msgs.  In subsequent batches, the 3 
load-balanced consumers progressively received more msgs than 2000 each time.  
It's like msgs already received were sent again by the broker.

3. I check the jms destination name, the queue consumers sometimes got message 
from queue only,  sometimes topic only (which I assume it should not), and 
sometimes both queue AND topic - queue://Consumer.group1.VirtualTopic.testtopic, 
topic://VirtualTopic.testtopic

4. Log file shows many rejectet messages: Duplicate message add attempt rejected 
(on the queue destination).  I specified the excludedDestinaton and 
suppressDuplicateQueueSubscriptions to no avail.

5. Sometimes queue consomer got repeated messages after a disconnect/connect.  
The disconnect was after no message had been received.

At one point I used a separate transport conector for the network connector.  
Still had problems.  In addition, the consumer connections sometimes were failed 
over to the network connector port - not sure if that is the correct behavior.

Any help would be much appreciated.  Thanks, again.


Configuration applied:

1.         <destinationInterceptors>
            <virtualDestinationInterceptor>
                <virtualDestinations>
                        <virtualTopic name="VirtualTopic.>" />
                </virtualDestinations>
            </virtualDestinationInterceptor>
        </destinationInterceptors>

    I tried both with and without the prefix attribute.  No difference.


2.             <networkConnector 
uri="static:(tcp://host1:61616,tcp://host2:61616,tcp://host3:61616)"
                        conduitSubscriptions="false"
                        dynamicOnly="true"
                        networkTTL="3"
                        suppressDuplicateQueueSubscriptions="true"
                        >
                <excludedDestinations> 
                        <queue physicalName="Consumer.*.VirtualTopic.>"/>
                </excludedDestinations>
            </networkConnector>

  Tried various combination of values.

3.          <transportConnectors>
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616" 
updateClusterClients="true" rebalanceClusterClients="true" 
updateClusterClientsOnRemove="true"/>
        </transportConnectors>





broker xml config below

<!-- START SNIPPET: example -->
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core 
http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Allows us to use system properties as variables in this configuration 
file -->
    <bean 
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
             <value>file:${activemq.base}/conf/credentials.properties</value>
        </property>
    </bean>

    <!--
        The <broker> element is used to configure the ActiveMQ broker.
    -->
    
            useJmx="true"
            brokerName="d01-fus-arch01.stage.root"
            dataDirectory="${activemq.base}/data"
            destroyApplicationContextOnStop="true">

         <!--
                        <virtualTopic name="VirtualTopic.>" 
prefix="Consumer.*."/>
                        <virtualTopic name="VirtualTopic.>" />
        -->

        <destinationInterceptors>
            <virtualDestinationInterceptor>
                <virtualDestinations>
                        <virtualTopic name="VirtualTopic.>"  />
                </virtualDestinations>
            </virtualDestinationInterceptor>
        </destinationInterceptors>


        <!--
                        For better performances use VM cursor and small memory 
limit.
                        For more information, see:

            http://activemq.apache.org/message-cursors.html

            Also, if your  producer is "hanging", it's probably due to producer 
flow control.
            For more information, see:
            http://activemq.apache.org/producer-flow-control.html
        -->

        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" producerFlowControl="true" 
memoryLimit="1mb">
                   <pendingSubscriberPolicy>
                    <vmCursor />
                  </pendingSubscriberPolicy>
                </policyEntry>
                <policyEntry queue=">" producerFlowControl="true" 
memoryLimit="1mb">
                  <!-- Use VM cursor for better latency
                       For more information,  see:

                       http://activemq.apache.org/message-cursors.html

                  <pendingQueuePolicy>
                    <vmQueueCursor/>
                  </pendingQueuePolicy>
                  -->
                </policyEntry>
               </policyEntries>
            </policyMap>
        </destinationPolicy>



        <!--
            The managementContext is used to configure how ActiveMQ is exposed 
in
            JMX. By default, ActiveMQ uses the MBean server that is started by
            the JVM. For more information, see:

            http://activemq.apache.org/jmx.html
        -->
        <managementContext>
            <managementContext  createConnector="false"/>
        </managementContext>



        <networkConnectors>
            <networkConnector 
uri="static:(tcp://host1:61616,tcp://host2:61616,tcp://host3:61616)"
                        conduitSubscriptions="false"
                        dynamicOnly="true"
                         networkTTL="3"
                        suppressDuplicateQueueSubscriptions="true"
                        >
                <excludedDestinations> 
                        <queue physicalName="Consumer.*.VirtualTopic.>"/>
                </excludedDestinations>
            </networkConnector>
         </networkConnectors>


        <!--
            Configure message persistence for the broker. The default 
persistence
            mechanism is the KahaDB store (identified by the kahaDB tag).
            For more information, see:

            http://activemq.apache.org/persistence.html
        -->
        <persistenceAdapter>
            <kahaDB directory="${activemq.base}/data/kahadb"/>
         </persistenceAdapter>


          <!--
            The systemUsage controls the maximum amount of space the broker will
            use before slowing down producers. For more information, see:
            
            http://activemq.apache.org/producer-flow-control.html
             
        <systemUsage>
            <systemUsage>
                 <memoryUsage>
                    <memoryUsage limit="20 mb"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="1 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="100  mb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>
                -->

        <!--
            The transport connectors expose ActiveMQ over a given protocol to
            clients and other brokers. For more information, see:

            http://activemq.apache.org/configuring-transports.html
        -->
         <transportConnectors>
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616" 
updateClusterClients="true" rebalanceClusterClients="true" 
updateClusterClientsOnRemove="true"/>
        </transportConnectors>

    </broker>

    <!--
        Enable web consoles, REST and Ajax APIs and demos
        It also includes Camel (with its web console), see 
${ACTIVEMQ_HOME}/conf/camel.xml for more info

        Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
    -->
    <import resource="jetty.xml"/>

</beans>


      

Re: virtual topic with queue consumer on a broker cluster

Posted by Joe Smith <jo...@yahoo.com>.
Hi,

Made some progress tracking down the issue.  Am able to reproduce the problem.  
Whenever there is both a queue consumers and a topic consume subscribing to the 
same virtual topic, there is message distribution error for the queue 
consumers.  I had trials of sending 2k messages to the virtual topic, eventually 
the load-balanced queue consumers get out of sync.  Most often, the msg count 
get out of sync after 2 or 3 batches.  The topic consumer seems to always get 
the right number of msgs.

Occasionally, when a client re-connects - it started to receive extra msgs as 
well.

Broker default persistence is on.

E.g.: vitural topic is: VirtualTopic.testtopic
3 queue consumer listening on Consumer.group1.VirtualTopic.testtopic
1 topic consumer listening on VirtualTopic.testtopic

When there was only queue consumer(s) on the virtual topic, message counts were 
correct. When msgs volume increased, occasionally I saw an extra msg here and 
there (e.g. 12001 vs 12000) - yet the count for the redelivered flag was 0.

It's pretty consistent - whenever there is a virtual topic consumer, the problem 
occurred for the queue consumers.

Is any one aware of this problem?  What would be the right approach to 
report/fix this issue?

Thanks.





________________________________
From: Joe Smith <jo...@yahoo.com>
To: users@activemq.apache.org
Sent: Tue, March 22, 2011 3:03:46 PM
Subject: virtual topic with queue consumer on a broker cluster

Hi,

I'm encountering a problem in using VirtualTopics in a cluster of 3 brokers.  
Hope ActiveMQ experts can help.  I attached the configuration at the end.  I 
used most of the networkconnector and transportconnector option, hoping I got it 

right - please help.  Thank you.

OS redhad linux
java 1.6
activemq 5.4.2

3 broker cluster: A, B, C
1 producer, sending to topic VirtualTopic.testtopic
3 consumers subscribing to queue Consumer.group1.VirtualTopic.testtopic
1 consumer subscribing to topic VirtualTopic.testtopic
consumers connect to brokers using the failover:// - so they randomally connect 
to any one broker.  The transport connector has the update cluster client  and 
reblance options.

The consumer on the topic gets the correct number of messages (all of them).  
However, the consumers (on queue for load-balance) on a few occasions got the 
correct number of messages on the 1st batch (2000 msgs) after initial connect, 
but numbers always not add up on subsequent batches.

Problems observed:
1. 3 consumers (load-balanced) are not properly load-balanced.  The number of 
messages does not always add up to the number sent.  Sometimes total is below, 
sometimes it received way more that what's sent.  Sometimes a queue consumer 
would not get any message.

2. Producer test send batch of 2000 msgs.  In subsequent batches, the 3 
load-balanced consumers progressively received more msgs than 2000 each time.  
It's like msgs already received were sent again by the broker.

3. I check the jms destination name, the queue consumers sometimes got message 
from queue only,  sometimes topic only (which I assume it should not), and 
sometimes both queue AND topic - queue://Consumer.group1.VirtualTopic.testtopic, 

topic://VirtualTopic.testtopic

4. Log file shows many rejectet messages: Duplicate message add attempt rejected 

(on the queue destination).  I specified the excludedDestinaton and 
suppressDuplicateQueueSubscriptions to no avail.

5. Sometimes queue consomer got repeated messages after a disconnect/connect.  
The disconnect was after no message had been received.

At one point I used a separate transport conector for the network connector.  
Still had problems.  In addition, the consumer connections sometimes were failed 

over to the network connector port - not sure if that is the correct behavior.

Any help would be much appreciated.  Thanks, again.


Configuration applied:

1.         <destinationInterceptors>
            <virtualDestinationInterceptor>
                <virtualDestinations>
                        <virtualTopic name="VirtualTopic.>" />
                </virtualDestinations>
            </virtualDestinationInterceptor>
        </destinationInterceptors>

    I tried both with and without the prefix attribute.  No difference.


2.             <networkConnector 
uri="static:(tcp://host1:61616,tcp://host2:61616,tcp://host3:61616)"
                        conduitSubscriptions="false"
                        dynamicOnly="true"
                        networkTTL="3"
                        suppressDuplicateQueueSubscriptions="true"
                        >
                <excludedDestinations> 
                        <queue physicalName="Consumer.*.VirtualTopic.>"/>
                </excludedDestinations>
            </networkConnector>

  Tried various combination of values.

3.          <transportConnectors>
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616" 
updateClusterClients="true" rebalanceClusterClients="true" 
updateClusterClientsOnRemove="true"/>
        </transportConnectors>





broker xml config below

<!-- START SNIPPET: example -->
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:amq="http://activemq.apache.org/schema/core"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core 
http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Allows us to use system properties as variables in this configuration 
file -->
    <bean 
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
             <value>file:${activemq.base}/conf/credentials.properties</value>
        </property>
    </bean>

    <!--
        The <broker> element is used to configure the ActiveMQ broker.
    -->
    
            useJmx="true"
            brokerName="d01-fus-arch01.stage.root"
            dataDirectory="${activemq.base}/data"
            destroyApplicationContextOnStop="true">

         <!--
                        <virtualTopic name="VirtualTopic.>" 
prefix="Consumer.*."/>
                        <virtualTopic name="VirtualTopic.>" />
        -->

        <destinationInterceptors>
            <virtualDestinationInterceptor>
                <virtualDestinations>
                        <virtualTopic name="VirtualTopic.>"  />
                </virtualDestinations>
            </virtualDestinationInterceptor>
        </destinationInterceptors>


        <!--
                        For better performances use VM cursor and small memory 
limit.
                        For more information, see:

            http://activemq.apache.org/message-cursors.html

            Also, if your  producer is "hanging", it's probably due to producer 
flow control.
            For more information, see:
            http://activemq.apache.org/producer-flow-control.html
        -->

        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" producerFlowControl="true" 
memoryLimit="1mb">
                   <pendingSubscriberPolicy>
                    <vmCursor />
                  </pendingSubscriberPolicy>
                </policyEntry>
                <policyEntry queue=">" producerFlowControl="true" 
memoryLimit="1mb">
                  <!-- Use VM cursor for better latency
                       For more information,  see:

                      http://activemq.apache.org/message-cursors.html

                  <pendingQueuePolicy>
                    <vmQueueCursor/>
                  </pendingQueuePolicy>
                  -->
                </policyEntry>
               </policyEntries>
            </policyMap>
        </destinationPolicy>



        <!--
            The managementContext is used to configure how ActiveMQ is exposed 
in
            JMX. By default, ActiveMQ uses the MBean server that is started by
            the JVM. For more information, see:

            http://activemq.apache.org/jmx.html
        -->
        <managementContext>
            <managementContext  createConnector="false"/>
        </managementContext>



        <networkConnectors>
            <networkConnector 
uri="static:(tcp://host1:61616,tcp://host2:61616,tcp://host3:61616)"
                        conduitSubscriptions="false"
                        dynamicOnly="true"
                         networkTTL="3"
                        suppressDuplicateQueueSubscriptions="true"
                        >
                <excludedDestinations> 
                        <queue physicalName="Consumer.*.VirtualTopic.>"/>
                </excludedDestinations>
            </networkConnector>
         </networkConnectors>


        <!--
            Configure message persistence for the broker. The default 
persistence
            mechanism is the KahaDB store (identified by the kahaDB tag).
            For more information, see:

            http://activemq.apache.org/persistence.html
        -->
        <persistenceAdapter>
            <kahaDB directory="${activemq.base}/data/kahadb"/>
         </persistenceAdapter>


          <!--
            The systemUsage controls the maximum amount of space the broker will
            use before slowing down producers. For more information, see:
            
            http://activemq.apache.org/producer-flow-control.html
            
        <systemUsage>
            <systemUsage>
                 <memoryUsage>
                    <memoryUsage limit="20 mb"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="1 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="100  mb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>
                -->

        <!--
            The transport connectors expose ActiveMQ over a given protocol to
            clients and other brokers. For more information, see:

            http://activemq.apache.org/configuring-transports.html
        -->
         <transportConnectors>
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616" 
updateClusterClients="true" rebalanceClusterClients="true" 
updateClusterClientsOnRemove="true"/>
        </transportConnectors>

    </broker>

    <!--
        Enable web consoles, REST and Ajax APIs and demos
        It also includes Camel (with its web console), see 
${ACTIVEMQ_HOME}/conf/camel.xml for more info

        Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
    -->
    <import resource="jetty.xml"/>

</beans>