Posted to users@activemq.apache.org by Laures <ba...@googlemail.com> on 2012/06/11 14:58:56 UTC

Suggestions on how to listen on several thousand topics with Spring?

Hi,

I'm building a clustered messaging system that will use an embedded ActiveMQ
broker for clustering. In the current scenario I need to handle several
thousand topic listeners per node. The load on most (99%) of the topics will
be very low, but I want to handle each message as soon as possible.

I don't like the Spring DefaultMessageListenerContainer because, even with a
TaskExecutor configured, it still keeps one task per destination permanently
scheduled in a limited thread pool. In other words: the more destinations
there are, the slower message consumption becomes.
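
A minimal sketch of the kind of setup described above, for illustration only.
The bean ids, the topic name chat.room.42, the pool size and the chatListener
and connectionFactory references are all placeholders, not taken from the
actual application. Each topic would need its own container bean, and every
container keeps a receive task occupying (or queued for) the shared pool:

```xml
<!-- Shared, bounded pool; the size of 20 is an arbitrary example value -->
<bean id="sharedPool"
      class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="20" />
    <property name="maxPoolSize" value="20" />
</bean>

<!-- One container like this per topic (placeholder topic name) -->
<bean id="roomContainer"
      class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="destinationName" value="chat.room.42" />
    <property name="pubSubDomain" value="true" />
    <property name="messageListener" ref="chatListener" />
    <property name="taskExecutor" ref="sharedPool" />
</bean>
```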

Can somebody suggest a working configuration to listen to such a large
number of mostly inactive destinations?

--
View this message in context: http://activemq.2283324.n4.nabble.com/Sugestions-how-to-listen-on-several-thousand-topics-with-spring-tp4653164.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Re: Suggestions on how to listen on several thousand topics with Spring?

Posted by Jakub Korab <ja...@gmail.com>.
Rather than consuming from hundreds of topics individually, I would approach
the problem in terms of namespaces. If all of these topics are named
consistently, you could set up a smaller set of listeners against a wildcard
topic subscription (http://activemq.apache.org/wildcards.html), perhaps using
message selectors to narrow the messages down further if applicable.
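
As a rough sketch of what this could look like in Spring XML: the topic
prefix "chat", the bean ids, the chatListener and connectionFactory
references and the "region" message property are all assumptions made up for
the example. In ActiveMQ wildcards, "." separates name segments, "*" matches
a single segment and ">" matches everything below the given prefix.

```xml
<!-- Wildcard destination covering e.g. chat.room.1, chat.room.2, chat.lobby -->
<bean id="chatWildcardTopic" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg value="chat.>" />
</bean>

<!-- A single container and consumer for the whole namespace -->
<bean id="wildcardContainer"
      class="org.springframework.jms.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="destination" ref="chatWildcardTopic" />
    <property name="messageListener" ref="chatListener" />
    <!-- Optional selector; "region" is a hypothetical message property -->
    <property name="messageSelector" value="region = 'eu'" />
</bean>
```

Inside the listener, message.getJMSDestination() should still report the
concrete topic the message was published to, so per-topic handling can be
done after the fact.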

Jakub

--
View this message in context: http://activemq.2283324.n4.nabble.com/Sugestions-how-to-listen-on-several-thousand-topics-with-spring-tp4653164p4653268.html

Re: Suggestions on how to listen on several thousand topics with Spring?

Posted by Laures <ba...@googlemail.com>.
Deadlock:


BrokerService[test] Task-1 is waiting to lock java.lang.Object@95cc0b which
is held by NettyWorker-thread-1
NettyWorker-thread-1 is waiting to lock java.lang.Object@126c1c8 which is
held by BrokerService[test] Task-1




Thread stacks


BrokerService[test] Task-1 [BLOCKED; waiting to lock java.lang.Object@95cc0b]
org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1265)
org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1259)
org.apache.activemq.ActiveMQSession.asyncSendPacket(ActiveMQSession.java:1863)
org.apache.activemq.ActiveMQSession.sendAck(ActiveMQSession.java:2029)
org.apache.activemq.ActiveMQSession.sendAck(ActiveMQSession.java:2024)
org.apache.activemq.ActiveMQMessageConsumer.afterMessageIsConsumed(ActiveMQMessageConsumer.java:871)
org.apache.activemq.ActiveMQMessageConsumer.dispatch(ActiveMQMessageConsumer.java:1232)
org.apache.activemq.ActiveMQSessionExecutor.dispatch(ActiveMQSessionExecutor.java:134)
org.apache.activemq.ActiveMQSessionExecutor.execute(ActiveMQSessionExecutor.java:82)
org.apache.activemq.ActiveMQSession.dispatch(ActiveMQSession.java:1535)
org.apache.activemq.ActiveMQConnection$2.processMessageDispatch(ActiveMQConnection.java:1733)
org.apache.activemq.command.MessageDispatch.visit(MessageDispatch.java:109)
org.apache.activemq.ActiveMQConnection.onCommand(ActiveMQConnection.java:1714)
org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)
org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:69)
org.apache.activemq.transport.vm.VMTransport.dispatch(VMTransport.java:121)
org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:112)
org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1270)
org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:815)
org.apache.activemq.broker.TransportConnection.iterate(TransportConnection.java:851)
org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:127)
org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:48)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
java.lang.Thread.run(Thread.java:722)


NettyWorker-thread-1 [BLOCKED; waiting to lock java.lang.Object@126c1c8]
org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1270)
org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:815)
org.apache.activemq.broker.TransportConnection.dispatchSync(TransportConnection.java:775)
org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:181)
org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)
org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:69)
org.apache.activemq.transport.vm.VMTransport.dispatch(VMTransport.java:121)
org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:112)
org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
org.apache.activemq.transport.ResponseCorrelator.asyncRequest(ResponseCorrelator.java:81)
org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:86)
org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1284)
org.apache.activemq.ActiveMQSession.send(ActiveMQSession.java:1760)
org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:231)
org.apache.activemq.ActiveMQMessageProducerSupport.send(ActiveMQMessageProducerSupport.java:269)
org.springframework.jms.connection.CachedMessageProducer.send(CachedMessageProducer.java:117)
org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:592)
org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:569)
org.springframework.jms.core.JmsTemplate$4.doInJms(JmsTemplate.java:546)
org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:466)
org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:543)
org.springframework.jms.core.JmsTemplate.convertAndSend(JmsTemplate.java:620)
net.bigpoint.globalchat.mom.EventSendingMessageHandler.handleMessageInternal(EventSendingMessageHandler.java:37)
org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
sun.reflect.NativeMethodAccessorImpl.invoke0(native method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:601)
org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:69)
org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:109)
org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:57)
org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:102)
org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:102)
org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:126)
org.springframework.integration.util.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:225)
org.springframework.integration.util.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:125)
org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:73)
org.springframework.integration.handler.MethodInvokingMessageHandler.handleMessageInternal(MethodInvokingMessageHandler.java:59)
org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:115)
org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:102)
org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:128)
org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:149)
org.springframework.integration.core.MessagingTemplate.convertAndSend(MessagingTemplate.java:189)
org.springframework.integration.gateway.MessagingGatewaySupport.send(MessagingGatewaySupport.java:183)
org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:308)
org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke(GatewayProxyFactoryBean.java:268)
org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean.java:259)
org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202)
$Proxy16.sendEvent(unknown source)
net.bigpoint.globalchat.services.UserService.authenticate(UserService.java:251)
net.bigpoint.globalchat.services.UserService$$FastClassByCGLIB$$70864d20.invoke(<generated>)
net.sf.cglib.proxy.MethodProxy.invoke(MethodProxy.java:191)
org.springframework.aop.framework.Cglib2AopProxy$DynamicAdvisedInterceptor.intercept(Cglib2AopProxy.java:618)
net.bigpoint.globalchat.services.UserService$$EnhancerByCGLIB$$53ff8b0.authenticate(<generated>)
net.bigpoint.globalchat.handler.RequestMessageHandler.doLoginUser(RequestMessageHandler.java:150)
net.bigpoint.globalchat.handler.RequestMessageHandler.receiveEvent(RequestMessageHandler.java:86)
net.bigpoint.globalchat.handler.ChatChannelHandler.messageReceived(ChatChannelHandler.java:36)
org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:95)
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:792)
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:71)
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:792)
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:321)
org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:303)
org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:208)
org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:75)
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:94)
org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:364)
org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:238)
org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:38)
org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
java.lang.Thread.run(Thread.java:722)



--
View this message in context: http://activemq.2283324.n4.nabble.com/Sugestions-how-to-listen-on-several-thousand-topics-with-spring-tp4653164p4653245.html

Re: Suggestions on how to listen on several thousand topics with Spring?

Posted by Laures <ba...@googlemail.com>.
I finally moved away from the idea of using the session threads. Now my
listener schedules the handling of each message into a thread pool (see
SimpleMessageListenerContainer.setTaskExecutor).
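
A minimal sketch of such a container, assuming a shared Spring
ThreadPoolTaskExecutor. The bean ids, pool sizes, queue capacity and the
topic name are placeholders chosen for the example. With the taskExecutor
property set, the container hands each received message to the pool instead
of running the listener on the provider's delivery thread:

```xml
<!-- Handler pool shared by all containers; sizes are example values -->
<bean id="handlerPool"
      class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="8" />
    <property name="maxPoolSize" value="8" />
    <property name="queueCapacity" value="10000" />
</bean>

<bean id="topicContainer"
      class="org.springframework.jms.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="destinationName" value="chat.room.42" />
    <property name="pubSubDomain" value="true" />
    <property name="messageListener" ref="chatListener" />
    <!-- Listener invocation is handed off to the shared pool -->
    <property name="taskExecutor" ref="handlerPool" />
</bean>
```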

In addition I use a different connection URI with new parameters:

alwaysSessionAsync=false makes sure that my VM connection to the embedded
broker does NOT create one thread per session (I have one session per
listener and thousands of listeners).

With this configuration the SimpleMessageListenerContainer is invoked on an
internal ActiveMQ task thread, and the container then schedules the message
handling into a thread pool.

Unfortunately the ActiveMQ task threads are very slow.

So I would like to use async=false (the VM transport option, NOT
jms.useAsyncSend) so that the sending thread itself delivers the message and
pushes it into the listener, where it is scheduled into the handler thread
pool. See:
http://fusesource.com/wiki/display/ProdInfo/Understanding+the+Threads+Allocated+in+ActiveMQ

Unfortunately this causes a deadlock (according to YourKit) between my
sending thread and the ActiveMQ BrokerService[name] task thread.
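
For reference, the combination of the two properties would look roughly like
this on the connection factory. This is a sketch only: the broker name
"localhost" and the bean id are assumptions, and this is the combination
reported to deadlock, not a recommended setting.

```xml
<bean id="amqConnectionFactory"
      class="org.apache.activemq.ActiveMQConnectionFactory">
    <!-- async=false: VM transport option, delivery runs on the calling thread.
         jms.alwaysSessionAsync=false: connection option, no dispatch thread per session. -->
    <property name="brokerURL"
              value="vm://localhost?async=false&amp;jms.alwaysSessionAsync=false" />
</bean>
```

In the thread dump posted in this thread, the NettyWorker thread is inside a
synchronous send (ActiveMQConnection.syncSendPacket) while the BrokerService
task thread is dispatching to the consumer and trying to send the ack. Both
run straight through VMTransport.dispatch on the caller's thread, and each is
blocked in MutexTransport.oneway on a lock the other one holds.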

How can I make these two properties work together? Or alternatively: how can
I speed up the embedded ActiveMQ broker for async sends?

--
View this message in context: http://activemq.2283324.n4.nabble.com/Sugestions-how-to-listen-on-several-thousand-topics-with-spring-tp4653164p4653243.html

Re: Suggestions on how to listen on several thousand topics with Spring?

Posted by Laures <ba...@googlemail.com>.
Currently I'm using Spring's CachingConnectionFactory to limit the number
of open sessions.

	<bean id="connectionFactory"
		class="org.springframework.jms.connection.CachingConnectionFactory"
		destroy-method="destroy">
		<property name="targetConnectionFactory" ref="amqConnectionFactory" />
		<property name="sessionCacheSize" value="60" />
	</bean>

	<bean id="amqConnectionFactory"
		class="org.apache.activemq.ActiveMQConnectionFactory">
		<property name="brokerURL"
			value="vm://localhost:61616?async=false&amp;jms.alwaysSessionAsync=true" />
		<property name="userName" value="admin" />
		<property name="password" value="admin" />
	</bean>

Together with the connection URI parameters, this means that ActiveMQ uses
one thread per session to process messages in the
SimpleMessageListenerContainer instances, and there is a maximum of 60
concurrent sessions.

My problem with this config is that for some reason sessions are closed by
the caching factory too early, which causes exceptions like this:

2012-06-12 11:46:06,051 DEBUG [ActiveMQ Session Task-19] o.s.j.l.SimpleMessageListenerContainer - Listener exception after container shutdown
javax.jms.IllegalStateException: The Session is closed
	at org.apache.activemq.ActiveMQSession.checkClosed(ActiveMQSession.java:722) ~[activemq-core-5.5.1.jar:5.5.1]
	at org.apache.activemq.ActiveMQSession.getTransacted(ActiveMQSession.java:520) ~[activemq-core-5.5.1.jar:5.5.1]
	at sun.reflect.GeneratedMethodAccessor72.invoke(Unknown Source) ~[na:na]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.7.0_04]
	at java.lang.reflect.Method.invoke(Method.java:601) ~[na:1.7.0_04]
	at org.springframework.jms.connection.CachingConnectionFactory$CachedSessionInvocationHandler.invoke(CachingConnectionFactory.java:344) ~[spring-jms-3.1.0.RELEASE.jar:3.1.0.RELEASE]
	at $Proxy31.getTransacted(Unknown Source) ~[na:na]
	at org.springframework.jms.listener.AbstractMessageListenerContainer.commitIfNecessary(AbstractMessageListenerContainer.java:572) ~[spring-jms-3.1.0.RELEASE.jar:3.1.0.RELEASE]
	at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:481) ~[spring-jms-3.1.0.RELEASE.jar:3.1.0.RELEASE]
	at org.springframework.jms.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:439) ~[spring-jms-3.1.0.RELEASE.jar:3.1.0.RELEASE]
	at org.springframework.jms.listener.SimpleMessageListenerContainer.processMessage(SimpleMessageListenerContainer.java:311) [spring-jms-3.1.0.RELEASE.jar:3.1.0.RELEASE]
	at org.springframework.jms.listener.SimpleMessageListenerContainer$2.onMessage(SimpleMessageListenerContainer.java:287) [spring-jms-3.1.0.RELEASE.jar:3.1.0.RELEASE]
	at org.apache.activemq.ActiveMQMessageConsumer.dispatch(ActiveMQMessageConsumer.java:1230) [activemq-core-5.5.1.jar:5.5.1]
	at org.apache.activemq.ActiveMQSessionExecutor.dispatch(ActiveMQSessionExecutor.java:134) [activemq-core-5.5.1.jar:5.5.1]
	at org.apache.activemq.ActiveMQSessionExecutor.iterate(ActiveMQSessionExecutor.java:205) [activemq-core-5.5.1.jar:5.5.1]
	at org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:127) [activemq-core-5.5.1.jar:5.5.1]
	at org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:48) [activemq-core-5.5.1.jar:5.5.1]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) [na:1.7.0_04]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) [na:1.7.0_04]
	at java.lang.Thread.run(Thread.java:722) [na:1.7.0_04]

--
View this message in context: http://activemq.2283324.n4.nabble.com/Sugestions-how-to-listen-on-several-thousand-topics-with-spring-tp4653164p4653195.html