You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/08/06 14:25:10 UTC
[camel] 04/04: CAMEL-13828: DefaultExchangeHolder - Do not propagate
exchange id for camel-jms with transferExchange as it's across JVMs etc.
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit dac13922d47b0e07de1cec50ebfe569f096f3948
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue Aug 6 16:23:24 2019 +0200
CAMEL-13828: DefaultExchangeHolder - Do not propagate exchange id for camel-jms with transferExchange as it's across JVMs etc.
---
.../org/apache/camel/component/jms/JmsBinding.java | 2 +-
...ransferExchangeInflightRepositoryFlushTest.java | 12 ++----
.../camel/support/DefaultExchangeHolder.java | 46 ++++++++++------------
3 files changed, 26 insertions(+), 34 deletions(-)
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
index fcfacd6..7f02f0a 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
@@ -519,7 +519,7 @@ public class JmsBinding {
// special for transferExchange
if (endpoint != null && endpoint.isTransferExchange()) {
LOG.trace("Option transferExchange=true so we use JmsMessageType: Object");
- Serializable holder = DefaultExchangeHolder.marshal(exchange, true, endpoint.isAllowSerializedHeaders());
+ Serializable holder = DefaultExchangeHolder.marshal(exchange, true, endpoint.isAllowSerializedHeaders(), false);
Message answer = session.createObjectMessage(holder);
// ensure default delivery mode is used by default
answer.setJMSDeliveryMode(Message.DEFAULT_DELIVERY_MODE);
diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsInOutTransferExchangeInflightRepositoryFlushTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsInOutTransferExchangeInflightRepositoryFlushTest.java
index 0c83e9a..2b59fb8 100644
--- a/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsInOutTransferExchangeInflightRepositoryFlushTest.java
+++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsInOutTransferExchangeInflightRepositoryFlushTest.java
@@ -65,21 +65,17 @@ public class JmsInOutTransferExchangeInflightRepositoryFlushTest extends CamelTe
return new RouteBuilder() {
public void configure() {
from("direct:start")
- .log("A ${exchangeId}")
- .inOut("activemq:responseGenerator?transferExchange=true&requestTimeout=20000")
- .log("A ${exchangeId}")
- .to("log:result", "mock:result");
+ .inOut("activemq:responseGenerator?transferExchange=true&requestTimeout=5000")
+ .to("mock:result");
from("activemq:responseGenerator?transferExchange=true")
- .log("B ${exchangeId}")
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
// there are 2 inflight (one for both routes)
assertEquals(2, exchange.getContext().getInflightRepository().size());
- exchange.getMessage().setBody(new SerializableResponseDto(true));
+ exchange.getIn().setBody(new SerializableResponseDto(true));
}
- }).to("log:reply")
- .log("B ${exchangeId}");
+ });
}
};
}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchangeHolder.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchangeHolder.java
index ad37459..5ed075c 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchangeHolder.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchangeHolder.java
@@ -93,29 +93,7 @@ public class DefaultExchangeHolder implements Serializable {
* @return the holder object with information copied form the exchange
*/
public static DefaultExchangeHolder marshal(Exchange exchange, boolean includeProperties) {
- ObjectHelper.notNull(exchange, "exchange");
-
- // we do not support files
- Object body = exchange.getIn().getBody();
- if (body instanceof WrappedFile || body instanceof File) {
- throw new RuntimeExchangeException("Message body of type " + body.getClass().getCanonicalName() + " is not supported by this marshaller.", exchange);
- }
-
- DefaultExchangeHolder payload = new DefaultExchangeHolder();
-
- payload.exchangeId = exchange.getExchangeId();
- payload.inBody = checkSerializableBody("in body", exchange, exchange.getIn().getBody());
- payload.safeSetInHeaders(exchange, false);
- if (exchange.hasOut()) {
- payload.outBody = checkSerializableBody("out body", exchange, exchange.getOut().getBody());
- payload.safeSetOutHeaders(exchange, false);
- }
- if (includeProperties) {
- payload.safeSetProperties(exchange, false);
- }
- payload.exception = exchange.getException();
-
- return payload;
+ return marshal(exchange, includeProperties, false, true);
}
/**
@@ -127,6 +105,20 @@ public class DefaultExchangeHolder implements Serializable {
* @return the holder object with information copied form the exchange
*/
public static DefaultExchangeHolder marshal(Exchange exchange, boolean includeProperties, boolean allowSerializedHeaders) {
+ return marshal(exchange, includeProperties, allowSerializedHeaders, true);
+ }
+
+ /**
+ * Creates a payload object with the information from the given exchange.
+ *
+ * @param exchange the exchange, must <b>not</b> be <tt>null</tt>
+ * @param includeProperties whether or not to include exchange properties
+ * @param allowSerializedHeaders whether or not to include serialized headers
+ * @param preserveExchangeId whether to preserve exchange id
+ * @return the holder object with information copied form the exchange
+ */
+ public static DefaultExchangeHolder marshal(Exchange exchange, boolean includeProperties,
+ boolean allowSerializedHeaders, boolean preserveExchangeId) {
ObjectHelper.notNull(exchange, "exchange");
// we do not support files
@@ -137,7 +129,9 @@ public class DefaultExchangeHolder implements Serializable {
DefaultExchangeHolder payload = new DefaultExchangeHolder();
- payload.exchangeId = exchange.getExchangeId();
+ if (preserveExchangeId) {
+ payload.exchangeId = exchange.getExchangeId();
+ }
payload.inBody = checkSerializableBody("in body", exchange, exchange.getIn().getBody());
payload.safeSetInHeaders(exchange, allowSerializedHeaders);
if (exchange.hasOut()) {
@@ -162,7 +156,9 @@ public class DefaultExchangeHolder implements Serializable {
ObjectHelper.notNull(exchange, "exchange");
ObjectHelper.notNull(payload, "payload");
- exchange.setExchangeId(payload.exchangeId);
+ if (payload.exchangeId != null) {
+ exchange.setExchangeId(payload.exchangeId);
+ }
exchange.getIn().setBody(payload.inBody);
if (payload.inHeaders != null) {
exchange.getIn().setHeaders(payload.inHeaders);