You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/11/04 16:45:23 UTC

[2/4] camel git commit: CAMEL-10442: Fixed an issue where using pipeline in the Java DSL did not set up the EIP correctly, which could lead to the runtime route not behaving as intended. This is a small Java DSL change, so let's keep it for the master branch only.

CAMEL-10442: Fixed an issue where using pipeline in the Java DSL did not set up the EIP correctly, which could lead to the runtime route not behaving as intended. This is a small Java DSL change, so let's keep it for the master branch only.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/94d7b0d2
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/94d7b0d2
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/94d7b0d2

Branch: refs/heads/master
Commit: 94d7b0d2a3def5bab26ba3542800dc0fd65fd8ab
Parents: 74a816b
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Nov 4 17:02:14 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Nov 4 17:44:51 2016 +0100

----------------------------------------------------------------------
 .../apache/camel/model/ProcessorDefinition.java |  15 ++-
 .../camel/processor/MulticastPipelineTest.java  | 109 +++++++++++++++++--
 2 files changed, 111 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/94d7b0d2/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index b7e8fc9..2ce1fc6 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -1163,7 +1163,10 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
      * @return the builder
      */
     public Type pipeline(String... uris) {
-        return to(uris);
+        PipelineDefinition answer = new PipelineDefinition();
+        addOutput(answer);
+        answer.to(uris);
+        return (Type) this;
     }
 
     /**
@@ -1176,7 +1179,10 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
      * @return the builder
      */
     public Type pipeline(Endpoint... endpoints) {
-        return to(endpoints);
+        PipelineDefinition answer = new PipelineDefinition();
+        addOutput(answer);
+        answer.to(endpoints);
+        return (Type) this;
     }
 
     /**
@@ -1189,7 +1195,10 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type>
      * @return the builder
      */
     public Type pipeline(Collection<Endpoint> endpoints) {
-        return to(endpoints);
+        PipelineDefinition answer = new PipelineDefinition();
+        addOutput(answer);
+        answer.to(endpoints);
+        return (Type) this;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/camel/blob/94d7b0d2/camel-core/src/test/java/org/apache/camel/processor/MulticastPipelineTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/MulticastPipelineTest.java b/camel-core/src/test/java/org/apache/camel/processor/MulticastPipelineTest.java
index d0a2999..540cd2b 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/MulticastPipelineTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/MulticastPipelineTest.java
@@ -18,15 +18,35 @@ package org.apache.camel.processor;
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.util.toolbox.AggregationStrategies;
 
 public class MulticastPipelineTest extends ContextTestSupport {
 
-    public void testMulticastPipeline() throws Exception {
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    public void testPlainPipeline() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .pipeline("direct:a", "direct:b")
+                    .pipeline("direct:c", "direct:d")
+                    .to("mock:result");
+
+                from("direct:a").to("mock:a").setBody().constant("A");
+                from("direct:b").to("mock:b").setBody().constant("B");
+                from("direct:c").to("mock:c").setBody().constant("C");
+                from("direct:d").to("mock:d").setBody().constant("D");
+            }
+        });
+        context.start();
+
         getMockEndpoint("mock:a").expectedBodiesReceived("Hello World");
-        getMockEndpoint("mock:b").expectedBodiesReceived("Hello World");
-        getMockEndpoint("mock:c").expectedBodiesReceived("Hello World");
-        getMockEndpoint("mock:d").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:b").expectedBodiesReceived("A");
+        getMockEndpoint("mock:c").expectedBodiesReceived("B");
+        getMockEndpoint("mock:d").expectedBodiesReceived("C");
         getMockEndpoint("mock:result").expectedMessageCount(1);
 
         template.sendBody("direct:start", "Hello World");
@@ -34,13 +54,40 @@ public class MulticastPipelineTest extends ContextTestSupport {
         assertMockEndpointsSatisfied();
     }
 
-    @Override
-    protected RouteBuilder createRouteBuilder() throws Exception {
-        return new RouteBuilder() {
+    public void testPlainPipelineTo() throws Exception {
+        context.addRoutes(new RouteBuilder() {
             @Override
             public void configure() throws Exception {
                 from("direct:start")
-                    .multicast().aggregationStrategy(AggregationStrategies.groupedExchange())
+                    .pipeline().to("direct:a", "direct:b").end()
+                    .pipeline().to("direct:c", "direct:d").end()
+                    .to("mock:result");
+
+                from("direct:a").to("mock:a").setBody().constant("A");
+                from("direct:b").to("mock:b").setBody().constant("B");
+                from("direct:c").to("mock:c").setBody().constant("C");
+                from("direct:d").to("mock:d").setBody().constant("D");
+            }
+        });
+        context.start();
+
+        getMockEndpoint("mock:a").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:b").expectedBodiesReceived("A");
+        getMockEndpoint("mock:c").expectedBodiesReceived("B");
+        getMockEndpoint("mock:d").expectedBodiesReceived("C");
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testMulticastPipeline() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .multicast()
                         .pipeline("direct:a", "direct:b")
                         .pipeline("direct:c", "direct:d")
                     .end()
@@ -51,6 +98,48 @@ public class MulticastPipelineTest extends ContextTestSupport {
                 from("direct:c").to("mock:c").setBody().constant("C");
                 from("direct:d").to("mock:d").setBody().constant("D");
             }
-        };
+        });
+        context.start();
+
+        getMockEndpoint("mock:a").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:b").expectedBodiesReceived("A");
+        getMockEndpoint("mock:c").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:d").expectedBodiesReceived("C");
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
     }
+
+    public void testMulticastPipelineTo() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .multicast()
+                        .pipeline().to("direct:a", "direct:b").end()
+                        .pipeline().to("direct:c", "direct:d").end()
+                    .end()
+                    .to("mock:result");
+
+                from("direct:a").to("mock:a").setBody().constant("A");
+                from("direct:b").to("mock:b").setBody().constant("B");
+                from("direct:c").to("mock:c").setBody().constant("C");
+                from("direct:d").to("mock:d").setBody().constant("D");
+            }
+        });
+        context.start();
+
+        getMockEndpoint("mock:a").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:b").expectedBodiesReceived("A");
+        getMockEndpoint("mock:c").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:d").expectedBodiesReceived("C");
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
 }