You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/11/27 19:29:53 UTC

[2/2] camel git commit: CAMEL-9369: rabbit consumer should ack the message if transfer exception is enabled, as otherwise the original will remain unacknowledged and be consumed again. Thanks to E Wong for the patch.

CAMEL-9369: rabbit consumer should ack the message if transfer exception is enabled, as otherwise the original will remain unacknowledged and be consumed again. Thanks to E Wong for the patch.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bccbc9c7
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bccbc9c7
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bccbc9c7

Branch: refs/heads/camel-2.16.x
Commit: bccbc9c76ceab9b1cd94ea255c013c48d34ad20d
Parents: bb54c6f
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Nov 27 19:29:20 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Nov 27 19:29:44 2015 +0100

----------------------------------------------------------------------
 .../component/rabbitmq/RabbitMQConsumer.java    |  5 ++
 .../rabbitmq/RabbitMQInOutIntTest.java          | 62 ++++++++++++++++++++
 2 files changed, 67 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/bccbc9c7/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index 4343554..cdb23f4 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -236,6 +236,11 @@ public class RabbitMQConsumer extends DefaultConsumer {
                 } catch (RuntimeCamelException e) {
                     getExceptionHandler().handleException("Error processing exchange", exchange, e);
                 }
+
+                if (!consumer.endpoint.isAutoAck()) {
+                    log.trace("Acknowledging receipt when transferring exception [delivery_tag={}]", deliveryTag);
+                    channel.basicAck(deliveryTag, false);
+                }
             } else {
                 boolean isRequeueHeaderSet = msg.getHeader(RabbitMQConstants.REQUEUE, false, boolean.class);
                 // processing failed, then reject and handle the exception

http://git-wip-us.apache.org/repos/asf/camel/blob/bccbc9c7/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQInOutIntTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQInOutIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQInOutIntTest.java
index 51b957f..bdd7119 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQInOutIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQInOutIntTest.java
@@ -35,9 +35,11 @@ import org.apache.camel.Processor;
 import org.apache.camel.Produce;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.component.rabbitmq.testbeans.TestNonSerializableObject;
 import org.apache.camel.component.rabbitmq.testbeans.TestPartiallySerializableObject;
 import org.apache.camel.component.rabbitmq.testbeans.TestSerializableObject;
+import org.apache.camel.impl.JndiRegistry;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.Test;
 
@@ -46,6 +48,7 @@ public class RabbitMQInOutIntTest extends CamelTestSupport {
     public static final String ROUTING_KEY = "rk5";
     public static final long TIMEOUT_MS = 2000;
     private static final String EXCHANGE = "ex5";
+    private static final String EXCHANGE_NO_ACK = "ex5.noAutoAck";
 
     @Produce(uri = "direct:start")
     protected ProducerTemplate template;
@@ -58,6 +61,30 @@ public class RabbitMQInOutIntTest extends CamelTestSupport {
                     + "&transferException=true&requestTimeout=" + TIMEOUT_MS)
     private Endpoint rabbitMQEndpoint;
 
+    @EndpointInject(uri = "rabbitmq:localhost:5672/" + EXCHANGE_NO_ACK + "?threadPoolSize=1&exchangeType=direct&username=cameltest&password=cameltest"
+            + "&autoAck=false&autoDelete=false&durable=false&queue=q5&routingKey=" + ROUTING_KEY
+            + "&transferException=true&requestTimeout=" + TIMEOUT_MS
+            + "&queueArgsConfigurer=#queueArgs")
+    private Endpoint noAutoAckEndpoint;
+
+    @EndpointInject(uri = "mock:result")
+    private MockEndpoint resultEndpoint;
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+
+        ArgsConfigurer queueArgs = new ArgsConfigurer() {
+            @Override
+            public void configurArgs(Map<String, Object> args) {
+                args.put("x-expires", 60000);
+            }
+        };
+        jndi.bind("queueArgs", queueArgs);
+
+        return jndi;
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
@@ -95,6 +122,12 @@ public class RabbitMQInOutIntTest extends CamelTestSupport {
 
                     }
                 });
+
+                from("direct:rabbitMQNoAutoAck").id("producingRouteNoAutoAck").setHeader("routeHeader", simple("routeHeader")).inOut(noAutoAckEndpoint);
+
+                from(noAutoAckEndpoint).id("consumingRouteNoAutoAck")
+                        .to(resultEndpoint)
+                        .throwException(new IllegalStateException("test exception"));
             }
         };
     }
@@ -217,4 +250,33 @@ public class RabbitMQInOutIntTest extends CamelTestSupport {
     public void inOutNullTest() {
         template.requestBodyAndHeader("direct:rabbitMQ", null, RabbitMQConstants.EXCHANGE_NAME, EXCHANGE, Object.class);
     }
+
+    @Test
+    public void messageAckOnExceptionWhereNoAutoAckTest() throws Exception {
+        Map<String, Object> headers = new HashMap<>();
+        headers.put(RabbitMQConstants.EXCHANGE_NAME, EXCHANGE_NO_ACK);
+        headers.put(RabbitMQConstants.ROUTING_KEY, ROUTING_KEY);
+
+        resultEndpoint.expectedMessageCount(1);
+
+        try {
+            String reply = template.requestBodyAndHeaders("direct:rabbitMQNoAutoAck", "testMessage", headers, String.class);
+            fail("This should have thrown an exception");
+        } catch (CamelExecutionException e) {
+            if (!(e.getCause() instanceof IllegalStateException)) {
+                throw e;
+            }
+        }
+
+        resultEndpoint.assertIsSatisfied();
+        resultEndpoint.reset();
+
+        resultEndpoint.expectedMessageCount(0);
+
+        context.stop(); //On restarting the camel context, if the message was not acknowledged the message would be reprocessed
+        context.start();
+
+        resultEndpoint.assertIsSatisfied();
+    }
+
 }