You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/07/07 06:09:08 UTC

[camel] branch camel-3.20.x updated: [CAMEL-19575] Fixes bug in camel-rabbitmq - RabbitMQConsumer keeps on consuming even when route shutdown is triggered. (#10604)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.20.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.20.x by this push:
     new da1fe9177e2 [CAMEL-19575] Fixes bug in camel-rabbitmq - RabbitMQConsumer keeps on consuming even when route shutdown is triggered. (#10604)
da1fe9177e2 is described below

commit da1fe9177e2de8377f1e416b457c34abf4fb0777
Author: Nikunj Kumar Gupta <ni...@gmail.com>
AuthorDate: Fri Jul 7 11:38:25 2023 +0530

    [CAMEL-19575] Fixes bug in camel-rabbitmq - RabbitMQConsumer keeps on consuming even when route shutdown is triggered. (#10604)
    
    * close all consumers concurrently
    Closing all consumers in RabbitMqConsumer concurrently
    
    * RabbitMQConsumer to suspend Consumer when suspending.
    
    * cancelling all RabbitConsumers in RabbitMQConsumer before trying to close them
---
 .../org/apache/camel/component/rabbitmq/RabbitConsumer.java | 13 ++++++++++++-
 .../apache/camel/component/rabbitmq/RabbitMQConsumer.java   |  7 +++++++
 2 files changed, 19 insertions(+), 1 deletion(-)

diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
index 13115cd4536..82385124190 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
@@ -48,6 +48,7 @@ class RabbitConsumer extends ServiceSupport implements com.rabbitmq.client.Consu
     private Channel channel;
     private String tag;
     private volatile String consumerTag;
+    private boolean cancelled;
 
     private final Semaphore lock = new Semaphore(1);
 
@@ -208,12 +209,22 @@ class RabbitConsumer extends ServiceSupport implements com.rabbitmq.client.Consu
                 consumer.getEndpoint().isExclusiveConsumer(), null, this);
     }
 
+    protected void cancelChannel() throws Exception {
+        if (channel == null) {
+            return;
+        }
+        if (tag != null && isChannelOpen() && !cancelled) {
+            channel.basicCancel(tag);
+            cancelled = true;
+        }
+    }
+
     @Override
     protected void doStop() throws Exception {
         if (channel == null) {
             return;
         }
-        if (tag != null && isChannelOpen()) {
+        if (tag != null && isChannelOpen() && !cancelled) {
             channel.basicCancel(tag);
         }
         try {
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index 330ec59a590..e2d446ea3aa 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -155,6 +155,13 @@ public class RabbitMQConsumer extends DefaultConsumer implements Suspendable {
         if (startConsumerCallable != null) {
             startConsumerCallable.stop();
         }
+        for (RabbitConsumer consumer : this.consumers) {
+            try {
+                consumer.cancelChannel();
+            } catch (Exception e) {
+                LOG.warn("Error occurred while cancelling consumer. This exception is ignored", e);
+            }
+        }
         for (RabbitConsumer consumer : this.consumers) {
             try {
                 ServiceHelper.stopAndShutdownService(consumer);