You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/10/28 12:51:54 UTC

[2/2] git commit: CAMEL-6821: Added support for custom headers being transferred in camel-rabbitmq. Thanks to David Keen for the patch.

CAMEL-6821: Added support for custom headers being transferred in camel-rabbitmq. Thanks to David Keen for the patch.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/609998bb
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/609998bb
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/609998bb

Branch: refs/heads/camel-2.12.x
Commit: 609998bb3f45699f72b7537b2357df89ae441a3e
Parents: 43c0d79
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Oct 28 12:46:12 2013 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Oct 28 12:46:25 2013 +0100

----------------------------------------------------------------------
 .../component/rabbitmq/RabbitMQConsumer.java    |  2 +-
 .../component/rabbitmq/RabbitMQEndpoint.java    | 19 +++++++-
 .../component/rabbitmq/RabbitMQProducer.java    | 45 +++++++++++++++++
 .../rabbitmq/RabbitMQEndpointTest.java          | 51 +++++++++++++++++++-
 .../rabbitmq/RabbitMQProducerTest.java          | 33 +++++++++++++
 5 files changed, 146 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/609998bb/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index 4f13045..f2fa128 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -118,7 +118,7 @@ public class RabbitMQConsumer extends DefaultConsumer {
         public void handleDelivery(String consumerTag, Envelope envelope,
                                    AMQP.BasicProperties properties, byte[] body) throws IOException {
 
-            Exchange exchange = consumer.endpoint.createRabbitExchange(envelope, body);
+            Exchange exchange = consumer.endpoint.createRabbitExchange(envelope, properties, body);
             mergeAmqpProperties(exchange, properties);
             log.trace("Created exchange [exchange={}]", exchange);
 

http://git-wip-us.apache.org/repos/asf/camel/blob/609998bb/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index ffb8515..4423721 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -18,13 +18,17 @@ package org.apache.camel.component.rabbitmq;
 
 import java.io.IOException;
 import java.net.URISyntaxException;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 import com.rabbitmq.client.Envelope;
+import com.rabbitmq.client.LongString;
+
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -56,7 +60,7 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
         super(endpointUri, component);
     }
 
-    public Exchange createRabbitExchange(Envelope envelope, byte[] body) {
+    public Exchange createRabbitExchange(Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
         Exchange exchange = new DefaultExchange(getCamelContext(), getExchangePattern());
 
         Message message = new DefaultMessage();
@@ -65,6 +69,19 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
         message.setHeader(RabbitMQConstants.ROUTING_KEY, envelope.getRoutingKey());
         message.setHeader(RabbitMQConstants.EXCHANGE_NAME, envelope.getExchange());
         message.setHeader(RabbitMQConstants.DELIVERY_TAG, envelope.getDeliveryTag());
+
+        Map<String, Object> headers = properties.getHeaders();
+        if (headers != null) {
+            for (Map.Entry<String, Object> entry : headers.entrySet()) {
+                // Convert LongStrings to String.
+                if (entry.getValue() instanceof LongString) {
+                    message.setHeader(entry.getKey(), entry.getValue().toString());
+                } else {
+                    message.setHeader(entry.getKey(), entry.getValue());
+                }
+            }
+        }
+
         message.setBody(body);
 
         return exchange;

http://git-wip-us.apache.org/repos/asf/camel/blob/609998bb/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
index 1336de9..3bebb3f 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
@@ -17,7 +17,10 @@
 package org.apache.camel.component.rabbitmq;
 
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.Executors;
 
 import com.rabbitmq.client.AMQP;
@@ -132,6 +135,48 @@ public class RabbitMQProducer extends DefaultProducer {
             properties.timestamp(new Date(Long.parseLong(timestamp.toString())));
         }
 
+        final Map<String, Object> headers = exchange.getIn().getHeaders();
+        Map<String, Object> filteredHeaders = new HashMap<String, Object>();
+
+        // TODO: Add support for a HeaderFilterStrategy. See: org.apache.camel.component.jms.JmsBinding#shouldOutputHeader
+        for (Map.Entry<String, Object> header : headers.entrySet()) {
+
+            // filter header values.
+            Object value = getValidRabbitMQHeaderValue(header.getValue());
+            if (value != null) {
+                filteredHeaders.put(header.getKey(), header.getValue());
+            } else if (log.isDebugEnabled()) {
+                log.debug("Ignoring header: {} of class: {} with value: {}",
+                    new Object[]{header.getKey(), header.getValue().getClass().getName(), header.getValue()});
+            }
+        }
+
+        properties.headers(filteredHeaders);
+
         return properties;
     }
+
+    /**
+     * Checks whether a header value has a type this producer forwards to RabbitMQ.
+     *
+     * @param headerValue  the candidate header value
+     * @return  <tt>headerValue</tt> itself when accepted, <tt>null</tt> to ignore this header
+     * @see com.rabbitmq.client.impl.Frame#fieldValueSize
+     */
+    private Object getValidRabbitMQHeaderValue(Object headerValue) {
+        // Text and numeric values; BigDecimal is a Number but is kept explicit.
+        if (headerValue instanceof String || headerValue instanceof BigDecimal || headerValue instanceof Number) {
+            return headerValue;
+        }
+        // Flags and timestamps.
+        if (headerValue instanceof Boolean || headerValue instanceof Date) {
+            return headerValue;
+        }
+        // Raw binary payloads.
+        if (headerValue instanceof byte[]) {
+            return headerValue;
+        }
+        // Anything else (including null) is rejected.
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/609998bb/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
index df29279..2f30177 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
@@ -16,10 +16,16 @@
  */
 package org.apache.camel.component.rabbitmq;
 
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ThreadPoolExecutor;
 
+import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Envelope;
+import com.rabbitmq.client.impl.LongStringHelper;
 import org.apache.camel.Exchange;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.Test;
@@ -28,9 +34,10 @@ import org.mockito.Mockito;
 public class RabbitMQEndpointTest extends CamelTestSupport {
 
     private Envelope envelope = Mockito.mock(Envelope.class);
+    private AMQP.BasicProperties properties = Mockito.mock(AMQP.BasicProperties.class);
 
     @Test
-    public void testCreatingRabbitExchangeSetsHeaders() throws Exception {
+    public void testCreatingRabbitExchangeSetsStandardHeaders() throws Exception {
         RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange", RabbitMQEndpoint.class);
 
         String routingKey = UUID.randomUUID().toString();
@@ -40,9 +47,10 @@ public class RabbitMQEndpointTest extends CamelTestSupport {
         Mockito.when(envelope.getRoutingKey()).thenReturn(routingKey);
         Mockito.when(envelope.getExchange()).thenReturn(exchangeName);
         Mockito.when(envelope.getDeliveryTag()).thenReturn(tag);
+        Mockito.when(properties.getHeaders()).thenReturn(null);
 
         byte[] body = new byte[20];
-        Exchange exchange = endpoint.createRabbitExchange(envelope, body);
+        Exchange exchange = endpoint.createRabbitExchange(envelope, properties, body);
         assertEquals(exchangeName, exchange.getIn().getHeader(RabbitMQConstants.EXCHANGE_NAME));
         assertEquals(routingKey, exchange.getIn().getHeader(RabbitMQConstants.ROUTING_KEY));
         assertEquals(tag, exchange.getIn().getHeader(RabbitMQConstants.DELIVERY_TAG));
@@ -50,6 +58,45 @@ public class RabbitMQEndpointTest extends CamelTestSupport {
     }
 
     @Test
+    public void testCreatingRabbitExchangeSetsCustomHeaders() throws Exception {
+        RabbitMQEndpoint rabbitEndpoint = context.getEndpoint("rabbitmq:localhost/exchange", RabbitMQEndpoint.class);
+
+        String expectedExchange = UUID.randomUUID().toString();
+        String expectedRoutingKey = UUID.randomUUID().toString();
+        long expectedTag = UUID.randomUUID().toString().hashCode();
+
+        Mockito.when(envelope.getDeliveryTag()).thenReturn(expectedTag);
+        Mockito.when(envelope.getExchange()).thenReturn(expectedExchange);
+        Mockito.when(envelope.getRoutingKey()).thenReturn(expectedRoutingKey);
+
+        Map<String, Object> amqpHeaders = new HashMap<String, Object>();
+        amqpHeaders.put("longStringHeader", LongStringHelper.asLongString("Some really long string"));
+        amqpHeaders.put("byteArrayHeader", "foo".getBytes());
+        amqpHeaders.put("dateHeader", new Date(0));
+        amqpHeaders.put("booleanHeader", true);
+        amqpHeaders.put("doubleHeader", 42.24);
+        amqpHeaders.put("integerHeader", 42);
+        amqpHeaders.put("bigDecimalHeader", new BigDecimal("12.34"));
+        amqpHeaders.put("stringHeader", "A string");
+        Mockito.when(properties.getHeaders()).thenReturn(amqpHeaders);
+
+        byte[] payload = new byte[20];
+        Exchange created = rabbitEndpoint.createRabbitExchange(envelope, properties, payload);
+        assertEquals(expectedExchange, created.getIn().getHeader(RabbitMQConstants.EXCHANGE_NAME));
+        assertEquals(expectedRoutingKey, created.getIn().getHeader(RabbitMQConstants.ROUTING_KEY));
+        assertEquals(expectedTag, created.getIn().getHeader(RabbitMQConstants.DELIVERY_TAG));
+        assertEquals("A string", created.getIn().getHeader("stringHeader"));
+        assertEquals(new BigDecimal("12.34"), created.getIn().getHeader("bigDecimalHeader"));
+        assertEquals(42, created.getIn().getHeader("integerHeader"));
+        assertEquals(42.24, created.getIn().getHeader("doubleHeader"));
+        assertEquals(true, created.getIn().getHeader("booleanHeader"));
+        assertEquals(new Date(0), created.getIn().getHeader("dateHeader"));
+        assertArrayEquals("foo".getBytes(), (byte[]) created.getIn().getHeader("byteArrayHeader"));
+        assertEquals("Some really long string", created.getIn().getHeader("longStringHeader"));
+        assertEquals(payload, created.getIn().getBody());
+    }
+
+    @Test
     public void creatingExecutorUsesThreadPoolSettings() throws Exception {
         RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange?threadPoolSize=20", RabbitMQEndpoint.class);
         assertEquals(20, endpoint.getThreadPoolSize());

http://git-wip-us.apache.org/repos/asf/camel/blob/609998bb/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java
index 188d540..7b2df60 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java
@@ -18,6 +18,10 @@
 package org.apache.camel.component.rabbitmq;
 
 import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
 import com.rabbitmq.client.AMQP;
@@ -31,7 +35,9 @@ import org.junit.Test;
 import org.mockito.Matchers;
 import org.mockito.Mockito;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 
 public class RabbitMQProducerTest {
 
@@ -150,4 +156,31 @@ public class RabbitMQProducerTest {
         AMQP.BasicProperties props = producer.buildProperties(exchange).build();
         assertEquals(12345123, props.getTimestamp().getTime());
     }
+
+    @Test
+    public void testPropertiesUsesCustomHeaders() throws IOException {
+        RabbitMQProducer producer = new RabbitMQProducer(endpoint);
+        Map<String, Object> outgoing = new HashMap<String, Object>();
+        outgoing.put("invalidHeader", new Something());
+        outgoing.put("byteArrayHeader", "foo".getBytes());
+        outgoing.put("dateHeader", new Date(0));
+        outgoing.put("booleanHeader", true);
+        outgoing.put("doubleHeader", 42.24);
+        outgoing.put("integerHeader", 42);
+        outgoing.put("bigDecimalHeader", new BigDecimal("12.34"));
+        outgoing.put("stringHeader", "A string");
+        message.setHeaders(outgoing);
+        Map<String, Object> sent = producer.buildProperties(exchange).build().getHeaders();
+        assertEquals("A string", sent.get("stringHeader"));
+        assertEquals(new BigDecimal("12.34"), sent.get("bigDecimalHeader"));
+        assertEquals(42, sent.get("integerHeader"));
+        assertEquals(42.24, sent.get("doubleHeader"));
+        assertEquals(true, sent.get("booleanHeader"));
+        assertEquals(new Date(0), sent.get("dateHeader"));
+        assertArrayEquals("foo".getBytes(), (byte[]) sent.get("byteArrayHeader"));
+        assertNull(sent.get("invalidHeader"));
+    }
+
+    private static class Something { // empty marker type; the test above passes it as "invalidHeader" and expects it to be dropped
+    }
 }