You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/03/10 11:24:49 UTC
[camel] 01/03: CAMEL-14685 - Camel-RabbitMQ: Introduce an
HeaderFilterStrategy to RabbitMQMessageConverter
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit b7385bec6947126c93ede51eb0a834b2c7cefa1c
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Tue Mar 10 11:25:27 2020 +0100
CAMEL-14685 - Camel-RabbitMQ: Introduce an HeaderFilterStrategy to RabbitMQMessageConverter
---
.../component/rabbitmq/RabbitMQComponent.java | 2 +-
.../camel/component/rabbitmq/RabbitMQEndpoint.java | 15 +++++++
.../rabbitmq/RabbitMQHeaderFilterStrategy.java | 33 ++++++++++++++
.../rabbitmq/RabbitMQMessageConverter.java | 26 +++++++++--
.../integration/RabbitMQProducerIntTest.java | 50 +++++++++++++++++++++-
5 files changed, 119 insertions(+), 7 deletions(-)
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
index 78c10ce..987abc1 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
@@ -267,7 +267,7 @@ public class RabbitMQComponent extends DefaultComponent {
// Change null headers processing for message converter
endpoint.getMessageConverter().setAllowNullHeaders(endpoint.isAllowNullHeaders());
-
+ endpoint.getMessageConverter().setAllowCustomHeaders(endpoint.isAllowCustomHeaders());
return endpoint;
}
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index fbf35da..6c99eca 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -166,6 +166,8 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint {
private boolean guaranteedDeliveries;
@UriParam(label = "producer")
private boolean allowNullHeaders;
+ @UriParam(label = "producer")
+ private boolean allowCustomHeaders = true;
@UriParam(label = "consumer")
private String consumerTag = "";
// camel-jms supports this setting but it is not currently configurable in
@@ -962,4 +964,17 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint {
public void setConsumerTag(String consumerTag) {
this.consumerTag = consumerTag;
}
+
+ public boolean isAllowCustomHeaders() {
+ return allowCustomHeaders;
+ }
+
+ /**
+ * Allow pass custom values to header
+ */
+ public void setAllowCustomHeaders(boolean allowCustomHeaders) {
+ this.allowCustomHeaders = allowCustomHeaders;
+ }
+
+
}
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQHeaderFilterStrategy.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQHeaderFilterStrategy.java
new file mode 100644
index 0000000..c7893ab
--- /dev/null
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQHeaderFilterStrategy.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq;
+
+import org.apache.camel.support.DefaultHeaderFilterStrategy;
+
+public class RabbitMQHeaderFilterStrategy extends DefaultHeaderFilterStrategy {
+
+ public RabbitMQHeaderFilterStrategy() {
+ initialize();
+ }
+
+ protected void initialize() {
+ // filter headers begin with "Camel" or "org.apache.camel"
+ setOutFilterPattern("rabbitmq.*");
+ setInFilterPattern("rabbitmq.*");
+ }
+
+}
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
index a5ceb41..78fa302 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
@@ -30,6 +30,7 @@ import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.LongString;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
+import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.support.DefaultMessage;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
@@ -39,6 +40,8 @@ public class RabbitMQMessageConverter {
protected static final Logger LOG = LoggerFactory.getLogger(RabbitMQMessageConverter.class);
private boolean allowNullHeaders;
+ private boolean allowCustomHeaders;
+ private final HeaderFilterStrategy headerFilterStrategy = new RabbitMQHeaderFilterStrategy();
/**
* Will take an {@link Exchange} and add header values back to the {@link Exchange#getIn()}
@@ -164,14 +167,21 @@ public class RabbitMQMessageConverter {
final Map<String, Object> headers = msg.getHeaders();
Map<String, Object> filteredHeaders = new HashMap<>();
- // TODO: Add support for a HeaderFilterStrategy. See: org.apache.camel.component.jms.JmsBinding#shouldOutputHeader
for (Map.Entry<String, Object> header : headers.entrySet()) {
// filter header values.
- Object value = getValidRabbitMQHeaderValue(header.getKey(), header.getValue());
-
+ Object value = getValidRabbitMQHeaderValue(header.getKey(), header.getValue());
+
// additionaly filter out the OVERRIDE header so it does not propagate
if ((value != null || isAllowNullHeaders()) && !header.getKey().equals(RabbitMQConstants.EXCHANGE_OVERRIDE_NAME)) {
- filteredHeaders.put(header.getKey(), header.getValue());
+ boolean filteredHeader;
+ if (!allowCustomHeaders) {
+ filteredHeader = headerFilterStrategy.applyFilterToCamelHeaders(header.getKey(), header.getValue(), exchange);
+ if (filteredHeader) {
+ filteredHeaders.put(header.getKey(), header.getValue());
+ }
+ } else {
+ filteredHeaders.put(header.getKey(), header.getValue());
+ }
} else if (LOG.isDebugEnabled()) {
if (header.getValue() == null) {
LOG.debug("Ignoring header: {} with null value", header.getKey());
@@ -329,4 +339,12 @@ public class RabbitMQMessageConverter {
public void setAllowNullHeaders(boolean allowNullHeaders) {
this.allowNullHeaders = allowNullHeaders;
}
+
+ public boolean isAllowCustomHeaders() {
+ return allowCustomHeaders;
+ }
+
+ public void setAllowCustomHeaders(boolean allowCustomHeaders) {
+ this.allowCustomHeaders = allowCustomHeaders;
+ }
}
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQProducerIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQProducerIntTest.java
index 83409aa..cca5f4b 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQProducerIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/integration/RabbitMQProducerIntTest.java
@@ -48,7 +48,8 @@ public class RabbitMQProducerIntTest extends AbstractRabbitMQIntTest {
private static final String CUSTOM_HEADER = "CustomHeader";
private static final String BASIC_URI_FORMAT = "rabbitmq:localhost:5672/%s?routingKey=%s&username=cameltest&password=cameltest&skipQueueDeclare=true";
private static final String BASIC_URI = String.format(BASIC_URI_FORMAT, EXCHANGE, ROUTE);
- private static final String ALLOW_NULL_HEADERS = BASIC_URI + "&allowNullHeaders=true";
+ private static final String ALLOW_NULL_HEADERS = BASIC_URI + "&allowNullHeaders=true&allowCustomHeaders=false";
+ private static final String ALLOW_CUSTOM_HEADERS = BASIC_URI + "&allowCustomHeaders=true";
private static final String PUBLISHER_ACKNOWLEDGES_URI = BASIC_URI + "&mandatory=true&publisherAcknowledgements=true";
private static final String PUBLISHER_ACKNOWLEDGES_BAD_ROUTE_URI = String.format(BASIC_URI_FORMAT, EXCHANGE, "route2") + "&publisherAcknowledgements=true";
private static final String GUARANTEED_DELIVERY_URI = BASIC_URI + "&mandatory=true&guaranteedDeliveries=true";
@@ -60,6 +61,12 @@ public class RabbitMQProducerIntTest extends AbstractRabbitMQIntTest {
@Produce("direct:start-allow-null-headers")
protected ProducerTemplate templateAllowNullHeaders;
+
+ @Produce("direct:start-not-allow-custom-headers")
+ protected ProducerTemplate templateNotAllowCustomHeaders;
+
+ @Produce("direct:start-allow-custom-headers")
+ protected ProducerTemplate templateAllowCustomHeaders;
@Produce("direct:start-with-confirms")
protected ProducerTemplate templateWithConfirms;
@@ -87,6 +94,8 @@ public class RabbitMQProducerIntTest extends AbstractRabbitMQIntTest {
public void configure() throws Exception {
from("direct:start").to(BASIC_URI);
from("direct:start-allow-null-headers").to(ALLOW_NULL_HEADERS);
+ from("direct:start-not-allow-custom-headers").to(ALLOW_NULL_HEADERS);
+ from("direct:start-allow-custom-headers").to(ALLOW_CUSTOM_HEADERS);
from("direct:start-with-confirms").to(PUBLISHER_ACKNOWLEDGES_URI);
from("direct:start-with-confirms-bad-route").to(PUBLISHER_ACKNOWLEDGES_BAD_ROUTE_URI);
from("direct:start-with-guaranteed-delivery").to(GUARANTEED_DELIVERY_URI);
@@ -151,6 +160,44 @@ public class RabbitMQProducerIntTest extends AbstractRabbitMQIntTest {
assertThatBodiesAndHeadersReceivedIn(receivedHeaders, headers, received, "new message");
}
+
+ @Test
+ public void producedMessageNotAllowCustomHeaders() throws InterruptedException, IOException, TimeoutException {
+ final List<String> received = new ArrayList<>();
+ final Map<String, Object> receivedHeaders = new HashMap<>();
+ Map<String, Object> headers = new HashMap<>();
+
+ headers.put(RabbitMQConstants.EXCHANGE_NAME, "testa");
+ headers.put(CUSTOM_HEADER, "exchange");
+
+ channel.basicConsume("sammyq", true, new ArrayPopulatingConsumer(received, receivedHeaders));
+
+ templateNotAllowCustomHeaders.sendBodyAndHeaders("new message", headers);
+
+ Thread.sleep(500);
+ assertEquals(received.get(0), "new message");
+ assertTrue(receivedHeaders.containsKey(RabbitMQConstants.EXCHANGE_NAME));
+ assertFalse(receivedHeaders.containsKey(CUSTOM_HEADER));
+ }
+
+ @Test
+ public void producedMessageAllowCustomHeaders() throws InterruptedException, IOException, TimeoutException {
+ final List<String> received = new ArrayList<>();
+ final Map<String, Object> receivedHeaders = new HashMap<>();
+ Map<String, Object> headers = new HashMap<>();
+
+ headers.put(RabbitMQConstants.EXCHANGE_NAME, "testa");
+ headers.put(CUSTOM_HEADER, "exchange");
+
+ channel.basicConsume("sammyq", true, new ArrayPopulatingConsumer(received, receivedHeaders));
+
+ templateAllowCustomHeaders.sendBodyAndHeaders("new message", headers);
+
+ Thread.sleep(500);
+ assertEquals(received.get(0), "new message");
+ assertTrue(receivedHeaders.containsKey(RabbitMQConstants.EXCHANGE_NAME));
+ assertTrue(receivedHeaders.containsKey(CUSTOM_HEADER));
+ }
private void assertThatBodiesReceivedIn(final List<String> received, final String... expected) throws InterruptedException {
Thread.sleep(500);
@@ -173,7 +220,6 @@ public class RabbitMQProducerIntTest extends AbstractRabbitMQIntTest {
for (Map.Entry<String, Object> headers : expectedHeaders.entrySet()) {
Object receivedValue = receivedHeaders.get(headers.getKey());
Object expectedValue = headers.getValue();
-
assertTrue("Header key " + headers.getKey() + " not found", receivedHeaders.containsKey(headers.getKey()));
assertEquals(0, ObjectHelper.compare(receivedValue == null ? "" : receivedValue.toString(), expectedValue == null ? "" : expectedValue.toString()));
}