You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2024/01/28 17:56:16 UTC
(camel) 01/02: (chores) camel-sjms: cleaned up duplicated code
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
commit f8a17729d80a31f2bb5dad1485f4ac2307347ddb
Author: Otavio R. Piske <an...@gmail.com>
AuthorDate: Sun Jan 28 15:15:25 2024 +0100
(chores) camel-sjms: cleaned up duplicated code
Signed-off-by: Otavio R. Piske <an...@gmail.com>
---
.../apache/camel/component/sjms/SjmsProducer.java | 29 ++++---
.../sjms/consumer/EndpointMessageListener.java | 96 ++++++++++------------
2 files changed, 58 insertions(+), 67 deletions(-)
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
index aafe7c4feca..2a1b4874b01 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
@@ -168,12 +168,7 @@ public class SjmsProducer extends DefaultAsyncProducer {
name = "JmsReplyManagerOnTimeout[" + replyTo + "]";
// allow the timeout thread to timeout so during normal operation we do not have a idle thread
- int max = getEndpoint().getComponent().getReplyToOnTimeoutMaxConcurrentConsumers();
- if (max <= 0) {
- throw new IllegalArgumentException("The option replyToOnTimeoutMaxConcurrentConsumers must be >= 1");
- }
- ExecutorService replyManagerExecutorService
- = getEndpoint().getCamelContext().getExecutorServiceManager().newThreadPool(replyManager, name, 0, max);
+ ExecutorService replyManagerExecutorService = createReplyManagerExecutor(replyManager, name);
replyManager.setOnTimeoutExecutorService(replyManagerExecutorService);
ServiceHelper.startService(replyManager);
@@ -193,13 +188,7 @@ public class SjmsProducer extends DefaultAsyncProducer {
name = "JmsReplyManagerOnTimeout[" + getEndpoint().getEndpointConfiguredDestinationName() + "]";
// allow the timeout thread to timeout so during normal operation we do not have a idle thread
- int max = getEndpoint().getComponent().getReplyToOnTimeoutMaxConcurrentConsumers();
- if (max <= 0) {
- throw new IllegalArgumentException("The option replyToOnTimeoutMaxConcurrentConsumers must be >= 1");
- }
- ExecutorService replyManagerExecutorService
- = getEndpoint().getCamelContext().getExecutorServiceManager().newThreadPool(temporaryQueueReplyManager, name, 0,
- max);
+ ExecutorService replyManagerExecutorService = createReplyManagerExecutor(temporaryQueueReplyManager, name);
temporaryQueueReplyManager.setOnTimeoutExecutorService(replyManagerExecutorService);
ServiceHelper.startService(temporaryQueueReplyManager);
@@ -207,6 +196,20 @@ public class SjmsProducer extends DefaultAsyncProducer {
return temporaryQueueReplyManager;
}
+ private ExecutorService createReplyManagerExecutor(ReplyManager temporaryQueueReplyManager, String name) {
+ int max = doGetMax();
+ return getEndpoint().getCamelContext().getExecutorServiceManager().newThreadPool(temporaryQueueReplyManager, name, 0,
+ max);
+ }
+
+ private int doGetMax() {
+ int max = getEndpoint().getComponent().getReplyToOnTimeoutMaxConcurrentConsumers();
+ if (max <= 0) {
+ throw new IllegalArgumentException("The option replyToOnTimeoutMaxConcurrentConsumers must be >= 1");
+ }
+ return max;
+ }
+
/**
* Pre tests the connection before starting the listening.
* <p/>
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java
index a233ee0b722..482fae34fa7 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java
@@ -27,6 +27,7 @@ import jakarta.jms.Session;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelExchangeException;
+import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
@@ -273,33 +274,8 @@ public class EndpointMessageListener implements SessionMessageListener {
return;
}
try {
- SessionCallback callback = new SessionCallback() {
- @Override
- public Object doInJms(Session session) throws Exception {
- MessageProducer producer = null;
- try {
- Message reply = endpoint.getBinding().makeJmsMessage(exchange, out, session, cause);
- final String correlationID = determineCorrelationId(message);
- reply.setJMSCorrelationID(correlationID);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("{} sending reply JMS message [correlationId:{}]: {}", endpoint, correlationID, reply);
- }
-
- producer = endpoint.getJmsObjectFactory().createMessageProducer(session, endpoint, replyDestination);
- template.send(producer, reply);
- } finally {
- close(producer);
- }
-
- return null;
- }
-
- @Override
- public void onClose(Connection connection, Session session) {
- // do not close as we use provided session
- }
- };
+ SessionCallback callback = createSessionCallback(replyDestination, message, exchange, out, cause,
+ endpoint.getJmsObjectFactory()::createMessageProducer);
getTemplate().execute(session, callback);
@@ -308,6 +284,43 @@ public class EndpointMessageListener implements SessionMessageListener {
}
}
+ @FunctionalInterface
+ private interface MessageProducerCreator<T> {
+ MessageProducer create(Session session, Endpoint endpoint, T replyDestination) throws Exception;
+ }
+
+ private <T> SessionCallback createSessionCallback(T replyDestination, Message message, Exchange exchange, org.apache.camel.Message out, Exception cause,
+ MessageProducerCreator<T> messageProducerCreator) {
+ return new SessionCallback() {
+ @Override
+ public Object doInJms(Session session) throws Exception {
+ MessageProducer producer = null;
+ try {
+ Message reply = endpoint.getBinding().makeJmsMessage(exchange, out, session, cause);
+ final String correlationID = determineCorrelationId(message);
+ reply.setJMSCorrelationID(correlationID);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} sending reply JMS message [correlationId:{}]: {}", endpoint, correlationID, reply);
+ }
+
+ producer = messageProducerCreator.create(session, endpoint, replyDestination);
+
+ template.send(producer, reply);
+ } finally {
+ close(producer);
+ }
+
+ return null;
+ }
+
+ @Override
+ public void onClose(Connection connection, Session session) {
+ // do not close as we use provided session
+ }
+ };
+ }
+
protected void sendReply(
Session session,
String replyDestination, final Message message, final Exchange exchange,
@@ -317,33 +330,8 @@ public class EndpointMessageListener implements SessionMessageListener {
return;
}
try {
- SessionCallback callback = new SessionCallback() {
- @Override
- public Object doInJms(Session session) throws Exception {
- MessageProducer producer = null;
- try {
- Message reply = endpoint.getBinding().makeJmsMessage(exchange, out, session, cause);
- final String correlationID = determineCorrelationId(message);
- reply.setJMSCorrelationID(correlationID);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("{} sending reply JMS message [correlationId:{}]: {}", endpoint, correlationID, reply);
- }
-
- producer = endpoint.getJmsObjectFactory().createMessageProducer(session, endpoint, replyDestination);
- template.send(producer, reply);
- } finally {
- close(producer);
- }
-
- return null;
- }
-
- @Override
- public void onClose(Connection connection, Session session) {
- // do not close as we use provided session
- }
- };
+ SessionCallback callback = createSessionCallback(replyDestination, message, exchange, out, cause,
+ endpoint.getJmsObjectFactory()::createMessageProducer);
getTemplate().execute(session, callback);