You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/08/16 14:28:35 UTC
[4/6] camel git commit: Fix CAMEL-10229
Fix CAMEL-10229
Use a semaphore so that closing the channel waits for the in-flight message
to finish processing when autoAck=false
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ce6eb9ed
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ce6eb9ed
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ce6eb9ed
Branch: refs/heads/camel-2.17.x
Commit: ce6eb9edd47cacd873eedb54868f599d9af2e50e
Parents: 199263a
Author: miti <pr...@textkernel.nl>
Authored: Fri Aug 12 16:41:55 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Aug 16 16:28:07 2016 +0200
----------------------------------------------------------------------
.../component/rabbitmq/RabbitConsumer.java | 28 ++++++++++++++++++--
1 file changed, 26 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/ce6eb9ed/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
index fb61c4b..cb2e47f 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.rabbitmq;
import java.io.IOException;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
@@ -41,6 +42,8 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
/** Consumer tag for this consumer. */
private volatile String consumerTag;
private volatile boolean stopping;
+
+ private final Semaphore lock = new Semaphore(1);
/**
* Constructs a new instance and records its association to the passed-in
@@ -59,9 +62,26 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
log.warn("Unable to open channel for RabbitMQConsumer. Continuing and will try again", e);
}
}
-
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
+ try {
+ if (!consumer.getEndpoint().isAutoAck()) {
+ lock.acquire();
+ }
+ doHandleDelivery(consumerTag, envelope, properties, body);
+ if (!consumer.getEndpoint().isAutoAck()) {
+ lock.release();
+ }
+
+ } catch (InterruptedException e) {
+ log.error("Thread Interrupted!");
+
+ }
+
+
+ }
+
+ public void doHandleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
Exchange exchange = consumer.getEndpoint().createRabbitExchange(envelope, properties, body);
consumer.getEndpoint().getMessageConverter().mergeAmqpProperties(exchange, properties);
@@ -163,12 +183,16 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
channel.basicCancel(tag);
}
try {
+ lock.acquire();
if (isChannelOpen()) {
channel.close();
}
- } catch (TimeoutException e) {
+ lock.release();
+ } catch (TimeoutException e) {
log.error("Timeout occured");
throw e;
+ } catch (InterruptedException e1) {
+ log.error("Thread Interrupted!");
}
}