You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2023/09/05 16:14:49 UTC
[camel] branch main updated: CAMEL-19801: pre-work for cleaning up copying exchanges (#11304)
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new bbf062a4251 CAMEL-19801: pre-work for cleaning up copying exchanges (#11304)
bbf062a4251 is described below
commit bbf062a425141287d14617c282a982bca9100c00
Author: Otavio Rodolfo Piske <or...@users.noreply.github.com>
AuthorDate: Tue Sep 5 18:14:40 2023 +0200
CAMEL-19801: pre-work for cleaning up copying exchanges (#11304)
---
.../apache/camel/support/AbstractExchangeTest.java | 19 +++-
.../org/apache/camel/support/AbstractExchange.java | 115 +++++++++------------
.../org/apache/camel/support/DefaultExchange.java | 9 ++
.../camel/support/DefaultPooledExchange.java | 6 ++
.../camel/support/ExtendedExchangeExtension.java | 12 +--
5 files changed, 83 insertions(+), 78 deletions(-)
diff --git a/core/camel-core/src/test/java/org/apache/camel/support/AbstractExchangeTest.java b/core/camel-core/src/test/java/org/apache/camel/support/AbstractExchangeTest.java
index 8b26b593e8f..2710b25b52f 100644
--- a/core/camel-core/src/test/java/org/apache/camel/support/AbstractExchangeTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/support/AbstractExchangeTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.support;
+import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.spi.DataType;
@@ -30,9 +31,25 @@ import static org.junit.jupiter.api.Assertions.assertSame;
*/
public class AbstractExchangeTest {
+ static class CustomAbstractExchange extends AbstractExchange {
+
+ CustomAbstractExchange(CustomAbstractExchange abstractExchange) {
+ super(abstractExchange);
+ }
+
+ public CustomAbstractExchange(CamelContext context) {
+ super(context);
+ }
+
+ @Override
+ AbstractExchange newCopy() {
+ return new CustomAbstractExchange(this);
+ }
+ }
+
@Test
void shouldPreserveDataTypeOnCopy() {
- AbstractExchange e1 = new AbstractExchange(new DefaultCamelContext());
+ AbstractExchange e1 = new CustomAbstractExchange(new DefaultCamelContext());
Object body1 = new Object();
DataType type1 = new DataType("foo1");
DefaultMessage in = new DefaultMessage((Exchange) null);
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java
index 709497805b9..ac2824eda19 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java
@@ -34,14 +34,11 @@ import org.apache.camel.ExchangePropertyKey;
import org.apache.camel.Message;
import org.apache.camel.MessageHistory;
import org.apache.camel.SafeCopyProperty;
-import org.apache.camel.spi.HeadersMapFactory;
import org.apache.camel.spi.UnitOfWork;
import org.apache.camel.trait.message.MessageTrait;
import org.apache.camel.trait.message.RedeliveryTraitPayload;
import org.apache.camel.util.ObjectHelper;
-import static org.apache.camel.support.MessageHelper.copyBody;
-
/**
* Base class for the two official and only implementations of {@link Exchange}, the {@link DefaultExchange} and
* {@link DefaultPooledExchange}.
@@ -51,7 +48,7 @@ import static org.apache.camel.support.MessageHelper.copyBody;
*
* @see DefaultExchange
*/
-class AbstractExchange implements Exchange {
+abstract class AbstractExchange implements Exchange {
protected final EnumMap<ExchangePropertyKey, Object> internalProperties;
protected final CamelContext context;
@@ -74,7 +71,7 @@ class AbstractExchange implements Exchange {
this.context = context;
this.internalProperties = new EnumMap<>(internalProperties);
this.privateExtension = new ExtendedExchangeExtension(this);
- this.properties = properties;
+ this.properties = safeCopyProperties(properties);
}
public AbstractExchange(CamelContext context) {
@@ -103,6 +100,43 @@ class AbstractExchange implements Exchange {
privateExtension.setUnitOfWork(parent.getUnitOfWork());
}
+ AbstractExchange(AbstractExchange parent) {
+ this.context = parent.getContext();
+ this.pattern = parent.getPattern();
+ this.created = parent.getCreated();
+
+ this.internalProperties = new EnumMap<>(parent.internalProperties);
+
+ privateExtension = new ExtendedExchangeExtension(this);
+ privateExtension.setFromEndpoint(parent.getFromEndpoint());
+ privateExtension.setFromRouteId(parent.getFromRouteId());
+ privateExtension.setUnitOfWork(parent.getUnitOfWork());
+
+ setIn(parent.getIn().copy());
+
+ if (parent.hasOut()) {
+ setOut(parent.getOut().copy());
+ }
+
+ setException(parent.exception);
+ setRouteStop(parent.routeStop);
+ setRollbackOnly(parent.rollbackOnly);
+ setRollbackOnlyLast(parent.rollbackOnlyLast);
+
+ privateExtension.setNotifyEvent(parent.getExchangeExtension().isNotifyEvent());
+ privateExtension.setRedeliveryExhausted(parent.getExchangeExtension().isRedeliveryExhausted());
+ privateExtension.setErrorHandlerHandled(parent.getExchangeExtension().getErrorHandlerHandled());
+ privateExtension.setStreamCacheDisabled(parent.getExchangeExtension().isStreamCacheDisabled());
+
+ if (parent.hasProperties()) {
+ this.properties = safeCopyProperties(parent.properties);
+ }
+
+ if (parent.hasSafeCopyProperties()) {
+ this.safeCopyProperties = parent.getSafeCopyProperties();
+ }
+ }
+
public AbstractExchange(Endpoint fromEndpoint) {
this.context = fromEndpoint.getCamelContext();
this.pattern = fromEndpoint.getExchangePattern();
@@ -128,43 +162,11 @@ class AbstractExchange implements Exchange {
return created;
}
+ abstract AbstractExchange newCopy();
+
@Override
public Exchange copy() {
- DefaultExchange exchange = new DefaultExchange(this);
-
- exchange.setIn(getIn().copy());
- copyBody(getIn(), exchange.getIn());
- if (getIn().hasHeaders()) {
- exchange.getIn().setHeaders(safeCopyHeaders(getIn().getHeaders()));
- }
- if (hasOut()) {
- exchange.setOut(getOut().copy());
- copyBody(getOut(), exchange.getOut());
- if (getOut().hasHeaders()) {
- exchange.getOut().setHeaders(safeCopyHeaders(getOut().getHeaders()));
- }
- }
-
- exchange.setException(exception);
- exchange.setRouteStop(routeStop);
- exchange.setRollbackOnly(rollbackOnly);
- exchange.setRollbackOnlyLast(rollbackOnlyLast);
- final ExtendedExchangeExtension newExchangeExtension = exchange.getExchangeExtension();
- newExchangeExtension.setNotifyEvent(getExchangeExtension().isNotifyEvent());
- newExchangeExtension.setRedeliveryExhausted(getExchangeExtension().isRedeliveryExhausted());
- newExchangeExtension.setErrorHandlerHandled(getExchangeExtension().getErrorHandlerHandled());
- newExchangeExtension.setStreamCacheDisabled(getExchangeExtension().isStreamCacheDisabled());
-
- // copy properties after body as body may trigger lazy init
- if (hasProperties()) {
- copyProperties(getProperties(), exchange.getProperties());
- }
-
- if (hasSafeCopyProperties()) {
- safeCopyProperties(this.safeCopyProperties, exchange.getSafeCopyProperties());
- }
- // copy over internal properties
- exchange.internalProperties.putAll(internalProperties);
+ AbstractExchange exchange = newCopy();
if (getContext().isMessageHistory()) {
exchange.internalProperties.computeIfPresent(ExchangePropertyKey.MESSAGE_HISTORY,
@@ -174,32 +176,6 @@ class AbstractExchange implements Exchange {
return exchange;
}
- private Map<String, Object> safeCopyHeaders(Map<String, Object> headers) {
- if (headers == null) {
- return null;
- }
-
- if (context != null) {
- HeadersMapFactory factory = context.getCamelContextExtension().getHeadersMapFactory();
- if (factory != null) {
- return factory.newMap(headers);
- }
- }
- // should not really happen but some tests dont start camel context
- return new HashMap<>(headers);
- }
-
- private void copyProperties(Map<String, Object> source, Map<String, Object> target) {
- target.putAll(source);
- }
-
- private void safeCopyProperties(
- Map<String, SafeCopyProperty> source, Map<String, SafeCopyProperty> target) {
- source.entrySet().stream().forEach(entry -> {
- target.put(entry.getKey(), entry.getValue().safeCopy());
- });
- }
-
@Override
public CamelContext getContext() {
return context;
@@ -719,4 +695,11 @@ class AbstractExchange implements Exchange {
public ExtendedExchangeExtension getExchangeExtension() {
return privateExtension;
}
+
+ private static Map<String, Object> safeCopyProperties(Map<String, Object> properties) {
+ if (properties == null) {
+ return null;
+ }
+ return new ConcurrentHashMap<>(properties);
+ }
}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
index a3a58e9c1c6..91cb6c882b0 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
@@ -47,6 +47,10 @@ public final class DefaultExchange extends AbstractExchange {
super(parent);
}
+ DefaultExchange(AbstractExchange parent) {
+ super(parent);
+ }
+
public DefaultExchange(Endpoint fromEndpoint) {
super(fromEndpoint);
}
@@ -54,4 +58,9 @@ public final class DefaultExchange extends AbstractExchange {
public DefaultExchange(Endpoint fromEndpoint, ExchangePattern pattern) {
super(fromEndpoint, pattern);
}
+
+ @Override
+ AbstractExchange newCopy() {
+ return new DefaultExchange(this);
+ }
}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java
index 75bcfca549a..19c757c1309 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java
@@ -66,6 +66,12 @@ public final class DefaultPooledExchange extends AbstractExchange implements Poo
this.properties = new ConcurrentHashMap<>(8);
}
+ @Override
+ AbstractExchange newCopy() {
+ // NOTE: this is the same behavior as done previously from AbstractExchange when returning a copy.
+ return new DefaultExchange(this);
+ }
+
public boolean isAutoRelease() {
return autoRelease;
}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExtendedExchangeExtension.java b/core/camel-support/src/main/java/org/apache/camel/support/ExtendedExchangeExtension.java
index 3f60ac6ca0d..42cb778526b 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ExtendedExchangeExtension.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ExtendedExchangeExtension.java
@@ -20,7 +20,6 @@ package org.apache.camel.support;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
@@ -331,18 +330,9 @@ public class ExtendedExchangeExtension implements ExchangeExtension {
setStreamCacheDisabled(false);
}
- private static Map<String, Object> safeCopyProperties(Map<String, Object> properties) {
- if (properties == null) {
- return null;
- }
- return new ConcurrentHashMap<>(properties);
- }
-
@Override
public Exchange createCopyWithProperties(CamelContext context) {
- final Map<String, Object> properties = safeCopyProperties(exchange.properties);
-
- DefaultExchange answer = new DefaultExchange(context, exchange.internalProperties, properties);
+ DefaultExchange answer = new DefaultExchange(context, exchange.internalProperties, exchange.properties);
answer.setPattern(exchange.pattern);