You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/08/06 14:25:10 UTC

[camel] 04/04: CAMEL-13828: DefaultExchangeHolder - Do not propgate exchange id for camel-jms with transferExchange as its across JVMs etc.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit dac13922d47b0e07de1cec50ebfe569f096f3948
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Aug 6 16:23:24 2019 +0200

    CAMEL-13828: DefaultExchangeHolder - Do not propgate exchange id for camel-jms with transferExchange as its across JVMs etc.
---
 .../org/apache/camel/component/jms/JmsBinding.java |  2 +-
 ...ransferExchangeInflightRepositoryFlushTest.java | 12 ++----
 .../camel/support/DefaultExchangeHolder.java       | 46 ++++++++++------------
 3 files changed, 26 insertions(+), 34 deletions(-)

diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
index fcfacd6..7f02f0a 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
@@ -519,7 +519,7 @@ public class JmsBinding {
         // special for transferExchange
         if (endpoint != null && endpoint.isTransferExchange()) {
             LOG.trace("Option transferExchange=true so we use JmsMessageType: Object");
-            Serializable holder = DefaultExchangeHolder.marshal(exchange, true, endpoint.isAllowSerializedHeaders());
+            Serializable holder = DefaultExchangeHolder.marshal(exchange, true, endpoint.isAllowSerializedHeaders(), false);
             Message answer = session.createObjectMessage(holder);
             // ensure default delivery mode is used by default
             answer.setJMSDeliveryMode(Message.DEFAULT_DELIVERY_MODE);
diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsInOutTransferExchangeInflightRepositoryFlushTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsInOutTransferExchangeInflightRepositoryFlushTest.java
index 0c83e9a..2b59fb8 100644
--- a/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsInOutTransferExchangeInflightRepositoryFlushTest.java
+++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsInOutTransferExchangeInflightRepositoryFlushTest.java
@@ -65,21 +65,17 @@ public class JmsInOutTransferExchangeInflightRepositoryFlushTest extends CamelTe
         return new RouteBuilder() {
             public void configure() {
                 from("direct:start")
-                        .log("A ${exchangeId}")
-                        .inOut("activemq:responseGenerator?transferExchange=true&requestTimeout=20000")
-                        .log("A ${exchangeId}")
-                        .to("log:result", "mock:result");
+                        .inOut("activemq:responseGenerator?transferExchange=true&requestTimeout=5000")
+                        .to("mock:result");
 
                 from("activemq:responseGenerator?transferExchange=true")
-                        .log("B ${exchangeId}")
                         .process(new Processor() {
                             public void process(Exchange exchange) throws Exception {
                                 // there are 2 inflight (one for both routes)
                                 assertEquals(2, exchange.getContext().getInflightRepository().size());
-                                exchange.getMessage().setBody(new SerializableResponseDto(true));
+                                exchange.getIn().setBody(new SerializableResponseDto(true));
                             }
-                        }).to("log:reply")
-                        .log("B ${exchangeId}");
+                        });
             }
         };
     }
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchangeHolder.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchangeHolder.java
index ad37459..5ed075c 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchangeHolder.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchangeHolder.java
@@ -93,29 +93,7 @@ public class DefaultExchangeHolder implements Serializable {
      * @return the holder object with information copied form the exchange
      */
     public static DefaultExchangeHolder marshal(Exchange exchange, boolean includeProperties) {
-        ObjectHelper.notNull(exchange, "exchange");
-
-        // we do not support files
-        Object body = exchange.getIn().getBody();
-        if (body instanceof WrappedFile || body instanceof File) {
-            throw new RuntimeExchangeException("Message body of type " + body.getClass().getCanonicalName() + " is not supported by this marshaller.", exchange);
-        }
-
-        DefaultExchangeHolder payload = new DefaultExchangeHolder();
-
-        payload.exchangeId = exchange.getExchangeId();
-        payload.inBody = checkSerializableBody("in body", exchange, exchange.getIn().getBody());
-        payload.safeSetInHeaders(exchange, false);
-        if (exchange.hasOut()) {
-            payload.outBody = checkSerializableBody("out body", exchange, exchange.getOut().getBody());
-            payload.safeSetOutHeaders(exchange, false);
-        }
-        if (includeProperties) {
-            payload.safeSetProperties(exchange, false);
-        }
-        payload.exception = exchange.getException();
-
-        return payload;
+        return marshal(exchange, includeProperties, false, true);
     }
     
     /**
@@ -127,6 +105,20 @@ public class DefaultExchangeHolder implements Serializable {
      * @return the holder object with information copied form the exchange
      */
     public static DefaultExchangeHolder marshal(Exchange exchange, boolean includeProperties, boolean allowSerializedHeaders) {
+        return marshal(exchange, includeProperties, allowSerializedHeaders, true);
+    }
+
+    /**
+     * Creates a payload object with the information from the given exchange.
+     *
+     * @param exchange the exchange, must <b>not</b> be <tt>null</tt>
+     * @param includeProperties whether or not to include exchange properties
+     * @param allowSerializedHeaders whether or not to include serialized headers
+     * @param preserveExchangeId whether to preserve exchange id
+     * @return the holder object with information copied form the exchange
+     */
+    public static DefaultExchangeHolder marshal(Exchange exchange, boolean includeProperties,
+                                                boolean allowSerializedHeaders, boolean preserveExchangeId) {
         ObjectHelper.notNull(exchange, "exchange");
 
         // we do not support files
@@ -137,7 +129,9 @@ public class DefaultExchangeHolder implements Serializable {
 
         DefaultExchangeHolder payload = new DefaultExchangeHolder();
 
-        payload.exchangeId = exchange.getExchangeId();
+        if (preserveExchangeId) {
+            payload.exchangeId = exchange.getExchangeId();
+        }
         payload.inBody = checkSerializableBody("in body", exchange, exchange.getIn().getBody());
         payload.safeSetInHeaders(exchange, allowSerializedHeaders);
         if (exchange.hasOut()) {
@@ -162,7 +156,9 @@ public class DefaultExchangeHolder implements Serializable {
         ObjectHelper.notNull(exchange, "exchange");
         ObjectHelper.notNull(payload, "payload");
 
-        exchange.setExchangeId(payload.exchangeId);
+        if (payload.exchangeId != null) {
+            exchange.setExchangeId(payload.exchangeId);
+        }
         exchange.getIn().setBody(payload.inBody);
         if (payload.inHeaders != null) {
             exchange.getIn().setHeaders(payload.inHeaders);