You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/01/28 09:25:50 UTC

[2/2] camel git commit: CAMEL-8742 - Reconnect when connections are closed.

CAMEL-8742 - Reconnect when connections are closed.

* Cancel reply manager when sending a message in the producer fails
* Change the SuspendResume test to use a different queue since it conflicts with other queue settings


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f3eae07c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f3eae07c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f3eae07c

Branch: refs/heads/master
Commit: f3eae07c562708499e6b9240d8ad21a4ed640013
Parents: 034b73c
Author: Brad Reitmeyer <br...@cisco.com>
Authored: Tue Jan 26 16:39:12 2016 -0600
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Jan 28 09:21:20 2016 +0100

----------------------------------------------------------------------
 .../component/rabbitmq/RabbitConsumer.java      | 293 +++++++++++++++++++
 .../component/rabbitmq/RabbitMQConsumer.java    | 184 +++---------
 .../component/rabbitmq/RabbitMQEndpoint.java    |   5 +-
 .../component/rabbitmq/RabbitMQProducer.java    |  64 +++-
 .../component/rabbitmq/reply/ReplyManager.java  |   7 +
 .../rabbitmq/reply/ReplyManagerSupport.java     |   9 +
 .../rabbitmq/RabbitMQConsumerTest.java          |   2 +
 .../rabbitmq/RabbitMQReConnectionIntTest.java   |   3 +-
 .../rabbitmq/RabbitMQSupendResumeIntTest.java   |   4 +-
 9 files changed, 404 insertions(+), 167 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/f3eae07c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
new file mode 100644
index 0000000..a03e7f8
--- /dev/null
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq;
+
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.Consumer;
+import com.rabbitmq.client.Envelope;
+import com.rabbitmq.client.ShutdownSignalException;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
+import org.apache.camel.RuntimeCamelException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class RabbitConsumer implements com.rabbitmq.client.Consumer {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final RabbitMQConsumer consumer;
+    private Channel channel;
+    private String tag;
+    /** Consumer tag most recently passed to handleConsumeOk. */
+    private volatile String consumerTag;
+    private volatile boolean stopping;
+
+    /**
+     * Constructs a new instance for the given Camel consumer and tries to open
+     * a channel on its connection; a failure is logged and not rethrown.
+     *
+     * @param consumer
+     *            the RabbitMQConsumer that supplies the connection and endpoint
+     */
+    public RabbitConsumer(RabbitMQConsumer consumer) {
+        // the channel is opened here instead of being passed in by the caller
+        this.consumer = consumer;
+        try {
+            Connection conn = consumer.getConnection();
+            this.channel = openChannel(conn);
+        } catch (IOException | TimeoutException e) {
+            log.warn("Unable to open channel for RabbitMQConsumer. Continuing and will try again", e);
+        }
+    }
+
+    @Override
+    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
+        Exchange exchange = consumer.getEndpoint().createRabbitExchange(envelope, properties, body);
+        consumer.getEndpoint().getMessageConverter().mergeAmqpProperties(exchange, properties);
+
+        boolean sendReply = properties.getReplyTo() != null;
+        if (sendReply && !exchange.getPattern().isOutCapable()) {
+            log.debug("In an inOut capable route");
+            exchange.setPattern(ExchangePattern.InOut);
+        }
+
+        log.trace("Created exchange [exchange={}]", exchange);
+        long deliveryTag = envelope.getDeliveryTag();
+        try {
+            consumer.getProcessor().process(exchange);
+        } catch (Exception e) {
+            exchange.setException(e);
+        }
+
+        // obtain the message after processing (OUT if present, otherwise IN)
+        Message msg;
+        if (exchange.hasOut()) {
+            msg = exchange.getOut();
+        } else {
+            msg = exchange.getIn();
+        }
+
+        if (exchange.getException() != null) {
+            consumer.getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
+        }
+
+        if (!exchange.isFailed()) {
+            // processing success: send the reply if one was requested, then ack
+            if (sendReply && exchange.getPattern().isOutCapable()) {
+                try {
+                    consumer.getEndpoint().publishExchangeToChannel(exchange, channel, properties.getReplyTo());
+                } catch (RuntimeCamelException e) {
+                    // set the exception on the exchange so it can send the
+                    // exception back to the producer
+                    exchange.setException(e);
+                    consumer.getExceptionHandler().handleException("Error processing exchange", exchange, e);
+                }
+            }
+            if (!consumer.getEndpoint().isAutoAck()) {
+                log.trace("Acknowledging receipt [delivery_tag={}]", deliveryTag);
+                channel.basicAck(deliveryTag, false);
+            }
+        }
+        // The exchange could have failed when sending the above message
+        if (exchange.isFailed()) {
+            if (consumer.getEndpoint().isTransferException() && exchange.getPattern().isOutCapable()) {
+                // the inOut exchange failed so put the exception in the body
+                // and send back
+                msg.setBody(exchange.getException());
+                exchange.setOut(msg);
+                exchange.getOut().setHeader(RabbitMQConstants.CORRELATIONID, exchange.getIn().getHeader(RabbitMQConstants.CORRELATIONID));
+                try {
+                    consumer.getEndpoint().publishExchangeToChannel(exchange, channel, properties.getReplyTo());
+                } catch (RuntimeCamelException e) {
+                    consumer.getExceptionHandler().handleException("Error processing exchange", exchange, e);
+                }
+
+                if (!consumer.getEndpoint().isAutoAck()) {
+                    log.trace("Acknowledging receipt when transferring exception [delivery_tag={}]", deliveryTag);
+                    channel.basicAck(deliveryTag, false);
+                }
+            } else {
+                boolean isRequeueHeaderSet = msg.getHeader(RabbitMQConstants.REQUEUE, false, boolean.class);
+                // processing failed: reject, requeueing only if the REQUEUE header is set
+                if (deliveryTag != 0 && !consumer.getEndpoint().isAutoAck()) {
+                    log.trace("Rejecting receipt [delivery_tag={}] with requeue={}", deliveryTag, isRequeueHeaderSet);
+                    if (isRequeueHeaderSet) {
+                        channel.basicReject(deliveryTag, true);
+                    } else {
+                        channel.basicReject(deliveryTag, false);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Bind consumer to channel; throws IOException if no channel has been opened
+     */
+    public void start() throws IOException {
+        if (channel == null) {
+            throw new IOException("The RabbitMQ channel is not open");
+        }
+        tag = channel.basicConsume(consumer.getEndpoint().getQueue(), consumer.getEndpoint().isAutoAck(), this);
+    }
+
+    /**
+     * Unbind consumer from channel and close the channel; also sets the stopping flag
+     */
+    public void stop() throws IOException, TimeoutException {
+        stopping = true;
+        if (channel == null) {
+            return;
+        }
+        if (tag != null) {
+            channel.basicCancel(tag);
+        }
+        try {
+            channel.close();
+        } catch (TimeoutException e) {
+            log.error("Timeout occured");
+            throw e;
+        }
+    }
+
+    /**
+     * Stores the most recently passed-in consumerTag - semantically, there
+     * should be only one.
+     * 
+     * @see Consumer#handleConsumeOk
+     */
+    public void handleConsumeOk(String consumerTag) {
+        this.consumerTag = consumerTag;
+    }
+
+    /**
+     * Retrieve the consumer tag.
+     * 
+     * @return the most recently notified consumer tag.
+     */
+    public String getConsumerTag() {
+        return consumerTag;
+    }
+
+    /**
+     * Implementation of {@link Consumer#handleCancelOk} that only logs at debug level.
+     * 
+     * @param consumerTag
+     *            the defined consumer tag (client- or server-generated)
+     */
+    public void handleCancelOk(String consumerTag) {
+        // no work to do beyond logging
+        log.debug("Recieved cancelOk signal on the rabbitMQ channel");
+    }
+
+    /**
+     * Implementation of {@link Consumer#handleCancel(String)} that only logs at debug level.
+     * 
+     * @param consumerTag
+     *            the defined consumer tag (client- or server-generated)
+     */
+    public void handleCancel(String consumerTag) throws IOException {
+        // no work to do beyond logging
+        log.debug("Recieved cancel signal on the rabbitMQ channel");
+    }
+
+    /**
+     * Reconnects, retrying until it succeeds or stop() is called, unless the application initiated the shutdown.
+     */
+    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
+        log.info("Recieved shutdown signal on the rabbitMQ channel");
+
+        // Check if the consumer closed the connection or something else
+        if (!sig.isInitiatedByApplication()) {
+            // Not closed by us, so reconnect. NOTE(review): an interrupt does not end this loop
+            boolean connected = false;
+            while (!connected && !stopping) {
+                try {
+                    reconnect();
+                    connected = true;
+                } catch (IOException | TimeoutException e) {
+                    log.warn("Unable to obtain a RabbitMQ channel. Will try again");
+
+                    Integer networkRecoveryInterval = consumer.getEndpoint().getNetworkRecoveryInterval();
+                    final long connectionRetryInterval = networkRecoveryInterval != null && networkRecoveryInterval > 0
+                            ? networkRecoveryInterval : 100L;
+                    try {
+                        Thread.sleep(connectionRetryInterval);
+                    } catch (InterruptedException e1) {
+                        Thread.currentThread().interrupt();
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Implementation of {@link Consumer#handleRecoverOk} that only logs at debug level.
+     */
+    public void handleRecoverOk(String consumerTag) {
+        // no work to do beyond logging
+        log.debug("Recieved recover ok signal on the rabbitMQ channel");
+    }
+
+    /**
+     * If the RabbitMQ channel is open this returns without changing
+     * anything. Otherwise it opens a new channel and starts consuming on it
+     */
+    public void reconnect() throws IOException, TimeoutException {
+        if (isChannelOpen()) {
+            // The channel is open, so nothing to do
+            return;
+        }
+        log.info("Attempting to open a new rabbitMQ channel");
+        Connection conn = consumer.getConnection();
+        channel = openChannel(conn);
+        // Start consuming on the new channel, which also stores the new tag
+        start();
+    }
+
+    private boolean isChannelOpen() {
+        return channel != null && channel.isOpen();
+    }
+
+    /**
+     * Open a channel on conn, applying basicQos and the exchange/queue declaration when enabled
+     */
+    private Channel openChannel(Connection conn) throws IOException {
+        log.trace("Creating channel...");
+        Channel channel = conn.createChannel();
+        log.debug("Created channel: {}", channel);
+        // setup the basicQos
+        if (consumer.getEndpoint().isPrefetchEnabled()) {
+            channel.basicQos(consumer.getEndpoint().getPrefetchSize(), consumer.getEndpoint().getPrefetchCount(),
+                    consumer.getEndpoint().isPrefetchGlobal());
+        }
+
+        // This really only needs to be called on the first consumer or on
+        // reconnections.
+        if (consumer.getEndpoint().isDeclare()) {
+            consumer.getEndpoint().declareExchangeAndQueue(channel);
+        }
+        return channel;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/f3eae07c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index 280ed2a..24b2856 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -24,16 +24,9 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.Envelope;
 
-import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
-import org.apache.camel.Message;
 import org.apache.camel.Processor;
-import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.impl.DefaultConsumer;
 
 public class RabbitMQConsumer extends DefaultConsumer {
@@ -72,42 +65,46 @@ public class RabbitMQConsumer extends DefaultConsumer {
     }
 
     /**
-     * Open channel
+     * Returns the existing open connection or opens a new one
+     * @throws IOException
+     * @throws TimeoutException
      */
-    private Channel openChannel() throws IOException {
-        log.trace("Creating channel...");
-        Channel channel = conn.createChannel();
-        log.debug("Created channel: {}", channel);
-        // setup the basicQos
-        if (endpoint.isPrefetchEnabled()) {
-            channel.basicQos(endpoint.getPrefetchSize(), endpoint.getPrefetchCount(), endpoint.isPrefetchGlobal());
+    protected synchronized Connection getConnection() throws IOException, TimeoutException {
+        if (this.conn != null && this.conn.isOpen()) {
+            return this.conn;
         }
-        return channel;
+        log.debug("The existing connection is closed");
+        openConnection();
+        return this.conn;
     }
 
+
     /**
      * Add a consumer thread for given channel
      */
     private void startConsumers() throws IOException {
-        // First channel used to declare Exchange and Queue
-        Channel channel = openChannel();
-        if (getEndpoint().isDeclare()) {
-            getEndpoint().declareExchangeAndQueue(channel);
+
+        // Create consumers but don't start yet
+        for (int i = 0; i < endpoint.getConcurrentConsumers(); i++) {
+            createConsumer();
         }
-        startConsumer(channel);
-        // Other channels
-        for (int i = 1; i < endpoint.getConcurrentConsumers(); i++) {
-            channel = openChannel();
-            startConsumer(channel);
+
+        // Try starting consumers (which will fail if RabbitMQ can't connect)
+        try {
+            for (RabbitConsumer consumer : this.consumers) {
+                consumer.start();
+            }
+        } catch (Exception e) {
+            log.info("Connection failed, will start background thread to retry!", e);
+            reconnect();
         }
     }
 
     /**
      * Add a consumer thread for given channel
      */
-    private void startConsumer(Channel channel) throws IOException {
-        RabbitConsumer consumer = new RabbitConsumer(this, channel);
-        consumer.start();
+    private void createConsumer() throws IOException {
+        RabbitConsumer consumer = new RabbitConsumer(this);
         this.consumers.add(consumer);
     }
 
@@ -115,16 +112,13 @@ public class RabbitMQConsumer extends DefaultConsumer {
     protected void doStart() throws Exception {
         executor = endpoint.createExecutor();
         log.debug("Using executor {}", executor);
-        try {
-            openConnection();
-            startConsumers();
-        } catch (Exception e) {
-            log.info("Connection failed, will start background thread to retry!", e);
-            reconnect();
-        }
+        startConsumers();
     }
 
-    private void reconnect() {
+    private synchronized void reconnect() {
+        if (startConsumerCallable != null) {
+            return;
+        }
         // Open connection, and start message listener in background
         Integer networkRecoveryInterval = getEndpoint().getNetworkRecoveryInterval();
         final long connectionRetryInterval = networkRecoveryInterval != null && networkRecoveryInterval > 0 ? networkRecoveryInterval : 100L;
@@ -179,119 +173,7 @@ public class RabbitMQConsumer extends DefaultConsumer {
         }
     }
 
-    class RabbitConsumer extends com.rabbitmq.client.DefaultConsumer {
-
-        private final RabbitMQConsumer consumer;
-        private final Channel channel;
-        private String tag;
-
-        /**
-         * Constructs a new instance and records its association to the
-         * passed-in channel.
-         *
-         * @param channel the channel to which this consumer is attached
-         */
-        public RabbitConsumer(RabbitMQConsumer consumer, Channel channel) {
-            super(channel);
-            this.consumer = consumer;
-            this.channel = channel;
-        }
-
-        @Override
-        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
-            Exchange exchange = consumer.endpoint.createRabbitExchange(envelope, properties, body);
-            endpoint.getMessageConverter().mergeAmqpProperties(exchange, properties);
-
-            boolean sendReply = properties.getReplyTo() != null;
-            if (sendReply && !exchange.getPattern().isOutCapable()) {
-                log.debug("In an inOut capable route");
-                exchange.setPattern(ExchangePattern.InOut);
-            }
-
-            log.trace("Created exchange [exchange={}]", exchange);
-            long deliveryTag = envelope.getDeliveryTag();
-            try {
-                consumer.getProcessor().process(exchange);
-            } catch (Exception e) {
-                exchange.setException(e);
-            }
-
-            // obtain the message after processing
-            Message msg;
-            if (exchange.hasOut()) {
-                msg = exchange.getOut();
-            } else {
-                msg = exchange.getIn();
-            }
-            
-            if (exchange.getException() != null) {
-                getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
-            }
-            
-            if (!exchange.isFailed()) {
-                // processing success
-                if (sendReply && exchange.getPattern().isOutCapable()) {
-                    try {
-                        endpoint.publishExchangeToChannel(exchange, channel, properties.getReplyTo());
-                    } catch (RuntimeCamelException e) {
-                        getExceptionHandler().handleException("Error processing exchange", exchange, e);
-                    }
-                }
-                if (!consumer.endpoint.isAutoAck()) {
-                    log.trace("Acknowledging receipt [delivery_tag={}]", deliveryTag);
-                    channel.basicAck(deliveryTag, false);
-                }
-            } else if (endpoint.isTransferException() && exchange.getPattern().isOutCapable()) {
-                // the inOut exchange failed so put the exception in the body
-                // and send back
-                msg.setBody(exchange.getException());
-                exchange.setOut(msg);
-                try {
-                    endpoint.publishExchangeToChannel(exchange, channel, properties.getReplyTo());
-                } catch (RuntimeCamelException e) {
-                    getExceptionHandler().handleException("Error processing exchange", exchange, e);
-                }
-
-                if (!consumer.endpoint.isAutoAck()) {
-                    log.trace("Acknowledging receipt when transferring exception [delivery_tag={}]", deliveryTag);
-                    channel.basicAck(deliveryTag, false);
-                }
-            } else {
-                boolean isRequeueHeaderSet = msg.getHeader(RabbitMQConstants.REQUEUE, false, boolean.class);
-                // processing failed, then reject and handle the exception
-                if (deliveryTag != 0 && !consumer.endpoint.isAutoAck()) {
-                    log.trace("Rejecting receipt [delivery_tag={}] with requeue={}", deliveryTag, isRequeueHeaderSet);
-                    if (isRequeueHeaderSet) {
-                        channel.basicReject(deliveryTag, true);
-                    } else {
-                        channel.basicReject(deliveryTag, false);
-                    }
-                }
-            }
-        }
 
-        /**
-         * Bind consumer to channel
-         */
-        public void start() throws IOException {
-            tag = channel.basicConsume(endpoint.getQueue(), endpoint.isAutoAck(), this);
-        }
-
-        /**
-         * Unbind consumer from channel
-         */
-        public void stop() throws IOException, TimeoutException {
-            if (tag != null) {
-                channel.basicCancel(tag);
-            }
-            try {
-                channel.close();
-            } catch (TimeoutException e) {
-                log.error("Timeout occured");
-                throw e;
-            }
-        }
-    }
 
     /**
      * Task in charge of opening connection and adding listener when consumer is
@@ -316,10 +198,12 @@ public class RabbitMQConsumer extends DefaultConsumer {
             // Reconnection loop
             while (running.get() && connectionFailed) {
                 try {
-                    openConnection();
+                    for (RabbitConsumer consumer : consumers) {
+                        consumer.reconnect();
+                    }
                     connectionFailed = false;
                 } catch (Exception e) {
-                    log.info("Connection failed, will retry in {}" + connectionRetryInterval + "ms", e);
+                    log.info("Connection failed, will retry in " + connectionRetryInterval + "ms", e);
                     Thread.sleep(connectionRetryInterval);
                 }
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/f3eae07c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index 41eab3f..3600d33 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -23,6 +23,7 @@ import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeoutException;
+
 import javax.net.ssl.TrustManager;
 
 import com.rabbitmq.client.AMQP;
@@ -31,6 +32,7 @@ import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 import com.rabbitmq.client.Envelope;
+
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -41,14 +43,11 @@ import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriPath;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 @UriEndpoint(scheme = "rabbitmq", title = "RabbitMQ", syntax = "rabbitmq:hostname:portNumber/exchangeName", consumerClass = RabbitMQConsumer.class, label = "messaging")
 public class RabbitMQEndpoint extends DefaultEndpoint {
     // header to indicate that the message body needs to be de-serialized
     public static final String SERIALIZE_HEADER = "CamelSerialize";
-    private static final Logger LOG = LoggerFactory.getLogger(RabbitMQEndpoint.class);
 
     @UriPath @Metadata(required = "true")
     private String hostname;

http://git-wip-us.apache.org/repos/asf/camel/blob/f3eae07c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
index a96d6fd..8c877aa 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
@@ -22,7 +22,6 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-
 import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
@@ -70,7 +69,21 @@ public class RabbitMQProducer extends DefaultAsyncProducer {
      * Do something with a pooled channel (similar to Spring JDBC TransactionTemplate#execute)
      */
     private <T> T execute(ChannelCallback<T> callback) throws Exception {
-        Channel channel = channelPool.borrowObject();
+        Channel channel;
+        try {
+            channel = channelPool.borrowObject();
+        } catch (IllegalStateException e) {
+            // Since this method is not synchronized it's possible the
+            // channelPool has been cleared by another thread
+            checkConnectionAndChannelPool();
+            channel = channelPool.borrowObject();
+        }
+        if (!channel.isOpen()) {
+            log.warn("Got a closed channel from the pool");
+            // Reconnect if another thread hasn't yet
+            checkConnectionAndChannelPool();
+            channel = channelPool.borrowObject();
+        }
         try {
             return callback.doWithChannel(channel);
         } finally {
@@ -80,8 +93,9 @@ public class RabbitMQProducer extends DefaultAsyncProducer {
 
     /**
      * Open connection and initialize channel pool
+     * @throws Exception
      */
-    private void openConnectionAndChannelPool() throws Exception {
+    private synchronized void openConnectionAndChannelPool() throws Exception {
         log.trace("Creating connection...");
         this.conn = getEndpoint().connect(executorService);
         log.debug("Created connection: {}", conn);
@@ -100,6 +114,22 @@ public class RabbitMQProducer extends DefaultAsyncProducer {
         }
     }
 
+    /**
+     * This will reconnect only if the connection is null or no longer open.
+     * @throws Exception if opening the connection and channel pool fails
+     */
+    private synchronized void checkConnectionAndChannelPool() throws Exception {
+        if (this.conn == null || !this.conn.isOpen()) {
+            log.info("Reconnecting to RabbitMQ");
+            try {
+                closeConnectionAndChannel();
+            } catch (Exception e) {
+                // best effort: a failed close must not prevent reopening below
+            }
+            openConnectionAndChannelPool();
+        }
+    }
+
     @Override
     protected void doStart() throws Exception {
         this.executorService = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "CamelRabbitMQProducer[" + getEndpoint().getQueue() + "]");
@@ -107,15 +137,23 @@ public class RabbitMQProducer extends DefaultAsyncProducer {
         try {
             openConnectionAndChannelPool();
         } catch (IOException e) {
-            log.warn("Failed to create connection", e);
+            log.warn("Failed to create connection. It will attempt to connect again when publishing a message.", e);
         }
     }
 
     /**
      * If needed, close Connection and Channel
+     * @throws IOException
      */
-    private void closeConnectionAndChannel() throws Exception {
-        channelPool.close();
+    private synchronized void closeConnectionAndChannel() throws IOException {
+        if (channelPool != null) {
+            try {
+                channelPool.close();
+                channelPool = null;
+            } catch (Exception e) {
+                throw new IOException("Error closing channelPool", e);
+            }
+        }
         if (conn != null) {
             log.debug("Closing connection: {} with timeout: {} ms.", conn, closeTimeout);
             conn.close(closeTimeout);
@@ -194,8 +232,12 @@ public class RabbitMQProducer extends DefaultAsyncProducer {
         log.debug("Registering reply for {}", correlationId);
 
         replyManager.registerReply(replyManager, exchange, callback, originalCorrelationId, correlationId, timeout);
-
-        basicPublish(exchange, exchangeName, key);
+        try {
+            basicPublish(exchange, exchangeName, key);
+        } catch (Exception e) {
+            replyManager.cancelCorrelationId(correlationId);
+            throw e;
+        }
         // continue routing asynchronously (reply will be processed async when its received)
         return false;
     }
@@ -230,8 +272,8 @@ public class RabbitMQProducer extends DefaultAsyncProducer {
      */
     private void basicPublish(final Exchange camelExchange, final String rabbitExchange, final String routingKey) throws Exception {
         if (channelPool == null) {
-            // Open connection and channel lazily
-            openConnectionAndChannelPool();
+            // Open connection and channel lazily if another thread hasn't
+            checkConnectionAndChannelPool();
         }
         execute(new ChannelCallback<Void>() {
             @Override
@@ -327,4 +369,4 @@ public class RabbitMQProducer extends DefaultAsyncProducer {
 
         return replyManager;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/f3eae07c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java
index f6eb64a..4b6110a 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java
@@ -87,4 +87,11 @@ public interface ReplyManager {
      * @param holder  containing needed data to process the reply and continue routing
      */
     void processReply(ReplyHolder holder);
+
+    /**
+     * Unregister a correlationId when you no longer need a reply
+     * 
+     * @param correlationId the correlation id to unregister
+     */
+    void cancelCorrelationId(String correlationId);
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/f3eae07c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
index b6dacfa..9897159 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
@@ -110,6 +110,15 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
 
     protected abstract ReplyHandler createReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
                                                        String originalCorrelationId, String correlationId, long requestTimeout);
+    
+
+    public void cancelCorrelationId(String correlationId) {
+        ReplyHandler handler = correlation.get(correlationId);
+        if (handler != null) {
+            log.warn("Cancelling correlationID: {}", correlationId);
+            correlation.remove(correlationId);
+        }
+    }
 
     public void onMessage(AMQP.BasicProperties properties, byte[] message) {
         String correlationID = properties.getCorrelationId();

http://git-wip-us.apache.org/repos/asf/camel/blob/f3eae07c/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
index a6676b7..ef6b096 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
@@ -44,6 +44,7 @@ public class RabbitMQConsumerTest {
 
         ThreadPoolExecutor e = (ThreadPoolExecutor) Executors.newFixedThreadPool(3);
         Mockito.when(endpoint.createExecutor()).thenReturn(e);
+        Mockito.when(endpoint.getConcurrentConsumers()).thenReturn(1);
         Mockito.when(endpoint.connect(Matchers.any(ExecutorService.class))).thenReturn(conn);
         Mockito.when(conn.createChannel()).thenReturn(channel);
 
@@ -59,6 +60,7 @@ public class RabbitMQConsumerTest {
         RabbitMQConsumer consumer = new RabbitMQConsumer(endpoint, processor);
 
         Mockito.when(endpoint.createExecutor()).thenReturn(Executors.newFixedThreadPool(3));
+        Mockito.when(endpoint.getConcurrentConsumers()).thenReturn(1);
         Mockito.when(endpoint.connect(Matchers.any(ExecutorService.class))).thenReturn(conn);
         Mockito.when(conn.createChannel()).thenReturn(channel);
 

http://git-wip-us.apache.org/repos/asf/camel/blob/f3eae07c/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java
index 302440c..ec72f7b 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java
@@ -33,13 +33,14 @@ import org.junit.Test;
 
 /**
  * Integration test to check that RabbitMQ Endpoint is able to reconnect to broker when broker
- * is not avaibable.
+ * is not available.
  * <ul>
  * <li>Stop the broker</li>
  * <li>Run the test: the producer complains it can not send messages, the consumer is silent</li>
  * <li>Start the broker: the producer sends messages, and the consumer receives messages</li>
  * <li>Stop the broker: the producer complains it can not send messages, the consumer is silent</li>
  * <li>Start the broker: the producer sends messages, and the consumer receives messages</li>
+ * <li>Kill all connections from the broker: the producer sends messages, and the consumer receives messages</li>
  * </ul>
  */
 public class RabbitMQReConnectionIntTest extends CamelTestSupport {

http://git-wip-us.apache.org/repos/asf/camel/blob/f3eae07c/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSupendResumeIntTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSupendResumeIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSupendResumeIntTest.java
index fd269a8..c5c3481 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSupendResumeIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSupendResumeIntTest.java
@@ -28,12 +28,12 @@ import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.Test;
 
 public class RabbitMQSupendResumeIntTest extends CamelTestSupport {
-    private static final String EXCHANGE = "ex4";
+    private static final String EXCHANGE = "ex6";
 
     @EndpointInject(uri = "mock:result")
     private MockEndpoint resultEndpoint;
 
-    @EndpointInject(uri = "rabbitmq:localhost:5672/" + EXCHANGE + "?username=cameltest&password=cameltest&queue=q3&routingKey=rk3&autoDelete=false")
+    @EndpointInject(uri = "rabbitmq:localhost:5672/" + EXCHANGE + "?username=cameltest&password=cameltest&queue=q6&routingKey=rk3&autoDelete=false")
     private Endpoint rabbitMQEndpoint;
 
     @Produce(uri = "direct:start")