You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/10/18 11:34:21 UTC

[1/2] git commit: Rabbit consumer should include rabbit properties in Camel Exchange message. Thanks to Daniel Williams for the patch.

Updated Branches:
  refs/heads/camel-2.12.x b220ce070 -> 1c00d97bd
  refs/heads/master 3d01c27c6 -> 4a89ad969


Rabbit consumer should include rabbit properties in Camel Exchange message. Thanks to Daniel Williams for the patch.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4a89ad96
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4a89ad96
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4a89ad96

Branch: refs/heads/master
Commit: 4a89ad969859d4adee09eddbb43cfd89a91efd57
Parents: 3d01c27
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Oct 18 11:34:10 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Oct 18 11:34:10 2013 +0200

----------------------------------------------------------------------
 .../component/rabbitmq/RabbitMQConsumer.java    | 86 +++++++++++++++-----
 1 file changed, 67 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/4a89ad96/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index acc95d7..4f13045 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -29,7 +29,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
 
 public class RabbitMQConsumer extends DefaultConsumer {
-    
+
     ExecutorService executor;
     Connection conn;
     Channel channel;
@@ -55,18 +55,21 @@ public class RabbitMQConsumer extends DefaultConsumer {
         channel = conn.createChannel();
         log.debug("Using channel {}", channel);
 
-        channel.exchangeDeclare(endpoint.getExchangeName(),
-                "direct",
-                endpoint.isDurable(),
-                endpoint.isAutoDelete(),
+        channel.exchangeDeclare(endpoint.getExchangeName(), "direct",
+                endpoint.isDurable(), endpoint.isAutoDelete(),
                 new HashMap<String, Object>());
-        
-        // need to make sure the queueDeclare is same with the exchange declare
-        channel.queueDeclare(endpoint.getQueue(), endpoint.isDurable(), false, endpoint.isAutoDelete(), null);
-        channel.queueBind(endpoint.getQueue(), endpoint.getExchangeName(),
-                endpoint.getRoutingKey() == null ? "" : endpoint.getRoutingKey());
 
-        channel.basicConsume(endpoint.getQueue(), endpoint.isAutoAck(), new RabbitConsumer(this, channel));
+        // need to make sure the queueDeclare is same with the exchange declare
+        channel.queueDeclare(endpoint.getQueue(), endpoint.isDurable(), false,
+                endpoint.isAutoDelete(), null);
+        channel.queueBind(
+                endpoint.getQueue(),
+                endpoint.getExchangeName(),
+                endpoint.getRoutingKey() == null ? "" : endpoint
+                        .getRoutingKey());
+
+        channel.basicConsume(endpoint.getQueue(), endpoint.isAutoAck(),
+                new RabbitConsumer(this, channel));
     }
 
     @Override
@@ -76,13 +79,14 @@ public class RabbitMQConsumer extends DefaultConsumer {
         if (conn != null) {
             try {
                 conn.close();
-            } catch (Exception ignored) { 
+            } catch (Exception ignored) {
                 // ignored
             }
         }
 
         channel = null;
         conn = null;
+
         if (executor != null) {
             if (getEndpoint() != null && getEndpoint().getCamelContext() != null) {
                 getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor);
@@ -99,7 +103,8 @@ public class RabbitMQConsumer extends DefaultConsumer {
         private final Channel channel;
 
         /**
-         * Constructs a new instance and records its association to the passed-in channel.
+         * Constructs a new instance and records its association to the
+         * passed-in channel.
          *
          * @param channel the channel to which this consumer is attached
          */
@@ -110,13 +115,12 @@ public class RabbitMQConsumer extends DefaultConsumer {
         }
 
         @Override
-        public void handleDelivery(String consumerTag,
-                                   Envelope envelope,
-                                   AMQP.BasicProperties properties,
-                                   byte[] body) throws IOException {
+        public void handleDelivery(String consumerTag, Envelope envelope,
+                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
 
             Exchange exchange = consumer.endpoint.createRabbitExchange(envelope, body);
-            log.trace("Created exchange [exchange={}]", new Object[]{exchange});
+            mergeAmqpProperties(exchange, properties);
+            log.trace("Created exchange [exchange={}]", exchange);
 
             try {
                 consumer.getProcessor().process(exchange);
@@ -131,6 +135,50 @@ public class RabbitMQConsumer extends DefaultConsumer {
                 getExceptionHandler().handleException("Error processing exchange", exchange, e);
             }
         }
+
+        /**
+         * Will take an {@link Exchange} and add header values back to the {@link Exchange#getIn()}
+         */
+        private void mergeAmqpProperties(Exchange exchange, AMQP.BasicProperties properties) {
+
+            if (properties.getType() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.TYPE, properties.getType());
+            }
+            if (properties.getAppId() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.APP_ID, properties.getAppId());
+            }
+            if (properties.getClusterId() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.CLUSTERID, properties.getClusterId());
+            }
+            if (properties.getContentEncoding() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.CONTENT_ENCODING, properties.getContentEncoding());
+            }
+            if (properties.getContentType() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.CONTENT_TYPE, properties.getContentType());
+            }
+            if (properties.getCorrelationId() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.CORRELATIONID, properties.getCorrelationId());
+            }
+            if (properties.getExpiration() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.EXPIRATION, properties.getExpiration());
+            }
+            if (properties.getMessageId() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.MESSAGE_ID, properties.getMessageId());
+            }
+            if (properties.getPriority() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.PRIORITY, properties.getPriority());
+            }
+            if (properties.getReplyTo() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.REPLY_TO, properties.getReplyTo());
+            }
+            if (properties.getTimestamp() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.TIMESTAMP, properties.getTimestamp());
+            }
+            if (properties.getUserId() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.USERID, properties.getUserId());
+            }
+        }
+
     }
-}
 
+}


[2/2] git commit: Rabbit consumer should include rabbit properties in Camel Exchange message. Thanks to Daniel Williams for the patch.

Posted by da...@apache.org.
Rabbit consumer should include rabbit properties in Camel Exchange message. Thanks to Daniel Williams for the patch.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1c00d97b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1c00d97b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1c00d97b

Branch: refs/heads/camel-2.12.x
Commit: 1c00d97bdabff5bdd0f624b8c2e451f2da4dec34
Parents: b220ce0
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Oct 18 11:34:10 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Oct 18 11:34:32 2013 +0200

----------------------------------------------------------------------
 .../component/rabbitmq/RabbitMQConsumer.java    | 86 +++++++++++++++-----
 1 file changed, 67 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/1c00d97b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index acc95d7..4f13045 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -29,7 +29,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
 
 public class RabbitMQConsumer extends DefaultConsumer {
-    
+
     ExecutorService executor;
     Connection conn;
     Channel channel;
@@ -55,18 +55,21 @@ public class RabbitMQConsumer extends DefaultConsumer {
         channel = conn.createChannel();
         log.debug("Using channel {}", channel);
 
-        channel.exchangeDeclare(endpoint.getExchangeName(),
-                "direct",
-                endpoint.isDurable(),
-                endpoint.isAutoDelete(),
+        channel.exchangeDeclare(endpoint.getExchangeName(), "direct",
+                endpoint.isDurable(), endpoint.isAutoDelete(),
                 new HashMap<String, Object>());
-        
-        // need to make sure the queueDeclare is same with the exchange declare
-        channel.queueDeclare(endpoint.getQueue(), endpoint.isDurable(), false, endpoint.isAutoDelete(), null);
-        channel.queueBind(endpoint.getQueue(), endpoint.getExchangeName(),
-                endpoint.getRoutingKey() == null ? "" : endpoint.getRoutingKey());
 
-        channel.basicConsume(endpoint.getQueue(), endpoint.isAutoAck(), new RabbitConsumer(this, channel));
+        // need to make sure the queueDeclare is same with the exchange declare
+        channel.queueDeclare(endpoint.getQueue(), endpoint.isDurable(), false,
+                endpoint.isAutoDelete(), null);
+        channel.queueBind(
+                endpoint.getQueue(),
+                endpoint.getExchangeName(),
+                endpoint.getRoutingKey() == null ? "" : endpoint
+                        .getRoutingKey());
+
+        channel.basicConsume(endpoint.getQueue(), endpoint.isAutoAck(),
+                new RabbitConsumer(this, channel));
     }
 
     @Override
@@ -76,13 +79,14 @@ public class RabbitMQConsumer extends DefaultConsumer {
         if (conn != null) {
             try {
                 conn.close();
-            } catch (Exception ignored) { 
+            } catch (Exception ignored) {
                 // ignored
             }
         }
 
         channel = null;
         conn = null;
+
         if (executor != null) {
             if (getEndpoint() != null && getEndpoint().getCamelContext() != null) {
                 getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor);
@@ -99,7 +103,8 @@ public class RabbitMQConsumer extends DefaultConsumer {
         private final Channel channel;
 
         /**
-         * Constructs a new instance and records its association to the passed-in channel.
+         * Constructs a new instance and records its association to the
+         * passed-in channel.
          *
          * @param channel the channel to which this consumer is attached
          */
@@ -110,13 +115,12 @@ public class RabbitMQConsumer extends DefaultConsumer {
         }
 
         @Override
-        public void handleDelivery(String consumerTag,
-                                   Envelope envelope,
-                                   AMQP.BasicProperties properties,
-                                   byte[] body) throws IOException {
+        public void handleDelivery(String consumerTag, Envelope envelope,
+                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
 
             Exchange exchange = consumer.endpoint.createRabbitExchange(envelope, body);
-            log.trace("Created exchange [exchange={}]", new Object[]{exchange});
+            mergeAmqpProperties(exchange, properties);
+            log.trace("Created exchange [exchange={}]", exchange);
 
             try {
                 consumer.getProcessor().process(exchange);
@@ -131,6 +135,50 @@ public class RabbitMQConsumer extends DefaultConsumer {
                 getExceptionHandler().handleException("Error processing exchange", exchange, e);
             }
         }
+
+        /**
+         * Will take an {@link Exchange} and add header values back to the {@link Exchange#getIn()}
+         */
+        private void mergeAmqpProperties(Exchange exchange, AMQP.BasicProperties properties) {
+
+            if (properties.getType() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.TYPE, properties.getType());
+            }
+            if (properties.getAppId() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.APP_ID, properties.getAppId());
+            }
+            if (properties.getClusterId() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.CLUSTERID, properties.getClusterId());
+            }
+            if (properties.getContentEncoding() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.CONTENT_ENCODING, properties.getContentEncoding());
+            }
+            if (properties.getContentType() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.CONTENT_TYPE, properties.getContentType());
+            }
+            if (properties.getCorrelationId() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.CORRELATIONID, properties.getCorrelationId());
+            }
+            if (properties.getExpiration() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.EXPIRATION, properties.getExpiration());
+            }
+            if (properties.getMessageId() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.MESSAGE_ID, properties.getMessageId());
+            }
+            if (properties.getPriority() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.PRIORITY, properties.getPriority());
+            }
+            if (properties.getReplyTo() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.REPLY_TO, properties.getReplyTo());
+            }
+            if (properties.getTimestamp() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.TIMESTAMP, properties.getTimestamp());
+            }
+            if (properties.getUserId() != null) {
+                exchange.getIn().setHeader(RabbitMQConstants.USERID, properties.getUserId());
+            }
+        }
+
     }
-}
 
+}