You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2014/09/25 11:11:25 UTC

git commit: CAMEL-7860 Added sendReply functionality in RabbitMQConsumer with thanks to Raymond

Repository: camel
Updated Branches:
  refs/heads/master 40bd0d603 -> 85fd23d8c


CAMEL-7860 Added sendReply functionality in RabbitMQConsumer with thanks to Raymond


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/85fd23d8
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/85fd23d8
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/85fd23d8

Branch: refs/heads/master
Commit: 85fd23d8c9584f7bfaa0b8c518275c21eea358ae
Parents: 40bd0d6
Author: Willem Jiang <wi...@gmail.com>
Authored: Thu Sep 25 17:11:08 2014 +0800
Committer: Willem Jiang <wi...@gmail.com>
Committed: Thu Sep 25 17:11:08 2014 +0800

----------------------------------------------------------------------
 .../component/rabbitmq/RabbitMQConsumer.java    | 20 ++++++++++++++++++++
 1 file changed, 20 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/85fd23d8/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index 7bb0bd8..fa4893c 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -28,6 +28,8 @@ import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.Envelope;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
 
@@ -181,6 +183,11 @@ public class RabbitMQConsumer extends DefaultConsumer {
             Exchange exchange = consumer.endpoint.createRabbitExchange(envelope, properties, body);
             mergeAmqpProperties(exchange, properties);
 
+            boolean sendReply = properties.getReplyTo() != null;
+            if (sendReply && !exchange.getPattern().isOutCapable()) {
+                exchange.setPattern(ExchangePattern.InOut);
+            }
+
             log.trace("Created exchange [exchange={}]", exchange);
             long deliveryTag = envelope.getDeliveryTag();
             try {
@@ -191,6 +198,19 @@ public class RabbitMQConsumer extends DefaultConsumer {
 
             if (!exchange.isFailed()) {
                 // processing success
+                if (sendReply && exchange.getPattern().isOutCapable()) {
+                    Message msg;
+                    if (exchange.hasOut()) {
+                        msg = exchange.getOut();
+                    } else {
+                        msg = exchange.getIn();
+                    }
+                    AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
+                            .headers(msg.getHeaders())
+                            .correlationId(properties.getCorrelationId())
+                            .build();
+                    channel.basicPublish("", properties.getReplyTo(), replyProps, msg.getBody(byte[].class));
+                }
                 if (!consumer.endpoint.isAutoAck()) {
                     log.trace("Acknowledging receipt [delivery_tag={}]", deliveryTag);
                     channel.basicAck(deliveryTag, false);