You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/10/18 11:34:22 UTC

[2/2] git commit: Rabbit consumer should include rabbit properties in Camel Exchange message. Thanks to Daniel Williams for the patch.

Rabbit consumer should include rabbit properties in Camel Exchange message. Thanks to Daniel Williams for the patch.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1c00d97b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1c00d97b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1c00d97b

Branch: refs/heads/camel-2.12.x
Commit: 1c00d97bdabff5bdd0f624b8c2e451f2da4dec34
Parents: b220ce0
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Oct 18 11:34:10 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Oct 18 11:34:32 2013 +0200

----------------------------------------------------------------------
 .../component/rabbitmq/RabbitMQConsumer.java    | 86 +++++++++++++++-----
 1 file changed, 67 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/1c00d97b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index acc95d7..4f13045 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -29,7 +29,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
 
 public class RabbitMQConsumer extends DefaultConsumer {
-    
+
     ExecutorService executor;
     Connection conn;
     Channel channel;
@@ -55,18 +55,21 @@ public class RabbitMQConsumer extends DefaultConsumer {
         channel = conn.createChannel();
         log.debug("Using channel {}", channel);
 
-        channel.exchangeDeclare(endpoint.getExchangeName(),
-                "direct",
-                endpoint.isDurable(),
-                endpoint.isAutoDelete(),
+        channel.exchangeDeclare(endpoint.getExchangeName(), "direct",
+                endpoint.isDurable(), endpoint.isAutoDelete(),
                 new HashMap<String, Object>());
-        
-        // need to make sure the queueDeclare is same with the exchange declare
-        channel.queueDeclare(endpoint.getQueue(), endpoint.isDurable(), false, endpoint.isAutoDelete(), null);
-        channel.queueBind(endpoint.getQueue(), endpoint.getExchangeName(),
-                endpoint.getRoutingKey() == null ? "" : endpoint.getRoutingKey());
 
-        channel.basicConsume(endpoint.getQueue(), endpoint.isAutoAck(), new RabbitConsumer(this, channel));
+        // need to make sure the queueDeclare is same with the exchange declare
+        channel.queueDeclare(endpoint.getQueue(), endpoint.isDurable(), false,
+                endpoint.isAutoDelete(), null);
+        channel.queueBind(
+                endpoint.getQueue(),
+                endpoint.getExchangeName(),
+                endpoint.getRoutingKey() == null ? "" : endpoint
+                        .getRoutingKey());
+
+        channel.basicConsume(endpoint.getQueue(), endpoint.isAutoAck(),
+                new RabbitConsumer(this, channel));
     }
 
     @Override
@@ -76,13 +79,14 @@ public class RabbitMQConsumer extends DefaultConsumer {
         if (conn != null) {
             try {
                 conn.close();
-            } catch (Exception ignored) { 
+            } catch (Exception ignored) {
                 // ignored
             }
         }
 
         channel = null;
         conn = null;
+
         if (executor != null) {
             if (getEndpoint() != null && getEndpoint().getCamelContext() != null) {
                 getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor);
@@ -99,7 +103,8 @@ public class RabbitMQConsumer extends DefaultConsumer {
         private final Channel channel;
 
         /**
-         * Constructs a new instance and records its association to the passed-in channel.
+         * Constructs a new instance and records its association to the
+         * passed-in channel.
          *
          * @param channel the channel to which this consumer is attached
          */
@@ -110,13 +115,12 @@ public class RabbitMQConsumer extends DefaultConsumer {
         }
 
         @Override
-        public void handleDelivery(String consumerTag,
-                                   Envelope envelope,
-                                   AMQP.BasicProperties properties,
-                                   byte[] body) throws IOException {
+        public void handleDelivery(String consumerTag, Envelope envelope,
+                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
 
             Exchange exchange = consumer.endpoint.createRabbitExchange(envelope, body);
-            log.trace("Created exchange [exchange={}]", new Object[]{exchange});
+            mergeAmqpProperties(exchange, properties);
+            log.trace("Created exchange [exchange={}]", exchange);
 
             try {
                 consumer.getProcessor().process(exchange);
@@ -131,6 +135,50 @@ public class RabbitMQConsumer extends DefaultConsumer {
                 getExceptionHandler().handleException("Error processing exchange", exchange, e);
             }
         }
+
+        /**
+         * Copies each non-null AMQP basic property onto the matching {@link RabbitMQConstants} header of {@link Exchange#getIn()}.
+         */
+        private void mergeAmqpProperties(Exchange exchange, AMQP.BasicProperties properties) {
+
+            if (properties.getType() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.TYPE, properties.getType());
+            }
+            if (properties.getAppId() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.APP_ID, properties.getAppId());
+            }
+            if (properties.getClusterId() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.CLUSTERID, properties.getClusterId());
+            }
+            if (properties.getContentEncoding() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.CONTENT_ENCODING, properties.getContentEncoding());
+            }
+            if (properties.getContentType() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.CONTENT_TYPE, properties.getContentType());
+            }
+            if (properties.getCorrelationId() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.CORRELATIONID, properties.getCorrelationId());
+            }
+            if (properties.getExpiration() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.EXPIRATION, properties.getExpiration());
+            }
+            if (properties.getMessageId() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.MESSAGE_ID, properties.getMessageId());
+            }
+            if (properties.getPriority() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.PRIORITY, properties.getPriority());
+            }
+            if (properties.getReplyTo() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.REPLY_TO, properties.getReplyTo());
+            }
+            if (properties.getTimestamp() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.TIMESTAMP, properties.getTimestamp());
+            }
+            if (properties.getUserId() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.USERID, properties.getUserId());
+            }
+        }
+
     }
-}
 
+}