You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by dm...@apache.org on 2018/07/24 10:10:54 UTC

[camel] 01/02: CAMEL-12654: RabbitMQ Headers - Headers with null value are skipped

This is an automated email from the ASF dual-hosted git repository.

dmvolod pushed a commit to branch camel-2.22.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 99a6c25304c51f30ad083c52b280a514c78d5ba1
Author: Dmitry Volodin <dm...@gmail.com>
AuthorDate: Tue Jul 24 11:08:45 2018 +0300

    CAMEL-12654: RabbitMQ Headers - Headers with null value are skipped
---
 .../src/main/docs/rabbitmq-component.adoc          |  8 ++-
 .../component/rabbitmq/RabbitMQComponent.java      | 17 ++++-
 .../camel/component/rabbitmq/RabbitMQEndpoint.java | 13 ++++
 .../rabbitmq/RabbitMQMessageConverter.java         | 13 +++-
 .../rabbitmq/reply/ReplyManagerSupport.java        |  1 +
 .../component/rabbitmq/RabbitMQComponentTest.java  |  3 +
 .../rabbitmq/RabbitMQProducerIntTest.java          | 75 ++++++++++++++++++++++
 .../springboot/RabbitMQComponentConfiguration.java | 21 ++++++
 8 files changed, 146 insertions(+), 5 deletions(-)

diff --git a/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc b/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc
index 7f4402b..44d383b 100644
--- a/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc
+++ b/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc
@@ -47,14 +47,14 @@ exchange name determines which exchange the queue will bind to.
 === Options
 
 // component options: START
-The RabbitMQ component supports 48 options, which are listed below.
+The RabbitMQ component supports 49 options, which are listed below.
 
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |===
 | Name | Description | Default | Type
-| *hostname* (common) | The hostname of the running rabbitmq instance or cluster. |  | String
+| *hostname* (common) | The hostname of the running RabbitMQ instance or cluster. |  | String
 | *portNumber* (common) | Port number for the host with the running rabbitmq instance or cluster. | 5672 | int
 | *username* (security) | Username in case of authenticated access | guest | String
 | *password* (security) | Password for authenticated access | guest | String
@@ -101,6 +101,7 @@ The RabbitMQ component supports 48 options, which are listed below.
 | *deadLetterQueue* (common) | The name of the dead letter queue |  | String
 | *deadLetterRoutingKey* (common) | The routing key for the dead letter exchange |  | String
 | *deadLetterExchangeType* (common) | The type of the dead letter exchange | direct | String
+| *allowNullHeaders* (producer) | Allow passing null values in headers | false | boolean
 | *resolveProperty Placeholders* (advanced) | Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders. | true | boolean
 |===
 // component options: END
@@ -125,7 +126,7 @@ with the following path and query parameters:
 |===
 
 
-==== Query Parameters (60 parameters):
+==== Query Parameters (61 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -161,6 +162,7 @@ with the following path and query parameters:
 | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. |  | ExchangePattern
 | *threadPoolSize* (consumer) | The consumer uses a Thread Pool Executor with a fixed number of threads. This setting allows you to set that number of threads. | 10 | int
+| *allowNullHeaders* (producer) | Allow passing null values in headers | false | boolean
 | *bridgeEndpoint* (producer) | If the bridgeEndpoint is true, the producer will ignore the message header of rabbitmq.EXCHANGE_NAME and rabbitmq.ROUTING_KEY | false | boolean
 | *channelPoolMaxSize* (producer) | Get maximum number of opened channel in pool | 10 | int
 | *channelPoolMaxWait* (producer) | Set the maximum number of milliseconds to wait for a channel from the pool | 1000 | long
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
index c258a5c..867ce21 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
@@ -79,6 +79,8 @@ public class RabbitMQComponent extends UriEndpointComponent {
     private String deadLetterQueue;
     @Metadata(label = "common", defaultValue = "direct", enums = "direct,fanout,headers,topic")
     private String deadLetterExchangeType = "direct";
+    @Metadata(label = "producer")
+    private boolean allowNullHeaders;
     @Metadata(label = "security")
     private String sslProtocol;
     @Metadata(label = "security")
@@ -238,6 +240,7 @@ public class RabbitMQComponent extends UriEndpointComponent {
         endpoint.setDeadLetterExchangeType(getDeadLetterExchangeType());
         endpoint.setDeadLetterQueue(getDeadLetterQueue());
         endpoint.setDeadLetterRoutingKey(getDeadLetterRoutingKey());
+        endpoint.setAllowNullHeaders(isAllowNullHeaders());
         setProperties(endpoint, params);
 
         if (LOG.isDebugEnabled()) {
@@ -259,6 +262,8 @@ public class RabbitMQComponent extends UriEndpointComponent {
         endpoint.getExchangeArgs().putAll(IntrospectionSupport.extractProperties(argsCopy, EXCHANGE_ARG_PREFIX));
         endpoint.getQueueArgs().putAll(IntrospectionSupport.extractProperties(argsCopy, QUEUE_ARG_PREFIX));
         endpoint.getBindingArgs().putAll(IntrospectionSupport.extractProperties(argsCopy, BINDING_ARG_PREFIX));
+        // Change null headers processing for message converter
+        endpoint.getMessageConverter().setAllowNullHeaders(endpoint.isAllowNullHeaders());
 
         return endpoint;
     }
@@ -268,7 +273,7 @@ public class RabbitMQComponent extends UriEndpointComponent {
     }
 
     /**
-     * The hostname of the running rabbitmq instance or cluster.
+     * The hostname of the running RabbitMQ instance or cluster.
      */
     public void setHostname(String hostname) {
         this.hostname = hostname;
@@ -844,4 +849,14 @@ public class RabbitMQComponent extends UriEndpointComponent {
         this.deadLetterExchangeType = deadLetterExchangeType;
     }
 
+    /**
+     * Allow passing null values in headers
+     */
+    public boolean isAllowNullHeaders() {
+        return allowNullHeaders;
+    }
+
+    public void setAllowNullHeaders(boolean allowNullHeaders) {
+        this.allowNullHeaders = allowNullHeaders;
+    }
 }
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index 39de2e3..06a6713 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -173,6 +173,8 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint {
     private long publisherAcknowledgementsTimeout;
     @UriParam(label = "producer")
     private boolean guaranteedDeliveries;
+    @UriParam(label = "producer")
+    private boolean allowNullHeaders;
     // camel-jms supports this setting but it is not currently configurable in camel-rabbitmq
     private boolean useMessageIDAsCorrelationID = true;
     // camel-jms supports this setting but it is not currently configurable in camel-rabbitmq
@@ -985,6 +987,17 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint {
         this.exclusive = exclusive;
     }
 
+    /**
+     * Allow passing null values in headers
+     */
+    public boolean isAllowNullHeaders() {
+        return allowNullHeaders;
+    }
+
+    public void setAllowNullHeaders(boolean allowNullHeaders) {
+        this.allowNullHeaders = allowNullHeaders;
+    }
+
     public boolean isPassive() {
         return passive;
     }
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
index 6cb535e..07902ca 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
@@ -37,6 +37,8 @@ import org.slf4j.LoggerFactory;
 public class RabbitMQMessageConverter {
     protected static final Logger LOG = LoggerFactory.getLogger(RabbitMQMessageConverter.class);
 
+    private boolean allowNullHeaders;
+    
     /**
      * Will take an {@link Exchange} and add header values back to the {@link Exchange#getIn()}
      */
@@ -165,7 +167,8 @@ public class RabbitMQMessageConverter {
         for (Map.Entry<String, Object> header : headers.entrySet()) {
             // filter header values.
             Object value = getValidRabbitMQHeaderValue(header.getValue());
-            if (value != null) {
+            
+            if (value != null || isAllowNullHeaders()) {
                 filteredHeaders.put(header.getKey(), header.getValue());
             } else if (LOG.isDebugEnabled()) {
                 if (header.getValue() == null) {
@@ -305,4 +308,12 @@ public class RabbitMQMessageConverter {
     private Object isSerializeHeaderEnabled(final AMQP.BasicProperties properties) {
         return properties.getHeaders().get(RabbitMQEndpoint.SERIALIZE_HEADER);
     }
+
+    public boolean isAllowNullHeaders() {
+        return allowNullHeaders;
+    }
+
+    public void setAllowNullHeaders(boolean allowNullHeaders) {
+        this.allowNullHeaders = allowNullHeaders;
+    }
 }
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
index 15b990a..1ddf056 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
@@ -224,6 +224,7 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
         ObjectHelper.notNull(executorService, "executorService", this);
         ObjectHelper.notNull(endpoint, "endpoint", this);
 
+        messageConverter.setAllowNullHeaders(endpoint.isAllowNullHeaders());
         // timeout map to use for purging messages which have timed out, while waiting for an expected reply
         // when doing request/reply over JMS
         log.debug("Using timeout checker interval with {} millis", endpoint.getRequestTimeoutCheckerInterval());
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java
index d734917..1c9ab38 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java
@@ -44,6 +44,7 @@ public class RabbitMQComponentTest {
         assertEquals(true, endpoint.isAutoAck());
         assertEquals(true, endpoint.isAutoDelete());
         assertEquals(true, endpoint.isDurable());
+        assertEquals(false, endpoint.isAllowNullHeaders());
         assertEquals("direct", endpoint.getExchangeType());
         assertEquals(ConnectionFactory.DEFAULT_CONNECTION_TIMEOUT, endpoint.getConnectionTimeout());
         assertEquals(ConnectionFactory.DEFAULT_CHANNEL_MAX, endpoint.getRequestedChannelMax());
@@ -68,6 +69,7 @@ public class RabbitMQComponentTest {
         params.put("requestedChannelMax", 456);
         params.put("requestedFrameMax", 789);
         params.put("requestedHeartbeat", 321);
+        params.put("allowNullHeaders", true);
 
         RabbitMQEndpoint endpoint = createEndpoint(params);
 
@@ -86,6 +88,7 @@ public class RabbitMQComponentTest {
         assertEquals(456, endpoint.getRequestedChannelMax());
         assertEquals(789, endpoint.getRequestedFrameMax());
         assertEquals(321, endpoint.getRequestedHeartbeat());
+        assertEquals(true, endpoint.isAllowNullHeaders());
     }
 
     private RabbitMQEndpoint createEndpoint(Map<String, Object> params) throws Exception {
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java
index 77bb1c1..d91bcc4 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java
@@ -18,7 +18,9 @@ package org.apache.camel.component.rabbitmq;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeoutException;
 
 import com.rabbitmq.client.AMQP;
@@ -30,15 +32,22 @@ import org.apache.camel.Produce;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.util.ObjectHelper;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class RabbitMQProducerIntTest extends AbstractRabbitMQIntTest {
+    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQProducerIntTest.class);
+    
     private static final String EXCHANGE = "ex1";
     private static final String ROUTE = "route1";
+    private static final String CUSTOM_HEADER = "CustomHeader";
     private static final String BASIC_URI_FORMAT = "rabbitmq:localhost:5672/%s?routingKey=%s&username=cameltest&password=cameltest&skipQueueDeclare=true";
     private static final String BASIC_URI = String.format(BASIC_URI_FORMAT, EXCHANGE, ROUTE);
+    private static final String ALLOW_NULL_HEADERS = BASIC_URI + "&allowNullHeaders=true";
     private static final String PUBLISHER_ACKNOWLEDGES_URI = BASIC_URI + "&mandatory=true&publisherAcknowledgements=true";
     private static final String PUBLISHER_ACKNOWLEDGES_BAD_ROUTE_URI = String.format(BASIC_URI_FORMAT, EXCHANGE, "route2") + "&publisherAcknowledgements=true";
     private static final String GUARANTEED_DELIVERY_URI = BASIC_URI + "&mandatory=true&guaranteedDeliveries=true";
@@ -47,6 +56,9 @@ public class RabbitMQProducerIntTest extends AbstractRabbitMQIntTest {
 
     @Produce(uri = "direct:start")
     protected ProducerTemplate template;
+    
+    @Produce(uri = "direct:start-allow-null-headers")
+    protected ProducerTemplate templateAllowNullHeaders;
 
     @Produce(uri = "direct:start-with-confirms")
     protected ProducerTemplate templateWithConfirms;
@@ -73,6 +85,7 @@ public class RabbitMQProducerIntTest extends AbstractRabbitMQIntTest {
             @Override
             public void configure() throws Exception {
                 from("direct:start").to(BASIC_URI);
+                from("direct:start-allow-null-headers").to(ALLOW_NULL_HEADERS);
                 from("direct:start-with-confirms").to(PUBLISHER_ACKNOWLEDGES_URI);
                 from("direct:start-with-confirms-bad-route").to(PUBLISHER_ACKNOWLEDGES_BAD_ROUTE_URI);
                 from("direct:start-with-guaranteed-delivery").to(GUARANTEED_DELIVERY_URI);
@@ -105,6 +118,38 @@ public class RabbitMQProducerIntTest extends AbstractRabbitMQIntTest {
 
         assertThatBodiesReceivedIn(received, "new message");
     }
+    
+    @Test
+    public void producedMessageWithNotNullHeaders() throws InterruptedException, IOException, TimeoutException {
+        final List<String> received = new ArrayList<>();
+        final Map<String, Object> receivedHeaders = new HashMap<String, Object>();
+        Map<String, Object> headers = new HashMap<String, Object>();
+        
+        headers.put(RabbitMQConstants.EXCHANGE_NAME, EXCHANGE);
+        headers.put(CUSTOM_HEADER, CUSTOM_HEADER.toLowerCase());
+        
+        channel.basicConsume("sammyq", true, new ArrayPopulatingConsumer(received, receivedHeaders));
+
+        template.sendBodyAndHeaders("new message", headers);
+
+        assertThatBodiesAndHeadersReceivedIn(receivedHeaders, headers, received, "new message");
+    }
+    
+    @Test
+    public void producedMessageAllowNullHeaders() throws InterruptedException, IOException, TimeoutException {
+        final List<String> received = new ArrayList<>();
+        final Map<String, Object> receivedHeaders = new HashMap<String, Object>();
+        Map<String, Object> headers = new HashMap<String, Object>();
+        
+        headers.put(RabbitMQConstants.EXCHANGE_NAME, null);
+        headers.put(CUSTOM_HEADER, null);
+        
+        channel.basicConsume("sammyq", true, new ArrayPopulatingConsumer(received, receivedHeaders));
+
+        templateAllowNullHeaders.sendBodyAndHeaders("new message", headers);
+
+        assertThatBodiesAndHeadersReceivedIn(receivedHeaders, headers, received, "new message");
+    }
 
     private void assertThatBodiesReceivedIn(final List<String> received, final String... expected) throws InterruptedException {
         Thread.sleep(500);
@@ -114,6 +159,25 @@ public class RabbitMQProducerIntTest extends AbstractRabbitMQIntTest {
             assertEquals(body, received.get(0));
         }
     }
+    
+    private void assertThatBodiesAndHeadersReceivedIn(Map<String, Object> receivedHeaders, Map<String, Object> expectedHeaders,
+                                                      final List<String> received, final String... expected) throws InterruptedException {
+        Thread.sleep(500);
+
+        assertListSize(received, expected.length);
+        for (String body : expected) {
+            assertEquals(body, received.get(0));
+        }
+        
+        for (Map.Entry<String, Object> headers : expectedHeaders.entrySet()) {
+            Object receivedValue = receivedHeaders.get(headers.getKey());
+            Object expectedValue = headers.getValue();
+            
+            assertTrue("Header key " + headers.getKey() + " not found", receivedHeaders.containsKey(headers.getKey()));
+            assertEquals(0, ObjectHelper.compare(receivedValue == null ? "" : receivedValue.toString(), expectedValue == null ? "" : expectedValue.toString()));
+        }
+        
+    }
 
     @Test
     public void producedMessageIsReceivedWhenPublisherAcknowledgementsAreEnabled() throws InterruptedException, IOException, TimeoutException {
@@ -162,10 +226,18 @@ public class RabbitMQProducerIntTest extends AbstractRabbitMQIntTest {
 
     private class ArrayPopulatingConsumer extends DefaultConsumer {
         private final List<String> received;
+        private final Map<String, Object> receivedHeaders;
 
         ArrayPopulatingConsumer(final List<String> received) {
             super(RabbitMQProducerIntTest.this.channel);
             this.received = received;
+            receivedHeaders = new HashMap<String, Object>();
+        }
+        
+        ArrayPopulatingConsumer(final List<String> received, Map<String, Object> receivedHeaders) {
+            super(RabbitMQProducerIntTest.this.channel);
+            this.received = received;
+            this.receivedHeaders = receivedHeaders;
         }
 
         @Override
@@ -173,6 +245,9 @@ public class RabbitMQProducerIntTest extends AbstractRabbitMQIntTest {
                                    Envelope envelope,
                                    AMQP.BasicProperties properties,
                                    byte[] body) throws IOException {
+            LOGGER.info("AMQP.BasicProperties: {}", properties);
+            
+            receivedHeaders.putAll(properties.getHeaders());
             received.add(new String(body));
         }
     }
diff --git a/platforms/spring-boot/components-starter/camel-rabbitmq-starter/src/main/java/org/apache/camel/component/rabbitmq/springboot/RabbitMQComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-rabbitmq-starter/src/main/java/org/apache/camel/component/rabbitmq/springboot/RabbitMQComponentConfiguration.java
index bedcf07..a20a915 100644
--- a/platforms/spring-boot/components-starter/camel-rabbitmq-starter/src/main/java/org/apache/camel/component/rabbitmq/springboot/RabbitMQComponentConfiguration.java
+++ b/platforms/spring-boot/components-starter/camel-rabbitmq-starter/src/main/java/org/apache/camel/component/rabbitmq/springboot/RabbitMQComponentConfiguration.java
@@ -37,7 +37,16 @@ public class RabbitMQComponentConfiguration
             ComponentConfigurationPropertiesCommon {
 
     /**
+<<<<<<< HEAD
      * The hostname of the running rabbitmq instance or cluster.
+=======
+     * Whether to enable auto configuration of the rabbitmq component. This is
+     * enabled by default.
+     */
+    private Boolean enabled;
+    /**
+     * The hostname of the running RabbitMQ instance or cluster.
+>>>>>>> fddedbc44de... CAMEL-12654: RabbitMQ Headers - Headers with null value are skipped
      */
     private String hostname;
     /**
@@ -274,6 +283,10 @@ public class RabbitMQComponentConfiguration
      */
     private String deadLetterExchangeType = "direct";
     /**
+     * Allow passing null values in headers
+     */
+    private Boolean allowNullHeaders = false;
+    /**
      * Whether the component should resolve property placeholders on itself when
      * starting. Only properties which are of String type can use property
      * placeholders.
@@ -659,6 +672,14 @@ public class RabbitMQComponentConfiguration
         this.deadLetterExchangeType = deadLetterExchangeType;
     }
 
+    public Boolean getAllowNullHeaders() {
+        return allowNullHeaders;
+    }
+
+    public void setAllowNullHeaders(Boolean allowNullHeaders) {
+        this.allowNullHeaders = allowNullHeaders;
+    }
+
     public Boolean getResolvePropertyPlaceholders() {
         return resolvePropertyPlaceholders;
     }