You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/07/24 11:23:18 UTC

[camel] branch camel-3.7.x updated (78e274e -> 0542e49)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch camel-3.7.x
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from 78e274e  https://issues.apache.org/jira/browse/CAMEL-16807 : Fixed a problem with KafkaConfiguration:copy()
     new c5e2870  CAMEL-16802: Added unit test. Thanks to Sergio Penkale for reporting.
     new 0542e49  CAMEL-16802: Fixed splitter/aggregate/multicast in parallel mode would not aggregate completed task in submitted order (but use random order instead).

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/camel/processor/MulticastProcessor.java | 26 +++++++++-------------
 ...ltiLinesTest.java => Split123ParallelTest.java} | 21 +++++++++--------
 2 files changed, 21 insertions(+), 26 deletions(-)
 copy core/camel-core/src/test/java/org/apache/camel/processor/{SplitGroupMultiLinesTest.java => Split123ParallelTest.java} (68%)

[camel] 01/02: CAMEL-16802: Added unit test. Thanks to Sergio Penkale for reporting.

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.7.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit c5e2870a34e74bfa849bd075184034ced5bc78b2
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sat Jul 24 09:41:08 2021 +0200

    CAMEL-16802: Added unit test. Thanks to Sergio Penkale for reporting.
---
 .../camel/processor/Split123ParallelTest.java      | 52 ++++++++++++++++++++++
 1 file changed, 52 insertions(+)

diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/Split123ParallelTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/Split123ParallelTest.java
new file mode 100644
index 0000000..999b148
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/Split123ParallelTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.StringAggregationStrategy;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+@Disabled("TODO: CAMEL-16802")
+public class Split123ParallelTest extends ContextTestSupport {
+
+    @Test
+    public void testSplitter() throws Exception {
+        getMockEndpoint("mock:foo").expectedBodiesReceivedInAnyOrder("1", "2", "3");
+        getMockEndpoint("mock:result").expectedBodiesReceived("123");
+
+        template.sendBody("direct:start", "1,2,3");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .split(body().tokenize(","), new StringAggregationStrategy()).parallelProcessing()
+                        .to("mock:foo")
+                    .end()
+                    .to("mock:result");
+            }
+        };
+    }
+
+}

[camel] 02/02: CAMEL-16802: Fixed splitter/aggregate/multicast in parallel mode would not aggregate completed task in submitted order (but use random order instead).

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.7.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 0542e49beaa400dbb8a01d83b4a49e1bc7345071
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sat Jul 24 10:29:17 2021 +0200

    CAMEL-16802: Fixed splitter/aggregate/multicast in parallel mode would not aggregate completed task in submitted order (but use random order instead).
---
 .../apache/camel/processor/MulticastProcessor.java | 26 +++++++++-------------
 .../camel/processor/Split123ParallelTest.java      |  2 --
 2 files changed, 11 insertions(+), 17 deletions(-)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index ec684f8..4e0e616 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -439,13 +439,7 @@ public class MulticastProcessor extends AsyncProcessorSupport
                 Exchange exchange = pair.getExchange();
                 int index = nbExchangeSent.getAndIncrement();
                 updateNewExchange(exchange, index, pairs, hasNext);
-
-                // Schedule the processing of the next pair
-                if (hasNext) {
-                    if (isParallelProcessing()) {
-                        schedule(this);
-                    }
-                } else {
+                if (!hasNext) {
                     allSent.set(true);
                 }
 
@@ -488,12 +482,15 @@ public class MulticastProcessor extends AsyncProcessorSupport
                         }
                     });
                 });
+                // after submitting this pair then move on to the next pair (if in parallel mode)
+                if (hasNext && isParallelProcessing()) {
+                    schedule(this);
+                }
             } catch (Exception e) {
                 original.setException(e);
                 doDone(null, false);
             }
         }
-
     }
 
     /**
@@ -545,13 +542,7 @@ public class MulticastProcessor extends AsyncProcessorSupport
             Exchange exchange = pair.getExchange();
             int index = nbExchangeSent.getAndIncrement();
             updateNewExchange(exchange, index, pairs, hasNext);
-
-            // Schedule the processing of the next pair
-            if (hasNext) {
-                if (isParallelProcessing()) {
-                    schedule(this);
-                }
-            } else {
+            if (!hasNext) {
                 allSent.set(true);
             }
 
@@ -611,6 +602,11 @@ public class MulticastProcessor extends AsyncProcessorSupport
                 aggregate();
             });
 
+            // after submitting this pair then move on to the next pair (if in parallel mode)
+            if (hasNext && isParallelProcessing()) {
+                schedule(this);
+            }
+
             // next step
             boolean next = hasNext && !isParallelProcessing();
             LOG.trace("Run next: {}", next);
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/Split123ParallelTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/Split123ParallelTest.java
index 999b148..15c6e6c 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/Split123ParallelTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/Split123ParallelTest.java
@@ -19,10 +19,8 @@ package org.apache.camel.processor;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.processor.aggregate.StringAggregationStrategy;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
-@Disabled("TODO: CAMEL-16802")
 public class Split123ParallelTest extends ContextTestSupport {
 
     @Test