You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/03/28 08:24:20 UTC

[4/6] camel git commit: added keepAliveDelay URI param to prevent premature exit

added keepAliveDelay URI param to prevent premature exit


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bd6b87c5
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bd6b87c5
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bd6b87c5

Branch: refs/heads/master
Commit: bd6b87c5855c339396c547ca63bdfe70b4b0aa5a
Parents: a5646bb
Author: Bryan Love <br...@iovation.com>
Authored: Thu Mar 23 11:25:46 2017 -0700
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Mar 28 10:03:54 2017 +0200

----------------------------------------------------------------------
 .../component/sjms/batch/SjmsBatchConsumer.java | 21 +++++++++++++++-----
 .../component/sjms/batch/SjmsBatchEndpoint.java |  6 ++++++
 2 files changed, 22 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/bd6b87c5/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
index 2f2440d..c386c66 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -128,7 +128,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
         super.doStart();
 
         boolean recovery = getEndpoint().isAsyncStartListener();
-        StartConsumerTask task = new StartConsumerTask(recovery, getEndpoint().getRecoveryInterval());
+        StartConsumerTask task = new StartConsumerTask(recovery, getEndpoint().getRecoveryInterval(), getEndpoint().getKeepAliveDelay());
 
         if (recovery) {
             // use a background thread to keep starting the consumer until
@@ -145,11 +145,13 @@ public class SjmsBatchConsumer extends DefaultConsumer {
 
         private boolean recoveryEnabled;
         private int recoveryInterval;
+        private int keepAliveDelay;
         private long attempt;
 
-        public StartConsumerTask(boolean recoveryEnabled, int recoveryInterval) {
+        public StartConsumerTask(boolean recoveryEnabled, int recoveryInterval, int keepAliveDelay) {
             this.recoveryEnabled = recoveryEnabled;
             this.recoveryInterval = recoveryInterval;
+            this.keepAliveDelay = keepAliveDelay;
         }
 
         @Override
@@ -183,6 +185,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                     final List<AtomicBoolean> triggers = new ArrayList<>();
                     for (int i = 0; i < consumerCount; i++) {
                         BatchConsumptionLoop loop = new BatchConsumptionLoop();
+                        loop.setKeepAliveDelay(keepAliveDelay);
                         triggers.add(loop.getCompletionTimeoutTrigger());
                         jmsConsumerExecutors.submit(loop);
                     }
@@ -290,16 +293,20 @@ public class SjmsBatchConsumer extends DefaultConsumer {
 
         private final AtomicBoolean completionTimeoutTrigger = new AtomicBoolean();
         private final BatchConsumptionTask task = new BatchConsumptionTask(completionTimeoutTrigger);
+        private int keepAliveDelay;
 
         public AtomicBoolean getCompletionTimeoutTrigger() {
             return completionTimeoutTrigger;
         }
+        public void setKeepAliveDelay(int i){
+            keepAliveDelay = i;
+        }
 
         @Override
         public void run() {
             try {
                 // this loop is intended to keep the consumer up and running as long as it's supposed to be, but allow it to bail if signaled
-                while (running.get()) {
+                while (running.get() || isStarting()) {
                     // a batch corresponds to a single session that will be committed or rolled back by a background thread
                     final Session session = connection.createSession(TRANSACTED, Session.CLIENT_ACKNOWLEDGE);
                     try {
@@ -315,10 +322,12 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                         }
                     } catch (javax.jms.IllegalStateException ex) {
                         // from consumeBatchesOnLoop
+                        // if keepAliveDelay was not specified just rethrow to break the loop. This preserves original default behavior
+                        if(keepAliveDelay == -1) throw ex;
                         // this will log the exception and the parent loop will create a new session
                         getExceptionHandler().handleException("Exception caught consuming from " + destinationName, ex);
-                        //rest a minute to avoid destroying the logs
-                        Thread.sleep(2000);
+                        //sleep to avoid log spamming
+                        Thread.sleep(keepAliveDelay);
                     } finally {
                         closeJmsSession(session);
                     }
@@ -401,6 +410,8 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                     long waitTime = (usingTimeout && (timeElapsed > 0))
                             ? getReceiveWaitTime(timeElapsed)
                             : pollDuration;
+
+
                     Message message = consumer.receive(waitTime);
 
                     if (running.get()) {

http://git-wip-us.apache.org/repos/asf/camel/blob/bd6b87c5/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
index 84e1fd1..2e8affb 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
@@ -93,6 +93,8 @@ public class SjmsBatchEndpoint extends DefaultEndpoint implements HeaderFilterSt
     private boolean asyncStartListener;
     @UriParam(label = "advanced", defaultValue = "5000")
     private int recoveryInterval = 5000;
+    @UriParam(label = "advanced", defaultValue = "-1")
+    private int keepAliveDelay = -1;
 
     public SjmsBatchEndpoint() {
     }
@@ -397,6 +399,10 @@ public class SjmsBatchEndpoint extends DefaultEndpoint implements HeaderFilterSt
         return recoveryInterval;
     }
 
+    public int getKeepAliveDelay() {
+        return recoveryInterval;
+    }
+
     /**
      * Specifies the interval between recovery attempts, i.e. when a connection is being refreshed, in milliseconds.
      * The default is 5000 ms, that is, 5 seconds.