You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/08/16 14:28:37 UTC
[6/6] camel git commit: Fixed CS. Polished. This closes #1119
Fixed CS. Polished. This closes #1119
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/19f619ee
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/19f619ee
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/19f619ee
Branch: refs/heads/camel-2.17.x
Commit: 19f619eee5a6af6fac526de6a79905d6a2124020
Parents: 26a78cd
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Aug 16 16:27:35 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Aug 16 16:28:21 2016 +0200
----------------------------------------------------------------------
.../component/rabbitmq/RabbitConsumer.java | 55 ++++++++++----------
.../component/rabbitmq/RabbitMQConsumer.java | 3 +-
2 files changed, 28 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/19f619ee/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
index 86c696a..2a13f24 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
@@ -26,7 +26,6 @@ import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
-
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
@@ -42,7 +41,7 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
/** Consumer tag for this consumer. */
private volatile String consumerTag;
private volatile boolean stopping;
-
+
private final Semaphore lock = new Semaphore(1);
/**
@@ -62,29 +61,29 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
log.warn("Unable to open channel for RabbitMQConsumer. Continuing and will try again", e);
}
}
+
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- try {
+ try {
if (!consumer.getEndpoint().isAutoAck()) {
- lock.acquire();
+ lock.acquire();
}
//Channel might be open because while we were waiting for the lock, stop() has been succesfully called.
- if (!channel.isOpen()) return;
-
+ if (!channel.isOpen()) {
+ return;
+ }
+
try {
doHandleDelivery(consumerTag, envelope, properties, body);
} finally {
if (!consumer.getEndpoint().isAutoAck()) {
- lock.release();
+ lock.release();
}
- }
-
- } catch (InterruptedException e) {
- log.error("Thread Interrupted!");
-
- }
-
-
+ }
+
+ } catch (InterruptedException e) {
+ log.warn("Thread Interrupted!");
+ }
}
public void doHandleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
@@ -189,25 +188,25 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
channel.basicCancel(tag);
}
try {
- lock.acquire();
+ lock.acquire();
if (isChannelOpen()) {
channel.close();
}
- } catch (TimeoutException e) {
+ } catch (TimeoutException e) {
log.error("Timeout occured");
throw e;
} catch (InterruptedException e1) {
- log.error("Thread Interrupted!");
+ log.error("Thread Interrupted!");
} finally {
lock.release();
-
- }
+
+ }
}
/**
* Stores the most recently passed-in consumerTag - semantically, there
* should be only one.
- *
+ *
* @see Consumer#handleConsumeOk
*/
public void handleConsumeOk(String consumerTag) {
@@ -216,7 +215,7 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
/**
* Retrieve the consumer tag.
- *
+ *
* @return the most recently notified consumer tag.
*/
public String getConsumerTag() {
@@ -225,31 +224,31 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
/**
* No-op implementation of {@link Consumer#handleCancelOk}.
- *
+ *
* @param consumerTag
* the defined consumer tag (client- or server-generated)
*/
public void handleCancelOk(String consumerTag) {
// no work to do
- log.debug("Recieved cancelOk signal on the rabbitMQ channel");
+ log.debug("Received cancelOk signal on the rabbitMQ channel");
}
/**
* No-op implementation of {@link Consumer#handleCancel(String)}
- *
+ *
* @param consumerTag
* the defined consumer tag (client- or server-generated)
*/
public void handleCancel(String consumerTag) throws IOException {
// no work to do
- log.debug("Recieved cancel signal on the rabbitMQ channel");
+ log.debug("Received cancel signal on the rabbitMQ channel");
}
/**
* No-op implementation of {@link Consumer#handleShutdownSignal}.
*/
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
- log.info("Recieved shutdown signal on the rabbitMQ channel");
+ log.info("Received shutdown signal on the rabbitMQ channel");
// Check if the consumer closed the connection or something else
if (!sig.isInitiatedByApplication()) {
@@ -280,7 +279,7 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
*/
public void handleRecoverOk(String consumerTag) {
// no work to do
- log.debug("Recieved recover ok signal on the rabbitMQ channel");
+ log.debug("Received recover ok signal on the rabbitMQ channel");
}
/**
http://git-wip-us.apache.org/repos/asf/camel/blob/19f619ee/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index 69d3a0b..cee9ff3 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -131,8 +131,7 @@ public class RabbitMQConsumer extends DefaultConsumer implements Suspendable {
try {
consumer.stop();
} catch (TimeoutException e) {
- log.error("Timeout occured");
- throw e;
+ log.warn("Timeout occurred while stopping consumer. This exception is ignored", e);
}
}
this.consumers.clear();