You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/04/12 18:22:00 UTC

[camel] 02/02: CAMEL-16366: camel-jms - JMS consumer supports exchange pooling

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 07c6ed3fbd4350d68e75726010607aca3e0cf04e
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Apr 12 20:01:15 2021 +0200

    CAMEL-16366: camel-jms - JMS consumer supports exchange pooling
---
 .../camel/component/jms/EndpointMessageListener.java  | 19 +++++++++++++++++--
 .../org/apache/camel/component/jms/JmsMessage.java    | 10 ++++++++++
 .../org/apache/camel/component/jms/bind/MyBean.java   |  4 +++-
 3 files changed, 30 insertions(+), 3 deletions(-)

diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
index 5bb1f9f..b6b9652 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
@@ -149,6 +149,8 @@ public class EndpointMessageListener implements SessionAwareMessageListener {
             // if we failed processed the exchange from the async callback task, then grab the exception
             rce = exchange.getException(RuntimeCamelException.class);
 
+            // release back when synchronous mode
+            consumer.releaseExchange(exchange, false);
         } catch (Exception e) {
             rce = wrapRuntimeCamelException(e);
         }
@@ -252,15 +254,28 @@ public class EndpointMessageListener implements SessionAwareMessageListener {
                     }
                 }
             }
+
+            if (!doneSync) {
+                // release back when in asynchronous mode
+                consumer.releaseExchange(exchange, false);
+            }
         }
     }
 
     public Exchange createExchange(Message message, Session session, Object replyDestination) {
-        // must be prototype scoped (not pooled) so we create the exchange via endpoint
-        Exchange exchange = endpoint.createExchange(message, session);
+        Exchange exchange = consumer.createExchange(false);
         JmsBinding binding = getBinding();
         exchange.setProperty(Exchange.BINDING, binding);
 
+        // reuse existing jms message if pooled
+        org.apache.camel.Message msg = exchange.getIn();
+        if (msg instanceof JmsMessage) {
+            JmsMessage jm = (JmsMessage) msg;
+            jm.init(exchange, message, session, getBinding());
+        } else {
+            exchange.setIn(new JmsMessage(exchange, message, session, getBinding()));
+        }
+
         // lets set to an InOut if we have some kind of reply-to destination
         if (replyDestination != null && !disableReplyTo) {
             // only change pattern if not already out capable
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java
index 3c8925a..c815e52 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java
@@ -50,9 +50,19 @@ public class JmsMessage extends DefaultMessage {
         setBinding(binding);
     }
 
+    public void init(Exchange exchange, Message jmsMessage, Session jmsSession, JmsBinding binding) {
+        setExchange(exchange);
+        setJmsMessage(jmsMessage);
+        setJmsSession(jmsSession);
+        setBinding(binding);
+        // need to populate initial headers when we use pooled exchanges
+        populateInitialHeaders(getHeaders());
+    }
+
     @Override
     public void reset() {
         super.reset();
+        setExchange(null);
         jmsMessage = null;
         jmsSession = null;
         binding = null;
diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/bind/MyBean.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/bind/MyBean.java
index ba7dfd0..a482ba9 100644
--- a/components/camel-jms/src/test/java/org/apache/camel/component/jms/bind/MyBean.java
+++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/bind/MyBean.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.jms.bind;
 
+import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.camel.Consume;
@@ -31,7 +32,8 @@ public class MyBean {
 
     @Consume("activemq:Test.BindingQueue")
     public void myMethod(@Headers Map<?, ?> headers, String body) {
-        this.headers = headers;
+        // defensive copy of headers
+        this.headers = new HashMap<>(headers);
         this.body = body;
 
         // now lets notify we've completed