You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2024/01/17 06:33:47 UTC

(camel) branch main updated: CAMEL-20121 reconnect SMPP session after receiving Unbound (#12046)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 6a8d9924e51 CAMEL-20121 reconnect SMPP session after receiving Unbound (#12046)
6a8d9924e51 is described below

commit 6a8d9924e51a4e7c370e51211de7f13ff4dd3f8d
Author: Christian Ambach <de...@users.noreply.github.com>
AuthorDate: Wed Jan 17 07:33:41 2024 +0100

    CAMEL-20121 reconnect SMPP session after receiving Unbound (#12046)
    
    * update version of jsmpp to 3.0.1
    
    Signed-off-by: Christian Ambach <am...@samba.org>
    
    * CAMEL-20121 reconnect SMPP session after receiving Unbind
    
    Signed-off-by: Christian Ambach <am...@samba.org>
    
    ---------
    
    Signed-off-by: Christian Ambach <am...@samba.org>
---
 .../apache/camel/component/smpp/SmppConsumer.java  |  7 ++--
 .../apache/camel/component/smpp/SmppProducer.java  |  6 ++-
 .../camel/component/smpp/SmppConsumerTest.java     | 45 +++++++++++++++++++++
 .../camel/component/smpp/SmppProducerTest.java     | 47 ++++++++++++++++++++++
 4 files changed, 100 insertions(+), 5 deletions(-)

diff --git a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java
index 6f35a09f6d5..d2502aa163f 100644
--- a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java
+++ b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java
@@ -75,10 +75,11 @@ public class SmppConsumer extends DefaultConsumer {
                 configuration.getSessionStateListener().onStateChange(newState, oldState, source);
             }
 
-            if (newState.equals(SessionState.CLOSED)) {
-                LOG.warn("Lost connection to: {} - trying to reconnect...", getEndpoint().getConnectionString());
+            if (newState.equals(SessionState.UNBOUND) || newState.equals(SessionState.CLOSED)) {
+                LOG.warn(newState.equals(SessionState.UNBOUND)
+                        ? "Session to {} was unbound - trying to reconnect" : "Lost connection to: {} - trying to reconnect...",
+                        getEndpoint().getConnectionString());
                 closeSession();
-
                 reconnect(configuration.getInitialReconnectDelay());
             }
         };
diff --git a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java
index 00e304eaa46..b8841fad205 100644
--- a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java
+++ b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java
@@ -71,8 +71,10 @@ public class SmppProducer extends DefaultProducer {
                 configuration.getSessionStateListener().onStateChange(newState, oldState, source);
             }
 
-            if (newState.equals(SessionState.CLOSED)) {
-                LOG.warn("Lost connection to: {} - trying to reconnect...", getEndpoint().getConnectionString());
+            if (newState.equals(SessionState.UNBOUND) || newState.equals(SessionState.CLOSED)) {
+                LOG.warn(newState.equals(SessionState.UNBOUND)
+                        ? "Session to {} was unbound - trying to reconnect" : "Lost connection to: {} - trying to reconnect...",
+                        getEndpoint().getConnectionString());
                 closeSession();
                 reconnect(configuration.getInitialReconnectDelay());
             }
diff --git a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppConsumerTest.java b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppConsumerTest.java
index e659fc917b8..9ff39852988 100644
--- a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppConsumerTest.java
+++ b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppConsumerTest.java
@@ -16,24 +16,37 @@
  */
 package org.apache.camel.component.smpp;
 
+import java.util.concurrent.ScheduledExecutorService;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.Processor;
 import org.apache.camel.spi.ExchangeFactory;
+import org.apache.camel.support.task.BackgroundTask;
+import org.apache.camel.support.task.budget.Budgets;
+import org.apache.camel.util.ReflectionHelper;
 import org.jsmpp.bean.BindType;
 import org.jsmpp.bean.NumberingPlanIndicator;
 import org.jsmpp.bean.TypeOfNumber;
+import org.jsmpp.extra.SessionState;
 import org.jsmpp.session.BindParameter;
 import org.jsmpp.session.MessageReceiverListener;
 import org.jsmpp.session.SMPPSession;
 import org.jsmpp.session.SessionStateListener;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.mockito.MockedStatic;
 
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -156,4 +169,36 @@ public class SmppConsumerTest {
         assertSame(endpoint, consumer.getEndpoint());
         assertSame(configuration, consumer.getConfiguration());
     }
+
+    @ParameterizedTest
+    @EnumSource(value = SessionState.class, names = { "UNBOUND", "CLOSED" })
+    public void internalSessionStateListenerShouldCloseSessionAndReconnect(SessionState sessionState) throws Exception {
+        try (MockedStatic<SmppUtils> smppUtilsMock = mockStatic(SmppUtils.class)) {
+            SessionStateListener sessionStateListener = (SessionStateListener) ReflectionHelper
+                    .getField(SmppConsumer.class.getDeclaredField("internalSessionStateListener"), consumer);
+            ScheduledExecutorService reconnectService = (ScheduledExecutorService) ReflectionHelper
+                    .getField(SmppConsumer.class.getDeclaredField("reconnectService"), consumer);
+            when(endpoint.getConnectionString())
+                    .thenReturn("smpp://smppclient@localhost:2775");
+            BindParameter expectedBindParameter = new BindParameter(
+                    BindType.BIND_RX,
+                    "smppclient",
+                    "password",
+                    "cp",
+                    TypeOfNumber.UNKNOWN,
+                    NumberingPlanIndicator.UNKNOWN,
+                    "");
+            when(session.connectAndBind("localhost", Integer.valueOf(2775), expectedBindParameter))
+                    .thenReturn("1");
+            smppUtilsMock.when(() -> SmppUtils.newReconnectTask(any(), anyString(), anyLong(), anyLong(), anyInt()))
+                    .thenReturn(new BackgroundTask.BackgroundTaskBuilder().withScheduledExecutor(reconnectService)
+                            .withBudget(Budgets.timeBudget().build()).build());
+
+            consumer.doStart();
+
+            sessionStateListener.onStateChange(sessionState, SessionState.BOUND_RX, null);
+            verify(session).unbindAndClose();
+        }
+    }
+
 }
diff --git a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppProducerTest.java b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppProducerTest.java
index ebf5b0491b0..d855a136966 100644
--- a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppProducerTest.java
+++ b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppProducerTest.java
@@ -16,20 +16,34 @@
  */
 package org.apache.camel.component.smpp;
 
+import java.util.concurrent.ScheduledExecutorService;
+
 import org.apache.camel.Exchange;
+import org.apache.camel.support.task.BackgroundTask;
+import org.apache.camel.support.task.budget.Budgets;
+import org.apache.camel.util.ReflectionHelper;
 import org.jsmpp.bean.BindType;
 import org.jsmpp.bean.InterfaceVersion;
 import org.jsmpp.bean.NumberingPlanIndicator;
 import org.jsmpp.bean.TypeOfNumber;
+import org.jsmpp.extra.SessionState;
 import org.jsmpp.session.BindParameter;
 import org.jsmpp.session.SMPPSession;
 import org.jsmpp.session.SessionStateListener;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.mockito.MockedStatic;
 
 import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -126,4 +140,37 @@ public class SmppProducerTest {
         assertSame(endpoint, producer.getEndpoint());
         assertSame(configuration, producer.getConfiguration());
     }
+
+    @ParameterizedTest
+    @EnumSource(value = SessionState.class, names = { "UNBOUND", "CLOSED" })
+    public void internalSessionStateListenerShouldCloseSessionAndReconnect(SessionState sessionState) throws Exception {
+        try (MockedStatic<SmppUtils> smppUtilsMock = mockStatic(SmppUtils.class)) {
+            ScheduledExecutorService reconnectService = (ScheduledExecutorService) ReflectionHelper
+                    .getField(SmppProducer.class.getDeclaredField("reconnectService"), producer);
+            SessionStateListener sessionStateListener = (SessionStateListener) ReflectionHelper
+                    .getField(SmppProducer.class.getDeclaredField("internalSessionStateListener"), producer);
+            when(endpoint.getConnectionString())
+                    .thenReturn("smpp://smppclient@localhost:2775");
+            BindParameter expectedBindParameters = new BindParameter(
+                    BindType.BIND_TX,
+                    "smppclient",
+                    "password",
+                    "cp",
+                    TypeOfNumber.UNKNOWN,
+                    NumberingPlanIndicator.UNKNOWN,
+                    "",
+                    InterfaceVersion.IF_50);
+            when(session.connectAndBind("localhost", Integer.valueOf(2775), expectedBindParameters))
+                    .thenReturn("1");
+            when(endpoint.isSingleton()).thenReturn(true);
+            smppUtilsMock.when(() -> SmppUtils.newReconnectTask(any(), anyString(), anyLong(), anyLong(), anyInt()))
+                    .thenReturn(new BackgroundTask.BackgroundTaskBuilder().withScheduledExecutor(reconnectService)
+                            .withBudget(Budgets.timeBudget().build()).build());
+
+            producer.doStart();
+
+            sessionStateListener.onStateChange(SessionState.CLOSED, SessionState.BOUND_TX, null);
+            verify(session).unbindAndClose();
+        }
+    }
 }