You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2014/09/25 11:11:25 UTC
git commit: CAMEL-7860 Added sendReply functionality in RabbitMQConsumer with thanks to Raymond
Repository: camel
Updated Branches:
refs/heads/master 40bd0d603 -> 85fd23d8c
CAMEL-7860 Added sendReply functionality in RabbitMQConsumer with thanks to Raymond
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/85fd23d8
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/85fd23d8
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/85fd23d8
Branch: refs/heads/master
Commit: 85fd23d8c9584f7bfaa0b8c518275c21eea358ae
Parents: 40bd0d6
Author: Willem Jiang <wi...@gmail.com>
Authored: Thu Sep 25 17:11:08 2014 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Thu Sep 25 17:11:08 2014 +0800
----------------------------------------------------------------------
.../component/rabbitmq/RabbitMQConsumer.java | 20 ++++++++++++++++++++
1 file changed, 20 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/85fd23d8/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index 7bb0bd8..fa4893c 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -28,6 +28,8 @@ import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Envelope;
import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
@@ -181,6 +183,11 @@ public class RabbitMQConsumer extends DefaultConsumer {
Exchange exchange = consumer.endpoint.createRabbitExchange(envelope, properties, body);
mergeAmqpProperties(exchange, properties);
+ boolean sendReply = properties.getReplyTo() != null;
+ if (sendReply && !exchange.getPattern().isOutCapable()) {
+ exchange.setPattern(ExchangePattern.InOut);
+ }
+
log.trace("Created exchange [exchange={}]", exchange);
long deliveryTag = envelope.getDeliveryTag();
try {
@@ -191,6 +198,19 @@ public class RabbitMQConsumer extends DefaultConsumer {
if (!exchange.isFailed()) {
// processing success
+ if (sendReply && exchange.getPattern().isOutCapable()) {
+ Message msg;
+ if (exchange.hasOut()) {
+ msg = exchange.getOut();
+ } else {
+ msg = exchange.getIn();
+ }
+ AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
+ .headers(msg.getHeaders())
+ .correlationId(properties.getCorrelationId())
+ .build();
+ channel.basicPublish("", properties.getReplyTo(), replyProps, msg.getBody(byte[].class));
+ }
if (!consumer.endpoint.isAutoAck()) {
log.trace("Acknowledging receipt [delivery_tag={}]", deliveryTag);
channel.basicAck(deliveryTag, false);