You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/12/30 10:54:31 UTC

[1/2] camel git commit: RabbitMq consumer should be able to suspend and resume

Repository: camel
Updated Branches:
  refs/heads/camel-2.16.x 3d7051793 -> 556600393
  refs/heads/master 0fb50fcf5 -> 1d0bc598b


RabbitMq consumer should be able to suspend and resume


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1d0bc598
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1d0bc598
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1d0bc598

Branch: refs/heads/master
Commit: 1d0bc598b3c7e6b7ed7ffa06d6cdf7885ada010d
Parents: 0fb50fc
Author: Preben Asmussen <pr...@gmail.com>
Authored: Mon Dec 28 15:16:06 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Dec 30 10:53:21 2015 +0100

----------------------------------------------------------------------
 .../component/rabbitmq/RabbitMQConsumer.java    | 43 ++++++----
 .../rabbitmq/RabbitMQSupendResumeIntTest.java   | 84 ++++++++++++++++++++
 2 files changed, 111 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/1d0bc598/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index a71769e..eaf2b6c 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -36,7 +36,6 @@ import org.apache.camel.Processor;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.impl.DefaultConsumer;
 
-
 public class RabbitMQConsumer extends DefaultConsumer {
     private ExecutorService executor;
     private Connection conn;
@@ -60,7 +59,7 @@ public class RabbitMQConsumer extends DefaultConsumer {
 
     @Override
     public RabbitMQEndpoint getEndpoint() {
-        return (RabbitMQEndpoint) super.getEndpoint();
+        return (RabbitMQEndpoint)super.getEndpoint();
     }
 
     /**
@@ -81,8 +80,7 @@ public class RabbitMQConsumer extends DefaultConsumer {
         log.debug("Created channel: {}", channel);
         // setup the basicQos
         if (endpoint.isPrefetchEnabled()) {
-            channel.basicQos(endpoint.getPrefetchSize(), endpoint.getPrefetchCount(),
-                            endpoint.isPrefetchGlobal());
+            channel.basicQos(endpoint.getPrefetchSize(), endpoint.getPrefetchCount(), endpoint.isPrefetchGlobal());
         }
         return channel;
     }
@@ -122,16 +120,25 @@ public class RabbitMQConsumer extends DefaultConsumer {
             startConsumers();
         } catch (Exception e) {
             log.info("Connection failed, will start background thread to retry!", e);
-            // Open connection, and start message listener in background
-            Integer networkRecoveryInterval = getEndpoint().getNetworkRecoveryInterval();
-            final long connectionRetryInterval = networkRecoveryInterval != null && networkRecoveryInterval > 0 ? networkRecoveryInterval : 100L;
-            startConsumerCallable = new StartConsumerCallable(connectionRetryInterval);
-            executor.submit(startConsumerCallable);
+            reconnect();
         }
     }
 
+    private void reconnect() {
+        // Open connection, and start message listener in background
+        Integer networkRecoveryInterval = getEndpoint().getNetworkRecoveryInterval();
+        final long connectionRetryInterval = networkRecoveryInterval != null && networkRecoveryInterval > 0 ? networkRecoveryInterval : 100L;
+        startConsumerCallable = new StartConsumerCallable(connectionRetryInterval);
+        executor.submit(startConsumerCallable);
+    }
+
+    @Override
+    protected void doResume() throws Exception {
+        reconnect();
+    }
+
     /**
-     * If needed, close Connection and Channels 
+     * If needed, close Connection and Channels
      */
     private void closeConnectionAndChannel() throws IOException, TimeoutException {
         if (startConsumerCallable != null) {
@@ -154,6 +161,11 @@ public class RabbitMQConsumer extends DefaultConsumer {
     }
 
     @Override
+    protected void doSuspend() throws Exception {
+        closeConnectionAndChannel();
+    }
+
+    @Override
     protected void doStop() throws Exception {
         closeConnectionAndChannel();
 
@@ -186,9 +198,7 @@ public class RabbitMQConsumer extends DefaultConsumer {
         }
 
         @Override
-        public void handleDelivery(String consumerTag, Envelope envelope,
-                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
-
+        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
             Exchange exchange = consumer.endpoint.createRabbitExchange(envelope, properties, body);
             endpoint.getMessageConverter().mergeAmqpProperties(exchange, properties);
 
@@ -228,7 +238,8 @@ public class RabbitMQConsumer extends DefaultConsumer {
                     channel.basicAck(deliveryTag, false);
                 }
             } else if (endpoint.isTransferException() && exchange.getPattern().isOutCapable()) {
-                // the inOut exchange failed so put the exception in the body and send back
+                // the inOut exchange failed so put the exception in the body
+                // and send back
                 msg.setBody(exchange.getException());
                 exchange.setOut(msg);
                 try {
@@ -282,8 +293,8 @@ public class RabbitMQConsumer extends DefaultConsumer {
     }
 
     /**
-     * Task in charge of opening connection and adding listener when consumer is started
-     * and broker is not available.
+     * Task in charge of opening connection and adding listener when consumer is
+     * started and broker is not available.
      */
     private class StartConsumerCallable implements Callable<Void> {
         private final long connectionRetryInterval;

http://git-wip-us.apache.org/repos/asf/camel/blob/1d0bc598/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSupendResumeIntTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSupendResumeIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSupendResumeIntTest.java
new file mode 100644
index 0000000..fd269a8
--- /dev/null
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSupendResumeIntTest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class RabbitMQSupendResumeIntTest extends CamelTestSupport {
+    private static final String EXCHANGE = "ex4";
+
+    @EndpointInject(uri = "mock:result")
+    private MockEndpoint resultEndpoint;
+
+    @EndpointInject(uri = "rabbitmq:localhost:5672/" + EXCHANGE + "?username=cameltest&password=cameltest&queue=q3&routingKey=rk3&autoDelete=false")
+    private Endpoint rabbitMQEndpoint;
+
+    @Produce(uri = "direct:start")
+    private ProducerTemplate template;
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").routeId("producer").log("sending ${body}").to(rabbitMQEndpoint);
+                from(rabbitMQEndpoint).routeId("consumer").log("got ${body}").to("mock:result");
+            }
+        };
+    }
+
+    @Test
+    public void testSuspendedResume() throws Exception {
+        resultEndpoint.expectedMessageCount(1);
+        resultEndpoint.expectedBodiesReceived("hello");
+
+        template.sendBody("hello");
+
+        assertMockEndpointsSatisfied();
+
+        context.suspendRoute("consumer");
+
+        // sleep a bit to ensure its properly suspended
+        Thread.sleep(2000);
+
+        resetMocks();
+        resultEndpoint.expectedMessageCount(0);
+
+        template.sendBody("Hello2");
+
+        assertMockEndpointsSatisfied(1, TimeUnit.SECONDS);
+
+        resetMocks();
+        resultEndpoint.expectedBodiesReceived("Hello2");
+        resultEndpoint.expectedMessageCount(1);
+
+        context.resumeRoute("consumer");
+
+        assertMockEndpointsSatisfied();
+    }
+
+}


[2/2] camel git commit: RabbitMq consumer should be able to suspend and resume

Posted by da...@apache.org.
RabbitMq consumer should be able to suspend and resume


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/55660039
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/55660039
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/55660039

Branch: refs/heads/camel-2.16.x
Commit: 556600393b6790291c63effa7350ae7c31e1160a
Parents: 3d70517
Author: Preben Asmussen <pr...@gmail.com>
Authored: Mon Dec 28 15:16:06 2015 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Dec 30 10:54:23 2015 +0100

----------------------------------------------------------------------
 .../component/rabbitmq/RabbitMQConsumer.java    | 43 ++++++----
 .../rabbitmq/RabbitMQSupendResumeIntTest.java   | 84 ++++++++++++++++++++
 2 files changed, 111 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/55660039/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index cdb23f4..b535915 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -36,7 +36,6 @@ import org.apache.camel.Processor;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.impl.DefaultConsumer;
 
-
 public class RabbitMQConsumer extends DefaultConsumer {
     private ExecutorService executor;
     private Connection conn;
@@ -60,7 +59,7 @@ public class RabbitMQConsumer extends DefaultConsumer {
 
     @Override
     public RabbitMQEndpoint getEndpoint() {
-        return (RabbitMQEndpoint) super.getEndpoint();
+        return (RabbitMQEndpoint)super.getEndpoint();
     }
 
     /**
@@ -81,8 +80,7 @@ public class RabbitMQConsumer extends DefaultConsumer {
         log.debug("Created channel: {}", channel);
         // setup the basicQos
         if (endpoint.isPrefetchEnabled()) {
-            channel.basicQos(endpoint.getPrefetchSize(), endpoint.getPrefetchCount(),
-                            endpoint.isPrefetchGlobal());
+            channel.basicQos(endpoint.getPrefetchSize(), endpoint.getPrefetchCount(), endpoint.isPrefetchGlobal());
         }
         return channel;
     }
@@ -122,16 +120,25 @@ public class RabbitMQConsumer extends DefaultConsumer {
             startConsumers();
         } catch (Exception e) {
             log.info("Connection failed, will start background thread to retry!", e);
-            // Open connection, and start message listener in background
-            Integer networkRecoveryInterval = getEndpoint().getNetworkRecoveryInterval();
-            final long connectionRetryInterval = networkRecoveryInterval != null && networkRecoveryInterval > 0 ? networkRecoveryInterval : 100L;
-            startConsumerCallable = new StartConsumerCallable(connectionRetryInterval);
-            executor.submit(startConsumerCallable);
+            reconnect();
         }
     }
 
+    private void reconnect() {
+        // Open connection, and start message listener in background
+        Integer networkRecoveryInterval = getEndpoint().getNetworkRecoveryInterval();
+        final long connectionRetryInterval = networkRecoveryInterval != null && networkRecoveryInterval > 0 ? networkRecoveryInterval : 100L;
+        startConsumerCallable = new StartConsumerCallable(connectionRetryInterval);
+        executor.submit(startConsumerCallable);
+    }
+
+    @Override
+    protected void doResume() throws Exception {
+        reconnect();
+    }
+
     /**
-     * If needed, close Connection and Channels 
+     * If needed, close Connection and Channels
      */
     private void closeConnectionAndChannel() throws IOException, TimeoutException {
         if (startConsumerCallable != null) {
@@ -154,6 +161,11 @@ public class RabbitMQConsumer extends DefaultConsumer {
     }
 
     @Override
+    protected void doSuspend() throws Exception {
+        closeConnectionAndChannel();
+    }
+
+    @Override
     protected void doStop() throws Exception {
         closeConnectionAndChannel();
 
@@ -186,9 +198,7 @@ public class RabbitMQConsumer extends DefaultConsumer {
         }
 
         @Override
-        public void handleDelivery(String consumerTag, Envelope envelope,
-                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
-
+        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
             Exchange exchange = consumer.endpoint.createRabbitExchange(envelope, properties, body);
             endpoint.getMessageConverter().mergeAmqpProperties(exchange, properties);
 
@@ -228,7 +238,8 @@ public class RabbitMQConsumer extends DefaultConsumer {
                     channel.basicAck(deliveryTag, false);
                 }
             } else if (endpoint.isTransferException() && exchange.getPattern().isOutCapable()) {
-                // the inOut exchange failed so put the exception in the body and send back
+                // the inOut exchange failed so put the exception in the body
+                // and send back
                 msg.setBody(exchange.getException());
                 exchange.setOut(msg);
                 try {
@@ -282,8 +293,8 @@ public class RabbitMQConsumer extends DefaultConsumer {
     }
 
     /**
-     * Task in charge of opening connection and adding listener when consumer is started
-     * and broker is not available.
+     * Task in charge of opening connection and adding listener when consumer is
+     * started and broker is not available.
      */
     private class StartConsumerCallable implements Callable<Void> {
         private final long connectionRetryInterval;

http://git-wip-us.apache.org/repos/asf/camel/blob/55660039/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSupendResumeIntTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSupendResumeIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSupendResumeIntTest.java
new file mode 100644
index 0000000..fd269a8
--- /dev/null
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSupendResumeIntTest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Produce;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class RabbitMQSupendResumeIntTest extends CamelTestSupport {
+    private static final String EXCHANGE = "ex4";
+
+    @EndpointInject(uri = "mock:result")
+    private MockEndpoint resultEndpoint;
+
+    @EndpointInject(uri = "rabbitmq:localhost:5672/" + EXCHANGE + "?username=cameltest&password=cameltest&queue=q3&routingKey=rk3&autoDelete=false")
+    private Endpoint rabbitMQEndpoint;
+
+    @Produce(uri = "direct:start")
+    private ProducerTemplate template;
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").routeId("producer").log("sending ${body}").to(rabbitMQEndpoint);
+                from(rabbitMQEndpoint).routeId("consumer").log("got ${body}").to("mock:result");
+            }
+        };
+    }
+
+    @Test
+    public void testSuspendedResume() throws Exception {
+        resultEndpoint.expectedMessageCount(1);
+        resultEndpoint.expectedBodiesReceived("hello");
+
+        template.sendBody("hello");
+
+        assertMockEndpointsSatisfied();
+
+        context.suspendRoute("consumer");
+
+        // sleep a bit to ensure its properly suspended
+        Thread.sleep(2000);
+
+        resetMocks();
+        resultEndpoint.expectedMessageCount(0);
+
+        template.sendBody("Hello2");
+
+        assertMockEndpointsSatisfied(1, TimeUnit.SECONDS);
+
+        resetMocks();
+        resultEndpoint.expectedBodiesReceived("Hello2");
+        resultEndpoint.expectedMessageCount(1);
+
+        context.resumeRoute("consumer");
+
+        assertMockEndpointsSatisfied();
+    }
+
+}