Posted to users@activemq.apache.org by Joe Smith <jo...@yahoo.com> on 2011/03/22 20:03:46 UTC
virtual topic with queue consumer on a broker cluster
Hi,
I'm encountering a problem using VirtualTopics in a cluster of 3 brokers.
I hope the ActiveMQ experts can help. I have attached the configuration at the
end. I tried most of the networkConnector and transportConnector options, hoping
I got it right - please help. Thank you.
OS: Red Hat Linux
Java 1.6
ActiveMQ 5.4.2
3 broker cluster: A, B, C
1 producer, sending to topic VirtualTopic.testtopic
3 consumers subscribing to queue Consumer.group1.VirtualTopic.testtopic
1 consumer subscribing to topic VirtualTopic.testtopic
Consumers connect to the brokers using failover://, so they randomly connect
to any one broker. The transport connector has the updateClusterClients and
rebalanceClusterClients options set.
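For readers reproducing this setup, here is a minimal sketch of how the destination name and the client URI fit together. It assumes the Consumer.<group>.<topic> naming shown above and the host1/host2/host3 names and port 61616 from the configuration below. The class and method names are hypothetical helpers, not part of ActiveMQ, and the code uses a newer Java than the 1.6 mentioned above.

```java
import java.util.StringJoiner;

// Hypothetical helper, not an ActiveMQ class: it only builds the strings
// that a client would pass to its connection factory and session.
class VirtualTopicNaming {

    // Queue name a consumer group reads from, following the
    // Consumer.<group>.<virtual topic> convention used in this thread.
    static String consumerQueue(String group, String virtualTopic) {
        return "Consumer." + group + "." + virtualTopic;
    }

    // Failover URI listing every broker in the cluster, so the client
    // may connect to any one of them.
    static String failoverUri(int port, String... hosts) {
        StringJoiner uri = new StringJoiner(",", "failover:(", ")");
        for (String host : hosts) {
            uri.add("tcp://" + host + ":" + port);
        }
        return uri.toString();
    }

    public static void main(String[] args) {
        System.out.println(consumerQueue("group1", "VirtualTopic.testtopic"));
        System.out.println(failoverUri(61616, "host1", "host2", "host3"));
    }
}
```

The producer and the plain topic consumer both use the topic name VirtualTopic.testtopic directly; only the load-balanced consumers use the derived queue name.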
The consumer on the topic gets the correct number of messages (all of them).
However, the queue consumers (used for load balancing) only occasionally got the
correct number of messages on the 1st batch (2000 msgs) after the initial
connect, and the numbers never add up on subsequent batches.
Problems observed:
1. The 3 load-balanced consumers are not properly load-balanced. The number of
messages received does not always add up to the number sent. Sometimes the total
is lower, sometimes they receive far more than what was sent. Sometimes a queue
consumer does not get any messages.
2. The producer test sends batches of 2000 msgs. In subsequent batches, the 3
load-balanced consumers progressively received more than 2000 msgs each time.
It's as if msgs already received were sent again by the broker.
3. I checked the JMS destination name: the queue consumers sometimes got messages
from the queue only, sometimes from the topic only (which I assume should not
happen), and sometimes from both queue AND topic -
queue://Consumer.group1.VirtualTopic.testtopic, topic://VirtualTopic.testtopic
4. The log file shows many rejected messages: "Duplicate message add attempt
rejected" (on the queue destination). I specified excludedDestinations and
suppressDuplicateQueueSubscriptions to no avail.
5. Sometimes a queue consumer got repeated messages after a disconnect/reconnect.
The disconnect occurred after no message had been received.
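One way to separate true duplicates from miscounts in observations like these is to tally deliveries by message ID across all queue consumers. The sketch below is a hypothetical helper, not the test code used for this report. It assumes each consumer passes in the string returned by the JMS call Message.getJMSMessageID(), and it uses a newer Java than the 1.6 mentioned above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical tally shared by all load-balanced queue consumers.
// Each delivery is recorded under its JMS message ID.
class DeliveryTally {
    private final Map<String, Integer> deliveries = new HashMap<>();

    // Record one delivery of the message with the given ID.
    synchronized void record(String messageId) {
        deliveries.merge(messageId, 1, Integer::sum);
    }

    // Number of distinct messages seen; should equal the number sent.
    synchronized int uniqueCount() {
        return deliveries.size();
    }

    // Number of deliveries beyond the first for any message.
    synchronized int duplicateCount() {
        int extra = 0;
        for (int count : deliveries.values()) {
            extra += count - 1;
        }
        return extra;
    }
}
```

If uniqueCount() matches the batch size while duplicateCount() is above zero, the same messages are being delivered more than once; if uniqueCount() is below the batch size, messages are going missing.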
At one point I used a separate transport connector for the network connector.
I still had problems. In addition, the consumer connections sometimes failed
over to the network connector port - not sure if that is the correct behavior.
Any help would be much appreciated. Thanks again.
Configuration applied:
1. <destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<virtualTopic name="VirtualTopic.>" />
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
I tried both with and without the prefix attribute. No difference.
2. <networkConnector
uri="static:(tcp://host1:61616,tcp://host2:61616,tcp://host3:61616)"
conduitSubscriptions="false"
dynamicOnly="true"
networkTTL="3"
suppressDuplicateQueueSubscriptions="true"
>
<excludedDestinations>
<queue physicalName="Consumer.*.VirtualTopic.>"/>
</excludedDestinations>
</networkConnector>
I tried various combinations of values.
3. <transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616"
updateClusterClients="true" rebalanceClusterClients="true"
updateClusterClientsOnRemove="true"/>
</transportConnectors>
Broker XML config below:
<!-- START SNIPPET: example -->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd">
<!-- Allows us to use system properties as variables in this configuration
file -->
<bean
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>file:${activemq.base}/conf/credentials.properties</value>
</property>
</bean>
<!--
The <broker> element is used to configure the ActiveMQ broker.
-->
<broker xmlns="http://activemq.apache.org/schema/core"
useJmx="true"
brokerName="d01-fus-arch01.stage.root"
dataDirectory="${activemq.base}/data"
destroyApplicationContextOnStop="true">
<!--
<virtualTopic name="VirtualTopic.>"
prefix="Consumer.*."/>
<virtualTopic name="VirtualTopic.>" />
-->
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<virtualTopic name="VirtualTopic.>" />
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
<!--
For better performances use VM cursor and small memory
limit.
For more information, see:
http://activemq.apache.org/message-cursors.html
Also, if your producer is "hanging", it's probably due to producer
flow control.
For more information, see:
http://activemq.apache.org/producer-flow-control.html
-->
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" producerFlowControl="true"
memoryLimit="1mb">
<pendingSubscriberPolicy>
<vmCursor />
</pendingSubscriberPolicy>
</policyEntry>
<policyEntry queue=">" producerFlowControl="true"
memoryLimit="1mb">
<!-- Use VM cursor for better latency
For more information, see:
http://activemq.apache.org/message-cursors.html
<pendingQueuePolicy>
<vmQueueCursor/>
</pendingQueuePolicy>
-->
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<!--
The managementContext is used to configure how ActiveMQ is exposed
in
JMX. By default, ActiveMQ uses the MBean server that is started by
the JVM. For more information, see:
http://activemq.apache.org/jmx.html
-->
<managementContext>
<managementContext createConnector="false"/>
</managementContext>
<networkConnectors>
<networkConnector
uri="static:(tcp://host1:61616,tcp://host2:61616,tcp://host3:61616)"
conduitSubscriptions="false"
dynamicOnly="true"
networkTTL="3"
suppressDuplicateQueueSubscriptions="true"
>
<excludedDestinations>
<queue physicalName="Consumer.*.VirtualTopic.>"/>
</excludedDestinations>
</networkConnector>
</networkConnectors>
<!--
Configure message persistence for the broker. The default
persistence
mechanism is the KahaDB store (identified by the kahaDB tag).
For more information, see:
http://activemq.apache.org/persistence.html
-->
<persistenceAdapter>
<kahaDB directory="${activemq.base}/data/kahadb"/>
</persistenceAdapter>
<!--
The systemUsage controls the maximum amount of space the broker will
use before slowing down producers. For more information, see:
http://activemq.apache.org/producer-flow-control.html
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit="20 mb"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="1 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="100 mb"/>
</tempUsage>
</systemUsage>
</systemUsage>
-->
<!--
The transport connectors expose ActiveMQ over a given protocol to
clients and other brokers. For more information, see:
http://activemq.apache.org/configuring-transports.html
-->
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616"
updateClusterClients="true" rebalanceClusterClients="true"
updateClusterClientsOnRemove="true"/>
</transportConnectors>
</broker>
<!--
Enable web consoles, REST and Ajax APIs and demos
It also includes Camel (with its web console), see
${ACTIVEMQ_HOME}/conf/camel.xml for more info
Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
-->
<import resource="jetty.xml"/>
</beans>
Re: virtual topic with queue consumer on a broker cluster
Posted by Joe Smith <jo...@yahoo.com>.
Hi,
I made some progress tracking down the issue and am able to reproduce the
problem.
Whenever there are both queue consumers and a topic consumer subscribing to the
same virtual topic, there is a message distribution error for the queue
consumers. I ran trials sending 2k messages to the virtual topic; eventually
the load-balanced queue consumers get out of sync. Most often, the msg count
gets out of sync after 2 or 3 batches. The topic consumer seems to always get
the right number of msgs.
Occasionally, when a client reconnects, it starts to receive extra msgs as
well.
Broker default persistence is on.
E.g.: the virtual topic is VirtualTopic.testtopic
3 queue consumers listening on Consumer.group1.VirtualTopic.testtopic
1 topic consumer listening on VirtualTopic.testtopic
When there were only queue consumers on the virtual topic, message counts were
correct. When msg volume increased, I occasionally saw an extra msg here and
there (e.g. 12001 vs 12000) - yet the count for the redelivered flag was 0.
It's pretty consistent - whenever there is a virtual topic consumer, the problem
occurs for the queue consumers.
Is anyone aware of this problem? What would be the right approach to
report/fix this issue?
Thanks.