You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2024/01/27 20:55:46 UTC

(camel) branch main updated (c91f819141f -> 00d7093be2f)

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


    from c91f819141f Regen
     new 89c2bd2327d (chores) camel-jms: cleaned up duplicated code
     new 3084fa4c86b (chores) camel-jms: minor code cleanups
     new 00d7093be2f (chores) camel-jms: simplify overly complex methods

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../jms/DefaultJmsMessageListenerContainer.java    | 36 ++++++++++-------
 .../camel/component/jms/JmsConfiguration.java      |  4 +-
 .../apache/camel/component/jms/JmsProducer.java    | 27 +++++++------
 .../camel/component/jms/JmsSendDynamicAware.java   |  2 +-
 .../component/jms/StreamMessageInputStream.java    | 20 +++-------
 .../camel/component/jms/reply/JmsReplyHelper.java} | 26 ++++++------
 .../component/jms/reply/QueueReplyManager.java     | 46 ++++++++--------------
 .../component/jms/reply/ReplyManagerSupport.java   | 10 +++++
 .../reply/SharedQueueMessageListenerContainer.java | 13 +-----
 .../SharedQueueSimpleMessageListenerContainer.java | 13 +-----
 .../jms/reply/TemporaryQueueReplyManager.java      |  8 +---
 11 files changed, 91 insertions(+), 114 deletions(-)
 copy components/{camel-xmpp/src/main/java/org/apache/camel/component/xmpp/XmppLogger.java => camel-jms/src/main/java/org/apache/camel/component/jms/reply/JmsReplyHelper.java} (59%)


(camel) 03/03: (chores) camel-jms: simplify overly complex methods

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 00d7093be2fd37f5ba3eeebbe94a7514bbbf61f5
Author: Otavio R. Piske <an...@gmail.com>
AuthorDate: Sat Jan 27 20:25:51 2024 +0100

    (chores) camel-jms: simplify overly complex methods
    
    Signed-off-by: Otavio R. Piske <an...@gmail.com>
---
 .../jms/DefaultJmsMessageListenerContainer.java    | 36 +++++++++++++---------
 1 file changed, 22 insertions(+), 14 deletions(-)

diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
index 4cf0c4a8ede..cb89cc573c3 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultJmsMessageListenerContainer.java
@@ -97,27 +97,35 @@ public class DefaultJmsMessageListenerContainer extends DefaultMessageListenerCo
         TaskExecutor answer;
 
         if (endpoint.getDefaultTaskExecutorType() == DefaultTaskExecutorType.ThreadPool) {
-            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
-            executor.setBeanName(beanName);
-            executor.setThreadFactory(new CamelThreadFactory(pattern, beanName, true));
-            executor.setCorePoolSize(endpoint.getConcurrentConsumers());
-            // Direct hand-off mode. Do not queue up tasks: assign it to a thread immediately.
-            // We set no upper-bound on the thread pool (no maxPoolSize) as it's already implicitly constrained by
-            // maxConcurrentConsumers on the DMLC itself (i.e. DMLC will only grow up to a level of concurrency as
-            // defined by maxConcurrentConsumers).
-            executor.setQueueCapacity(0);
-            executor.initialize();
-            answer = executor;
+            answer = createThreadPoolExecutor(beanName, pattern);
         } else {
-            SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor(beanName);
-            executor.setThreadFactory(new CamelThreadFactory(pattern, beanName, true));
-            answer = executor;
+            answer = createAsyncTaskExecutor(beanName, pattern);
         }
 
         taskExecutor = answer;
         return answer;
     }
 
+    private static TaskExecutor createAsyncTaskExecutor(String beanName, String pattern) {
+        SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor(beanName);
+        executor.setThreadFactory(new CamelThreadFactory(pattern, beanName, true));
+        return executor;
+    }
+
+    private TaskExecutor createThreadPoolExecutor(String beanName, String pattern) {
+        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+        executor.setBeanName(beanName);
+        executor.setThreadFactory(new CamelThreadFactory(pattern, beanName, true));
+        executor.setCorePoolSize(endpoint.getConcurrentConsumers());
+        // Direct hand-off mode. Do not queue up tasks: assign it to a thread immediately.
+        // We set no upper-bound on the thread pool (no maxPoolSize) as it's already implicitly constrained by
+        // maxConcurrentConsumers on the DMLC itself (i.e. DMLC will only grow up to a level of concurrency as
+        // defined by maxConcurrentConsumers).
+        executor.setQueueCapacity(0);
+        executor.initialize();
+        return executor;
+    }
+
     @Override
     public void stop() throws JmsException {
         if (logger.isDebugEnabled()) {


(camel) 02/03: (chores) camel-jms: minor code cleanups

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 3084fa4c86bff0d9879125aadd030a8529f17955
Author: Otavio R. Piske <an...@gmail.com>
AuthorDate: Sat Jan 27 20:22:22 2024 +0100

    (chores) camel-jms: minor code cleanups
    
    - removed unused throws declaration
    - use isEmpty
    - use final fields when possible
    - reduced unnecessary field visibility
    
    Signed-off-by: Otavio R. Piske <an...@gmail.com>
---
 .../main/java/org/apache/camel/component/jms/JmsConfiguration.java    | 4 ++--
 .../main/java/org/apache/camel/component/jms/JmsSendDynamicAware.java | 2 +-
 .../java/org/apache/camel/component/jms/reply/QueueReplyManager.java  | 3 +--
 3 files changed, 4 insertions(+), 5 deletions(-)

diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
index 74e27a5b011..433e386bc6e 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
@@ -557,7 +557,7 @@ public class JmsConfiguration implements Cloneable {
     }
 
     public static class CamelJmsTemplate extends JmsTemplate {
-        private JmsConfiguration config;
+        private final JmsConfiguration config;
 
         public CamelJmsTemplate(JmsConfiguration config, ConnectionFactory connectionFactory) {
             super(connectionFactory);
@@ -1621,7 +1621,7 @@ public class JmsConfiguration implements Cloneable {
             }
         }
 
-        if (endpoint.getSelector() != null && endpoint.getSelector().length() != 0) {
+        if (endpoint.getSelector() != null && !endpoint.getSelector().isEmpty()) {
             container.setMessageSelector(endpoint.getSelector());
         }
 
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsSendDynamicAware.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsSendDynamicAware.java
index d5877c83aa9..4e7aa1b2f4a 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsSendDynamicAware.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsSendDynamicAware.java
@@ -90,7 +90,7 @@ public class JmsSendDynamicAware extends ServiceSupport implements SendDynamicAw
         final String destinationName = parseDestinationName(entry.getUri());
         return new Processor() {
             @Override
-            public void process(Exchange exchange) throws Exception {
+            public void process(Exchange exchange) {
                 exchange.getMessage().setHeader(JmsConstants.JMS_DESTINATION_NAME, destinationName);
             }
         };
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
index dda1293838f..c0697cf329d 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
@@ -41,7 +41,6 @@ import org.springframework.jms.support.destination.DestinationResolver;
 public class QueueReplyManager extends ReplyManagerSupport {
 
     private String replyToSelectorValue;
-    private MessageSelectorCreator dynamicMessageSelector;
 
     public QueueReplyManager(CamelContext camelContext) {
         super(camelContext);
@@ -238,7 +237,7 @@ public class QueueReplyManager extends ReplyManagerSupport {
                         endpoint.getReplyTo(), fixedMessageSelector, answer);
             } else {
                 // use a dynamic message selector which will select the message we want to receive as reply
-                dynamicMessageSelector = new MessageSelectorCreator(correlation);
+                MessageSelectorCreator dynamicMessageSelector = new MessageSelectorCreator(correlation);
                 answer = new SharedQueueMessageListenerContainer(endpoint, dynamicMessageSelector);
                 log.debug("Using shared queue: {} with dynamic message selector as reply listener: {}", endpoint.getReplyTo(),
                         answer);


(camel) 01/03: (chores) camel-jms: cleaned up duplicated code

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 89c2bd2327d8a64129ac613e5602ebcf3e646515
Author: Otavio R. Piske <an...@gmail.com>
AuthorDate: Sat Jan 27 20:21:04 2024 +0100

    (chores) camel-jms: cleaned up duplicated code
    
    Signed-off-by: Otavio R. Piske <an...@gmail.com>
---
 .../apache/camel/component/jms/JmsProducer.java    | 27 ++++++++------
 .../component/jms/StreamMessageInputStream.java    | 20 +++-------
 .../camel/component/jms/reply/JmsReplyHelper.java  | 41 +++++++++++++++++++++
 .../component/jms/reply/QueueReplyManager.java     | 43 ++++++++--------------
 .../component/jms/reply/ReplyManagerSupport.java   | 10 +++++
 .../reply/SharedQueueMessageListenerContainer.java | 13 +------
 .../SharedQueueSimpleMessageListenerContainer.java | 13 +------
 .../jms/reply/TemporaryQueueReplyManager.java      |  8 +---
 8 files changed, 92 insertions(+), 83 deletions(-)

diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
index ca330c957aa..bb51ff9f97f 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
@@ -578,12 +578,7 @@ public class JmsProducer extends DefaultAsyncProducer {
 
         name = "JmsReplyManagerOnTimeout[" + getEndpoint().getEndpointConfiguredDestinationName() + "]";
         // allow the timeout thread to timeout so during normal operation we do not have a idle thread
-        int max = getEndpoint().getReplyToOnTimeoutMaxConcurrentConsumers();
-        if (max <= 0) {
-            throw new IllegalArgumentException("The option replyToOnTimeoutMaxConcurrentConsumers must be >= 1");
-        }
-        ExecutorService replyManagerExecutorService
-                = getEndpoint().getCamelContext().getExecutorServiceManager().newThreadPool(replyManager, name, 0, max);
+        ExecutorService replyManagerExecutorService = createReplyManagerExecutorService(replyManager, name);
         replyManager.setOnTimeoutExecutorService(replyManagerExecutorService);
 
         ServiceHelper.startService(replyManager);
@@ -591,6 +586,19 @@ public class JmsProducer extends DefaultAsyncProducer {
         return replyManager;
     }
 
+    private ExecutorService createReplyManagerExecutorService(ReplyManager replyManager, String name) {
+        int max = doGetMax();
+        return getEndpoint().getCamelContext().getExecutorServiceManager().newThreadPool(replyManager, name, 0, max);
+    }
+
+    private int doGetMax() {
+        int max = getEndpoint().getReplyToOnTimeoutMaxConcurrentConsumers();
+        if (max <= 0) {
+            throw new IllegalArgumentException("The option replyToOnTimeoutMaxConcurrentConsumers must be >= 1");
+        }
+        return max;
+    }
+
     protected ReplyManager createReplyManager(String replyTo) throws Exception {
         // use a regular queue
         ReplyManager replyManager = new QueueReplyManager(getEndpoint().getCamelContext());
@@ -603,12 +611,7 @@ public class JmsProducer extends DefaultAsyncProducer {
 
         name = "JmsReplyManagerOnTimeout[" + replyTo + "]";
         // allow the timeout thread to timeout so during normal operation we do not have a idle thread
-        int max = getEndpoint().getReplyToOnTimeoutMaxConcurrentConsumers();
-        if (max <= 0) {
-            throw new IllegalArgumentException("The option replyToOnTimeoutMaxConcurrentConsumers must be >= 1");
-        }
-        ExecutorService replyManagerExecutorService
-                = getEndpoint().getCamelContext().getExecutorServiceManager().newThreadPool(replyManager, name, 0, max);
+        ExecutorService replyManagerExecutorService = createReplyManagerExecutorService(replyManager, name);
         replyManager.setOnTimeoutExecutorService(replyManagerExecutorService);
 
         ServiceHelper.startService(replyManager);
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/StreamMessageInputStream.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/StreamMessageInputStream.java
index 23016719aa3..72295b24abc 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/StreamMessageInputStream.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/StreamMessageInputStream.java
@@ -46,6 +46,10 @@ public class StreamMessageInputStream extends InputStream {
 
     @Override
     public int read(byte[] array) throws IOException {
+        return doRead(array);
+    }
+
+    private int doRead(byte[] array) throws IOException {
         try {
             int num = message.readBytes(array);
             if (num < 0) {
@@ -66,21 +70,7 @@ public class StreamMessageInputStream extends InputStream {
     @Override
     public int read(byte[] array, int off, int len) throws IOException {
         // we cannot honor off and len, but assuming off is always 0
-        try {
-            int num = message.readBytes(array);
-            if (num < 0) {
-                //the first 128K(FileUtil.BUFFER_SIZE/128K is used when sending JMS StreamMessage)
-                //buffer reached, give a chance to see if there is the next 128K buffer
-                num = message.readBytes(array);
-            }
-            eof = num < 0;
-            return num;
-        } catch (MessageEOFException e) {
-            eof = true;
-            return -1;
-        } catch (JMSException e) {
-            throw new IOException(e);
-        }
+        return doRead(array);
     }
 
     @Override
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/JmsReplyHelper.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/JmsReplyHelper.java
new file mode 100644
index 00000000000..b5d2efd885d
--- /dev/null
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/JmsReplyHelper.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.reply;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class JmsReplyHelper {
+    private static final Logger LOG = LoggerFactory.getLogger(JmsReplyHelper.class);
+
+    private JmsReplyHelper() {
+
+    }
+
+    static String getMessageSelector(String fixedMessageSelector, MessageSelectorCreator creator) {
+        // override this method and return the appropriate selector
+        String id = null;
+        if (fixedMessageSelector != null) {
+            id = fixedMessageSelector;
+        } else if (creator != null) {
+            id = creator.get();
+        }
+
+        LOG.trace("Using MessageSelector[{}]", id);
+        return id;
+    }
+}
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
index b1d819a1ba8..dda1293838f 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
@@ -29,7 +29,6 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.jms.ConsumerType;
 import org.apache.camel.component.jms.DefaultSpringErrorHandler;
-import org.apache.camel.component.jms.MessageListenerContainerFactory;
 import org.apache.camel.component.jms.ReplyToType;
 import org.apache.camel.component.jms.SimpleJmsMessageListenerContainer;
 import org.springframework.jms.listener.AbstractMessageListenerContainer;
@@ -130,12 +129,7 @@ public class QueueReplyManager extends ReplyManagerSupport {
         } else if (endpoint.getConfiguration().getReplyToConsumerType() == ConsumerType.Simple) {
             return createSimpleListenerContainer();
         } else {
-            MessageListenerContainerFactory factory = endpoint.getConfiguration().getMessageListenerContainerFactory();
-            if (factory != null) {
-                return factory.createMessageListenerContainer(endpoint);
-            }
-            throw new IllegalArgumentException(
-                    "ReplyToConsumerType.Custom requires that a MessageListenerContainerFactory has been configured");
+            return getAbstractMessageListenerContainer(endpoint);
         }
     }
 
@@ -194,16 +188,7 @@ public class QueueReplyManager extends ReplyManagerSupport {
         answer.setSessionTransacted(false);
 
         // other optional properties
-        if (endpoint.getExceptionListener() != null) {
-            answer.setExceptionListener(endpoint.getExceptionListener());
-        }
-        if (endpoint.getErrorHandler() != null) {
-            answer.setErrorHandler(endpoint.getErrorHandler());
-        } else {
-            answer.setErrorHandler(new DefaultSpringErrorHandler(
-                    endpoint.getCamelContext(), QueueReplyManager.class, endpoint.getErrorHandlerLoggingLevel(),
-                    endpoint.isErrorHandlerLogStackTrace()));
-        }
+        setOptionalProperties(answer);
         // set task executor
         if (endpoint.getTaskExecutor() != null) {
             log.debug("Using custom TaskExecutor: {} on listener container: {}", endpoint.getTaskExecutor(), answer);
@@ -302,16 +287,7 @@ public class QueueReplyManager extends ReplyManagerSupport {
         answer.setSessionTransacted(false);
 
         // other optional properties
-        if (endpoint.getExceptionListener() != null) {
-            answer.setExceptionListener(endpoint.getExceptionListener());
-        }
-        if (endpoint.getErrorHandler() != null) {
-            answer.setErrorHandler(endpoint.getErrorHandler());
-        } else {
-            answer.setErrorHandler(new DefaultSpringErrorHandler(
-                    endpoint.getCamelContext(), QueueReplyManager.class, endpoint.getErrorHandlerLoggingLevel(),
-                    endpoint.isErrorHandlerLogStackTrace()));
-        }
+        setOptionalProperties(answer);
         if (endpoint.getReceiveTimeout() >= 0) {
             answer.setReceiveTimeout(endpoint.getReceiveTimeout());
         }
@@ -344,4 +320,17 @@ public class QueueReplyManager extends ReplyManagerSupport {
 
         return answer;
     }
+
+    private <T extends AbstractMessageListenerContainer> void setOptionalProperties(T answer) {
+        if (endpoint.getExceptionListener() != null) {
+            answer.setExceptionListener(endpoint.getExceptionListener());
+        }
+        if (endpoint.getErrorHandler() != null) {
+            answer.setErrorHandler(endpoint.getErrorHandler());
+        } else {
+            answer.setErrorHandler(new DefaultSpringErrorHandler(
+                    endpoint.getCamelContext(), QueueReplyManager.class, endpoint.getErrorHandlerLoggingLevel(),
+                    endpoint.isErrorHandlerLogStackTrace()));
+        }
+    }
 }
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
index ef91941f669..8118c78ce27 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
@@ -35,6 +35,7 @@ import org.apache.camel.component.jms.JmsConstants;
 import org.apache.camel.component.jms.JmsEndpoint;
 import org.apache.camel.component.jms.JmsMessage;
 import org.apache.camel.component.jms.JmsMessageHelper;
+import org.apache.camel.component.jms.MessageListenerContainerFactory;
 import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.support.service.ServiceSupport;
@@ -313,4 +314,13 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
             answer.setClientId(clientId);
         }
     }
+
+    protected static AbstractMessageListenerContainer getAbstractMessageListenerContainer(JmsEndpoint endpoint) {
+        MessageListenerContainerFactory factory = endpoint.getConfiguration().getMessageListenerContainerFactory();
+        if (factory != null) {
+            return factory.createMessageListenerContainer(endpoint);
+        }
+        throw new IllegalArgumentException(
+                "ReplyToConsumerType.Custom requires that a MessageListenerContainerFactory has been configured");
+    }
 }
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java
index 0e9179de121..39bf18ad1d1 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java
@@ -63,19 +63,10 @@ public class SharedQueueMessageListenerContainer extends DefaultJmsMessageListen
         setCacheLevel(DefaultMessageListenerContainer.CACHE_SESSION);
     }
 
+    // override this method and return the appropriate selector
     @Override
     public String getMessageSelector() {
-        // override this method and return the appropriate selector
-        String id = null;
-        if (fixedMessageSelector != null) {
-            id = fixedMessageSelector;
-        } else if (creator != null) {
-            id = creator.get();
-        }
-        if (logger.isTraceEnabled()) {
-            logger.trace("Using MessageSelector[" + id + "]");
-        }
-        return id;
+        return JmsReplyHelper.getMessageSelector(fixedMessageSelector, creator);
     }
 
 }
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueSimpleMessageListenerContainer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueSimpleMessageListenerContainer.java
index e9931b8fa94..f145fb3e797 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueSimpleMessageListenerContainer.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueSimpleMessageListenerContainer.java
@@ -57,19 +57,10 @@ public class SharedQueueSimpleMessageListenerContainer extends SimpleJmsMessageL
         this.creator = creator;
     }
 
+    // override this method and return the appropriate selector
     @Override
     public String getMessageSelector() {
-        // override this method and return the appropriate selector
-        String id = null;
-        if (fixedMessageSelector != null) {
-            id = fixedMessageSelector;
-        } else if (creator != null) {
-            id = creator.get();
-        }
-        if (logger.isTraceEnabled()) {
-            logger.trace("Using MessageSelector[" + id + "]");
-        }
-        return id;
+        return JmsReplyHelper.getMessageSelector(fixedMessageSelector, creator);
     }
 
 }
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
index 2557f14e9e9..ba5d4a22e42 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
@@ -31,7 +31,6 @@ import org.apache.camel.Exchange;
 import org.apache.camel.component.jms.ConsumerType;
 import org.apache.camel.component.jms.DefaultJmsMessageListenerContainer;
 import org.apache.camel.component.jms.DefaultSpringErrorHandler;
-import org.apache.camel.component.jms.MessageListenerContainerFactory;
 import org.apache.camel.component.jms.SimpleJmsMessageListenerContainer;
 import org.springframework.jms.listener.AbstractMessageListenerContainer;
 import org.springframework.jms.listener.DefaultMessageListenerContainer;
@@ -107,12 +106,7 @@ public class TemporaryQueueReplyManager extends ReplyManagerSupport {
         } else if (endpoint.getConfiguration().getReplyToConsumerType() == ConsumerType.Simple) {
             return createSimpleListenerContainer();
         } else {
-            MessageListenerContainerFactory factory = endpoint.getConfiguration().getMessageListenerContainerFactory();
-            if (factory != null) {
-                return factory.createMessageListenerContainer(endpoint);
-            }
-            throw new IllegalArgumentException(
-                    "ReplyToConsumerType.Custom requires that a MessageListenerContainerFactory has been configured");
+            return getAbstractMessageListenerContainer(endpoint);
         }
     }