You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/12/10 15:50:39 UTC
[3/4] incubator-flink git commit: [FLINK-1315] [optimizer] Extra test
reproducing the IllegalStateException bug in bulk iterations.
[FLINK-1315] [optimizer] Extra test reproducing the IllegalStateException bug in bulk iterations.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/9d02f2a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/9d02f2a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/9d02f2a4
Branch: refs/heads/master
Commit: 9d02f2a479c49122f947364efee9f9f433e5ade2
Parents: 26820ea
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Dec 1 16:47:44 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Dec 10 15:42:48 2014 +0100
----------------------------------------------------------------------
.../flink/compiler/IterationsCompilerTest.java | 54 ++++++++++++++++++++
1 file changed, 54 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9d02f2a4/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
index 2794572..9ceb25c 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
@@ -28,6 +28,8 @@ import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.functions.RichJoinFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
@@ -41,6 +43,7 @@ import org.apache.flink.compiler.plan.OptimizedPlan;
import org.apache.flink.compiler.plan.WorksetIterationPlanNode;
import org.apache.flink.compiler.plandump.PlanJSONDumpGenerator;
import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.compiler.testfunctions.IdentityKeyExtractor;
import org.apache.flink.compiler.testfunctions.IdentityMapper;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.util.Collector;
@@ -260,6 +263,50 @@ public class IterationsCompilerTest extends CompilerTestBase {
}
}
+ @Test
+ public void testResetPartialSolution() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Long> width = env.generateSequence(1, 10);
+ DataSet<Long> update = env.generateSequence(1, 10);
+ DataSet<Long> lastGradient = env.generateSequence(1, 10);
+
+ DataSet<Long> init = width.union(update).union(lastGradient);
+
+ IterativeDataSet<Long> iteration = init.iterate(10);
+
+ width = iteration.filter(new IdFilter<Long>());
+ update = iteration.filter(new IdFilter<Long>());
+ lastGradient = iteration.filter(new IdFilter<Long>());
+
+ DataSet<Long> gradient = width.map(new IdentityMapper<Long>());
+ DataSet<Long> term = gradient.join(lastGradient)
+ .where(new IdentityKeyExtractor<Long>())
+ .equalTo(new IdentityKeyExtractor<Long>())
+ .with(new JoinFunction<Long, Long, Long>() {
+ public Long join(Long first, Long second) { return null; }
+ });
+
+ update = update.map(new RichMapFunction<Long, Long>() {
+ public Long map(Long value) { return null; }
+ }).withBroadcastSet(term, "some-name");
+
+ DataSet<Long> result = iteration.closeWith(width.union(update).union(lastGradient));
+
+ result.print();
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
// --------------------------------------------------------------------------------------------
public static DataSet<Tuple2<Long, Long>> doBulkIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, Long>> edges) {
@@ -352,4 +399,11 @@ public class IterationsCompilerTest extends CompilerTestBase {
return new Tuple2<T, T>(value, value);
}
}
+
+ public static final class IdFilter<T> implements FilterFunction<T> {
+ @Override
+ public boolean filter(T value) {
+ return true;
+ }
+ }
}