You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by dcheckoway <dc...@gmail.com> on 2009/06/02 01:38:57 UTC

BUG: JMS consumer hangs while producing

Help!

I have a transactional consumer invoked by Camel for messages on
"testQueueA" -- it ends up producing/sending thousands of messages to the
JMS queue "testQueueB".  Depending upon how many messages need to be sent,
this scenario hangs.  This happens when talking to ActiveMQ via TCP.

I wrote a simplified test case, and one interesting thing I found was that
the "bigger" my messages were, the fewer would be sent before everything
hangs.  It implied to me that there was some sort of buffer limit being
imposed.

Here's the code:

import java.util.logging.Logger;
import org.apache.camel.*;
import org.springframework.beans.factory.annotation.*;

public class ConsumerThatAlsoProduces {
    Logger logger = Logger.getLogger(getClass().getName());
    @Autowired
    ProducerTemplate producerTemplate;
    @Autowired
    @Qualifier("testQueueB")
    Endpoint testQueueB;
    
    public void onTestMessage(TestMessage msg) {
        logger.info("Received from A: " + msg);
        final int numToProduce = 20000;
        logger.info("Producing " + numToProduce + " messages on queueB");
        for (int k = 1; k <= numToProduce; ++k) {
            logger.info("Sending " + k + " of " + numToProduce);
            producerTemplate.sendBody(testQueueB, new TestMessage(k));
        }
    }
}

When I force a thread dump, this is what I'm seeing for the thread of
interest:

"DefaultMessageListenerContainer-1" prio=5 tid=0x00000001019ff800
nid=0x111cae000 runnable [0x0000000111cab000..0x0000000111cadad0]
   java.lang.Thread.State: RUNNABLE
	at java.net.SocketOutputStream.socketWrite0(Native Method)
	at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:92)
	at java.net.SocketOutputStream.write(SocketOutputStream.java:136)
	at
org.apache.activemq.transport.tcp.TcpBufferedOutputStream.write(TcpBufferedOutputStream.java:96)
	at java.io.DataOutputStream.write(DataOutputStream.java:90)
	- locked <0x000000010703aa28> (a java.io.DataOutputStream)
	at
org.apache.activemq.openwire.v3.BaseDataStreamMarshaller.tightMarshalByteSequence2(BaseDataStreamMarshaller.java:432)
	at
org.apache.activemq.openwire.v3.MessageMarshaller.tightMarshal2(MessageMarshaller.java:173)
	at
org.apache.activemq.openwire.v3.ActiveMQMessageMarshaller.tightMarshal2(ActiveMQMessageMarshaller.java:90)
	at
org.apache.activemq.openwire.v3.ActiveMQObjectMessageMarshaller.tightMarshal2(ActiveMQObjectMessageMarshaller.java:90)
	at
org.apache.activemq.openwire.OpenWireFormat.marshal(OpenWireFormat.java:240)
	- locked <0x000000010701bb78> (a
org.apache.activemq.openwire.OpenWireFormat)
	at
org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:166)
	at
org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:237)
	- locked <0x000000010705a298> (a java.util.concurrent.atomic.AtomicBoolean)
	at
org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:83)
	at
org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:104)
	at
org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
	- locked <0x0000000106fdd0e8> (a java.lang.Object)
	at
org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
	at
org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1225)
	at
org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1219)
	at org.apache.activemq.ActiveMQSession.send(ActiveMQSession.java:1676)
	- locked <0x0000000106fe9f00> (a java.lang.Object)
	at
org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:231)
	at org.apache.activemq.pool.PooledProducer.send(PooledProducer.java:74)
	- locked <0x0000000107095d88> (a
org.apache.activemq.ActiveMQMessageProducer)
	at org.apache.activemq.pool.PooledProducer.send(PooledProducer.java:59)
	at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:597)
	at
org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSend(JmsConfiguration.java:237)
	at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:574)
	at org.springframework.jms.core.JmsTemplate$4.doInJms(JmsTemplate.java:551)
	at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:471)
	at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:548)
	at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:301)
	at
org.apache.camel.impl.ProducerCache$1.doInProducer(ProducerCache.java:165)
	at
org.apache.camel.impl.ProducerCache$1.doInProducer(ProducerCache.java:151)
	at org.apache.camel.impl.ProducerCache.doInProducer(ProducerCache.java:136)
	at org.apache.camel.impl.ProducerCache.sendExchange(ProducerCache.java:150)
	at org.apache.camel.impl.ProducerCache.send(ProducerCache.java:86)
	at
org.apache.camel.impl.DefaultProducerTemplate.send(DefaultProducerTemplate.java:98)
	at
org.apache.camel.impl.DefaultProducerTemplate.sendBody(DefaultProducerTemplate.java:111)
	at ConsumerThatAlsoProduces.onTestMessage(ConsumerThatAlsoProduces.java:19)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
	at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
	at java.lang.reflect.Method.invoke(Method.java:597)
	at org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:173)
	at org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:95)
	at
org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:111)
	at
org.apache.camel.processor.interceptor.StreamCachingInterceptor.process(StreamCachingInterceptor.java:52)
	at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:61)
	at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:61)
	at
org.apache.camel.processor.DefaultErrorHandler.process(DefaultErrorHandler.java:52)
	at
org.apache.camel.processor.DefaultChannel.process(DefaultChannel.java:147)
	at
org.apache.camel.spring.spi.TransactionErrorHandler$1.doInTransactionWithoutResult(TransactionErrorHandler.java:110)
	at
org.springframework.transaction.support.TransactionCallbackWithoutResult.doInTransaction(TransactionCallbackWithoutResult.java:33)
	at
org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:128)
	at
org.apache.camel.spring.spi.TransactionErrorHandler.process(TransactionErrorHandler.java:80)
	at
org.apache.camel.processor.interceptor.StreamCachingInterceptor.process(StreamCachingInterceptor.java:52)
	at
org.apache.camel.processor.DefaultErrorHandler.process(DefaultErrorHandler.java:52)
	at
org.apache.camel.processor.DefaultChannel.process(DefaultChannel.java:147)
	at
org.apache.camel.processor.UnitOfWorkProcessor.processNext(UnitOfWorkProcessor.java:54)
	at
org.apache.camel.processor.DelegateProcessor.process(DelegateProcessor.java:48)
	at
org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:76)
	at
org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:543)
	at
org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:482)
	at
org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:451)
	at
org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:323)
	at
org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:241)
	at
org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:982)
	at
org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:881)
	at java.lang.Thread.run(Thread.java:637)


I'm pretty sure the problem is that since my consumer is running inside a
JMS transaction, the messages it tries to send are being "buffered" by the
ActiveMQ server until the transaction commits.  Thus the catch...the
transaction won't commit until my consumer returns, which won't happen until
all of the messages are sent.

On this link  http://activemq.apache.org/how-do-transactions-work.html
http://activemq.apache.org/how-do-transactions-work.html  I noticed this
comment:

"Now the operations carried out on a transacted session inside a
transaction, like a send message or acknowledge message, do not really
perform a real send or acknowledge until the commit occurs. So the Broker
explicitly handles these cases separately - essentially buffering up the
commands until the commit occurs when the messages are really sent or
acknowledged."

What I find interesting is that if I run an embedded ActiveMQ broker and
talk via vm://localhost instead of tcp://localhost:61616, the problem goes
away.  This ONLY happens when talking TCP.

FWIW, I've been using Camel 1.6.1 with ActiveMQ 5.2.0, but I have also
tested this against ActiveMQ 5.3-SNAPSHOT (June 1, 2009), which uses Camel
2.0.  Both straight out of the box.  Same results with both old & new
versions.

Is this a bug in ActiveMQ?  Does it really impose some weenie-small limit on
what you can post back to the server while still in a transaction?  Is it a
hard limit?  Configurable?  Have I screwed up my configuration?  Should I be
using some different method to have my consumer in turn produce more
messages transactionally?

I'm attaching a zip of my test (minus the "lib" dir, which contains all the
typical stuff you'd expect in an ActiveMQ/Camel setup).  Maybe somebody can
take a peek and let me know what I'm doing wrong.

http://www.nabble.com/file/p23824159/camel-activemq-bug.tar.gz
camel-activemq-bug.tar.gz 
http://www.nabble.com/file/p23824159/camel-activemq-bug.zip
camel-activemq-bug.zip 

Help!  Thanks!!
-- 
View this message in context: http://www.nabble.com/BUG%3A-JMS-consumer-hangs-while-producing-tp23824159p23824159.html
Sent from the Camel - Users (activemq) mailing list archive at Nabble.com.


Re: BUG: JMS consumer hangs while producing

Posted by Willem Jiang <wi...@gmail.com>.
Hi,

It is more like a ActiveMQ's issue.
Did you send this issue into the users@activemq.apache.org ?

Willem

dcheckoway wrote:
> Help!
> 
> I have a transactional consumer invoked by Camel for messages on
> "testQueueA" -- it ends up producing/sending thousands of messages to the
> JMS queue "testQueueB".  Depending upon how many messages need to be sent,
> this scenario hangs.  This happens when talking to ActiveMQ via TCP.
> 
> I wrote a simplified test case, and one interesting thing I found was that
> the "bigger" my messages were, the fewer would be sent before everything
> hangs.  It implied to me that there was some sort of buffer limit being
> imposed.
> 
> Here's the code:
> 
> import java.util.logging.Logger;
> import org.apache.camel.*;
> import org.springframework.beans.factory.annotation.*;
> 
> public class ConsumerThatAlsoProduces {
>     Logger logger = Logger.getLogger(getClass().getName());
>     @Autowired
>     ProducerTemplate producerTemplate;
>     @Autowired
>     @Qualifier("testQueueB")
>     Endpoint testQueueB;
>     
>     public void onTestMessage(TestMessage msg) {
>         logger.info("Received from A: " + msg);
>         final int numToProduce = 20000;
>         logger.info("Producing " + numToProduce + " messages on queueB");
>         for (int k = 1; k <= numToProduce; ++k) {
>             logger.info("Sending " + k + " of " + numToProduce);
>             producerTemplate.sendBody(testQueueB, new TestMessage(k));
>         }
>     }
> }
> 
> When I force a thread dump, this is what I'm seeing for the thread of
> interest:
> 
> "DefaultMessageListenerContainer-1" prio=5 tid=0x00000001019ff800
> nid=0x111cae000 runnable [0x0000000111cab000..0x0000000111cadad0]
>    java.lang.Thread.State: RUNNABLE
> 	at java.net.SocketOutputStream.socketWrite0(Native Method)
> 	at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:92)
> 	at java.net.SocketOutputStream.write(SocketOutputStream.java:136)
> 	at
> org.apache.activemq.transport.tcp.TcpBufferedOutputStream.write(TcpBufferedOutputStream.java:96)
> 	at java.io.DataOutputStream.write(DataOutputStream.java:90)
> 	- locked <0x000000010703aa28> (a java.io.DataOutputStream)
> 	at
> org.apache.activemq.openwire.v3.BaseDataStreamMarshaller.tightMarshalByteSequence2(BaseDataStreamMarshaller.java:432)
> 	at
> org.apache.activemq.openwire.v3.MessageMarshaller.tightMarshal2(MessageMarshaller.java:173)
> 	at
> org.apache.activemq.openwire.v3.ActiveMQMessageMarshaller.tightMarshal2(ActiveMQMessageMarshaller.java:90)
> 	at
> org.apache.activemq.openwire.v3.ActiveMQObjectMessageMarshaller.tightMarshal2(ActiveMQObjectMessageMarshaller.java:90)
> 	at
> org.apache.activemq.openwire.OpenWireFormat.marshal(OpenWireFormat.java:240)
> 	- locked <0x000000010701bb78> (a
> org.apache.activemq.openwire.OpenWireFormat)
> 	at
> org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:166)
> 	at
> org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:237)
> 	- locked <0x000000010705a298> (a java.util.concurrent.atomic.AtomicBoolean)
> 	at
> org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:83)
> 	at
> org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:104)
> 	at
> org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
> 	- locked <0x0000000106fdd0e8> (a java.lang.Object)
> 	at
> org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
> 	at
> org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1225)
> 	at
> org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1219)
> 	at org.apache.activemq.ActiveMQSession.send(ActiveMQSession.java:1676)
> 	- locked <0x0000000106fe9f00> (a java.lang.Object)
> 	at
> org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:231)
> 	at org.apache.activemq.pool.PooledProducer.send(PooledProducer.java:74)
> 	- locked <0x0000000107095d88> (a
> org.apache.activemq.ActiveMQMessageProducer)
> 	at org.apache.activemq.pool.PooledProducer.send(PooledProducer.java:59)
> 	at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:597)
> 	at
> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSend(JmsConfiguration.java:237)
> 	at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:574)
> 	at org.springframework.jms.core.JmsTemplate$4.doInJms(JmsTemplate.java:551)
> 	at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:471)
> 	at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:548)
> 	at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:301)
> 	at
> org.apache.camel.impl.ProducerCache$1.doInProducer(ProducerCache.java:165)
> 	at
> org.apache.camel.impl.ProducerCache$1.doInProducer(ProducerCache.java:151)
> 	at org.apache.camel.impl.ProducerCache.doInProducer(ProducerCache.java:136)
> 	at org.apache.camel.impl.ProducerCache.sendExchange(ProducerCache.java:150)
> 	at org.apache.camel.impl.ProducerCache.send(ProducerCache.java:86)
> 	at
> org.apache.camel.impl.DefaultProducerTemplate.send(DefaultProducerTemplate.java:98)
> 	at
> org.apache.camel.impl.DefaultProducerTemplate.sendBody(DefaultProducerTemplate.java:111)
> 	at ConsumerThatAlsoProduces.onTestMessage(ConsumerThatAlsoProduces.java:19)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> 	at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> 	at java.lang.reflect.Method.invoke(Method.java:597)
> 	at org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:173)
> 	at org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:95)
> 	at
> org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:111)
> 	at
> org.apache.camel.processor.interceptor.StreamCachingInterceptor.process(StreamCachingInterceptor.java:52)
> 	at
> org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:61)
> 	at
> org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:61)
> 	at
> org.apache.camel.processor.DefaultErrorHandler.process(DefaultErrorHandler.java:52)
> 	at
> org.apache.camel.processor.DefaultChannel.process(DefaultChannel.java:147)
> 	at
> org.apache.camel.spring.spi.TransactionErrorHandler$1.doInTransactionWithoutResult(TransactionErrorHandler.java:110)
> 	at
> org.springframework.transaction.support.TransactionCallbackWithoutResult.doInTransaction(TransactionCallbackWithoutResult.java:33)
> 	at
> org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:128)
> 	at
> org.apache.camel.spring.spi.TransactionErrorHandler.process(TransactionErrorHandler.java:80)
> 	at
> org.apache.camel.processor.interceptor.StreamCachingInterceptor.process(StreamCachingInterceptor.java:52)
> 	at
> org.apache.camel.processor.DefaultErrorHandler.process(DefaultErrorHandler.java:52)
> 	at
> org.apache.camel.processor.DefaultChannel.process(DefaultChannel.java:147)
> 	at
> org.apache.camel.processor.UnitOfWorkProcessor.processNext(UnitOfWorkProcessor.java:54)
> 	at
> org.apache.camel.processor.DelegateProcessor.process(DelegateProcessor.java:48)
> 	at
> org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:76)
> 	at
> org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:543)
> 	at
> org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:482)
> 	at
> org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:451)
> 	at
> org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:323)
> 	at
> org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:241)
> 	at
> org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:982)
> 	at
> org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:881)
> 	at java.lang.Thread.run(Thread.java:637)
> 
> 
> I'm pretty sure the problem is that since my consumer is running inside a
> JMS transaction, the messages it tries to send are being "buffered" by the
> ActiveMQ server until the transaction commits.  Thus the catch...the
> transaction won't commit until my consumer returns, which won't happen until
> all of the messages are sent.
> 
> On this link  http://activemq.apache.org/how-do-transactions-work.html
> http://activemq.apache.org/how-do-transactions-work.html  I noticed this
> comment:
> 
> "Now the operations carried out on a transacted session inside a
> transaction, like a send message or acknowledge message, do not really
> perform a real send or acknowledge until the commit occurs. So the Broker
> explicitly handles these cases separately - essentially buffering up the
> commands until the commit occurs when the messages are really sent or
> acknowledged."
> 
> What I find interesting is that if I run an embedded ActiveMQ broker and
> talk via vm://localhost instead of tcp://localhost:61616, the problem goes
> away.  This ONLY happens when talking TCP.
> 
> FWIW, I've been using Camel 1.6.1 with ActiveMQ 5.2.0, but I have also
> tested this against ActiveMQ 5.3-SNAPSHOT (June 1, 2009), which uses Camel
> 2.0.  Both straight out of the box.  Same results with both old & new
> versions.
> 
> Is this a bug in ActiveMQ?  Does it really impose some weenie-small limit on
> what you can post back to the server while still in a transaction?  Is it a
> hard limit?  Configurable?  Have I screwed up my configuration?  Should I be
> using some different method to have my consumer in turn produce more
> messages transactionally?
> 
> I'm attaching a zip of my test (minus the "lib" dir, which contains all the
> typical stuff you'd expect in an ActiveMQ/Camel setup).  Maybe somebody can
> take a peek and let me know what I'm doing wrong.
> 
> http://www.nabble.com/file/p23824159/camel-activemq-bug.tar.gz
> camel-activemq-bug.tar.gz 
> http://www.nabble.com/file/p23824159/camel-activemq-bug.zip
> camel-activemq-bug.zip 
> 
> Help!  Thanks!!