You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by "iulianrosca.ir" <iu...@gmail.com> on 2016/10/19 09:08:42 UTC

ActiveMQ network of brokers messages are not balanced

I am using *ActiveMQ 5.13.4* and I have set up a *network-of-brokers* with
two brokers *A* and *B* that are connected via a *networkConnector*. Each
broker has a consumer connected to it (*A' *and *B'*). *A'* and *B'* consume
messages from the same *distributed queue* (e.g. `requests-queue`). Each
consumer is a java application, represented by a Spring
*DefaultJmsListenerContainerFactory* having the setting *concurrency: 5-10*.
Basically, each consumer has max 10 threads that can consume simultaneously
messages from the queue. 

In broker configuration (`activemq.xml`) I have set the prefetch size to 1,
assuming that there are not many messages and they need a lot of processing
time.


```
<policyEntry queue=">" producerFlowControl="true" optimizedDispatch="true"
queuePrefetch="1" enableAudit="false">
```

The *desired behavior* is when I send 100 messages to `A` it starts
processing 10 messages simultaneously (one for each consumer thread) and
forwards another 10 to `B`. When `B` finishes processing the first 10
messages, it would get an extra 10. This would continue until all the
messages are processed.

The *actual behavior* is that when I send 100 messages to `A`, it sends 10
messages, one per each thread, to the local consumer `A'` and forwards the
remaining 90 to `B'`. The consumers on `A'` finish processing the messages
and they wait while 90 messages get processed on broker `B` by its
consumers. 

*Question:* How can I configure the network in order to balance messages
based on consumer availability?

The full `activemq.xml` configuration for one of the brokers is the
following. The other one has the same configuration only a different name
and IP in the networkConnector.

 /   <beans
            xmlns="http://www.springframework.org/schema/beans"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
      http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd">
    
        
        <bean
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
            <property name="locations">
                <value>file:${activemq.conf}/credentials.properties</value>
            </property>
        </bean>
    
        
        <bean id="logQuery"
class="io.fabric8.insight.log.log4j.Log4jLogQuery"
              lazy-init="false" scope="singleton"
              init-method="start" destroy-method="stop">
        </bean>
    
        
        <broker xmlns="http://activemq.apache.org/schema/core"
                brokerName="broker-jms-tux-01-qa-gnd"
                persistent="false"
                dataDirectory="${activemq.data}"
                cacheTempDestinations="false"
                advisorySupport="true"
                useJmx="true">
    
            <destinationPolicy>
                <policyMap>
                    <policyEntries>
                        <policyEntry topic=">" producerFlowControl="true">
                            
                            <pendingMessageLimitStrategy>
                                <constantPendingMessageLimitStrategy
limit="1000"/>
                            </pendingMessageLimitStrategy>
                            <pendingSubscriberPolicy>
                                <vmCursor/>
                            </pendingSubscriberPolicy>
                        </policyEntry>
                        <policyEntry queue=">" producerFlowControl="true"
optimizedDispatch="true" queuePrefetch="1" enableAudit="false">
    						
                            <networkBridgeFilterFactory>
                                
                                <conditionalNetworkBridgeFilterFactory
replayWhenNoConsumers="true"/>
                            </networkBridgeFilterFactory>
                            <pendingQueuePolicy>
                                <vmQueueCursor/>
                            </pendingQueuePolicy>
                            <deadLetterStrategy>
                                <sharedDeadLetterStrategy
expiration="300000"/>
                            </deadLetterStrategy>
                        </policyEntry>
                    </policyEntries>
                </policyMap>
            </destinationPolicy>
    
    
            
            <managementContext>
                <managementContext createConnector="true"
connectorPort="1099"/>
            </managementContext>
    
            
            <networkConnectors>
                <networkConnector
                       
name="broker-jms-tux-01-qa-gnd->broker-jms-tux-02-qa-gnd"
                        uri="static:(tcp://10.83.16.22:61616)"
                        conduitSubscriptions="false"
                        dynamicOnly="true"
                        networkTTL="2"
                        alwaysSyncSend="true"
    					decreaseNetworkConsumerPriority="true"
                        duplex="false"/>
            </networkConnectors>
            
            <systemUsage>
                <systemUsage>
                    <memoryUsage>
                        <memoryUsage percentOfJvmHeap="80"/>
                    </memoryUsage>
                    <storeUsage>
                        <storeUsage limit="10 gb"/>
                    </storeUsage>
                    <tempUsage>
                        <tempUsage limit="4 gb"/>
                    </tempUsage>
                </systemUsage>
            </systemUsage>
    
            
            <transportConnectors>
                
                <transportConnector name="openwire"
uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=3200000000"/>
            </transportConnectors>
    
            
            <shutdownHooks>
                <bean xmlns="http://www.springframework.org/schema/beans"
class="org.apache.activemq.hooks.SpringContextHook"/>
            </shutdownHooks>
    
        </broker>
    
        
        <import resource="jetty.xml"/>
    
    </beans>/

Thanks you in advance!



--
View this message in context: http://activemq.2283324.n4.nabble.com/ActiveMQ-network-of-brokers-messages-are-not-balanced-tp4718074.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.