Posted to users@activemq.apache.org by Alexandre Normand <al...@opower.com> on 2013/03/25 18:22:44 UTC

Intermittent problem: only a few message consumers receiving/processing messages

Hi, 
We have now hit this issue three times: only a few of our message consumers end up processing messages. Our usage pattern is a low number of messages with long processing times, so we effectively get contention whenever only a few consumers are working. Our maximum number of listener threads is 12, and each time this happened, the thread dumps we took showed that only a few consumers (in one case, a single one) were actually doing work. The same thread dumps showed a few other listener threads sitting idle, waiting for messages. I'm assuming that, because not all of the listeners were busy, no additional listener threads were created, which made matters even worse.

The problem seems to be on the client side (ActiveMQ/Spring), since the server does eventually dispatch messages to the remaining "working" listeners. One important thing to note is that we have prefetching disabled, although the DefaultMessageListenerContainer's cache level is still at the default value of CACHE_CONSUMER. Also, restarting the Jetty instance that hosts the client "fixes" the issue.

Does anyone have any ideas as to what this could be? 

Here's an extract of the thread dump I referred to (in this one, only one thread, lowPriorityJobStarterContainer-3, was actually doing any work). The relevant thread group names are *JobStarterContainer:
2013-03-15 01:13:46
Full thread dump Java HotSpot(TM) 64-Bit Server VM (16.3-b01 mixed mode):

...

"process reaper" daemon prio=10 tid=0x00007fca84002800 nid=0x4519 runnable [0x00007fcaf0248000]
   java.lang.Thread.State: RUNNABLE
    at java.lang.UNIXProcess.waitForProcessExit(Native Method)
    at java.lang.UNIXProcess.access$900(UNIXProcess.java:20)
    at java.lang.UNIXProcess$1$1.run(UNIXProcess.java:132)

"ActiveMQ Session Task-7001" prio=10 tid=0x00007fca88003800 nid=0x22a0 waiting on condition [0x00007fcaf1b61000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x00007fcb6d36c740> (a java.util.concurrent.SynchronousQueue$TransferStack)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
    at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:424)
    at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:323)
    at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:874)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:945)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
    at java.lang.Thread.run(Thread.java:619)

"ActiveMQ Connection Executor: tcp://broker-a.va.opower.it:61616" prio=10 tid=0x00007fca88007800 nid=0xc37 waiting on condition [0x00007fcaf1d63000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x00007fcb6d36cab8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
    at java.lang.Thread.run(Thread.java:619)

"ActiveMQ InactivityMonitor WriteCheckTimer" daemon prio=10 tid=0x00007fca88001800 nid=0xc35 in Object.wait() [0x00007fca53af9000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.util.TimerThread.mainLoop(Timer.java:509)
    - locked <0x00007fcb6de2d518> (a java.util.TaskQueue)
    at java.util.TimerThread.run(Timer.java:462)

"ActiveMQ InactivityMonitor ReadCheckTimer" daemon prio=10 tid=0x00007fca88005800 nid=0xc34 in Object.wait() [0x00007fca53bfa000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.util.TimerThread.mainLoop(Timer.java:509)
    - locked <0x00007fcb6de2b5f8> (a java.util.TaskQueue)
    at java.util.TimerThread.run(Timer.java:462)

"ActiveMQ Transport: tcp://broker-a.va.opower.it/10.20.64.131:61616@52598" prio=10 tid=0x00007fcac8009000 nid=0xc2a runnable [0x00007fcaf0e54000]
   java.lang.Thread.State: RUNNABLE
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.read(SocketInputStream.java:129)
    at org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50)
    at org.apache.activemq.transport.tcp.TcpTransport$2.fill(TcpTransport.java:604)
    at org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:58)
    at org.apache.activemq.transport.tcp.TcpTransport$2.read(TcpTransport.java:589)
    at java.io.DataInputStream.readInt(DataInputStream.java:370)
    at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:275)
    at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:221)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:213)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196)
    at java.lang.Thread.run(Thread.java:619)

"ActiveMQConnection[ID:bwpr001.va.opower.it-43015-1362931933661-1:1] Scheduler" daemon prio=10 tid=0x00007fcac4001800 nid=0x79fb in Object.wait() [0x00007fcaf1c62000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:485)
    at java.util.TimerThread.mainLoop(Timer.java:483)
    - locked <0x00007fcb6ddec2e8> (a java.util.TaskQueue)
    at java.util.TimerThread.run(Timer.java:462)

"defaultPriorityJobStarterContainer-5" prio=10 tid=0x00007fca7c001800 nid=0x22c5 in Object.wait() [0x00007fcaf0b51000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:485)
    at org.apache.activemq.SimplePriorityMessageDispatchChannel.dequeue(SimplePriorityMessageDispatchChannel.java:87)
    - locked <0x00007fcb6d5b0628> (a java.lang.Object)
    at org.apache.activemq.ActiveMQMessageConsumer.dequeue(ActiveMQMessageConsumer.java:470)
    at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:592)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:431)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:311)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:264)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1071)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1063)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:960)
    at java.lang.Thread.run(Thread.java:619)

"defaultPriorityJobStarterContainer-4" prio=10 tid=0x00007fcac8007800 nid=0x23b in Object.wait() [0x00007fcaf0d53000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:485)
    at org.apache.activemq.SimplePriorityMessageDispatchChannel.dequeue(SimplePriorityMessageDispatchChannel.java:87)
    - locked <0x00007fcb6d5b2628> (a java.lang.Object)
    at org.apache.activemq.ActiveMQMessageConsumer.dequeue(ActiveMQMessageConsumer.java:470)
    at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:592)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:431)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:311)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:264)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1071)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1063)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:960)
    at java.lang.Thread.run(Thread.java:619)

"lowPriorityJobStarterContainer-4" prio=10 tid=0x00007fcaac005800 nid=0x6859 in Object.wait() [0x00007fcaf175d000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:485)
    at org.apache.activemq.SimplePriorityMessageDispatchChannel.dequeue(SimplePriorityMessageDispatchChannel.java:87)
    - locked <0x00007fcb6d510d90> (a java.lang.Object)
    at org.apache.activemq.ActiveMQMessageConsumer.dequeue(ActiveMQMessageConsumer.java:470)
    at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:592)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:431)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:311)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:264)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1071)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1063)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:960)
    at java.lang.Thread.run(Thread.java:619)

"lowPriorityJobStarterContainer-3" prio=10 tid=0x00007fcaac001000 nid=0x65f2 in Object.wait() [0x00007fcaf0f54000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:485)
    at java.lang.UNIXProcess.waitFor(UNIXProcess.java:165)
    - locked <0x00007fcb6de715d0> (a java.lang.UNIXProcess)
    at org.apache.commons.exec.DefaultExecutor.executeInternal(DefaultExecutor.java:347)
    at org.apache.commons.exec.DefaultExecutor.execute(DefaultExecutor.java:160)
    at org.apache.commons.exec.DefaultExecutor.execute(DefaultExecutor.java:147)
    at opower.batch.agent.JobStarter.executeCommandLineInDirectory(JobStarter.java:108)
    at opower.batch.agent.JobStarter.startJob(JobStarter.java:55)
    at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.springframework.util.MethodInvoker.invoke(MethodInvoker.java:273)
    at org.springframework.jms.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:463)
    at org.springframework.jms.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:355)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:537)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:497)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:468)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:326)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:264)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1071)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1063)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:960)
    at java.lang.Thread.run(Thread.java:619)

"DestroyJavaVM" prio=10 tid=0x00007fcbb0007800 nid=0x3ce3 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"pool-1-thread-1 - Acceptor0 Ajp13SocketConnector@0.0.0.0:8080" prio=10 tid=0x00007fcbb0921000 nid=0x3d1d runnable [0x00007fcaf185e000]
   java.lang.Thread.State: RUNNABLE
    at java.net.PlainSocketImpl.socketAccept(Native Method)
    at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:390)
    - locked <0x00007fcb6d3889c8> (a java.net.SocksSocketImpl)
    at java.net.ServerSocket.implAccept(ServerSocket.java:453)
    at java.net.ServerSocket.accept(ServerSocket.java:421)
    at org.mortbay.jetty.bio.SocketConnector.accept(SocketConnector.java:99)
    at org.mortbay.jetty.AbstractConnector$Acceptor.run(AbstractConnector.java:707)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:619)

"highPriorityJobStarterContainer-1" prio=10 tid=0x00007fcbb022b000 nid=0x3d1b in Object.wait() [0x00007fcaf1a60000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:485)
    at org.apache.activemq.SimplePriorityMessageDispatchChannel.dequeue(SimplePriorityMessageDispatchChannel.java:87)
    - locked <0x00007fcb6d38a7e0> (a java.lang.Object)
    at org.apache.activemq.ActiveMQMessageConsumer.dequeue(ActiveMQMessageConsumer.java:470)
    at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:592)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:431)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:311)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:264)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1071)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1063)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:960)
    at java.lang.Thread.run(Thread.java:619)

"lowPriorityJobStarterContainer-1" prio=10 tid=0x00007fcbb0175800 nid=0x3d0f in Object.wait() [0x00007fcaf2066000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:485)
    at org.apache.activemq.SimplePriorityMessageDispatchChannel.dequeue(SimplePriorityMessageDispatchChannel.java:87)
    - locked <0x00007fcb6d38cef8> (a java.lang.Object)
    at org.apache.activemq.ActiveMQMessageConsumer.dequeue(ActiveMQMessageConsumer.java:470)
    at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:592)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:431)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:311)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:264)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1071)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1063)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:960)
    at java.lang.Thread.run(Thread.java:619)

"FileWatchdog" daemon prio=10 tid=0x00007fcbb0455000 nid=0x3d0a waiting on condition [0x00007fcb4c1b5000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
    at java.lang.Thread.sleep(Native Method)
    at org.apache.log4j.helpers.FileWatchdog.run(FileWatchdog.java:104)

"Timer-0" daemon prio=10 tid=0x00007fcbb0aa5800 nid=0x3d09 in Object.wait() [0x00007fcb4c2b6000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.util.TimerThread.mainLoop(Timer.java:509)
    - locked <0x00007fcb6d002268> (a java.util.TaskQueue)
    at java.util.TimerThread.run(Timer.java:462)

"Low Memory Detector" daemon prio=10 tid=0x00007fcbb00d2000 nid=0x3cfc runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"CompilerThread1" daemon prio=10 tid=0x00007fcbb00d0000 nid=0x3cfb waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"CompilerThread0" daemon prio=10 tid=0x00007fcbb00cd000 nid=0x3cfa waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Signal Dispatcher" daemon prio=10 tid=0x00007fcbb00cb000 nid=0x3cf9 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Finalizer" daemon prio=10 tid=0x00007fcbb00ab800 nid=0x3cf8 in Object.wait() [0x00007fcb4cffe000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:118)
    - locked <0x00007fcb6d016348> (a java.lang.ref.ReferenceQueue$Lock)
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:134)
    at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:159)

"Reference Handler" daemon prio=10 tid=0x00007fcbb00a9800 nid=0x3cf7 in Object.wait() [0x00007fcbb4110000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:485)
    at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:116)
    - locked <0x00007fcb6d0040d0> (a java.lang.ref.Reference$Lock)

"VM Thread" prio=10 tid=0x00007fcbb00a5800 nid=0x3cf6 runnable

"GC task thread#0 (ParallelGC)" prio=10 tid=0x00007fcbb001a800 nid=0x3ce4 runnable
"GC task thread#1 (ParallelGC)" prio=10 tid=0x00007fcbb001c000 nid=0x3ce5 runnable
"GC task thread#2 (ParallelGC)" prio=10 tid=0x00007fcbb001e000 nid=0x3ce6 runnable
"GC task thread#3 (ParallelGC)" prio=10 tid=0x00007fcbb0020000 nid=0x3ce7 runnable
"GC task thread#4 (ParallelGC)" prio=10 tid=0x00007fcbb0021800 nid=0x3ce8 runnable
"GC task thread#5 (ParallelGC)" prio=10 tid=0x00007fcbb0023800 nid=0x3ce9 runnable
"GC task thread#6 (ParallelGC)" prio=10 tid=0x00007fcbb0025800 nid=0x3cea runnable
"GC task thread#7 (ParallelGC)" prio=10 tid=0x00007fcbb0027000 nid=0x3ceb runnable
"GC task thread#8 (ParallelGC)" prio=10 tid=0x00007fcbb0029000 nid=0x3cec runnable
"GC task thread#9 (ParallelGC)" prio=10 tid=0x00007fcbb002b000 nid=0x3ced runnable
"GC task thread#10 (ParallelGC)" prio=10 tid=0x00007fcbb002c800 nid=0x3cee runnable
"GC task thread#11 (ParallelGC)" prio=10 tid=0x00007fcbb002e800 nid=0x3cef runnable
"GC task thread#12 (ParallelGC)" prio=10 tid=0x00007fcbb0030000 nid=0x3cf0 runnable
"GC task thread#13 (ParallelGC)" prio=10 tid=0x00007fcbb0032000 nid=0x3cf1 runnable
"GC task thread#14 (ParallelGC)" prio=10 tid=0x00007fcbb0034000 nid=0x3cf2 runnable
"GC task thread#15 (ParallelGC)" prio=10 tid=0x00007fcbb0035800 nid=0x3cf3 runnable
"GC task thread#16 (ParallelGC)" prio=10 tid=0x00007fcbb0037800 nid=0x3cf4 runnable
"GC task thread#17 (ParallelGC)" prio=10 tid=0x00007fcbb0039800 nid=0x3cf5 runnable

"VM Periodic Task Thread" prio=10 tid=0x00007fcbb00d5000 nid=0x3cfd waiting on condition

JNI global references: 700
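
For anyone comparing dumps, here is a minimal sketch of how the listener threads in a dump like this one could be summarized without reading every stack. It is only an illustration: the class name ListenerDumpSummary and its classify method are made up for this example, and it assumes the dump has been saved in the usual HotSpot layout with one thread header or stack frame per line. It treats a thread as idle when its stack contains the ActiveMQMessageConsumer.receive frame seen in the waiting threads; anything else (such as the thread blocked in UNIXProcess.waitFor) is reported as not waiting in receive(), which is a heuristic rather than proof that it is doing useful work.

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of ActiveMQ or Spring.
final class ListenerDumpSummary {

    // A thread header line starts with the quoted thread name.
    private static final Pattern HEADER = Pattern.compile("^\"([^\"]+)\".*");

    // Frame that the idle listener threads in the dump are parked in.
    private static final String RECEIVE_FRAME =
            "org.apache.activemq.ActiveMQMessageConsumer.receive";

    /**
     * Maps each thread whose name contains nameFragment to true if its
     * stack contains the receive() frame (idle), false otherwise.
     */
    static Map<String, Boolean> classify(String dump, String nameFragment) {
        Map<String, Boolean> waitingInReceive = new LinkedHashMap<>();
        String current = null; // name of the listener thread being read, if any
        for (String line : dump.split("\\R")) {
            Matcher m = HEADER.matcher(line.trim());
            if (m.matches()) {
                String name = m.group(1);
                current = name.contains(nameFragment) ? name : null;
                if (current != null) {
                    waitingInReceive.put(current, Boolean.FALSE);
                }
            } else if (current != null && line.contains(RECEIVE_FRAME)) {
                waitingInReceive.put(current, Boolean.TRUE);
            }
        }
        return waitingInReceive;
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            System.err.println("usage: java ListenerDumpSummary.java <dump-file>");
            return;
        }
        String dump = new String(
                Files.readAllBytes(Paths.get(args[0])), StandardCharsets.UTF_8);
        classify(dump, "JobStarterContainer").forEach((name, idle) ->
                System.out.println(name + ": "
                        + (idle ? "waiting in receive()" : "not in receive()")));
    }
}
```

Run against a saved dump file, this prints one line per *JobStarterContainer thread, which makes it easier to compare the idle/working split across several dumps taken during an incident.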


Thanks, 

-- 
Alexandre Normand