You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by Quirin Fürgut <qu...@device-insight.com> on 2019/11/20 16:12:31 UTC

camel-paho 2.24.0: MQTT resubscribe in PahoConsumer

Hello there,

we are using Apache Camel in our production system and really appreciate
all the work you guys and the community has put into the project.

Sadly we have an issue using the camel-paho component (version 2.24.0) to
communicate with a RabbitMQ-broker (broker version is 3.7.17) via MQTT.

Once in a while the broker closes the connection and during the reconnect /
resubscribe (via an enabled *automaticReconnect*-paramter) which is done in

*org.apache.camel.component.paho.PahoConsumer.doStart()*

an MqttException occurs with the following stacktrace:

*2018-09-14 04:03:17,393 ERROR [MQTT Call: live-cgw-vr920-evt]
paho.PahoConsumer$1 (PahoConsumer.java:52) [SNR: ][JID: ] - MQTT
resubscribe failed MqttException
org.eclipse.paho.client.mqttv3.MqttException: MqttException at
org.eclipse.paho.client.mqttv3.internal.Token.waitForResponse(Token.java:148)
~[org.eclipse.paho.client.mqttv3-1.2.0.jar!/:?] at
org.eclipse.paho.client.mqttv3.internal.Token.waitForCompletion(Token.java:108)
~[org.eclipse.paho.client.mqttv3-1.2.0.jar!/:?] at
org.eclipse.paho.client.mqttv3.MqttToken.waitForCompletion(MqttToken.java:67)
~[org.eclipse.paho.client.mqttv3-1.2.0.jar!/:?] at
org.eclipse.paho.client.mqttv3.MqttClient.subscribe(MqttClient.java:432)
~[org.eclipse.paho.client.mqttv3-1.2.0.jar!/:?] at
org.eclipse.paho.client.mqttv3.MqttClient.subscribe(MqttClient.java:424)
~[org.eclipse.paho.client.mqttv3-1.2.0.jar!/:?] at
org.apache.camel.component.paho.PahoConsumer$1.connectComplete(PahoConsumer.java:50)
[camel-paho-2.21.2.jar!/:2.21.2] at
org.eclipse.paho.client.mqttv3.internal.ConnectActionListener.onSuccess(ConnectActionListener.java:104)
[org.eclipse.paho.client.mqttv3-1.2.0.jar!/:?] at
org.eclipse.paho.client.mqttv3.internal.CommsCallback.fireActionEvent(CommsCallback.java:321)
[org.eclipse.paho.client.mqttv3-1.2.0.jar!/:?] at
org.eclipse.paho.client.mqttv3.internal.CommsCallback.handleActionComplete(CommsCallback.java:260)
[org.eclipse.paho.client.mqttv3-1.2.0.jar!/:?] at
org.eclipse.paho.client.mqttv3.internal.CommsCallback.run(CommsCallback.java:190)
[org.eclipse.paho.client.mqttv3-1.2.0.jar!/:?] at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
[?:1.8.0_141] at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[?:1.8.0_141] at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
[?:1.8.0_141] at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
[?:1.8.0_141] at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_141] at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_141] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_141] Caused
by: java.lang.InterruptedException at java.lang.Object.wait(Native Method)
~[?:1.8.0_141] at java.lang.Object.wait(Object.java:502) ~[?:1.8.0_141] at
org.eclipse.paho.client.mqttv3.internal.Token.waitForResponse(Token.java:143)
~[org.eclipse.paho.client.mqttv3-1.2.0.jar!/:?] ... 16 more*


After that the connection is reestablished, the resubscribe on the topic
mentioned in the stack trace fails though.

The consequence is that no messages get consumed for the topics anymore,
leading to limited features of the application.

Is this a known issue for the paho-consumer component? Is there any way to
ensure that a resubscribe succeeds (via configuration or other measures) or
would an adjustment (f.e. retry mechanism) necessary to solve the issue.

Any help and feedback regarding the issue would really be appreciated. We
also would try to provide a fix if you guys thing one would be needed.

Thx and Best Regards
-- 
Quirin Fürgut
Software Engineer



quirin.fuergut@device-insight.com  <qu...@device-insight.com>
Telefon: +49 89 4545 448-56

Device Insight GmbH, Willy-Brandt-Platz 6, D-81829 München
http://www.device-insight.com

Sitz der Gesellschaft: München
Registergericht: Amtsgericht München HRB 149018
Geschäftsführer: Stefan Hübner, Marten Schirge, Thomas Stammeier.

Re: camel-paho 2.24.0: MQTT resubscribe in PahoConsumer

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

Oh its the first time I think its reported here. Do you have any
evidence or clue why the re-subscribe failed? What kind of networking
issues is it, or did the remote broker deny the connection or what?

You are welcome to dive into the code and see what can be improved.
Maybe some logic is needed to attempt to re-connection from a
scheduled background thread or something.

At first you are welcome to log a JIRA so we wont forget.

And do you have a way to reproduce the issue, maybe from a unit test?

On Wed, Nov 20, 2019 at 11:56 PM Quirin Fürgut
<qu...@device-insight.com> wrote:
>
> Hello there,
>
> we are using Apache Camel in our production system and really appreciate
> all the work you guys and the community has put into the project.
>
> Sadly we have an issue using the camel-paho component (version 2.24.0) to
> communicate with a RabbitMQ-broker (broker version is 3.7.17) via MQTT.
>
> Once in a while the broker closes the connection and during the reconnect /
> resubscribe (via an enabled *automaticReconnect*-paramter) which is done in
>
> *org.apache.camel.component.paho.PahoConsumer.doStart()*
>
> an MqttException occurs with the following stacktrace:
>
> *2018-09-14 04:03:17,393 ERROR [MQTT Call: live-cgw-vr920-evt]
> paho.PahoConsumer$1 (PahoConsumer.java:52) [SNR: ][JID: ] - MQTT
> resubscribe failed MqttException
> org.eclipse.paho.client.mqttv3.MqttException: MqttException at
> org.eclipse.paho.client.mqttv3.internal.Token.waitForResponse(Token.java:148)
> ~[org.eclipse.paho.client.mqttv3-1.2.0.jar!/:?] at
> org.eclipse.paho.client.mqttv3.internal.Token.waitForCompletion(Token.java:108)
> ~[org.eclipse.paho.client.mqttv3-1.2.0.jar!/:?] at
> org.eclipse.paho.client.mqttv3.MqttToken.waitForCompletion(MqttToken.java:67)
> ~[org.eclipse.paho.client.mqttv3-1.2.0.jar!/:?] at
> org.eclipse.paho.client.mqttv3.MqttClient.subscribe(MqttClient.java:432)
> ~[org.eclipse.paho.client.mqttv3-1.2.0.jar!/:?] at
> org.eclipse.paho.client.mqttv3.MqttClient.subscribe(MqttClient.java:424)
> ~[org.eclipse.paho.client.mqttv3-1.2.0.jar!/:?] at
> org.apache.camel.component.paho.PahoConsumer$1.connectComplete(PahoConsumer.java:50)
> [camel-paho-2.21.2.jar!/:2.21.2] at
> org.eclipse.paho.client.mqttv3.internal.ConnectActionListener.onSuccess(ConnectActionListener.java:104)
> [org.eclipse.paho.client.mqttv3-1.2.0.jar!/:?] at
> org.eclipse.paho.client.mqttv3.internal.CommsCallback.fireActionEvent(CommsCallback.java:321)
> [org.eclipse.paho.client.mqttv3-1.2.0.jar!/:?] at
> org.eclipse.paho.client.mqttv3.internal.CommsCallback.handleActionComplete(CommsCallback.java:260)
> [org.eclipse.paho.client.mqttv3-1.2.0.jar!/:?] at
> org.eclipse.paho.client.mqttv3.internal.CommsCallback.run(CommsCallback.java:190)
> [org.eclipse.paho.client.mqttv3-1.2.0.jar!/:?] at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> [?:1.8.0_141] at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> [?:1.8.0_141] at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> [?:1.8.0_141] at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> [?:1.8.0_141] at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> [?:1.8.0_141] at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> [?:1.8.0_141] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_141] Caused
> by: java.lang.InterruptedException at java.lang.Object.wait(Native Method)
> ~[?:1.8.0_141] at java.lang.Object.wait(Object.java:502) ~[?:1.8.0_141] at
> org.eclipse.paho.client.mqttv3.internal.Token.waitForResponse(Token.java:143)
> ~[org.eclipse.paho.client.mqttv3-1.2.0.jar!/:?] ... 16 more*
>
>
> After that the connection is reestablished, the resubscribe on the topic
> mentioned in the stack trace fails though.
>
> The consequence is that no messages get consumed for the topics anymore,
> leading to limited features of the application.
>
> Is this a known issue for the paho-consumer component? Is there any way to
> ensure that a resubscribe succeeds (via configuration or other measures) or
> would an adjustment (f.e. retry mechanism) necessary to solve the issue.
>
> Any help and feedback regarding the issue would really be appreciated. We
> also would try to provide a fix if you guys thing one would be needed.
>
> Thx and Best Regards
> --
> Quirin Fürgut
> Software Engineer
>
>
>
> quirin.fuergut@device-insight.com  <qu...@device-insight.com>
> Telefon: +49 89 4545 448-56
>
> Device Insight GmbH, Willy-Brandt-Platz 6, D-81829 München
> http://www.device-insight.com
>
> Sitz der Gesellschaft: München
> Registergericht: Amtsgericht München HRB 149018
> Geschäftsführer: Stefan Hübner, Marten Schirge, Thomas Stammeier.



-- 
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2