You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/11/27 19:29:53 UTC
[2/2] camel git commit: CAMEL-9369: rabbit consumer should ack
message if transfer exception enabled as otherwise the original will remain
un-acked and be consumed again. Thanks to E Wong for the patch.
CAMEL-9369: rabbit consumer should ack message if transfer exception enabled as otherwise the original will remain un-acked and be consumed again. Thanks to E Wong for the patch.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bccbc9c7
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bccbc9c7
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bccbc9c7
Branch: refs/heads/camel-2.16.x
Commit: bccbc9c76ceab9b1cd94ea255c013c48d34ad20d
Parents: bb54c6f
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Nov 27 19:29:20 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Nov 27 19:29:44 2015 +0100
----------------------------------------------------------------------
.../component/rabbitmq/RabbitMQConsumer.java | 5 ++
.../rabbitmq/RabbitMQInOutIntTest.java | 62 ++++++++++++++++++++
2 files changed, 67 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/bccbc9c7/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index 4343554..cdb23f4 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -236,6 +236,11 @@ public class RabbitMQConsumer extends DefaultConsumer {
} catch (RuntimeCamelException e) {
getExceptionHandler().handleException("Error processing exchange", exchange, e);
}
+
+ if (!consumer.endpoint.isAutoAck()) {
+ log.trace("Acknowledging receipt when transferring exception [delivery_tag={}]", deliveryTag);
+ channel.basicAck(deliveryTag, false);
+ }
} else {
boolean isRequeueHeaderSet = msg.getHeader(RabbitMQConstants.REQUEUE, false, boolean.class);
// processing failed, then reject and handle the exception
http://git-wip-us.apache.org/repos/asf/camel/blob/bccbc9c7/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQInOutIntTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQInOutIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQInOutIntTest.java
index 51b957f..bdd7119 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQInOutIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQInOutIntTest.java
@@ -35,9 +35,11 @@ import org.apache.camel.Processor;
import org.apache.camel.Produce;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.component.rabbitmq.testbeans.TestNonSerializableObject;
import org.apache.camel.component.rabbitmq.testbeans.TestPartiallySerializableObject;
import org.apache.camel.component.rabbitmq.testbeans.TestSerializableObject;
+import org.apache.camel.impl.JndiRegistry;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
@@ -46,6 +48,7 @@ public class RabbitMQInOutIntTest extends CamelTestSupport {
public static final String ROUTING_KEY = "rk5";
public static final long TIMEOUT_MS = 2000;
private static final String EXCHANGE = "ex5";
+ private static final String EXCHANGE_NO_ACK = "ex5.noAutoAck";
@Produce(uri = "direct:start")
protected ProducerTemplate template;
@@ -58,6 +61,30 @@ public class RabbitMQInOutIntTest extends CamelTestSupport {
+ "&transferException=true&requestTimeout=" + TIMEOUT_MS)
private Endpoint rabbitMQEndpoint;
+ @EndpointInject(uri = "rabbitmq:localhost:5672/" + EXCHANGE_NO_ACK + "?threadPoolSize=1&exchangeType=direct&username=cameltest&password=cameltest"
+ + "&autoAck=false&autoDelete=false&durable=false&queue=q5&routingKey=" + ROUTING_KEY
+ + "&transferException=true&requestTimeout=" + TIMEOUT_MS
+ + "&queueArgsConfigurer=#queueArgs")
+ private Endpoint noAutoAckEndpoint;
+
+ @EndpointInject(uri = "mock:result")
+ private MockEndpoint resultEndpoint;
+
+ @Override
+ protected JndiRegistry createRegistry() throws Exception {
+ JndiRegistry jndi = super.createRegistry();
+
+ ArgsConfigurer queueArgs = new ArgsConfigurer() {
+ @Override
+ public void configurArgs(Map<String, Object> args) {
+ args.put("x-expires", 60000);
+ }
+ };
+ jndi.bind("queueArgs", queueArgs);
+
+ return jndi;
+ }
+
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@@ -95,6 +122,12 @@ public class RabbitMQInOutIntTest extends CamelTestSupport {
}
});
+
+ from("direct:rabbitMQNoAutoAck").id("producingRouteNoAutoAck").setHeader("routeHeader", simple("routeHeader")).inOut(noAutoAckEndpoint);
+
+ from(noAutoAckEndpoint).id("consumingRouteNoAutoAck")
+ .to(resultEndpoint)
+ .throwException(new IllegalStateException("test exception"));
}
};
}
@@ -217,4 +250,33 @@ public class RabbitMQInOutIntTest extends CamelTestSupport {
public void inOutNullTest() {
template.requestBodyAndHeader("direct:rabbitMQ", null, RabbitMQConstants.EXCHANGE_NAME, EXCHANGE, Object.class);
}
+
+ @Test
+ public void messageAckOnExceptionWhereNoAutoAckTest() throws Exception {
+ Map<String, Object> headers = new HashMap<>();
+ headers.put(RabbitMQConstants.EXCHANGE_NAME, EXCHANGE_NO_ACK);
+ headers.put(RabbitMQConstants.ROUTING_KEY, ROUTING_KEY);
+
+ resultEndpoint.expectedMessageCount(1);
+
+ try {
+ String reply = template.requestBodyAndHeaders("direct:rabbitMQNoAutoAck", "testMessage", headers, String.class);
+ fail("This should have thrown an exception");
+ } catch (CamelExecutionException e) {
+ if (!(e.getCause() instanceof IllegalStateException)) {
+ throw e;
+ }
+ }
+
+ resultEndpoint.assertIsSatisfied();
+ resultEndpoint.reset();
+
+ resultEndpoint.expectedMessageCount(0);
+
+ context.stop(); //On restarting the camel context, if the message was not acknowledged the message would be reprocessed
+ context.start();
+
+ resultEndpoint.assertIsSatisfied();
+ }
+
}