You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@flink.apache.org by se...@apache.org on 2014/11/19 15:33:07 UTC

[2/2] incubator-flink git commit: [FLINK-1254] [compiler] Fix compiler bug for pipeline breaker placement

[FLINK-1254] [compiler] Fix compiler bug for pipeline breaker placement

This closes #216


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ce822bf7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ce822bf7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ce822bf7

Branch: refs/heads/master
Commit: ce822bf7f5ec80df5d5a749b1439320af3fb8b18
Parents: 54aa41b
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 19 12:28:06 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Nov 19 15:30:56 2014 +0100

----------------------------------------------------------------------
 .../compiler/plan/WorksetIterationPlanNode.java |  5 ++-
 .../flink/compiler/IterationsCompilerTest.java  | 37 ++++++++++++++++++++
 .../java/org/apache/flink/api/java/DataSet.java |  5 +++
 3 files changed, 44 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce822bf7/flink-compiler/src/main/java/org/apache/flink/compiler/plan/WorksetIterationPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/WorksetIterationPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/WorksetIterationPlanNode.java
index 9f0895d..8d38814 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/WorksetIterationPlanNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/WorksetIterationPlanNode.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler.plan;
 
 import static org.apache.flink.compiler.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
@@ -36,7 +35,7 @@ import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.util.Visitor;
 
 /**
- * 
+ * A node in the execution, representing a workset iteration (delta iteration).
  */
 public class WorksetIterationPlanNode extends DualInputPlanNode implements IterationPlanNode {
 
@@ -66,7 +65,7 @@ public class WorksetIterationPlanNode extends DualInputPlanNode implements Itera
 			SolutionSetPlanNode solutionSetPlanNode, WorksetPlanNode worksetPlanNode,
 			PlanNode nextWorkSetPlanNode, PlanNode solutionSetDeltaPlanNode)
 	{
-		super(template, nodeName, initialSolutionSet, initialWorkset, DriverStrategy.NONE);
+		super(template, nodeName, initialSolutionSet, initialWorkset, DriverStrategy.BINARY_NO_OP);
 		this.solutionSetPlanNode = solutionSetPlanNode;
 		this.worksetPlanNode = worksetPlanNode;
 		this.solutionSetDeltaPlanNode = solutionSetDeltaPlanNode;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce822bf7/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
index 2be28e4..2794572 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.functions.RichJoinFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ConstantFields;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -227,6 +228,40 @@ public class IterationsCompilerTest extends CompilerTestBase {
 		}
 	}
 	
+	@Test
+	public void testWorksetIterationPipelineBreakerPlacement() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(8);
+			
+			// the workset (input two of the delta iteration) is the same as what is consumed be the successive join
+			DataSet<Tuple2<Long, Long>> initialWorkset = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());
+			
+			DataSet<Tuple2<Long, Long>> initialSolutionSet = env.readCsvFile("/some/file/path").types(Long.class).map(new DuplicateValue());
+			
+			// trivial iteration, since we are interested in the inputs to the iteration
+			DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration = initialSolutionSet.iterateDelta(initialWorkset, 100, 0);
+			
+			DataSet<Tuple2<Long, Long>> next = iteration.getWorkset().map(new IdentityMapper<Tuple2<Long,Long>>());
+			
+			DataSet<Tuple2<Long, Long>> result = iteration.closeWith(next, next);
+			
+			initialWorkset
+				.join(result, JoinHint.REPARTITION_HASH_FIRST)
+				.where(0).equalTo(0)
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			compileNoStats(p);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
 	public static DataSet<Tuple2<Long, Long>> doBulkIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, Long>> edges) {
 		
 		// open a bulk iteration
@@ -270,6 +305,8 @@ public class IterationsCompilerTest extends CompilerTestBase {
 		
 	}
 	
+	// --------------------------------------------------------------------------------------------
+	
 	public static final class Join222 extends RichJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
 		@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ce822bf7/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 6b768b7..a30d4c9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -72,6 +72,8 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.Path;
 
+import com.google.common.base.Preconditions;
+
 /**
  * A DataSet represents a collection of elements of the same type.<br/>
  * A DataSet can be transformed into another DataSet by applying a transformation as for example 
@@ -847,6 +849,9 @@ public abstract class DataSet<T> {
 	 * @see org.apache.flink.api.java.operators.DeltaIteration
 	 */
 	public <R> DeltaIteration<T, R> iterateDelta(DataSet<R> workset, int maxIterations, int... keyPositions) {
+		Preconditions.checkNotNull(workset);
+		Preconditions.checkNotNull(keyPositions);
+		
 		Keys.ExpressionKeys<T> keys = new Keys.ExpressionKeys<T>(keyPositions, getType(), false);
 		return new DeltaIteration<T, R>(getExecutionEnvironment(), getType(), this, workset, keys, maxIterations);
 	}