You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/03/28 08:24:19 UTC

[3/6] camel git commit: batch consumer will create new sessions instead of bailing out now

batch consumer will create new sessions instead of bailing out now


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a5646bbf
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a5646bbf
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a5646bbf

Branch: refs/heads/master
Commit: a5646bbf2ff78dc87036a2c319159db962764128
Parents: 0dc847e
Author: Bryan Love <br...@iovation.com>
Authored: Wed Mar 22 16:43:33 2017 -0700
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Mar 28 10:03:54 2017 +0200

----------------------------------------------------------------------
 .../component/sjms/batch/SjmsBatchConsumer.java | 34 +++++++++++++-------
 1 file changed, 22 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/a5646bbf/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
index 630bdfb..2f2440d 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -298,24 +298,34 @@ public class SjmsBatchConsumer extends DefaultConsumer {
         @Override
         public void run() {
             try {
-                // a batch corresponds to a single session that will be committed or rolled back by a background thread
-                final Session session = connection.createSession(TRANSACTED, Session.CLIENT_ACKNOWLEDGE);
-                try {
-                    // only batch consumption from queues is supported - it makes no sense to transactionally consume
-                    // from a topic as you don't car about message loss, users can just use a regular aggregator instead
-                    Queue queue = session.createQueue(destinationName);
-                    MessageConsumer consumer = session.createConsumer(queue);
-
+                // this loop is intended to keep the consumer up and running as long as it's supposed to be, but allow it to bail if signaled
+                while (running.get()) {
+                    // a batch corresponds to a single session that will be committed or rolled back by a background thread
+                    final Session session = connection.createSession(TRANSACTED, Session.CLIENT_ACKNOWLEDGE);
                     try {
-                        task.consumeBatchesOnLoop(session, consumer);
+                        // only batch consumption from queues is supported - it makes no sense to transactionally consume
+                        // from a topic as you don't car about message loss, users can just use a regular aggregator instead
+                        Queue queue = session.createQueue(destinationName);
+                        MessageConsumer consumer = session.createConsumer(queue);
+
+                        try {
+                            task.consumeBatchesOnLoop(session, consumer);
+                        } finally {
+                            closeJmsConsumer(consumer);
+                        }
+                    } catch (javax.jms.IllegalStateException ex) {
+                        // from consumeBatchesOnLoop
+                        // this will log the exception and the parent loop will create a new session
+                        getExceptionHandler().handleException("Exception caught consuming from " + destinationName, ex);
+                        //rest a minute to avoid destroying the logs
+                        Thread.sleep(2000);
                     } finally {
-                        closeJmsConsumer(consumer);
+                        closeJmsSession(session);
                     }
-                } finally {
-                    closeJmsSession(session);
                 }
             } catch (Throwable ex) {
                 // from consumeBatchesOnLoop
+                // catch anything besides the IllegalStateException and exit the application
                 getExceptionHandler().handleException("Exception caught consuming from " + destinationName, ex);
             } finally {
                 // indicate that we have shut down