You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/01/07 10:19:39 UTC
git commit: CAMEL-7111: Multicast EIP with only one child processor does not call aggregate strategy
Updated Branches:
refs/heads/camel-2.11.x 872e3be9d -> 2e9d12124
CAMEL-7111: Multicast EIP with only one child processor does not call aggregate strategy
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2e9d1212
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2e9d1212
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2e9d1212
Branch: refs/heads/camel-2.11.x
Commit: 2e9d121245a0d5f47d1dc9ac60475f3043346d2c
Parents: 872e3be
Author: Claus Ibsen <da...@apache.org>
Authored: Tue Jan 7 10:20:49 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Jan 7 10:23:26 2014 +0100
----------------------------------------------------------------------
.../apache/camel/model/MulticastDefinition.java | 11 ++-
.../MulticastSingleAggregateIssueTest.java | 79 ++++++++++++++++++++
2 files changed, 89 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/2e9d1212/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
index 916f18d..5bdb144 100644
--- a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.model;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
@@ -84,7 +85,15 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i
@Override
public Processor createProcessor(RouteContext routeContext) throws Exception {
- return this.createChildProcessor(routeContext, true);
+ Processor answer = this.createChildProcessor(routeContext, true);
+
+ // force the answer as a multicast processor even if there is only one child processor in the multicast
+ if (!(answer instanceof MulticastProcessor)) {
+ List<Processor> list = new ArrayList<Processor>(1);
+ list.add(answer);
+ answer = createCompositeProcessor(routeContext, list);
+ }
+ return answer;
}
// Fluent API
http://git-wip-us.apache.org/repos/asf/camel/blob/2e9d1212/camel-core/src/test/java/org/apache/camel/issues/MulticastSingleAggregateIssueTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/issues/MulticastSingleAggregateIssueTest.java b/camel-core/src/test/java/org/apache/camel/issues/MulticastSingleAggregateIssueTest.java
new file mode 100644
index 0000000..84e3e9f
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/issues/MulticastSingleAggregateIssueTest.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+
+/**
+ * @version
+ */
+public class MulticastSingleAggregateIssueTest extends ContextTestSupport {
+
+ public void testMulticastSingleAggregateIssue() throws Exception {
+ getMockEndpoint("mock:a").expectedBodiesReceived(2);
+ getMockEndpoint("mock:a").expectedHeaderReceived("foo", "I was here");
+
+ template.sendBody("direct:a", 1);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:a").multicast(new SumAggregateBean())
+ .to("direct:foo")
+ .end()
+ .to("mock:a");
+
+ from("direct:foo")
+ .bean(IncreaseOne.class);
+ }
+ };
+ }
+
+ public static class IncreaseOne {
+
+ public int addOne(int num) {
+ return num + 1;
+ }
+ }
+
+ public static class SumAggregateBean implements AggregationStrategy {
+
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ newExchange.getIn().setHeader("foo", "I was here");
+
+ if (oldExchange == null) {
+ return newExchange;
+ }
+
+ int num1 = oldExchange.getIn().getBody(int.class);
+ int num2 = newExchange.getIn().getBody(int.class);
+
+ newExchange.getIn().setHeader("foo", "I was here");
+ newExchange.getIn().setBody(num1 + num2);
+ return newExchange;
+ }
+ }
+
+}