You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/04/12 18:21:59 UTC

[camel] 01/02: camel-core - Resequencer EIP should copy exchange when adding to queue to better support pooled exchanges

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 004c659824ad8a5017c15e99975da1d14fd645a8
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Apr 12 19:47:18 2021 +0200

    camel-core - Resequencer EIP should copy exchange when adding to queue to better support pooled exchanges
---
 .../src/main/java/org/apache/camel/processor/Resequencer.java        | 5 ++++-
 .../src/main/java/org/apache/camel/processor/StreamResequencer.java  | 5 ++++-
 2 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java
index dc7f6b5..12822b2 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java
@@ -48,6 +48,7 @@ import org.apache.camel.spi.IdAware;
 import org.apache.camel.spi.RouteIdAware;
 import org.apache.camel.support.AsyncProcessorConverterHelper;
 import org.apache.camel.support.AsyncProcessorSupport;
+import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.ExpressionComparator;
 import org.apache.camel.support.LoggingExceptionHandler;
 import org.apache.camel.support.service.ServiceHelper;
@@ -540,7 +541,9 @@ public class Resequencer extends AsyncProcessorSupport implements Navigate<Proce
                         completionPredicateMatched.add(exchange.getExchangeId());
                     }
                 }
-                queue.add(exchange);
+                // need to make a defensive copy of the exchange that is put on the sequencer queue
+                Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, true);
+                queue.add(copy);
                 exchangeEnqueued.set(true);
                 exchangeEnqueuedCondition.signal();
             } finally {
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/StreamResequencer.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/StreamResequencer.java
index 2beada7..d80f31e 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/StreamResequencer.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/StreamResequencer.java
@@ -38,6 +38,7 @@ import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.spi.RouteIdAware;
 import org.apache.camel.support.AsyncProcessorSupport;
+import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.LoggingExceptionHandler;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.util.ObjectHelper;
@@ -250,7 +251,9 @@ public class StreamResequencer extends AsyncProcessorSupport
         }
 
         try {
-            engine.insert(exchange);
+            // need to make a defensive copy of the exchange that is put on the sequencer queue
+            Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, true);
+            engine.insert(copy);
             delivery.request();
         } catch (Exception e) {
             if (isIgnoreInvalidExchanges()) {