You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/08/16 14:28:35 UTC

[4/6] camel git commit: Fix CAMEL-10229

Fix CAMEL-10229

Use a semaphore to wait for the message to be processed when
autoAck=false

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ce6eb9ed
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ce6eb9ed
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ce6eb9ed

Branch: refs/heads/camel-2.17.x
Commit: ce6eb9edd47cacd873eedb54868f599d9af2e50e
Parents: 199263a
Author: miti <pr...@textkernel.nl>
Authored: Fri Aug 12 16:41:55 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Aug 16 16:28:07 2016 +0200

----------------------------------------------------------------------
 .../component/rabbitmq/RabbitConsumer.java      | 28 ++++++++++++++++++--
 1 file changed, 26 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ce6eb9ed/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
index fb61c4b..cb2e47f 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.rabbitmq;
 
 import java.io.IOException;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeoutException;
 
 import com.rabbitmq.client.AMQP;
@@ -41,6 +42,8 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
     /** Consumer tag for this consumer. */
     private volatile String consumerTag;
     private volatile boolean stopping;
+    
+    private final Semaphore lock = new Semaphore(1);
 
     /**
      * Constructs a new instance and records its association to the passed-in
@@ -59,9 +62,26 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
             log.warn("Unable to open channel for RabbitMQConsumer. Continuing and will try again", e);
         }
     }
-
     @Override
     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
+    	try {
+            if (!consumer.getEndpoint().isAutoAck()) {
+            	lock.acquire();
+            }
+            doHandleDelivery(consumerTag, envelope, properties, body);
+            if (!consumer.getEndpoint().isAutoAck()) {
+            	lock.release();
+            }
+    		
+    	} catch (InterruptedException e) {
+        	log.error("Thread Interrupted!");
+    		
+    	}
+        
+			
+    }
+
+    public void doHandleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
         Exchange exchange = consumer.getEndpoint().createRabbitExchange(envelope, properties, body);
         consumer.getEndpoint().getMessageConverter().mergeAmqpProperties(exchange, properties);
 
@@ -163,12 +183,16 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
             channel.basicCancel(tag);
         }
         try {
+			lock.acquire();
             if (isChannelOpen()) {
                 channel.close();
             }
-        } catch (TimeoutException e) {
+            lock.release();
+		} catch (TimeoutException e) {
             log.error("Timeout occured");
             throw e;
+        } catch (InterruptedException e1) {
+        	log.error("Thread Interrupted!");
         }
     }