You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/10/18 11:34:21 UTC
[1/2] git commit: Rabbit consumer should include rabbit properties in
Camel Exchange message. Thanks to Daniel Williams for the patch.
Updated Branches:
refs/heads/camel-2.12.x b220ce070 -> 1c00d97bd
refs/heads/master 3d01c27c6 -> 4a89ad969
Rabbit consumer should include rabbit properties in Camel Exchange message. Thanks to Daniel Williams for the patch.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4a89ad96
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4a89ad96
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4a89ad96
Branch: refs/heads/master
Commit: 4a89ad969859d4adee09eddbb43cfd89a91efd57
Parents: 3d01c27
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Oct 18 11:34:10 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Oct 18 11:34:10 2013 +0200
----------------------------------------------------------------------
.../component/rabbitmq/RabbitMQConsumer.java | 86 +++++++++++++++-----
1 file changed, 67 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/4a89ad96/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index acc95d7..4f13045 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -29,7 +29,7 @@ import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
public class RabbitMQConsumer extends DefaultConsumer {
-
+
ExecutorService executor;
Connection conn;
Channel channel;
@@ -55,18 +55,21 @@ public class RabbitMQConsumer extends DefaultConsumer {
channel = conn.createChannel();
log.debug("Using channel {}", channel);
- channel.exchangeDeclare(endpoint.getExchangeName(),
- "direct",
- endpoint.isDurable(),
- endpoint.isAutoDelete(),
+ channel.exchangeDeclare(endpoint.getExchangeName(), "direct",
+ endpoint.isDurable(), endpoint.isAutoDelete(),
new HashMap<String, Object>());
-
- // need to make sure the queueDeclare is same with the exchange declare
- channel.queueDeclare(endpoint.getQueue(), endpoint.isDurable(), false, endpoint.isAutoDelete(), null);
- channel.queueBind(endpoint.getQueue(), endpoint.getExchangeName(),
- endpoint.getRoutingKey() == null ? "" : endpoint.getRoutingKey());
- channel.basicConsume(endpoint.getQueue(), endpoint.isAutoAck(), new RabbitConsumer(this, channel));
+ // need to make sure the queueDeclare is same with the exchange declare
+ channel.queueDeclare(endpoint.getQueue(), endpoint.isDurable(), false,
+ endpoint.isAutoDelete(), null);
+ channel.queueBind(
+ endpoint.getQueue(),
+ endpoint.getExchangeName(),
+ endpoint.getRoutingKey() == null ? "" : endpoint
+ .getRoutingKey());
+
+ channel.basicConsume(endpoint.getQueue(), endpoint.isAutoAck(),
+ new RabbitConsumer(this, channel));
}
@Override
@@ -76,13 +79,14 @@ public class RabbitMQConsumer extends DefaultConsumer {
if (conn != null) {
try {
conn.close();
- } catch (Exception ignored) {
+ } catch (Exception ignored) {
// ignored
}
}
channel = null;
conn = null;
+
if (executor != null) {
if (getEndpoint() != null && getEndpoint().getCamelContext() != null) {
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor);
@@ -99,7 +103,8 @@ public class RabbitMQConsumer extends DefaultConsumer {
private final Channel channel;
/**
- * Constructs a new instance and records its association to the passed-in channel.
+ * Constructs a new instance and records its association to the
+ * passed-in channel.
*
* @param channel the channel to which this consumer is attached
*/
@@ -110,13 +115,12 @@ public class RabbitMQConsumer extends DefaultConsumer {
}
@Override
- public void handleDelivery(String consumerTag,
- Envelope envelope,
- AMQP.BasicProperties properties,
- byte[] body) throws IOException {
+ public void handleDelivery(String consumerTag, Envelope envelope,
+ AMQP.BasicProperties properties, byte[] body) throws IOException {
Exchange exchange = consumer.endpoint.createRabbitExchange(envelope, body);
- log.trace("Created exchange [exchange={}]", new Object[]{exchange});
+ mergeAmqpProperties(exchange, properties);
+ log.trace("Created exchange [exchange={}]", exchange);
try {
consumer.getProcessor().process(exchange);
@@ -131,6 +135,50 @@ public class RabbitMQConsumer extends DefaultConsumer {
getExceptionHandler().handleException("Error processing exchange", exchange, e);
}
}
+
+ /**
+ * Will take an {@link Exchange} and add header values back to the {@link Exchange#getIn()}
+ */
+ private void mergeAmqpProperties(Exchange exchange, AMQP.BasicProperties properties) {
+
+ if (properties.getType() != null) {
+ exchange.getIn().setHeader(RabbitMQConstants.TYPE, properties.getType());
+ }
+ if (properties.getAppId() != null) {
+ exchange.getIn().setHeader(RabbitMQConstants.APP_ID, properties.getAppId());
+ }
+ if (properties.getClusterId() != null) {
+ exchange.getIn().setHeader(RabbitMQConstants.CLUSTERID, properties.getClusterId());
+ }
+ if (properties.getContentEncoding() != null) {
+ exchange.getIn().setHeader(RabbitMQConstants.CONTENT_ENCODING, properties.getContentEncoding());
+ }
+ if (properties.getContentType() != null) {
+ exchange.getIn().setHeader(RabbitMQConstants.CONTENT_TYPE, properties.getContentType());
+ }
+ if (properties.getCorrelationId() != null) {
+ exchange.getIn().setHeader(RabbitMQConstants.CORRELATIONID, properties.getCorrelationId());
+ }
+ if (properties.getExpiration() != null) {
+ exchange.getIn().setHeader(RabbitMQConstants.EXPIRATION, properties.getExpiration());
+ }
+ if (properties.getMessageId() != null) {
+ exchange.getIn().setHeader(RabbitMQConstants.MESSAGE_ID, properties.getMessageId());
+ }
+ if (properties.getPriority() != null) {
+ exchange.getIn().setHeader(RabbitMQConstants.PRIORITY, properties.getPriority());
+ }
+ if (properties.getReplyTo() != null) {
+ exchange.getIn().setHeader(RabbitMQConstants.REPLY_TO, properties.getReplyTo());
+ }
+ if (properties.getTimestamp() != null) {
+ exchange.getIn().setHeader(RabbitMQConstants.TIMESTAMP, properties.getTimestamp());
+ }
+ if (properties.getUserId() != null) {
+ exchange.getIn().setHeader(RabbitMQConstants.USERID, properties.getUserId());
+ }
+ }
+
}
-}
+}
[2/2] git commit: Rabbit consumer should include rabbit properties in
Camel Exchange message. Thanks to Daniel Williams for the patch.
Posted by da...@apache.org.
Rabbit consumer should include rabbit properties in Camel Exchange message. Thanks to Daniel Williams for the patch.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1c00d97b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1c00d97b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1c00d97b
Branch: refs/heads/camel-2.12.x
Commit: 1c00d97bdabff5bdd0f624b8c2e451f2da4dec34
Parents: b220ce0
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Oct 18 11:34:10 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Oct 18 11:34:32 2013 +0200
----------------------------------------------------------------------
.../component/rabbitmq/RabbitMQConsumer.java | 86 +++++++++++++++-----
1 file changed, 67 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/1c00d97b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index acc95d7..4f13045 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -29,7 +29,7 @@ import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
public class RabbitMQConsumer extends DefaultConsumer {
-
+
ExecutorService executor;
Connection conn;
Channel channel;
@@ -55,18 +55,21 @@ public class RabbitMQConsumer extends DefaultConsumer {
channel = conn.createChannel();
log.debug("Using channel {}", channel);
- channel.exchangeDeclare(endpoint.getExchangeName(),
- "direct",
- endpoint.isDurable(),
- endpoint.isAutoDelete(),
+ channel.exchangeDeclare(endpoint.getExchangeName(), "direct",
+ endpoint.isDurable(), endpoint.isAutoDelete(),
new HashMap<String, Object>());
-
- // need to make sure the queueDeclare is same with the exchange declare
- channel.queueDeclare(endpoint.getQueue(), endpoint.isDurable(), false, endpoint.isAutoDelete(), null);
- channel.queueBind(endpoint.getQueue(), endpoint.getExchangeName(),
- endpoint.getRoutingKey() == null ? "" : endpoint.getRoutingKey());
- channel.basicConsume(endpoint.getQueue(), endpoint.isAutoAck(), new RabbitConsumer(this, channel));
+ // need to make sure the queueDeclare is same with the exchange declare
+ channel.queueDeclare(endpoint.getQueue(), endpoint.isDurable(), false,
+ endpoint.isAutoDelete(), null);
+ channel.queueBind(
+ endpoint.getQueue(),
+ endpoint.getExchangeName(),
+ endpoint.getRoutingKey() == null ? "" : endpoint
+ .getRoutingKey());
+
+ channel.basicConsume(endpoint.getQueue(), endpoint.isAutoAck(),
+ new RabbitConsumer(this, channel));
}
@Override
@@ -76,13 +79,14 @@ public class RabbitMQConsumer extends DefaultConsumer {
if (conn != null) {
try {
conn.close();
- } catch (Exception ignored) {
+ } catch (Exception ignored) {
// ignored
}
}
channel = null;
conn = null;
+
if (executor != null) {
if (getEndpoint() != null && getEndpoint().getCamelContext() != null) {
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor);
@@ -99,7 +103,8 @@ public class RabbitMQConsumer extends DefaultConsumer {
private final Channel channel;
/**
- * Constructs a new instance and records its association to the passed-in channel.
+ * Constructs a new instance and records its association to the
+ * passed-in channel.
*
* @param channel the channel to which this consumer is attached
*/
@@ -110,13 +115,12 @@ public class RabbitMQConsumer extends DefaultConsumer {
}
@Override
- public void handleDelivery(String consumerTag,
- Envelope envelope,
- AMQP.BasicProperties properties,
- byte[] body) throws IOException {
+ public void handleDelivery(String consumerTag, Envelope envelope,
+ AMQP.BasicProperties properties, byte[] body) throws IOException {
Exchange exchange = consumer.endpoint.createRabbitExchange(envelope, body);
- log.trace("Created exchange [exchange={}]", new Object[]{exchange});
+ mergeAmqpProperties(exchange, properties);
+ log.trace("Created exchange [exchange={}]", exchange);
try {
consumer.getProcessor().process(exchange);
@@ -131,6 +135,50 @@ public class RabbitMQConsumer extends DefaultConsumer {
getExceptionHandler().handleException("Error processing exchange", exchange, e);
}
}
+
+ /**
+ * Will take an {@link Exchange} and add header values back to the {@link Exchange#getIn()}
+ */
+ private void mergeAmqpProperties(Exchange exchange, AMQP.BasicProperties properties) {
+
+ if (properties.getType() != null) {
+ exchange.getIn().setHeader(RabbitMQConstants.TYPE, properties.getType());
+ }
+ if (properties.getAppId() != null) {
+ exchange.getIn().setHeader(RabbitMQConstants.APP_ID, properties.getAppId());
+ }
+ if (properties.getClusterId() != null) {
+ exchange.getIn().setHeader(RabbitMQConstants.CLUSTERID, properties.getClusterId());
+ }
+ if (properties.getContentEncoding() != null) {
+ exchange.getIn().setHeader(RabbitMQConstants.CONTENT_ENCODING, properties.getContentEncoding());
+ }
+ if (properties.getContentType() != null) {
+ exchange.getIn().setHeader(RabbitMQConstants.CONTENT_TYPE, properties.getContentType());
+ }
+ if (properties.getCorrelationId() != null) {
+ exchange.getIn().setHeader(RabbitMQConstants.CORRELATIONID, properties.getCorrelationId());
+ }
+ if (properties.getExpiration() != null) {
+ exchange.getIn().setHeader(RabbitMQConstants.EXPIRATION, properties.getExpiration());
+ }
+ if (properties.getMessageId() != null) {
+ exchange.getIn().setHeader(RabbitMQConstants.MESSAGE_ID, properties.getMessageId());
+ }
+ if (properties.getPriority() != null) {
+ exchange.getIn().setHeader(RabbitMQConstants.PRIORITY, properties.getPriority());
+ }
+ if (properties.getReplyTo() != null) {
+ exchange.getIn().setHeader(RabbitMQConstants.REPLY_TO, properties.getReplyTo());
+ }
+ if (properties.getTimestamp() != null) {
+ exchange.getIn().setHeader(RabbitMQConstants.TIMESTAMP, properties.getTimestamp());
+ }
+ if (properties.getUserId() != null) {
+ exchange.getIn().setHeader(RabbitMQConstants.USERID, properties.getUserId());
+ }
+ }
+
}
-}
+}