You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/09/09 07:45:20 UTC

git commit: CAMEL-7793: camel-rabbitmq - Consumer should not ack if an exception was thrown

Repository: camel
Updated Branches:
  refs/heads/master 0fb1583a6 -> a1777c744


CAMEL-7793: camel-rabbitmq - Consumer should not ack if an exception was thrown


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a1777c74
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a1777c74
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a1777c74

Branch: refs/heads/master
Commit: a1777c7440c2112214e65c16d6cb71f4cc83528b
Parents: 0fb1583
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Sep 9 07:45:11 2014 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Sep 9 07:45:11 2014 +0200

----------------------------------------------------------------------
 .../component/rabbitmq/RabbitMQConsumer.java    | 61 +++++++++++---------
 1 file changed, 35 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/a1777c74/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index 1fe0cd4..eb3dfde 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -28,7 +28,6 @@ import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.Envelope;
 import com.rabbitmq.client.ShutdownSignalException;
-
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
@@ -38,10 +37,12 @@ public class RabbitMQConsumer extends DefaultConsumer {
     Connection conn;
     private int closeTimeout = 30 * 1000;
     private final RabbitMQEndpoint endpoint;
+
     /**
      * Task in charge of starting consumer
      */
     private StartConsumerCallable startConsumerCallable;
+
     /**
      * Running consumers
      */
@@ -77,7 +78,7 @@ public class RabbitMQConsumer extends DefaultConsumer {
         // setup the basicQos
         if (endpoint.isPrefetchEnabled()) {
             channel.basicQos(endpoint.getPrefetchSize(), endpoint.getPrefetchCount(),
-                             endpoint.isPrefetchGlobal());
+                    endpoint.isPrefetchGlobal());
         }
         return channel;
     }
@@ -161,6 +162,7 @@ public class RabbitMQConsumer extends DefaultConsumer {
         private final RabbitMQConsumer consumer;
         private final Channel channel;
         private String tag;
+
         /**
          * Constructs a new instance and records its association to the
          * passed-in channel.
@@ -179,36 +181,38 @@ public class RabbitMQConsumer extends DefaultConsumer {
 
             Exchange exchange = consumer.endpoint.createRabbitExchange(envelope, properties, body);
             mergeAmqpProperties(exchange, properties);
+
             log.trace("Created exchange [exchange={}]", exchange);
-            long deliveryTag = 0;
+            long deliveryTag = envelope.getDeliveryTag();
             try {
-                deliveryTag = envelope.getDeliveryTag();
                 consumer.getProcessor().process(exchange);
+            } catch (Exception e) {
+                exchange.setException(e);
+            }
+
+            if (!exchange.isFailed()) {
+                // processing success
                 if (!consumer.endpoint.isAutoAck()) {
-                    if (exchange.getException() == null) {
-                        log.trace("Acknowledging receipt [delivery_tag={}]",
-                                deliveryTag);
-                        channel.basicAck(deliveryTag, false);
-                    } else {
-                        log.trace("Unacknowledging receipt [delivery_tag={}]",
-                                deliveryTag);
-                        rejectQuicly(String.valueOf(deliveryTag));
-                    }
+                    log.trace("Acknowledging receipt [delivery_tag={}]", deliveryTag);
+                    channel.basicAck(deliveryTag, false);
                 }
-
-            } catch (Exception e) {
+            } else {
+                // processing failed, then reject and handle the exception
                 if (deliveryTag != 0 && !consumer.endpoint.isAutoAck()) {
                     channel.basicReject(deliveryTag, false);
                 }
-                getExceptionHandler().handleException("Error processing exchange", exchange, e);
+                if (exchange.getException() != null) {
+                    getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
+                }
             }
         }
-        
+
         /**
          * Reject a message without throwing exceptions.
+         *
          * @param deliveryTagString Message tag to reject.
          */
-        protected void rejectQuicly(String deliveryTagString) {
+        protected void rejectQuietly(String deliveryTagString) {
             try {
                 long deliveryTag = Long.valueOf(deliveryTagString);
                 if (deliveryTag != 0 && !consumer.endpoint.isAutoAck()) {
@@ -216,29 +220,30 @@ public class RabbitMQConsumer extends DefaultConsumer {
                 }
             } catch (Exception e) {
                 log.error("Fail to reject message [delivery_tag={}]", deliveryTagString);
-            }           
+            }
         }
-        
+
         @Override
         public void handleCancel(String consumerTag) throws IOException {
-            rejectQuicly(consumerTag);
+            rejectQuietly(consumerTag);
         }
 
         @Override
         public void handleCancelOk(String consumerTag) {
-            rejectQuicly(consumerTag);
+            rejectQuietly(consumerTag);
         }
 
         @Override
         public void handleConsumeOk(String consumerTag) {
-            rejectQuicly(consumerTag);
+            rejectQuietly(consumerTag);
         }
 
         @Override
         public void handleShutdownSignal(String consumerTag,
-                ShutdownSignalException sig) {
-            rejectQuicly(consumerTag);
+                                         ShutdownSignalException sig) {
+            rejectQuietly(consumerTag);
         }
+
         /**
          * Will take an {@link Exchange} and add header values back to the {@link Exchange#getIn()}
          */
@@ -307,13 +312,16 @@ public class RabbitMQConsumer extends DefaultConsumer {
     private class StartConsumerCallable implements Callable<Void> {
         private final long connectionRetryInterval;
         private final AtomicBoolean running = new AtomicBoolean(true);
+
         public StartConsumerCallable(long connectionRetryInterval) {
             this.connectionRetryInterval = connectionRetryInterval;
         }
+
         public void stop() {
             running.set(false);
             RabbitMQConsumer.this.startConsumerCallable = null;
         }
+
         @Override
         public Void call() throws Exception {
             boolean connectionFailed = true;
@@ -323,7 +331,7 @@ public class RabbitMQConsumer extends DefaultConsumer {
                     openConnection();
                     connectionFailed = false;
                 } catch (Exception e) {
-                    log.debug("Connection failed, will retry in " + connectionRetryInterval + "ms", e);
+                    log.debug("Connection failed, will retry in {}ms", connectionRetryInterval, e);
                     Thread.sleep(connectionRetryInterval);
                 }
             }
@@ -334,4 +342,5 @@ public class RabbitMQConsumer extends DefaultConsumer {
             return null;
         }
     }
+
 }