You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2024/02/24 08:45:00 UTC

(camel) branch camel-4.0.x updated: CAMEL-20457: camel-core - Fix NPE in split parallel timeout without agg strategy. (#13282)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-4.0.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-4.0.x by this push:
     new f9edfff3dcc CAMEL-20457: camel-core - Fix NPE in split parallel timeout without agg strategy. (#13282)
f9edfff3dcc is described below

commit f9edfff3dcc9297ae092f994ea787be686c16002
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sat Feb 24 09:44:17 2024 +0100

    CAMEL-20457: camel-core - Fix NPE in split parallel timeout without agg strategy. (#13282)
---
 .../apache/camel/processor/MulticastProcessor.java |  7 +-
 ...itParallelTimeoutNoAggregationStrategyTest.java | 79 ++++++++++++++++++++++
 2 files changed, 84 insertions(+), 2 deletions(-)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 8196139634b..9e5a486ca94 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -469,9 +469,12 @@ public class MulticastProcessor extends AsyncProcessorSupport
                         Exchange exchange = completion.pollUnordered();
                         int index = exchange != null ? getExchangeIndex(exchange) : nbExchangeSent.get();
                         while (nbAggregated.get() < index) {
+                            int idx = nbAggregated.getAndIncrement();
                             AggregationStrategy strategy = getAggregationStrategy(null);
-                            strategy.timeout(result.get() != null ? result.get() : original,
-                                    nbAggregated.getAndIncrement(), nbExchangeSent.get(), timeout);
+                            if (strategy != null) {
+                                strategy.timeout(result.get() != null ? result.get() : original,
+                                        idx, nbExchangeSent.get(), timeout);
+                            }
                         }
                         if (exchange != null) {
                             doAggregate(result, exchange, original);
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/SplitParallelTimeoutNoAggregationStrategyTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/SplitParallelTimeoutNoAggregationStrategyTest.java
new file mode 100644
index 00000000000..6464d4da4bd
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/SplitParallelTimeoutNoAggregationStrategyTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.Assumptions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.parallel.Isolated;
+
+@Isolated
+@Timeout(60)
+public class SplitParallelTimeoutNoAggregationStrategyTest extends ContextTestSupport { // CAMEL-20457: parallel split with timeout and no aggregation strategy
+
+    private final Phaser phaser = new Phaser(3); // three parties: one per sub-route (direct:a, direct:b, direct:c)
+
+    @BeforeEach
+    void sendEarly() { // sends the message to be split before the test method itself runs
+        Assumptions.assumeTrue(context.isStarted(), "The test cannot be run because the context is not started");
+        template.sendBody("direct:start", "A,B,C");
+    }
+
+    @Test
+    public void testSplitTimeout() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        // A is expected to time out, so the first message at mock:result must not contain "A"
+        mock.message(0).body().not(body().contains("A"));
+        // NOTE(review): the wait below is bounded at 5000 SECONDS, far above the class-level @Timeout(60); MILLISECONDS may have been intended - confirm
+        phaser.awaitAdvanceInterruptibly(0, 5000, TimeUnit.SECONDS);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                        .split(body().tokenize(",")).parallelProcessing().timeout(100) // no aggregation strategy is configured on this split
+                        .choice()
+                            .when(body().isEqualTo("A")).to("direct:a")
+                            .when(body().isEqualTo("B")).to("direct:b")
+                            .when(body().isEqualTo("C")).to("direct:c")
+                            .end() // end choice
+                        // mock:result is reached only after the split has been closed
+                        .end() // end split
+                        .to("mock:result");
+                // each sub-route first arrives at the phaser; A then delays 200, which is longer than the split timeout of 100
+                from("direct:a").process(e -> phaser.arriveAndAwaitAdvance()).delay(200).setBody(constant("A"));
+
+                from("direct:b").process(e -> phaser.arriveAndAwaitAdvance()).setBody(constant("B"));
+
+                from("direct:c").process(e -> phaser.arriveAndAwaitAdvance()).delay(10).setBody(constant("C"));
+            }
+        };
+    }
+
+}