You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2024/01/17 06:33:47 UTC
(camel) branch main updated: CAMEL-20121 reconnect SMPP session after receiving Unbound (#12046)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 6a8d9924e51 CAMEL-20121 reconnect SMPP session after receiving Unbound (#12046)
6a8d9924e51 is described below
commit 6a8d9924e51a4e7c370e51211de7f13ff4dd3f8d
Author: Christian Ambach <de...@users.noreply.github.com>
AuthorDate: Wed Jan 17 07:33:41 2024 +0100
CAMEL-20121 reconnect SMPP session after receiving Unbound (#12046)
* update version of jsmpp to 3.0.1
Signed-off-by: Christian Ambach <am...@samba.org>
* CAMEL-20121 reconnect SMPP session after receiving Unbind
Signed-off-by: Christian Ambach <am...@samba.org>
---------
Signed-off-by: Christian Ambach <am...@samba.org>
---
.../apache/camel/component/smpp/SmppConsumer.java | 7 ++--
.../apache/camel/component/smpp/SmppProducer.java | 6 ++-
.../camel/component/smpp/SmppConsumerTest.java | 45 +++++++++++++++++++++
.../camel/component/smpp/SmppProducerTest.java | 47 ++++++++++++++++++++++
4 files changed, 100 insertions(+), 5 deletions(-)
diff --git a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java
index 6f35a09f6d5..d2502aa163f 100644
--- a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java
+++ b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java
@@ -75,10 +75,11 @@ public class SmppConsumer extends DefaultConsumer {
configuration.getSessionStateListener().onStateChange(newState, oldState, source);
}
- if (newState.equals(SessionState.CLOSED)) {
- LOG.warn("Lost connection to: {} - trying to reconnect...", getEndpoint().getConnectionString());
+ if (newState.equals(SessionState.UNBOUND) || newState.equals(SessionState.CLOSED)) {
+ LOG.warn(newState.equals(SessionState.UNBOUND)
+ ? "Session to {} was unbound - trying to reconnect" : "Lost connection to: {} - trying to reconnect...",
+ getEndpoint().getConnectionString());
closeSession();
-
reconnect(configuration.getInitialReconnectDelay());
}
};
diff --git a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java
index 00e304eaa46..b8841fad205 100644
--- a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java
+++ b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java
@@ -71,8 +71,10 @@ public class SmppProducer extends DefaultProducer {
configuration.getSessionStateListener().onStateChange(newState, oldState, source);
}
- if (newState.equals(SessionState.CLOSED)) {
- LOG.warn("Lost connection to: {} - trying to reconnect...", getEndpoint().getConnectionString());
+ if (newState.equals(SessionState.UNBOUND) || newState.equals(SessionState.CLOSED)) {
+ LOG.warn(newState.equals(SessionState.UNBOUND)
+ ? "Session to {} was unbound - trying to reconnect" : "Lost connection to: {} - trying to reconnect...",
+ getEndpoint().getConnectionString());
closeSession();
reconnect(configuration.getInitialReconnectDelay());
}
diff --git a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppConsumerTest.java b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppConsumerTest.java
index e659fc917b8..9ff39852988 100644
--- a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppConsumerTest.java
+++ b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppConsumerTest.java
@@ -16,24 +16,37 @@
*/
package org.apache.camel.component.smpp;
+import java.util.concurrent.ScheduledExecutorService;
+
import org.apache.camel.CamelContext;
import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.Processor;
import org.apache.camel.spi.ExchangeFactory;
+import org.apache.camel.support.task.BackgroundTask;
+import org.apache.camel.support.task.budget.Budgets;
+import org.apache.camel.util.ReflectionHelper;
import org.jsmpp.bean.BindType;
import org.jsmpp.bean.NumberingPlanIndicator;
import org.jsmpp.bean.TypeOfNumber;
+import org.jsmpp.extra.SessionState;
import org.jsmpp.session.BindParameter;
import org.jsmpp.session.MessageReceiverListener;
import org.jsmpp.session.SMPPSession;
import org.jsmpp.session.SessionStateListener;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.mockito.MockedStatic;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -156,4 +169,36 @@ public class SmppConsumerTest {
assertSame(endpoint, consumer.getEndpoint());
assertSame(configuration, consumer.getConfiguration());
}
+
+ @ParameterizedTest
+ @EnumSource(value = SessionState.class, names = { "UNBOUND", "CLOSED" })
+ public void internalSessionStateListenerShouldCloseSessionAndReconnect(SessionState sessionState) throws Exception {
+ try (MockedStatic<SmppUtils> smppUtilsMock = mockStatic(SmppUtils.class)) {
+ SessionStateListener sessionStateListener = (SessionStateListener) ReflectionHelper
+ .getField(SmppConsumer.class.getDeclaredField("internalSessionStateListener"), consumer);
+ ScheduledExecutorService reconnectService = (ScheduledExecutorService) ReflectionHelper
+ .getField(SmppConsumer.class.getDeclaredField("reconnectService"), consumer);
+ when(endpoint.getConnectionString())
+ .thenReturn("smpp://smppclient@localhost:2775");
+ BindParameter expectedBindParameter = new BindParameter(
+ BindType.BIND_RX,
+ "smppclient",
+ "password",
+ "cp",
+ TypeOfNumber.UNKNOWN,
+ NumberingPlanIndicator.UNKNOWN,
+ "");
+ when(session.connectAndBind("localhost", Integer.valueOf(2775), expectedBindParameter))
+ .thenReturn("1");
+ smppUtilsMock.when(() -> SmppUtils.newReconnectTask(any(), anyString(), anyLong(), anyLong(), anyInt()))
+ .thenReturn(new BackgroundTask.BackgroundTaskBuilder().withScheduledExecutor(reconnectService)
+ .withBudget(Budgets.timeBudget().build()).build());
+
+ consumer.doStart();
+
+ sessionStateListener.onStateChange(sessionState, SessionState.BOUND_RX, null);
+ verify(session).unbindAndClose();
+ }
+ }
+
}
diff --git a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppProducerTest.java b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppProducerTest.java
index ebf5b0491b0..d855a136966 100644
--- a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppProducerTest.java
+++ b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppProducerTest.java
@@ -16,20 +16,34 @@
*/
package org.apache.camel.component.smpp;
+import java.util.concurrent.ScheduledExecutorService;
+
import org.apache.camel.Exchange;
+import org.apache.camel.support.task.BackgroundTask;
+import org.apache.camel.support.task.budget.Budgets;
+import org.apache.camel.util.ReflectionHelper;
import org.jsmpp.bean.BindType;
import org.jsmpp.bean.InterfaceVersion;
import org.jsmpp.bean.NumberingPlanIndicator;
import org.jsmpp.bean.TypeOfNumber;
+import org.jsmpp.extra.SessionState;
import org.jsmpp.session.BindParameter;
import org.jsmpp.session.SMPPSession;
import org.jsmpp.session.SessionStateListener;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.mockito.MockedStatic;
import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -126,4 +140,37 @@ public class SmppProducerTest {
assertSame(endpoint, producer.getEndpoint());
assertSame(configuration, producer.getConfiguration());
}
+
+ @ParameterizedTest
+ @EnumSource(value = SessionState.class, names = { "UNBOUND", "CLOSED" })
+ public void internalSessionStateListenerShouldCloseSessionAndReconnect(SessionState sessionState) throws Exception {
+ try (MockedStatic<SmppUtils> smppUtilsMock = mockStatic(SmppUtils.class)) {
+ ScheduledExecutorService reconnectService = (ScheduledExecutorService) ReflectionHelper
+ .getField(SmppProducer.class.getDeclaredField("reconnectService"), producer);
+ SessionStateListener sessionStateListener = (SessionStateListener) ReflectionHelper
+ .getField(SmppProducer.class.getDeclaredField("internalSessionStateListener"), producer);
+ when(endpoint.getConnectionString())
+ .thenReturn("smpp://smppclient@localhost:2775");
+ BindParameter expectedBindParameters = new BindParameter(
+ BindType.BIND_TX,
+ "smppclient",
+ "password",
+ "cp",
+ TypeOfNumber.UNKNOWN,
+ NumberingPlanIndicator.UNKNOWN,
+ "",
+ InterfaceVersion.IF_50);
+ when(session.connectAndBind("localhost", Integer.valueOf(2775), expectedBindParameters))
+ .thenReturn("1");
+ when(endpoint.isSingleton()).thenReturn(true);
+ smppUtilsMock.when(() -> SmppUtils.newReconnectTask(any(), anyString(), anyLong(), anyLong(), anyInt()))
+ .thenReturn(new BackgroundTask.BackgroundTaskBuilder().withScheduledExecutor(reconnectService)
+ .withBudget(Budgets.timeBudget().build()).build());
+
+ producer.doStart();
+
+ sessionStateListener.onStateChange(SessionState.CLOSED, SessionState.BOUND_TX, null);
+ verify(session).unbindAndClose();
+ }
+ }
}