You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2024/01/27 20:55:47 UTC

(camel) 01/03: (chores) camel-jms: cleaned up duplicated code

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 89c2bd2327d8a64129ac613e5602ebcf3e646515
Author: Otavio R. Piske <an...@gmail.com>
AuthorDate: Sat Jan 27 20:21:04 2024 +0100

    (chores) camel-jms: cleaned up duplicated code
    
    Signed-off-by: Otavio R. Piske <an...@gmail.com>
---
 .../apache/camel/component/jms/JmsProducer.java    | 27 ++++++++------
 .../component/jms/StreamMessageInputStream.java    | 20 +++-------
 .../camel/component/jms/reply/JmsReplyHelper.java  | 41 +++++++++++++++++++++
 .../component/jms/reply/QueueReplyManager.java     | 43 ++++++++--------------
 .../component/jms/reply/ReplyManagerSupport.java   | 10 +++++
 .../reply/SharedQueueMessageListenerContainer.java | 13 +------
 .../SharedQueueSimpleMessageListenerContainer.java | 13 +------
 .../jms/reply/TemporaryQueueReplyManager.java      |  8 +---
 8 files changed, 92 insertions(+), 83 deletions(-)

diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
index ca330c957aa..bb51ff9f97f 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
@@ -578,12 +578,7 @@ public class JmsProducer extends DefaultAsyncProducer {
 
         name = "JmsReplyManagerOnTimeout[" + getEndpoint().getEndpointConfiguredDestinationName() + "]";
         // allow the timeout thread to timeout so during normal operation we do not have a idle thread
-        int max = getEndpoint().getReplyToOnTimeoutMaxConcurrentConsumers();
-        if (max <= 0) {
-            throw new IllegalArgumentException("The option replyToOnTimeoutMaxConcurrentConsumers must be >= 1");
-        }
-        ExecutorService replyManagerExecutorService
-                = getEndpoint().getCamelContext().getExecutorServiceManager().newThreadPool(replyManager, name, 0, max);
+        ExecutorService replyManagerExecutorService = createReplyManagerExecutorService(replyManager, name);
         replyManager.setOnTimeoutExecutorService(replyManagerExecutorService);
 
         ServiceHelper.startService(replyManager);
@@ -591,6 +586,19 @@ public class JmsProducer extends DefaultAsyncProducer {
         return replyManager;
     }
 
+    private ExecutorService createReplyManagerExecutorService(ReplyManager replyManager, String name) {
+        int max = doGetMax();
+        return getEndpoint().getCamelContext().getExecutorServiceManager().newThreadPool(replyManager, name, 0, max);
+    }
+
+    private int doGetMax() {
+        int max = getEndpoint().getReplyToOnTimeoutMaxConcurrentConsumers();
+        if (max <= 0) {
+            throw new IllegalArgumentException("The option replyToOnTimeoutMaxConcurrentConsumers must be >= 1");
+        }
+        return max;
+    }
+
     protected ReplyManager createReplyManager(String replyTo) throws Exception {
         // use a regular queue
         ReplyManager replyManager = new QueueReplyManager(getEndpoint().getCamelContext());
@@ -603,12 +611,7 @@ public class JmsProducer extends DefaultAsyncProducer {
 
         name = "JmsReplyManagerOnTimeout[" + replyTo + "]";
         // allow the timeout thread to timeout so during normal operation we do not have a idle thread
-        int max = getEndpoint().getReplyToOnTimeoutMaxConcurrentConsumers();
-        if (max <= 0) {
-            throw new IllegalArgumentException("The option replyToOnTimeoutMaxConcurrentConsumers must be >= 1");
-        }
-        ExecutorService replyManagerExecutorService
-                = getEndpoint().getCamelContext().getExecutorServiceManager().newThreadPool(replyManager, name, 0, max);
+        ExecutorService replyManagerExecutorService = createReplyManagerExecutorService(replyManager, name);
         replyManager.setOnTimeoutExecutorService(replyManagerExecutorService);
 
         ServiceHelper.startService(replyManager);
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/StreamMessageInputStream.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/StreamMessageInputStream.java
index 23016719aa3..72295b24abc 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/StreamMessageInputStream.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/StreamMessageInputStream.java
@@ -46,6 +46,10 @@ public class StreamMessageInputStream extends InputStream {
 
     @Override
     public int read(byte[] array) throws IOException {
+        return doRead(array);
+    }
+
+    private int doRead(byte[] array) throws IOException {
         try {
             int num = message.readBytes(array);
             if (num < 0) {
@@ -66,21 +70,7 @@ public class StreamMessageInputStream extends InputStream {
     @Override
     public int read(byte[] array, int off, int len) throws IOException {
         // we cannot honor off and len, but assuming off is always 0
-        try {
-            int num = message.readBytes(array);
-            if (num < 0) {
-                //the first 128K(FileUtil.BUFFER_SIZE/128K is used when sending JMS StreamMessage)
-                //buffer reached, give a chance to see if there is the next 128K buffer
-                num = message.readBytes(array);
-            }
-            eof = num < 0;
-            return num;
-        } catch (MessageEOFException e) {
-            eof = true;
-            return -1;
-        } catch (JMSException e) {
-            throw new IOException(e);
-        }
+        return doRead(array);
     }
 
     @Override
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/JmsReplyHelper.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/JmsReplyHelper.java
new file mode 100644
index 00000000000..b5d2efd885d
--- /dev/null
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/JmsReplyHelper.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.reply;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class JmsReplyHelper {
+    private static final Logger LOG = LoggerFactory.getLogger(JmsReplyHelper.class);
+
+    private JmsReplyHelper() {
+
+    }
+
+    static String getMessageSelector(String fixedMessageSelector, MessageSelectorCreator creator) {
+        // a fixed selector takes precedence; otherwise ask the creator (result stays null if neither is set)
+        String id = null;
+        if (fixedMessageSelector != null) {
+            id = fixedMessageSelector;
+        } else if (creator != null) {
+            id = creator.get();
+        }
+
+        LOG.trace("Using MessageSelector[{}]", id);
+        return id;
+    }
+}
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
index b1d819a1ba8..dda1293838f 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
@@ -29,7 +29,6 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.jms.ConsumerType;
 import org.apache.camel.component.jms.DefaultSpringErrorHandler;
-import org.apache.camel.component.jms.MessageListenerContainerFactory;
 import org.apache.camel.component.jms.ReplyToType;
 import org.apache.camel.component.jms.SimpleJmsMessageListenerContainer;
 import org.springframework.jms.listener.AbstractMessageListenerContainer;
@@ -130,12 +129,7 @@ public class QueueReplyManager extends ReplyManagerSupport {
         } else if (endpoint.getConfiguration().getReplyToConsumerType() == ConsumerType.Simple) {
             return createSimpleListenerContainer();
         } else {
-            MessageListenerContainerFactory factory = endpoint.getConfiguration().getMessageListenerContainerFactory();
-            if (factory != null) {
-                return factory.createMessageListenerContainer(endpoint);
-            }
-            throw new IllegalArgumentException(
-                    "ReplyToConsumerType.Custom requires that a MessageListenerContainerFactory has been configured");
+            return getAbstractMessageListenerContainer(endpoint);
         }
     }
 
@@ -194,16 +188,7 @@ public class QueueReplyManager extends ReplyManagerSupport {
         answer.setSessionTransacted(false);
 
         // other optional properties
-        if (endpoint.getExceptionListener() != null) {
-            answer.setExceptionListener(endpoint.getExceptionListener());
-        }
-        if (endpoint.getErrorHandler() != null) {
-            answer.setErrorHandler(endpoint.getErrorHandler());
-        } else {
-            answer.setErrorHandler(new DefaultSpringErrorHandler(
-                    endpoint.getCamelContext(), QueueReplyManager.class, endpoint.getErrorHandlerLoggingLevel(),
-                    endpoint.isErrorHandlerLogStackTrace()));
-        }
+        setOptionalProperties(answer);
         // set task executor
         if (endpoint.getTaskExecutor() != null) {
             log.debug("Using custom TaskExecutor: {} on listener container: {}", endpoint.getTaskExecutor(), answer);
@@ -302,16 +287,7 @@ public class QueueReplyManager extends ReplyManagerSupport {
         answer.setSessionTransacted(false);
 
         // other optional properties
-        if (endpoint.getExceptionListener() != null) {
-            answer.setExceptionListener(endpoint.getExceptionListener());
-        }
-        if (endpoint.getErrorHandler() != null) {
-            answer.setErrorHandler(endpoint.getErrorHandler());
-        } else {
-            answer.setErrorHandler(new DefaultSpringErrorHandler(
-                    endpoint.getCamelContext(), QueueReplyManager.class, endpoint.getErrorHandlerLoggingLevel(),
-                    endpoint.isErrorHandlerLogStackTrace()));
-        }
+        setOptionalProperties(answer);
         if (endpoint.getReceiveTimeout() >= 0) {
             answer.setReceiveTimeout(endpoint.getReceiveTimeout());
         }
@@ -344,4 +320,17 @@ public class QueueReplyManager extends ReplyManagerSupport {
 
         return answer;
     }
+
+    private <T extends AbstractMessageListenerContainer> void setOptionalProperties(T answer) {
+        if (endpoint.getExceptionListener() != null) {
+            answer.setExceptionListener(endpoint.getExceptionListener());
+        }
+        if (endpoint.getErrorHandler() != null) {
+            answer.setErrorHandler(endpoint.getErrorHandler());
+        } else {
+            answer.setErrorHandler(new DefaultSpringErrorHandler(
+                    endpoint.getCamelContext(), QueueReplyManager.class, endpoint.getErrorHandlerLoggingLevel(),
+                    endpoint.isErrorHandlerLogStackTrace()));
+        }
+    }
 }
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
index ef91941f669..8118c78ce27 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
@@ -35,6 +35,7 @@ import org.apache.camel.component.jms.JmsConstants;
 import org.apache.camel.component.jms.JmsEndpoint;
 import org.apache.camel.component.jms.JmsMessage;
 import org.apache.camel.component.jms.JmsMessageHelper;
+import org.apache.camel.component.jms.MessageListenerContainerFactory;
 import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.support.service.ServiceSupport;
@@ -313,4 +314,13 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
             answer.setClientId(clientId);
         }
     }
+
+    protected static AbstractMessageListenerContainer getAbstractMessageListenerContainer(JmsEndpoint endpoint) {
+        MessageListenerContainerFactory factory = endpoint.getConfiguration().getMessageListenerContainerFactory();
+        if (factory != null) {
+            return factory.createMessageListenerContainer(endpoint);
+        }
+        throw new IllegalArgumentException(
+                "ReplyToConsumerType.Custom requires that a MessageListenerContainerFactory has been configured");
+    }
 }
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java
index 0e9179de121..39bf18ad1d1 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java
@@ -63,19 +63,10 @@ public class SharedQueueMessageListenerContainer extends DefaultJmsMessageListen
         setCacheLevel(DefaultMessageListenerContainer.CACHE_SESSION);
     }
 
+    // override this method and return the appropriate selector
     @Override
     public String getMessageSelector() {
-        // override this method and return the appropriate selector
-        String id = null;
-        if (fixedMessageSelector != null) {
-            id = fixedMessageSelector;
-        } else if (creator != null) {
-            id = creator.get();
-        }
-        if (logger.isTraceEnabled()) {
-            logger.trace("Using MessageSelector[" + id + "]");
-        }
-        return id;
+        return JmsReplyHelper.getMessageSelector(fixedMessageSelector, creator);
     }
 
 }
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueSimpleMessageListenerContainer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueSimpleMessageListenerContainer.java
index e9931b8fa94..f145fb3e797 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueSimpleMessageListenerContainer.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueSimpleMessageListenerContainer.java
@@ -57,19 +57,10 @@ public class SharedQueueSimpleMessageListenerContainer extends SimpleJmsMessageL
         this.creator = creator;
     }
 
+    // override this method and return the appropriate selector
     @Override
     public String getMessageSelector() {
-        // override this method and return the appropriate selector
-        String id = null;
-        if (fixedMessageSelector != null) {
-            id = fixedMessageSelector;
-        } else if (creator != null) {
-            id = creator.get();
-        }
-        if (logger.isTraceEnabled()) {
-            logger.trace("Using MessageSelector[" + id + "]");
-        }
-        return id;
+        return JmsReplyHelper.getMessageSelector(fixedMessageSelector, creator);
     }
 
 }
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
index 2557f14e9e9..ba5d4a22e42 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
@@ -31,7 +31,6 @@ import org.apache.camel.Exchange;
 import org.apache.camel.component.jms.ConsumerType;
 import org.apache.camel.component.jms.DefaultJmsMessageListenerContainer;
 import org.apache.camel.component.jms.DefaultSpringErrorHandler;
-import org.apache.camel.component.jms.MessageListenerContainerFactory;
 import org.apache.camel.component.jms.SimpleJmsMessageListenerContainer;
 import org.springframework.jms.listener.AbstractMessageListenerContainer;
 import org.springframework.jms.listener.DefaultMessageListenerContainer;
@@ -107,12 +106,7 @@ public class TemporaryQueueReplyManager extends ReplyManagerSupport {
         } else if (endpoint.getConfiguration().getReplyToConsumerType() == ConsumerType.Simple) {
             return createSimpleListenerContainer();
         } else {
-            MessageListenerContainerFactory factory = endpoint.getConfiguration().getMessageListenerContainerFactory();
-            if (factory != null) {
-                return factory.createMessageListenerContainer(endpoint);
-            }
-            throw new IllegalArgumentException(
-                    "ReplyToConsumerType.Custom requires that a MessageListenerContainerFactory has been configured");
+            return getAbstractMessageListenerContainer(endpoint);
         }
     }