You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/08/16 14:28:32 UTC
[1/6] camel git commit: Fix CAMEL-10229
Repository: camel
Updated Branches:
refs/heads/camel-2.17.x 199263a1a -> 19f619eee
refs/heads/master 3da0654ef -> f39b83eeb
Fix CAMEL-10229
Use a semaphore to wait for the message to be processed when
autoAck=false
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6c693842
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6c693842
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6c693842
Branch: refs/heads/master
Commit: 6c693842b65ac587bc40c8f9dd7cc829d21c82dd
Parents: 3da0654
Author: miti <pr...@textkernel.nl>
Authored: Fri Aug 12 16:41:55 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Aug 16 16:22:12 2016 +0200
----------------------------------------------------------------------
.../component/rabbitmq/RabbitConsumer.java | 28 ++++++++++++++++++--
1 file changed, 26 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/6c693842/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
index 21560f8..d143d9b 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.rabbitmq;
import java.io.IOException;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
@@ -41,6 +42,8 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
/** Consumer tag for this consumer. */
private volatile String consumerTag;
private volatile boolean stopping;
+
+ private final Semaphore lock = new Semaphore(1);
/**
* Constructs a new instance and records its association to the passed-in
@@ -56,9 +59,26 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
log.warn("Unable to open channel for RabbitMQConsumer. Continuing and will try again", e);
}
}
-
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
+ try {
+ if (!consumer.getEndpoint().isAutoAck()) {
+ lock.acquire();
+ }
+ doHandleDelivery(consumerTag, envelope, properties, body);
+ if (!consumer.getEndpoint().isAutoAck()) {
+ lock.release();
+ }
+
+ } catch (InterruptedException e) {
+ log.error("Thread Interrupted!");
+
+ }
+
+
+ }
+
+ public void doHandleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
Exchange exchange = consumer.getEndpoint().createRabbitExchange(envelope, properties, body);
consumer.getEndpoint().getMessageConverter().mergeAmqpProperties(exchange, properties);
@@ -160,12 +180,16 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
channel.basicCancel(tag);
}
try {
+ lock.acquire();
if (isChannelOpen()) {
channel.close();
}
- } catch (TimeoutException e) {
+ lock.release();
+ } catch (TimeoutException e) {
log.error("Timeout occured");
throw e;
+ } catch (InterruptedException e1) {
+ log.error("Thread Interrupted!");
}
}
[2/6] camel git commit: Hardening stuff a little bit .. following
sugestions by @pcan
Posted by da...@apache.org.
Hardening stuff a little bit .. following sugestions by @pcan
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7ee0977c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7ee0977c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7ee0977c
Branch: refs/heads/master
Commit: 7ee0977c9f5c327a95122f5b80202dc5dd872e40
Parents: 6c69384
Author: miti <pr...@textkernel.nl>
Authored: Tue Aug 16 10:03:39 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Aug 16 16:23:20 2016 +0200
----------------------------------------------------------------------
.../component/rabbitmq/RabbitConsumer.java | 20 ++++++++++++++------
1 file changed, 14 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/7ee0977c/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
index d143d9b..93e2499 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
@@ -65,10 +65,16 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
if (!consumer.getEndpoint().isAutoAck()) {
lock.acquire();
}
- doHandleDelivery(consumerTag, envelope, properties, body);
- if (!consumer.getEndpoint().isAutoAck()) {
- lock.release();
- }
+ //Channel might be open because while we were waiting for the lock, stop() has been succesfully called.
+ if (!channel.isOpen()) return;
+
+ try {
+ doHandleDelivery(consumerTag, envelope, properties, body);
+ } finally {
+ if (!consumer.getEndpoint().isAutoAck()) {
+ lock.release();
+ }
+ }
} catch (InterruptedException e) {
log.error("Thread Interrupted!");
@@ -184,13 +190,15 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
if (isChannelOpen()) {
channel.close();
}
- lock.release();
} catch (TimeoutException e) {
log.error("Timeout occured");
throw e;
} catch (InterruptedException e1) {
log.error("Thread Interrupted!");
- }
+ } finally {
+ lock.release();
+
+ }
}
/**
[4/6] camel git commit: Fix CAMEL-10229
Posted by da...@apache.org.
Fix CAMEL-10229
Use a semaphore to wait for the message to be processed when
autoAck=false
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ce6eb9ed
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ce6eb9ed
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ce6eb9ed
Branch: refs/heads/camel-2.17.x
Commit: ce6eb9edd47cacd873eedb54868f599d9af2e50e
Parents: 199263a
Author: miti <pr...@textkernel.nl>
Authored: Fri Aug 12 16:41:55 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Aug 16 16:28:07 2016 +0200
----------------------------------------------------------------------
.../component/rabbitmq/RabbitConsumer.java | 28 ++++++++++++++++++--
1 file changed, 26 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/ce6eb9ed/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
index fb61c4b..cb2e47f 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.rabbitmq;
import java.io.IOException;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
@@ -41,6 +42,8 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
/** Consumer tag for this consumer. */
private volatile String consumerTag;
private volatile boolean stopping;
+
+ private final Semaphore lock = new Semaphore(1);
/**
* Constructs a new instance and records its association to the passed-in
@@ -59,9 +62,26 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
log.warn("Unable to open channel for RabbitMQConsumer. Continuing and will try again", e);
}
}
-
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
+ try {
+ if (!consumer.getEndpoint().isAutoAck()) {
+ lock.acquire();
+ }
+ doHandleDelivery(consumerTag, envelope, properties, body);
+ if (!consumer.getEndpoint().isAutoAck()) {
+ lock.release();
+ }
+
+ } catch (InterruptedException e) {
+ log.error("Thread Interrupted!");
+
+ }
+
+
+ }
+
+ public void doHandleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
Exchange exchange = consumer.getEndpoint().createRabbitExchange(envelope, properties, body);
consumer.getEndpoint().getMessageConverter().mergeAmqpProperties(exchange, properties);
@@ -163,12 +183,16 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
channel.basicCancel(tag);
}
try {
+ lock.acquire();
if (isChannelOpen()) {
channel.close();
}
- } catch (TimeoutException e) {
+ lock.release();
+ } catch (TimeoutException e) {
log.error("Timeout occured");
throw e;
+ } catch (InterruptedException e1) {
+ log.error("Thread Interrupted!");
}
}
[5/6] camel git commit: Hardening stuff a little bit .. following
sugestions by @pcan
Posted by da...@apache.org.
Hardening stuff a little bit .. following sugestions by @pcan
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/26a78cd1
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/26a78cd1
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/26a78cd1
Branch: refs/heads/camel-2.17.x
Commit: 26a78cd1ad0b108348e69f16b7be99c1d1d9130e
Parents: ce6eb9e
Author: miti <pr...@textkernel.nl>
Authored: Tue Aug 16 10:03:39 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Aug 16 16:28:14 2016 +0200
----------------------------------------------------------------------
.../component/rabbitmq/RabbitConsumer.java | 20 ++++++++++++++------
1 file changed, 14 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/26a78cd1/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
index cb2e47f..86c696a 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
@@ -68,10 +68,16 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
if (!consumer.getEndpoint().isAutoAck()) {
lock.acquire();
}
- doHandleDelivery(consumerTag, envelope, properties, body);
- if (!consumer.getEndpoint().isAutoAck()) {
- lock.release();
- }
+ //Channel might be open because while we were waiting for the lock, stop() has been succesfully called.
+ if (!channel.isOpen()) return;
+
+ try {
+ doHandleDelivery(consumerTag, envelope, properties, body);
+ } finally {
+ if (!consumer.getEndpoint().isAutoAck()) {
+ lock.release();
+ }
+ }
} catch (InterruptedException e) {
log.error("Thread Interrupted!");
@@ -187,13 +193,15 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
if (isChannelOpen()) {
channel.close();
}
- lock.release();
} catch (TimeoutException e) {
log.error("Timeout occured");
throw e;
} catch (InterruptedException e1) {
log.error("Thread Interrupted!");
- }
+ } finally {
+ lock.release();
+
+ }
}
/**
[3/6] camel git commit: Fixed CS. Polished. This closes #1119
Posted by da...@apache.org.
Fixed CS. Polished. This closes #1119
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f39b83ee
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f39b83ee
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f39b83ee
Branch: refs/heads/master
Commit: f39b83eebb6133086e01d98c1f3fb3af38f2dd09
Parents: 7ee0977
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Aug 16 16:27:35 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Aug 16 16:27:35 2016 +0200
----------------------------------------------------------------------
.../component/rabbitmq/RabbitConsumer.java | 55 ++++++++++----------
.../component/rabbitmq/RabbitMQConsumer.java | 3 +-
2 files changed, 28 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/f39b83ee/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
index 93e2499..6c20b57 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
@@ -26,7 +26,6 @@ import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
-
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
@@ -42,7 +41,7 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
/** Consumer tag for this consumer. */
private volatile String consumerTag;
private volatile boolean stopping;
-
+
private final Semaphore lock = new Semaphore(1);
/**
@@ -59,29 +58,29 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
log.warn("Unable to open channel for RabbitMQConsumer. Continuing and will try again", e);
}
}
+
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- try {
+ try {
if (!consumer.getEndpoint().isAutoAck()) {
- lock.acquire();
+ lock.acquire();
}
//Channel might be open because while we were waiting for the lock, stop() has been succesfully called.
- if (!channel.isOpen()) return;
-
+ if (!channel.isOpen()) {
+ return;
+ }
+
try {
doHandleDelivery(consumerTag, envelope, properties, body);
} finally {
if (!consumer.getEndpoint().isAutoAck()) {
- lock.release();
+ lock.release();
}
- }
-
- } catch (InterruptedException e) {
- log.error("Thread Interrupted!");
-
- }
-
-
+ }
+
+ } catch (InterruptedException e) {
+ log.warn("Thread Interrupted!");
+ }
}
public void doHandleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
@@ -186,25 +185,25 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
channel.basicCancel(tag);
}
try {
- lock.acquire();
+ lock.acquire();
if (isChannelOpen()) {
channel.close();
}
- } catch (TimeoutException e) {
+ } catch (TimeoutException e) {
log.error("Timeout occured");
throw e;
} catch (InterruptedException e1) {
- log.error("Thread Interrupted!");
+ log.error("Thread Interrupted!");
} finally {
lock.release();
-
- }
+
+ }
}
/**
* Stores the most recently passed-in consumerTag - semantically, there
* should be only one.
- *
+ *
* @see Consumer#handleConsumeOk
*/
public void handleConsumeOk(String consumerTag) {
@@ -213,7 +212,7 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
/**
* Retrieve the consumer tag.
- *
+ *
* @return the most recently notified consumer tag.
*/
public String getConsumerTag() {
@@ -222,31 +221,31 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
/**
* No-op implementation of {@link Consumer#handleCancelOk}.
- *
+ *
* @param consumerTag
* the defined consumer tag (client- or server-generated)
*/
public void handleCancelOk(String consumerTag) {
// no work to do
- log.debug("Recieved cancelOk signal on the rabbitMQ channel");
+ log.debug("Received cancelOk signal on the rabbitMQ channel");
}
/**
* No-op implementation of {@link Consumer#handleCancel(String)}
- *
+ *
* @param consumerTag
* the defined consumer tag (client- or server-generated)
*/
public void handleCancel(String consumerTag) throws IOException {
// no work to do
- log.debug("Recieved cancel signal on the rabbitMQ channel");
+ log.debug("Received cancel signal on the rabbitMQ channel");
}
/**
* No-op implementation of {@link Consumer#handleShutdownSignal}.
*/
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
- log.info("Recieved shutdown signal on the rabbitMQ channel");
+ log.info("Received shutdown signal on the rabbitMQ channel");
// Check if the consumer closed the connection or something else
if (!sig.isInitiatedByApplication()) {
@@ -277,7 +276,7 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
*/
public void handleRecoverOk(String consumerTag) {
// no work to do
- log.debug("Recieved recover ok signal on the rabbitMQ channel");
+ log.debug("Received recover ok signal on the rabbitMQ channel");
}
/**
http://git-wip-us.apache.org/repos/asf/camel/blob/f39b83ee/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index 9faffc2..9c02cb7 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -131,8 +131,7 @@ public class RabbitMQConsumer extends DefaultConsumer implements Suspendable {
try {
consumer.stop();
} catch (TimeoutException e) {
- log.error("Timeout occured");
- throw e;
+ log.warn("Timeout occurred while stopping consumer. This exception is ignored", e);
}
}
this.consumers.clear();
[6/6] camel git commit: Fixed CS. Polished. This closes #1119
Posted by da...@apache.org.
Fixed CS. Polished. This closes #1119
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/19f619ee
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/19f619ee
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/19f619ee
Branch: refs/heads/camel-2.17.x
Commit: 19f619eee5a6af6fac526de6a79905d6a2124020
Parents: 26a78cd
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Aug 16 16:27:35 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Aug 16 16:28:21 2016 +0200
----------------------------------------------------------------------
.../component/rabbitmq/RabbitConsumer.java | 55 ++++++++++----------
.../component/rabbitmq/RabbitMQConsumer.java | 3 +-
2 files changed, 28 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/19f619ee/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
index 86c696a..2a13f24 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
@@ -26,7 +26,6 @@ import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
-
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
@@ -42,7 +41,7 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
/** Consumer tag for this consumer. */
private volatile String consumerTag;
private volatile boolean stopping;
-
+
private final Semaphore lock = new Semaphore(1);
/**
@@ -62,29 +61,29 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
log.warn("Unable to open channel for RabbitMQConsumer. Continuing and will try again", e);
}
}
+
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- try {
+ try {
if (!consumer.getEndpoint().isAutoAck()) {
- lock.acquire();
+ lock.acquire();
}
//Channel might be open because while we were waiting for the lock, stop() has been succesfully called.
- if (!channel.isOpen()) return;
-
+ if (!channel.isOpen()) {
+ return;
+ }
+
try {
doHandleDelivery(consumerTag, envelope, properties, body);
} finally {
if (!consumer.getEndpoint().isAutoAck()) {
- lock.release();
+ lock.release();
}
- }
-
- } catch (InterruptedException e) {
- log.error("Thread Interrupted!");
-
- }
-
-
+ }
+
+ } catch (InterruptedException e) {
+ log.warn("Thread Interrupted!");
+ }
}
public void doHandleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
@@ -189,25 +188,25 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
channel.basicCancel(tag);
}
try {
- lock.acquire();
+ lock.acquire();
if (isChannelOpen()) {
channel.close();
}
- } catch (TimeoutException e) {
+ } catch (TimeoutException e) {
log.error("Timeout occured");
throw e;
} catch (InterruptedException e1) {
- log.error("Thread Interrupted!");
+ log.error("Thread Interrupted!");
} finally {
lock.release();
-
- }
+
+ }
}
/**
* Stores the most recently passed-in consumerTag - semantically, there
* should be only one.
- *
+ *
* @see Consumer#handleConsumeOk
*/
public void handleConsumeOk(String consumerTag) {
@@ -216,7 +215,7 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
/**
* Retrieve the consumer tag.
- *
+ *
* @return the most recently notified consumer tag.
*/
public String getConsumerTag() {
@@ -225,31 +224,31 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
/**
* No-op implementation of {@link Consumer#handleCancelOk}.
- *
+ *
* @param consumerTag
* the defined consumer tag (client- or server-generated)
*/
public void handleCancelOk(String consumerTag) {
// no work to do
- log.debug("Recieved cancelOk signal on the rabbitMQ channel");
+ log.debug("Received cancelOk signal on the rabbitMQ channel");
}
/**
* No-op implementation of {@link Consumer#handleCancel(String)}
- *
+ *
* @param consumerTag
* the defined consumer tag (client- or server-generated)
*/
public void handleCancel(String consumerTag) throws IOException {
// no work to do
- log.debug("Recieved cancel signal on the rabbitMQ channel");
+ log.debug("Received cancel signal on the rabbitMQ channel");
}
/**
* No-op implementation of {@link Consumer#handleShutdownSignal}.
*/
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
- log.info("Recieved shutdown signal on the rabbitMQ channel");
+ log.info("Received shutdown signal on the rabbitMQ channel");
// Check if the consumer closed the connection or something else
if (!sig.isInitiatedByApplication()) {
@@ -280,7 +279,7 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
*/
public void handleRecoverOk(String consumerTag) {
// no work to do
- log.debug("Recieved recover ok signal on the rabbitMQ channel");
+ log.debug("Received recover ok signal on the rabbitMQ channel");
}
/**
http://git-wip-us.apache.org/repos/asf/camel/blob/19f619ee/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index 69d3a0b..cee9ff3 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -131,8 +131,7 @@ public class RabbitMQConsumer extends DefaultConsumer implements Suspendable {
try {
consumer.stop();
} catch (TimeoutException e) {
- log.error("Timeout occured");
- throw e;
+ log.warn("Timeout occurred while stopping consumer. This exception is ignored", e);
}
}
this.consumers.clear();