You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2021/11/10 12:00:36 UTC
[camel] branch main updated: CAMEL-17121: converted camel-smpp producer to the repeatable tasks
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 59a91a4 CAMEL-17121: converted camel-smpp producer to the repeatable tasks
59a91a4 is described below
commit 59a91a4d4924eebcdd460cbde76b925b9a98fb79
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Wed Nov 10 11:52:20 2021 +0100
CAMEL-17121: converted camel-smpp producer to the repeatable tasks
---
.../apache/camel/component/smpp/SmppConsumer.java | 27 +++-----
.../apache/camel/component/smpp/SmppProducer.java | 71 +++++++++-------------
.../org/apache/camel/component/smpp/SmppUtils.java | 35 +++++++++++
3 files changed, 73 insertions(+), 60 deletions(-)
diff --git a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java
index 4a740aa..2368bba 100644
--- a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java
+++ b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java
@@ -17,15 +17,11 @@
package org.apache.camel.component.smpp;
import java.io.IOException;
-import java.time.Duration;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.camel.Processor;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.support.task.BlockingTask;
-import org.apache.camel.support.task.Tasks;
-import org.apache.camel.support.task.budget.Budgets;
import org.jsmpp.DefaultPDUReader;
import org.jsmpp.DefaultPDUSender;
import org.jsmpp.SynchronizedPDUSender;
@@ -42,6 +38,10 @@ import org.jsmpp.util.DefaultComposer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.camel.component.smpp.SmppUtils.isServiceStopping;
+import static org.apache.camel.component.smpp.SmppUtils.isSessionClosed;
+import static org.apache.camel.component.smpp.SmppUtils.newReconnectTask;
+
/**
* An implementation of consumer which use the SMPP protocol
*/
@@ -143,11 +143,11 @@ public class SmppConsumer extends DefaultConsumer {
}
private boolean doReconnect() {
- if (isStopping() || isStopped()) {
+ if (isServiceStopping(this)) {
return true;
}
- if (session == null || session.getSessionState().equals(SessionState.CLOSED)) {
+ if (isSessionClosed(session)) {
try {
LOG.info("Trying to reconnect to {}", getEndpoint().getConnectionString());
session = createSession();
@@ -165,19 +165,8 @@ public class SmppConsumer extends DefaultConsumer {
private void reconnect(final long initialReconnectDelay) {
if (reconnectLock.tryLock()) {
- final String taskName = "smpp-reconnect";
- ScheduledExecutorService service = getEndpoint().getCamelContext().getExecutorServiceManager()
- .newSingleThreadScheduledExecutor(this, taskName);
-
- BlockingTask task = Tasks.backgroundTask()
- .withBudget(Budgets.iterationTimeBudget()
- .withInitialDelay(Duration.ofMillis(initialReconnectDelay))
- .withMaxIterations(configuration.getMaxReconnect())
- .withUnlimitedDuration()
- .build())
- .withScheduledExecutor(service)
- .withName(taskName)
- .build();
+ BlockingTask task = newReconnectTask(this, getEndpoint(), initialReconnectDelay,
+ configuration.getMaxReconnect());
try {
task.run(this::doReconnect);
diff --git a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java
index 1965eb1..a7bc510 100644
--- a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java
+++ b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java
@@ -22,6 +22,7 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.support.DefaultProducer;
+import org.apache.camel.support.task.BlockingTask;
import org.jsmpp.DefaultPDUReader;
import org.jsmpp.DefaultPDUSender;
import org.jsmpp.SynchronizedPDUSender;
@@ -37,6 +38,10 @@ import org.jsmpp.util.DefaultComposer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.camel.component.smpp.SmppUtils.isServiceStopping;
+import static org.apache.camel.component.smpp.SmppUtils.isSessionClosed;
+import static org.apache.camel.component.smpp.SmppUtils.newReconnectTask;
+
/**
* An implementation of @{link Producer} which use the SMPP protocol
*/
@@ -178,54 +183,38 @@ public class SmppProducer extends DefaultProducer {
private void reconnect(final long initialReconnectDelay) {
if (connectLock.tryLock()) {
- try {
- Runnable r = new Runnable() {
- public void run() {
- boolean reconnected = false;
-
- LOG.info("Schedule reconnect after {} millis", initialReconnectDelay);
- try {
- Thread.sleep(initialReconnectDelay);
- } catch (InterruptedException e) {
- }
-
- int attempt = 0;
- while (!(isStopping() || isStopped())
- && (session == null || session.getSessionState().equals(SessionState.CLOSED))
- && attempt < configuration.getMaxReconnect()) {
- try {
- attempt++;
- LOG.info("Trying to reconnect to {} - attempt #{}", getEndpoint().getConnectionString(),
- attempt);
- session = createSession();
- reconnected = true;
- } catch (IOException e) {
- LOG.warn("Failed to reconnect to {}", getEndpoint().getConnectionString());
- closeSession();
- try {
- Thread.sleep(configuration.getReconnectDelay());
- } catch (InterruptedException ee) {
- }
- }
- }
-
- if (reconnected) {
- LOG.info("Reconnected to {}", getEndpoint().getConnectionString());
- }
- }
- };
+ BlockingTask task = newReconnectTask(this, getEndpoint(), initialReconnectDelay,
+ configuration.getMaxReconnect());
- Thread t = new Thread(r);
- t.start();
- t.join();
- } catch (InterruptedException e) {
- // noop
+ try {
+ task.run(this::doReconnect);
} finally {
connectLock.unlock();
}
}
}
+ private boolean doReconnect() {
+ if (isServiceStopping(this)) {
+ return true;
+ }
+
+ if (isSessionClosed(session)) {
+ try {
+ LOG.info("Trying to reconnect to {}", getEndpoint().getConnectionString());
+ session = createSession();
+ return true;
+ } catch (IOException e) {
+ LOG.warn("Failed to reconnect to {}", getEndpoint().getConnectionString());
+ closeSession();
+
+ return false;
+ }
+ }
+
+ return true;
+ }
+
@Override
public SmppEndpoint getEndpoint() {
return (SmppEndpoint) super.getEndpoint();
diff --git a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java
index 2f887a9..2d2eaf0 100644
--- a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java
+++ b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppUtils.java
@@ -16,13 +16,22 @@
*/
package org.apache.camel.component.smpp;
+import java.time.Duration;
import java.util.Calendar;
import java.util.Date;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.camel.Endpoint;
+import org.apache.camel.support.service.BaseService;
+import org.apache.camel.support.task.BlockingTask;
+import org.apache.camel.support.task.Tasks;
+import org.apache.camel.support.task.budget.Budgets;
import org.jsmpp.bean.Alphabet;
import org.jsmpp.bean.DataSm;
import org.jsmpp.bean.SubmitMulti;
import org.jsmpp.bean.SubmitSm;
+import org.jsmpp.extra.SessionState;
+import org.jsmpp.session.SMPPSession;
import org.jsmpp.util.AbsoluteTimeFormatter;
import org.jsmpp.util.TimeFormatter;
@@ -263,4 +272,30 @@ public final class SmppUtils {
}
return dest;
}
+
+ public static boolean isServiceStopping(BaseService service) {
+ return service.isStopping() || service.isStopped();
+ }
+
+ public static boolean isSessionClosed(SMPPSession session) {
+ return session == null || session.getSessionState().equals(SessionState.CLOSED);
+ }
+
+ public static BlockingTask newReconnectTask(
+ BaseService source, Endpoint endpoint, long initialReconnectDelay,
+ int maxReconnect) {
+ final String taskName = "smpp-reconnect";
+ ScheduledExecutorService service = endpoint.getCamelContext().getExecutorServiceManager()
+ .newSingleThreadScheduledExecutor(source, taskName);
+
+ return Tasks.backgroundTask()
+ .withBudget(Budgets.iterationTimeBudget()
+ .withInitialDelay(Duration.ofMillis(initialReconnectDelay))
+ .withMaxIterations(maxReconnect)
+ .withUnlimitedDuration()
+ .build())
+ .withScheduledExecutor(service)
+ .withName(taskName)
+ .build();
+ }
}