You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/01/13 16:27:43 UTC
[camel] branch camel-3.14.x updated: Backport CAMEL-17472 (#6738)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-3.14.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.14.x by this push:
new 436a019 Backport CAMEL-17472 (#6738)
436a019 is described below
commit 436a019d4053eae131c79c1886ef67d86280d1f4
Author: Otavio Rodolfo Piske <or...@users.noreply.github.com>
AuthorDate: Thu Jan 13 17:26:05 2022 +0100
Backport CAMEL-17472 (#6738)
* CAMEL-17472: fix consumer reconnect that no longer works
Includes:
- honor tasks configured with unlimited duration
- improved log messages for easier debugging
* camel-smpp: updated details about running the manual integration tests
* CAMEL-17477: respect the re-connect delay when reconnecting
* CAMEL-17472: do not exhaust scheduled service
Includes:
- allow giving more time to the shutdown of background tasks
- fix an issue that prevented other tasks from being scheduled
- minor related code cleanup
---
.../apache/camel/component/smpp/SmppConsumer.java | 99 +++++++++++++---------
.../apache/camel/component/smpp/SmppProducer.java | 77 ++++++++++-------
.../org/apache/camel/component/smpp/SmppUtils.java | 35 ++++++--
.../integration/SmppConsumerReconnectManualIT.java | 6 +-
.../integration/SmppProducerReconnectManualIT.java | 6 +-
.../apache/camel/support/task/BackgroundTask.java | 23 +++--
.../support/task/budget/TimeBoundedBudget.java | 7 +-
.../task/BackgroundIterationTimeTaskTest.java | 18 ++++
8 files changed, 180 insertions(+), 91 deletions(-)
diff --git a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java
index 2368bba..8723c1e 100644
--- a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java
+++ b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.smpp;
import java.io.IOException;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.camel.Processor;
@@ -32,15 +33,16 @@ import org.jsmpp.extra.SessionState;
import org.jsmpp.session.BindParameter;
import org.jsmpp.session.MessageReceiverListener;
import org.jsmpp.session.SMPPSession;
-import org.jsmpp.session.Session;
import org.jsmpp.session.SessionStateListener;
import org.jsmpp.util.DefaultComposer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.camel.component.smpp.SmppUtils.createExecutor;
import static org.apache.camel.component.smpp.SmppUtils.isServiceStopping;
import static org.apache.camel.component.smpp.SmppUtils.isSessionClosed;
import static org.apache.camel.component.smpp.SmppUtils.newReconnectTask;
+import static org.apache.camel.component.smpp.SmppUtils.shutdownReconnectService;
/**
* An implementation of consumer which use the SMPP protocol
@@ -48,13 +50,16 @@ import static org.apache.camel.component.smpp.SmppUtils.newReconnectTask;
public class SmppConsumer extends DefaultConsumer {
private static final Logger LOG = LoggerFactory.getLogger(SmppConsumer.class);
+ private static final String RECONNECT_TASK_NAME = "smpp-consumer-reconnect";
- private SmppConfiguration configuration;
- private SMPPSession session;
- private MessageReceiverListener messageReceiverListener;
- private SessionStateListener internalSessionStateListener;
+ private final SmppConfiguration configuration;
+ private final MessageReceiverListener messageReceiverListener;
+ private final SessionStateListener internalSessionStateListener;
private final ReentrantLock reconnectLock = new ReentrantLock();
+ private final ScheduledExecutorService reconnectService;
+
+ private SMPPSession session;
/**
* The constructor which gets a smpp endpoint, a smpp configuration and a processor
@@ -62,19 +67,19 @@ public class SmppConsumer extends DefaultConsumer {
public SmppConsumer(SmppEndpoint endpoint, SmppConfiguration config, Processor processor) {
super(endpoint, processor);
+ this.reconnectService = createExecutor(this, endpoint, RECONNECT_TASK_NAME);
+
this.configuration = config;
- this.internalSessionStateListener = new SessionStateListener() {
- @Override
- public void onStateChange(SessionState newState, SessionState oldState, Session source) {
- if (configuration.getSessionStateListener() != null) {
- configuration.getSessionStateListener().onStateChange(newState, oldState, source);
- }
-
- if (newState.equals(SessionState.CLOSED)) {
- LOG.warn("Lost connection to: {} - trying to reconnect...", getEndpoint().getConnectionString());
- closeSession();
- reconnect(configuration.getInitialReconnectDelay());
- }
+ this.internalSessionStateListener = (newState, oldState, source) -> {
+ if (configuration.getSessionStateListener() != null) {
+ configuration.getSessionStateListener().onStateChange(newState, oldState, source);
+ }
+
+ if (newState.equals(SessionState.CLOSED)) {
+ LOG.warn("Lost connection to: {} - trying to reconnect...", getEndpoint().getConnectionString());
+ closeSession();
+
+ reconnect(configuration.getInitialReconnectDelay());
}
};
this.messageReceiverListener
@@ -92,21 +97,21 @@ public class SmppConsumer extends DefaultConsumer {
}
private SMPPSession createSession() throws IOException {
- SMPPSession session = createSMPPSession();
- session.setEnquireLinkTimer(configuration.getEnquireLinkTimer());
- session.setTransactionTimer(configuration.getTransactionTimer());
- session.setPduProcessorDegree(this.configuration.getPduProcessorDegree());
- session.setQueueCapacity(this.configuration.getPduProcessorQueueCapacity());
- session.addSessionStateListener(internalSessionStateListener);
- session.setMessageReceiverListener(messageReceiverListener);
- session.connectAndBind(this.configuration.getHost(), this.configuration.getPort(),
+ SMPPSession newSession = createSMPPSession();
+ newSession.setEnquireLinkTimer(configuration.getEnquireLinkTimer());
+ newSession.setTransactionTimer(configuration.getTransactionTimer());
+ newSession.setPduProcessorDegree(this.configuration.getPduProcessorDegree());
+ newSession.setQueueCapacity(this.configuration.getPduProcessorQueueCapacity());
+ newSession.addSessionStateListener(internalSessionStateListener);
+ newSession.setMessageReceiverListener(messageReceiverListener);
+ newSession.connectAndBind(this.configuration.getHost(), this.configuration.getPort(),
new BindParameter(
BindType.BIND_RX, this.configuration.getSystemId(),
this.configuration.getPassword(), this.configuration.getSystemType(),
TypeOfNumber.UNKNOWN, NumberingPlanIndicator.UNKNOWN,
configuration.getAddressRange()));
- return session;
+ return newSession;
}
/**
@@ -125,6 +130,8 @@ public class SmppConsumer extends DefaultConsumer {
@Override
protected void doStop() throws Exception {
+ shutdownReconnectService(reconnectService);
+
LOG.debug("Disconnecting from: {}...", getEndpoint().getConnectionString());
super.doStop();
@@ -143,29 +150,43 @@ public class SmppConsumer extends DefaultConsumer {
}
private boolean doReconnect() {
- if (isServiceStopping(this)) {
- return true;
- }
-
- if (isSessionClosed(session)) {
- try {
- LOG.info("Trying to reconnect to {}", getEndpoint().getConnectionString());
- session = createSession();
+ try {
+ LOG.info("Trying to reconnect to {}", getEndpoint().getConnectionString());
+ if (isServiceStopping(this)) {
return true;
- } catch (IOException e) {
- LOG.warn("Failed to reconnect to {}", getEndpoint().getConnectionString());
- closeSession();
+ }
- return false;
+ if (isSessionClosed(session)) {
+ return tryCreateSession();
}
+
+ LOG.info("Nothing to do: the session is not closed");
+ } catch (Exception e) {
+ LOG.error("Unable to reconnect to {}: {}", getEndpoint().getConnectionString(), e.getMessage(), e);
+ return false;
}
return true;
}
+ private boolean tryCreateSession() {
+ try {
+ LOG.info("Creating a new session to {}", getEndpoint().getConnectionString());
+ session = createSession();
+ LOG.info("Reconnected to {}", getEndpoint().getConnectionString());
+ return true;
+ } catch (IOException e) {
+ LOG.warn("Failed to reconnect to {}", getEndpoint().getConnectionString());
+ closeSession();
+
+ return false;
+ }
+ }
+
private void reconnect(final long initialReconnectDelay) {
if (reconnectLock.tryLock()) {
- BlockingTask task = newReconnectTask(this, getEndpoint(), initialReconnectDelay,
+ BlockingTask task = newReconnectTask(reconnectService, RECONNECT_TASK_NAME, initialReconnectDelay,
+ configuration.getReconnectDelay(),
configuration.getMaxReconnect());
try {
diff --git a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java
index a7bc510..c56d78e 100644
--- a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java
+++ b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.smpp;
import java.io.IOException;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.camel.Exchange;
@@ -32,15 +33,16 @@ import org.jsmpp.bean.TypeOfNumber;
import org.jsmpp.extra.SessionState;
import org.jsmpp.session.BindParameter;
import org.jsmpp.session.SMPPSession;
-import org.jsmpp.session.Session;
import org.jsmpp.session.SessionStateListener;
import org.jsmpp.util.DefaultComposer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.camel.component.smpp.SmppUtils.createExecutor;
import static org.apache.camel.component.smpp.SmppUtils.isServiceStopping;
import static org.apache.camel.component.smpp.SmppUtils.isSessionClosed;
import static org.apache.camel.component.smpp.SmppUtils.newReconnectTask;
+import static org.apache.camel.component.smpp.SmppUtils.shutdownReconnectService;
/**
* An implementation of @{link Producer} which use the SMPP protocol
@@ -48,27 +50,30 @@ import static org.apache.camel.component.smpp.SmppUtils.newReconnectTask;
public class SmppProducer extends DefaultProducer {
private static final Logger LOG = LoggerFactory.getLogger(SmppProducer.class);
+ private static final String RECONNECT_TASK_NAME = "smpp-producer-reconnect";
- private SmppConfiguration configuration;
- private SMPPSession session;
- private SessionStateListener internalSessionStateListener;
+ private final SmppConfiguration configuration;
+ private final SessionStateListener internalSessionStateListener;
private final ReentrantLock connectLock = new ReentrantLock();
+ private final ScheduledExecutorService reconnectService;
+
+ private SMPPSession session;
public SmppProducer(SmppEndpoint endpoint, SmppConfiguration config) {
super(endpoint);
+
+ this.reconnectService = createExecutor(this, endpoint, RECONNECT_TASK_NAME);
+
this.configuration = config;
- this.internalSessionStateListener = new SessionStateListener() {
- @Override
- public void onStateChange(SessionState newState, SessionState oldState, Session source) {
- if (configuration.getSessionStateListener() != null) {
- configuration.getSessionStateListener().onStateChange(newState, oldState, source);
- }
+ this.internalSessionStateListener = (newState, oldState, source) -> {
+ if (configuration.getSessionStateListener() != null) {
+ configuration.getSessionStateListener().onStateChange(newState, oldState, source);
+ }
- if (newState.equals(SessionState.CLOSED)) {
- LOG.warn("Lost connection to: {} - trying to reconnect...", getEndpoint().getConnectionString());
- closeSession();
- reconnect(configuration.getInitialReconnectDelay());
- }
+ if (newState.equals(SessionState.CLOSED)) {
+ LOG.warn("Lost connection to: {} - trying to reconnect...", getEndpoint().getConnectionString());
+ closeSession();
+ reconnect(configuration.getInitialReconnectDelay());
}
};
}
@@ -164,6 +169,8 @@ public class SmppProducer extends DefaultProducer {
@Override
protected void doStop() throws Exception {
+ shutdownReconnectService(reconnectService);
+
LOG.debug("Disconnecting from: {}...", getEndpoint().getConnectionString());
super.doStop();
@@ -183,8 +190,8 @@ public class SmppProducer extends DefaultProducer {
private void reconnect(final long initialReconnectDelay) {
if (connectLock.tryLock()) {
- BlockingTask task = newReconnectTask(this, getEndpoint(), initialReconnectDelay,
- configuration.getMaxReconnect());
+ BlockingTask task = newReconnectTask(reconnectService, RECONNECT_TASK_NAME, initialReconnectDelay,
+ configuration.getReconnectDelay(), configuration.getMaxReconnect());
try {
task.run(this::doReconnect);
@@ -195,26 +202,38 @@ public class SmppProducer extends DefaultProducer {
}
private boolean doReconnect() {
- if (isServiceStopping(this)) {
- return true;
- }
-
- if (isSessionClosed(session)) {
- try {
- LOG.info("Trying to reconnect to {}", getEndpoint().getConnectionString());
- session = createSession();
+ try {
+ LOG.info("Trying to reconnect to {}", getEndpoint().getConnectionString());
+ if (isServiceStopping(this)) {
return true;
- } catch (IOException e) {
- LOG.warn("Failed to reconnect to {}", getEndpoint().getConnectionString());
- closeSession();
+ }
- return false;
+ if (isSessionClosed(session)) {
+ return tryCreateSession();
}
+
+ LOG.info("Nothing to do: the session is not closed");
+ } catch (Exception e) {
+ LOG.error("Unable to reconnect to {}: {}", getEndpoint().getConnectionString(), e.getMessage(), e);
+ return false;
}
return true;
}
+ private boolean tryCreateSession() {
+ try {
+
+ session = createSession();
+ return true;
+ } catch (IOException e) {
+ LOG.warn("Failed to reconnect to {}", getEndpoint().getConnectionString());
+ closeSession();
+
+ return false;
+ }
+ }
+
@Override
public SmppEndpoint getEndpoint() {
return (SmppEndpoint) super.getEndpoint();
diff --git a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java
index 2d2eaf0..44867e0 100644
--- a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java
+++ b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java
@@ -19,9 +19,12 @@ package org.apache.camel.component.smpp;
import java.time.Duration;
import java.util.Calendar;
import java.util.Date;
+import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.camel.Endpoint;
+import org.apache.camel.spi.ExecutorServiceManager;
import org.apache.camel.support.service.BaseService;
import org.apache.camel.support.task.BlockingTask;
import org.apache.camel.support.task.Tasks;
@@ -34,9 +37,10 @@ import org.jsmpp.extra.SessionState;
import org.jsmpp.session.SMPPSession;
import org.jsmpp.util.AbsoluteTimeFormatter;
import org.jsmpp.util.TimeFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public final class SmppUtils {
-
/**
* See http://unicode.org/Public/MAPPINGS/ETSI/GSM0338.TXT
*/
@@ -67,6 +71,7 @@ public final class SmppUtils {
{ 60, 91 }, { 61, 126 }, { 62, 93 }, { 64, 124 }, { 101, 164 }
};
+ private static final Logger LOG = LoggerFactory.getLogger(SmppUtils.class);
private static final TimeFormatter TIME_FORMATTER = new AbsoluteTimeFormatter();
private SmppUtils() {
@@ -281,21 +286,37 @@ public final class SmppUtils {
return session == null || session.getSessionState().equals(SessionState.CLOSED);
}
- public static BlockingTask newReconnectTask(
- BaseService source, Endpoint endpoint, long initialReconnectDelay,
- int maxReconnect) {
- final String taskName = "smpp-reconnect";
- ScheduledExecutorService service = endpoint.getCamelContext().getExecutorServiceManager()
- .newSingleThreadScheduledExecutor(source, taskName);
+ public static ScheduledExecutorService createExecutor(BaseService service, Endpoint endpoint, String taskName) {
+ if (endpoint.getCamelContext() != null && endpoint.getCamelContext().getExecutorServiceManager() != null) {
+ ExecutorServiceManager manager = endpoint.getCamelContext().getExecutorServiceManager();
+ return manager.newSingleThreadScheduledExecutor(service, taskName);
+ } else {
+ LOG.warn("Not using the Camel scheduled thread executor");
+ return Executors.newSingleThreadScheduledExecutor();
+ }
+ }
+ public static BlockingTask newReconnectTask(
+ ScheduledExecutorService service, String taskName, long initialReconnectDelay,
+ long reconnectDelay, int maxReconnect) {
return Tasks.backgroundTask()
.withBudget(Budgets.iterationTimeBudget()
.withInitialDelay(Duration.ofMillis(initialReconnectDelay))
.withMaxIterations(maxReconnect)
.withUnlimitedDuration()
+ .withInterval(Duration.ofMillis(reconnectDelay))
.build())
.withScheduledExecutor(service)
.withName(taskName)
.build();
}
+
+ public static void shutdownReconnectService(ScheduledExecutorService service) throws InterruptedException {
+ service.shutdown();
+ if (!service.awaitTermination(1, TimeUnit.SECONDS)) {
+ LOG.warn("The reconnect service did not finish executing within the timeout");
+
+ service.shutdownNow();
+ }
+ }
}
diff --git a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppConsumerReconnectManualIT.java b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppConsumerReconnectManualIT.java
index 396aa65..4063ed3 100644
--- a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppConsumerReconnectManualIT.java
+++ b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppConsumerReconnectManualIT.java
@@ -24,7 +24,11 @@ import org.junit.jupiter.api.Test;
/**
* Spring based integration test for the smpp component. To run this test, ensure that the SMSC is running on: host:
* localhost port: 2775 user: smppclient password: password <br/>
- * A SMSC for test is available here: http://www.seleniumsoftware.com/downloads.html
+ * In the past, a SMSC for test was available here: http://www.seleniumsoftware.com/downloads.html.
+ *
+ * Since it is not available anymore, it's possible to test the reconnect logic manually using the nc CLI tool:
+ *
+ * nc -lv 2775
*/
@Disabled("Must be manually tested")
public class SmppConsumerReconnectManualIT extends CamelTestSupport {
diff --git a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppProducerReconnectManualIT.java b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppProducerReconnectManualIT.java
index 824620e..3f74b2c 100644
--- a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppProducerReconnectManualIT.java
+++ b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppProducerReconnectManualIT.java
@@ -24,7 +24,11 @@ import org.junit.jupiter.api.Test;
/**
* Spring based integration test for the smpp component. To run this test, ensure that the SMSC is running on: host:
* localhost port: 2775 user: smppclient password: password <br/>
- * A SMSC for test is available here: http://www.seleniumsoftware.com/downloads.html
+ * In the past, a SMSC for test was available here: http://www.seleniumsoftware.com/downloads.html.
+ *
+ * Since it is not available anymore, it's possible to test the reconnect logic manually using the nc CLI tool:
+ *
+ * nc -lv 2775
*/
@Disabled("Must be manually tested")
public class SmppProducerReconnectManualIT extends CamelTestSupport {
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java b/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java
index 3f4ee1e..01fe497 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java
@@ -20,6 +20,7 @@ package org.apache.camel.support.task;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
@@ -127,29 +128,27 @@ public class BackgroundTask implements BlockingTask {
public <T> boolean run(Predicate<T> predicate, T payload) {
CountDownLatch latch = new CountDownLatch(1);
- // We need it to be cancellable/non-runnable after reaching a certain point, and it needs to be deterministic.
- // This is why we ignore the ScheduledFuture returned and implement the go/no-go using a latch.
- service.scheduleAtFixedRate(() -> runTaskWrapper(latch, predicate, payload),
+ Future<?> task = service.scheduleAtFixedRate(() -> runTaskWrapper(latch, predicate, payload),
budget.initialDelay(), budget.interval(), TimeUnit.MILLISECONDS);
- return waitForTaskCompletion(latch, service);
+ return waitForTaskCompletion(latch, task);
}
@Override
public boolean run(BooleanSupplier supplier) {
CountDownLatch latch = new CountDownLatch(1);
- // We need it to be cancellable/non-runnable after reaching a certain point, and it needs to be deterministic.
- // This is why we ignore the ScheduledFuture returned and implement the go/no-go using a latch.
- service.scheduleAtFixedRate(() -> runTaskWrapper(latch, supplier), budget.initialDelay(),
+ Future<?> task = service.scheduleAtFixedRate(() -> runTaskWrapper(latch, supplier), budget.initialDelay(),
budget.interval(), TimeUnit.MILLISECONDS);
- return waitForTaskCompletion(latch, service);
+ return waitForTaskCompletion(latch, task);
}
- private boolean waitForTaskCompletion(CountDownLatch latch, ScheduledExecutorService service) {
+ private boolean waitForTaskCompletion(CountDownLatch latch, Future<?> task) {
boolean completed = false;
try {
+ // Completion is signalled through the latch so that the caller gets a deterministic go/no-go result.
+ // The Future is kept only so the periodic task can be cancelled afterwards without shutting down the executor.
if (budget.maxDuration() == TimeBoundedBudget.UNLIMITED_DURATION) {
latch.await();
completed = true;
@@ -163,14 +162,12 @@ public class BackgroundTask implements BlockingTask {
}
}
- service.shutdown();
- service.awaitTermination(1, TimeUnit.SECONDS);
+ task.cancel(true);
} catch (InterruptedException e) {
- LOG.warn("Interrupted while waiting for the repeatable task to execute");
+ LOG.warn("Interrupted while waiting for the repeatable task to execute: {}", e.getMessage(), e);
Thread.currentThread().interrupt();
} finally {
elapsed = budget.elapsed();
- service.shutdownNow();
}
return completed;
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/task/budget/TimeBoundedBudget.java b/core/camel-support/src/main/java/org/apache/camel/support/task/budget/TimeBoundedBudget.java
index 1c57329..ed48374 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/task/budget/TimeBoundedBudget.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/task/budget/TimeBoundedBudget.java
@@ -55,7 +55,12 @@ public class TimeBoundedBudget implements TimeBudget {
@Override
public boolean canContinue() {
- // ... if time budget is NOT exhausted
+ // ... unless running forever
+ if (maxDuration == UNLIMITED_DURATION) {
+ return true;
+ }
+
+ // ... or if time budget is NOT exhausted
if (elapsed().toMillis() >= maxDuration) {
return false;
}
diff --git a/core/camel-support/src/test/java/org/apache/camel/support/task/BackgroundIterationTimeTaskTest.java b/core/camel-support/src/test/java/org/apache/camel/support/task/BackgroundIterationTimeTaskTest.java
index b4fb5d2..cd5f7fb 100644
--- a/core/camel-support/src/test/java/org/apache/camel/support/task/BackgroundIterationTimeTaskTest.java
+++ b/core/camel-support/src/test/java/org/apache/camel/support/task/BackgroundIterationTimeTaskTest.java
@@ -145,4 +145,22 @@ public class BackgroundIterationTimeTaskTest extends TaskTestSupport {
assertTrue(taskCount < maxIterations);
assertFalse(completed, "The task did not complete because of timeout, the return should be false");
}
+
+ @DisplayName("Test that the task runs until the boolean supplier succeeds")
+ @Test
+ @Timeout(10)
+ void testRunNoMoreBooleanSupplierWithForever() {
+ BackgroundTask task = Tasks.backgroundTask()
+ .withScheduledExecutor(Executors.newSingleThreadScheduledExecutor())
+ .withBudget(Budgets.iterationTimeBudget()
+ .withMaxIterations(Integer.MAX_VALUE)
+ .withInitialDelay(Duration.ofSeconds(1))
+ .withUnlimitedDuration()
+ .build())
+ .build();
+
+ boolean completed = task.run(this::taskPredicateWithDeterministicStop, 4);
+ assertTrue(maxIterations > taskCount, "The task execution should not exceed the max iterations");
+ assertTrue(completed, "The task did not complete, the return should be false");
+ }
}