Posted to users@activemq.apache.org by "iulianrosca.ir" <iu...@gmail.com> on 2016/10/19 09:08:42 UTC
ActiveMQ network of brokers messages are not balanced
I am using *ActiveMQ 5.13.4* and I have set up a *network of brokers* with
two brokers, *A* and *B*, connected via a *networkConnector*. Each
broker has a consumer connected to it (*A'* and *B'*). *A'* and *B'* consume
messages from the same *distributed queue* (e.g. `requests-queue`). Each
consumer is a Java application that uses a Spring
*DefaultJmsListenerContainerFactory* with the setting *concurrency: 5-10*,
so each consumer has at most 10 threads that can consume messages from the
queue simultaneously.
In the broker configuration (`activemq.xml`) I have set the prefetch size to 1,
on the assumption that there are not many messages and that each one needs a
lot of processing time.
```
<policyEntry queue=">" producerFlowControl="true" optimizedDispatch="true"
queuePrefetch="1" enableAudit="false">
```
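To make the intent of `queuePrefetch="1"` concrete, here is a toy model of the cap it is meant to impose. This is only a sketch, not ActiveMQ code: the class `PrefetchSketch` and the method `dispatch` are hypothetical names, and it assumes that each listener thread acts as a separate consumer and that no message is acknowledged during the dispatch.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class PrefetchSketch {

    /**
     * Toy model (not broker code): hands pending messages to consumers until
     * every consumer holds `prefetch` unacknowledged messages or the queue is
     * empty. Returns the number of messages dispatched.
     */
    public static int dispatch(Deque<Integer> pending, int[] inFlight, int prefetch) {
        int dispatched = 0;
        boolean progress = true;
        while (progress && !pending.isEmpty()) {
            progress = false;
            for (int c = 0; c < inFlight.length && !pending.isEmpty(); c++) {
                if (inFlight[c] < prefetch) {
                    pending.poll();   // take the next message off the queue
                    inFlight[c]++;    // consumer c now holds one more unacked message
                    dispatched++;
                    progress = true;
                }
            }
        }
        return dispatched;
    }

    public static void main(String[] args) {
        Deque<Integer> pending = new ArrayDeque<>();
        for (int i = 0; i < 100; i++) {
            pending.add(i);
        }
        // Assumption: 10 listener threads, each modelled as one consumer.
        int[] inFlight = new int[10];
        int sent = dispatch(pending, inFlight, 1);
        System.out.println("dispatched=" + sent + " stillPending=" + pending.size());
    }
}
```

With 100 messages, 10 consumers and a prefetch of 1, this model dispatches 10 messages and leaves 90 pending on the broker.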
The *desired behavior* is that when I send 100 messages to `A`, it starts
processing 10 messages simultaneously (one per consumer thread) and
forwards another 10 to `B`. When `B` finishes processing its first 10
messages, it gets another 10. This continues until all the
messages are processed.
The *actual behavior* is that when I send 100 messages to `A`, it sends 10
messages, one per thread, to the local consumer `A'` and forwards the
remaining 90 to broker `B`. The threads of `A'` finish processing their
messages and then sit idle while the 90 messages are processed on broker `B`
by its consumer `B'`.
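The cost of this imbalance can be estimated with some simple arithmetic. The sketch below is only an illustration: the class `BalanceSketch` and the method `rounds` are hypothetical names, and it assumes that every message takes the same time to process and that each consumer keeps all 10 threads busy whenever it has work.

```java
public class BalanceSketch {

    /**
     * Number of processing rounds a consumer with `threads` workers needs
     * for `messages` equal-cost messages (ceiling division).
     */
    public static int rounds(int messages, int threads) {
        return (messages + threads - 1) / threads;
    }

    public static void main(String[] args) {
        int threads = 10; // upper bound of "concurrency: 5-10"

        // Desired: 100 messages end up split evenly, 50 per consumer.
        int desired = Math.max(rounds(50, threads), rounds(50, threads));

        // Actual: 10 messages stay with A', 90 are forwarded to B'.
        int roundsA = rounds(10, threads);
        int roundsB = rounds(90, threads);
        int actual = Math.max(roundsA, roundsB);

        System.out.println("desired rounds=" + desired
                + ", actual rounds=" + actual
                + ", rounds A' sits idle=" + (roundsB - roundsA));
    }
}
```

Under these assumptions the balanced case finishes in 5 rounds, while the observed distribution needs 9 rounds, with `A'` idle for 8 of them.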
*Question:* How can I configure the network in order to balance messages
based on consumer availability?
The full `activemq.xml` configuration for one of the brokers is shown
below. The other broker has the same configuration, apart from a different
name and a different IP in the networkConnector.
```
<beans
    xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core.xsd">

    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.conf}/credentials.properties</value>
        </property>
    </bean>

    <bean id="logQuery"
          class="io.fabric8.insight.log.log4j.Log4jLogQuery"
          lazy-init="false" scope="singleton"
          init-method="start" destroy-method="stop">
    </bean>

    <broker xmlns="http://activemq.apache.org/schema/core"
            brokerName="broker-jms-tux-01-qa-gnd"
            persistent="false"
            dataDirectory="${activemq.data}"
            cacheTempDestinations="false"
            advisorySupport="true"
            useJmx="true">

        <destinationPolicy>
            <policyMap>
                <policyEntries>
                    <policyEntry topic=">" producerFlowControl="true">
                        <pendingMessageLimitStrategy>
                            <constantPendingMessageLimitStrategy limit="1000"/>
                        </pendingMessageLimitStrategy>
                        <pendingSubscriberPolicy>
                            <vmCursor/>
                        </pendingSubscriberPolicy>
                    </policyEntry>
                    <policyEntry queue=">" producerFlowControl="true"
                                 optimizedDispatch="true" queuePrefetch="1" enableAudit="false">
                        <networkBridgeFilterFactory>
                            <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
                        </networkBridgeFilterFactory>
                        <pendingQueuePolicy>
                            <vmQueueCursor/>
                        </pendingQueuePolicy>
                        <deadLetterStrategy>
                            <sharedDeadLetterStrategy expiration="300000"/>
                        </deadLetterStrategy>
                    </policyEntry>
                </policyEntries>
            </policyMap>
        </destinationPolicy>

        <managementContext>
            <managementContext createConnector="true" connectorPort="1099"/>
        </managementContext>

        <networkConnectors>
            <networkConnector
                name="broker-jms-tux-01-qa-gnd->broker-jms-tux-02-qa-gnd"
                uri="static:(tcp://10.83.16.22:61616)"
                conduitSubscriptions="false"
                dynamicOnly="true"
                networkTTL="2"
                alwaysSyncSend="true"
                decreaseNetworkConsumerPriority="true"
                duplex="false"/>
        </networkConnectors>

        <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage percentOfJvmHeap="80"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="10 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="4 gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <transportConnectors>
            <transportConnector name="openwire"
                uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=3200000000"/>
        </transportConnectors>

        <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans"
                  class="org.apache.activemq.hooks.SpringContextHook"/>
        </shutdownHooks>
    </broker>

    <import resource="jetty.xml"/>
</beans>
```
Thank you in advance!