You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/03/28 08:24:20 UTC
[4/6] camel git commit: added keepAliveDelay URI param to prevent
premature exit
added keepAliveDelay URI param to prevent premature exit
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bd6b87c5
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bd6b87c5
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bd6b87c5
Branch: refs/heads/master
Commit: bd6b87c5855c339396c547ca63bdfe70b4b0aa5a
Parents: a5646bb
Author: Bryan Love <br...@iovation.com>
Authored: Thu Mar 23 11:25:46 2017 -0700
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Mar 28 10:03:54 2017 +0200
----------------------------------------------------------------------
.../component/sjms/batch/SjmsBatchConsumer.java | 21 +++++++++++++++-----
.../component/sjms/batch/SjmsBatchEndpoint.java | 6 ++++++
2 files changed, 22 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/bd6b87c5/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
index 2f2440d..c386c66 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -128,7 +128,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
super.doStart();
boolean recovery = getEndpoint().isAsyncStartListener();
- StartConsumerTask task = new StartConsumerTask(recovery, getEndpoint().getRecoveryInterval());
+ StartConsumerTask task = new StartConsumerTask(recovery, getEndpoint().getRecoveryInterval(), getEndpoint().getKeepAliveDelay());
if (recovery) {
// use a background thread to keep starting the consumer until
@@ -145,11 +145,13 @@ public class SjmsBatchConsumer extends DefaultConsumer {
private boolean recoveryEnabled;
private int recoveryInterval;
+ private int keepAliveDelay;
private long attempt;
- public StartConsumerTask(boolean recoveryEnabled, int recoveryInterval) {
+ public StartConsumerTask(boolean recoveryEnabled, int recoveryInterval, int keepAliveDelay) {
this.recoveryEnabled = recoveryEnabled;
this.recoveryInterval = recoveryInterval;
+ this.keepAliveDelay = keepAliveDelay;
}
@Override
@@ -183,6 +185,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
final List<AtomicBoolean> triggers = new ArrayList<>();
for (int i = 0; i < consumerCount; i++) {
BatchConsumptionLoop loop = new BatchConsumptionLoop();
+ loop.setKeepAliveDelay(keepAliveDelay);
triggers.add(loop.getCompletionTimeoutTrigger());
jmsConsumerExecutors.submit(loop);
}
@@ -290,16 +293,20 @@ public class SjmsBatchConsumer extends DefaultConsumer {
private final AtomicBoolean completionTimeoutTrigger = new AtomicBoolean();
private final BatchConsumptionTask task = new BatchConsumptionTask(completionTimeoutTrigger);
+ private int keepAliveDelay;
public AtomicBoolean getCompletionTimeoutTrigger() {
return completionTimeoutTrigger;
}
+ public void setKeepAliveDelay(int i){
+ keepAliveDelay = i;
+ }
@Override
public void run() {
try {
// this loop is intended to keep the consumer up and running as long as it's supposed to be, but allow it to bail if signaled
- while (running.get()) {
+ while (running.get() || isStarting()) {
// a batch corresponds to a single session that will be committed or rolled back by a background thread
final Session session = connection.createSession(TRANSACTED, Session.CLIENT_ACKNOWLEDGE);
try {
@@ -315,10 +322,12 @@ public class SjmsBatchConsumer extends DefaultConsumer {
}
} catch (javax.jms.IllegalStateException ex) {
// from consumeBatchesOnLoop
+ // if keepAliveDelay was not specified just rethrow to break the loop. This preserves original default behavior
+ if(keepAliveDelay == -1) throw ex;
// this will log the exception and the parent loop will create a new session
getExceptionHandler().handleException("Exception caught consuming from " + destinationName, ex);
- //rest a minute to avoid destroying the logs
- Thread.sleep(2000);
+ //sleep to avoid log spamming
+ Thread.sleep(keepAliveDelay);
} finally {
closeJmsSession(session);
}
@@ -401,6 +410,8 @@ public class SjmsBatchConsumer extends DefaultConsumer {
long waitTime = (usingTimeout && (timeElapsed > 0))
? getReceiveWaitTime(timeElapsed)
: pollDuration;
+
+
Message message = consumer.receive(waitTime);
if (running.get()) {
http://git-wip-us.apache.org/repos/asf/camel/blob/bd6b87c5/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
index 84e1fd1..2e8affb 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
@@ -93,6 +93,8 @@ public class SjmsBatchEndpoint extends DefaultEndpoint implements HeaderFilterSt
private boolean asyncStartListener;
@UriParam(label = "advanced", defaultValue = "5000")
private int recoveryInterval = 5000;
+ @UriParam(label = "advanced", defaultValue = "-1")
+ private int keepAliveDelay = -1;
public SjmsBatchEndpoint() {
}
@@ -397,6 +399,10 @@ public class SjmsBatchEndpoint extends DefaultEndpoint implements HeaderFilterSt
return recoveryInterval;
}
+ public int getKeepAliveDelay() {
+ return keepAliveDelay; // the keepAliveDelay URI option (default -1 = not specified), not recoveryInterval
+ }
+
/**
* Specifies the interval between recovery attempts, i.e. when a connection is being refreshed, in milliseconds.
* The default is 5000 ms, that is, 5 seconds.