You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/12/10 15:50:39 UTC

[3/4] incubator-flink git commit: [FLINK-1315] [optimizer] Extra test reproducing the IllegalStateException bug in bulk iterations.

[FLINK-1315] [optimizer] Extra test reproducing the IllegalStateException bug in bulk iterations.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/9d02f2a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/9d02f2a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/9d02f2a4

Branch: refs/heads/master
Commit: 9d02f2a479c49122f947364efee9f9f433e5ade2
Parents: 26820ea
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Dec 1 16:47:44 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Dec 10 15:42:48 2014 +0100

----------------------------------------------------------------------
 .../flink/compiler/IterationsCompilerTest.java  | 54 ++++++++++++++++++++
 1 file changed, 54 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9d02f2a4/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
index 2794572..9ceb25c 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
@@ -28,6 +28,8 @@ import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.functions.RichJoinFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
@@ -41,6 +43,7 @@ import org.apache.flink.compiler.plan.OptimizedPlan;
 import org.apache.flink.compiler.plan.WorksetIterationPlanNode;
 import org.apache.flink.compiler.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.compiler.testfunctions.IdentityKeyExtractor;
 import org.apache.flink.compiler.testfunctions.IdentityMapper;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.util.Collector;
@@ -260,6 +263,50 @@ public class IterationsCompilerTest extends CompilerTestBase {
 		}
 	}
 	
+	@Test
+	public void testResetPartialSolution() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Long> width = env.generateSequence(1, 10);
+			DataSet<Long> update = env.generateSequence(1, 10);
+			DataSet<Long> lastGradient = env.generateSequence(1, 10);
+			
+			DataSet<Long> init = width.union(update).union(lastGradient);
+			
+			IterativeDataSet<Long> iteration = init.iterate(10);
+			
+			width = iteration.filter(new IdFilter<Long>());
+			update = iteration.filter(new IdFilter<Long>());
+			lastGradient = iteration.filter(new IdFilter<Long>());
+			
+			DataSet<Long> gradient = width.map(new IdentityMapper<Long>());
+			DataSet<Long> term = gradient.join(lastGradient)
+								.where(new IdentityKeyExtractor<Long>())
+								.equalTo(new IdentityKeyExtractor<Long>())
+								.with(new JoinFunction<Long, Long, Long>() {
+									public Long join(Long first, Long second) { return null; }
+								});
+			
+			update = update.map(new RichMapFunction<Long, Long>() {
+				public Long map(Long value) { return null; }
+			}).withBroadcastSet(term, "some-name");
+			
+			DataSet<Long> result = iteration.closeWith(width.union(update).union(lastGradient));
+			
+			result.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
 	// --------------------------------------------------------------------------------------------
 	
 	public static DataSet<Tuple2<Long, Long>> doBulkIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, Long>> edges) {
@@ -352,4 +399,11 @@ public class IterationsCompilerTest extends CompilerTestBase {
 			return new Tuple2<T, T>(value, value);
 		}
 	}
+	
+	public static final class IdFilter<T> implements FilterFunction<T> {
+		@Override
+		public boolean filter(T value) {
+			return true;
+		}
+	}
 }