You are viewing a plain-text version of this content. The canonical link for it was a hyperlink in the original archive page and is not preserved in this plain-text version.
Posted to commits@camel.apache.org by or...@apache.org on 2024/01/28 17:56:15 UTC

(camel) branch main updated (a39677f85a0 -> 860f43ed669)

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


    from a39677f85a0 Update Jenkinsfile.jdk17 (#12938)
     new f8a17729d80 (chores) camel-sjms: cleaned up duplicated code
     new 860f43ed669 (chores) camel-sjms: minor cleanups

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../camel/component/sjms/SjmsPollingConsumer.java  |  4 +-
 .../apache/camel/component/sjms/SjmsProducer.java  | 29 ++++---
 .../camel/component/sjms/SjmsSendDynamicAware.java |  2 +-
 .../sjms/consumer/EndpointMessageListener.java     | 96 ++++++++++------------
 .../component/sjms/reply/QueueReplyManager.java    |  2 +-
 5 files changed, 62 insertions(+), 71 deletions(-)


(camel) 01/02: (chores) camel-sjms: cleaned up duplicated code

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit f8a17729d80a31f2bb5dad1485f4ac2307347ddb
Author: Otavio R. Piske <an...@gmail.com>
AuthorDate: Sun Jan 28 15:15:25 2024 +0100

    (chores) camel-sjms: cleaned up duplicated code
    
    Signed-off-by: Otavio R. Piske <an...@gmail.com>
---
 .../apache/camel/component/sjms/SjmsProducer.java  | 29 ++++---
 .../sjms/consumer/EndpointMessageListener.java     | 96 ++++++++++------------
 2 files changed, 58 insertions(+), 67 deletions(-)

diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
index aafe7c4feca..2a1b4874b01 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
@@ -168,12 +168,7 @@ public class SjmsProducer extends DefaultAsyncProducer {
 
         name = "JmsReplyManagerOnTimeout[" + replyTo + "]";
         // allow the timeout thread to timeout so during normal operation we do not have a idle thread
-        int max = getEndpoint().getComponent().getReplyToOnTimeoutMaxConcurrentConsumers();
-        if (max <= 0) {
-            throw new IllegalArgumentException("The option replyToOnTimeoutMaxConcurrentConsumers must be >= 1");
-        }
-        ExecutorService replyManagerExecutorService
-                = getEndpoint().getCamelContext().getExecutorServiceManager().newThreadPool(replyManager, name, 0, max);
+        ExecutorService replyManagerExecutorService = createReplyManagerExecutor(replyManager, name);
         replyManager.setOnTimeoutExecutorService(replyManagerExecutorService);
 
         ServiceHelper.startService(replyManager);
@@ -193,13 +188,7 @@ public class SjmsProducer extends DefaultAsyncProducer {
 
         name = "JmsReplyManagerOnTimeout[" + getEndpoint().getEndpointConfiguredDestinationName() + "]";
         // allow the timeout thread to timeout so during normal operation we do not have a idle thread
-        int max = getEndpoint().getComponent().getReplyToOnTimeoutMaxConcurrentConsumers();
-        if (max <= 0) {
-            throw new IllegalArgumentException("The option replyToOnTimeoutMaxConcurrentConsumers must be >= 1");
-        }
-        ExecutorService replyManagerExecutorService
-                = getEndpoint().getCamelContext().getExecutorServiceManager().newThreadPool(temporaryQueueReplyManager, name, 0,
-                        max);
+        ExecutorService replyManagerExecutorService = createReplyManagerExecutor(temporaryQueueReplyManager, name);
         temporaryQueueReplyManager.setOnTimeoutExecutorService(replyManagerExecutorService);
 
         ServiceHelper.startService(temporaryQueueReplyManager);
@@ -207,6 +196,20 @@ public class SjmsProducer extends DefaultAsyncProducer {
         return temporaryQueueReplyManager;
     }
 
+    private ExecutorService createReplyManagerExecutor(ReplyManager temporaryQueueReplyManager, String name) {
+        int max = doGetMax();
+        return getEndpoint().getCamelContext().getExecutorServiceManager().newThreadPool(temporaryQueueReplyManager, name, 0,
+                        max);
+    }
+
+    private int doGetMax() {
+        int max = getEndpoint().getComponent().getReplyToOnTimeoutMaxConcurrentConsumers();
+        if (max <= 0) {
+            throw new IllegalArgumentException("The option replyToOnTimeoutMaxConcurrentConsumers must be >= 1");
+        }
+        return max;
+    }
+
     /**
      * Pre tests the connection before starting the listening.
      * <p/>
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java
index a233ee0b722..482fae34fa7 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java
@@ -27,6 +27,7 @@ import jakarta.jms.Session;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelExchangeException;
+import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Processor;
@@ -273,33 +274,8 @@ public class EndpointMessageListener implements SessionMessageListener {
             return;
         }
         try {
-            SessionCallback callback = new SessionCallback() {
-                @Override
-                public Object doInJms(Session session) throws Exception {
-                    MessageProducer producer = null;
-                    try {
-                        Message reply = endpoint.getBinding().makeJmsMessage(exchange, out, session, cause);
-                        final String correlationID = determineCorrelationId(message);
-                        reply.setJMSCorrelationID(correlationID);
-
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("{} sending reply JMS message [correlationId:{}]: {}", endpoint, correlationID, reply);
-                        }
-
-                        producer = endpoint.getJmsObjectFactory().createMessageProducer(session, endpoint, replyDestination);
-                        template.send(producer, reply);
-                    } finally {
-                        close(producer);
-                    }
-
-                    return null;
-                }
-
-                @Override
-                public void onClose(Connection connection, Session session) {
-                    // do not close as we use provided session
-                }
-            };
+            SessionCallback callback = createSessionCallback(replyDestination, message, exchange, out, cause,
+                    endpoint.getJmsObjectFactory()::createMessageProducer);
 
             getTemplate().execute(session, callback);
 
@@ -308,6 +284,43 @@ public class EndpointMessageListener implements SessionMessageListener {
         }
     }
 
+    @FunctionalInterface
+    private interface MessageProducerCreator<T> {
+        MessageProducer create(Session session, Endpoint endpoint, T replyDestination) throws Exception;
+    }
+
+    private <T> SessionCallback createSessionCallback(T replyDestination, Message message, Exchange exchange, org.apache.camel.Message out, Exception cause,
+                                                      MessageProducerCreator<T> messageProducerCreator) {
+        return new SessionCallback() {
+            @Override
+            public Object doInJms(Session session) throws Exception {
+                MessageProducer producer = null;
+                try {
+                    Message reply = endpoint.getBinding().makeJmsMessage(exchange, out, session, cause);
+                    final String correlationID = determineCorrelationId(message);
+                    reply.setJMSCorrelationID(correlationID);
+
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("{} sending reply JMS message [correlationId:{}]: {}", endpoint, correlationID, reply);
+                    }
+
+                    producer = messageProducerCreator.create(session, endpoint, replyDestination);
+
+                    template.send(producer, reply);
+                } finally {
+                    close(producer);
+                }
+
+                return null;
+            }
+
+            @Override
+            public void onClose(Connection connection, Session session) {
+                // do not close as we use provided session
+            }
+        };
+    }
+
     protected void sendReply(
             Session session,
             String replyDestination, final Message message, final Exchange exchange,
@@ -317,33 +330,8 @@ public class EndpointMessageListener implements SessionMessageListener {
             return;
         }
         try {
-            SessionCallback callback = new SessionCallback() {
-                @Override
-                public Object doInJms(Session session) throws Exception {
-                    MessageProducer producer = null;
-                    try {
-                        Message reply = endpoint.getBinding().makeJmsMessage(exchange, out, session, cause);
-                        final String correlationID = determineCorrelationId(message);
-                        reply.setJMSCorrelationID(correlationID);
-
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("{} sending reply JMS message [correlationId:{}]: {}", endpoint, correlationID, reply);
-                        }
-
-                        producer = endpoint.getJmsObjectFactory().createMessageProducer(session, endpoint, replyDestination);
-                        template.send(producer, reply);
-                    } finally {
-                        close(producer);
-                    }
-
-                    return null;
-                }
-
-                @Override
-                public void onClose(Connection connection, Session session) {
-                    // do not close as we use provided session
-                }
-            };
+            SessionCallback callback = createSessionCallback(replyDestination, message, exchange, out, cause,
+                    endpoint.getJmsObjectFactory()::createMessageProducer);
 
             getTemplate().execute(session, callback);
 


(camel) 02/02: (chores) camel-sjms: minor cleanups

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 860f43ed6694e750bf1e87a0415e9eb546b8ad00
Author: Otavio R. Piske <an...@gmail.com>
AuthorDate: Sun Jan 28 15:15:59 2024 +0100

    (chores) camel-sjms: minor cleanups
    
    - removed unused throws declaration
    - use final fields when possible
    
    Signed-off-by: Otavio R. Piske <an...@gmail.com>
---
 .../java/org/apache/camel/component/sjms/SjmsPollingConsumer.java     | 4 ++--
 .../java/org/apache/camel/component/sjms/SjmsSendDynamicAware.java    | 2 +-
 .../java/org/apache/camel/component/sjms/reply/QueueReplyManager.java | 2 +-
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsPollingConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsPollingConsumer.java
index 538a403492e..736be3f81b2 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsPollingConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsPollingConsumer.java
@@ -26,8 +26,8 @@ import org.apache.camel.support.PollingConsumerSupport;
  * A JMS {@link org.apache.camel.PollingConsumer}.
  */
 public class SjmsPollingConsumer extends PollingConsumerSupport {
-    private SjmsTemplate template;
-    private SjmsEndpoint jmsEndpoint;
+    private final SjmsTemplate template;
+    private final SjmsEndpoint jmsEndpoint;
 
     public SjmsPollingConsumer(SjmsEndpoint endpoint, SjmsTemplate template) {
         super(endpoint);
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsSendDynamicAware.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsSendDynamicAware.java
index 337de7162e2..d7d894e0ca6 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsSendDynamicAware.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsSendDynamicAware.java
@@ -90,7 +90,7 @@ public class SjmsSendDynamicAware extends ServiceSupport implements SendDynamicA
         final String destinationName = parseDestinationName(entry.getUri());
         return new Processor() {
             @Override
-            public void process(Exchange exchange) throws Exception {
+            public void process(Exchange exchange) {
                 exchange.getMessage().setHeader(SjmsConstants.JMS_DESTINATION_NAME, destinationName);
             }
         };
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/QueueReplyManager.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/QueueReplyManager.java
index 83f62b1a491..fb67ba02739 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/QueueReplyManager.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/reply/QueueReplyManager.java
@@ -87,7 +87,7 @@ public class QueueReplyManager extends ReplyManagerSupport {
         }
 
         @Override
-        public Destination createTemporaryDestination(Session session, boolean topic) throws JMSException {
+        public Destination createTemporaryDestination(Session session, boolean topic) {
             return null;
         }
     }