You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2023/08/29 15:51:34 UTC
[camel] branch main updated: (chores) camel-core: optimize copying exchanges (#11232)
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 3fde21858a9 (chores) camel-core: optimize copying exchanges (#11232)
3fde21858a9 is described below
commit 3fde21858a9d6db1b4ad75842f2105b972b8adaa
Author: Otavio Rodolfo Piske <or...@users.noreply.github.com>
AuthorDate: Tue Aug 29 17:51:26 2023 +0200
(chores) camel-core: optimize copying exchanges (#11232)
---
.../component/disruptor/DisruptorConsumer.java | 3 +-
.../java/org/apache/camel/ExchangeExtension.java | 8 ++++++
.../org/apache/camel/support/AbstractExchange.java | 15 +++++++++-
.../org/apache/camel/support/DefaultExchange.java | 10 ++++++-
.../org/apache/camel/support/ExchangeHelper.java | 33 ++--------------------
.../camel/support/ExtendedExchangeExtension.java | 20 +++++++++++++
6 files changed, 54 insertions(+), 35 deletions(-)
diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java
index 99c79f1646f..1a8b1128b86 100644
--- a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java
+++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java
@@ -135,8 +135,7 @@ public class DisruptorConsumer extends ServiceSupport implements Consumer, Suspe
private Exchange prepareExchange(final Exchange exchange) {
// send a new copied exchange with new camel context
// don't copy handovers as they are handled by the Disruptor Event Handlers
- final Exchange newExchange = ExchangeHelper
- .copyExchangeAndSetCamelContext(exchange, endpoint.getCamelContext(), false);
+ final Exchange newExchange = ExchangeHelper.copyExchangeWithProperties(exchange, endpoint.getCamelContext());
// set the from endpoint
newExchange.getExchangeExtension().setFromEndpoint(endpoint);
return newExchange;
diff --git a/core/camel-api/src/main/java/org/apache/camel/ExchangeExtension.java b/core/camel-api/src/main/java/org/apache/camel/ExchangeExtension.java
index baeaa44aa2f..7ea4a144193 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ExchangeExtension.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ExchangeExtension.java
@@ -264,4 +264,12 @@ public interface ExchangeExtension {
* @param failureHandled true if failure handled or false otherwise
*/
void setFailureHandled(boolean failureHandled);
+
+ /**
+ * Create a new exchange copied from this, with the context set to the given context
+ *
+ * @param context the context associated with the new exchange
+ * @return A new Exchange instance
+ */
+ Exchange createCopyWithProperties(CamelContext context);
}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java
index 48bffb2ae04..709497805b9 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java
@@ -52,7 +52,7 @@ import static org.apache.camel.support.MessageHelper.copyBody;
* @see DefaultExchange
*/
class AbstractExchange implements Exchange {
- protected final EnumMap<ExchangePropertyKey, Object> internalProperties = new EnumMap<>(ExchangePropertyKey.class);
+ protected final EnumMap<ExchangePropertyKey, Object> internalProperties;
protected final CamelContext context;
protected Map<String, Object> properties; // create properties on-demand as we use internal properties mostly
@@ -69,6 +69,14 @@ class AbstractExchange implements Exchange {
private final ExtendedExchangeExtension privateExtension;
private RedeliveryTraitPayload externalRedelivered = RedeliveryTraitPayload.UNDEFINED_REDELIVERY;
+ AbstractExchange(CamelContext context, EnumMap<ExchangePropertyKey, Object> internalProperties,
+ Map<String, Object> properties) {
+ this.context = context;
+ this.internalProperties = new EnumMap<>(internalProperties);
+ this.privateExtension = new ExtendedExchangeExtension(this);
+ this.properties = properties;
+ }
+
public AbstractExchange(CamelContext context) {
this(context, ExchangePattern.InOnly);
}
@@ -78,6 +86,7 @@ class AbstractExchange implements Exchange {
this.pattern = pattern;
this.created = System.currentTimeMillis();
+ internalProperties = new EnumMap<>(ExchangePropertyKey.class);
privateExtension = new ExtendedExchangeExtension(this);
}
@@ -86,6 +95,8 @@ class AbstractExchange implements Exchange {
this.pattern = parent.getPattern();
this.created = parent.getCreated();
+ internalProperties = new EnumMap<>(ExchangePropertyKey.class);
+
privateExtension = new ExtendedExchangeExtension(this);
privateExtension.setFromEndpoint(parent.getFromEndpoint());
privateExtension.setFromRouteId(parent.getFromRouteId());
@@ -97,6 +108,7 @@ class AbstractExchange implements Exchange {
this.pattern = fromEndpoint.getExchangePattern();
this.created = System.currentTimeMillis();
+ internalProperties = new EnumMap<>(ExchangePropertyKey.class);
privateExtension = new ExtendedExchangeExtension(this);
privateExtension.setFromEndpoint(fromEndpoint);
}
@@ -106,6 +118,7 @@ class AbstractExchange implements Exchange {
this.pattern = pattern;
this.created = System.currentTimeMillis();
+ internalProperties = new EnumMap<>(ExchangePropertyKey.class);
privateExtension = new ExtendedExchangeExtension(this);
privateExtension.setFromEndpoint(fromEndpoint);
}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
index 198484a5523..a3a58e9c1c6 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
@@ -16,16 +16,25 @@
*/
package org.apache.camel.support;
+import java.util.EnumMap;
+import java.util.Map;
+
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
+import org.apache.camel.ExchangePropertyKey;
/**
* The default and only implementation of {@link Exchange}.
*/
public final class DefaultExchange extends AbstractExchange {
+ DefaultExchange(CamelContext context, EnumMap<ExchangePropertyKey, Object> internalProperties,
+ Map<String, Object> properties) {
+ super(context, internalProperties, properties);
+ }
+
public DefaultExchange(CamelContext context) {
super(context);
}
@@ -45,5 +54,4 @@ public final class DefaultExchange extends AbstractExchange {
public DefaultExchange(Endpoint fromEndpoint, ExchangePattern pattern) {
super(fromEndpoint, pattern);
}
-
}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
index 1ab46faffed..32cd22eec71 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
@@ -24,7 +24,6 @@ import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -783,17 +782,6 @@ public final class ExchangeHelper {
return "(MessageId: " + msgId + " on ExchangeId: " + exchange.getExchangeId() + ")";
}
- /**
- * Copies the exchange but the copy will be tied to the given context
- *
- * @param exchange the source exchange
- * @param context the camel context
- * @return a copy with the given camel context
- */
- public static Exchange copyExchangeAndSetCamelContext(Exchange exchange, CamelContext context) {
- return copyExchangeAndSetCamelContext(exchange, context, true);
- }
-
/*
* Safe copy message history using a defensive copy
*/
@@ -809,23 +797,13 @@ public final class ExchangeHelper {
* Copies the exchange but the copy will be tied to the given context
*
* @param exchange the source exchange
- * @param context the camel context
- * @param handover whether to handover on completions from the source to the copy
* @return a copy with the given camel context
*/
- public static Exchange copyExchangeAndSetCamelContext(Exchange exchange, CamelContext context, boolean handover) {
- DefaultExchange answer = new DefaultExchange(context, exchange.getPattern());
- if (exchange.hasProperties()) {
- answer.getExchangeExtension().setProperties(safeCopyProperties(exchange.getProperties()));
- }
- exchange.getExchangeExtension().copyInternalProperties(answer);
+ public static Exchange copyExchangeWithProperties(Exchange exchange, CamelContext context) {
+ Exchange answer = exchange.getExchangeExtension().createCopyWithProperties(context);
setMessageHistory(answer, exchange);
- if (handover) {
- // Need to hand over the completion for async invocation
- exchange.getExchangeExtension().handoverCompletions(answer);
- }
answer.setIn(exchange.getIn().copy());
if (exchange.hasOut()) {
answer.setOut(exchange.getOut().copy());
@@ -892,13 +870,6 @@ public final class ExchangeHelper {
return StringHelper.before(uri, ":");
}
- private static Map<String, Object> safeCopyProperties(Map<String, Object> properties) {
- if (properties == null) {
- return null;
- }
- return new ConcurrentHashMap<>(properties);
- }
-
/**
* @see #getCharsetName(Exchange, boolean)
*/
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExtendedExchangeExtension.java b/core/camel-support/src/main/java/org/apache/camel/support/ExtendedExchangeExtension.java
index 2ce20cdea33..3f60ac6ca0d 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ExtendedExchangeExtension.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ExtendedExchangeExtension.java
@@ -20,8 +20,10 @@ package org.apache.camel.support;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeExtension;
@@ -328,4 +330,22 @@ public class ExtendedExchangeExtension implements ExchangeExtension {
setErrorHandlerHandled(null);
setStreamCacheDisabled(false);
}
+
+ private static Map<String, Object> safeCopyProperties(Map<String, Object> properties) {
+ if (properties == null) {
+ return null;
+ }
+ return new ConcurrentHashMap<>(properties);
+ }
+
+ @Override
+ public Exchange createCopyWithProperties(CamelContext context) {
+ final Map<String, Object> properties = safeCopyProperties(exchange.properties);
+
+ DefaultExchange answer = new DefaultExchange(context, exchange.internalProperties, properties);
+
+ answer.setPattern(exchange.pattern);
+
+ return answer;
+ }
}