You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/03/31 07:55:29 UTC

[2/2] camel git commit: CAMEL-8571: Split EIP - Should use a new message id per split message. Other EIPs should do the same.

CAMEL-8571: Split EIP - Should use a new message id per split message. Other EIPs should do the same.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0977cef0
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0977cef0
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0977cef0

Branch: refs/heads/camel-2.15.x
Commit: 0977cef04a14c73cef42e58e6a8dadb454dad28f
Parents: ce0ffee
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Mar 30 20:06:12 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Mar 31 07:57:49 2015 +0200

----------------------------------------------------------------------
 .../camel/component/seda/SedaProducer.java      |  4 ++--
 .../org/apache/camel/util/ExchangeHelper.java   | 22 ++++++++++++++++++++
 .../apache/camel/processor/SplitterTest.java    |  9 ++++++++
 3 files changed, 33 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/0977cef0/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
index c00b9b9..56b83f7 100644
--- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaProducer.java
@@ -179,8 +179,8 @@ public class SedaProducer extends DefaultAsyncProducer {
     }
 
     protected Exchange prepareCopy(Exchange exchange, boolean handover) {
-        // use a new copy of the exchange to route async
-        Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, handover);
+        // use a new copy of the exchange to route async (and use same message id)
+        Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, handover, true);
         // set a new from endpoint to be the seda queue
         copy.setFromEndpoint(endpoint);
         return copy;

http://git-wip-us.apache.org/repos/asf/camel/blob/0977cef0/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
index 88905d7..6ca425d 100644
--- a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
@@ -215,11 +215,33 @@ public final class ExchangeHelper {
      * @param handover whether the on completion callbacks should be handed over to the new copy.
      */
     public static Exchange createCorrelatedCopy(Exchange exchange, boolean handover) {
+        return createCorrelatedCopy(exchange, handover, false);
+    }
+
+    /**
+     * Creates a new instance and copies from the current message exchange so that it can be
+     * forwarded to another destination as a new instance. Unlike regular copy this operation
+     * will not share the same {@link org.apache.camel.spi.UnitOfWork} so it should be used
+     * for async messaging, where the original and copied exchange are independent.
+     *
+     * @param exchange original copy of the exchange
+     * @param handover whether the on completion callbacks should be handed over to the new copy.
+     * @param useSameMessageId whether to use same message id on the copy message.
+     */
+    public static Exchange createCorrelatedCopy(Exchange exchange, boolean handover, boolean useSameMessageId) {
         String id = exchange.getExchangeId();
 
         Exchange copy = exchange.copy();
+        // do not reuse message id on copy
+        if (!useSameMessageId) {
+            if (copy.hasOut()) {
+                copy.getOut().setMessageId(null);
+            }
+            copy.getIn().setMessageId(null);
+        }
         // do not share the unit of work
         copy.setUnitOfWork(null);
+        // do not reuse the message id
         // hand over on completion to the copy if we got any
         UnitOfWork uow = exchange.getUnitOfWork();
         if (handover && uow != null) {

http://git-wip-us.apache.org/repos/asf/camel/blob/0977cef0/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java b/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java
index 9e0fd6f..a7cf819 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java
@@ -19,6 +19,7 @@ package org.apache.camel.processor;
 import java.io.File;
 import java.net.URL;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
@@ -53,14 +54,22 @@ public class SplitterTest extends ContextTestSupport {
 
         assertMockEndpointsSatisfied();
 
+        Set<String> ids = new HashSet<String>();
+        Set<String> ids2 = new HashSet<String>();
+
         List<Exchange> list = resultEndpoint.getReceivedExchanges();
         for (int i = 0; i < 4; i++) {
             Exchange exchange = list.get(i);
             Message in = exchange.getIn();
+            ids.add(in.getMessageId());
+            ids2.add(exchange.getExchangeId());
             assertNotNull("The in message should not be null.", in);
             assertProperty(exchange, Exchange.SPLIT_INDEX, i);
             assertProperty(exchange, Exchange.SPLIT_SIZE, 4);
         }
+
+        assertEquals("The sub messages should have unique message ids", 4, ids.size());
+        assertEquals("The sub messages should have unique exchange ids", 4, ids2.size());
     }
 
     public void testSplitterWithAggregationStrategy() throws Exception {