You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/01/13 16:27:43 UTC

[camel] branch camel-3.14.x updated: Backport CAMEL-17472 (#6738)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.14.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.14.x by this push:
     new 436a019  Backport CAMEL-17472 (#6738)
436a019 is described below

commit 436a019d4053eae131c79c1886ef67d86280d1f4
Author: Otavio Rodolfo Piske <or...@users.noreply.github.com>
AuthorDate: Thu Jan 13 17:26:05 2022 +0100

    Backport CAMEL-17472 (#6738)
    
    * CAMEL-17472: fix consumer reconnect no longer works
    
    Includes:
    - do comply with unlimited duration tasks
    - improved log messages for easier debug
    
    * camel-smpp: updated details about running the manual integration tests
    
    * CAMEL-17477: respect the re-connect delay when reconnecting
    
    * CAMEL-17472: do not exhaust scheduled service
    
    Includes:
    - allow giving more time to the shutdown of background tasks
    - fix preventing other tasks from being scheduled
    - minor related code cleanup
---
 .../apache/camel/component/smpp/SmppConsumer.java  | 99 +++++++++++++---------
 .../apache/camel/component/smpp/SmppProducer.java  | 77 ++++++++++-------
 .../org/apache/camel/component/smpp/SmppUtils.java | 35 ++++++--
 .../integration/SmppConsumerReconnectManualIT.java |  6 +-
 .../integration/SmppProducerReconnectManualIT.java |  6 +-
 .../apache/camel/support/task/BackgroundTask.java  | 23 +++--
 .../support/task/budget/TimeBoundedBudget.java     |  7 +-
 .../task/BackgroundIterationTimeTaskTest.java      | 18 ++++
 8 files changed, 180 insertions(+), 91 deletions(-)

diff --git a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java
index 2368bba..8723c1e 100644
--- a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java
+++ b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.smpp;
 
 import java.io.IOException;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.camel.Processor;
@@ -32,15 +33,16 @@ import org.jsmpp.extra.SessionState;
 import org.jsmpp.session.BindParameter;
 import org.jsmpp.session.MessageReceiverListener;
 import org.jsmpp.session.SMPPSession;
-import org.jsmpp.session.Session;
 import org.jsmpp.session.SessionStateListener;
 import org.jsmpp.util.DefaultComposer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.camel.component.smpp.SmppUtils.createExecutor;
 import static org.apache.camel.component.smpp.SmppUtils.isServiceStopping;
 import static org.apache.camel.component.smpp.SmppUtils.isSessionClosed;
 import static org.apache.camel.component.smpp.SmppUtils.newReconnectTask;
+import static org.apache.camel.component.smpp.SmppUtils.shutdownReconnectService;
 
 /**
  * An implementation of consumer which use the SMPP protocol
@@ -48,13 +50,16 @@ import static org.apache.camel.component.smpp.SmppUtils.newReconnectTask;
 public class SmppConsumer extends DefaultConsumer {
 
     private static final Logger LOG = LoggerFactory.getLogger(SmppConsumer.class);
+    private static final String RECONNECT_TASK_NAME = "smpp-consumer-reconnect";
 
-    private SmppConfiguration configuration;
-    private SMPPSession session;
-    private MessageReceiverListener messageReceiverListener;
-    private SessionStateListener internalSessionStateListener;
+    private final SmppConfiguration configuration;
+    private final MessageReceiverListener messageReceiverListener;
+    private final SessionStateListener internalSessionStateListener;
 
     private final ReentrantLock reconnectLock = new ReentrantLock();
+    private final ScheduledExecutorService reconnectService;
+
+    private SMPPSession session;
 
     /**
      * The constructor which gets a smpp endpoint, a smpp configuration and a processor
@@ -62,19 +67,19 @@ public class SmppConsumer extends DefaultConsumer {
     public SmppConsumer(SmppEndpoint endpoint, SmppConfiguration config, Processor processor) {
         super(endpoint, processor);
 
+        this.reconnectService = createExecutor(this, endpoint, RECONNECT_TASK_NAME);
+
         this.configuration = config;
-        this.internalSessionStateListener = new SessionStateListener() {
-            @Override
-            public void onStateChange(SessionState newState, SessionState oldState, Session source) {
-                if (configuration.getSessionStateListener() != null) {
-                    configuration.getSessionStateListener().onStateChange(newState, oldState, source);
-                }
-
-                if (newState.equals(SessionState.CLOSED)) {
-                    LOG.warn("Lost connection to: {} - trying to reconnect...", getEndpoint().getConnectionString());
-                    closeSession();
-                    reconnect(configuration.getInitialReconnectDelay());
-                }
+        this.internalSessionStateListener = (newState, oldState, source) -> {
+            if (configuration.getSessionStateListener() != null) {
+                configuration.getSessionStateListener().onStateChange(newState, oldState, source);
+            }
+
+            if (newState.equals(SessionState.CLOSED)) {
+                LOG.warn("Lost connection to: {} - trying to reconnect...", getEndpoint().getConnectionString());
+                closeSession();
+
+                reconnect(configuration.getInitialReconnectDelay());
             }
         };
         this.messageReceiverListener
@@ -92,21 +97,21 @@ public class SmppConsumer extends DefaultConsumer {
     }
 
     private SMPPSession createSession() throws IOException {
-        SMPPSession session = createSMPPSession();
-        session.setEnquireLinkTimer(configuration.getEnquireLinkTimer());
-        session.setTransactionTimer(configuration.getTransactionTimer());
-        session.setPduProcessorDegree(this.configuration.getPduProcessorDegree());
-        session.setQueueCapacity(this.configuration.getPduProcessorQueueCapacity());
-        session.addSessionStateListener(internalSessionStateListener);
-        session.setMessageReceiverListener(messageReceiverListener);
-        session.connectAndBind(this.configuration.getHost(), this.configuration.getPort(),
+        SMPPSession newSession = createSMPPSession();
+        newSession.setEnquireLinkTimer(configuration.getEnquireLinkTimer());
+        newSession.setTransactionTimer(configuration.getTransactionTimer());
+        newSession.setPduProcessorDegree(this.configuration.getPduProcessorDegree());
+        newSession.setQueueCapacity(this.configuration.getPduProcessorQueueCapacity());
+        newSession.addSessionStateListener(internalSessionStateListener);
+        newSession.setMessageReceiverListener(messageReceiverListener);
+        newSession.connectAndBind(this.configuration.getHost(), this.configuration.getPort(),
                 new BindParameter(
                         BindType.BIND_RX, this.configuration.getSystemId(),
                         this.configuration.getPassword(), this.configuration.getSystemType(),
                         TypeOfNumber.UNKNOWN, NumberingPlanIndicator.UNKNOWN,
                         configuration.getAddressRange()));
 
-        return session;
+        return newSession;
     }
 
     /**
@@ -125,6 +130,8 @@ public class SmppConsumer extends DefaultConsumer {
 
     @Override
     protected void doStop() throws Exception {
+        shutdownReconnectService(reconnectService);
+
         LOG.debug("Disconnecting from: {}...", getEndpoint().getConnectionString());
 
         super.doStop();
@@ -143,29 +150,43 @@ public class SmppConsumer extends DefaultConsumer {
     }
 
     private boolean doReconnect() {
-        if (isServiceStopping(this)) {
-            return true;
-        }
-
-        if (isSessionClosed(session)) {
-            try {
-                LOG.info("Trying to reconnect to {}", getEndpoint().getConnectionString());
-                session = createSession();
+        try {
+            LOG.info("Trying to reconnect to {}", getEndpoint().getConnectionString());
+            if (isServiceStopping(this)) {
                 return true;
-            } catch (IOException e) {
-                LOG.warn("Failed to reconnect to {}", getEndpoint().getConnectionString());
-                closeSession();
+            }
 
-                return false;
+            if (isSessionClosed(session)) {
+                return tryCreateSession();
             }
+
+            LOG.info("Nothing to do: the session is not closed");
+        } catch (Exception e) {
+            LOG.error("Unable to reconnect to {}: {}", getEndpoint().getConnectionString(), e.getMessage(), e);
+            return false;
         }
 
         return true;
     }
 
+    private boolean tryCreateSession() {
+        try {
+            LOG.info("Creating a new session to {}", getEndpoint().getConnectionString());
+            session = createSession();
+            LOG.info("Reconnected to {}", getEndpoint().getConnectionString());
+            return true;
+        } catch (IOException e) {
+            LOG.warn("Failed to reconnect to {}", getEndpoint().getConnectionString());
+            closeSession();
+
+            return false;
+        }
+    }
+
     private void reconnect(final long initialReconnectDelay) {
         if (reconnectLock.tryLock()) {
-            BlockingTask task = newReconnectTask(this, getEndpoint(), initialReconnectDelay,
+            BlockingTask task = newReconnectTask(reconnectService, RECONNECT_TASK_NAME, initialReconnectDelay,
+                    configuration.getReconnectDelay(),
                     configuration.getMaxReconnect());
 
             try {
diff --git a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java
index a7bc510..c56d78e 100644
--- a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java
+++ b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.smpp;
 
 import java.io.IOException;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.camel.Exchange;
@@ -32,15 +33,16 @@ import org.jsmpp.bean.TypeOfNumber;
 import org.jsmpp.extra.SessionState;
 import org.jsmpp.session.BindParameter;
 import org.jsmpp.session.SMPPSession;
-import org.jsmpp.session.Session;
 import org.jsmpp.session.SessionStateListener;
 import org.jsmpp.util.DefaultComposer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.camel.component.smpp.SmppUtils.createExecutor;
 import static org.apache.camel.component.smpp.SmppUtils.isServiceStopping;
 import static org.apache.camel.component.smpp.SmppUtils.isSessionClosed;
 import static org.apache.camel.component.smpp.SmppUtils.newReconnectTask;
+import static org.apache.camel.component.smpp.SmppUtils.shutdownReconnectService;
 
 /**
  * An implementation of @{link Producer} which use the SMPP protocol
@@ -48,27 +50,30 @@ import static org.apache.camel.component.smpp.SmppUtils.newReconnectTask;
 public class SmppProducer extends DefaultProducer {
 
     private static final Logger LOG = LoggerFactory.getLogger(SmppProducer.class);
+    private static final String RECONNECT_TASK_NAME = "smpp-producer-reconnect";
 
-    private SmppConfiguration configuration;
-    private SMPPSession session;
-    private SessionStateListener internalSessionStateListener;
+    private final SmppConfiguration configuration;
+    private final SessionStateListener internalSessionStateListener;
     private final ReentrantLock connectLock = new ReentrantLock();
+    private final ScheduledExecutorService reconnectService;
+
+    private SMPPSession session;
 
     public SmppProducer(SmppEndpoint endpoint, SmppConfiguration config) {
         super(endpoint);
+
+        this.reconnectService = createExecutor(this, endpoint, RECONNECT_TASK_NAME);
+
         this.configuration = config;
-        this.internalSessionStateListener = new SessionStateListener() {
-            @Override
-            public void onStateChange(SessionState newState, SessionState oldState, Session source) {
-                if (configuration.getSessionStateListener() != null) {
-                    configuration.getSessionStateListener().onStateChange(newState, oldState, source);
-                }
+        this.internalSessionStateListener = (newState, oldState, source) -> {
+            if (configuration.getSessionStateListener() != null) {
+                configuration.getSessionStateListener().onStateChange(newState, oldState, source);
+            }
 
-                if (newState.equals(SessionState.CLOSED)) {
-                    LOG.warn("Lost connection to: {} - trying to reconnect...", getEndpoint().getConnectionString());
-                    closeSession();
-                    reconnect(configuration.getInitialReconnectDelay());
-                }
+            if (newState.equals(SessionState.CLOSED)) {
+                LOG.warn("Lost connection to: {} - trying to reconnect...", getEndpoint().getConnectionString());
+                closeSession();
+                reconnect(configuration.getInitialReconnectDelay());
             }
         };
     }
@@ -164,6 +169,8 @@ public class SmppProducer extends DefaultProducer {
 
     @Override
     protected void doStop() throws Exception {
+        shutdownReconnectService(reconnectService);
+
         LOG.debug("Disconnecting from: {}...", getEndpoint().getConnectionString());
 
         super.doStop();
@@ -183,8 +190,8 @@ public class SmppProducer extends DefaultProducer {
 
     private void reconnect(final long initialReconnectDelay) {
         if (connectLock.tryLock()) {
-            BlockingTask task = newReconnectTask(this, getEndpoint(), initialReconnectDelay,
-                    configuration.getMaxReconnect());
+            BlockingTask task = newReconnectTask(reconnectService, RECONNECT_TASK_NAME, initialReconnectDelay,
+                    configuration.getReconnectDelay(), configuration.getMaxReconnect());
 
             try {
                 task.run(this::doReconnect);
@@ -195,26 +202,38 @@ public class SmppProducer extends DefaultProducer {
     }
 
     private boolean doReconnect() {
-        if (isServiceStopping(this)) {
-            return true;
-        }
-
-        if (isSessionClosed(session)) {
-            try {
-                LOG.info("Trying to reconnect to {}", getEndpoint().getConnectionString());
-                session = createSession();
+        try {
+            LOG.info("Trying to reconnect to {}", getEndpoint().getConnectionString());
+            if (isServiceStopping(this)) {
                 return true;
-            } catch (IOException e) {
-                LOG.warn("Failed to reconnect to {}", getEndpoint().getConnectionString());
-                closeSession();
+            }
 
-                return false;
+            if (isSessionClosed(session)) {
+                return tryCreateSession();
             }
+
+            LOG.info("Nothing to do: the session is not closed");
+        } catch (Exception e) {
+            LOG.error("Unable to reconnect to {}: {}", getEndpoint().getConnectionString(), e.getMessage(), e);
+            return false;
         }
 
         return true;
     }
 
+    private boolean tryCreateSession() {
+        try {
+
+            session = createSession();
+            return true;
+        } catch (IOException e) {
+            LOG.warn("Failed to reconnect to {}", getEndpoint().getConnectionString());
+            closeSession();
+
+            return false;
+        }
+    }
+
     @Override
     public SmppEndpoint getEndpoint() {
         return (SmppEndpoint) super.getEndpoint();
diff --git a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java
index 2d2eaf0..44867e0 100644
--- a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java
+++ b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java
@@ -19,9 +19,12 @@ package org.apache.camel.component.smpp;
 import java.time.Duration;
 import java.util.Calendar;
 import java.util.Date;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.Endpoint;
+import org.apache.camel.spi.ExecutorServiceManager;
 import org.apache.camel.support.service.BaseService;
 import org.apache.camel.support.task.BlockingTask;
 import org.apache.camel.support.task.Tasks;
@@ -34,9 +37,10 @@ import org.jsmpp.extra.SessionState;
 import org.jsmpp.session.SMPPSession;
 import org.jsmpp.util.AbsoluteTimeFormatter;
 import org.jsmpp.util.TimeFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public final class SmppUtils {
-
     /**
      * See http://unicode.org/Public/MAPPINGS/ETSI/GSM0338.TXT
      */
@@ -67,6 +71,7 @@ public final class SmppUtils {
             { 60, 91 }, { 61, 126 }, { 62, 93 }, { 64, 124 }, { 101, 164 }
     };
 
+    private static final Logger LOG = LoggerFactory.getLogger(SmppUtils.class);
     private static final TimeFormatter TIME_FORMATTER = new AbsoluteTimeFormatter();
 
     private SmppUtils() {
@@ -281,21 +286,37 @@ public final class SmppUtils {
         return session == null || session.getSessionState().equals(SessionState.CLOSED);
     }
 
-    public static BlockingTask newReconnectTask(
-            BaseService source, Endpoint endpoint, long initialReconnectDelay,
-            int maxReconnect) {
-        final String taskName = "smpp-reconnect";
-        ScheduledExecutorService service = endpoint.getCamelContext().getExecutorServiceManager()
-                .newSingleThreadScheduledExecutor(source, taskName);
+    public static ScheduledExecutorService createExecutor(BaseService service, Endpoint endpoint, String taskName) {
+        if (endpoint.getCamelContext() != null && endpoint.getCamelContext().getExecutorServiceManager() != null) {
+            ExecutorServiceManager manager = endpoint.getCamelContext().getExecutorServiceManager();
+            return manager.newSingleThreadScheduledExecutor(service, taskName);
+        } else {
+            LOG.warn("Not using the Camel scheduled thread executor");
+            return Executors.newSingleThreadScheduledExecutor();
+        }
+    }
 
+    public static BlockingTask newReconnectTask(
+            ScheduledExecutorService service, String taskName, long initialReconnectDelay,
+            long reconnectDelay, int maxReconnect) {
         return Tasks.backgroundTask()
                 .withBudget(Budgets.iterationTimeBudget()
                         .withInitialDelay(Duration.ofMillis(initialReconnectDelay))
                         .withMaxIterations(maxReconnect)
                         .withUnlimitedDuration()
+                        .withInterval(Duration.ofMillis(reconnectDelay))
                         .build())
                 .withScheduledExecutor(service)
                 .withName(taskName)
                 .build();
     }
+
+    public static void shutdownReconnectService(ScheduledExecutorService service) throws InterruptedException {
+        service.shutdown();
+        if (!service.awaitTermination(1, TimeUnit.SECONDS)) {
+            LOG.warn("The reconnect service did not finish executing within the timeout");
+
+            service.shutdownNow();
+        }
+    }
 }
diff --git a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppConsumerReconnectManualIT.java b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppConsumerReconnectManualIT.java
index 396aa65..4063ed3 100644
--- a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppConsumerReconnectManualIT.java
+++ b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppConsumerReconnectManualIT.java
@@ -24,7 +24,11 @@ import org.junit.jupiter.api.Test;
 /**
  * Spring based integration test for the smpp component. To run this test, ensure that the SMSC is running on: host:
  * localhost port: 2775 user: smppclient password: password <br/>
- * A SMSC for test is available here: http://www.seleniumsoftware.com/downloads.html
+ * In the past, a SMSC for test was available here: http://www.seleniumsoftware.com/downloads.html.
+ * 
+ * Since it is not available anymore, it's possible to test the reconnect logic manually using the nc CLI tool:
+ *
+ * nc -lv 2775
  */
 @Disabled("Must be manually tested")
 public class SmppConsumerReconnectManualIT extends CamelTestSupport {
diff --git a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppProducerReconnectManualIT.java b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppProducerReconnectManualIT.java
index 824620e..3f74b2c 100644
--- a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppProducerReconnectManualIT.java
+++ b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppProducerReconnectManualIT.java
@@ -24,7 +24,11 @@ import org.junit.jupiter.api.Test;
 /**
  * Spring based integration test for the smpp component. To run this test, ensure that the SMSC is running on: host:
  * localhost port: 2775 user: smppclient password: password <br/>
- * A SMSC for test is available here: http://www.seleniumsoftware.com/downloads.html
+ * In the past, a SMSC for test was available here: http://www.seleniumsoftware.com/downloads.html.
+ *
+ * Since it is not available anymore, it's possible to test the reconnect logic manually using the nc CLI tool:
+ *
+ * nc -lv 2775
  */
 @Disabled("Must be manually tested")
 public class SmppProducerReconnectManualIT extends CamelTestSupport {
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java b/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java
index 3f4ee1e..01fe497 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java
@@ -20,6 +20,7 @@ package org.apache.camel.support.task;
 import java.time.Duration;
 import java.util.Objects;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BooleanSupplier;
@@ -127,29 +128,27 @@ public class BackgroundTask implements BlockingTask {
     public <T> boolean run(Predicate<T> predicate, T payload) {
         CountDownLatch latch = new CountDownLatch(1);
 
-        // We need it to be cancellable/non-runnable after reaching a certain point, and it needs to be deterministic.
-        // This is why we ignore the ScheduledFuture returned and implement the go/no-go using a latch.
-        service.scheduleAtFixedRate(() -> runTaskWrapper(latch, predicate, payload),
+        Future<?> task = service.scheduleAtFixedRate(() -> runTaskWrapper(latch, predicate, payload),
                 budget.initialDelay(), budget.interval(), TimeUnit.MILLISECONDS);
 
-        return waitForTaskCompletion(latch, service);
+        return waitForTaskCompletion(latch, task);
     }
 
     @Override
     public boolean run(BooleanSupplier supplier) {
         CountDownLatch latch = new CountDownLatch(1);
 
-        // We need it to be cancellable/non-runnable after reaching a certain point, and it needs to be deterministic.
-        // This is why we ignore the ScheduledFuture returned and implement the go/no-go using a latch.
-        service.scheduleAtFixedRate(() -> runTaskWrapper(latch, supplier), budget.initialDelay(),
+        Future<?> task = service.scheduleAtFixedRate(() -> runTaskWrapper(latch, supplier), budget.initialDelay(),
                 budget.interval(), TimeUnit.MILLISECONDS);
 
-        return waitForTaskCompletion(latch, service);
+        return waitForTaskCompletion(latch, task);
     }
 
-    private boolean waitForTaskCompletion(CountDownLatch latch, ScheduledExecutorService service) {
+    private boolean waitForTaskCompletion(CountDownLatch latch, Future<?> task) {
         boolean completed = false;
         try {
+            // We need it to be cancellable/non-runnable after reaching a certain point, and it needs to be deterministic.
+            // This is why we ignore the ScheduledFuture returned and implement the go/no-go using a latch.
             if (budget.maxDuration() == TimeBoundedBudget.UNLIMITED_DURATION) {
                 latch.await();
                 completed = true;
@@ -163,14 +162,12 @@ public class BackgroundTask implements BlockingTask {
                 }
             }
 
-            service.shutdown();
-            service.awaitTermination(1, TimeUnit.SECONDS);
+            task.cancel(true);
         } catch (InterruptedException e) {
-            LOG.warn("Interrupted while waiting for the repeatable task to execute");
+            LOG.warn("Interrupted while waiting for the repeatable task to execute: {}", e.getMessage(), e);
             Thread.currentThread().interrupt();
         } finally {
             elapsed = budget.elapsed();
-            service.shutdownNow();
         }
 
         return completed;
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/task/budget/TimeBoundedBudget.java b/core/camel-support/src/main/java/org/apache/camel/support/task/budget/TimeBoundedBudget.java
index 1c57329..ed48374 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/task/budget/TimeBoundedBudget.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/task/budget/TimeBoundedBudget.java
@@ -55,7 +55,12 @@ public class TimeBoundedBudget implements TimeBudget {
 
     @Override
     public boolean canContinue() {
-        // ... if time budget is NOT exhausted
+        // ... unless running forever
+        if (maxDuration == UNLIMITED_DURATION) {
+            return true;
+        }
+
+        // ... or if time budget is NOT exhausted
         if (elapsed().toMillis() >= maxDuration) {
             return false;
         }
diff --git a/core/camel-support/src/test/java/org/apache/camel/support/task/BackgroundIterationTimeTaskTest.java b/core/camel-support/src/test/java/org/apache/camel/support/task/BackgroundIterationTimeTaskTest.java
index b4fb5d2..cd5f7fb 100644
--- a/core/camel-support/src/test/java/org/apache/camel/support/task/BackgroundIterationTimeTaskTest.java
+++ b/core/camel-support/src/test/java/org/apache/camel/support/task/BackgroundIterationTimeTaskTest.java
@@ -145,4 +145,22 @@ public class BackgroundIterationTimeTaskTest extends TaskTestSupport {
         assertTrue(taskCount < maxIterations);
         assertFalse(completed, "The task did not complete because of timeout, the return should be false");
     }
+
+    @DisplayName("Test that the task runs until the boolean supplier succeeds")
+    @Test
+    @Timeout(10)
+    void testRunNoMoreBooleanSupplierWithForever() {
+        BackgroundTask task = Tasks.backgroundTask()
+                .withScheduledExecutor(Executors.newSingleThreadScheduledExecutor())
+                .withBudget(Budgets.iterationTimeBudget()
+                        .withMaxIterations(Integer.MAX_VALUE)
+                        .withInitialDelay(Duration.ofSeconds(1))
+                        .withUnlimitedDuration()
+                        .build())
+                .build();
+
+        boolean completed = task.run(this::taskPredicateWithDeterministicStop, 4);
+        assertTrue(maxIterations > taskCount, "The task execution should not exceed the max iterations");
+        assertTrue(completed, "The task did not complete, the return should be false");
+    }
 }