You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2023/08/29 15:51:34 UTC

[camel] branch main updated: (chores) camel-core: optimize copying exchanges (#11232)

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 3fde21858a9 (chores) camel-core: optimize copying exchanges (#11232)
3fde21858a9 is described below

commit 3fde21858a9d6db1b4ad75842f2105b972b8adaa
Author: Otavio Rodolfo Piske <or...@users.noreply.github.com>
AuthorDate: Tue Aug 29 17:51:26 2023 +0200

    (chores) camel-core: optimize copying exchanges (#11232)
---
 .../component/disruptor/DisruptorConsumer.java     |  3 +-
 .../java/org/apache/camel/ExchangeExtension.java   |  8 ++++++
 .../org/apache/camel/support/AbstractExchange.java | 15 +++++++++-
 .../org/apache/camel/support/DefaultExchange.java  | 10 ++++++-
 .../org/apache/camel/support/ExchangeHelper.java   | 33 ++--------------------
 .../camel/support/ExtendedExchangeExtension.java   | 20 +++++++++++++
 6 files changed, 54 insertions(+), 35 deletions(-)

diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java
index 99c79f1646f..1a8b1128b86 100644
--- a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java
+++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java
@@ -135,8 +135,7 @@ public class DisruptorConsumer extends ServiceSupport implements Consumer, Suspe
     private Exchange prepareExchange(final Exchange exchange) {
         // send a new copied exchange with new camel context
         // don't copy handovers as they are handled by the Disruptor Event Handlers
-        final Exchange newExchange = ExchangeHelper
-                .copyExchangeAndSetCamelContext(exchange, endpoint.getCamelContext(), false);
+        final Exchange newExchange = ExchangeHelper.copyExchangeWithProperties(exchange, endpoint.getCamelContext());
         // set the from endpoint
         newExchange.getExchangeExtension().setFromEndpoint(endpoint);
         return newExchange;
diff --git a/core/camel-api/src/main/java/org/apache/camel/ExchangeExtension.java b/core/camel-api/src/main/java/org/apache/camel/ExchangeExtension.java
index baeaa44aa2f..7ea4a144193 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ExchangeExtension.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ExchangeExtension.java
@@ -264,4 +264,12 @@ public interface ExchangeExtension {
      * @param failureHandled true if failure handled or false otherwise
      */
     void setFailureHandled(boolean failureHandled);
+
+    /**
+     * Create a new exchange copied from this, with the context set to the given context
+     *
+     * @param  context the context associated with the new exchange
+     * @return         A new Exchange instance
+     */
+    Exchange createCopyWithProperties(CamelContext context);
 }
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java
index 48bffb2ae04..709497805b9 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java
@@ -52,7 +52,7 @@ import static org.apache.camel.support.MessageHelper.copyBody;
  * @see DefaultExchange
  */
 class AbstractExchange implements Exchange {
-    protected final EnumMap<ExchangePropertyKey, Object> internalProperties = new EnumMap<>(ExchangePropertyKey.class);
+    protected final EnumMap<ExchangePropertyKey, Object> internalProperties;
 
     protected final CamelContext context;
     protected Map<String, Object> properties; // create properties on-demand as we use internal properties mostly
@@ -69,6 +69,14 @@ class AbstractExchange implements Exchange {
     private final ExtendedExchangeExtension privateExtension;
     private RedeliveryTraitPayload externalRedelivered = RedeliveryTraitPayload.UNDEFINED_REDELIVERY;
 
+    AbstractExchange(CamelContext context, EnumMap<ExchangePropertyKey, Object> internalProperties,
+                     Map<String, Object> properties) {
+        this.context = context;
+        this.internalProperties = new EnumMap<>(internalProperties);
+        this.privateExtension = new ExtendedExchangeExtension(this);
+        this.properties = properties;
+    }
+
     public AbstractExchange(CamelContext context) {
         this(context, ExchangePattern.InOnly);
     }
@@ -78,6 +86,7 @@ class AbstractExchange implements Exchange {
         this.pattern = pattern;
         this.created = System.currentTimeMillis();
 
+        internalProperties = new EnumMap<>(ExchangePropertyKey.class);
         privateExtension = new ExtendedExchangeExtension(this);
     }
 
@@ -86,6 +95,8 @@ class AbstractExchange implements Exchange {
         this.pattern = parent.getPattern();
         this.created = parent.getCreated();
 
+        internalProperties = new EnumMap<>(ExchangePropertyKey.class);
+
         privateExtension = new ExtendedExchangeExtension(this);
         privateExtension.setFromEndpoint(parent.getFromEndpoint());
         privateExtension.setFromRouteId(parent.getFromRouteId());
@@ -97,6 +108,7 @@ class AbstractExchange implements Exchange {
         this.pattern = fromEndpoint.getExchangePattern();
         this.created = System.currentTimeMillis();
 
+        internalProperties = new EnumMap<>(ExchangePropertyKey.class);
         privateExtension = new ExtendedExchangeExtension(this);
         privateExtension.setFromEndpoint(fromEndpoint);
     }
@@ -106,6 +118,7 @@ class AbstractExchange implements Exchange {
         this.pattern = pattern;
         this.created = System.currentTimeMillis();
 
+        internalProperties = new EnumMap<>(ExchangePropertyKey.class);
         privateExtension = new ExtendedExchangeExtension(this);
         privateExtension.setFromEndpoint(fromEndpoint);
     }
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
index 198484a5523..a3a58e9c1c6 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
@@ -16,16 +16,25 @@
  */
 package org.apache.camel.support;
 
+import java.util.EnumMap;
+import java.util.Map;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
+import org.apache.camel.ExchangePropertyKey;
 
 /**
  * The default and only implementation of {@link Exchange}.
  */
 public final class DefaultExchange extends AbstractExchange {
 
+    DefaultExchange(CamelContext context, EnumMap<ExchangePropertyKey, Object> internalProperties,
+                    Map<String, Object> properties) {
+        super(context, internalProperties, properties);
+    }
+
     public DefaultExchange(CamelContext context) {
         super(context);
     }
@@ -45,5 +54,4 @@ public final class DefaultExchange extends AbstractExchange {
     public DefaultExchange(Endpoint fromEndpoint, ExchangePattern pattern) {
         super(fromEndpoint, pattern);
     }
-
 }
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
index 1ab46faffed..32cd22eec71 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
@@ -24,7 +24,6 @@ import java.nio.charset.Charset;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -783,17 +782,6 @@ public final class ExchangeHelper {
         return "(MessageId: " + msgId + " on ExchangeId: " + exchange.getExchangeId() + ")";
     }
 
-    /**
-     * Copies the exchange but the copy will be tied to the given context
-     *
-     * @param  exchange the source exchange
-     * @param  context  the camel context
-     * @return          a copy with the given camel context
-     */
-    public static Exchange copyExchangeAndSetCamelContext(Exchange exchange, CamelContext context) {
-        return copyExchangeAndSetCamelContext(exchange, context, true);
-    }
-
     /*
      * Safe copy message history using a defensive copy
      */
@@ -809,23 +797,13 @@ public final class ExchangeHelper {
      * Copies the exchange but the copy will be tied to the given context
      *
      * @param  exchange the source exchange
-     * @param  context  the camel context
-     * @param  handover whether to handover on completions from the source to the copy
      * @return          a copy with the given camel context
      */
-    public static Exchange copyExchangeAndSetCamelContext(Exchange exchange, CamelContext context, boolean handover) {
-        DefaultExchange answer = new DefaultExchange(context, exchange.getPattern());
-        if (exchange.hasProperties()) {
-            answer.getExchangeExtension().setProperties(safeCopyProperties(exchange.getProperties()));
-        }
-        exchange.getExchangeExtension().copyInternalProperties(answer);
+    public static Exchange copyExchangeWithProperties(Exchange exchange, CamelContext context) {
+        Exchange answer = exchange.getExchangeExtension().createCopyWithProperties(context);
 
         setMessageHistory(answer, exchange);
 
-        if (handover) {
-            // Need to hand over the completion for async invocation
-            exchange.getExchangeExtension().handoverCompletions(answer);
-        }
         answer.setIn(exchange.getIn().copy());
         if (exchange.hasOut()) {
             answer.setOut(exchange.getOut().copy());
@@ -892,13 +870,6 @@ public final class ExchangeHelper {
         return StringHelper.before(uri, ":");
     }
 
-    private static Map<String, Object> safeCopyProperties(Map<String, Object> properties) {
-        if (properties == null) {
-            return null;
-        }
-        return new ConcurrentHashMap<>(properties);
-    }
-
     /**
      * @see #getCharsetName(Exchange, boolean)
      */
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExtendedExchangeExtension.java b/core/camel-support/src/main/java/org/apache/camel/support/ExtendedExchangeExtension.java
index 2ce20cdea33..3f60ac6ca0d 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ExtendedExchangeExtension.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ExtendedExchangeExtension.java
@@ -20,8 +20,10 @@ package org.apache.camel.support;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangeExtension;
@@ -328,4 +330,22 @@ public class ExtendedExchangeExtension implements ExchangeExtension {
         setErrorHandlerHandled(null);
         setStreamCacheDisabled(false);
     }
+
+    private static Map<String, Object> safeCopyProperties(Map<String, Object> properties) {
+        if (properties == null) {
+            return null;
+        }
+        return new ConcurrentHashMap<>(properties);
+    }
+
+    @Override
+    public Exchange createCopyWithProperties(CamelContext context) {
+        final Map<String, Object> properties = safeCopyProperties(exchange.properties);
+
+        DefaultExchange answer = new DefaultExchange(context, exchange.internalProperties, properties);
+
+        answer.setPattern(exchange.pattern);
+
+        return answer;
+    }
 }