You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/09/09 07:45:20 UTC
git commit: CAMEL-7793: camel-rabbitmq - Consumer should not ack if
an exception was thrown
Repository: camel
Updated Branches:
refs/heads/master 0fb1583a6 -> a1777c744
CAMEL-7793: camel-rabbitmq - Consumer should not ack if an exception was thrown
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a1777c74
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a1777c74
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a1777c74
Branch: refs/heads/master
Commit: a1777c7440c2112214e65c16d6cb71f4cc83528b
Parents: 0fb1583
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Sep 9 07:45:11 2014 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Sep 9 07:45:11 2014 +0200
----------------------------------------------------------------------
.../component/rabbitmq/RabbitMQConsumer.java | 61 +++++++++++---------
1 file changed, 35 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/a1777c74/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index 1fe0cd4..eb3dfde 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -28,7 +28,6 @@ import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
-
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
@@ -38,10 +37,12 @@ public class RabbitMQConsumer extends DefaultConsumer {
Connection conn;
private int closeTimeout = 30 * 1000;
private final RabbitMQEndpoint endpoint;
+
/**
* Task in charge of starting consumer
*/
private StartConsumerCallable startConsumerCallable;
+
/**
* Running consumers
*/
@@ -77,7 +78,7 @@ public class RabbitMQConsumer extends DefaultConsumer {
// setup the basicQos
if (endpoint.isPrefetchEnabled()) {
channel.basicQos(endpoint.getPrefetchSize(), endpoint.getPrefetchCount(),
- endpoint.isPrefetchGlobal());
+ endpoint.isPrefetchGlobal());
}
return channel;
}
@@ -161,6 +162,7 @@ public class RabbitMQConsumer extends DefaultConsumer {
private final RabbitMQConsumer consumer;
private final Channel channel;
private String tag;
+
/**
* Constructs a new instance and records its association to the
* passed-in channel.
@@ -179,36 +181,38 @@ public class RabbitMQConsumer extends DefaultConsumer {
Exchange exchange = consumer.endpoint.createRabbitExchange(envelope, properties, body);
mergeAmqpProperties(exchange, properties);
+
log.trace("Created exchange [exchange={}]", exchange);
- long deliveryTag = 0;
+ long deliveryTag = envelope.getDeliveryTag();
try {
- deliveryTag = envelope.getDeliveryTag();
consumer.getProcessor().process(exchange);
+ } catch (Exception e) {
+ exchange.setException(e);
+ }
+
+ if (!exchange.isFailed()) {
+ // processing success
if (!consumer.endpoint.isAutoAck()) {
- if (exchange.getException() == null) {
- log.trace("Acknowledging receipt [delivery_tag={}]",
- deliveryTag);
- channel.basicAck(deliveryTag, false);
- } else {
- log.trace("Unacknowledging receipt [delivery_tag={}]",
- deliveryTag);
- rejectQuicly(String.valueOf(deliveryTag));
- }
+ log.trace("Acknowledging receipt [delivery_tag={}]", deliveryTag);
+ channel.basicAck(deliveryTag, false);
}
-
- } catch (Exception e) {
+ } else {
+ // processing failed, then reject and handle the exception
if (deliveryTag != 0 && !consumer.endpoint.isAutoAck()) {
channel.basicReject(deliveryTag, false);
}
- getExceptionHandler().handleException("Error processing exchange", exchange, e);
+ if (exchange.getException() != null) {
+ getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
+ }
}
}
-
+
/**
* Reject a message without throw exceptions.
+ *
* @param deliveryTagString Message tag to reject.
*/
- protected void rejectQuicly(String deliveryTagString) {
+ protected void rejectQuietly(String deliveryTagString) {
try {
long deliveryTag = Long.valueOf(deliveryTagString);
if (deliveryTag != 0 && !consumer.endpoint.isAutoAck()) {
@@ -216,29 +220,30 @@ public class RabbitMQConsumer extends DefaultConsumer {
}
} catch (Exception e) {
log.error("Fail to reject message [delivery_tag={}]", deliveryTagString);
- }
+ }
}
-
+
@Override
public void handleCancel(String consumerTag) throws IOException {
- rejectQuicly(consumerTag);
+ rejectQuietly(consumerTag);
}
@Override
public void handleCancelOk(String consumerTag) {
- rejectQuicly(consumerTag);
+ rejectQuietly(consumerTag);
}
@Override
public void handleConsumeOk(String consumerTag) {
- rejectQuicly(consumerTag);
+ rejectQuietly(consumerTag);
}
@Override
public void handleShutdownSignal(String consumerTag,
- ShutdownSignalException sig) {
- rejectQuicly(consumerTag);
+ ShutdownSignalException sig) {
+ rejectQuietly(consumerTag);
}
+
/**
* Will take an {@link Exchange} and add header values back to the {@link Exchange#getIn()}
*/
@@ -307,13 +312,16 @@ public class RabbitMQConsumer extends DefaultConsumer {
private class StartConsumerCallable implements Callable<Void> {
private final long connectionRetryInterval;
private final AtomicBoolean running = new AtomicBoolean(true);
+
public StartConsumerCallable(long connectionRetryInterval) {
this.connectionRetryInterval = connectionRetryInterval;
}
+
public void stop() {
running.set(false);
RabbitMQConsumer.this.startConsumerCallable = null;
}
+
@Override
public Void call() throws Exception {
boolean connectionFailed = true;
@@ -323,7 +331,7 @@ public class RabbitMQConsumer extends DefaultConsumer {
openConnection();
connectionFailed = false;
} catch (Exception e) {
- log.debug("Connection failed, will retry in " + connectionRetryInterval + "ms", e);
+ log.debug("Connection failed, will retry in {}" + connectionRetryInterval + "ms", e);
Thread.sleep(connectionRetryInterval);
}
}
@@ -334,4 +342,5 @@ public class RabbitMQConsumer extends DefaultConsumer {
return null;
}
}
+
}