You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2024/01/28 17:56:16 UTC

(camel) 01/02: (chores) camel-sjms: cleaned up duplicated code

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit f8a17729d80a31f2bb5dad1485f4ac2307347ddb
Author: Otavio R. Piske <an...@gmail.com>
AuthorDate: Sun Jan 28 15:15:25 2024 +0100

    (chores) camel-sjms: cleaned up duplicated code
    
    Signed-off-by: Otavio R. Piske <an...@gmail.com>
---
 .../apache/camel/component/sjms/SjmsProducer.java  | 29 ++++---
 .../sjms/consumer/EndpointMessageListener.java     | 96 ++++++++++------------
 2 files changed, 58 insertions(+), 67 deletions(-)

diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
index aafe7c4feca..2a1b4874b01 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
@@ -168,12 +168,7 @@ public class SjmsProducer extends DefaultAsyncProducer {
 
         name = "JmsReplyManagerOnTimeout[" + replyTo + "]";
         // allow the timeout thread to timeout so during normal operation we do not have a idle thread
-        int max = getEndpoint().getComponent().getReplyToOnTimeoutMaxConcurrentConsumers();
-        if (max <= 0) {
-            throw new IllegalArgumentException("The option replyToOnTimeoutMaxConcurrentConsumers must be >= 1");
-        }
-        ExecutorService replyManagerExecutorService
-                = getEndpoint().getCamelContext().getExecutorServiceManager().newThreadPool(replyManager, name, 0, max);
+        ExecutorService replyManagerExecutorService = createReplyManagerExecutor(replyManager, name);
         replyManager.setOnTimeoutExecutorService(replyManagerExecutorService);
 
         ServiceHelper.startService(replyManager);
@@ -193,13 +188,7 @@ public class SjmsProducer extends DefaultAsyncProducer {
 
         name = "JmsReplyManagerOnTimeout[" + getEndpoint().getEndpointConfiguredDestinationName() + "]";
         // allow the timeout thread to timeout so during normal operation we do not have a idle thread
-        int max = getEndpoint().getComponent().getReplyToOnTimeoutMaxConcurrentConsumers();
-        if (max <= 0) {
-            throw new IllegalArgumentException("The option replyToOnTimeoutMaxConcurrentConsumers must be >= 1");
-        }
-        ExecutorService replyManagerExecutorService
-                = getEndpoint().getCamelContext().getExecutorServiceManager().newThreadPool(temporaryQueueReplyManager, name, 0,
-                        max);
+        ExecutorService replyManagerExecutorService = createReplyManagerExecutor(temporaryQueueReplyManager, name);
         temporaryQueueReplyManager.setOnTimeoutExecutorService(replyManagerExecutorService);
 
         ServiceHelper.startService(temporaryQueueReplyManager);
@@ -207,6 +196,20 @@ public class SjmsProducer extends DefaultAsyncProducer {
         return temporaryQueueReplyManager;
     }
 
+    // Creates the on-timeout thread pool for any reply manager (shared or temporary queue): pool size 0, validated max.
+    private ExecutorService createReplyManagerExecutor(ReplyManager replyManager, String name) {
+        int max = doGetMax();
+        return getEndpoint().getCamelContext().getExecutorServiceManager().newThreadPool(replyManager, name, 0, max);
+    }
+
+    private int doGetMax() {
+        final int configured = getEndpoint().getComponent().getReplyToOnTimeoutMaxConcurrentConsumers();
+        if (configured >= 1) {
+            return configured; // usable as the upper bound of the on-timeout pool
+        }
+        throw new IllegalArgumentException("The option replyToOnTimeoutMaxConcurrentConsumers must be >= 1");
+    }
+
     /**
      * Pre tests the connection before starting the listening.
      * <p/>
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java
index a233ee0b722..482fae34fa7 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java
@@ -27,6 +27,7 @@ import jakarta.jms.Session;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelExchangeException;
+import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Processor;
@@ -273,33 +274,8 @@ public class EndpointMessageListener implements SessionMessageListener {
             return;
         }
         try {
-            SessionCallback callback = new SessionCallback() {
-                @Override
-                public Object doInJms(Session session) throws Exception {
-                    MessageProducer producer = null;
-                    try {
-                        Message reply = endpoint.getBinding().makeJmsMessage(exchange, out, session, cause);
-                        final String correlationID = determineCorrelationId(message);
-                        reply.setJMSCorrelationID(correlationID);
-
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("{} sending reply JMS message [correlationId:{}]: {}", endpoint, correlationID, reply);
-                        }
-
-                        producer = endpoint.getJmsObjectFactory().createMessageProducer(session, endpoint, replyDestination);
-                        template.send(producer, reply);
-                    } finally {
-                        close(producer);
-                    }
-
-                    return null;
-                }
-
-                @Override
-                public void onClose(Connection connection, Session session) {
-                    // do not close as we use provided session
-                }
-            };
+            SessionCallback callback = createSessionCallback(replyDestination, message, exchange, out, cause,
+                    endpoint.getJmsObjectFactory()::createMessageProducer);
 
             getTemplate().execute(session, callback);
 
@@ -308,6 +284,43 @@ public class EndpointMessageListener implements SessionMessageListener {
         }
     }
 
+    @FunctionalInterface
+    private interface MessageProducerCreator<T> { // T: type of the reply destination handed to create()
+        MessageProducer create(Session jmsSession, Endpoint targetEndpoint, T destination) throws Exception;
+    }
+
+    private <T> SessionCallback createSessionCallback(T replyDestination, Message message, Exchange exchange,
+            org.apache.camel.Message out, Exception cause, MessageProducerCreator<T> messageProducerCreator) {
+        return new SessionCallback() {
+            @Override
+            public Object doInJms(Session session) throws Exception {
+                MessageProducer replyProducer = null;
+                try {
+                    final Message jmsReply = endpoint.getBinding().makeJmsMessage(exchange, out, session, cause);
+                    final String correlationId = determineCorrelationId(message);
+                    jmsReply.setJMSCorrelationID(correlationId);
+
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("{} sending reply JMS message [correlationId:{}]: {}", endpoint, correlationId, jmsReply);
+                    }
+
+                    // the producer is created for this single send and released in the finally block below
+                    replyProducer = messageProducerCreator.create(session, endpoint, replyDestination);
+                    template.send(replyProducer, jmsReply);
+                } finally {
+                    close(replyProducer);
+                }
+
+                return null;
+            }
+
+            @Override
+            public void onClose(Connection connection, Session session) {
+                // the session was provided by the caller, so it is deliberately left open here
+            }
+        };
+    }
+
     protected void sendReply(
             Session session,
             String replyDestination, final Message message, final Exchange exchange,
@@ -317,33 +330,8 @@ public class EndpointMessageListener implements SessionMessageListener {
             return;
         }
         try {
-            SessionCallback callback = new SessionCallback() {
-                @Override
-                public Object doInJms(Session session) throws Exception {
-                    MessageProducer producer = null;
-                    try {
-                        Message reply = endpoint.getBinding().makeJmsMessage(exchange, out, session, cause);
-                        final String correlationID = determineCorrelationId(message);
-                        reply.setJMSCorrelationID(correlationID);
-
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("{} sending reply JMS message [correlationId:{}]: {}", endpoint, correlationID, reply);
-                        }
-
-                        producer = endpoint.getJmsObjectFactory().createMessageProducer(session, endpoint, replyDestination);
-                        template.send(producer, reply);
-                    } finally {
-                        close(producer);
-                    }
-
-                    return null;
-                }
-
-                @Override
-                public void onClose(Connection connection, Session session) {
-                    // do not close as we use provided session
-                }
-            };
+            SessionCallback callback = createSessionCallback(replyDestination, message, exchange, out, cause,
+                    endpoint.getJmsObjectFactory()::createMessageProducer);
 
             getTemplate().execute(session, callback);