You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/03/31 07:55:28 UTC
[1/2] camel git commit: CAMEL-8571: Split EIP - Should use new
message id per splitted message. And other EIPs should do the same.
Repository: camel
Updated Branches:
refs/heads/camel-2.15.x ce0ffeeec -> 0977cef04
refs/heads/master d01e51f02 -> 97320fa18
CAMEL-8571: Split EIP - Should use new message id per splitted message. And other EIPs should do the same.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/97320fa1
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/97320fa1
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/97320fa1
Branch: refs/heads/master
Commit: 97320fa185c0f749ca086579d2c647900215e31a
Parents: d01e51f
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Mar 30 20:06:12 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Mar 31 07:13:30 2015 +0200
----------------------------------------------------------------------
.../camel/component/seda/SedaProducer.java | 4 ++--
.../org/apache/camel/util/ExchangeHelper.java | 22 ++++++++++++++++++++
.../apache/camel/processor/SplitterTest.java | 9 ++++++++
3 files changed, 33 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/97320fa1/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
index c00b9b9..56b83f7 100644
--- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
@@ -179,8 +179,8 @@ public class SedaProducer extends DefaultAsyncProducer {
}
protected Exchange prepareCopy(Exchange exchange, boolean handover) {
- // use a new copy of the exchange to route async
- Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, handover);
+ // use a new copy of the exchange to route async (and use same message id)
+ Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, handover, true);
// set a new from endpoint to be the seda queue
copy.setFromEndpoint(endpoint);
return copy;
http://git-wip-us.apache.org/repos/asf/camel/blob/97320fa1/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
index 88905d7..6ca425d 100644
--- a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
@@ -215,11 +215,33 @@ public final class ExchangeHelper {
* @param handover whether the on completion callbacks should be handed over to the new copy.
*/
public static Exchange createCorrelatedCopy(Exchange exchange, boolean handover) {
+ return createCorrelatedCopy(exchange, handover, false);
+ }
+
+ /**
+ * Creates a new instance and copies from the current message exchange so that it can be
+ * forwarded to another destination as a new instance. Unlike regular copy this operation
+ * will not share the same {@link org.apache.camel.spi.UnitOfWork} so its should be used
+ * for async messaging, where the original and copied exchange are independent.
+ *
+ * @param exchange original copy of the exchange
+ * @param handover whether the on completion callbacks should be handed over to the new copy.
+ * @param useSameMessageId whether to use same message id on the copy message.
+ */
+ public static Exchange createCorrelatedCopy(Exchange exchange, boolean handover, boolean useSameMessageId) {
String id = exchange.getExchangeId();
Exchange copy = exchange.copy();
+ // do not reuse message id on copy
+ if (!useSameMessageId) {
+ if (copy.hasOut()) {
+ copy.getOut().setMessageId(null);
+ }
+ copy.getIn().setMessageId(null);
+ }
// do not share the unit of work
copy.setUnitOfWork(null);
+ // do not reuse the message id
// hand over on completion to the copy if we got any
UnitOfWork uow = exchange.getUnitOfWork();
if (handover && uow != null) {
http://git-wip-us.apache.org/repos/asf/camel/blob/97320fa1/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java b/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java
index 9e0fd6f..a7cf819 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java
@@ -19,6 +19,7 @@ package org.apache.camel.processor;
import java.io.File;
import java.net.URL;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -53,14 +54,22 @@ public class SplitterTest extends ContextTestSupport {
assertMockEndpointsSatisfied();
+ Set<String> ids = new HashSet<String>();
+ Set<String> ids2 = new HashSet<String>();
+
List<Exchange> list = resultEndpoint.getReceivedExchanges();
for (int i = 0; i < 4; i++) {
Exchange exchange = list.get(i);
Message in = exchange.getIn();
+ ids.add(in.getMessageId());
+ ids2.add(exchange.getExchangeId());
assertNotNull("The in message should not be null.", in);
assertProperty(exchange, Exchange.SPLIT_INDEX, i);
assertProperty(exchange, Exchange.SPLIT_SIZE, 4);
}
+
+ assertEquals("The sub messages should have unique message ids", 4, ids.size());
+ assertEquals("The sub messages should have unique exchange ids", 4, ids2.size());
}
public void testSplitterWithAggregationStrategy() throws Exception {
[2/2] camel git commit: CAMEL-8571: Split EIP - Should use new
message id per splitted message. And other EIPs should do the same.
Posted by da...@apache.org.
CAMEL-8571: Split EIP - Should use new message id per splitted message. And other EIPs should do the same.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0977cef0
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0977cef0
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0977cef0
Branch: refs/heads/camel-2.15.x
Commit: 0977cef04a14c73cef42e58e6a8dadb454dad28f
Parents: ce0ffee
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Mar 30 20:06:12 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Mar 31 07:57:49 2015 +0200
----------------------------------------------------------------------
.../camel/component/seda/SedaProducer.java | 4 ++--
.../org/apache/camel/util/ExchangeHelper.java | 22 ++++++++++++++++++++
.../apache/camel/processor/SplitterTest.java | 9 ++++++++
3 files changed, 33 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/0977cef0/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
index c00b9b9..56b83f7 100644
--- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
@@ -179,8 +179,8 @@ public class SedaProducer extends DefaultAsyncProducer {
}
protected Exchange prepareCopy(Exchange exchange, boolean handover) {
- // use a new copy of the exchange to route async
- Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, handover);
+ // use a new copy of the exchange to route async (and use same message id)
+ Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, handover, true);
// set a new from endpoint to be the seda queue
copy.setFromEndpoint(endpoint);
return copy;
http://git-wip-us.apache.org/repos/asf/camel/blob/0977cef0/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
index 88905d7..6ca425d 100644
--- a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
@@ -215,11 +215,33 @@ public final class ExchangeHelper {
* @param handover whether the on completion callbacks should be handed over to the new copy.
*/
public static Exchange createCorrelatedCopy(Exchange exchange, boolean handover) {
+ return createCorrelatedCopy(exchange, handover, false);
+ }
+
+ /**
+ * Creates a new instance and copies from the current message exchange so that it can be
+ * forwarded to another destination as a new instance. Unlike regular copy this operation
+ * will not share the same {@link org.apache.camel.spi.UnitOfWork} so its should be used
+ * for async messaging, where the original and copied exchange are independent.
+ *
+ * @param exchange original copy of the exchange
+ * @param handover whether the on completion callbacks should be handed over to the new copy.
+ * @param useSameMessageId whether to use same message id on the copy message.
+ */
+ public static Exchange createCorrelatedCopy(Exchange exchange, boolean handover, boolean useSameMessageId) {
String id = exchange.getExchangeId();
Exchange copy = exchange.copy();
+ // do not reuse message id on copy
+ if (!useSameMessageId) {
+ if (copy.hasOut()) {
+ copy.getOut().setMessageId(null);
+ }
+ copy.getIn().setMessageId(null);
+ }
// do not share the unit of work
copy.setUnitOfWork(null);
+ // do not reuse the message id
// hand over on completion to the copy if we got any
UnitOfWork uow = exchange.getUnitOfWork();
if (handover && uow != null) {
http://git-wip-us.apache.org/repos/asf/camel/blob/0977cef0/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java b/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java
index 9e0fd6f..a7cf819 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java
@@ -19,6 +19,7 @@ package org.apache.camel.processor;
import java.io.File;
import java.net.URL;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -53,14 +54,22 @@ public class SplitterTest extends ContextTestSupport {
assertMockEndpointsSatisfied();
+ Set<String> ids = new HashSet<String>();
+ Set<String> ids2 = new HashSet<String>();
+
List<Exchange> list = resultEndpoint.getReceivedExchanges();
for (int i = 0; i < 4; i++) {
Exchange exchange = list.get(i);
Message in = exchange.getIn();
+ ids.add(in.getMessageId());
+ ids2.add(exchange.getExchangeId());
assertNotNull("The in message should not be null.", in);
assertProperty(exchange, Exchange.SPLIT_INDEX, i);
assertProperty(exchange, Exchange.SPLIT_SIZE, 4);
}
+
+ assertEquals("The sub messages should have unique message ids", 4, ids.size());
+ assertEquals("The sub messages should have unique exchange ids", 4, ids2.size());
}
public void testSplitterWithAggregationStrategy() throws Exception {