You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/08/12 06:58:39 UTC

[1/2] camel git commit: CAMEL-10239: Provide implementation for publisher acknowledgement together with basic.return

Repository: camel
Updated Branches:
  refs/heads/master 753100b4c -> 664637fce


CAMEL-10239: Provide implementation for publisher acknowledgement together with basic.return


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ff96d5b2
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ff96d5b2
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ff96d5b2

Branch: refs/heads/master
Commit: ff96d5b22a023119f72d91c598509302a9f77f3a
Parents: 753100b
Author: Florian Gessner <fl...@gmail.com>
Authored: Thu Aug 11 20:54:48 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Aug 12 08:49:22 2016 +0200

----------------------------------------------------------------------
 .../component/rabbitmq/RabbitMQEndpoint.java    | 24 +++++++++--
 .../rabbitmq/RabbitMQMessagePublisher.java      | 33 ++++++++++++---
 .../rabbitmq/RabbitMQProducerIntTest.java       | 42 +++++++++++++++++++-
 3 files changed, 89 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ff96d5b2/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index bf40766..cfd9a06 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -150,6 +150,8 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint {
     private boolean publisherAcknowledgements;
     @UriParam(label = "producer")
     private long publisherAcknowledgementsTimeout;
+    @UriParam(label = "producer")
+    private boolean guaranteedDeliveries;
     // camel-jms supports this setting but it is not currently configurable in camel-rabbitmq
     private boolean useMessageIDAsCorrelationID = true;
     // camel-jms supports this setting but it is not currently configurable in camel-rabbitmq
@@ -411,7 +413,7 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint {
 
     /**
      * If true the queue will not be bound to the exchange after declaring it
-     * @return 
+     * @return
      */
     public boolean isSkipQueueBind() {
         return skipQueueBind;
@@ -420,7 +422,7 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint {
     public void setSkipQueueBind(boolean skipQueueBind) {
         this.skipQueueBind = skipQueueBind;
     }
-     
+
     /**
      * This can be used if we need to declare the queue but not the exchange
      */
@@ -799,7 +801,7 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint {
     }
 
     /**
-     * When true and an inOut Exchange failed on the consumer side send the caused Exception back in the response 
+     * When true and an inOut Exchange failed on the consumer side send the caused Exception back in the response
      */
     public void setTransferException(boolean transferException) {
         this.transferException = transferException;
@@ -832,6 +834,22 @@ public class RabbitMQEndpoint extends DefaultEndpoint implements AsyncEndpoint {
     }
 
     /**
+     * When true, an exception will be thrown when the message cannot be delivered (basic.return) and the message is
+     * marked as mandatory.
+     * PublisherAcknowledgement will also be activated in this case
+     *
+     * See also <a href=https://www.rabbitmq.com/confirms.html">publisher acknowledgements</a> - When will messages be
+     * confirmed?
+     */
+    public boolean isGuaranteedDeliveries() {
+        return guaranteedDeliveries;
+    }
+
+    public void setGuaranteedDeliveries(boolean guaranteedDeliveries) {
+        this.guaranteedDeliveries = guaranteedDeliveries;
+    }
+
+    /**
      * Get replyToType for inOut exchange
      */
     public String getReplyToType() {

http://git-wip-us.apache.org/repos/asf/camel/blob/ff96d5b2/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
index bc78665..15f69ff 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeoutException;
 
 import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.ReturnListener;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.NoTypeConversionAvailableException;
@@ -37,6 +38,13 @@ import org.slf4j.LoggerFactory;
  * A method object for publishing to RabbitMQ
  */
 public class RabbitMQMessagePublisher {
+    private static final ReturnListener GUARANTEED_DELIVERY_RETURN_LISTENER = new ReturnListener() {
+        @Override
+        public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
+            throw new RuntimeCamelException("Delivery failed for exchange " + exchange + " and routing key " + routingKey + "; replyCode = " + replyCode + " replyText = " + replyText);
+        }
+    };
+
     private static final Logger LOG = LoggerFactory.getLogger(RabbitMQMessagePublisher.class);
     private final Exchange camelExchange;
     private final Channel channel;
@@ -60,7 +68,7 @@ public class RabbitMQMessagePublisher {
             LOG.debug("Removing the {} header", RabbitMQEndpoint.SERIALIZE_HEADER);
             message.getHeaders().remove(RabbitMQEndpoint.SERIALIZE_HEADER);
         }
-        
+
         return message;
     }
 
@@ -86,7 +94,7 @@ public class RabbitMQMessagePublisher {
                 throw new RuntimeCamelException(e);
             }
         }
-        
+
         publishToRabbitMQ(properties, body);
     }
 
@@ -98,17 +106,30 @@ public class RabbitMQMessagePublisher {
 
         LOG.debug("Sending message to exchange: {} with CorrelationId = {}", rabbitExchange, properties.getCorrelationId());
 
-        if (endpoint.isPublisherAcknowledgements()) {
+        if (isPublisherAcknowledgements()) {
             channel.confirmSelect();
         }
+        if (endpoint.isGuaranteedDeliveries()) {
+            channel.addReturnListener(GUARANTEED_DELIVERY_RETURN_LISTENER);
 
-        channel.basicPublish(rabbitExchange, routingKey, mandatory, immediate, properties, body);
+        }
 
-        if (endpoint.isPublisherAcknowledgements()) {
-            waitForConfirmation();
+        try {
+            channel.basicPublish(rabbitExchange, routingKey, mandatory, immediate, properties, body);
+            if (isPublisherAcknowledgements()) {
+                waitForConfirmation();
+            }
+        } finally {
+            if (endpoint.isGuaranteedDeliveries()) {
+                channel.removeReturnListener(GUARANTEED_DELIVERY_RETURN_LISTENER);
+            }
         }
     }
 
+    private boolean isPublisherAcknowledgements() {
+        return endpoint.isPublisherAcknowledgements() || endpoint.isGuaranteedDeliveries();
+    }
+
     private void waitForConfirmation() throws IOException {
         try {
             LOG.debug("Waiting for publisher acknowledgements for {}ms", endpoint.getPublisherAcknowledgementsTimeout());

http://git-wip-us.apache.org/repos/asf/camel/blob/ff96d5b2/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java
index fded387..5be1be1 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java
@@ -29,6 +29,7 @@ import com.rabbitmq.client.DefaultConsumer;
 import com.rabbitmq.client.Envelope;
 import org.apache.camel.Produce;
 import org.apache.camel.ProducerTemplate;
+import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.After;
@@ -42,6 +43,9 @@ public class RabbitMQProducerIntTest extends CamelTestSupport {
     private static final String BASIC_URI = String.format(BASIC_URI_FORMAT, EXCHANGE, ROUTE);
     private static final String PUBLISHER_ACKNOWLEDGES_URI = BASIC_URI + "&mandatory=true&publisherAcknowledgements=true";
     private static final String PUBLISHER_ACKNOWLEDGES_BAD_ROUTE_URI = String.format(BASIC_URI_FORMAT, EXCHANGE, "route2") + "&publisherAcknowledgements=true";
+    private static final String GUARANTEED_DELIVERY_URI = BASIC_URI + "&mandatory=true&guaranteedDeliveries=true";
+    private static final String GUARANTEED_DELIVERY_BAD_ROUTE_NOT_MANDATORY_URI = String.format(BASIC_URI_FORMAT, EXCHANGE, "route2") + "&guaranteedDeliveries=true";
+    private static final String GUARANTEED_DELIVERY_BAD_ROUTE_URI = String.format(BASIC_URI_FORMAT, EXCHANGE, "route2") + "&mandatory=true&guaranteedDeliveries=true";
 
     @Produce(uri = "direct:start")
     protected ProducerTemplate template;
@@ -52,6 +56,15 @@ public class RabbitMQProducerIntTest extends CamelTestSupport {
     @Produce(uri = "direct:start-with-confirms-bad-route")
     protected ProducerTemplate templateWithConfirmsAndBadRoute;
 
+    @Produce(uri = "direct:start-with-guaranteed-delivery")
+    protected ProducerTemplate templateWithGuranteedDelivery;
+
+    @Produce(uri = "direct:start-with-guaranteed-delivery-bad-route")
+    protected ProducerTemplate templateWithGuranteedDeliveryAndBadRoute;
+
+    @Produce(uri = "direct:start-with-guaranteed-delivery-bad-route-but-not-mandatory")
+    protected ProducerTemplate templateWithGuranteedDeliveryBadRouteButNotMandatory;
+
     private Connection connection;
     private Channel channel;
 
@@ -64,6 +77,9 @@ public class RabbitMQProducerIntTest extends CamelTestSupport {
                 from("direct:start").to(BASIC_URI);
                 from("direct:start-with-confirms").to(PUBLISHER_ACKNOWLEDGES_URI);
                 from("direct:start-with-confirms-bad-route").to(PUBLISHER_ACKNOWLEDGES_BAD_ROUTE_URI);
+                from("direct:start-with-guaranteed-delivery").to(GUARANTEED_DELIVERY_URI);
+                from("direct:start-with-guaranteed-delivery-bad-route").to(GUARANTEED_DELIVERY_BAD_ROUTE_URI);
+                from("direct:start-with-guaranteed-delivery-bad-route-but-not-mandatory").to(GUARANTEED_DELIVERY_BAD_ROUTE_NOT_MANDATORY_URI);
             }
         };
     }
@@ -121,6 +137,31 @@ public class RabbitMQProducerIntTest extends CamelTestSupport {
         assertThatBodiesReceivedIn(received);
     }
 
+    @Test
+    public void shouldSuccessfullyProduceMessageWhenGuaranteedDeliveryIsActivatedAndMessageIsMarkedAsMandatory() throws InterruptedException, IOException, TimeoutException {
+        final List<String> received = new ArrayList<>();
+        channel.basicConsume("sammyq", true, new ArrayPopulatingConsumer(received));
+
+        templateWithGuranteedDelivery.sendBodyAndHeader("publisher ack message", RabbitMQConstants.EXCHANGE_NAME, "ex1");
+
+        assertThatBodiesReceivedIn(received, "publisher ack message");
+    }
+
+    @Test(expected = RuntimeCamelException.class)
+    public void shouldFailIfMessageIsMarkedAsMandatoryAndGuaranteedDeliveryIsActiveButNoQueueIsBound() {
+        templateWithGuranteedDeliveryAndBadRoute.sendBody("publish with ack and return message");
+    }
+
+    @Test
+    public void shouldSuccessfullyProduceMessageWhenGuaranteedDeliveryIsActivatedOnABadRouteButMessageIsNotMandatory() throws InterruptedException, IOException, TimeoutException {
+        final List<String> received = new ArrayList<>();
+        channel.basicConsume("sammyq", true, new ArrayPopulatingConsumer(received));
+
+        templateWithGuranteedDeliveryBadRouteButNotMandatory.sendBodyAndHeader("publisher ack message", RabbitMQConstants.EXCHANGE_NAME, "ex1");
+
+        assertThatBodiesReceivedIn(received);
+    }
+
     private Connection createTestConnection() throws IOException, TimeoutException {
         ConnectionFactory factory = new ConnectionFactory();
         factory.setHost("localhost");
@@ -148,4 +189,3 @@ public class RabbitMQProducerIntTest extends CamelTestSupport {
         }
     }
 }
-


[2/2] camel git commit: CAMEL-10239: Add docs. This closes #1116. This closes #1114

Posted by da...@apache.org.
CAMEL-10239: Add docs. This closes #1116. This closes #1114


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/664637fc
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/664637fc
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/664637fc

Branch: refs/heads/master
Commit: 664637fce90285f52944d8b50483d65fc82f7365
Parents: ff96d5b
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Aug 12 08:58:32 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Aug 12 08:58:32 2016 +0200

----------------------------------------------------------------------
 components/camel-rabbitmq/src/main/docs/rabbitmq.adoc | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/664637fc/components/camel-rabbitmq/src/main/docs/rabbitmq.adoc
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/docs/rabbitmq.adoc b/components/camel-rabbitmq/src/main/docs/rabbitmq.adoc
index aeade1d..34bd468 100644
--- a/components/camel-rabbitmq/src/main/docs/rabbitmq.adoc
+++ b/components/camel-rabbitmq/src/main/docs/rabbitmq.adoc
@@ -49,8 +49,9 @@ The RabbitMQ component has no options.
 
 
 
+
 // endpoint options: START
-The RabbitMQ component supports 54 endpoint options which are listed below:
+The RabbitMQ component supports 55 endpoint options which are listed below:
 
 {% raw %}
 [width="100%",cols="2s,1,1m,1m,5",options="header"]
@@ -86,6 +87,7 @@ The RabbitMQ component supports 54 endpoint options which are listed below:
 | bridgeEndpoint | producer | false | boolean | If the bridgeEndpoint is true the producer will ignore the message header of rabbitmq.EXCHANGE_NAME and rabbitmq.ROUTING_KEY
 | channelPoolMaxSize | producer | 10 | int | Get maximum number of opened channel in pool
 | channelPoolMaxWait | producer | 1000 | long | Set the maximum number of milliseconds to wait for a channel from the pool
+| guaranteedDeliveries | producer | false | boolean | When true an exception will be thrown when the message cannot be delivered (basic.return) and the message is marked as mandatory. PublisherAcknowledgement will also be activated in this case See also publisher acknowledgements - When will messages be confirmed
 | immediate | producer | false | boolean | This flag tells the server how to react if the message cannot be routed to a queue consumer immediately. If this flag is set the server will return an undeliverable message with a Return method. If this flag is zero the server will queue the message but with no guarantee that it will ever be consumed. If the header is present rabbitmq.IMMEDIATE it will override this option.
 | mandatory | producer | false | boolean | This flag tells the server how to react if the message cannot be routed to a queue. If this flag is set the server will return an unroutable message with a Return method. If this flag is zero the server silently drops the message. If the header is present rabbitmq.MANDATORY it will override this option.
 | publisherAcknowledgements | producer | false | boolean | When true the message will be published with publisher acknowledgements turned on
@@ -116,6 +118,7 @@ The RabbitMQ component supports 54 endpoint options which are listed below:
 
 
 
+
 See
 http://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html[http://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html]
 and the AMQP specification for more information on connection options.