You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/04/12 18:22:00 UTC
[camel] 02/02: CAMEL-16366: camel-jms - JMS consumer supports exchange pooling
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 07c6ed3fbd4350d68e75726010607aca3e0cf04e
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Apr 12 20:01:15 2021 +0200
CAMEL-16366: camel-jms - JMS consumer supports exchange pooling
---
.../camel/component/jms/EndpointMessageListener.java | 19 +++++++++++++++++--
.../org/apache/camel/component/jms/JmsMessage.java | 10 ++++++++++
.../org/apache/camel/component/jms/bind/MyBean.java | 4 +++-
3 files changed, 30 insertions(+), 3 deletions(-)
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
index 5bb1f9f..b6b9652 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
@@ -149,6 +149,8 @@ public class EndpointMessageListener implements SessionAwareMessageListener {
// if we failed processed the exchange from the async callback task, then grab the exception
rce = exchange.getException(RuntimeCamelException.class);
+ // release back when synchronous mode
+ consumer.releaseExchange(exchange, false);
} catch (Exception e) {
rce = wrapRuntimeCamelException(e);
}
@@ -252,15 +254,28 @@ public class EndpointMessageListener implements SessionAwareMessageListener {
}
}
}
+
+ if (!doneSync) {
+ // release back when in asynchronous mode
+ consumer.releaseExchange(exchange, false);
+ }
}
}
public Exchange createExchange(Message message, Session session, Object replyDestination) {
- // must be prototype scoped (not pooled) so we create the exchange via endpoint
- Exchange exchange = endpoint.createExchange(message, session);
+ Exchange exchange = consumer.createExchange(false);
JmsBinding binding = getBinding();
exchange.setProperty(Exchange.BINDING, binding);
+ // reuse existing jms message if pooled
+ org.apache.camel.Message msg = exchange.getIn();
+ if (msg instanceof JmsMessage) {
+ JmsMessage jm = (JmsMessage) msg;
+ jm.init(exchange, message, session, getBinding());
+ } else {
+ exchange.setIn(new JmsMessage(exchange, message, session, getBinding()));
+ }
+
// lets set to an InOut if we have some kind of reply-to destination
if (replyDestination != null && !disableReplyTo) {
// only change pattern if not already out capable
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java
index 3c8925a..c815e52 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java
@@ -50,9 +50,19 @@ public class JmsMessage extends DefaultMessage {
setBinding(binding);
}
+ public void init(Exchange exchange, Message jmsMessage, Session jmsSession, JmsBinding binding) {
+ setExchange(exchange);
+ setJmsMessage(jmsMessage);
+ setJmsSession(jmsSession);
+ setBinding(binding);
+ // need to populate initial headers when we use pooled exchanges
+ populateInitialHeaders(getHeaders());
+ }
+
@Override
public void reset() {
super.reset();
+ setExchange(null);
jmsMessage = null;
jmsSession = null;
binding = null;
diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/bind/MyBean.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/bind/MyBean.java
index ba7dfd0..a482ba9 100644
--- a/components/camel-jms/src/test/java/org/apache/camel/component/jms/bind/MyBean.java
+++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/bind/MyBean.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.component.jms.bind;
+import java.util.HashMap;
import java.util.Map;
import org.apache.camel.Consume;
@@ -31,7 +32,8 @@ public class MyBean {
@Consume("activemq:Test.BindingQueue")
public void myMethod(@Headers Map<?, ?> headers, String body) {
- this.headers = headers;
+ // defensive copy of headers
+ this.headers = new HashMap<>(headers);
this.body = body;
// now lets notify we've completed