You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2023/09/05 16:14:49 UTC

[camel] branch main updated: CAMEL-19801: pre-work for cleaning up copying exchanges (#11304)

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new bbf062a4251 CAMEL-19801: pre-work for cleaning up copying exchanges (#11304)
bbf062a4251 is described below

commit bbf062a425141287d14617c282a982bca9100c00
Author: Otavio Rodolfo Piske <or...@users.noreply.github.com>
AuthorDate: Tue Sep 5 18:14:40 2023 +0200

    CAMEL-19801: pre-work for cleaning up copying exchanges (#11304)
---
 .../apache/camel/support/AbstractExchangeTest.java |  19 +++-
 .../org/apache/camel/support/AbstractExchange.java | 115 +++++++++------------
 .../org/apache/camel/support/DefaultExchange.java  |   9 ++
 .../camel/support/DefaultPooledExchange.java       |   6 ++
 .../camel/support/ExtendedExchangeExtension.java   |  12 +--
 5 files changed, 83 insertions(+), 78 deletions(-)

diff --git a/core/camel-core/src/test/java/org/apache/camel/support/AbstractExchangeTest.java b/core/camel-core/src/test/java/org/apache/camel/support/AbstractExchangeTest.java
index 8b26b593e8f..2710b25b52f 100644
--- a/core/camel-core/src/test/java/org/apache/camel/support/AbstractExchangeTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/support/AbstractExchangeTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.support;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.spi.DataType;
@@ -30,9 +31,25 @@ import static org.junit.jupiter.api.Assertions.assertSame;
  */
 public class AbstractExchangeTest {
 
+    static class CustomAbstractExchange extends AbstractExchange {
+
+        CustomAbstractExchange(CustomAbstractExchange abstractExchange) {
+            super(abstractExchange);
+        }
+
+        public CustomAbstractExchange(CamelContext context) {
+            super(context);
+        }
+
+        @Override
+        AbstractExchange newCopy() {
+            return new CustomAbstractExchange(this);
+        }
+    }
+
     @Test
     void shouldPreserveDataTypeOnCopy() {
-        AbstractExchange e1 = new AbstractExchange(new DefaultCamelContext());
+        AbstractExchange e1 = new CustomAbstractExchange(new DefaultCamelContext());
         Object body1 = new Object();
         DataType type1 = new DataType("foo1");
         DefaultMessage in = new DefaultMessage((Exchange) null);
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java
index 709497805b9..ac2824eda19 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java
@@ -34,14 +34,11 @@ import org.apache.camel.ExchangePropertyKey;
 import org.apache.camel.Message;
 import org.apache.camel.MessageHistory;
 import org.apache.camel.SafeCopyProperty;
-import org.apache.camel.spi.HeadersMapFactory;
 import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.trait.message.MessageTrait;
 import org.apache.camel.trait.message.RedeliveryTraitPayload;
 import org.apache.camel.util.ObjectHelper;
 
-import static org.apache.camel.support.MessageHelper.copyBody;
-
 /**
  * Base class for the two official and only implementations of {@link Exchange}, the {@link DefaultExchange} and
  * {@link DefaultPooledExchange}.
@@ -51,7 +48,7 @@ import static org.apache.camel.support.MessageHelper.copyBody;
  *
  * @see DefaultExchange
  */
-class AbstractExchange implements Exchange {
+abstract class AbstractExchange implements Exchange {
     protected final EnumMap<ExchangePropertyKey, Object> internalProperties;
 
     protected final CamelContext context;
@@ -74,7 +71,7 @@ class AbstractExchange implements Exchange {
         this.context = context;
         this.internalProperties = new EnumMap<>(internalProperties);
         this.privateExtension = new ExtendedExchangeExtension(this);
-        this.properties = properties;
+        this.properties = safeCopyProperties(properties);
     }
 
     public AbstractExchange(CamelContext context) {
@@ -103,6 +100,43 @@ class AbstractExchange implements Exchange {
         privateExtension.setUnitOfWork(parent.getUnitOfWork());
     }
 
+    AbstractExchange(AbstractExchange parent) {
+        this.context = parent.getContext();
+        this.pattern = parent.getPattern();
+        this.created = parent.getCreated();
+
+        this.internalProperties = new EnumMap<>(parent.internalProperties);
+
+        privateExtension = new ExtendedExchangeExtension(this);
+        privateExtension.setFromEndpoint(parent.getFromEndpoint());
+        privateExtension.setFromRouteId(parent.getFromRouteId());
+        privateExtension.setUnitOfWork(parent.getUnitOfWork());
+
+        setIn(parent.getIn().copy());
+
+        if (parent.hasOut()) {
+            setOut(parent.getOut().copy());
+        }
+
+        setException(parent.exception);
+        setRouteStop(parent.routeStop);
+        setRollbackOnly(parent.rollbackOnly);
+        setRollbackOnlyLast(parent.rollbackOnlyLast);
+
+        privateExtension.setNotifyEvent(parent.getExchangeExtension().isNotifyEvent());
+        privateExtension.setRedeliveryExhausted(parent.getExchangeExtension().isRedeliveryExhausted());
+        privateExtension.setErrorHandlerHandled(parent.getExchangeExtension().getErrorHandlerHandled());
+        privateExtension.setStreamCacheDisabled(parent.getExchangeExtension().isStreamCacheDisabled());
+
+        if (parent.hasProperties()) {
+            this.properties = safeCopyProperties(parent.properties);
+        }
+
+        if (parent.hasSafeCopyProperties()) {
+            this.safeCopyProperties = parent.getSafeCopyProperties();
+        }
+    }
+
     public AbstractExchange(Endpoint fromEndpoint) {
         this.context = fromEndpoint.getCamelContext();
         this.pattern = fromEndpoint.getExchangePattern();
@@ -128,43 +162,11 @@ class AbstractExchange implements Exchange {
         return created;
     }
 
+    abstract AbstractExchange newCopy();
+
     @Override
     public Exchange copy() {
-        DefaultExchange exchange = new DefaultExchange(this);
-
-        exchange.setIn(getIn().copy());
-        copyBody(getIn(), exchange.getIn());
-        if (getIn().hasHeaders()) {
-            exchange.getIn().setHeaders(safeCopyHeaders(getIn().getHeaders()));
-        }
-        if (hasOut()) {
-            exchange.setOut(getOut().copy());
-            copyBody(getOut(), exchange.getOut());
-            if (getOut().hasHeaders()) {
-                exchange.getOut().setHeaders(safeCopyHeaders(getOut().getHeaders()));
-            }
-        }
-
-        exchange.setException(exception);
-        exchange.setRouteStop(routeStop);
-        exchange.setRollbackOnly(rollbackOnly);
-        exchange.setRollbackOnlyLast(rollbackOnlyLast);
-        final ExtendedExchangeExtension newExchangeExtension = exchange.getExchangeExtension();
-        newExchangeExtension.setNotifyEvent(getExchangeExtension().isNotifyEvent());
-        newExchangeExtension.setRedeliveryExhausted(getExchangeExtension().isRedeliveryExhausted());
-        newExchangeExtension.setErrorHandlerHandled(getExchangeExtension().getErrorHandlerHandled());
-        newExchangeExtension.setStreamCacheDisabled(getExchangeExtension().isStreamCacheDisabled());
-
-        // copy properties after body as body may trigger lazy init
-        if (hasProperties()) {
-            copyProperties(getProperties(), exchange.getProperties());
-        }
-
-        if (hasSafeCopyProperties()) {
-            safeCopyProperties(this.safeCopyProperties, exchange.getSafeCopyProperties());
-        }
-        // copy over internal properties
-        exchange.internalProperties.putAll(internalProperties);
+        AbstractExchange exchange = newCopy();
 
         if (getContext().isMessageHistory()) {
             exchange.internalProperties.computeIfPresent(ExchangePropertyKey.MESSAGE_HISTORY,
@@ -174,32 +176,6 @@ class AbstractExchange implements Exchange {
         return exchange;
     }
 
-    private Map<String, Object> safeCopyHeaders(Map<String, Object> headers) {
-        if (headers == null) {
-            return null;
-        }
-
-        if (context != null) {
-            HeadersMapFactory factory = context.getCamelContextExtension().getHeadersMapFactory();
-            if (factory != null) {
-                return factory.newMap(headers);
-            }
-        }
-        // should not really happen but some tests dont start camel context
-        return new HashMap<>(headers);
-    }
-
-    private void copyProperties(Map<String, Object> source, Map<String, Object> target) {
-        target.putAll(source);
-    }
-
-    private void safeCopyProperties(
-            Map<String, SafeCopyProperty> source, Map<String, SafeCopyProperty> target) {
-        source.entrySet().stream().forEach(entry -> {
-            target.put(entry.getKey(), entry.getValue().safeCopy());
-        });
-    }
-
     @Override
     public CamelContext getContext() {
         return context;
@@ -719,4 +695,11 @@ class AbstractExchange implements Exchange {
     public ExtendedExchangeExtension getExchangeExtension() {
         return privateExtension;
     }
+
+    private static Map<String, Object> safeCopyProperties(Map<String, Object> properties) {
+        if (properties == null) {
+            return null;
+        }
+        return new ConcurrentHashMap<>(properties);
+    }
 }
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
index a3a58e9c1c6..91cb6c882b0 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
@@ -47,6 +47,10 @@ public final class DefaultExchange extends AbstractExchange {
         super(parent);
     }
 
+    DefaultExchange(AbstractExchange parent) {
+        super(parent);
+    }
+
     public DefaultExchange(Endpoint fromEndpoint) {
         super(fromEndpoint);
     }
@@ -54,4 +58,9 @@ public final class DefaultExchange extends AbstractExchange {
     public DefaultExchange(Endpoint fromEndpoint, ExchangePattern pattern) {
         super(fromEndpoint, pattern);
     }
+
+    @Override
+    AbstractExchange newCopy() {
+        return new DefaultExchange(this);
+    }
 }
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java
index 75bcfca549a..19c757c1309 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java
@@ -66,6 +66,12 @@ public final class DefaultPooledExchange extends AbstractExchange implements Poo
         this.properties = new ConcurrentHashMap<>(8);
     }
 
+    @Override
+    AbstractExchange newCopy() {
+        // NOTE: this is the same behavior as done previously from AbstractExchange when returning a copy.
+        return new DefaultExchange(this);
+    }
+
     public boolean isAutoRelease() {
         return autoRelease;
     }
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExtendedExchangeExtension.java b/core/camel-support/src/main/java/org/apache/camel/support/ExtendedExchangeExtension.java
index 3f60ac6ca0d..42cb778526b 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ExtendedExchangeExtension.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ExtendedExchangeExtension.java
@@ -20,7 +20,6 @@ package org.apache.camel.support;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
@@ -331,18 +330,9 @@ public class ExtendedExchangeExtension implements ExchangeExtension {
         setStreamCacheDisabled(false);
     }
 
-    private static Map<String, Object> safeCopyProperties(Map<String, Object> properties) {
-        if (properties == null) {
-            return null;
-        }
-        return new ConcurrentHashMap<>(properties);
-    }
-
     @Override
     public Exchange createCopyWithProperties(CamelContext context) {
-        final Map<String, Object> properties = safeCopyProperties(exchange.properties);
-
-        DefaultExchange answer = new DefaultExchange(context, exchange.internalProperties, properties);
+        DefaultExchange answer = new DefaultExchange(context, exchange.internalProperties, exchange.properties);
 
         answer.setPattern(exchange.pattern);