You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/03/10 11:24:49 UTC

[camel] 01/03: CAMEL-14685 - Camel-RabbitMQ: Introduce an HeaderFilterStrategy to RabbitMQMessageConverter

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit b7385bec6947126c93ede51eb0a834b2c7cefa1c
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Tue Mar 10 11:25:27 2020 +0100

    CAMEL-14685 - Camel-RabbitMQ: Introduce an HeaderFilterStrategy to RabbitMQMessageConverter
---
 .../component/rabbitmq/RabbitMQComponent.java      |  2 +-
 .../camel/component/rabbitmq/RabbitMQEndpoint.java | 15 +++++++
 .../rabbitmq/RabbitMQHeaderFilterStrategy.java     | 33 ++++++++++++++
 .../rabbitmq/RabbitMQMessageConverter.java         | 26 +++++++++--
 .../integration/RabbitMQProducerIntTest.java       | 50 +++++++++++++++++++++-
 5 files changed, 119 insertions(+), 7 deletions(-)

diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
index 78c10ce..987abc1 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
@@ -267,7 +267,7 @@ public class RabbitMQComponent extends DefaultComponent {
 
         // Change null headers processing for message converter
         endpoint.getMessageConverter().setAllowNullHeaders(endpoint.isAllowNullHeaders());
-
+        endpoint.getMessageConverter().setAllowCustomHeaders(endpoint.isAllowCustomHeaders());
         return endpoint;
     }
 
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index fbf35da..6c99eca 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -166,6 +166,8 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint {
     private boolean guaranteedDeliveries;
     @UriParam(label = "producer")
     private boolean allowNullHeaders;
+    @UriParam(label = "producer")
+    private boolean allowCustomHeaders = true;
     @UriParam(label = "consumer")
     private String consumerTag = "";
     // camel-jms supports this setting but it is not currently configurable in
@@ -962,4 +964,17 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint {
     public void setConsumerTag(String consumerTag) {
         this.consumerTag = consumerTag;
     }
+
+	public boolean isAllowCustomHeaders() {
+		return allowCustomHeaders;
+	}
+
+    /**
+     * Whether to allow custom headers (names not starting with "rabbitmq") to be passed on with the message
+     */
+	public void setAllowCustomHeaders(boolean allowCustomHeaders) {
+		this.allowCustomHeaders = allowCustomHeaders;
+	}
+    
+
 }
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQHeaderFilterStrategy.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQHeaderFilterStrategy.java
new file mode 100644
index 0000000..c7893ab
--- /dev/null
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQHeaderFilterStrategy.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq;
+
+import org.apache.camel.support.DefaultHeaderFilterStrategy;
+
+public class RabbitMQHeaderFilterStrategy  extends DefaultHeaderFilterStrategy {
+	
+    public RabbitMQHeaderFilterStrategy() {
+        initialize();
+    }
+
+    protected void initialize() {
+        // match header names beginning with "rabbitmq", for both outgoing and incoming headers
+        setOutFilterPattern("rabbitmq.*");
+        setInFilterPattern("rabbitmq.*");
+    }
+
+}
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
index a5ceb41..78fa302 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
@@ -30,6 +30,7 @@ import com.rabbitmq.client.Envelope;
 import com.rabbitmq.client.LongString;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
+import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.support.DefaultMessage;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
@@ -39,6 +40,8 @@ public class RabbitMQMessageConverter {
     protected static final Logger LOG = LoggerFactory.getLogger(RabbitMQMessageConverter.class);
 
     private boolean allowNullHeaders;
+    private boolean allowCustomHeaders;
+    private final HeaderFilterStrategy headerFilterStrategy = new RabbitMQHeaderFilterStrategy();
     
     /**
      * Will take an {@link Exchange} and add header values back to the {@link Exchange#getIn()}
@@ -164,14 +167,21 @@ public class RabbitMQMessageConverter {
         final Map<String, Object> headers = msg.getHeaders();
         Map<String, Object> filteredHeaders = new HashMap<>();
 
-        // TODO: Add support for a HeaderFilterStrategy. See: org.apache.camel.component.jms.JmsBinding#shouldOutputHeader
         for (Map.Entry<String, Object> header : headers.entrySet()) {
             // filter header values.
-            Object value = getValidRabbitMQHeaderValue(header.getKey(), header.getValue());
-
+        	Object value = getValidRabbitMQHeaderValue(header.getKey(), header.getValue());
+            
             // additionaly filter out the OVERRIDE header so it does not propagate
             if ((value != null || isAllowNullHeaders()) && !header.getKey().equals(RabbitMQConstants.EXCHANGE_OVERRIDE_NAME)) {
-                filteredHeaders.put(header.getKey(), header.getValue());
+            	boolean filteredHeader;
+            	if (!allowCustomHeaders) {
+            		filteredHeader = headerFilterStrategy.applyFilterToCamelHeaders(header.getKey(), header.getValue(), exchange);
+            		if (filteredHeader) {
+                    	filteredHeaders.put(header.getKey(), header.getValue());
+            		}
+            	} else {
+            	    filteredHeaders.put(header.getKey(), header.getValue());
+            	}
             } else if (LOG.isDebugEnabled()) {
                 if (header.getValue() == null) {
                     LOG.debug("Ignoring header: {} with null value", header.getKey());
@@ -329,4 +339,12 @@ public class RabbitMQMessageConverter {
     public void setAllowNullHeaders(boolean allowNullHeaders) {
         this.allowNullHeaders = allowNullHeaders;
     }
+
+	public boolean isAllowCustomHeaders() {
+		return allowCustomHeaders;
+	}
+
+	public void setAllowCustomHeaders(boolean allowCustomHeaders) {
+		this.allowCustomHeaders = allowCustomHeaders;
+	}
 }
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQProducerIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQProducerIntTest.java
index 83409aa..cca5f4b 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQProducerIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQProducerIntTest.java
@@ -48,7 +48,8 @@ public class RabbitMQProducerIntTest extends AbstractRabbitMQIntTest {
     private static final String CUSTOM_HEADER = "CustomHeader";
     private static final String BASIC_URI_FORMAT = "rabbitmq:localhost:5672/%s?routingKey=%s&username=cameltest&password=cameltest&skipQueueDeclare=true";
     private static final String BASIC_URI = String.format(BASIC_URI_FORMAT, EXCHANGE, ROUTE);
-    private static final String ALLOW_NULL_HEADERS = BASIC_URI + "&allowNullHeaders=true";
+    private static final String ALLOW_NULL_HEADERS = BASIC_URI + "&allowNullHeaders=true&allowCustomHeaders=false";
+    private static final String ALLOW_CUSTOM_HEADERS = BASIC_URI + "&allowCustomHeaders=true";
     private static final String PUBLISHER_ACKNOWLEDGES_URI = BASIC_URI + "&mandatory=true&publisherAcknowledgements=true";
     private static final String PUBLISHER_ACKNOWLEDGES_BAD_ROUTE_URI = String.format(BASIC_URI_FORMAT, EXCHANGE, "route2") + "&publisherAcknowledgements=true";
     private static final String GUARANTEED_DELIVERY_URI = BASIC_URI + "&mandatory=true&guaranteedDeliveries=true";
@@ -60,6 +61,12 @@ public class RabbitMQProducerIntTest extends AbstractRabbitMQIntTest {
 
     @Produce("direct:start-allow-null-headers")
     protected ProducerTemplate templateAllowNullHeaders;
+    
+    @Produce("direct:start-not-allow-custom-headers")
+    protected ProducerTemplate templateNotAllowCustomHeaders;
+    
+    @Produce("direct:start-allow-custom-headers")
+    protected ProducerTemplate templateAllowCustomHeaders;
 
     @Produce("direct:start-with-confirms")
     protected ProducerTemplate templateWithConfirms;
@@ -87,6 +94,8 @@ public class RabbitMQProducerIntTest extends AbstractRabbitMQIntTest {
             public void configure() throws Exception {
                 from("direct:start").to(BASIC_URI);
                 from("direct:start-allow-null-headers").to(ALLOW_NULL_HEADERS);
+                from("direct:start-not-allow-custom-headers").to(ALLOW_NULL_HEADERS);
+                from("direct:start-allow-custom-headers").to(ALLOW_CUSTOM_HEADERS);
                 from("direct:start-with-confirms").to(PUBLISHER_ACKNOWLEDGES_URI);
                 from("direct:start-with-confirms-bad-route").to(PUBLISHER_ACKNOWLEDGES_BAD_ROUTE_URI);
                 from("direct:start-with-guaranteed-delivery").to(GUARANTEED_DELIVERY_URI);
@@ -151,6 +160,44 @@ public class RabbitMQProducerIntTest extends AbstractRabbitMQIntTest {
 
         assertThatBodiesAndHeadersReceivedIn(receivedHeaders, headers, received, "new message");
     }
+    
+    @Test
+    public void producedMessageNotAllowCustomHeaders() throws InterruptedException, IOException, TimeoutException {
+        final List<String> received = new ArrayList<>();
+        final Map<String, Object> receivedHeaders = new HashMap<>();
+        Map<String, Object> headers = new HashMap<>();
+
+        headers.put(RabbitMQConstants.EXCHANGE_NAME, "testa");
+        headers.put(CUSTOM_HEADER, "exchange");
+
+        channel.basicConsume("sammyq", true, new ArrayPopulatingConsumer(received, receivedHeaders));
+
+        templateNotAllowCustomHeaders.sendBodyAndHeaders("new message", headers);
+
+        Thread.sleep(500);
+        assertEquals(received.get(0), "new message");
+        assertTrue(receivedHeaders.containsKey(RabbitMQConstants.EXCHANGE_NAME));
+        assertFalse(receivedHeaders.containsKey(CUSTOM_HEADER));
+    }
+    
+    @Test
+    public void producedMessageAllowCustomHeaders() throws InterruptedException, IOException, TimeoutException {
+        final List<String> received = new ArrayList<>();
+        final Map<String, Object> receivedHeaders = new HashMap<>();
+        Map<String, Object> headers = new HashMap<>();
+
+        headers.put(RabbitMQConstants.EXCHANGE_NAME, "testa");
+        headers.put(CUSTOM_HEADER, "exchange");
+
+        channel.basicConsume("sammyq", true, new ArrayPopulatingConsumer(received, receivedHeaders));
+
+        templateAllowCustomHeaders.sendBodyAndHeaders("new message", headers);
+
+        Thread.sleep(500);
+        assertEquals(received.get(0), "new message");
+        assertTrue(receivedHeaders.containsKey(RabbitMQConstants.EXCHANGE_NAME));
+        assertTrue(receivedHeaders.containsKey(CUSTOM_HEADER));
+    }
 
     private void assertThatBodiesReceivedIn(final List<String> received, final String... expected) throws InterruptedException {
         Thread.sleep(500);
@@ -173,7 +220,6 @@ public class RabbitMQProducerIntTest extends AbstractRabbitMQIntTest {
         for (Map.Entry<String, Object> headers : expectedHeaders.entrySet()) {
             Object receivedValue = receivedHeaders.get(headers.getKey());
             Object expectedValue = headers.getValue();
-
             assertTrue("Header key " + headers.getKey() + " not found", receivedHeaders.containsKey(headers.getKey()));
             assertEquals(0, ObjectHelper.compare(receivedValue == null ? "" : receivedValue.toString(), expectedValue == null ? "" : expectedValue.toString()));
         }