You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/12/21 07:27:36 UTC
[camel] branch camel-2.x updated: CAMEL-14307: allow empty routing key when declaring RabbitMQ dead letter exchange (#3427)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-2.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-2.x by this push:
new 371d588 CAMEL-14307: allow empty routing key when declaring RabbitMQ dead letter exchange (#3427)
371d588 is described below
commit 371d588cf8dfc0ff9e1e801af3bd5bcfddd5d1c9
Author: rvanderhallen <54...@users.noreply.github.com>
AuthorDate: Sat Dec 21 08:27:25 2019 +0100
CAMEL-14307: allow empty routing key when declaring RabbitMQ dead letter exchange (#3427)
---
.../component/rabbitmq/RabbitMQDeclareSupport.java | 5 +-
.../RabbitMQDeadLetterRoutingKeyIntTest.java | 168 +++++++++++++++++++++
2 files changed, 172 insertions(+), 1 deletion(-)
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQDeclareSupport.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQDeclareSupport.java
index 0fcce01..125d9e0 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQDeclareSupport.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQDeclareSupport.java
@@ -91,7 +91,10 @@ public class RabbitMQDeclareSupport {
private void populateQueueArgumentsFromDeadLetterExchange(final Map<String, Object> queueArgs) {
if (endpoint.getDeadLetterExchange() != null) {
queueArgs.put(RabbitMQConstants.RABBITMQ_DEAD_LETTER_EXCHANGE, endpoint.getDeadLetterExchange());
- queueArgs.put(RabbitMQConstants.RABBITMQ_DEAD_LETTER_ROUTING_KEY, endpoint.getDeadLetterRoutingKey());
+
+ if (endpoint.getDeadLetterRoutingKey() != null) {
+ queueArgs.put(RabbitMQConstants.RABBITMQ_DEAD_LETTER_ROUTING_KEY, endpoint.getDeadLetterRoutingKey());
+ }
}
}
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQDeadLetterRoutingKeyIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQDeadLetterRoutingKeyIntTest.java
new file mode 100644
index 0000000..70fa772
--- /dev/null
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQDeadLetterRoutingKeyIntTest.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Envelope;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertEquals;
+
+public class RabbitMQDeadLetterRoutingKeyIntTest extends AbstractRabbitMQIntTest {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQDeadLetterRoutingKeyIntTest.class);
+
+ private static final String CONSUMER = "rabbitmq:ex9?hostname=localhost&portNumber=5672&username=cameltest&password=cameltest"
+ + "&skipExchangeDeclare=false"
+ + "&skipQueueDeclare=false"
+ + "&autoDelete=false"
+ + "&durable=true"
+ + "&autoAck=false"
+ + "&queue=q9"
+ + "&routingKey=rk1"
+ + "&deadLetterExchange=dlx"
+ + "&deadLetterQueue=dlq"
+ + "&deadLetterExchangeType=fanout";
+
+ private static final String CONSUMER_WITH_DEADLETTER_ROUTING_KEY = "rabbitmq:ex10?hostname=localhost&portNumber=5672&username=cameltest&password=cameltest"
+ + "&skipExchangeDeclare=false"
+ + "&skipQueueDeclare=false"
+ + "&autoDelete=false&durable=true"
+ + "&autoAck=false&queue=q10"
+ + "&routingKey=rk1"
+ + "&deadLetterExchange=dlx"
+ + "&deadLetterQueue=dlq"
+ + "&deadLetterExchangeType=fanout"
+ + "&deadLetterRoutingKey=rk2";
+
+ private Connection connection;
+ private Channel channel;
+ private Channel deadLetterChannel;
+
+ @EndpointInject(uri = "mock:received")
+ private MockEndpoint receivedEndpoint;
+
+ @Produce(uri = "direct:start")
+ private ProducerTemplate template;
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+
+ @Override
+ public void configure() {
+ from(CONSUMER).to(receivedEndpoint);
+ from(CONSUMER_WITH_DEADLETTER_ROUTING_KEY).to(receivedEndpoint);
+ }
+ };
+ }
+
+ @Before
+ public void setUpRabbitMQ() throws Exception {
+ connection = connection();
+ channel = connection.createChannel();
+ deadLetterChannel = connection.createChannel();
+ }
+
+ @After
+ public void tearDownRabbitMQ() throws Exception {
+ channel.abort();
+ deadLetterChannel.abort();
+ connection.abort();
+ }
+
+ @Test
+ public void originalRoutingKeyIsReceived() throws IOException, InterruptedException {
+ final List<String> received = new ArrayList<>();
+ final StringBuilder routingKey = new StringBuilder();
+
+ AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
+ .contentType("text/plain")
+ .contentEncoding(StandardCharsets.UTF_8.toString()).build();
+
+ receivedEndpoint.whenAnyExchangeReceived(exchange -> {
+ throw new Exception("Simulated exception");
+ });
+
+ channel.basicPublish("ex9", "rk1", properties, "new message".getBytes(StandardCharsets.UTF_8));
+
+ deadLetterChannel.basicConsume("dlq", true, new DeadLetterRoutingKeyConsumer(received, routingKey));
+
+ Thread.sleep(500);
+
+ assertListSize(received, 1);
+ assertEquals("rk1", routingKey.toString());
+ }
+
+ @Test
+ public void deadLetterRoutingKeyIsReceived() throws IOException, InterruptedException {
+ final List<String> received = new ArrayList<>();
+ StringBuilder routingKey = new StringBuilder();
+
+ AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
+ .contentType("text/plain")
+ .contentEncoding(StandardCharsets.UTF_8.toString()).build();
+
+ receivedEndpoint.whenAnyExchangeReceived(exchange -> {
+ throw new Exception("Simulated exception");
+ });
+
+ channel.basicPublish("ex10", "rk1", properties, "new message".getBytes(StandardCharsets.UTF_8));
+
+ deadLetterChannel.basicConsume("dlq", true, new DeadLetterRoutingKeyConsumer(received, routingKey));
+
+ Thread.sleep(500);
+
+ assertListSize(received, 1);
+ assertEquals("rk2", routingKey.toString());
+ }
+
+ private class DeadLetterRoutingKeyConsumer extends DefaultConsumer {
+ private final StringBuilder routingKey;
+ private final List<String> received;
+
+ DeadLetterRoutingKeyConsumer(final List<String> received, final StringBuilder routingKey) {
+ super(RabbitMQDeadLetterRoutingKeyIntTest.this.deadLetterChannel);
+ this.received = received;
+ this.routingKey = routingKey;
+ }
+
+ @Override
+ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
+ LOGGER.info("AMQP.BasicProperties: {}", properties);
+
+ received.add(new String(body));
+ routingKey.append(envelope.getRoutingKey());
+ }
+ }
+}