You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/04/12 18:21:58 UTC

[camel] branch master updated (7394fe6 -> 07c6ed3)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from 7394fe6  Revert "CAMEL-16366: camel-spring-rabbitmq - RabbitMQ consumer supports exchange pooling"
     new 004c659  camel-core - Resequencer EIP should copy exchange when adding to queue to better support pooled exchanges
     new 07c6ed3  CAMEL-16366: camel-jms - JMS consumer supports exchange pooling

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../camel/component/jms/EndpointMessageListener.java  | 19 +++++++++++++++++--
 .../org/apache/camel/component/jms/JmsMessage.java    | 10 ++++++++++
 .../org/apache/camel/component/jms/bind/MyBean.java   |  4 +++-
 .../java/org/apache/camel/processor/Resequencer.java  |  5 ++++-
 .../org/apache/camel/processor/StreamResequencer.java |  5 ++++-
 5 files changed, 38 insertions(+), 5 deletions(-)

[camel] 02/02: CAMEL-16366: camel-jms - JMS consumer supports exchange pooling

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 07c6ed3fbd4350d68e75726010607aca3e0cf04e
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Apr 12 20:01:15 2021 +0200

    CAMEL-16366: camel-jms - JMS consumer supports exchange pooling
---
 .../camel/component/jms/EndpointMessageListener.java  | 19 +++++++++++++++++--
 .../org/apache/camel/component/jms/JmsMessage.java    | 10 ++++++++++
 .../org/apache/camel/component/jms/bind/MyBean.java   |  4 +++-
 3 files changed, 30 insertions(+), 3 deletions(-)

diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
index 5bb1f9f..b6b9652 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
@@ -149,6 +149,8 @@ public class EndpointMessageListener implements SessionAwareMessageListener {
             // if we failed processed the exchange from the async callback task, then grab the exception
             rce = exchange.getException(RuntimeCamelException.class);
 
+            // release back when in synchronous mode
+            consumer.releaseExchange(exchange, false);
         } catch (Exception e) {
             rce = wrapRuntimeCamelException(e);
         }
@@ -252,15 +254,28 @@ public class EndpointMessageListener implements SessionAwareMessageListener {
                     }
                 }
             }
+
+            if (!doneSync) {
+                // release back when in asynchronous mode
+                consumer.releaseExchange(exchange, false);
+            }
         }
     }
 
     public Exchange createExchange(Message message, Session session, Object replyDestination) {
-        // must be prototype scoped (not pooled) so we create the exchange via endpoint
-        Exchange exchange = endpoint.createExchange(message, session);
+        Exchange exchange = consumer.createExchange(false);
         JmsBinding binding = getBinding();
         exchange.setProperty(Exchange.BINDING, binding);
 
+        // reuse existing jms message if pooled
+        org.apache.camel.Message msg = exchange.getIn();
+        if (msg instanceof JmsMessage) {
+            JmsMessage jm = (JmsMessage) msg;
+            jm.init(exchange, message, session, getBinding());
+        } else {
+            exchange.setIn(new JmsMessage(exchange, message, session, getBinding()));
+        }
+
         // lets set to an InOut if we have some kind of reply-to destination
         if (replyDestination != null && !disableReplyTo) {
             // only change pattern if not already out capable
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java
index 3c8925a..c815e52 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java
@@ -50,9 +50,19 @@ public class JmsMessage extends DefaultMessage {
         setBinding(binding);
     }
 
+    public void init(Exchange exchange, Message jmsMessage, Session jmsSession, JmsBinding binding) {
+        setExchange(exchange);
+        setJmsMessage(jmsMessage);
+        setJmsSession(jmsSession);
+        setBinding(binding);
+        // need to populate initial headers when we use pooled exchanges
+        populateInitialHeaders(getHeaders());
+    }
+
     @Override
     public void reset() {
         super.reset();
+        setExchange(null);
         jmsMessage = null;
         jmsSession = null;
         binding = null;
diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/bind/MyBean.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/bind/MyBean.java
index ba7dfd0..a482ba9 100644
--- a/components/camel-jms/src/test/java/org/apache/camel/component/jms/bind/MyBean.java
+++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/bind/MyBean.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.jms.bind;
 
+import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.camel.Consume;
@@ -31,7 +32,8 @@ public class MyBean {
 
     @Consume("activemq:Test.BindingQueue")
     public void myMethod(@Headers Map<?, ?> headers, String body) {
-        this.headers = headers;
+        // defensive copy of headers
+        this.headers = new HashMap<>(headers);
         this.body = body;
 
         // now lets notify we've completed

[camel] 01/02: camel-core - Resequencer EIP should copy exchange when adding to queue to better support pooled exchanges

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 004c659824ad8a5017c15e99975da1d14fd645a8
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Apr 12 19:47:18 2021 +0200

    camel-core - Resequencer EIP should copy exchange when adding to queue to better support pooled exchanges
---
 .../src/main/java/org/apache/camel/processor/Resequencer.java        | 5 ++++-
 .../src/main/java/org/apache/camel/processor/StreamResequencer.java  | 5 ++++-
 2 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java
index dc7f6b5..12822b2 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java
@@ -48,6 +48,7 @@ import org.apache.camel.spi.IdAware;
 import org.apache.camel.spi.RouteIdAware;
 import org.apache.camel.support.AsyncProcessorConverterHelper;
 import org.apache.camel.support.AsyncProcessorSupport;
+import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.ExpressionComparator;
 import org.apache.camel.support.LoggingExceptionHandler;
 import org.apache.camel.support.service.ServiceHelper;
@@ -540,7 +541,9 @@ public class Resequencer extends AsyncProcessorSupport implements Navigate<Proce
                         completionPredicateMatched.add(exchange.getExchangeId());
                     }
                 }
-                queue.add(exchange);
+                // need to make a defensive copy of the exchange that is put on the sequencer queue
+                Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, true);
+                queue.add(copy);
                 exchangeEnqueued.set(true);
                 exchangeEnqueuedCondition.signal();
             } finally {
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/StreamResequencer.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/StreamResequencer.java
index 2beada7..d80f31e 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/StreamResequencer.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/StreamResequencer.java
@@ -38,6 +38,7 @@ import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.spi.RouteIdAware;
 import org.apache.camel.support.AsyncProcessorSupport;
+import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.LoggingExceptionHandler;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.ObjectHelper;
@@ -250,7 +251,9 @@ public class StreamResequencer extends AsyncProcessorSupport
         }
 
         try {
-            engine.insert(exchange);
+            // need to make a defensive copy of the exchange that is put on the sequencer queue
+            Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, true);
+            engine.insert(copy);
             delivery.request();
         } catch (Exception e) {
             if (isIgnoreInvalidExchanges()) {