You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/04/12 18:21:59 UTC
[camel] 01/02: camel-core - Resequencer EIP should copy exchange
when adding to queue to better support pooled exchanges
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 004c659824ad8a5017c15e99975da1d14fd645a8
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Apr 12 19:47:18 2021 +0200
camel-core - Resequencer EIP should copy exchange when adding to queue to better support pooled exchanges
---
.../src/main/java/org/apache/camel/processor/Resequencer.java | 5 ++++-
.../src/main/java/org/apache/camel/processor/StreamResequencer.java | 5 ++++-
2 files changed, 8 insertions(+), 2 deletions(-)
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java
index dc7f6b5..12822b2 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java
@@ -48,6 +48,7 @@ import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorConverterHelper;
import org.apache.camel.support.AsyncProcessorSupport;
+import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.ExpressionComparator;
import org.apache.camel.support.LoggingExceptionHandler;
import org.apache.camel.support.service.ServiceHelper;
@@ -540,7 +541,9 @@ public class Resequencer extends AsyncProcessorSupport implements Navigate<Proce
completionPredicateMatched.add(exchange.getExchangeId());
}
}
- queue.add(exchange);
+ // need to make defensive copy that are put on the sequencer queue
+ Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, true);
+ queue.add(copy);
exchangeEnqueued.set(true);
exchangeEnqueuedCondition.signal();
} finally {
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/StreamResequencer.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/StreamResequencer.java
index 2beada7..d80f31e 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/StreamResequencer.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/StreamResequencer.java
@@ -38,6 +38,7 @@ import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorSupport;
+import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.LoggingExceptionHandler;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.util.ObjectHelper;
@@ -250,7 +251,9 @@ public class StreamResequencer extends AsyncProcessorSupport
}
try {
- engine.insert(exchange);
+ // need to make defensive copy that are put on the sequencer queue
+ Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, true);
+ engine.insert(copy);
delivery.request();
} catch (Exception e) {
if (isIgnoreInvalidExchanges()) {