You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/12/21 07:27:36 UTC

[camel] branch camel-2.x updated: CAMEL-14307: allow empty routing key when declaring RabbitMQ dead letter exchange (#3427)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-2.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-2.x by this push:
     new 371d588  CAMEL-14307: allow empty routing key when declaring RabbitMQ dead letter exchange (#3427)
371d588 is described below

commit 371d588cf8dfc0ff9e1e801af3bd5bcfddd5d1c9
Author: rvanderhallen <54...@users.noreply.github.com>
AuthorDate: Sat Dec 21 08:27:25 2019 +0100

    CAMEL-14307: allow empty routing key when declaring RabbitMQ dead letter exchange (#3427)
---
 .../component/rabbitmq/RabbitMQDeclareSupport.java |   5 +-
 .../RabbitMQDeadLetterRoutingKeyIntTest.java       | 168 +++++++++++++++++++++
 2 files changed, 172 insertions(+), 1 deletion(-)

diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQDeclareSupport.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQDeclareSupport.java
index 0fcce01..125d9e0 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQDeclareSupport.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQDeclareSupport.java
@@ -91,7 +91,10 @@ public class RabbitMQDeclareSupport {
     private void populateQueueArgumentsFromDeadLetterExchange(final Map<String, Object> queueArgs) {
         if (endpoint.getDeadLetterExchange() != null) {
             queueArgs.put(RabbitMQConstants.RABBITMQ_DEAD_LETTER_EXCHANGE, endpoint.getDeadLetterExchange());
-            queueArgs.put(RabbitMQConstants.RABBITMQ_DEAD_LETTER_ROUTING_KEY, endpoint.getDeadLetterRoutingKey());
+
+            if (endpoint.getDeadLetterRoutingKey() != null) {
+                queueArgs.put(RabbitMQConstants.RABBITMQ_DEAD_LETTER_ROUTING_KEY, endpoint.getDeadLetterRoutingKey());
+            }
         }
     }
 
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQDeadLetterRoutingKeyIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQDeadLetterRoutingKeyIntTest.java
new file mode 100644
index 0000000..70fa772
--- /dev/null
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQDeadLetterRoutingKeyIntTest.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Envelope;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertEquals;
+
+public class RabbitMQDeadLetterRoutingKeyIntTest extends AbstractRabbitMQIntTest {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQDeadLetterRoutingKeyIntTest.class);
+
+    private static final String CONSUMER = "rabbitmq:ex9?hostname=localhost&portNumber=5672&username=cameltest&password=cameltest"
+            + "&skipExchangeDeclare=false"
+            + "&skipQueueDeclare=false"
+            + "&autoDelete=false"
+            + "&durable=true"
+            + "&autoAck=false"
+            + "&queue=q9"
+            + "&routingKey=rk1"
+            + "&deadLetterExchange=dlx"
+            + "&deadLetterQueue=dlq"
+            + "&deadLetterExchangeType=fanout";
+    
+    private static final String CONSUMER_WITH_DEADLETTER_ROUTING_KEY = "rabbitmq:ex10?hostname=localhost&portNumber=5672&username=cameltest&password=cameltest"
+            + "&skipExchangeDeclare=false"
+            + "&skipQueueDeclare=false"
+            + "&autoDelete=false&durable=true"
+            + "&autoAck=false&queue=q10"
+            + "&routingKey=rk1"
+            + "&deadLetterExchange=dlx"
+            + "&deadLetterQueue=dlq"
+            + "&deadLetterExchangeType=fanout"
+            + "&deadLetterRoutingKey=rk2";
+
+    private Connection connection;
+    private Channel channel;
+    private Channel deadLetterChannel;
+
+    @EndpointInject(uri = "mock:received")
+    private MockEndpoint receivedEndpoint;
+
+    @Produce(uri = "direct:start")
+    private ProducerTemplate template;
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() {
+                from(CONSUMER).to(receivedEndpoint);
+                from(CONSUMER_WITH_DEADLETTER_ROUTING_KEY).to(receivedEndpoint);
+            }
+        };
+    }
+
+    @Before
+    public void setUpRabbitMQ() throws Exception {
+        connection = connection();
+        channel = connection.createChannel();
+        deadLetterChannel = connection.createChannel();
+    }
+
+    @After
+    public void tearDownRabbitMQ() throws Exception {
+        channel.abort();
+        deadLetterChannel.abort();
+        connection.abort();
+    }
+
+    @Test
+    public void originalRoutingKeyIsReceived() throws IOException, InterruptedException {
+        final List<String> received = new ArrayList<>();
+        final StringBuilder routingKey = new StringBuilder();
+
+        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
+                .contentType("text/plain")
+                .contentEncoding(StandardCharsets.UTF_8.toString()).build();
+
+        receivedEndpoint.whenAnyExchangeReceived(exchange -> { 
+            throw new Exception("Simulated exception"); 
+        });
+
+        channel.basicPublish("ex9", "rk1", properties, "new message".getBytes(StandardCharsets.UTF_8));
+
+        deadLetterChannel.basicConsume("dlq", true, new DeadLetterRoutingKeyConsumer(received, routingKey));
+
+        Thread.sleep(500);
+
+        assertListSize(received, 1);
+        assertEquals("rk1", routingKey.toString());
+    }
+
+    @Test
+    public void deadLetterRoutingKeyIsReceived() throws IOException, InterruptedException {
+        final List<String> received = new ArrayList<>();
+        StringBuilder routingKey = new StringBuilder();
+
+        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
+                .contentType("text/plain")
+                .contentEncoding(StandardCharsets.UTF_8.toString()).build();
+
+        receivedEndpoint.whenAnyExchangeReceived(exchange -> { 
+            throw new Exception("Simulated exception"); 
+        });
+
+        channel.basicPublish("ex10", "rk1", properties, "new message".getBytes(StandardCharsets.UTF_8));
+
+        deadLetterChannel.basicConsume("dlq", true, new DeadLetterRoutingKeyConsumer(received, routingKey));
+
+        Thread.sleep(500);
+
+        assertListSize(received, 1);
+        assertEquals("rk2", routingKey.toString());
+    }
+
+    private class DeadLetterRoutingKeyConsumer extends DefaultConsumer {
+        private final StringBuilder routingKey;
+        private final List<String> received;
+
+        DeadLetterRoutingKeyConsumer(final List<String> received, final StringBuilder routingKey) {
+            super(RabbitMQDeadLetterRoutingKeyIntTest.this.deadLetterChannel);
+            this.received = received;
+            this.routingKey = routingKey;
+        }
+
+        @Override
+        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
+            LOGGER.info("AMQP.BasicProperties: {}", properties);
+
+            received.add(new String(body));
+            routingKey.append(envelope.getRoutingKey());
+        }
+    }
+}