You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/01/28 09:25:50 UTC
[2/2] camel git commit: CAMEL-8742 - Reconnect when connections are
closed.
CAMEL-8742 - Reconnect when connections are closed.
* Cancel reply manager when sending a message in the producer fails
* Change the SuspendResume test to use a different queue since it conflicts with other queue settings
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f3eae07c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f3eae07c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f3eae07c
Branch: refs/heads/master
Commit: f3eae07c562708499e6b9240d8ad21a4ed640013
Parents: 034b73c
Author: Brad Reitmeyer <br...@cisco.com>
Authored: Tue Jan 26 16:39:12 2016 -0600
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Jan 28 09:21:20 2016 +0100
----------------------------------------------------------------------
.../component/rabbitmq/RabbitConsumer.java | 293 +++++++++++++++++++
.../component/rabbitmq/RabbitMQConsumer.java | 184 +++---------
.../component/rabbitmq/RabbitMQEndpoint.java | 5 +-
.../component/rabbitmq/RabbitMQProducer.java | 64 +++-
.../component/rabbitmq/reply/ReplyManager.java | 7 +
.../rabbitmq/reply/ReplyManagerSupport.java | 9 +
.../rabbitmq/RabbitMQConsumerTest.java | 2 +
.../rabbitmq/RabbitMQReConnectionIntTest.java | 3 +-
.../rabbitmq/RabbitMQSupendResumeIntTest.java | 4 +-
9 files changed, 404 insertions(+), 167 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/f3eae07c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
new file mode 100644
index 0000000..a03e7f8
--- /dev/null
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq;
+
+import java.io.IOException;
+import java.util.concurrent.TimeoutException;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.Consumer;
+import com.rabbitmq.client.Envelope;
+import com.rabbitmq.client.ShutdownSignalException;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
+import org.apache.camel.RuntimeCamelException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class RabbitConsumer implements com.rabbitmq.client.Consumer {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private final RabbitMQConsumer consumer;
+ private Channel channel;
+ private String tag;
+ /** Consumer tag for this consumer. */
+ private volatile String consumerTag;
+ private volatile boolean stopping;
+
+ /**
+ * Constructs a new instance for the given consumer and tries to open a
+ * channel on its connection; a failure is only logged as a warning.
+ *
+ * @param consumer
+ * the RabbitMQConsumer whose connection and endpoint this instance uses
+ */
+ public RabbitConsumer(RabbitMQConsumer consumer) {
+ // super(channel);
+ this.consumer = consumer;
+ try {
+ Connection conn = consumer.getConnection();
+ this.channel = openChannel(conn);
+ } catch (IOException | TimeoutException e) {
+ log.warn("Unable to open channel for RabbitMQConsumer. Continuing and will try again", e);
+ }
+ }
+
+ @Override
+ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
+ Exchange exchange = consumer.getEndpoint().createRabbitExchange(envelope, properties, body);
+ consumer.getEndpoint().getMessageConverter().mergeAmqpProperties(exchange, properties);
+
+ boolean sendReply = properties.getReplyTo() != null;
+ if (sendReply && !exchange.getPattern().isOutCapable()) {
+ log.debug("In an inOut capable route");
+ exchange.setPattern(ExchangePattern.InOut);
+ }
+
+ log.trace("Created exchange [exchange={}]", exchange);
+ long deliveryTag = envelope.getDeliveryTag();
+ try {
+ consumer.getProcessor().process(exchange);
+ } catch (Exception e) {
+ exchange.setException(e);
+ }
+
+ // obtain the message after processing
+ Message msg;
+ if (exchange.hasOut()) {
+ msg = exchange.getOut();
+ } else {
+ msg = exchange.getIn();
+ }
+
+ if (exchange.getException() != null) {
+ consumer.getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
+ }
+
+ if (!exchange.isFailed()) {
+ // processing success
+ if (sendReply && exchange.getPattern().isOutCapable()) {
+ try {
+ consumer.getEndpoint().publishExchangeToChannel(exchange, channel, properties.getReplyTo());
+ } catch (RuntimeCamelException e) {
+ // set the exception on the exchange so it can send the
+ // exception back to the producer
+ exchange.setException(e);
+ consumer.getExceptionHandler().handleException("Error processing exchange", exchange, e);
+ }
+ }
+ if (!consumer.getEndpoint().isAutoAck()) {
+ log.trace("Acknowledging receipt [delivery_tag={}]", deliveryTag);
+ channel.basicAck(deliveryTag, false);
+ }
+ }
+ // The exchange could have failed when sending the above message
+ if (exchange.isFailed()) {
+ if (consumer.getEndpoint().isTransferException() && exchange.getPattern().isOutCapable()) {
+ // the inOut exchange failed so put the exception in the body
+ // and send back
+ msg.setBody(exchange.getException());
+ exchange.setOut(msg);
+ exchange.getOut().setHeader(RabbitMQConstants.CORRELATIONID, exchange.getIn().getHeader(RabbitMQConstants.CORRELATIONID));
+ try {
+ consumer.getEndpoint().publishExchangeToChannel(exchange, channel, properties.getReplyTo());
+ } catch (RuntimeCamelException e) {
+ consumer.getExceptionHandler().handleException("Error processing exchange", exchange, e);
+ }
+
+ if (!consumer.getEndpoint().isAutoAck()) {
+ log.trace("Acknowledging receipt when transferring exception [delivery_tag={}]", deliveryTag);
+ channel.basicAck(deliveryTag, false);
+ }
+ } else {
+ boolean isRequeueHeaderSet = msg.getHeader(RabbitMQConstants.REQUEUE, false, boolean.class);
+ // processing failed, then reject and handle the exception
+ if (deliveryTag != 0 && !consumer.getEndpoint().isAutoAck()) {
+ log.trace("Rejecting receipt [delivery_tag={}] with requeue={}", deliveryTag, isRequeueHeaderSet);
+ if (isRequeueHeaderSet) {
+ channel.basicReject(deliveryTag, true);
+ } else {
+ channel.basicReject(deliveryTag, false);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Bind consumer to channel
+ */
+ public void start() throws IOException {
+ if (channel == null) {
+ throw new IOException("The RabbitMQ channel is not open");
+ }
+ tag = channel.basicConsume(consumer.getEndpoint().getQueue(), consumer.getEndpoint().isAutoAck(), this);
+ }
+
+ /**
+ * Unbind consumer from channel
+ */
+ public void stop() throws IOException, TimeoutException {
+ stopping = true;
+ if (channel == null) {
+ return;
+ }
+ if (tag != null) {
+ channel.basicCancel(tag);
+ }
+ try {
+ channel.close();
+ } catch (TimeoutException e) {
+ log.error("Timeout occured");
+ throw e;
+ }
+ }
+
+ /**
+ * Stores the most recently passed-in consumerTag - semantically, there
+ * should be only one.
+ *
+ * @see Consumer#handleConsumeOk
+ */
+ public void handleConsumeOk(String consumerTag) {
+ this.consumerTag = consumerTag;
+ }
+
+ /**
+ * Retrieve the consumer tag.
+ *
+ * @return the most recently notified consumer tag.
+ */
+ public String getConsumerTag() {
+ return consumerTag;
+ }
+
+ /**
+ * No-op implementation of {@link Consumer#handleCancelOk}.
+ *
+ * @param consumerTag
+ * the defined consumer tag (client- or server-generated)
+ */
+ public void handleCancelOk(String consumerTag) {
+ // no work to do
+ log.debug("Recieved cancelOk signal on the rabbitMQ channel");
+ }
+
+ /**
+ * No-op implementation of {@link Consumer#handleCancel(String)}
+ *
+ * @param consumerTag
+ * the defined consumer tag (client- or server-generated)
+ */
+ public void handleCancel(String consumerTag) throws IOException {
+ // no work to do
+ log.debug("Recieved cancel signal on the rabbitMQ channel");
+ }
+
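+ /**
+ * Handles {@link Consumer#handleShutdownSignal}: unless the shutdown was initiated by the application, retries {@link #reconnect()} until it succeeds or this consumer is stopping.
+ */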
+ public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
+ log.info("Recieved shutdown signal on the rabbitMQ channel");
+
+ // Check if the consumer closed the connection or something else
+ if (!sig.isInitiatedByApplication()) {
+ // Something else closed the connection so reconnect
+ boolean connected = false;
+ while (!connected && !stopping) {
+ try {
+ reconnect();
+ connected = true;
+ } catch (IOException | TimeoutException e) {
+ log.warn("Unable to obtain a RabbitMQ channel. Will try again");
+
+ Integer networkRecoveryInterval = consumer.getEndpoint().getNetworkRecoveryInterval();
+ final long connectionRetryInterval = networkRecoveryInterval != null && networkRecoveryInterval > 0
+ ? networkRecoveryInterval : 100L;
+ try {
+ Thread.sleep(connectionRetryInterval);
+ } catch (InterruptedException e1) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * No-op implementation of {@link Consumer#handleRecoverOk}.
+ */
+ public void handleRecoverOk(String consumerTag) {
+ // no work to do
+ log.debug("Recieved recover ok signal on the rabbitMQ channel");
+ }
+
+ /**
+ * If the RabbitMQ connection is good this returns without changing
+ * anything. If the connection is down it will attempt to reconnect
+ */
+ public void reconnect() throws IOException, TimeoutException {
+ if (isChannelOpen()) {
+ // The connection is good, so nothing to do
+ return;
+ }
+ log.info("Attempting to open a new rabbitMQ channel");
+ Connection conn = consumer.getConnection();
+ channel = openChannel(conn);
+ // Register the channel to the tag
+ start();
+ }
+
+ private boolean isChannelOpen() {
+ return channel != null && channel.isOpen();
+ }
+
+ /**
+ * Open channel
+ */
+ private Channel openChannel(Connection conn) throws IOException {
+ log.trace("Creating channel...");
+ Channel channel = conn.createChannel();
+ log.debug("Created channel: {}", channel);
+ // setup the basicQos
+ if (consumer.getEndpoint().isPrefetchEnabled()) {
+ channel.basicQos(consumer.getEndpoint().getPrefetchSize(), consumer.getEndpoint().getPrefetchCount(),
+ consumer.getEndpoint().isPrefetchGlobal());
+ }
+
+ // This really only needs to be called on the first consumer or on
+ // reconnections.
+ if (consumer.getEndpoint().isDeclare()) {
+ consumer.getEndpoint().declareExchangeAndQueue(channel);
+ }
+ return channel;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/f3eae07c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index 280ed2a..24b2856 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -24,16 +24,9 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.Envelope;
-import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
-import org.apache.camel.Message;
import org.apache.camel.Processor;
-import org.apache.camel.RuntimeCamelException;
import org.apache.camel.impl.DefaultConsumer;
public class RabbitMQConsumer extends DefaultConsumer {
@@ -72,42 +65,46 @@ public class RabbitMQConsumer extends DefaultConsumer {
}
/**
- * Open channel
+ * Returns the existing open connection or opens a new one
+ * @throws IOException
+ * @throws TimeoutException
*/
- private Channel openChannel() throws IOException {
- log.trace("Creating channel...");
- Channel channel = conn.createChannel();
- log.debug("Created channel: {}", channel);
- // setup the basicQos
- if (endpoint.isPrefetchEnabled()) {
- channel.basicQos(endpoint.getPrefetchSize(), endpoint.getPrefetchCount(), endpoint.isPrefetchGlobal());
+ protected synchronized Connection getConnection() throws IOException, TimeoutException {
+ if (this.conn != null && this.conn.isOpen()) {
+ return this.conn;
}
- return channel;
+ log.debug("The existing connection is closed");
+ openConnection();
+ return this.conn;
}
+
/**
* Add a consumer thread for given channel
*/
private void startConsumers() throws IOException {
- // First channel used to declare Exchange and Queue
- Channel channel = openChannel();
- if (getEndpoint().isDeclare()) {
- getEndpoint().declareExchangeAndQueue(channel);
+
+ // Create consumers but don't start yet
+ for (int i = 0; i < endpoint.getConcurrentConsumers(); i++) {
+ createConsumer();
}
- startConsumer(channel);
- // Other channels
- for (int i = 1; i < endpoint.getConcurrentConsumers(); i++) {
- channel = openChannel();
- startConsumer(channel);
+
+ // Try starting consumers (which will fail if RabbitMQ can't connect)
+ try {
+ for (RabbitConsumer consumer : this.consumers) {
+ consumer.start();
+ }
+ } catch (Exception e) {
+ log.info("Connection failed, will start background thread to retry!", e);
+ reconnect();
}
}
/**
* Add a consumer thread for given channel
*/
- private void startConsumer(Channel channel) throws IOException {
- RabbitConsumer consumer = new RabbitConsumer(this, channel);
- consumer.start();
+ private void createConsumer() throws IOException {
+ RabbitConsumer consumer = new RabbitConsumer(this);
this.consumers.add(consumer);
}
@@ -115,16 +112,13 @@ public class RabbitMQConsumer extends DefaultConsumer {
protected void doStart() throws Exception {
executor = endpoint.createExecutor();
log.debug("Using executor {}", executor);
- try {
- openConnection();
- startConsumers();
- } catch (Exception e) {
- log.info("Connection failed, will start background thread to retry!", e);
- reconnect();
- }
+ startConsumers();
}
- private void reconnect() {
+ private synchronized void reconnect() {
+ if (startConsumerCallable != null) {
+ return;
+ }
// Open connection, and start message listener in background
Integer networkRecoveryInterval = getEndpoint().getNetworkRecoveryInterval();
final long connectionRetryInterval = networkRecoveryInterval != null && networkRecoveryInterval > 0 ? networkRecoveryInterval : 100L;
@@ -179,119 +173,7 @@ public class RabbitMQConsumer extends DefaultConsumer {
}
}
- class RabbitConsumer extends com.rabbitmq.client.DefaultConsumer {
-
- private final RabbitMQConsumer consumer;
- private final Channel channel;
- private String tag;
-
- /**
- * Constructs a new instance and records its association to the
- * passed-in channel.
- *
- * @param channel the channel to which this consumer is attached
- */
- public RabbitConsumer(RabbitMQConsumer consumer, Channel channel) {
- super(channel);
- this.consumer = consumer;
- this.channel = channel;
- }
-
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- Exchange exchange = consumer.endpoint.createRabbitExchange(envelope, properties, body);
- endpoint.getMessageConverter().mergeAmqpProperties(exchange, properties);
-
- boolean sendReply = properties.getReplyTo() != null;
- if (sendReply && !exchange.getPattern().isOutCapable()) {
- log.debug("In an inOut capable route");
- exchange.setPattern(ExchangePattern.InOut);
- }
-
- log.trace("Created exchange [exchange={}]", exchange);
- long deliveryTag = envelope.getDeliveryTag();
- try {
- consumer.getProcessor().process(exchange);
- } catch (Exception e) {
- exchange.setException(e);
- }
-
- // obtain the message after processing
- Message msg;
- if (exchange.hasOut()) {
- msg = exchange.getOut();
- } else {
- msg = exchange.getIn();
- }
-
- if (exchange.getException() != null) {
- getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
- }
-
- if (!exchange.isFailed()) {
- // processing success
- if (sendReply && exchange.getPattern().isOutCapable()) {
- try {
- endpoint.publishExchangeToChannel(exchange, channel, properties.getReplyTo());
- } catch (RuntimeCamelException e) {
- getExceptionHandler().handleException("Error processing exchange", exchange, e);
- }
- }
- if (!consumer.endpoint.isAutoAck()) {
- log.trace("Acknowledging receipt [delivery_tag={}]", deliveryTag);
- channel.basicAck(deliveryTag, false);
- }
- } else if (endpoint.isTransferException() && exchange.getPattern().isOutCapable()) {
- // the inOut exchange failed so put the exception in the body
- // and send back
- msg.setBody(exchange.getException());
- exchange.setOut(msg);
- try {
- endpoint.publishExchangeToChannel(exchange, channel, properties.getReplyTo());
- } catch (RuntimeCamelException e) {
- getExceptionHandler().handleException("Error processing exchange", exchange, e);
- }
-
- if (!consumer.endpoint.isAutoAck()) {
- log.trace("Acknowledging receipt when transferring exception [delivery_tag={}]", deliveryTag);
- channel.basicAck(deliveryTag, false);
- }
- } else {
- boolean isRequeueHeaderSet = msg.getHeader(RabbitMQConstants.REQUEUE, false, boolean.class);
- // processing failed, then reject and handle the exception
- if (deliveryTag != 0 && !consumer.endpoint.isAutoAck()) {
- log.trace("Rejecting receipt [delivery_tag={}] with requeue={}", deliveryTag, isRequeueHeaderSet);
- if (isRequeueHeaderSet) {
- channel.basicReject(deliveryTag, true);
- } else {
- channel.basicReject(deliveryTag, false);
- }
- }
- }
- }
- /**
- * Bind consumer to channel
- */
- public void start() throws IOException {
- tag = channel.basicConsume(endpoint.getQueue(), endpoint.isAutoAck(), this);
- }
-
- /**
- * Unbind consumer from channel
- */
- public void stop() throws IOException, TimeoutException {
- if (tag != null) {
- channel.basicCancel(tag);
- }
- try {
- channel.close();
- } catch (TimeoutException e) {
- log.error("Timeout occured");
- throw e;
- }
- }
- }
/**
* Task in charge of opening connection and adding listener when consumer is
@@ -316,10 +198,12 @@ public class RabbitMQConsumer extends DefaultConsumer {
// Reconnection loop
while (running.get() && connectionFailed) {
try {
- openConnection();
+ for (RabbitConsumer consumer : consumers) {
+ consumer.reconnect();
+ }
connectionFailed = false;
} catch (Exception e) {
- log.info("Connection failed, will retry in {}" + connectionRetryInterval + "ms", e);
+ log.info("Connection failed, will retry in " + connectionRetryInterval + "ms", e);
Thread.sleep(connectionRetryInterval);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/f3eae07c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index 41eab3f..3600d33 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -23,6 +23,7 @@ import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
+
import javax.net.ssl.TrustManager;
import com.rabbitmq.client.AMQP;
@@ -31,6 +32,7 @@ import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Envelope;
+
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
@@ -41,14 +43,11 @@ import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriPath;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
@UriEndpoint(scheme = "rabbitmq", title = "RabbitMQ", syntax = "rabbitmq:hostname:portNumber/exchangeName", consumerClass = RabbitMQConsumer.class, label = "messaging")
public class RabbitMQEndpoint extends DefaultEndpoint {
// header to indicate that the message body needs to be de-serialized
public static final String SERIALIZE_HEADER = "CamelSerialize";
- private static final Logger LOG = LoggerFactory.getLogger(RabbitMQEndpoint.class);
@UriPath @Metadata(required = "true")
private String hostname;
http://git-wip-us.apache.org/repos/asf/camel/blob/f3eae07c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
index a96d6fd..8c877aa 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
@@ -22,7 +22,6 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
-
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
@@ -70,7 +69,21 @@ public class RabbitMQProducer extends DefaultAsyncProducer {
* Do something with a pooled channel (similar to Spring JDBC TransactionTemplate#execute)
*/
private <T> T execute(ChannelCallback<T> callback) throws Exception {
- Channel channel = channelPool.borrowObject();
+ Channel channel;
+ try {
+ channel = channelPool.borrowObject();
+ } catch (IllegalStateException e) {
+ // Since this method is not synchronized it's possible the
+ // channelPool has been cleared by another thread
+ checkConnectionAndChannelPool();
+ channel = channelPool.borrowObject();
+ }
+ if (!channel.isOpen()) {
+ log.warn("Got a closed channel from the pool");
+ // Reconnect if another thread hasn't yet
+ checkConnectionAndChannelPool();
+ channel = channelPool.borrowObject();
+ }
try {
return callback.doWithChannel(channel);
} finally {
@@ -80,8 +93,9 @@ public class RabbitMQProducer extends DefaultAsyncProducer {
/**
* Open connection and initialize channel pool
+ * @throws Exception
*/
- private void openConnectionAndChannelPool() throws Exception {
+ private synchronized void openConnectionAndChannelPool() throws Exception {
log.trace("Creating connection...");
this.conn = getEndpoint().connect(executorService);
log.debug("Created connection: {}", conn);
@@ -100,6 +114,22 @@ public class RabbitMQProducer extends DefaultAsyncProducer {
}
}
+ /**
+ * This will reconnect only if the connection is closed.
+ * @throws Exception
+ */
+ private synchronized void checkConnectionAndChannelPool() throws Exception {
+ if (this.conn == null || !this.conn.isOpen()) {
+ log.info("Reconnecting to RabbitMQ");
+ try {
+ closeConnectionAndChannel();
+ } catch (Exception e) {
+ // no op
+ }
+ openConnectionAndChannelPool();
+ }
+ }
+
@Override
protected void doStart() throws Exception {
this.executorService = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "CamelRabbitMQProducer[" + getEndpoint().getQueue() + "]");
@@ -107,15 +137,23 @@ public class RabbitMQProducer extends DefaultAsyncProducer {
try {
openConnectionAndChannelPool();
} catch (IOException e) {
- log.warn("Failed to create connection", e);
+ log.warn("Failed to create connection. It will attempt to connect again when publishing a message.", e);
}
}
/**
* If needed, close Connection and Channel
+ * @throws IOException
*/
- private void closeConnectionAndChannel() throws Exception {
- channelPool.close();
+ private synchronized void closeConnectionAndChannel() throws IOException {
+ if (channelPool != null) {
+ try {
+ channelPool.close();
+ channelPool = null;
+ } catch (Exception e) {
+ throw new IOException("Error closing channelPool", e);
+ }
+ }
if (conn != null) {
log.debug("Closing connection: {} with timeout: {} ms.", conn, closeTimeout);
conn.close(closeTimeout);
@@ -194,8 +232,12 @@ public class RabbitMQProducer extends DefaultAsyncProducer {
log.debug("Registering reply for {}", correlationId);
replyManager.registerReply(replyManager, exchange, callback, originalCorrelationId, correlationId, timeout);
-
- basicPublish(exchange, exchangeName, key);
+ try {
+ basicPublish(exchange, exchangeName, key);
+ } catch (Exception e) {
+ replyManager.cancelCorrelationId(correlationId);
+ throw e;
+ }
// continue routing asynchronously (reply will be processed async when its received)
return false;
}
@@ -230,8 +272,8 @@ public class RabbitMQProducer extends DefaultAsyncProducer {
*/
private void basicPublish(final Exchange camelExchange, final String rabbitExchange, final String routingKey) throws Exception {
if (channelPool == null) {
- // Open connection and channel lazily
- openConnectionAndChannelPool();
+ // Open connection and channel lazily if another thread hasn't
+ checkConnectionAndChannelPool();
}
execute(new ChannelCallback<Void>() {
@Override
@@ -327,4 +369,4 @@ public class RabbitMQProducer extends DefaultAsyncProducer {
return replyManager;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/f3eae07c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java
index f6eb64a..4b6110a 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java
@@ -87,4 +87,11 @@ public interface ReplyManager {
* @param holder containing needed data to process the reply and continue routing
*/
void processReply(ReplyHolder holder);
+
+ /**
+ * Unregister a correlationId when you no longer need a reply
+ *
+ * @param correlationId
+ */
+ void cancelCorrelationId(String correlationId);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/f3eae07c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
index b6dacfa..9897159 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
@@ -110,6 +110,15 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
protected abstract ReplyHandler createReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
String originalCorrelationId, String correlationId, long requestTimeout);
+
+
+ public void cancelCorrelationId(String correlationId) {
+ ReplyHandler handler = correlation.get(correlationId);
+ if (handler != null) {
+ log.warn("Cancelling correlationID: {}", correlationId);
+ correlation.remove(correlationId);
+ }
+ }
public void onMessage(AMQP.BasicProperties properties, byte[] message) {
String correlationID = properties.getCorrelationId();
http://git-wip-us.apache.org/repos/asf/camel/blob/f3eae07c/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
index a6676b7..ef6b096 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
@@ -44,6 +44,7 @@ public class RabbitMQConsumerTest {
ThreadPoolExecutor e = (ThreadPoolExecutor) Executors.newFixedThreadPool(3);
Mockito.when(endpoint.createExecutor()).thenReturn(e);
+ Mockito.when(endpoint.getConcurrentConsumers()).thenReturn(1);
Mockito.when(endpoint.connect(Matchers.any(ExecutorService.class))).thenReturn(conn);
Mockito.when(conn.createChannel()).thenReturn(channel);
@@ -59,6 +60,7 @@ public class RabbitMQConsumerTest {
RabbitMQConsumer consumer = new RabbitMQConsumer(endpoint, processor);
Mockito.when(endpoint.createExecutor()).thenReturn(Executors.newFixedThreadPool(3));
+ Mockito.when(endpoint.getConcurrentConsumers()).thenReturn(1);
Mockito.when(endpoint.connect(Matchers.any(ExecutorService.class))).thenReturn(conn);
Mockito.when(conn.createChannel()).thenReturn(channel);
http://git-wip-us.apache.org/repos/asf/camel/blob/f3eae07c/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java
index 302440c..ec72f7b 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQReConnectionIntTest.java
@@ -33,13 +33,14 @@ import org.junit.Test;
/**
* Integration test to check that RabbitMQ Endpoint is able to reconnect to broker when broker
- * is not avaibable.
+ * is not available.
* <ul>
* <li>Stop the broker</li>
* <li>Run the test: the producer complains it can not send messages, the consumer is silent</li>
* <li>Start the broker: the producer sends messages, and the consumer receives messages</li>
* <li>Stop the broker: the producer complains it can not send messages, the consumer is silent</li>
* <li>Start the broker: the producer sends messages, and the consumer receives messages</li>
+ * <li>Kill all connections from the broker: the producer sends messages, and the consumer receives messages</li>
* </ul>
*/
public class RabbitMQReConnectionIntTest extends CamelTestSupport {
http://git-wip-us.apache.org/repos/asf/camel/blob/f3eae07c/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSupendResumeIntTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSupendResumeIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSupendResumeIntTest.java
index fd269a8..c5c3481 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSupendResumeIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQSupendResumeIntTest.java
@@ -28,12 +28,12 @@ import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
public class RabbitMQSupendResumeIntTest extends CamelTestSupport {
- private static final String EXCHANGE = "ex4";
+ private static final String EXCHANGE = "ex6";
@EndpointInject(uri = "mock:result")
private MockEndpoint resultEndpoint;
- @EndpointInject(uri = "rabbitmq:localhost:5672/" + EXCHANGE + "?username=cameltest&password=cameltest&queue=q3&routingKey=rk3&autoDelete=false")
+ @EndpointInject(uri = "rabbitmq:localhost:5672/" + EXCHANGE + "?username=cameltest&password=cameltest&queue=q6&routingKey=rk3&autoDelete=false")
private Endpoint rabbitMQEndpoint;
@Produce(uri = "direct:start")