You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/07/07 06:09:08 UTC
[camel] branch camel-3.20.x updated: [CAMEL-19575] Fixes bug in camel-rabbitmq - RabbitMQConsumer keeps on consuming even when route shutdown is triggered. (#10604)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-3.20.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.20.x by this push:
new da1fe9177e2 [CAMEL-19575] Fixes bug in camel-rabbitmq - RabbitMQConsumer keeps on consuming even when route shutdown is triggered. (#10604)
da1fe9177e2 is described below
commit da1fe9177e2de8377f1e416b457c34abf4fb0777
Author: Nikunj Kumar Gupta <ni...@gmail.com>
AuthorDate: Fri Jul 7 11:38:25 2023 +0530
[CAMEL-19575] Fixes bug in camel-rabbitmq - RabbitMQConsumer keeps on consuming even when route shutdown is triggered. (#10604)
* close all consumers concurrently
Closing all consumers in RabbitMQConsumer concurrently
* RabbitMQConsumer now suspends its consumers when it is suspended.
* cancelling all RabbitConsumers in RabbitMQConsumer before trying to close them
---
.../org/apache/camel/component/rabbitmq/RabbitConsumer.java | 13 ++++++++++++-
.../apache/camel/component/rabbitmq/RabbitMQConsumer.java | 7 +++++++
2 files changed, 19 insertions(+), 1 deletion(-)
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
index 13115cd4536..82385124190 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
@@ -48,6 +48,7 @@ class RabbitConsumer extends ServiceSupport implements com.rabbitmq.client.Consu
private Channel channel;
private String tag;
private volatile String consumerTag;
+ private boolean cancelled;
private final Semaphore lock = new Semaphore(1);
@@ -208,12 +209,22 @@ class RabbitConsumer extends ServiceSupport implements com.rabbitmq.client.Consu
consumer.getEndpoint().isExclusiveConsumer(), null, this);
}
+ protected void cancelChannel() throws Exception {
+ if (channel == null) {
+ return;
+ }
+ if (tag != null && isChannelOpen() && !cancelled) {
+ channel.basicCancel(tag);
+ cancelled = true;
+ }
+ }
+
@Override
protected void doStop() throws Exception {
if (channel == null) {
return;
}
- if (tag != null && isChannelOpen()) {
+ if (tag != null && isChannelOpen() && !cancelled) {
channel.basicCancel(tag);
}
try {
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index 330ec59a590..e2d446ea3aa 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -155,6 +155,13 @@ public class RabbitMQConsumer extends DefaultConsumer implements Suspendable {
if (startConsumerCallable != null) {
startConsumerCallable.stop();
}
+ for (RabbitConsumer consumer : this.consumers) {
+ try {
+ consumer.cancelChannel();
+ } catch (Exception e) {
+ LOG.warn("Error occurred while cancelling consumer. This exception is ignored", e);
+ }
+ }
for (RabbitConsumer consumer : this.consumers) {
try {
ServiceHelper.stopAndShutdownService(consumer);