You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/11/09 10:22:48 UTC
[camel] branch main updated: CAMEL-17121: converted camel-rabbitmq
to repeatable tasks
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new fb55460 CAMEL-17121: converted camel-rabbitmq to repeatable tasks
fb55460 is described below
commit fb55460805d6049f4d3529ebf8a7b4a6002f14fa
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Mon Nov 8 06:28:42 2021 +0100
CAMEL-17121: converted camel-rabbitmq to repeatable tasks
---
.../camel/component/rabbitmq/RabbitConsumer.java | 73 ++++++++++++++--------
.../camel/component/rabbitmq/RabbitMQConsumer.java | 46 +++++++++-----
.../camel/component/rabbitmq/RabbitMQEndpoint.java | 9 +++
3 files changed, 88 insertions(+), 40 deletions(-)
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
index 6549ac4..91f12f3 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
@@ -17,6 +17,8 @@
package org.apache.camel.component.rabbitmq;
import java.io.IOException;
+import java.time.Duration;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeoutException;
@@ -32,6 +34,9 @@ import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.support.task.BlockingTask;
+import org.apache.camel.support.task.Tasks;
+import org.apache.camel.support.task.budget.Budgets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,7 +48,6 @@ class RabbitConsumer extends ServiceSupport implements com.rabbitmq.client.Consu
private Channel channel;
private String tag;
private volatile String consumerTag;
- private volatile boolean stopping;
private final Semaphore lock = new Semaphore(1);
@@ -286,6 +290,27 @@ class RabbitConsumer extends ServiceSupport implements com.rabbitmq.client.Consu
}
}
+    /**
+     * Performs a single reconnection attempt; intended to be passed to a repeatable task as its step.
+     *
+     * @return {@code true} when no further attempt is needed (the consumer is stopping, or the reconnect
+     *         succeeded); {@code false} when the attempt failed and should be retried
+     */
+    private boolean doReconnect() {
+        if (isStopping()) {
+            // Shutting down: report completion so the caller does not keep retrying.
+            return true;
+        }
+
+        try {
+            reconnect();
+            return true;
+        } catch (Exception e) {
+            LOG.warn(
+                    "Unable to obtain a RabbitMQ channel. Will try again. Caused by: {}. Stacktrace logged at DEBUG logging level.",
+                    e.getMessage());
+            // include stacktrace in DEBUG logging, as the warning above promises
+            LOG.debug(e.getMessage(), e);
+
+            return false;
+        }
+    }
+
/**
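* Handles a shutdown signal on the RabbitMQ channel (see {@link Consumer#handleShutdownSignal}): unless the shutdown was initiated by the application, keeps trying to reconnect.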
*/
@@ -293,32 +318,28 @@ class RabbitConsumer extends ServiceSupport implements com.rabbitmq.client.Consu
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
LOG.info("Received shutdown signal on the rabbitMQ channel");
- // Check if the consumer closed the connection or something else
- if (!sig.isInitiatedByApplication()) {
- // Something else closed the connection so reconnect
- boolean connected = false;
- while (!connected && !isStopping()) {
- try {
- reconnect();
- connected = true;
- } catch (Exception e) {
- LOG.warn(
- "Unable to obtain a RabbitMQ channel. Will try again. Caused by: {}. Stacktrace logged at DEBUG logging level.",
- e.getMessage());
- // include stacktrace in DEBUG logging
- LOG.debug(e.getMessage(), e);
-
- Integer networkRecoveryInterval = consumer.getEndpoint().getNetworkRecoveryInterval();
- final long connectionRetryInterval
- = networkRecoveryInterval != null && networkRecoveryInterval > 0 ? networkRecoveryInterval : 100L;
- try {
- Thread.sleep(connectionRetryInterval);
- } catch (InterruptedException e1) {
- Thread.currentThread().interrupt();
- }
- }
- }
+ if (sig.isInitiatedByApplication()) {
+ LOG.debug("Nothing to do because the consumer closed the connection");
+ return;
}
+
+ Integer networkRecoveryInterval = consumer.getEndpoint().getNetworkRecoveryInterval();
+ final long connectionRetryInterval
+ = networkRecoveryInterval != null && networkRecoveryInterval > 0 ? networkRecoveryInterval : 100L;
+
+ String taskName = "shutdown-handler";
+ ScheduledExecutorService service = consumer.getEndpoint().createScheduledExecutor(taskName);
+
+ BlockingTask task = Tasks.backgroundTask()
+ .withBudget(Budgets.timeBudget()
+ .withUnlimitedDuration()
+ .withInterval(Duration.ofMillis(connectionRetryInterval))
+ .build())
+ .withScheduledExecutor(service)
+ .withName(taskName)
+ .build();
+
+ task.run(this::doReconnect);
}
/**
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index b03516d..330ec59 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.rabbitmq;
import java.io.IOException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
@@ -32,6 +33,10 @@ import org.apache.camel.Processor;
import org.apache.camel.Suspendable;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.support.service.ServiceHelper;
+import org.apache.camel.support.task.BlockingTask;
+import org.apache.camel.support.task.Tasks;
+import org.apache.camel.support.task.budget.Budgets;
+import org.apache.camel.support.task.budget.IterationBoundedBudget;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -214,22 +219,35 @@ public class RabbitMQConsumer extends DefaultConsumer implements Suspendable {
RabbitMQConsumer.this.startConsumerCallable = null;
}
- @Override
- public Void call() throws Exception {
- boolean connectionFailed = true;
- // Reconnection loop
- while (running.get() && connectionFailed) {
- try {
- for (RabbitConsumer consumer : consumers) {
- consumer.reconnect();
- }
- connectionFailed = false;
- } catch (Exception e) {
- LOG.info("Connection failed, will retry in {} ms", connectionRetryInterval, e);
- Thread.sleep(connectionRetryInterval);
+ private boolean reconnect() {
+ if (!running.get()) {
+ return true;
+ }
+
+ try {
+ for (RabbitConsumer consumer : consumers) {
+ consumer.reconnect();
}
+
+ return true;
+ } catch (Exception e) {
+ LOG.info("Connection failed, will retry in {} ms", connectionRetryInterval, e);
+
+ return false;
}
- stop();
+ }
+
+ @Override
+ public Void call() throws Exception {
+ BlockingTask task = Tasks.foregroundTask()
+ .withBudget(Budgets.iterationBudget()
+ .withInterval(Duration.ofMillis(connectionRetryInterval))
+ .withMaxIterations(IterationBoundedBudget.UNLIMITED_ITERATIONS)
+ .build())
+ .withName("rabbitmq-reconnection-loop")
+ .build();
+
+ task.run(this::reconnect);
return null;
}
}
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index 4207ce4..5408988 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -21,6 +21,7 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import javax.net.ssl.TrustManager;
@@ -277,6 +278,14 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint {
}
}
+    /**
+     * Creates a single-threaded scheduled executor for the named task.
+     * <p>
+     * When a Camel context is available the executor is obtained from its executor service manager; otherwise a
+     * plain JDK executor is created whose worker thread carries the given name.
+     *
+     * @param  name the name used for the executor (and, in the fallback case, its thread)
+     * @return      a new single-threaded scheduled executor
+     */
+    protected ScheduledExecutorService createScheduledExecutor(String name) {
+        if (getCamelContext() != null) {
+            return getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this, name);
+        } else {
+            // The factory must hand the Runnable to the thread; a Thread created without it never runs any task.
+            return Executors.newSingleThreadScheduledExecutor(runnable -> new Thread(runnable, name));
+        }
+    }
+
public String getUsername() {
return username;
}