You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/01/13 14:55:50 UTC

[camel] branch main updated (964fb60 -> eccfa64)

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from 964fb60  Camel-HTTP: Use String.indexOf(String) instead of String.indexOf(char)
     new 9e6aa00  CAMEL-17472: fix consumer reconnect no longer works
     new 88519f8  camel-smpp: updated details about running the manual integration tests
     new cbd1b6e  CAMEL-17477: respect the re-connect delay when reconnecting
     new eccfa64  CAMEL-17472: do not exhaust scheduled service

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/camel/component/smpp/SmppConsumer.java  | 99 +++++++++++++---------
 .../apache/camel/component/smpp/SmppProducer.java  | 77 ++++++++++-------
 .../org/apache/camel/component/smpp/SmppUtils.java | 35 ++++++--
 .../integration/SmppConsumerReconnectManualIT.java |  6 +-
 .../integration/SmppProducerReconnectManualIT.java |  6 +-
 .../apache/camel/support/task/BackgroundTask.java  | 23 +++--
 .../support/task/budget/TimeBoundedBudget.java     |  7 +-
 .../task/BackgroundIterationTimeTaskTest.java      | 18 ++++
 8 files changed, 180 insertions(+), 91 deletions(-)

[camel] 03/04: CAMEL-17477: respect the re-connect delay when reconnecting

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit cbd1b6eaeae49f3d98fb5d7df3c26e8f8499a52d
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Jan 12 13:56:18 2022 +0100

    CAMEL-17477: respect the re-connect delay when reconnecting
---
 .../src/main/java/org/apache/camel/component/smpp/SmppConsumer.java    | 1 +
 .../src/main/java/org/apache/camel/component/smpp/SmppProducer.java    | 2 +-
 .../src/main/java/org/apache/camel/component/smpp/SmppUtils.java       | 3 ++-
 3 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java
index 87f1fce..9b81a42 100644
--- a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java
+++ b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java
@@ -175,6 +175,7 @@ public class SmppConsumer extends DefaultConsumer {
     private void reconnect(final long initialReconnectDelay) {
         if (reconnectLock.tryLock()) {
             BlockingTask task = newReconnectTask(this, getEndpoint(), initialReconnectDelay,
+                    configuration.getReconnectDelay(),
                     configuration.getMaxReconnect());
 
             try {
diff --git a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java
index a7bc510..d5f1106 100644
--- a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java
+++ b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java
@@ -184,7 +184,7 @@ public class SmppProducer extends DefaultProducer {
     private void reconnect(final long initialReconnectDelay) {
         if (connectLock.tryLock()) {
             BlockingTask task = newReconnectTask(this, getEndpoint(), initialReconnectDelay,
-                    configuration.getMaxReconnect());
+                    configuration.getReconnectDelay(), configuration.getMaxReconnect());
 
             try {
                 task.run(this::doReconnect);
diff --git a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java
index 2d2eaf0..400d1eb 100644
--- a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java
+++ b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java
@@ -282,7 +282,7 @@ public final class SmppUtils {
     }
 
     public static BlockingTask newReconnectTask(
-            BaseService source, Endpoint endpoint, long initialReconnectDelay,
+            BaseService source, Endpoint endpoint, long initialReconnectDelay, long reconnectDelay,
             int maxReconnect) {
         final String taskName = "smpp-reconnect";
         ScheduledExecutorService service = endpoint.getCamelContext().getExecutorServiceManager()
@@ -293,6 +293,7 @@ public final class SmppUtils {
                         .withInitialDelay(Duration.ofMillis(initialReconnectDelay))
                         .withMaxIterations(maxReconnect)
                         .withUnlimitedDuration()
+                        .withInterval(Duration.ofMillis(reconnectDelay))
                         .build())
                 .withScheduledExecutor(service)
                 .withName(taskName)

[camel] 01/04: CAMEL-17472: fix consumer reconnect no longer works

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 9e6aa004c6e1832e8243cff6d415134589637fb3
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Tue Jan 11 17:15:02 2022 +0100

    CAMEL-17472: fix consumer reconnect no longer works
    
    Includes:
    - do comply with unlimited duration tasks
    - improved log messages for easier debug
---
 .../apache/camel/component/smpp/SmppConsumer.java  | 33 ++++++++++++++--------
 .../apache/camel/support/task/BackgroundTask.java  |  6 ++--
 .../support/task/budget/TimeBoundedBudget.java     |  7 ++++-
 .../task/BackgroundIterationTimeTaskTest.java      | 18 ++++++++++++
 4 files changed, 49 insertions(+), 15 deletions(-)

diff --git a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java
index 2368bba..87f1fce 100644
--- a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java
+++ b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java
@@ -143,21 +143,30 @@ public class SmppConsumer extends DefaultConsumer {
     }
 
     private boolean doReconnect() {
-        if (isServiceStopping(this)) {
-            return true;
-        }
-
-        if (isSessionClosed(session)) {
-            try {
-                LOG.info("Trying to reconnect to {}", getEndpoint().getConnectionString());
-                session = createSession();
+        try {
+            LOG.info("Trying to reconnect to {}", getEndpoint().getConnectionString());
+            if (isServiceStopping(this)) {
                 return true;
-            } catch (IOException e) {
-                LOG.warn("Failed to reconnect to {}", getEndpoint().getConnectionString());
-                closeSession();
+            }
 
-                return false;
+            if (isSessionClosed(session)) {
+                try {
+                    LOG.info("Creating a new session to {}", getEndpoint().getConnectionString());
+                    session = createSession();
+                    LOG.info("Reconnected to {}", getEndpoint().getConnectionString());
+                    return true;
+                } catch (IOException e) {
+                    LOG.warn("Failed to reconnect to {}", getEndpoint().getConnectionString());
+                    closeSession();
+
+                    return false;
+                }
             }
+
+            LOG.info("Nothing to do: the session is not closed");
+        } catch (Exception e) {
+            LOG.error("Unable to reconnect to {}: {}", getEndpoint().getConnectionString(), e.getMessage(), e);
+            return false;
         }
 
         return true;
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java b/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java
index 3f4ee1e..f7bf226 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java
@@ -164,9 +164,11 @@ public class BackgroundTask implements BlockingTask {
             }
 
             service.shutdown();
-            service.awaitTermination(1, TimeUnit.SECONDS);
+            if (!service.awaitTermination(1, TimeUnit.SECONDS)) {
+                LOG.warn("The tasks did not finish with the specified time");
+            }
         } catch (InterruptedException e) {
-            LOG.warn("Interrupted while waiting for the repeatable task to execute");
+            LOG.warn("Interrupted while waiting for the repeatable task to execute: {}", e.getMessage(), e);
             Thread.currentThread().interrupt();
         } finally {
             elapsed = budget.elapsed();
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/task/budget/TimeBoundedBudget.java b/core/camel-support/src/main/java/org/apache/camel/support/task/budget/TimeBoundedBudget.java
index 1c57329..ed48374 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/task/budget/TimeBoundedBudget.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/task/budget/TimeBoundedBudget.java
@@ -55,7 +55,12 @@ public class TimeBoundedBudget implements TimeBudget {
 
     @Override
     public boolean canContinue() {
-        // ... if time budget is NOT exhausted
+        // ... unless running forever
+        if (maxDuration == UNLIMITED_DURATION) {
+            return true;
+        }
+
+        // ... or if time budget is NOT exhausted
         if (elapsed().toMillis() >= maxDuration) {
             return false;
         }
diff --git a/core/camel-support/src/test/java/org/apache/camel/support/task/BackgroundIterationTimeTaskTest.java b/core/camel-support/src/test/java/org/apache/camel/support/task/BackgroundIterationTimeTaskTest.java
index b4fb5d2..cd5f7fb 100644
--- a/core/camel-support/src/test/java/org/apache/camel/support/task/BackgroundIterationTimeTaskTest.java
+++ b/core/camel-support/src/test/java/org/apache/camel/support/task/BackgroundIterationTimeTaskTest.java
@@ -145,4 +145,22 @@ public class BackgroundIterationTimeTaskTest extends TaskTestSupport {
         assertTrue(taskCount < maxIterations);
         assertFalse(completed, "The task did not complete because of timeout, the return should be false");
     }
+
+    @DisplayName("Test that the task runs until the boolean supplier succeeds")
+    @Test
+    @Timeout(10)
+    void testRunNoMoreBooleanSupplierWithForever() {
+        BackgroundTask task = Tasks.backgroundTask()
+                .withScheduledExecutor(Executors.newSingleThreadScheduledExecutor())
+                .withBudget(Budgets.iterationTimeBudget()
+                        .withMaxIterations(Integer.MAX_VALUE)
+                        .withInitialDelay(Duration.ofSeconds(1))
+                        .withUnlimitedDuration()
+                        .build())
+                .build();
+
+        boolean completed = task.run(this::taskPredicateWithDeterministicStop, 4);
+        assertTrue(maxIterations > taskCount, "The task execution should not exceed the max iterations");
+        assertTrue(completed, "The task did not complete, the return should be false");
+    }
 }

[camel] 02/04: camel-smpp: updated details about running the manual integration tests

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 88519f849b081c45ebe5de09c8dbdb043d4bc43b
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Tue Jan 11 20:29:54 2022 +0100

    camel-smpp: updated details about running the manual integration tests
---
 .../component/smpp/integration/SmppConsumerReconnectManualIT.java   | 6 +++++-
 .../component/smpp/integration/SmppProducerReconnectManualIT.java   | 6 +++++-
 2 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppConsumerReconnectManualIT.java b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppConsumerReconnectManualIT.java
index 396aa65..4063ed3 100644
--- a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppConsumerReconnectManualIT.java
+++ b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppConsumerReconnectManualIT.java
@@ -24,7 +24,11 @@ import org.junit.jupiter.api.Test;
 /**
  * Spring based integration test for the smpp component. To run this test, ensure that the SMSC is running on: host:
  * localhost port: 2775 user: smppclient password: password <br/>
- * A SMSC for test is available here: http://www.seleniumsoftware.com/downloads.html
+ * In the past, a SMSC for test was available here: http://www.seleniumsoftware.com/downloads.html.
+ * 
+ * Since it is not available anymore, it's possible to test the reconnect logic manually using the nc CLI tool:
+ *
+ * nc -lv 2775
  */
 @Disabled("Must be manually tested")
 public class SmppConsumerReconnectManualIT extends CamelTestSupport {
diff --git a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppProducerReconnectManualIT.java b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppProducerReconnectManualIT.java
index 824620e..3f74b2c 100644
--- a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppProducerReconnectManualIT.java
+++ b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/integration/SmppProducerReconnectManualIT.java
@@ -24,7 +24,11 @@ import org.junit.jupiter.api.Test;
 /**
  * Spring based integration test for the smpp component. To run this test, ensure that the SMSC is running on: host:
  * localhost port: 2775 user: smppclient password: password <br/>
- * A SMSC for test is available here: http://www.seleniumsoftware.com/downloads.html
+ * In the past, a SMSC for test was available here: http://www.seleniumsoftware.com/downloads.html.
+ *
+ * Since it is not available anymore, it's possible to test the reconnect logic manually using the nc CLI tool:
+ *
+ * nc -lv 2775
  */
 @Disabled("Must be manually tested")
 public class SmppProducerReconnectManualIT extends CamelTestSupport {

[camel] 04/04: CAMEL-17472: do not exhaust scheduled service

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit eccfa64d6a239f26f56466d3067cb26c3a8e7a4b
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Jan 12 13:56:38 2022 +0100

    CAMEL-17472: do not exhaust scheduled service
    
    Includes:
    - allow giving more time to the shutdown of background tasks
    - fix preventing other tasks from being scheduled
    - minor related code cleanup
---
 .../apache/camel/component/smpp/SmppConsumer.java  | 87 ++++++++++++----------
 .../apache/camel/component/smpp/SmppProducer.java  | 75 ++++++++++++-------
 .../org/apache/camel/component/smpp/SmppUtils.java | 34 +++++++--
 .../apache/camel/support/task/BackgroundTask.java  | 23 +++---
 4 files changed, 132 insertions(+), 87 deletions(-)

diff --git a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java
index 9b81a42..8723c1e 100644
--- a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java
+++ b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.smpp;
 
 import java.io.IOException;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.camel.Processor;
@@ -32,15 +33,16 @@ import org.jsmpp.extra.SessionState;
 import org.jsmpp.session.BindParameter;
 import org.jsmpp.session.MessageReceiverListener;
 import org.jsmpp.session.SMPPSession;
-import org.jsmpp.session.Session;
 import org.jsmpp.session.SessionStateListener;
 import org.jsmpp.util.DefaultComposer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.camel.component.smpp.SmppUtils.createExecutor;
 import static org.apache.camel.component.smpp.SmppUtils.isServiceStopping;
 import static org.apache.camel.component.smpp.SmppUtils.isSessionClosed;
 import static org.apache.camel.component.smpp.SmppUtils.newReconnectTask;
+import static org.apache.camel.component.smpp.SmppUtils.shutdownReconnectService;
 
 /**
  * An implementation of consumer which use the SMPP protocol
@@ -48,13 +50,16 @@ import static org.apache.camel.component.smpp.SmppUtils.newReconnectTask;
 public class SmppConsumer extends DefaultConsumer {
 
     private static final Logger LOG = LoggerFactory.getLogger(SmppConsumer.class);
+    private static final String RECONNECT_TASK_NAME = "smpp-consumer-reconnect";
 
-    private SmppConfiguration configuration;
-    private SMPPSession session;
-    private MessageReceiverListener messageReceiverListener;
-    private SessionStateListener internalSessionStateListener;
+    private final SmppConfiguration configuration;
+    private final MessageReceiverListener messageReceiverListener;
+    private final SessionStateListener internalSessionStateListener;
 
     private final ReentrantLock reconnectLock = new ReentrantLock();
+    private final ScheduledExecutorService reconnectService;
+
+    private SMPPSession session;
 
     /**
      * The constructor which gets a smpp endpoint, a smpp configuration and a processor
@@ -62,19 +67,19 @@ public class SmppConsumer extends DefaultConsumer {
     public SmppConsumer(SmppEndpoint endpoint, SmppConfiguration config, Processor processor) {
         super(endpoint, processor);
 
+        this.reconnectService = createExecutor(this, endpoint, RECONNECT_TASK_NAME);
+
         this.configuration = config;
-        this.internalSessionStateListener = new SessionStateListener() {
-            @Override
-            public void onStateChange(SessionState newState, SessionState oldState, Session source) {
-                if (configuration.getSessionStateListener() != null) {
-                    configuration.getSessionStateListener().onStateChange(newState, oldState, source);
-                }
-
-                if (newState.equals(SessionState.CLOSED)) {
-                    LOG.warn("Lost connection to: {} - trying to reconnect...", getEndpoint().getConnectionString());
-                    closeSession();
-                    reconnect(configuration.getInitialReconnectDelay());
-                }
+        this.internalSessionStateListener = (newState, oldState, source) -> {
+            if (configuration.getSessionStateListener() != null) {
+                configuration.getSessionStateListener().onStateChange(newState, oldState, source);
+            }
+
+            if (newState.equals(SessionState.CLOSED)) {
+                LOG.warn("Lost connection to: {} - trying to reconnect...", getEndpoint().getConnectionString());
+                closeSession();
+
+                reconnect(configuration.getInitialReconnectDelay());
             }
         };
         this.messageReceiverListener
@@ -92,21 +97,21 @@ public class SmppConsumer extends DefaultConsumer {
     }
 
     private SMPPSession createSession() throws IOException {
-        SMPPSession session = createSMPPSession();
-        session.setEnquireLinkTimer(configuration.getEnquireLinkTimer());
-        session.setTransactionTimer(configuration.getTransactionTimer());
-        session.setPduProcessorDegree(this.configuration.getPduProcessorDegree());
-        session.setQueueCapacity(this.configuration.getPduProcessorQueueCapacity());
-        session.addSessionStateListener(internalSessionStateListener);
-        session.setMessageReceiverListener(messageReceiverListener);
-        session.connectAndBind(this.configuration.getHost(), this.configuration.getPort(),
+        SMPPSession newSession = createSMPPSession();
+        newSession.setEnquireLinkTimer(configuration.getEnquireLinkTimer());
+        newSession.setTransactionTimer(configuration.getTransactionTimer());
+        newSession.setPduProcessorDegree(this.configuration.getPduProcessorDegree());
+        newSession.setQueueCapacity(this.configuration.getPduProcessorQueueCapacity());
+        newSession.addSessionStateListener(internalSessionStateListener);
+        newSession.setMessageReceiverListener(messageReceiverListener);
+        newSession.connectAndBind(this.configuration.getHost(), this.configuration.getPort(),
                 new BindParameter(
                         BindType.BIND_RX, this.configuration.getSystemId(),
                         this.configuration.getPassword(), this.configuration.getSystemType(),
                         TypeOfNumber.UNKNOWN, NumberingPlanIndicator.UNKNOWN,
                         configuration.getAddressRange()));
 
-        return session;
+        return newSession;
     }
 
     /**
@@ -125,6 +130,8 @@ public class SmppConsumer extends DefaultConsumer {
 
     @Override
     protected void doStop() throws Exception {
+        shutdownReconnectService(reconnectService);
+
         LOG.debug("Disconnecting from: {}...", getEndpoint().getConnectionString());
 
         super.doStop();
@@ -150,17 +157,7 @@ public class SmppConsumer extends DefaultConsumer {
             }
 
             if (isSessionClosed(session)) {
-                try {
-                    LOG.info("Creating a new session to {}", getEndpoint().getConnectionString());
-                    session = createSession();
-                    LOG.info("Reconnected to {}", getEndpoint().getConnectionString());
-                    return true;
-                } catch (IOException e) {
-                    LOG.warn("Failed to reconnect to {}", getEndpoint().getConnectionString());
-                    closeSession();
-
-                    return false;
-                }
+                return tryCreateSession();
             }
 
             LOG.info("Nothing to do: the session is not closed");
@@ -172,9 +169,23 @@ public class SmppConsumer extends DefaultConsumer {
         return true;
     }
 
+    private boolean tryCreateSession() {
+        try {
+            LOG.info("Creating a new session to {}", getEndpoint().getConnectionString());
+            session = createSession();
+            LOG.info("Reconnected to {}", getEndpoint().getConnectionString());
+            return true;
+        } catch (IOException e) {
+            LOG.warn("Failed to reconnect to {}", getEndpoint().getConnectionString());
+            closeSession();
+
+            return false;
+        }
+    }
+
     private void reconnect(final long initialReconnectDelay) {
         if (reconnectLock.tryLock()) {
-            BlockingTask task = newReconnectTask(this, getEndpoint(), initialReconnectDelay,
+            BlockingTask task = newReconnectTask(reconnectService, RECONNECT_TASK_NAME, initialReconnectDelay,
                     configuration.getReconnectDelay(),
                     configuration.getMaxReconnect());
 
diff --git a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java
index d5f1106..c56d78e 100644
--- a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java
+++ b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.smpp;
 
 import java.io.IOException;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.camel.Exchange;
@@ -32,15 +33,16 @@ import org.jsmpp.bean.TypeOfNumber;
 import org.jsmpp.extra.SessionState;
 import org.jsmpp.session.BindParameter;
 import org.jsmpp.session.SMPPSession;
-import org.jsmpp.session.Session;
 import org.jsmpp.session.SessionStateListener;
 import org.jsmpp.util.DefaultComposer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.camel.component.smpp.SmppUtils.createExecutor;
 import static org.apache.camel.component.smpp.SmppUtils.isServiceStopping;
 import static org.apache.camel.component.smpp.SmppUtils.isSessionClosed;
 import static org.apache.camel.component.smpp.SmppUtils.newReconnectTask;
+import static org.apache.camel.component.smpp.SmppUtils.shutdownReconnectService;
 
 /**
  * An implementation of @{link Producer} which use the SMPP protocol
@@ -48,27 +50,30 @@ import static org.apache.camel.component.smpp.SmppUtils.newReconnectTask;
 public class SmppProducer extends DefaultProducer {
 
     private static final Logger LOG = LoggerFactory.getLogger(SmppProducer.class);
+    private static final String RECONNECT_TASK_NAME = "smpp-producer-reconnect";
 
-    private SmppConfiguration configuration;
-    private SMPPSession session;
-    private SessionStateListener internalSessionStateListener;
+    private final SmppConfiguration configuration;
+    private final SessionStateListener internalSessionStateListener;
     private final ReentrantLock connectLock = new ReentrantLock();
+    private final ScheduledExecutorService reconnectService;
+
+    private SMPPSession session;
 
     public SmppProducer(SmppEndpoint endpoint, SmppConfiguration config) {
         super(endpoint);
+
+        this.reconnectService = createExecutor(this, endpoint, RECONNECT_TASK_NAME);
+
         this.configuration = config;
-        this.internalSessionStateListener = new SessionStateListener() {
-            @Override
-            public void onStateChange(SessionState newState, SessionState oldState, Session source) {
-                if (configuration.getSessionStateListener() != null) {
-                    configuration.getSessionStateListener().onStateChange(newState, oldState, source);
-                }
+        this.internalSessionStateListener = (newState, oldState, source) -> {
+            if (configuration.getSessionStateListener() != null) {
+                configuration.getSessionStateListener().onStateChange(newState, oldState, source);
+            }
 
-                if (newState.equals(SessionState.CLOSED)) {
-                    LOG.warn("Lost connection to: {} - trying to reconnect...", getEndpoint().getConnectionString());
-                    closeSession();
-                    reconnect(configuration.getInitialReconnectDelay());
-                }
+            if (newState.equals(SessionState.CLOSED)) {
+                LOG.warn("Lost connection to: {} - trying to reconnect...", getEndpoint().getConnectionString());
+                closeSession();
+                reconnect(configuration.getInitialReconnectDelay());
             }
         };
     }
@@ -164,6 +169,8 @@ public class SmppProducer extends DefaultProducer {
 
     @Override
     protected void doStop() throws Exception {
+        shutdownReconnectService(reconnectService);
+
         LOG.debug("Disconnecting from: {}...", getEndpoint().getConnectionString());
 
         super.doStop();
@@ -183,7 +190,7 @@ public class SmppProducer extends DefaultProducer {
 
     private void reconnect(final long initialReconnectDelay) {
         if (connectLock.tryLock()) {
-            BlockingTask task = newReconnectTask(this, getEndpoint(), initialReconnectDelay,
+            BlockingTask task = newReconnectTask(reconnectService, RECONNECT_TASK_NAME, initialReconnectDelay,
                     configuration.getReconnectDelay(), configuration.getMaxReconnect());
 
             try {
@@ -195,26 +202,38 @@ public class SmppProducer extends DefaultProducer {
     }
 
     private boolean doReconnect() {
-        if (isServiceStopping(this)) {
-            return true;
-        }
-
-        if (isSessionClosed(session)) {
-            try {
-                LOG.info("Trying to reconnect to {}", getEndpoint().getConnectionString());
-                session = createSession();
+        try {
+            LOG.info("Trying to reconnect to {}", getEndpoint().getConnectionString());
+            if (isServiceStopping(this)) {
                 return true;
-            } catch (IOException e) {
-                LOG.warn("Failed to reconnect to {}", getEndpoint().getConnectionString());
-                closeSession();
+            }
 
-                return false;
+            if (isSessionClosed(session)) {
+                return tryCreateSession();
             }
+
+            LOG.info("Nothing to do: the session is not closed");
+        } catch (Exception e) {
+            LOG.error("Unable to reconnect to {}: {}", getEndpoint().getConnectionString(), e.getMessage(), e);
+            return false;
         }
 
         return true;
     }
 
+    private boolean tryCreateSession() {
+        try {
+
+            session = createSession();
+            return true;
+        } catch (IOException e) {
+            LOG.warn("Failed to reconnect to {}", getEndpoint().getConnectionString());
+            closeSession();
+
+            return false;
+        }
+    }
+
     @Override
     public SmppEndpoint getEndpoint() {
         return (SmppEndpoint) super.getEndpoint();
diff --git a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java
index 400d1eb..44867e0 100644
--- a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java
+++ b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java
@@ -19,9 +19,12 @@ package org.apache.camel.component.smpp;
 import java.time.Duration;
 import java.util.Calendar;
 import java.util.Date;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.Endpoint;
+import org.apache.camel.spi.ExecutorServiceManager;
 import org.apache.camel.support.service.BaseService;
 import org.apache.camel.support.task.BlockingTask;
 import org.apache.camel.support.task.Tasks;
@@ -34,9 +37,10 @@ import org.jsmpp.extra.SessionState;
 import org.jsmpp.session.SMPPSession;
 import org.jsmpp.util.AbsoluteTimeFormatter;
 import org.jsmpp.util.TimeFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public final class SmppUtils {
-
     /**
      * See http://unicode.org/Public/MAPPINGS/ETSI/GSM0338.TXT
      */
@@ -67,6 +71,7 @@ public final class SmppUtils {
             { 60, 91 }, { 61, 126 }, { 62, 93 }, { 64, 124 }, { 101, 164 }
     };
 
+    private static final Logger LOG = LoggerFactory.getLogger(SmppUtils.class);
     private static final TimeFormatter TIME_FORMATTER = new AbsoluteTimeFormatter();
 
     private SmppUtils() {
@@ -281,13 +286,19 @@ public final class SmppUtils {
         return session == null || session.getSessionState().equals(SessionState.CLOSED);
     }
 
-    public static BlockingTask newReconnectTask(
-            BaseService source, Endpoint endpoint, long initialReconnectDelay, long reconnectDelay,
-            int maxReconnect) {
-        final String taskName = "smpp-reconnect";
-        ScheduledExecutorService service = endpoint.getCamelContext().getExecutorServiceManager()
-                .newSingleThreadScheduledExecutor(source, taskName);
+    public static ScheduledExecutorService createExecutor(BaseService service, Endpoint endpoint, String taskName) {
+        // Without a Camel context or an executor service manager, fall back to a plain JDK executor
+        if (endpoint.getCamelContext() == null || endpoint.getCamelContext().getExecutorServiceManager() == null) {
+            LOG.warn("Not using the Camel scheduled thread executor");
+            return Executors.newSingleThreadScheduledExecutor();
+        }
+        ExecutorServiceManager camelManaged = endpoint.getCamelContext().getExecutorServiceManager();
+        return camelManaged.newSingleThreadScheduledExecutor(service, taskName);
+    }
 
+    public static BlockingTask newReconnectTask(
+            ScheduledExecutorService service, String taskName, long initialReconnectDelay,
+            long reconnectDelay, int maxReconnect) {
         return Tasks.backgroundTask()
                 .withBudget(Budgets.iterationTimeBudget()
                         .withInitialDelay(Duration.ofMillis(initialReconnectDelay))
@@ -299,4 +310,13 @@ public final class SmppUtils {
                 .withName(taskName)
                 .build();
     }
+
+    public static void shutdownReconnectService(ScheduledExecutorService service) throws InterruptedException {
+        service.shutdown();
+        final boolean finishedInTime = service.awaitTermination(1, TimeUnit.SECONDS);
+        if (!finishedInTime) {
+            LOG.warn("The reconnect service did not finish executing within the timeout");
+            service.shutdownNow(); // the orderly shutdown timed out, so stop it forcibly
+        }
+    }
 }
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java b/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java
index f7bf226..01fe497 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java
@@ -20,6 +20,7 @@ package org.apache.camel.support.task;
 import java.time.Duration;
 import java.util.Objects;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BooleanSupplier;
@@ -127,29 +128,27 @@ public class BackgroundTask implements BlockingTask {
     public <T> boolean run(Predicate<T> predicate, T payload) {
         CountDownLatch latch = new CountDownLatch(1);
 
-        // We need it to be cancellable/non-runnable after reaching a certain point, and it needs to be deterministic.
-        // This is why we ignore the ScheduledFuture returned and implement the go/no-go using a latch.
-        service.scheduleAtFixedRate(() -> runTaskWrapper(latch, predicate, payload),
+        Future<?> task = service.scheduleAtFixedRate(() -> runTaskWrapper(latch, predicate, payload),
                 budget.initialDelay(), budget.interval(), TimeUnit.MILLISECONDS);
 
-        return waitForTaskCompletion(latch, service);
+        return waitForTaskCompletion(latch, task);
     }
 
     @Override
     public boolean run(BooleanSupplier supplier) {
         CountDownLatch latch = new CountDownLatch(1);
 
-        // We need it to be cancellable/non-runnable after reaching a certain point, and it needs to be deterministic.
-        // This is why we ignore the ScheduledFuture returned and implement the go/no-go using a latch.
-        service.scheduleAtFixedRate(() -> runTaskWrapper(latch, supplier), budget.initialDelay(),
+        Future<?> task = service.scheduleAtFixedRate(() -> runTaskWrapper(latch, supplier), budget.initialDelay(),
                 budget.interval(), TimeUnit.MILLISECONDS);
 
-        return waitForTaskCompletion(latch, service);
+        return waitForTaskCompletion(latch, task);
     }
 
-    private boolean waitForTaskCompletion(CountDownLatch latch, ScheduledExecutorService service) {
+    private boolean waitForTaskCompletion(CountDownLatch latch, Future<?> task) {
         boolean completed = false;
         try {
+            // This needs to be deterministic, so the go/no-go decision is made with the latch and not with the
+            // returned future, which is kept only so that the scheduled task can be cancelled once the wait is over.
             if (budget.maxDuration() == TimeBoundedBudget.UNLIMITED_DURATION) {
                 latch.await();
                 completed = true;
@@ -163,16 +162,12 @@ public class BackgroundTask implements BlockingTask {
                 }
             }
 
-            service.shutdown();
-            if (!service.awaitTermination(1, TimeUnit.SECONDS)) {
-                LOG.warn("The tasks did not finish with the specified time");
-            }
+            task.cancel(true);
         } catch (InterruptedException e) {
             LOG.warn("Interrupted while waiting for the repeatable task to execute: {}", e.getMessage(), e);
             Thread.currentThread().interrupt();
         } finally {
             elapsed = budget.elapsed();
-            service.shutdownNow();
         }
 
         return completed;