Posted to issues@camel.apache.org by "Claus Ibsen (JIRA)" <ji...@apache.org> on 2013/10/01 11:44:26 UTC

[jira] [Updated] (CAMEL-6717) camel-mqtt - dead lock when processing fetching/sending messages at high frequency

     [ https://issues.apache.org/jira/browse/CAMEL-6717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Claus Ibsen updated CAMEL-6717:
-------------------------------

    Fix Version/s:     (was: 2.11.3)
                       (was: 2.12.2)
                       (was: 2.13.0)
                   Future

> camel-mqtt - dead lock when processing fetching/sending messages at high frequency
> ----------------------------------------------------------------------------------
>
>                 Key: CAMEL-6717
>                 URL: https://issues.apache.org/jira/browse/CAMEL-6717
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-mqtt
>    Affects Versions: 2.11.0
>         Environment: jdk 1.6.32
>            Reporter: Ralf Kornberger
>             Fix For: Future
>
>
> I'm using Apache Camel with MQTT to fetch data from a Mosquitto broker. Data is published there at high
> frequency (< 10s) by several devices. After receiving the data, I send an acknowledgement message back. This is done by publishing a message
> to a topic for each device. I'm using the Fusesource MQTT Client (version 2.5) for this.
> I encountered the following problem: after some time (anywhere from 15 minutes to 1 day), something "weird" happens.
> The application stops receiving or sending any data via MQTT. Looking at it with jstack reveals the following:
> "hawtdispatch-DEFAULT-2" daemon prio=10 tid=0x00007facc1a2f000 nid=0x782d waiting on condition [0x00007fac42bcf000]
>    java.lang.Thread.State: WAITING (parking)
> 	at sun.misc.Unsafe.park(Native Method)
> 	- parking to wait for  <0x000000078e792b88> (a java.util.concurrent.CountDownLatch$Sync)
> 	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
> 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
> 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
> 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
> 	at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
> 	at org.fusesource.mqtt.client.Promise.await(Promise.java:88)
> 	at org.fusesource.mqtt.client.BlockingConnection.publish(BlockingConnection.java:73)
> 	at org.fusesource.mqtt.client.BlockingConnection.publish(BlockingConnection.java:82)
> 	at net.centersight.plugins.agent.protomqtt.comm.MQTTManager.sendACKMessage(MQTTManager.java:92)
> 	at net.centersight.plugins.agent.protomqtt.comm.MQTTCommunication.sendACKMessage(MQTTCommunication.java:116)
> 	at net.centersight.plugins.agent.protomqtt.camel.AgentMQTTbatchACKer.process(AgentMQTTbatchACKer.java:47)
> 	at sun.reflect.GeneratedMethodAccessor101.invoke(Unknown Source)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> 	at java.lang.reflect.Method.invoke(Method.java:597)
> 	at org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:341)
> 	at org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:238)
> 	at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:166)
> 	at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
> 	at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
> 	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
> 	at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:73)
> 	at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
> 	at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
> 	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
> 	at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91)
> 	at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
> 	at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:334)
> 	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:220)
> 	at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:45)
> 	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
> 	at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:303)
> 	at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
> 	at org.apache.camel.processor.Pipeline.process(Pipeline.java:117)
> 	at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)
> 	at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:45)
> 	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
> 	at org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150)
> 	at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117)
> 	at org.apache.camel.processor.RouteInflightRepositoryProcessor.processNext(RouteInflightRepositoryProcessor.java:48)
> 	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
> 	at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
> 	at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
> 	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
> 	at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:73)
> 	at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99)
> 	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:86)
> 	at org.apache.camel.component.mqtt.MQTTConsumer.processExchange(MQTTConsumer.java:46)
> 	at org.apache.camel.component.mqtt.MQTTEndpoint$1.onPublish(MQTTEndpoint.java:88)
> 	at org.fusesource.mqtt.client.CallbackConnection.toReceiver(CallbackConnection.java:815)
> 	at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:732)
> 	at org.fusesource.mqtt.client.CallbackConnection.access$17(CallbackConnection.java:727)
> 	at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
> 	at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:660)
> 	at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
> 	at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:226)
> 	at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:96)
> 	at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
>    Locked ownable synchronizers:
> 	- None
> 	
> Apparently, both the Camel receiving thread and the Fusesource client thread are hanging at this frame:
> at org.fusesource.mqtt.client.Promise.await(Promise.java:88)
> Since I use BlockingConnection in my sending client, I took a look at the Fusesource MQTT client.
> In BlockingConnection.java, in the method
> public void publish(final UTF8Buffer topic, final Buffer payload, final QoS qos, final boolean retain) throws Exception
> a Future is obtained on publishing (line 80), and await() is called on it afterwards.
> When I change this await() to await(30L, TimeUnit.SECONDS), the problem still occurs, but the application keeps working.
> I've added debug printouts to the trace class. They show that at the time the problem occurs, the MQTT client seems to lose the connection to the broker and tries to re-establish it. The debug logs also show that the timeout exception thrown by the timed await occurs every minute for about 20 minutes. Then the problem "vanishes" and returns after several hours.
> P.S.: I also posted this on GitHub, in the Fusesource MQTT issue tracker:
> https://github.com/fusesource/mqtt-client/issues/21#issuecomment-23861700
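
The trace above shows a blocking publish being awaited from inside the onPublish callback, on a hawtdispatch thread. One plausible reading, which the report does not confirm, is that the work needed to complete the awaited Promise is queued behind the very callback that is waiting for it. The sketch below reproduces only that general pattern with plain JDK executors. It does not use the Fusesource client or Camel, and the class and method names (SerialQueueBlockingSketch, publishFromCallback) are hypothetical. It also illustrates why a timed await lets the application keep working without removing the underlying stall.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only: models a serial dispatch queue with a
// single-threaded executor. This is NOT the Fusesource or Camel implementation.
public class SerialQueueBlockingSketch {

    /**
     * Runs a "receive callback" on callbackQueue. Inside the callback, a
     * completion task is handed to completionQueue and awaited with a timeout.
     * Returns true if the completion arrived in time, false if the wait timed out.
     */
    public static boolean publishFromCallback(ExecutorService callbackQueue,
                                              ExecutorService completionQueue,
                                              long timeoutMillis) throws Exception {
        Future<Boolean> callback = callbackQueue.submit(() -> {
            CountDownLatch done = new CountDownLatch(1);
            // Stands in for "the acknowledgement that completes the Promise".
            completionQueue.execute(done::countDown);
            // A timed wait: an untimed done.await() would block forever when
            // completionQueue is the same single thread that is waiting here.
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        });
        return callback.get();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService dispatch = Executors.newSingleThreadExecutor();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            // Completion queued behind the waiting callback: the wait times out.
            System.out.println("same queue completed: "
                    + publishFromCallback(dispatch, dispatch, 200));
            // Completion handled on another thread: the wait can finish.
            System.out.println("separate queue completed: "
                    + publishFromCallback(dispatch, worker, 200));
        } finally {
            dispatch.shutdownNow();
            worker.shutdownNow();
        }
    }
}
```

In the first call the completion task cannot run until the callback returns, so the timed wait expires. In the second call the wait should normally finish well within the timeout. If the assumption holds for the reported setup, the usual remedy is to avoid blocking inside the receive callback, for example by handing the acknowledgement publish to a separate thread.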


