You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/11/10 12:00:36 UTC

[camel] branch main updated: CAMEL-17121: converted camel-smpp producer to the repeatable tasks

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 59a91a4  CAMEL-17121: converted camel-smpp producer to the repeatable tasks
59a91a4 is described below

commit 59a91a4d4924eebcdd460cbde76b925b9a98fb79
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Nov 10 11:52:20 2021 +0100

    CAMEL-17121: converted camel-smpp producer to the repeatable tasks
---
 .../apache/camel/component/smpp/SmppConsumer.java  | 27 +++-----
 .../apache/camel/component/smpp/SmppProducer.java  | 71 +++++++++-------------
 .../org/apache/camel/component/smpp/SmppUtils.java | 35 +++++++++++
 3 files changed, 73 insertions(+), 60 deletions(-)

diff --git a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java
index 4a740aa..2368bba 100644
--- a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java
+++ b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java
@@ -17,15 +17,11 @@
 package org.apache.camel.component.smpp;
 
 import java.io.IOException;
-import java.time.Duration;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.camel.Processor;
 import org.apache.camel.support.DefaultConsumer;
 import org.apache.camel.support.task.BlockingTask;
-import org.apache.camel.support.task.Tasks;
-import org.apache.camel.support.task.budget.Budgets;
 import org.jsmpp.DefaultPDUReader;
 import org.jsmpp.DefaultPDUSender;
 import org.jsmpp.SynchronizedPDUSender;
@@ -42,6 +38,10 @@ import org.jsmpp.util.DefaultComposer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.camel.component.smpp.SmppUtils.isServiceStopping;
+import static org.apache.camel.component.smpp.SmppUtils.isSessionClosed;
+import static org.apache.camel.component.smpp.SmppUtils.newReconnectTask;
+
 /**
  * An implementation of consumer which use the SMPP protocol
  */
@@ -143,11 +143,11 @@ public class SmppConsumer extends DefaultConsumer {
     }
 
     private boolean doReconnect() {
-        if (isStopping() || isStopped()) {
+        if (isServiceStopping(this)) {
             return true;
         }
 
-        if (session == null || session.getSessionState().equals(SessionState.CLOSED)) {
+        if (isSessionClosed(session)) {
             try {
                 LOG.info("Trying to reconnect to {}", getEndpoint().getConnectionString());
                 session = createSession();
@@ -165,19 +165,8 @@ public class SmppConsumer extends DefaultConsumer {
 
     private void reconnect(final long initialReconnectDelay) {
         if (reconnectLock.tryLock()) {
-            final String taskName = "smpp-reconnect";
-            ScheduledExecutorService service = getEndpoint().getCamelContext().getExecutorServiceManager()
-                    .newSingleThreadScheduledExecutor(this, taskName);
-
-            BlockingTask task = Tasks.backgroundTask()
-                    .withBudget(Budgets.iterationTimeBudget()
-                            .withInitialDelay(Duration.ofMillis(initialReconnectDelay))
-                            .withMaxIterations(configuration.getMaxReconnect())
-                            .withUnlimitedDuration()
-                            .build())
-                    .withScheduledExecutor(service)
-                    .withName(taskName)
-                    .build();
+            BlockingTask task = newReconnectTask(this, getEndpoint(), initialReconnectDelay,
+                    configuration.getMaxReconnect());
 
             try {
                 task.run(this::doReconnect);
diff --git a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java
index 1965eb1..a7bc510 100644
--- a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java
+++ b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java
@@ -22,6 +22,7 @@ import java.util.concurrent.locks.ReentrantLock;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.support.DefaultProducer;
+import org.apache.camel.support.task.BlockingTask;
 import org.jsmpp.DefaultPDUReader;
 import org.jsmpp.DefaultPDUSender;
 import org.jsmpp.SynchronizedPDUSender;
@@ -37,6 +38,10 @@ import org.jsmpp.util.DefaultComposer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.camel.component.smpp.SmppUtils.isServiceStopping;
+import static org.apache.camel.component.smpp.SmppUtils.isSessionClosed;
+import static org.apache.camel.component.smpp.SmppUtils.newReconnectTask;
+
 /**
  * An implementation of @{link Producer} which use the SMPP protocol
  */
@@ -178,54 +183,38 @@ public class SmppProducer extends DefaultProducer {
 
     private void reconnect(final long initialReconnectDelay) {
         if (connectLock.tryLock()) {
-            try {
-                Runnable r = new Runnable() {
-                    public void run() {
-                        boolean reconnected = false;
-
-                        LOG.info("Schedule reconnect after {} millis", initialReconnectDelay);
-                        try {
-                            Thread.sleep(initialReconnectDelay);
-                        } catch (InterruptedException e) {
-                        }
-
-                        int attempt = 0;
-                        while (!(isStopping() || isStopped())
-                                && (session == null || session.getSessionState().equals(SessionState.CLOSED))
-                                && attempt < configuration.getMaxReconnect()) {
-                            try {
-                                attempt++;
-                                LOG.info("Trying to reconnect to {} - attempt #{}", getEndpoint().getConnectionString(),
-                                        attempt);
-                                session = createSession();
-                                reconnected = true;
-                            } catch (IOException e) {
-                                LOG.warn("Failed to reconnect to {}", getEndpoint().getConnectionString());
-                                closeSession();
-                                try {
-                                    Thread.sleep(configuration.getReconnectDelay());
-                                } catch (InterruptedException ee) {
-                                }
-                            }
-                        }
-
-                        if (reconnected) {
-                            LOG.info("Reconnected to {}", getEndpoint().getConnectionString());
-                        }
-                    }
-                };
+            BlockingTask task = newReconnectTask(this, getEndpoint(), initialReconnectDelay,
+                    configuration.getMaxReconnect());
 
-                Thread t = new Thread(r);
-                t.start();
-                t.join();
-            } catch (InterruptedException e) {
-                // noop
+            try {
+                task.run(this::doReconnect);
             } finally {
                 connectLock.unlock();
             }
         }
     }
 
+    private boolean doReconnect() {
+        if (isServiceStopping(this)) {
+            return true;
+        }
+
+        if (isSessionClosed(session)) {
+            try {
+                LOG.info("Trying to reconnect to {}", getEndpoint().getConnectionString());
+                session = createSession();
+                return true;
+            } catch (IOException e) {
+                LOG.warn("Failed to reconnect to {}", getEndpoint().getConnectionString());
+                closeSession();
+
+                return false;
+            }
+        }
+
+        return true;
+    }
+
     @Override
     public SmppEndpoint getEndpoint() {
         return (SmppEndpoint) super.getEndpoint();
diff --git a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java
index 2f887a9..2d2eaf0 100644
--- a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java
+++ b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java
@@ -16,13 +16,22 @@
  */
 package org.apache.camel.component.smpp;
 
+import java.time.Duration;
 import java.util.Calendar;
 import java.util.Date;
+import java.util.concurrent.ScheduledExecutorService;
 
+import org.apache.camel.Endpoint;
+import org.apache.camel.support.service.BaseService;
+import org.apache.camel.support.task.BlockingTask;
+import org.apache.camel.support.task.Tasks;
+import org.apache.camel.support.task.budget.Budgets;
 import org.jsmpp.bean.Alphabet;
 import org.jsmpp.bean.DataSm;
 import org.jsmpp.bean.SubmitMulti;
 import org.jsmpp.bean.SubmitSm;
+import org.jsmpp.extra.SessionState;
+import org.jsmpp.session.SMPPSession;
 import org.jsmpp.util.AbsoluteTimeFormatter;
 import org.jsmpp.util.TimeFormatter;
 
@@ -263,4 +272,30 @@ public final class SmppUtils {
         }
         return dest;
     }
+
+    public static boolean isServiceStopping(BaseService service) {
+        return service.isStopping() || service.isStopped();
+    }
+
+    public static boolean isSessionClosed(SMPPSession session) {
+        return session == null || session.getSessionState().equals(SessionState.CLOSED);
+    }
+
+    public static BlockingTask newReconnectTask(
+            BaseService source, Endpoint endpoint, long initialReconnectDelay,
+            int maxReconnect) {
+        final String taskName = "smpp-reconnect";
+        ScheduledExecutorService service = endpoint.getCamelContext().getExecutorServiceManager()
+                .newSingleThreadScheduledExecutor(source, taskName);
+
+        return Tasks.backgroundTask()
+                .withBudget(Budgets.iterationTimeBudget()
+                        .withInitialDelay(Duration.ofMillis(initialReconnectDelay))
+                        .withMaxIterations(maxReconnect)
+                        .withUnlimitedDuration()
+                        .build())
+                .withScheduledExecutor(service)
+                .withName(taskName)
+                .build();
+    }
 }