You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/01/07 10:19:39 UTC

git commit: CAMEL-7111: Multicast EIP with only one child processor does not call aggregate strategy

Updated Branches:
  refs/heads/camel-2.11.x 872e3be9d -> 2e9d12124


CAMEL-7111: Multicast EIP with only one child processor does not call aggregate strategy


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2e9d1212
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2e9d1212
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2e9d1212

Branch: refs/heads/camel-2.11.x
Commit: 2e9d121245a0d5f47d1dc9ac60475f3043346d2c
Parents: 872e3be
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Jan 7 10:20:49 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Jan 7 10:23:26 2014 +0100

----------------------------------------------------------------------
 .../apache/camel/model/MulticastDefinition.java | 11 ++-
 .../MulticastSingleAggregateIssueTest.java      | 79 ++++++++++++++++++++
 2 files changed, 89 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/2e9d1212/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
index 916f18d..5bdb144 100644
--- a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.model;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 
@@ -84,7 +85,15 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i
 
     @Override
     public Processor createProcessor(RouteContext routeContext) throws Exception {
-        return this.createChildProcessor(routeContext, true);
+        Processor answer = this.createChildProcessor(routeContext, true);
+
+        // force the answer as a multicast processor even if there is only one child processor in the multicast
+        if (!(answer instanceof MulticastProcessor)) {
+            List<Processor> list = new ArrayList<Processor>(1);
+            list.add(answer);
+            answer = createCompositeProcessor(routeContext, list);
+        }
+        return answer;
     }
 
     // Fluent API

http://git-wip-us.apache.org/repos/asf/camel/blob/2e9d1212/camel-core/src/test/java/org/apache/camel/issues/MulticastSingleAggregateIssueTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/issues/MulticastSingleAggregateIssueTest.java b/camel-core/src/test/java/org/apache/camel/issues/MulticastSingleAggregateIssueTest.java
new file mode 100644
index 0000000..84e3e9f
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/issues/MulticastSingleAggregateIssueTest.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+
+/**
+ * Checks that a multicast with a single child still runs its aggregation strategy (CAMEL-7111).
+ */
+public class MulticastSingleAggregateIssueTest extends ContextTestSupport {
+
+    public void testMulticastSingleAggregateIssue() throws Exception {
+        getMockEndpoint("mock:a").expectedBodiesReceived(2); // the 1 sent below, incremented once by IncreaseOne
+        getMockEndpoint("mock:a").expectedHeaderReceived("foo", "I was here"); // header is only set in SumAggregateBean.aggregate
+
+        template.sendBody("direct:a", 1);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:a").multicast(new SumAggregateBean()) // multicast with exactly one child output
+                    .to("direct:foo")
+                .end()
+                .to("mock:a");
+
+                from("direct:foo")
+                    .bean(IncreaseOne.class);
+            }
+        };
+    }
+
+    public static class IncreaseOne { // bean invoked by the direct:foo route
+
+        public int addOne(int num) {
+            return num + 1;
+        }
+    }
+
+    public static class SumAggregateBean implements AggregationStrategy { // marks the exchange and sums integer bodies
+
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            newExchange.getIn().setHeader("foo", "I was here"); // marker the test asserts on; set on every call
+
+            if (oldExchange == null) { // nothing to combine with; presumably the first reply - confirm against AggregationStrategy
+                return newExchange;
+            }
+
+            int num1 = oldExchange.getIn().getBody(int.class);
+            int num2 = newExchange.getIn().getBody(int.class);
+
+            newExchange.getIn().setHeader("foo", "I was here"); // NOTE(review): repeats the setHeader at the top of this method
+            newExchange.getIn().setBody(num1 + num2);
+            return newExchange;
+        }
+    }
+
+}