You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/11/09 10:22:48 UTC

[camel] branch main updated: CAMEL-17121: converted camel-rabbitmq to repeatable tasks

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new fb55460  CAMEL-17121: converted camel-rabbitmq to repeatable tasks
fb55460 is described below

commit fb55460805d6049f4d3529ebf8a7b4a6002f14fa
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Mon Nov 8 06:28:42 2021 +0100

    CAMEL-17121: converted camel-rabbitmq to repeatable tasks
---
 .../camel/component/rabbitmq/RabbitConsumer.java   | 73 ++++++++++++++--------
 .../camel/component/rabbitmq/RabbitMQConsumer.java | 46 +++++++++-----
 .../camel/component/rabbitmq/RabbitMQEndpoint.java |  9 +++
 3 files changed, 88 insertions(+), 40 deletions(-)

diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
index 6549ac4..91f12f3 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
@@ -17,6 +17,8 @@
 package org.apache.camel.component.rabbitmq;
 
 import java.io.IOException;
+import java.time.Duration;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeoutException;
 
@@ -32,6 +34,9 @@ import org.apache.camel.ExchangePattern;
 import org.apache.camel.Message;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.support.task.BlockingTask;
+import org.apache.camel.support.task.Tasks;
+import org.apache.camel.support.task.budget.Budgets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,7 +48,6 @@ class RabbitConsumer extends ServiceSupport implements com.rabbitmq.client.Consu
     private Channel channel;
     private String tag;
     private volatile String consumerTag;
-    private volatile boolean stopping;
 
     private final Semaphore lock = new Semaphore(1);
 
@@ -286,6 +290,27 @@ class RabbitConsumer extends ServiceSupport implements com.rabbitmq.client.Consu
         }
     }
 
+    private boolean doReconnect() {
+        if (isStopping()) {
+            return true;
+        }
+
+        try {
+            reconnect();
+            return true;
+        } catch (Exception e) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Unable to obtain a RabbitMQ channel. Will try again. Caused by: {}.", e.getMessage());
+            } else {
+                LOG.warn(
+                        "Unable to obtain a RabbitMQ channel. Will try again. Caused by: {}. Stacktrace logged at DEBUG logging level.",
+                        e.getMessage());
+            }
+
+            return false;
+        }
+    }
+
     /**
      * No-op implementation of {@link Consumer#handleShutdownSignal}.
      */
@@ -293,32 +318,28 @@ class RabbitConsumer extends ServiceSupport implements com.rabbitmq.client.Consu
     public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
         LOG.info("Received shutdown signal on the rabbitMQ channel");
 
-        // Check if the consumer closed the connection or something else
-        if (!sig.isInitiatedByApplication()) {
-            // Something else closed the connection so reconnect
-            boolean connected = false;
-            while (!connected && !isStopping()) {
-                try {
-                    reconnect();
-                    connected = true;
-                } catch (Exception e) {
-                    LOG.warn(
-                            "Unable to obtain a RabbitMQ channel. Will try again. Caused by: {}. Stacktrace logged at DEBUG logging level.",
-                            e.getMessage());
-                    // include stacktrace in DEBUG logging
-                    LOG.debug(e.getMessage(), e);
-
-                    Integer networkRecoveryInterval = consumer.getEndpoint().getNetworkRecoveryInterval();
-                    final long connectionRetryInterval
-                            = networkRecoveryInterval != null && networkRecoveryInterval > 0 ? networkRecoveryInterval : 100L;
-                    try {
-                        Thread.sleep(connectionRetryInterval);
-                    } catch (InterruptedException e1) {
-                        Thread.currentThread().interrupt();
-                    }
-                }
-            }
+        if (sig.isInitiatedByApplication()) {
+            LOG.debug("Nothing to do because the consumer closed the connection");
+            return;
         }
+
+        Integer networkRecoveryInterval = consumer.getEndpoint().getNetworkRecoveryInterval();
+        final long connectionRetryInterval
+                = networkRecoveryInterval != null && networkRecoveryInterval > 0 ? networkRecoveryInterval : 100L;
+
+        String taskName = "shutdown-handler";
+        ScheduledExecutorService service = consumer.getEndpoint().createScheduledExecutor(taskName);
+
+        BlockingTask task = Tasks.backgroundTask()
+                .withBudget(Budgets.timeBudget()
+                        .withUnlimitedDuration()
+                        .withInterval(Duration.ofMillis(connectionRetryInterval))
+                        .build())
+                .withScheduledExecutor(service)
+                .withName(taskName)
+                .build();
+
+        task.run(this::doReconnect);
     }
 
     /**
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index b03516d..330ec59 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.rabbitmq;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
@@ -32,6 +33,10 @@ import org.apache.camel.Processor;
 import org.apache.camel.Suspendable;
 import org.apache.camel.support.DefaultConsumer;
 import org.apache.camel.support.service.ServiceHelper;
+import org.apache.camel.support.task.BlockingTask;
+import org.apache.camel.support.task.Tasks;
+import org.apache.camel.support.task.budget.Budgets;
+import org.apache.camel.support.task.budget.IterationBoundedBudget;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -214,22 +219,35 @@ public class RabbitMQConsumer extends DefaultConsumer implements Suspendable {
             RabbitMQConsumer.this.startConsumerCallable = null;
         }
 
-        @Override
-        public Void call() throws Exception {
-            boolean connectionFailed = true;
-            // Reconnection loop
-            while (running.get() && connectionFailed) {
-                try {
-                    for (RabbitConsumer consumer : consumers) {
-                        consumer.reconnect();
-                    }
-                    connectionFailed = false;
-                } catch (Exception e) {
-                    LOG.info("Connection failed, will retry in {} ms", connectionRetryInterval, e);
-                    Thread.sleep(connectionRetryInterval);
+        private boolean reconnect() {
+            if (!running.get()) {
+                return true;
+            }
+
+            try {
+                for (RabbitConsumer consumer : consumers) {
+                    consumer.reconnect();
                 }
+
+                return true;
+            } catch (Exception e) {
+                LOG.info("Connection failed, will retry in {} ms", connectionRetryInterval, e);
+
+                return false;
             }
-            stop();
+        }
+
+        @Override
+        public Void call() throws Exception {
+            BlockingTask task = Tasks.foregroundTask()
+                    .withBudget(Budgets.iterationBudget()
+                            .withInterval(Duration.ofMillis(connectionRetryInterval))
+                            .withMaxIterations(IterationBoundedBudget.UNLIMITED_ITERATIONS)
+                            .build())
+                    .withName("rabbitmq-reconnection-loop")
+                    .build();
+
+            task.run(this::reconnect);
             return null;
         }
     }
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index 4207ce4..5408988 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -21,6 +21,7 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeoutException;
 
 import javax.net.ssl.TrustManager;
@@ -277,6 +278,14 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint {
         }
     }
 
+    protected ScheduledExecutorService createScheduledExecutor(String name) {
+        if (getCamelContext() != null) {
+            return getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this, name);
+        } else {
+            return Executors.newSingleThreadScheduledExecutor(tf -> new Thread(name));
+        }
+    }
+
     public String getUsername() {
         return username;
     }