You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/07/12 19:32:15 UTC

[1/3] git commit: [FLINK-1018] Fix cross pipelining/damming info to resolve cross-related streaming deadlocks.

Repository: incubator-flink
Updated Branches:
  refs/heads/master ca4e7b4b3 -> 3002258f8


[FLINK-1018] Fix cross pipelining/damming info to resolve cross-related streaming deadlocks.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/3002258f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/3002258f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/3002258f

Branch: refs/heads/master
Commit: 3002258f8a22a8adbdb230e57c972ad17910debf
Parents: ec0b00d
Author: Stephan Ewen <se...@apache.org>
Authored: Sat Jul 12 15:57:22 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Jul 12 19:31:26 2014 +0200

----------------------------------------------------------------------
 .../flink/compiler/PipelineBreakerTest.java     | 103 ++++++++++++++++++-
 .../flink/runtime/operators/DriverStrategy.java |   8 +-
 2 files changed, 106 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3002258f/flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java
index 4e43a74..45bf729 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java
@@ -23,12 +23,13 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.IterativeDataSet;
 import org.apache.flink.compiler.plan.BulkIterationPlanNode;
+import org.apache.flink.compiler.plan.DualInputPlanNode;
 import org.apache.flink.compiler.plan.OptimizedPlan;
 import org.apache.flink.compiler.plan.SingleInputPlanNode;
 import org.apache.flink.compiler.plan.SinkPlanNode;
-import org.apache.flink.compiler.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.compiler.testfunctions.IdentityMapper;
 import org.apache.flink.compiler.testfunctions.SelectOneReducer;
+import org.apache.flink.configuration.Configuration;
 
 @SuppressWarnings("serial")
 public class PipelineBreakerTest extends CompilerTestBase {
@@ -134,4 +135,104 @@ public class PipelineBreakerTest extends CompilerTestBase {
 			fail(e.getMessage());
 		}
 	}
+	
+	@Test
+	public void testPilelineBreakerWithCross() {
+		try {
+			{
+				ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				env.setDegreeOfParallelism(64);
+				
+				DataSet<Long> initialSource = env.generateSequence(1, 10);
+				
+				Configuration conf= new Configuration();
+				conf.setString(PactCompiler.HINT_LOCAL_STRATEGY, PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_FIRST);
+				initialSource
+					.map(new IdentityMapper<Long>())
+					.cross(initialSource).withParameters(conf)
+					.print();
+				
+				
+				Plan p = env.createProgramPlan();
+				OptimizedPlan op = compileNoStats(p);
+				SinkPlanNode sink = op.getDataSinks().iterator().next();
+				DualInputPlanNode mapper = (DualInputPlanNode) sink.getInput().getSource();
+				
+				assertTrue(mapper.getInput1().getTempMode().breaksPipeline());
+			}
+			
+			{
+				ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				env.setDegreeOfParallelism(64);
+				
+				DataSet<Long> initialSource = env.generateSequence(1, 10);
+				
+				Configuration conf= new Configuration();
+				conf.setString(PactCompiler.HINT_LOCAL_STRATEGY, PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_BLOCKED_OUTER_SECOND);
+				initialSource
+					.map(new IdentityMapper<Long>())
+					.cross(initialSource).withParameters(conf)
+					.print();
+				
+				
+				Plan p = env.createProgramPlan();
+				OptimizedPlan op = compileNoStats(p);
+				
+				SinkPlanNode sink = op.getDataSinks().iterator().next();
+				DualInputPlanNode mapper = (DualInputPlanNode) sink.getInput().getSource();
+				
+				assertTrue(mapper.getInput2().getTempMode().breaksPipeline());
+			}
+			
+			{
+				ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				env.setDegreeOfParallelism(64);
+				
+				DataSet<Long> initialSource = env.generateSequence(1, 10);
+				
+				Configuration conf= new Configuration();
+				conf.setString(PactCompiler.HINT_LOCAL_STRATEGY, PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_FIRST);
+				initialSource
+					.map(new IdentityMapper<Long>())
+					.cross(initialSource).withParameters(conf)
+					.print();
+				
+				
+				Plan p = env.createProgramPlan();
+				OptimizedPlan op = compileNoStats(p);
+				
+				SinkPlanNode sink = op.getDataSinks().iterator().next();
+				DualInputPlanNode mapper = (DualInputPlanNode) sink.getInput().getSource();
+				
+				assertTrue(mapper.getInput1().getTempMode().breaksPipeline());
+			}
+			
+			{
+				ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				env.setDegreeOfParallelism(64);
+				
+				DataSet<Long> initialSource = env.generateSequence(1, 10);
+				
+				Configuration conf= new Configuration();
+				conf.setString(PactCompiler.HINT_LOCAL_STRATEGY, PactCompiler.HINT_LOCAL_STRATEGY_NESTEDLOOP_STREAMED_OUTER_SECOND);
+				initialSource
+					.map(new IdentityMapper<Long>())
+					.cross(initialSource).withParameters(conf)
+					.print();
+				
+				
+				Plan p = env.createProgramPlan();
+				OptimizedPlan op = compileNoStats(p);
+				
+				SinkPlanNode sink = op.getDataSinks().iterator().next();
+				DualInputPlanNode mapper = (DualInputPlanNode) sink.getInput().getSource();
+				
+				assertTrue(mapper.getInput2().getTempMode().breaksPipeline());
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3002258f/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
index 3bf6c01..5f00277 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
@@ -80,13 +80,13 @@ public enum DriverStrategy {
 	HYBRIDHASH_BUILD_SECOND_CACHED(BuildSecondCachedMatchDriver.class, null, MATERIALIZING, FULL_DAM, true),
 	
 	// the second input is inner loop, the first input is outer loop and block-wise processed
-	NESTEDLOOP_BLOCKED_OUTER_FIRST(CrossDriver.class, null, MATERIALIZING, MATERIALIZING, false),
+	NESTEDLOOP_BLOCKED_OUTER_FIRST(CrossDriver.class, null, MATERIALIZING, FULL_DAM, false),
 	// the first input is inner loop, the second input is outer loop and block-wise processed
-	NESTEDLOOP_BLOCKED_OUTER_SECOND(CrossDriver.class, null, MATERIALIZING, MATERIALIZING, false),
+	NESTEDLOOP_BLOCKED_OUTER_SECOND(CrossDriver.class, null, FULL_DAM, MATERIALIZING, false),
 	// the second input is inner loop, the first input is outer loop and stream-processed
-	NESTEDLOOP_STREAMED_OUTER_FIRST(CrossDriver.class, null, PIPELINED, MATERIALIZING, false),
+	NESTEDLOOP_STREAMED_OUTER_FIRST(CrossDriver.class, null, PIPELINED, FULL_DAM, false),
 	// the first input is inner loop, the second input is outer loop and stream-processed
-	NESTEDLOOP_STREAMED_OUTER_SECOND(CrossDriver.class, null, MATERIALIZING, PIPELINED, false),
+	NESTEDLOOP_STREAMED_OUTER_SECOND(CrossDriver.class, null, FULL_DAM, PIPELINED, false),
 	
 	// union utility op. unions happen implicitly on the network layer (in the readers) when bundeling streams
 	UNION(null, null, FULL_DAM, FULL_DAM, false);


[3/3] git commit: Improve error messages for data sinks inside iterations.

Posted by se...@apache.org.
Improve error messages for data sinks inside iterations.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/a8224861
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/a8224861
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/a8224861

Branch: refs/heads/master
Commit: a82248617fa560a97a9f097346ef0104a463b1c1
Parents: ca4e7b4
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Jul 11 17:45:45 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Jul 12 19:31:26 2014 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/compiler/PactCompiler.java   | 13 +++++++++++++
 1 file changed, 13 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a8224861/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
index 1ee1413..b55dea0 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
@@ -31,6 +31,7 @@ import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.Union;
@@ -704,6 +705,10 @@ public class PactCompiler {
 				n = new BinaryUnionNode((Union<?>) c);
 			}
 			else if (c instanceof PartialSolutionPlaceHolder) {
+				if (this.parent == null) {
+					throw new InvalidProgramException("It is currently not supported to create data sinks inside iterations.");
+				}
+				
 				final PartialSolutionPlaceHolder<?> holder = (PartialSolutionPlaceHolder<?>) c;
 				final BulkIterationBase<?> enclosingIteration = holder.getContainingBulkIteration();
 				final BulkIterationNode containingIterationNode =
@@ -715,6 +720,10 @@ public class PactCompiler {
 				n = p;
 			}
 			else if (c instanceof WorksetPlaceHolder) {
+				if (this.parent == null) {
+					throw new InvalidProgramException("It is currently not supported to create data sinks inside iterations.");
+				}
+				
 				final WorksetPlaceHolder<?> holder = (WorksetPlaceHolder<?>) c;
 				final DeltaIterationBase<?, ?> enclosingIteration = holder.getContainingWorksetIteration();
 				final WorksetIterationNode containingIterationNode =
@@ -726,6 +735,10 @@ public class PactCompiler {
 				n = p;
 			}
 			else if (c instanceof SolutionSetPlaceHolder) {
+				if (this.parent == null) {
+					throw new InvalidProgramException("It is currently not supported to create data sinks inside iterations.");
+				}
+				
 				final SolutionSetPlaceHolder<?> holder = (SolutionSetPlaceHolder<?>) c;
 				final DeltaIterationBase<?, ?> enclosingIteration = holder.getContainingWorksetIteration();
 				final WorksetIterationNode containingIterationNode =


Re: [2/3] git commit: [FLINK-1018] Add tests to verify correct placement of pipeline breakers with broadcast variables

Posted by Stephan Ewen <se...@apache.org>.
Okay, I'll hold off on merging the streaming code until the rat configuration is updated.

Re: [2/3] git commit: [FLINK-1018] Add tests to verify correct placement of pipeline breakers with broadcast variables

Posted by Robert Metzger <rm...@apache.org>.
I'm currently preparing a commit with some more license fixes and a strict
rat configuration.


On Sun, Jul 13, 2014 at 2:58 PM, Stephan Ewen <se...@apache.org> wrote:

> Ah, good point. They come from old pull requests.
>
> Let me know if you manage to configure rat more strictly.
>

Re: [2/3] git commit: [FLINK-1018] Add tests to verify correct placement of pipeline breakers with broadcast variables

Posted by Stephan Ewen <se...@apache.org>.
Ah, good point. They come from old pull requests.

Let me know if you manage to configure rat more strictly.

Re: [2/3] git commit: [FLINK-1018] Add tests to verify correct placement of pipeline breakers with broadcast variables

Posted by Robert Metzger <rm...@apache.org>.
The two files you committed here "PipelineBreakerTest.java" and
"SelectOneReducer.java" both contain the old license header. I'll try and
see if I can make the license checker more strict.


On Sat, Jul 12, 2014 at 7:32 PM, <se...@apache.org> wrote:

> [FLINK-1018] Add tests to verify correct placement of pipeline breakers
> with broadcast variables
>
>
> Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
> Commit:
> http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ec0b00d6
> Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ec0b00d6
> Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ec0b00d6
>
> Branch: refs/heads/master
> Commit: ec0b00d613a4400baf53f5de2361a2271a26ae63
> Parents: a822486
> Author: Stephan Ewen <se...@apache.org>
> Authored: Fri Jul 11 18:02:52 2014 +0200
> Committer: Stephan Ewen <se...@apache.org>
> Committed: Sat Jul 12 19:31:26 2014 +0200
>
> ----------------------------------------------------------------------
>  .../compiler/BranchingPlansCompilerTest.java    |  10 +-
>  .../flink/compiler/PipelineBreakerTest.java     | 137 +++++++++++++++++++
>  .../testfunctions/SelectOneReducer.java         |  28 ++++
>  3 files changed, 170 insertions(+), 5 deletions(-)
> ----------------------------------------------------------------------
>
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ec0b00d6/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
> ----------------------------------------------------------------------
> diff --git
> a/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
> b/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
> index 31dadae..571f4e4 100644
> ---
> a/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
> +++
> b/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
> @@ -359,6 +359,7 @@ public class BranchingPlansCompilerTest extends
> CompilerTestBase {
>                 }
>         }
>
> +       @SuppressWarnings({ "unchecked", "deprecation" })
>         @Test
>         public void testBranchEachContractType() {
>                 try {
> @@ -374,7 +375,6 @@ public class BranchingPlansCompilerTest extends
> CompilerTestBase {
>                                 .name("Reduce 1")
>                                 .build();
>
> -                       @SuppressWarnings("unchecked")
>                         JoinOperator match1 = JoinOperator.builder(new
> DummyMatchStub(), IntValue.class, 0, 0)
>                                 .input1(sourceB, sourceB, sourceC)
>                                 .input2(sourceC)
> @@ -434,10 +434,10 @@ public class BranchingPlansCompilerTest extends
> CompilerTestBase {
>                                 .build();
>
>                         FileDataSink sink = new FileDataSink(new
> DummyOutputFormat(), OUT_FILE, cogroup7);
> -       //              sink.addInput(sourceA);
> -       //              sink.addInput(co3);
> -       //              sink.addInput(co4);
> -       //              sink.addInput(co1);
> +                       sink.addInput(sourceA);
> +                       sink.addInput(cogroup3);
> +                       sink.addInput(cogroup4);
> +                       sink.addInput(cogroup1);
>
>                         // return the PACT plan
>                         Plan plan = new Plan(sink, "Branching of each
> contract type");
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ec0b00d6/flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java
> ----------------------------------------------------------------------
> diff --git
> a/flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java
> b/flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java
> new file mode 100644
> index 0000000..4e43a74
> --- /dev/null
> +++
> b/flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java
> @@ -0,0 +1,137 @@
>
> +/***********************************************************************************************************************
> + *
> + * Copyright (C) 2010-2013 by the Stratosphere project (
> http://stratosphere.eu)
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License"); you
> may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> distributed under the License is distributed on
> + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
> express or implied. See the License for the
> + * specific language governing permissions and limitations under the
> License.
> + *
> +
> **********************************************************************************************************************/
> +
> +package org.apache.flink.compiler;
> +
> +import static org.junit.Assert.*;
> +
> +import org.junit.Test;
> +import org.apache.flink.api.common.Plan;
> +import org.apache.flink.api.java.DataSet;
> +import org.apache.flink.api.java.ExecutionEnvironment;
> +import org.apache.flink.api.java.IterativeDataSet;
> +import org.apache.flink.compiler.plan.BulkIterationPlanNode;
> +import org.apache.flink.compiler.plan.OptimizedPlan;
> +import org.apache.flink.compiler.plan.SingleInputPlanNode;
> +import org.apache.flink.compiler.plan.SinkPlanNode;
> +import org.apache.flink.compiler.plandump.PlanJSONDumpGenerator;
> +import org.apache.flink.compiler.testfunctions.IdentityMapper;
> +import org.apache.flink.compiler.testfunctions.SelectOneReducer;
> +
> +@SuppressWarnings("serial")
> +public class PipelineBreakerTest extends CompilerTestBase {
> +
> +       @Test
> +       public void testPipelineBreakerWithBroadcastVariable() {
> +               try {
> +                       ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
> +                       env.setDegreeOfParallelism(64);
> +
> +                       DataSet<Long> source = env.generateSequence(1,
> 10).map(new IdentityMapper<Long>());
> +
> +                       DataSet<Long> result = source.map(new
> IdentityMapper<Long>())
> +
>       .map(new IdentityMapper<Long>())
> +
>               .withBroadcastSet(source, "bc");
> +
> +                       result.print();
> +
> +                       Plan p = env.createProgramPlan();
> +                       OptimizedPlan op = compileNoStats(p);
> +
> +                       SinkPlanNode sink =
> op.getDataSinks().iterator().next();
> +                       SingleInputPlanNode mapper = (SingleInputPlanNode)
> sink.getInput().getSource();
> +
> +
> assertTrue(mapper.getInput().getTempMode().breaksPipeline());
> +               }
> +               catch (Exception e) {
> +                       e.printStackTrace();
> +                       fail(e.getMessage());
> +               }
> +       }
> +
> +       @Test
> +       public void testPipelineBreakerBroadcastedAllReduce() {
> +               try {
> +                       ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
> +                       env.setDegreeOfParallelism(64);
> +
> +                       DataSet<Long> sourceWithMapper =
> env.generateSequence(1, 10).map(new IdentityMapper<Long>());
> +
> +                       DataSet<Long> bcInput1 = sourceWithMapper
> +
>       .map(new IdentityMapper<Long>())
> +
>       .reduce(new SelectOneReducer<Long>());
> +                       DataSet<Long> bcInput2 = env.generateSequence(1,
> 10);
> +
> +                       DataSet<Long> result = sourceWithMapper
> +                                       .map(new IdentityMapper<Long>())
> +
> .withBroadcastSet(bcInput1, "bc1")
> +
> .withBroadcastSet(bcInput2, "bc2");
> +
> +                       result.print();
> +
> +                       Plan p = env.createProgramPlan();
> +                       OptimizedPlan op = compileNoStats(p);
> +
> +                       SinkPlanNode sink =
> op.getDataSinks().iterator().next();
> +                       SingleInputPlanNode mapper = (SingleInputPlanNode)
> sink.getInput().getSource();
> +
> +
> assertTrue(mapper.getInput().getTempMode().breaksPipeline());
> +               }
> +               catch (Exception e) {
> +                       e.printStackTrace();
> +                       fail(e.getMessage());
> +               }
> +       }
> +
> +       @Test
> +       public void testPipelineBreakerBroadcastedPartialSolution() {
> +               try {
> +                       ExecutionEnvironment env =
> ExecutionEnvironment.getExecutionEnvironment();
> +                       env.setDegreeOfParallelism(64);
> +
> +
> +                       DataSet<Long> initialSource =
> env.generateSequence(1, 10);
> +                       IterativeDataSet<Long> iteration =
> initialSource.iterate(100);
> +
> +
> +                       DataSet<Long> sourceWithMapper =
> env.generateSequence(1, 10).map(new IdentityMapper<Long>());
> +
> +                       DataSet<Long> bcInput1 = sourceWithMapper
> +
>       .map(new IdentityMapper<Long>())
> +
>       .reduce(new SelectOneReducer<Long>());
> +
> +                       DataSet<Long> result = sourceWithMapper
> +                                       .map(new IdentityMapper<Long>())
> +
> .withBroadcastSet(iteration, "bc2")
> +
> .withBroadcastSet(bcInput1, "bc1");
> +
> +
> +                       iteration.closeWith(result).print();
> +
> +                       Plan p = env.createProgramPlan();
> +                       OptimizedPlan op = compileNoStats(p);
> +
> +                       SinkPlanNode sink =
> op.getDataSinks().iterator().next();
> +                       BulkIterationPlanNode iterationPlanNode =
> (BulkIterationPlanNode) sink.getInput().getSource();
> +                       SingleInputPlanNode mapper = (SingleInputPlanNode)
> iterationPlanNode.getRootOfStepFunction();
> +
> +
> assertTrue(mapper.getInput().getTempMode().breaksPipeline());
> +               }
> +               catch (Exception e) {
> +                       e.printStackTrace();
> +                       fail(e.getMessage());
> +               }
> +       }
> +}
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ec0b00d6/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
> ----------------------------------------------------------------------
> diff --git
> a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
> b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
> new file mode 100644
> index 0000000..492b9f8
> --- /dev/null
> +++
> b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
> @@ -0,0 +1,28 @@
>
> +/***********************************************************************************************************************
> + *
> + * Copyright (C) 2010-2013 by the Stratosphere project (
> http://stratosphere.eu)
> + *
> + * Licensed under the Apache License, Version 2.0 (the "License"); you
> may not use this file except in compliance with
> + * the License. You may obtain a copy of the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing, software
> distributed under the License is distributed on
> + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
> express or implied. See the License for the
> + * specific language governing permissions and limitations under the
> License.
> + *
> +
> **********************************************************************************************************************/
> +
> +package org.apache.flink.compiler.testfunctions;
> +
> +import org.apache.flink.api.java.functions.ReduceFunction;
> +
> +public class SelectOneReducer<T> extends ReduceFunction<T> {
> +
> +       private static final long serialVersionUID = 1L;
> +
> +       @Override
> +       public T reduce(T value1, T value2) throws Exception {
> +               return value1;
> +       }
> +}
>
>

[2/3] git commit: [FLINK-1018] Add tests to verify correct placement of pipeline breakers with broadcast variables

Posted by se...@apache.org.
[FLINK-1018] Add tests to verify correct placement of pipeline breakers with broadcast variables


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ec0b00d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ec0b00d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ec0b00d6

Branch: refs/heads/master
Commit: ec0b00d613a4400baf53f5de2361a2271a26ae63
Parents: a822486
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Jul 11 18:02:52 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Jul 12 19:31:26 2014 +0200

----------------------------------------------------------------------
 .../compiler/BranchingPlansCompilerTest.java    |  10 +-
 .../flink/compiler/PipelineBreakerTest.java     | 137 +++++++++++++++++++
 .../testfunctions/SelectOneReducer.java         |  28 ++++
 3 files changed, 170 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ec0b00d6/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
index 31dadae..571f4e4 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
@@ -359,6 +359,7 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 		}
 	}
 	
+	@SuppressWarnings({ "unchecked", "deprecation" })
 	@Test
 	public void testBranchEachContractType() {
 		try {
@@ -374,7 +375,6 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 				.name("Reduce 1")
 				.build();
 			
-			@SuppressWarnings("unchecked")
 			JoinOperator match1 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
 				.input1(sourceB, sourceB, sourceC)
 				.input2(sourceC)
@@ -434,10 +434,10 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 				.build();
 			
 			FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, cogroup7);
-	//		sink.addInput(sourceA);
-	//		sink.addInput(co3);
-	//		sink.addInput(co4);
-	//		sink.addInput(co1);
+			sink.addInput(sourceA);
+			sink.addInput(cogroup3);
+			sink.addInput(cogroup4);
+			sink.addInput(cogroup1);
 			
 			// return the PACT plan
 			Plan plan = new Plan(sink, "Branching of each contract type");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ec0b00d6/flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java
new file mode 100644
index 0000000..4e43a74
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/PipelineBreakerTest.java
@@ -0,0 +1,137 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package org.apache.flink.compiler;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.IterativeDataSet;
+import org.apache.flink.compiler.plan.BulkIterationPlanNode;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.compiler.plan.SinkPlanNode;
+import org.apache.flink.compiler.plandump.PlanJSONDumpGenerator;
+import org.apache.flink.compiler.testfunctions.IdentityMapper;
+import org.apache.flink.compiler.testfunctions.SelectOneReducer;
+
+@SuppressWarnings("serial")
+public class PipelineBreakerTest extends CompilerTestBase {
+
+	@Test
+	public void testPipelineBreakerWithBroadcastVariable() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(64);
+			
+			DataSet<Long> source = env.generateSequence(1, 10).map(new IdentityMapper<Long>());
+			
+			DataSet<Long> result = source.map(new IdentityMapper<Long>())
+										.map(new IdentityMapper<Long>())
+											.withBroadcastSet(source, "bc");
+			
+			result.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource();
+			
+			assertTrue(mapper.getInput().getTempMode().breaksPipeline());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testPipelineBreakerBroadcastedAllReduce() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(64);
+			
+			DataSet<Long> sourceWithMapper = env.generateSequence(1, 10).map(new IdentityMapper<Long>());
+			
+			DataSet<Long> bcInput1 = sourceWithMapper
+										.map(new IdentityMapper<Long>())
+										.reduce(new SelectOneReducer<Long>());
+			DataSet<Long> bcInput2 = env.generateSequence(1, 10);
+			
+			DataSet<Long> result = sourceWithMapper
+					.map(new IdentityMapper<Long>())
+							.withBroadcastSet(bcInput1, "bc1")
+							.withBroadcastSet(bcInput2, "bc2");
+			
+			result.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource();
+			
+			assertTrue(mapper.getInput().getTempMode().breaksPipeline());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testPipelineBreakerBroadcastedPartialSolution() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(64);
+			
+			
+			DataSet<Long> initialSource = env.generateSequence(1, 10);
+			IterativeDataSet<Long> iteration = initialSource.iterate(100);
+			
+			
+			DataSet<Long> sourceWithMapper = env.generateSequence(1, 10).map(new IdentityMapper<Long>());
+			
+			DataSet<Long> bcInput1 = sourceWithMapper
+										.map(new IdentityMapper<Long>())
+										.reduce(new SelectOneReducer<Long>());
+			
+			DataSet<Long> result = sourceWithMapper
+					.map(new IdentityMapper<Long>())
+							.withBroadcastSet(iteration, "bc2")
+							.withBroadcastSet(bcInput1, "bc1");
+							
+			
+			iteration.closeWith(result).print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			BulkIterationPlanNode iterationPlanNode = (BulkIterationPlanNode) sink.getInput().getSource();
+			SingleInputPlanNode mapper = (SingleInputPlanNode) iterationPlanNode.getRootOfStepFunction();
+			
+			assertTrue(mapper.getInput().getTempMode().breaksPipeline());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ec0b00d6/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
new file mode 100644
index 0000000..492b9f8
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/testfunctions/SelectOneReducer.java
@@ -0,0 +1,28 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package org.apache.flink.compiler.testfunctions;
+
+import org.apache.flink.api.java.functions.ReduceFunction;
+
+public class SelectOneReducer<T> extends ReduceFunction<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public T reduce(T value1, T value2) throws Exception {
+		return value1;
+	}
+}