Posted to issues@camel.apache.org by "Marco Collovati (Jira)" <ji...@apache.org> on 2020/10/25 16:40:00 UTC

[jira] [Comment Edited] (CAMEL-15748) Paho consumer never connects if the broker is not reachable at startup

    [ https://issues.apache.org/jira/browse/CAMEL-15748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17220325#comment-17220325 ] 

Marco Collovati edited comment on CAMEL-15748 at 10/25/20, 4:39 PM:
--------------------------------------------------------------------

[~davsclaus] I'm working on a PR for master, but I have a problem with the test.

As mentioned above, with paho 1.2.5 the call to MqttClient.connect hangs indefinitely, regardless of the connection timeout settings.

Can this be handled somehow in test code?

The only workaround I found is to set *timeToWait* on MqttClient and reset it to *-1* after the connection is established, but I don't think this is a good idea:
{code:java}
client.setTimeToWait(1000 + getEndpoint().getConfiguration().getConnectionTimeout() * 1000);
client.connect(connectOptions);
client.setTimeToWait(-1);
{code}

Would it make sense to add a *connectTimeToWait* option to PahoConfiguration?
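
To illustrate the workaround, here is a minimal sketch that restores the wait limit in a finally block, so it is reset even when connect fails. It does not use Paho itself: *BlockingClient*, *BoundedConnect* and *RecordingClient* are hypothetical names invented for this example, and the interface only stands in for the two MqttClient methods used above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the two MqttClient methods used in the workaround. */
interface BlockingClient {
    void setTimeToWait(long millis);
    void connect() throws Exception;
}

final class BoundedConnect {
    private BoundedConnect() {}

    /**
     * Applies a temporary wait limit around connect() and restores the
     * "wait forever" value (-1) afterwards, even when connect() throws.
     */
    static void connect(BlockingClient client, int connectionTimeoutSeconds) throws Exception {
        // Same arithmetic as the workaround: one second of slack on top of the timeout.
        long limitMillis = 1000L + connectionTimeoutSeconds * 1000L;
        client.setTimeToWait(limitMillis);
        try {
            client.connect();
        } finally {
            client.setTimeToWait(-1);
        }
    }
}

/** Fake client that records every wait limit it is given; connect() fails if the broker is down. */
final class RecordingClient implements BlockingClient {
    final List<Long> waits = new ArrayList<>();
    private final boolean brokerUp;

    RecordingClient(boolean brokerUp) {
        this.brokerUp = brokerUp;
    }

    @Override
    public void setTimeToWait(long millis) {
        waits.add(millis);
    }

    @Override
    public void connect() throws Exception {
        if (!brokerUp) {
            throw new Exception("broker unreachable");
        }
    }
}
```

With a real client, the same try/finally shape would apply; whether the limit should come from a dedicated configuration option is the open question above.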


> Paho consumer never connects if the broker is not reachable at startup
> ----------------------------------------------------------------------
>
>                 Key: CAMEL-15748
>                 URL: https://issues.apache.org/jira/browse/CAMEL-15748
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-paho
>    Affects Versions: 3.5.0, 3.4.4, 3.6.0
>            Reporter: Marco Collovati
>            Priority: Major
>             Fix For: 3.4.5, 3.7.0
>
>         Attachments: PahoConsumerRestartTest.java
>
>
> With a paho consumer in a route, if the broker is not reachable at startup, the Camel context fails fast and shuts down.
> This can be avoided by configuring the *SupervisingRouteController*, but then, even though the Camel context does not fail, the consumer is never able to establish a connection.
> The reason is that when *PahoConsumer* starts for the first time, it creates a *MqttClient* and stores it in the *client* field; the call to *client.connect* then throws an exception because the broker is down:
>  
> {code:java}
>  
>     @Override
>     protected void doStart() throws Exception {
>         super.doStart();
>
>         connectOptions = PahoEndpoint.createMqttConnectOptions(getEndpoint().getConfiguration());
>
>         if (client == null) {
>             clientId = getEndpoint().getConfiguration().getClientId();
>             if (clientId == null) {
>                 clientId = "camel-" + MqttClient.generateClientId();
>             }
>             stopClient = true;
>             client = new MqttClient(
>                     getEndpoint().getConfiguration().getBrokerUrl(),
>                     clientId,
>                     PahoEndpoint.createMqttClientPersistence(getEndpoint().getConfiguration()));
>             LOG.debug("Connecting client: {} to broker: {}", clientId, getEndpoint().getConfiguration().getBrokerUrl());
>             client.connect(connectOptions);
>         }
>         
>         // other code omitted for brevity
>         
>         client.subscribe(getEndpoint().getTopic(), getEndpoint().getConfiguration().getQos());
>     }
> {code}
>  
> After that, *doStop* is invoked, but the *client* instance is not set to null because the client is not connected:
>  
> {code:java}
>     @Override
>     protected void doStop() throws Exception {
>         super.doStop();
>
>         if (stopClient && client != null && client.isConnected()) {
>             String topic = getEndpoint().getTopic();
>             // only unsubscribe if we are not durable
>             if (getEndpoint().getConfiguration().isCleanSession()) {
>                 LOG.debug("Unsubscribing client: {} from topic: {}", clientId, topic);
>                 client.unsubscribe(topic);
>             } else {
>                 LOG.debug("Client: {} is durable so will not unsubscribe from topic: {}", clientId, topic);
>             }
>             LOG.debug("Disconnecting client: {} from broker: {}", clientId, getEndpoint().getConfiguration().getBrokerUrl());
>             client.disconnect();
>             client = null;
>         }
>     }
> {code}
>  
> When the supervisor tries to restart the route, the *client* instance already exists, so *client.connect* is skipped and the call to *client.subscribe* fails because the client is not connected.
> Perhaps always nullifying *client* in *doStop* would resolve the issue; however, I have no idea if this solution would have other side effects.
> A better solution may be to handle automatic reconnection in the consumer, as *RabbitConsumer* does, for example.
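
The restart behaviour described above can be sketched in isolation. This is a simplified model, not the camel-paho code: *FakeClient* and *SketchConsumer* are hypothetical names, the *stopClient* flag and unsubscribe logic are left out, and the only point shown is that clearing *client* on every stop lets the next start build and connect a fresh client.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical stand-in for MqttClient; only what the lifecycle sketch needs. */
final class FakeClient {
    private final BooleanSupplier brokerUp;
    private boolean connected;

    FakeClient(BooleanSupplier brokerUp) {
        this.brokerUp = brokerUp;
    }

    void connect() {
        if (!brokerUp.getAsBoolean()) {
            throw new IllegalStateException("broker unreachable");
        }
        connected = true;
    }

    void subscribe() {
        if (!connected) {
            throw new IllegalStateException("client is not connected");
        }
    }

    void disconnect() {
        connected = false;
    }

    boolean isConnected() {
        return connected;
    }
}

/** Simplified consumer with the same start/stop shape, but the client is always cleared on stop. */
final class SketchConsumer {
    private final BooleanSupplier brokerUp;
    private FakeClient client;

    SketchConsumer(BooleanSupplier brokerUp) {
        this.brokerUp = brokerUp;
    }

    void doStart() {
        if (client == null) {
            client = new FakeClient(brokerUp);
            client.connect();
        }
        client.subscribe();
    }

    void doStop() {
        if (client != null) {
            try {
                if (client.isConnected()) {
                    client.disconnect();
                }
            } finally {
                // Cleared even when the client never connected, so the next start reconnects.
                client = null;
            }
        }
    }

    boolean isConnected() {
        return client != null && client.isConnected();
    }
}
```

If the field were cleared only when the client is connected, as in the *doStop* quoted above, the second *doStart* in this model would skip *connect* and fail in *subscribe*.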



--
This message was sent by Atlassian Jira
(v8.3.4#803005)