You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by agentalpha <yo...@gmail.com> on 2015/03/17 17:09:25 UTC

Camel ActiveMQ In/OUT endpoint creates additional consumers on response queue which are more than maxConcurrentConsumers

Hi Everyone,

We are using Camel 2.13.2 and ActiveMQ 5.9.0.
The configuration of activemq broker is as follows:
	<bean id="activemq"
class="org.apache.activemq.camel.component.ActiveMQComponent">
		<property name="configuration" ref="jmsConfig" />
		<property name="transacted" value="false" />
		<property name="acceptMessagesWhileStopping" value="false" />
		<property name="cacheLevelName" value="CACHE_CONSUMER" />
	</bean>
	
	<bean id="jmsConnectionFactory"
class="org.apache.activemq.ActiveMQConnectionFactory">
		<property name="brokerURL" value="$[ren.brokerUrl]" />
		<property name="useAsyncSend" value="true" />
	</bean>

	<bean id="pooledConnectionFactory"
class="org.apache.activemq.pool.PooledConnectionFactory"
		init-method="start" destroy-method="stop">
		<property name="maxConnections" value="10" />
		<property name="maximumActiveSessionPerConnection" value="-1" />
		<property name="expiryTimeout" value="0" />
		<property name="idleTimeout" value="0" />
		<property name="connectionFactory" ref="jmsConnectionFactory" />
	</bean>
	
	<bean id="jmsConfig"
class="org.apache.camel.component.jms.JmsConfiguration">
		<property name="connectionFactory" ref="pooledConnectionFactory" />
		<property name="concurrentConsumers" value="15" />
		<property name="maxConcurrentConsumers" value="15" />
		<property name="maxMessagesPerTask" value="5" />
		<property name="idleTaskExecutionLimit" value="0" />
		<property name="idleConsumerLimit" value="0" />
	</bean>


As mentioned, we have concurrent and max concurrent consumers having value
15.

We have following route sample:
<route id="openApiRoute" trace="true" autoStartup="false"
			xmlns="http://camel.apache.org/schema/blueprint">
			<from uri="openAPI" />
			<onException>
				<exception>java.lang.Throwable</exception>
				<handled>
					<constant>true</constant>
				</handled>
				<bean ref="errorHandlerBean" />
				<convertBodyTo type="com.company.openapi.SoapMap" />
			</onException>
			<onException>
				<exception>java.lang.Exception</exception>
				<handled>
					<constant>true</constant>
				</handled>
				<bean ref="errorHandlerBean" />
				<convertBodyTo type="com.company.openapi.SoapMap" />
			</onException>
			<onException>
				<exception>org.apache.camel.ExchangeTimedOutException</exception>
				<redeliveryPolicy logRetryAttempted="true"
					retryAttemptedLogLevel="WARN"
				
maximumRedeliveries="{{ren.context.camel.openApiRoute.maximumRedeliveries}}"
					redeliveryDelay="{{ren.context.camel.openApiRoute.redeliveryDelay}}" />
				<handled>
					<simple>${header.requestType} != 'broadcast'</simple>
				</handled>
				<choice>
					<when>
						<simple>${header.requestType} != 'broadcast'</simple>
						<bean ref="errorHandlerBean" />
						<convertBodyTo type="com.company.openapi.SoapMap" />
					</when>
				</choice>
			</onException>
			
			<convertBodyTo type="java.util.Map" />
			<choice>
				<when>
					<simple>${header.requestType} == 'broadcast'</simple>
					<bean ref ="broadcasterBean"/>
					<recipientList parallelProcessing="true"
strategyRef="aggregatorStrategy" streaming="true" stopOnException="false"
prop:timeout="{{ren.context.camel.httpRoute.aggregatorTimeout}}">
							<header>recipientList</header>
					</recipientList>
					<process ref="broadcastResultProcessor" />
				</when>
				<when>
					<simple>${header.requestType} == 'forward'</simple>
					<bean ref="dynamicRouterBean" method="route" />
				</when>
				<otherwise>
					<bean ref="operationTypeNotSupported"
method="throwOperationTypeNotSupportedException" />
				</otherwise>
			</choice>
			<convertBodyTo type="com.company.openapi.SoapMap" />
</route>


What we are essentially doing, is fetching a request on CXF and forwarding
it to a jms endpoint using IN/OUT messaging. We use dynamic router for
performing this operation dynamically.
A sample dynamic url for this jms endpoint is :

activemq:queue:soapRequestQ2?exchangePattern=InOut&asyncConsumer=true&useMessageIDAsCorrelationID=false&requestTimeout=5000&transacted=false&replyTo=responseQ&replyToType=Exclusive

Now few problematic behaviors of automatic consumer creation on
responseQueue are:
1. Whenever the route starts and requests are sent, 15 consumers are created
on the response queue. After that no matter how much the load is, these 15
consumers are handling the requests properly (as per our tps expectations).
Now if we dynamically update the above sample jms url in our system (even
just the value of requestTimeout) and send another request, we see that the
reponseQ now has additional 15 (total of 30) consumers on it. And right
after this, most of the requests start failing for unable to map the
response to the request. 
The message is sent in the request queue, picked up by the client, the
response is sent, which we can see being enqueued adn dequeued in the
response queue (from activeMQ web console), the response is fetched by our
application but QueueReplyManager fails to map it to original request,
mentioning Response received for unknown correlationId xyz. And just after a
few seconds we see the exchangetimeout happening for the message with same
correlationid.
Is this because the new set of 15 consumers got created in different session
than the original one and hence they are not able to share the
correlationId?
Even though I have maxConcurrentConsumers set to 15 for that specific queue,
why is it still increasing beyond that value? (for every change in url, it
keeps on increasing by 15).

2. We have same behaviour when we use the same queue/endpoint url for single
request and in receipient list.
The single forward request registers its own 15 consumers and when we
perform a receipient list url, it creates additional 15 consumers on the
queues (again maxconcurrentconsumers is 15 only).

3. When use the dynamic router (routing to the same jms in/out url) in
multiple routes, we always have 15 consumers on the response queue.
But we use receipient list in multiple routes (routing to same set of jms
in/out urls), it creates set of 15 consumers per route (meaning if we have 3
routes it creates 45 consumers).
And again we start getting the same issue of response not being mapped to
the request for unknown correlation id.

Is there any book/blog from where I can understand how and why is this
happening? How does camel/activemq creates and manages consumers on queues,
specially when we are using the building blocks of dynamic router,
receipient list, activemq endpoint etc.

I have been trying to understand and find a correct solution for this since
couple of weeks now.
Please help me in this.
Thanks.

BR!
Yogesh





--
View this message in context: http://camel.465427.n5.nabble.com/Camel-ActiveMQ-In-OUT-endpoint-creates-additional-consumers-on-response-queue-which-are-more-than-mas-tp5764288.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Camel ActiveMQ In/OUT endpoint creates additional consumers on response queue which are more than maxConcurrentConsumers

Posted by agentalpha <yo...@gmail.com>.
Thank you for the quick reply Claus.

I have one more question regarding my issues.
Trying to dig some more in the camel activemq in/out messaging behaviour,
here are some more results that I needed help to understand.

I tried using the route the I have given above using two karaf instances
(kind of clustered environment).
The route reads a request from CXF endpoint and sends the request to IN/OUT
active endpoint, receives the response back and sends it back to the CXF
endpoint.

When I'm running only a single karaf instance (one cluster) then everything
works fine. I have 15 consumers created on the request and response queue
(concurrentConsumers and maxConcurrentConsumers are 15 in my case) in
separate unique session (maxACtiveSessionPerConnection is -1 and
maxConnections are 10).
The request is sent to the requestQueue, the response is fetched and mapped
to the request using correlationId (all of this is taken care by Camel
ACtiveMQ - no customized code from my side on top of it).

When I start the second karaf instance (cluster2), it creates its additional
set of 15 consumers (resulting to a total of 30) on the request and response
Queue. And this is where the problem starts.
After this when I send a request, sometimes, it is put in the request queue
(verified from ActiveMQ web console, I can see the message in the queue and
enqueue count is incremented by 1) but even though that queue has 30
consumers, no one picks up the message and it is moved to DLQ after
expiration.

Sometimes, if the message is processed by the consumer of the request queue,
it puts the response on the response queue but now I get response received
for unknown correlationId and right after the timeout, I see
ExchangeTimedOut exception for the same correlation id for which the
response was received above and QueueReplyMAnager was not able to map it to
the correct request.

I want to understand how this works internally. If there is any
book/guide/blog where I can read/understand this, it will be of a great help
to me.

So far my analysis is that, KAraf instance 1, creates 15 consumers each
having its own session as part of one connection. When I start Karaf 2, it
creates a second connection to the broker and thus having 15 concurrent
consumers each having same session as the 15 consumers of karaf 1 (verified
this from activemq webconsole - active consumers list on the queue). So each
session now has 2 consumers.

I'm using in memory persistent for ActiveMQ broker.

Even if this the case, ideally, any one of the consumer should be able to
consume the message no matter which Karaf instance's producer sent the
message to that queue. So all producers should be able to put the message
and any one of the consumers should be able to read the message process it
and also map the correlation id to the original request.
But this is not happening. Why is so? And what is the correct approach or
configuration to solve this problem?

Many Thanks.

BR!
Yogesh



--
View this message in context: http://camel.465427.n5.nabble.com/Camel-ActiveMQ-In-OUT-endpoint-creates-additional-consumers-on-response-queue-which-are-more-than-mas-tp5764288p5764344.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Camel ActiveMQ In/OUT endpoint creates additional consumers on response queue which are more than maxConcurrentConsumers

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

When doing request/reply over JMS using camel-jms then Camel uses the
concurrentConsumers settings you may have configured on the component
/ endpoint.

So if you only want 1 consumer, then set that on the endpoint

Though I have logged a ticket to allow having separate options for
regular concurrent consumers vs for request/reply
https://issues.apache.org/jira/browse/CAMEL-8503

On Tue, Mar 17, 2015 at 5:09 PM, agentalpha <yo...@gmail.com> wrote:
> Hi Everyone,
>
> We are using Camel 2.13.2 and ActiveMQ 5.9.0.
> The configuration of activemq broker is as follows:
>         <bean id="activemq"
> class="org.apache.activemq.camel.component.ActiveMQComponent">
>                 <property name="configuration" ref="jmsConfig" />
>                 <property name="transacted" value="false" />
>                 <property name="acceptMessagesWhileStopping" value="false" />
>                 <property name="cacheLevelName" value="CACHE_CONSUMER" />
>         </bean>
>
>         <bean id="jmsConnectionFactory"
> class="org.apache.activemq.ActiveMQConnectionFactory">
>                 <property name="brokerURL" value="$[ren.brokerUrl]" />
>                 <property name="useAsyncSend" value="true" />
>         </bean>
>
>         <bean id="pooledConnectionFactory"
> class="org.apache.activemq.pool.PooledConnectionFactory"
>                 init-method="start" destroy-method="stop">
>                 <property name="maxConnections" value="10" />
>                 <property name="maximumActiveSessionPerConnection" value="-1" />
>                 <property name="expiryTimeout" value="0" />
>                 <property name="idleTimeout" value="0" />
>                 <property name="connectionFactory" ref="jmsConnectionFactory" />
>         </bean>
>
>         <bean id="jmsConfig"
> class="org.apache.camel.component.jms.JmsConfiguration">
>                 <property name="connectionFactory" ref="pooledConnectionFactory" />
>                 <property name="concurrentConsumers" value="15" />
>                 <property name="maxConcurrentConsumers" value="15" />
>                 <property name="maxMessagesPerTask" value="5" />
>                 <property name="idleTaskExecutionLimit" value="0" />
>                 <property name="idleConsumerLimit" value="0" />
>         </bean>
>
>
> As mentioned, we have concurrent and max concurrent consumers having value
> 15.
>
> We have following route sample:
> <route id="openApiRoute" trace="true" autoStartup="false"
>                         xmlns="http://camel.apache.org/schema/blueprint">
>                         <from uri="openAPI" />
>                         <onException>
>                                 <exception>java.lang.Throwable</exception>
>                                 <handled>
>                                         <constant>true</constant>
>                                 </handled>
>                                 <bean ref="errorHandlerBean" />
>                                 <convertBodyTo type="com.company.openapi.SoapMap" />
>                         </onException>
>                         <onException>
>                                 <exception>java.lang.Exception</exception>
>                                 <handled>
>                                         <constant>true</constant>
>                                 </handled>
>                                 <bean ref="errorHandlerBean" />
>                                 <convertBodyTo type="com.company.openapi.SoapMap" />
>                         </onException>
>                         <onException>
>                                 <exception>org.apache.camel.ExchangeTimedOutException</exception>
>                                 <redeliveryPolicy logRetryAttempted="true"
>                                         retryAttemptedLogLevel="WARN"
>
> maximumRedeliveries="{{ren.context.camel.openApiRoute.maximumRedeliveries}}"
>                                         redeliveryDelay="{{ren.context.camel.openApiRoute.redeliveryDelay}}" />
>                                 <handled>
>                                         <simple>${header.requestType} != 'broadcast'</simple>
>                                 </handled>
>                                 <choice>
>                                         <when>
>                                                 <simple>${header.requestType} != 'broadcast'</simple>
>                                                 <bean ref="errorHandlerBean" />
>                                                 <convertBodyTo type="com.company.openapi.SoapMap" />
>                                         </when>
>                                 </choice>
>                         </onException>
>
>                         <convertBodyTo type="java.util.Map" />
>                         <choice>
>                                 <when>
>                                         <simple>${header.requestType} == 'broadcast'</simple>
>                                         <bean ref ="broadcasterBean"/>
>                                         <recipientList parallelProcessing="true"
> strategyRef="aggregatorStrategy" streaming="true" stopOnException="false"
> prop:timeout="{{ren.context.camel.httpRoute.aggregatorTimeout}}">
>                                                         <header>recipientList</header>
>                                         </recipientList>
>                                         <process ref="broadcastResultProcessor" />
>                                 </when>
>                                 <when>
>                                         <simple>${header.requestType} == 'forward'</simple>
>                                         <bean ref="dynamicRouterBean" method="route" />
>                                 </when>
>                                 <otherwise>
>                                         <bean ref="operationTypeNotSupported"
> method="throwOperationTypeNotSupportedException" />
>                                 </otherwise>
>                         </choice>
>                         <convertBodyTo type="com.company.openapi.SoapMap" />
> </route>
>
>
> What we are essentially doing, is fetching a request on CXF and forwarding
> it to a jms endpoint using IN/OUT messaging. We use dynamic router for
> performing this operation dynamically.
> A sample dynamic url for this jms endpoint is :
>
> activemq:queue:soapRequestQ2?exchangePattern=InOut&asyncConsumer=true&useMessageIDAsCorrelationID=false&requestTimeout=5000&transacted=false&replyTo=responseQ&replyToType=Exclusive
>
> Now few problematic behaviors of automatic consumer creation on
> responseQueue are:
> 1. Whenever the route starts and requests are sent, 15 consumers are created
> on the response queue. After that no matter how much the load is, these 15
> consumers are handling the requests properly (as per our tps expectations).
> Now if we dynamically update the above sample jms url in our system (even
> just the value of requestTimeout) and send another request, we see that the
> reponseQ now has additional 15 (total of 30) consumers on it. And right
> after this, most of the requests start failing for unable to map the
> response to the request.
> The message is sent in the request queue, picked up by the client, the
> response is sent, which we can see being enqueued adn dequeued in the
> response queue (from activeMQ web console), the response is fetched by our
> application but QueueReplyManager fails to map it to original request,
> mentioning Response received for unknown correlationId xyz. And just after a
> few seconds we see the exchangetimeout happening for the message with same
> correlationid.
> Is this because the new set of 15 consumers got created in different session
> than the original one and hence they are not able to share the
> correlationId?
> Even though I have maxConcurrentConsumers set to 15 for that specific queue,
> why is it still increasing beyond that value? (for every change in url, it
> keeps on increasing by 15).
>
> 2. We have same behaviour when we use the same queue/endpoint url for single
> request and in receipient list.
> The single forward request registers its own 15 consumers and when we
> perform a receipient list url, it creates additional 15 consumers on the
> queues (again maxconcurrentconsumers is 15 only).
>
> 3. When use the dynamic router (routing to the same jms in/out url) in
> multiple routes, we always have 15 consumers on the response queue.
> But we use receipient list in multiple routes (routing to same set of jms
> in/out urls), it creates set of 15 consumers per route (meaning if we have 3
> routes it creates 45 consumers).
> And again we start getting the same issue of response not being mapped to
> the request for unknown correlation id.
>
> Is there any book/blog from where I can understand how and why is this
> happening? How does camel/activemq creates and manages consumers on queues,
> specially when we are using the building blocks of dynamic router,
> receipient list, activemq endpoint etc.
>
> I have been trying to understand and find a correct solution for this since
> couple of weeks now.
> Please help me in this.
> Thanks.
>
> BR!
> Yogesh
>
>
>
>
>
> --
> View this message in context: http://camel.465427.n5.nabble.com/Camel-ActiveMQ-In-OUT-endpoint-creates-additional-consumers-on-response-queue-which-are-more-than-mas-tp5764288.html
> Sent from the Camel - Users mailing list archive at Nabble.com.



-- 
Claus Ibsen
-----------------
Red Hat, Inc.
Email: cibsen@redhat.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen
hawtio: http://hawt.io/
fabric8: http://fabric8.io/