You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2018/01/06 12:02:52 UTC

[camel] branch master updated: CAMEL-12111: Fix reconnect if broker is down on startup. Also fix so channels share connections again. Also fix consumers getting started twice on reconnect at startup. Also fix null pointers if automaticRecoveryEnabled is not set on the endpoint.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 8311af0  CAMEL-12111: Fix reconnect if broker is down on startup.  Also fix so channels share connections again.  Also fix consumers getting started twice on reconnect at startup.  Also fix null pointers if automaticRecoveryEnabled is not set on the endpoint.
8311af0 is described below

commit 8311af003ab888ca53a042ce5e9955b7cd5a845b
Author: Jeremy Isikoff <ji...@yahoo.com>
AuthorDate: Thu Jan 4 09:42:37 2018 -0500

    CAMEL-12111: Fix reconnect if broker is down on startup.  Also fix so channels share connections again.  Also fix consumers getting started twice on reconnect at startup.  Also fix null pointers if automaticRecoveryEnabled is not set on the endpoint.
---
 .../camel/component/rabbitmq/RabbitConsumer.java   |  9 +++++++--
 .../camel/component/rabbitmq/RabbitMQConsumer.java | 22 ++++++++++++++--------
 2 files changed, 21 insertions(+), 10 deletions(-)

diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
index e96367c..ffef62d 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
@@ -295,10 +295,10 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
         if (isChannelOpen()) {
             // The connection is good, so nothing to do
             return;
-        } else if (!isChannelOpen() && this.consumer.getEndpoint().getAutomaticRecoveryEnabled()) {
+        } else if (channel != null && !channel.isOpen() && isAutomaticRecoveryEnabled()) {
             // Still need to wait for channel to re-open
             throw new IOException("Waiting for channel to re-open.");
-        } else if (!this.consumer.getEndpoint().getAutomaticRecoveryEnabled()) {
+        } else if (channel == null || !isAutomaticRecoveryEnabled()) {
             log.info("Attempting to open a new rabbitMQ channel");
             Connection conn = consumer.getConnection();
             channel = openChannel(conn);
@@ -307,6 +307,11 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
         }
     }
 
+    private boolean isAutomaticRecoveryEnabled() {
+        return this.consumer.getEndpoint().getAutomaticRecoveryEnabled() != null
+            && this.consumer.getEndpoint().getAutomaticRecoveryEnabled();
+    }
+
     private boolean isChannelOpen() {
         return channel != null && channel.isOpen();
     }
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index 95a6609..c0aaa6d 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -74,7 +74,7 @@ public class RabbitMQConsumer extends DefaultConsumer implements Suspendable {
         if (this.conn == null) {
             openConnection();
             return this.conn;
-        } else if (!this.conn.isOpen() && this.endpoint.getAutomaticRecoveryEnabled()) {
+        } else if (this.conn.isOpen() || (!this.conn.isOpen() && isAutomaticRecoveryEnabled())) {
             return this.conn;
         } else {
             log.debug("The existing connection is closed");
@@ -83,16 +83,24 @@ public class RabbitMQConsumer extends DefaultConsumer implements Suspendable {
         }
     }
 
-
+    private boolean isAutomaticRecoveryEnabled() {
+        return this.endpoint.getAutomaticRecoveryEnabled() != null
+            && this.endpoint.getAutomaticRecoveryEnabled();
+    }
     /**
-     * Add a consumer thread for given channel
+     * Create the consumers but don't start yet
      */
-    private void startConsumers() throws IOException {
-
+    private void createConsumers() throws IOException {
         // Create consumers but don't start yet
         for (int i = 0; i < endpoint.getConcurrentConsumers(); i++) {
             createConsumer();
         }
+    }
+
+    /**
+     * Start the consumers (already created)
+     */
+    private void startConsumers() {
 
         // Try starting consumers (which will fail if RabbitMQ can't connect)
         try {
@@ -160,6 +168,7 @@ public class RabbitMQConsumer extends DefaultConsumer implements Suspendable {
     protected void doStart() throws Exception {
         executor = endpoint.createExecutor();
         log.debug("Using executor {}", executor);
+        createConsumers();
         startConsumers();
     }
 
@@ -211,9 +220,6 @@ public class RabbitMQConsumer extends DefaultConsumer implements Suspendable {
                     Thread.sleep(connectionRetryInterval);
                 }
             }
-            if (!connectionFailed) {
-                startConsumers();
-            }
             stop();
             return null;
         }

-- 
To stop receiving notification emails like this one, please contact
"commits@camel.apache.org" <commits@camel.apache.org>.