You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/03/28 08:24:19 UTC
[3/6] camel git commit: batch consumer will create new sessions
instead of bailing out now
batch consumer will create new sessions instead of bailing out now
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a5646bbf
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a5646bbf
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a5646bbf
Branch: refs/heads/master
Commit: a5646bbf2ff78dc87036a2c319159db962764128
Parents: 0dc847e
Author: Bryan Love <br...@iovation.com>
Authored: Wed Mar 22 16:43:33 2017 -0700
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Mar 28 10:03:54 2017 +0200
----------------------------------------------------------------------
.../component/sjms/batch/SjmsBatchConsumer.java | 34 +++++++++++++-------
1 file changed, 22 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/a5646bbf/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
index 630bdfb..2f2440d 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -298,24 +298,34 @@ public class SjmsBatchConsumer extends DefaultConsumer {
@Override
public void run() {
try {
- // a batch corresponds to a single session that will be committed or rolled back by a background thread
- final Session session = connection.createSession(TRANSACTED, Session.CLIENT_ACKNOWLEDGE);
- try {
- // only batch consumption from queues is supported - it makes no sense to transactionally consume
- // from a topic as you don't car about message loss, users can just use a regular aggregator instead
- Queue queue = session.createQueue(destinationName);
- MessageConsumer consumer = session.createConsumer(queue);
-
+ // this loop is intended to keep the consumer up and running as long as it's supposed to be, but allow it to bail if signaled
+ while (running.get()) {
+ // a batch corresponds to a single session that will be committed or rolled back by a background thread
+ final Session session = connection.createSession(TRANSACTED, Session.CLIENT_ACKNOWLEDGE);
try {
- task.consumeBatchesOnLoop(session, consumer);
+ // only batch consumption from queues is supported - it makes no sense to transactionally consume
+ // from a topic as you don't car about message loss, users can just use a regular aggregator instead
+ Queue queue = session.createQueue(destinationName);
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ try {
+ task.consumeBatchesOnLoop(session, consumer);
+ } finally {
+ closeJmsConsumer(consumer);
+ }
+ } catch (javax.jms.IllegalStateException ex) {
+ // from consumeBatchesOnLoop
+ // this will log the exception and the parent loop will create a new session
+ getExceptionHandler().handleException("Exception caught consuming from " + destinationName, ex);
+ //rest a minute to avoid destroying the logs
+ Thread.sleep(2000);
} finally {
- closeJmsConsumer(consumer);
+ closeJmsSession(session);
}
- } finally {
- closeJmsSession(session);
}
} catch (Throwable ex) {
// from consumeBatchesOnLoop
+ // catch anything besides the IllegalStateException and exit the application
getExceptionHandler().handleException("Exception caught consuming from " + destinationName, ex);
} finally {
// indicate that we have shut down