You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/11/04 16:45:23 UTC
[2/4] camel git commit: CAMEL-10442: Fixed an issue when using
pipeline in the Java DSL not setting up the EIP correctly, which could lead to
a runtime route that does not behave as intended. This is a small Java DSL change, so let's keep
it for the master branch only.
CAMEL-10442: Fixed an issue when using pipeline in the Java DSL not setting up the EIP correctly, which could lead to a runtime route that does not behave as intended. This is a small Java DSL change, so let's keep it for the master branch only.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/94d7b0d2
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/94d7b0d2
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/94d7b0d2
Branch: refs/heads/master
Commit: 94d7b0d2a3def5bab26ba3542800dc0fd65fd8ab
Parents: 74a816b
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Nov 4 17:02:14 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Nov 4 17:44:51 2016 +0100
----------------------------------------------------------------------
.../apache/camel/model/ProcessorDefinition.java | 15 ++-
.../camel/processor/MulticastPipelineTest.java | 109 +++++++++++++++++--
2 files changed, 111 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/94d7b0d2/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index b7e8fc9..2ce1fc6 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -1163,7 +1163,10 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
* @return the builder
*/
public Type pipeline(String... uris) {
- return to(uris);
+ PipelineDefinition answer = new PipelineDefinition();
+ addOutput(answer);
+ answer.to(uris);
+ return (Type) this;
}
/**
@@ -1176,7 +1179,10 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
* @return the builder
*/
public Type pipeline(Endpoint... endpoints) {
- return to(endpoints);
+ PipelineDefinition answer = new PipelineDefinition();
+ addOutput(answer);
+ answer.to(endpoints);
+ return (Type) this;
}
/**
@@ -1189,7 +1195,10 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
* @return the builder
*/
public Type pipeline(Collection<Endpoint> endpoints) {
- return to(endpoints);
+ PipelineDefinition answer = new PipelineDefinition();
+ addOutput(answer);
+ answer.to(endpoints);
+ return (Type) this;
}
/**
http://git-wip-us.apache.org/repos/asf/camel/blob/94d7b0d2/camel-core/src/test/java/org/apache/camel/processor/MulticastPipelineTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/MulticastPipelineTest.java b/camel-core/src/test/java/org/apache/camel/processor/MulticastPipelineTest.java
index d0a2999..540cd2b 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/MulticastPipelineTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/MulticastPipelineTest.java
@@ -18,15 +18,35 @@ package org.apache.camel.processor;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.util.toolbox.AggregationStrategies;
public class MulticastPipelineTest extends ContextTestSupport {
- public void testMulticastPipeline() throws Exception {
+ @Override
+ public boolean isUseRouteBuilder() {
+ return false;
+ }
+
+ public void testPlainPipeline() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .pipeline("direct:a", "direct:b")
+ .pipeline("direct:c", "direct:d")
+ .to("mock:result");
+
+ from("direct:a").to("mock:a").setBody().constant("A");
+ from("direct:b").to("mock:b").setBody().constant("B");
+ from("direct:c").to("mock:c").setBody().constant("C");
+ from("direct:d").to("mock:d").setBody().constant("D");
+ }
+ });
+ context.start();
+
getMockEndpoint("mock:a").expectedBodiesReceived("Hello World");
- getMockEndpoint("mock:b").expectedBodiesReceived("Hello World");
- getMockEndpoint("mock:c").expectedBodiesReceived("Hello World");
- getMockEndpoint("mock:d").expectedBodiesReceived("Hello World");
+ getMockEndpoint("mock:b").expectedBodiesReceived("A");
+ getMockEndpoint("mock:c").expectedBodiesReceived("B");
+ getMockEndpoint("mock:d").expectedBodiesReceived("C");
getMockEndpoint("mock:result").expectedMessageCount(1);
template.sendBody("direct:start", "Hello World");
@@ -34,13 +54,40 @@ public class MulticastPipelineTest extends ContextTestSupport {
assertMockEndpointsSatisfied();
}
- @Override
- protected RouteBuilder createRouteBuilder() throws Exception {
- return new RouteBuilder() {
+ public void testPlainPipelineTo() throws Exception {
+ context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:start")
- .multicast().aggregationStrategy(AggregationStrategies.groupedExchange())
+ .pipeline().to("direct:a", "direct:b").end()
+ .pipeline().to("direct:c", "direct:d").end()
+ .to("mock:result");
+
+ from("direct:a").to("mock:a").setBody().constant("A");
+ from("direct:b").to("mock:b").setBody().constant("B");
+ from("direct:c").to("mock:c").setBody().constant("C");
+ from("direct:d").to("mock:d").setBody().constant("D");
+ }
+ });
+ context.start();
+
+ getMockEndpoint("mock:a").expectedBodiesReceived("Hello World");
+ getMockEndpoint("mock:b").expectedBodiesReceived("A");
+ getMockEndpoint("mock:c").expectedBodiesReceived("B");
+ getMockEndpoint("mock:d").expectedBodiesReceived("C");
+ getMockEndpoint("mock:result").expectedMessageCount(1);
+
+ template.sendBody("direct:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testMulticastPipeline() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .multicast()
.pipeline("direct:a", "direct:b")
.pipeline("direct:c", "direct:d")
.end()
@@ -51,6 +98,48 @@ public class MulticastPipelineTest extends ContextTestSupport {
from("direct:c").to("mock:c").setBody().constant("C");
from("direct:d").to("mock:d").setBody().constant("D");
}
- };
+ });
+ context.start();
+
+ getMockEndpoint("mock:a").expectedBodiesReceived("Hello World");
+ getMockEndpoint("mock:b").expectedBodiesReceived("A");
+ getMockEndpoint("mock:c").expectedBodiesReceived("Hello World");
+ getMockEndpoint("mock:d").expectedBodiesReceived("C");
+ getMockEndpoint("mock:result").expectedMessageCount(1);
+
+ template.sendBody("direct:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
}
+
+ public void testMulticastPipelineTo() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .multicast()
+ .pipeline().to("direct:a", "direct:b").end()
+ .pipeline().to("direct:c", "direct:d").end()
+ .end()
+ .to("mock:result");
+
+ from("direct:a").to("mock:a").setBody().constant("A");
+ from("direct:b").to("mock:b").setBody().constant("B");
+ from("direct:c").to("mock:c").setBody().constant("C");
+ from("direct:d").to("mock:d").setBody().constant("D");
+ }
+ });
+ context.start();
+
+ getMockEndpoint("mock:a").expectedBodiesReceived("Hello World");
+ getMockEndpoint("mock:b").expectedBodiesReceived("A");
+ getMockEndpoint("mock:c").expectedBodiesReceived("Hello World");
+ getMockEndpoint("mock:d").expectedBodiesReceived("C");
+ getMockEndpoint("mock:result").expectedMessageCount(1);
+
+ template.sendBody("direct:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+ }
+
}