You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2014/06/06 07:02:55 UTC

[2/4] git commit: Reconnect RabbitMQ Producer/Consumer after broker was unavailable

Reconnect RabbitMQ Producer/Consumer after broker was unavailable


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1191e19f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1191e19f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1191e19f

Branch: refs/heads/master
Commit: 1191e19fc194b545931530011511a55a8d6d6431
Parents: 0762be7
Author: Gerald Quintana <ge...@zenika.com>
Authored: Mon May 5 15:47:01 2014 +0200
Committer: Gerald Quintana <ge...@zenika.com>
Committed: Mon May 5 15:47:01 2014 +0200

----------------------------------------------------------------------
 .../component/rabbitmq/RabbitMQConsumer.java    | 112 +++++++++++++++----
 .../component/rabbitmq/RabbitMQEndpoint.java    |  22 ++++
 .../component/rabbitmq/RabbitMQProducer.java    |  33 +++++-
 .../rabbitmq/RabbitMQEndpointTest.java          |  30 +++--
 .../rabbitmq/RabbitMQReConnectionIntTest.java   |  77 +++++++++++++
 5 files changed, 232 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/1191e19f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index 501f2a1..0bdc93b 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -17,8 +17,9 @@
 package org.apache.camel.component.rabbitmq;
 
 import java.io.IOException;
-import java.util.HashMap;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Channel;
@@ -36,6 +37,10 @@ public class RabbitMQConsumer extends DefaultConsumer {
     private int closeTimeout = 30 * 1000;
     
     private final RabbitMQEndpoint endpoint;
+    /**
+     * Task in charge of starting consumer
+     */
+    private StartConsumerCallable startConsumerCallable;
 
     public RabbitMQConsumer(RabbitMQEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
@@ -43,37 +48,56 @@ public class RabbitMQConsumer extends DefaultConsumer {
     }
 
     @Override
-    protected void doStart() throws Exception {
-        executor = endpoint.createExecutor();
-        log.debug("Using executor {}", executor);
-
-        conn = endpoint.connect(executor);
-        log.debug("Using conn {}", conn);
-
-        channel = conn.createChannel();
-        log.debug("Using channel {}", channel);
+    public RabbitMQEndpoint getEndpoint() {
+        return (RabbitMQEndpoint) super.getEndpoint();
+    }
 
-        channel.exchangeDeclare(endpoint.getExchangeName(),
-                endpoint.getExchangeType(),
-                endpoint.isDurable(),
-                endpoint.isAutoDelete(),
-                new HashMap<String, Object>());
+    /**
+     * Open connection and channel
+     */
+    private void openConnectionAndChannel() throws IOException {
+        log.trace("Creating connection...");
+        this.conn = getEndpoint().connect(executor);
+        log.debug("Created connection: {}", conn);
+
+        log.trace("Creating channel...");
+        this.channel = conn.createChannel();
+        log.debug("Created channel: {}", channel);
+    }
 
-        // need to make sure the queueDeclare is same with the exchange declare
-        channel.queueDeclare(endpoint.getQueue(), endpoint.isDurable(), false,
-                endpoint.isAutoDelete(), null);
-        channel.queueBind(
-                endpoint.getQueue(),
-                endpoint.getExchangeName(),
-                endpoint.getRoutingKey() == null ? "" : endpoint
-                        .getRoutingKey());
+    /**
+     * If needed, create Exchange and Queue, then add message listener
+     */
+    private void addConsumer() throws IOException {
+        getEndpoint().declareExchangeAndQueue(channel);
 
         channel.basicConsume(endpoint.getQueue(), endpoint.isAutoAck(),
                 new RabbitConsumer(this, channel));
     }
 
     @Override
-    protected void doStop() throws Exception {
+    protected void doStart() throws Exception {
+        executor = endpoint.createExecutor();
+        log.debug("Using executor {}", executor);
+        try {
+            openConnectionAndChannel();
+            addConsumer();
+        } catch (Exception e) {
+            // Open connection, and start message listener in background
+            Integer networkRecoveryInterval = getEndpoint().getNetworkRecoveryInterval();
+            final long connectionRetryInterval= networkRecoveryInterval!=null && networkRecoveryInterval >0? networkRecoveryInterval :100L;
+            startConsumerCallable=new StartConsumerCallable(connectionRetryInterval);
+            executor.submit(startConsumerCallable);
+        }
+    }
+
+    /**
+     * If needed, close Connection and Channel
+     */
+    private void closeConnectionAndChannel() throws IOException {
+        if (startConsumerCallable!=null) {
+            startConsumerCallable.stop();
+        }
         if (channel != null) {
             log.debug("Closing channel: {}", channel);
             channel.close();
@@ -84,6 +108,12 @@ public class RabbitMQConsumer extends DefaultConsumer {
             conn.close(closeTimeout);
             conn = null;
         }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        closeConnectionAndChannel();
+
         if (executor != null) {
             if (endpoint != null && endpoint.getCamelContext() != null) {
                 endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(executor);
@@ -178,4 +208,38 @@ public class RabbitMQConsumer extends DefaultConsumer {
 
     }
 
+    /**
+     * Task in charge of opening connection and adding listener when consumer is started
+     * and broker is not available.
+     */
+    private class StartConsumerCallable implements Callable<Void> {
+        private final long connectionRetryInterval;
+        private final AtomicBoolean running=new AtomicBoolean(true);
+        public StartConsumerCallable(long connectionRetryInterval) {
+            this.connectionRetryInterval = connectionRetryInterval;
+        }
+        public void stop() {
+            running.set(false);
+            RabbitMQConsumer.this.startConsumerCallable=null;
+        }
+        @Override
+        public Void call() throws Exception {
+            boolean connectionFailed=true;
+            // Reconnection loop
+            while (running.get() && connectionFailed) {
+                try {
+                    openConnectionAndChannel();
+                    connectionFailed=false;
+                } catch (Exception e) {
+                    log.debug("Connection failed, will retry in "+connectionRetryInterval+"ms", e);
+                    Thread.sleep(connectionRetryInterval);
+                }
+            }
+            if (!connectionFailed) {
+                addConsumer();
+            }
+            stop();
+            return null;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/1191e19f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index 646a633..7f87f53 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.net.URISyntaxException;
 import java.security.KeyManagementException;
 import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
@@ -29,6 +30,7 @@ import javax.net.ssl.TrustManager;
 
 import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Address;
+import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 import com.rabbitmq.client.Envelope;
@@ -126,6 +128,26 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
         }
     }
 
+	/**
+     * If needed, declare Exchange, declare Queue and bind them with Routing Key
+     */
+    public void declareExchangeAndQueue(Channel channel) throws IOException {
+        channel.exchangeDeclare(getExchangeName(),
+                getExchangeType(),
+                isDurable(),
+                isAutoDelete(),
+                new HashMap<String, Object>());
+        if (getQueue()!=null) {
+            // need to make sure the queueDeclare is same with the exchange declare
+            channel.queueDeclare(getQueue(), isDurable(), false,
+                    isAutoDelete(), null);
+            channel.queueBind(
+                    getQueue(),
+                    getExchangeName(),
+                    getRoutingKey() == null ? "" : getRoutingKey());
+        }
+    }
+
     private ConnectionFactory getOrCreateConnectionFactory() {
         if (connectionFactory == null) {
             ConnectionFactory factory = new ConnectionFactory();

http://git-wip-us.apache.org/repos/asf/camel/blob/1191e19f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
index 7763423..03177e2 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
@@ -45,11 +45,10 @@ public class RabbitMQProducer extends DefaultProducer {
     public RabbitMQEndpoint getEndpoint() {
         return (RabbitMQEndpoint) super.getEndpoint();
     }
-
-    @Override
-    protected void doStart() throws Exception {
-        this.executorService = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "CamelRabbitMQProducer[" + getEndpoint().getQueue() + "]");
-
+    /**
+     * Open connection and channel
+     */
+    private void openConnectionAndChannel() throws IOException {
         log.trace("Creating connection...");
         this.conn = getEndpoint().connect(executorService);
         log.debug("Created connection: {}", conn);
@@ -60,7 +59,20 @@ public class RabbitMQProducer extends DefaultProducer {
     }
 
     @Override
-    protected void doStop() throws Exception {
+    protected void doStart() throws Exception {
+        this.executorService = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "CamelRabbitMQProducer[" + getEndpoint().getQueue() + "]");
+
+        try {
+            openConnectionAndChannel();
+        } catch (IOException e) {
+            log.warn("Failed to create connection", e);
+        }
+    }
+
+    /**
+     * If needed, close Connection and Channel
+     */
+    private void closeConnectionAndChannel() throws IOException {
         if (channel != null) {
             log.debug("Closing channel: {}", channel);
             channel.close();
@@ -71,6 +83,11 @@ public class RabbitMQProducer extends DefaultProducer {
             conn.close(closeTimeout);
             conn = null;
         }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        closeConnectionAndChannel();
         if (executorService != null) {
             getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
             executorService = null;
@@ -95,6 +112,10 @@ public class RabbitMQProducer extends DefaultProducer {
         byte[] messageBodyBytes = exchange.getIn().getMandatoryBody(byte[].class);
         AMQP.BasicProperties.Builder properties = buildProperties(exchange);
 
+        if (channel==null) {
+            // Open connection and channel lazily
+            openConnectionAndChannel();
+        }
         channel.basicPublish(exchangeName, key, properties.build(), messageBodyBytes);
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/1191e19f/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
index 11b3675..2db5005 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.rabbitmq;
 
+import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.Date;
 import java.util.HashMap;
@@ -26,7 +27,6 @@ import java.util.concurrent.ThreadPoolExecutor;
 
 import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Address;
-import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 import com.rabbitmq.client.Envelope;
 import com.rabbitmq.client.impl.LongStringHelper;
@@ -132,7 +132,13 @@ public class RabbitMQEndpointTest extends CamelTestSupport {
     }
 
     private ConnectionFactory createConnectionFactory(String uri) {
-        RabbitMQEndpoint endpoint = context.getEndpoint(uri, RabbitMQEndpoint.class); 
+        RabbitMQEndpoint endpoint = context.getEndpoint(uri, RabbitMQEndpoint.class);
+		try {
+			endpoint.connect(Executors.newSingleThreadExecutor());
+		} catch(IOException ioExc) {
+			// Doesn't matter if RabbitMQ is not available
+			log.debug("RabbitMQ not available", ioExc);
+		}
         return endpoint.getConnectionFactory();
     }
 
@@ -158,16 +164,16 @@ public class RabbitMQEndpointTest extends CamelTestSupport {
     @Test
     public void testCreateConnectionFactoryCustom() throws Exception {
         ConnectionFactory connectionFactory  = createConnectionFactory("rabbitmq:localhost:1234/exchange"
-                                                                      + "?username=userxxx"
-                                                                      + "&password=passxxx"
-                                                                      + "&connectionTimeout=123"
-                                                                      + "&requestedChannelMax=456"
-                                                                      + "&requestedFrameMax=789"
-                                                                      + "&requestedHeartbeat=987"
-                                                                      + "&sslProtocol=true"
-                                                                      + "&automaticRecoveryEnabled=true"
-                                                                      + "&networkRecoveryInterval=654"
-                                                                      + "&topologyRecoveryEnabled=false");
+				+ "?username=userxxx"
+				+ "&password=passxxx"
+				+ "&connectionTimeout=123"
+				+ "&requestedChannelMax=456"
+				+ "&requestedFrameMax=789"
+				+ "&requestedHeartbeat=987"
+				+ "&sslProtocol=true"
+				+ "&automaticRecoveryEnabled=true"
+				+ "&networkRecoveryInterval=654"
+				+ "&topologyRecoveryEnabled=false");
 
         assertEquals("localhost", connectionFactory.getHost());
         assertEquals(1234, connectionFactory.getPort());

http://git-wip-us.apache.org/repos/asf/camel/blob/1191e19f/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java
new file mode 100644
index 0000000..6a63948
--- /dev/null
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java
@@ -0,0 +1,77 @@
+package org.apache.camel.component.rabbitmq;
+
+import com.rabbitmq.client.AlreadyClosedException;
+import org.apache.camel.*;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import java.net.ConnectException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Integration test to check that RabbitMQ Endpoint is able to reconnect to broker when broker
+ * is not available.
+ * <ul>
+ *     <li>Stop the broker</li>
+ *     <li>Run the test: the producer complains it can not send messages, the consumer is silent</li>
+ *     <li>Start the broker: the producer sends messages, and the consumer receives messages</li>
+ *     <li>Stop the broker: the producer complains it can not send messages, the consumer is silent</li>
+ *     <li>Start the broker: the producer sends messages, and the consumer receives messages</li>
+ * </ul>
+ */
+public class RabbitMQReConnectionIntTest extends CamelTestSupport {
+    private static final String EXCHANGE = "ex3";
+
+    @Produce(uri = "direct:rabbitMQ")
+    protected ProducerTemplate directProducer;
+
+    @EndpointInject(uri = "rabbitmq:localhost:5672/" + EXCHANGE + "?username=cameltest&password=cameltest" +
+            "&queue=q3&routingKey=rk3"+
+            "&automaticRecoveryEnabled=true" +
+            "&requestedHeartbeat=1000" +
+            "&connectionTimeout=5000")
+    private Endpoint rabbitMQEndpoint;
+
+    @EndpointInject(uri = "mock:result")
+    private MockEndpoint mockEndpoint;
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() throws Exception {
+                from("direct:rabbitMQ")
+                        .id("producingRoute")
+                        .onException(AlreadyClosedException.class, ConnectException.class)
+                            .maximumRedeliveries(10)
+                            .redeliveryDelay(500L)
+                            .end()
+                        .log("Sending message")
+                        .inOnly(rabbitMQEndpoint);
+                from(rabbitMQEndpoint)
+                        .id("consumingRoute")
+                        .log("Receiving message")
+                        .to(mockEndpoint);
+            }
+        };
+    }
+    @Test
+    public void testSendEndReceive() throws Exception {
+        int nbMessages=100;
+        int failedMessages=0;
+        for(int i=0;i<nbMessages;i++) {
+            try {
+                directProducer.sendBodyAndHeader("Message #"+i, RabbitMQConstants.ROUTING_KEY, "rk3");
+            } catch (CamelExecutionException e) {
+                log.debug("Can not send message", e);
+                failedMessages++;
+            }
+            Thread.sleep(500L);
+        }
+        mockEndpoint.assertExchangeReceived(nbMessages-failedMessages);
+        assertMockEndpointsSatisfied(5, TimeUnit.SECONDS);
+    }
+}