You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2024/01/27 20:55:47 UTC
(camel) 01/03: (chores) camel-jms: cleaned up duplicated code
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 89c2bd2327d8a64129ac613e5602ebcf3e646515
Author: Otavio R. Piske <an...@gmail.com>
AuthorDate: Sat Jan 27 20:21:04 2024 +0100
(chores) camel-jms: cleaned up duplicated code
Signed-off-by: Otavio R. Piske <an...@gmail.com>
---
.../apache/camel/component/jms/JmsProducer.java | 27 ++++++++------
.../component/jms/StreamMessageInputStream.java | 20 +++-------
.../camel/component/jms/reply/JmsReplyHelper.java | 41 +++++++++++++++++++++
.../component/jms/reply/QueueReplyManager.java | 43 ++++++++--------------
.../component/jms/reply/ReplyManagerSupport.java | 10 +++++
.../reply/SharedQueueMessageListenerContainer.java | 13 +------
.../SharedQueueSimpleMessageListenerContainer.java | 13 +------
.../jms/reply/TemporaryQueueReplyManager.java | 8 +---
8 files changed, 92 insertions(+), 83 deletions(-)
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
index ca330c957aa..bb51ff9f97f 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
@@ -578,12 +578,7 @@ public class JmsProducer extends DefaultAsyncProducer {
name = "JmsReplyManagerOnTimeout[" + getEndpoint().getEndpointConfiguredDestinationName() + "]";
// allow the timeout thread to timeout so during normal operation we do not have a idle thread
- int max = getEndpoint().getReplyToOnTimeoutMaxConcurrentConsumers();
- if (max <= 0) {
- throw new IllegalArgumentException("The option replyToOnTimeoutMaxConcurrentConsumers must be >= 1");
- }
- ExecutorService replyManagerExecutorService
- = getEndpoint().getCamelContext().getExecutorServiceManager().newThreadPool(replyManager, name, 0, max);
+ ExecutorService replyManagerExecutorService = createReplyManagerExecutorService(replyManager, name);
replyManager.setOnTimeoutExecutorService(replyManagerExecutorService);
ServiceHelper.startService(replyManager);
@@ -591,6 +586,19 @@ public class JmsProducer extends DefaultAsyncProducer {
return replyManager;
}
+ private ExecutorService createReplyManagerExecutorService(ReplyManager replyManager, String name) { // builds the on-timeout thread pool for the given reply manager
+ int max = doGetMax(); // throws IllegalArgumentException when the configured maximum is <= 0
+ return getEndpoint().getCamelContext().getExecutorServiceManager().newThreadPool(replyManager, name, 0, max); // 0 and max presumably are core and maximum pool size; confirm against ExecutorServiceManager
+ }
+
+ private int doGetMax() { // returns the endpoint's replyToOnTimeoutMaxConcurrentConsumers after validating it
+ int max = getEndpoint().getReplyToOnTimeoutMaxConcurrentConsumers();
+ if (max <= 0) { // zero or negative values are rejected, not clamped
+ throw new IllegalArgumentException("The option replyToOnTimeoutMaxConcurrentConsumers must be >= 1");
+ }
+ return max;
+ }
+
protected ReplyManager createReplyManager(String replyTo) throws Exception {
// use a regular queue
ReplyManager replyManager = new QueueReplyManager(getEndpoint().getCamelContext());
@@ -603,12 +611,7 @@ public class JmsProducer extends DefaultAsyncProducer {
name = "JmsReplyManagerOnTimeout[" + replyTo + "]";
// allow the timeout thread to timeout so during normal operation we do not have a idle thread
- int max = getEndpoint().getReplyToOnTimeoutMaxConcurrentConsumers();
- if (max <= 0) {
- throw new IllegalArgumentException("The option replyToOnTimeoutMaxConcurrentConsumers must be >= 1");
- }
- ExecutorService replyManagerExecutorService
- = getEndpoint().getCamelContext().getExecutorServiceManager().newThreadPool(replyManager, name, 0, max);
+ ExecutorService replyManagerExecutorService = createReplyManagerExecutorService(replyManager, name);
replyManager.setOnTimeoutExecutorService(replyManagerExecutorService);
ServiceHelper.startService(replyManager);
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/StreamMessageInputStream.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/StreamMessageInputStream.java
index 23016719aa3..72295b24abc 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/StreamMessageInputStream.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/StreamMessageInputStream.java
@@ -46,6 +46,10 @@ public class StreamMessageInputStream extends InputStream {
@Override
public int read(byte[] array) throws IOException {
+ return doRead(array);
+ }
+
+ private int doRead(byte[] array) throws IOException {
try {
int num = message.readBytes(array);
if (num < 0) {
@@ -66,21 +70,7 @@ public class StreamMessageInputStream extends InputStream {
@Override
public int read(byte[] array, int off, int len) throws IOException {
// we cannot honor off and len, but assuming off is always 0
- try {
- int num = message.readBytes(array);
- if (num < 0) {
- //the first 128K(FileUtil.BUFFER_SIZE/128K is used when sending JMS StreamMessage)
- //buffer reached, give a chance to see if there is the next 128K buffer
- num = message.readBytes(array);
- }
- eof = num < 0;
- return num;
- } catch (MessageEOFException e) {
- eof = true;
- return -1;
- } catch (JMSException e) {
- throw new IOException(e);
- }
+ return doRead(array);
}
@Override
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/JmsReplyHelper.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/JmsReplyHelper.java
new file mode 100644
index 00000000000..b5d2efd885d
--- /dev/null
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/JmsReplyHelper.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jms.reply;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class JmsReplyHelper {
+ private static final Logger LOG = LoggerFactory.getLogger(JmsReplyHelper.class);
+
+ private JmsReplyHelper() {
+ // static helper methods only; not meant to be instantiated
+ }
+
+ static String getMessageSelector(String fixedMessageSelector, MessageSelectorCreator creator) {
+ // a fixed selector takes precedence over the creator; the result stays null when neither is provided
+ String id = null;
+ if (fixedMessageSelector != null) {
+ id = fixedMessageSelector;
+ } else if (creator != null) {
+ id = creator.get();
+ }
+
+ LOG.trace("Using MessageSelector[{}]", id);
+ return id;
+ }
+}
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
index b1d819a1ba8..dda1293838f 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
@@ -29,7 +29,6 @@ import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.component.jms.ConsumerType;
import org.apache.camel.component.jms.DefaultSpringErrorHandler;
-import org.apache.camel.component.jms.MessageListenerContainerFactory;
import org.apache.camel.component.jms.ReplyToType;
import org.apache.camel.component.jms.SimpleJmsMessageListenerContainer;
import org.springframework.jms.listener.AbstractMessageListenerContainer;
@@ -130,12 +129,7 @@ public class QueueReplyManager extends ReplyManagerSupport {
} else if (endpoint.getConfiguration().getReplyToConsumerType() == ConsumerType.Simple) {
return createSimpleListenerContainer();
} else {
- MessageListenerContainerFactory factory = endpoint.getConfiguration().getMessageListenerContainerFactory();
- if (factory != null) {
- return factory.createMessageListenerContainer(endpoint);
- }
- throw new IllegalArgumentException(
- "ReplyToConsumerType.Custom requires that a MessageListenerContainerFactory has been configured");
+ return getAbstractMessageListenerContainer(endpoint);
}
}
@@ -194,16 +188,7 @@ public class QueueReplyManager extends ReplyManagerSupport {
answer.setSessionTransacted(false);
// other optional properties
- if (endpoint.getExceptionListener() != null) {
- answer.setExceptionListener(endpoint.getExceptionListener());
- }
- if (endpoint.getErrorHandler() != null) {
- answer.setErrorHandler(endpoint.getErrorHandler());
- } else {
- answer.setErrorHandler(new DefaultSpringErrorHandler(
- endpoint.getCamelContext(), QueueReplyManager.class, endpoint.getErrorHandlerLoggingLevel(),
- endpoint.isErrorHandlerLogStackTrace()));
- }
+ setOptionalProperties(answer);
// set task executor
if (endpoint.getTaskExecutor() != null) {
log.debug("Using custom TaskExecutor: {} on listener container: {}", endpoint.getTaskExecutor(), answer);
@@ -302,16 +287,7 @@ public class QueueReplyManager extends ReplyManagerSupport {
answer.setSessionTransacted(false);
// other optional properties
- if (endpoint.getExceptionListener() != null) {
- answer.setExceptionListener(endpoint.getExceptionListener());
- }
- if (endpoint.getErrorHandler() != null) {
- answer.setErrorHandler(endpoint.getErrorHandler());
- } else {
- answer.setErrorHandler(new DefaultSpringErrorHandler(
- endpoint.getCamelContext(), QueueReplyManager.class, endpoint.getErrorHandlerLoggingLevel(),
- endpoint.isErrorHandlerLogStackTrace()));
- }
+ setOptionalProperties(answer);
if (endpoint.getReceiveTimeout() >= 0) {
answer.setReceiveTimeout(endpoint.getReceiveTimeout());
}
@@ -344,4 +320,17 @@ public class QueueReplyManager extends ReplyManagerSupport {
return answer;
}
+
+ private <T extends AbstractMessageListenerContainer> void setOptionalProperties(T answer) {
+ if (endpoint.getExceptionListener() != null) { // the exception listener is only set when the endpoint provides one
+ answer.setExceptionListener(endpoint.getExceptionListener());
+ }
+ if (endpoint.getErrorHandler() != null) {
+ answer.setErrorHandler(endpoint.getErrorHandler());
+ } else { // no endpoint error handler: install a DefaultSpringErrorHandler built from the endpoint's logging options
+ answer.setErrorHandler(new DefaultSpringErrorHandler(
+ endpoint.getCamelContext(), QueueReplyManager.class, endpoint.getErrorHandlerLoggingLevel(),
+ endpoint.isErrorHandlerLogStackTrace()));
+ }
+ }
}
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
index ef91941f669..8118c78ce27 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
@@ -35,6 +35,7 @@ import org.apache.camel.component.jms.JmsConstants;
import org.apache.camel.component.jms.JmsEndpoint;
import org.apache.camel.component.jms.JmsMessage;
import org.apache.camel.component.jms.JmsMessageHelper;
+import org.apache.camel.component.jms.MessageListenerContainerFactory;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.support.service.ServiceSupport;
@@ -313,4 +314,13 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
answer.setClientId(clientId);
}
}
+
+ protected static AbstractMessageListenerContainer getAbstractMessageListenerContainer(JmsEndpoint endpoint) {
+ MessageListenerContainerFactory factory = endpoint.getConfiguration().getMessageListenerContainerFactory();
+ if (factory != null) { // delegate container creation to the configured factory
+ return factory.createMessageListenerContainer(endpoint);
+ }
+ throw new IllegalArgumentException( // reached only when no factory is configured on the endpoint
+ "ReplyToConsumerType.Custom requires that a MessageListenerContainerFactory has been configured");
+ }
}
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java
index 0e9179de121..39bf18ad1d1 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java
@@ -63,19 +63,10 @@ public class SharedQueueMessageListenerContainer extends DefaultJmsMessageListen
setCacheLevel(DefaultMessageListenerContainer.CACHE_SESSION);
}
+ // override this method and return the appropriate selector
@Override
public String getMessageSelector() {
- // override this method and return the appropriate selector
- String id = null;
- if (fixedMessageSelector != null) {
- id = fixedMessageSelector;
- } else if (creator != null) {
- id = creator.get();
- }
- if (logger.isTraceEnabled()) {
- logger.trace("Using MessageSelector[" + id + "]");
- }
- return id;
+ return JmsReplyHelper.getMessageSelector(fixedMessageSelector, creator);
}
}
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueSimpleMessageListenerContainer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueSimpleMessageListenerContainer.java
index e9931b8fa94..f145fb3e797 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueSimpleMessageListenerContainer.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueSimpleMessageListenerContainer.java
@@ -57,19 +57,10 @@ public class SharedQueueSimpleMessageListenerContainer extends SimpleJmsMessageL
this.creator = creator;
}
+ // override this method and return the appropriate selector
@Override
public String getMessageSelector() {
- // override this method and return the appropriate selector
- String id = null;
- if (fixedMessageSelector != null) {
- id = fixedMessageSelector;
- } else if (creator != null) {
- id = creator.get();
- }
- if (logger.isTraceEnabled()) {
- logger.trace("Using MessageSelector[" + id + "]");
- }
- return id;
+ return JmsReplyHelper.getMessageSelector(fixedMessageSelector, creator);
}
}
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
index 2557f14e9e9..ba5d4a22e42 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
@@ -31,7 +31,6 @@ import org.apache.camel.Exchange;
import org.apache.camel.component.jms.ConsumerType;
import org.apache.camel.component.jms.DefaultJmsMessageListenerContainer;
import org.apache.camel.component.jms.DefaultSpringErrorHandler;
-import org.apache.camel.component.jms.MessageListenerContainerFactory;
import org.apache.camel.component.jms.SimpleJmsMessageListenerContainer;
import org.springframework.jms.listener.AbstractMessageListenerContainer;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
@@ -107,12 +106,7 @@ public class TemporaryQueueReplyManager extends ReplyManagerSupport {
} else if (endpoint.getConfiguration().getReplyToConsumerType() == ConsumerType.Simple) {
return createSimpleListenerContainer();
} else {
- MessageListenerContainerFactory factory = endpoint.getConfiguration().getMessageListenerContainerFactory();
- if (factory != null) {
- return factory.createMessageListenerContainer(endpoint);
- }
- throw new IllegalArgumentException(
- "ReplyToConsumerType.Custom requires that a MessageListenerContainerFactory has been configured");
+ return getAbstractMessageListenerContainer(endpoint);
}
}