You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/12/10 15:50:37 UTC
[1/4] incubator-flink git commit: [FLINK-1316] [web client] Fix plan
display for nodes referenced from multiple closures.
Repository: incubator-flink
Updated Branches:
refs/heads/master 26820ea70 -> 6d69f697f
[FLINK-1316] [web client] Fix plan display for nodes referenced from multiple closures.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/a79ea784
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/a79ea784
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/a79ea784
Branch: refs/heads/master
Commit: a79ea784e6c5ee491cecf5c93aad2248029e3c4b
Parents: ac80458
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Dec 10 15:23:12 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Dec 10 15:42:48 2014 +0100
----------------------------------------------------------------------
flink-clients/resources/web-docs/js/graphCreator.js | 7 +++++--
1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a79ea784/flink-clients/resources/web-docs/js/graphCreator.js
----------------------------------------------------------------------
diff --git a/flink-clients/resources/web-docs/js/graphCreator.js b/flink-clients/resources/web-docs/js/graphCreator.js
index bbd71b1..86c8c81 100644
--- a/flink-clients/resources/web-docs/js/graphCreator.js
+++ b/flink-clients/resources/web-docs/js/graphCreator.js
@@ -175,8 +175,11 @@ function loadJsonToDagre(data){
g.addEdge(null, el.predecessors[j].id, el.id, { label: createLabelEdge(el.predecessors[j]) });
} else {
var missingNode = searchForNode(el.predecessors[j].id);
- g.addNode(missingNode.id, {label: createLabelNode(missingNode, "mirror")});
- g.addEdge(null, missingNode.id, el.id, { label: createLabelEdge(missingNode) });
+ if (missingNode.alreadyAdded != true) {
+ missingNode.alreadyAdded = true;
+ g.addNode(missingNode.id, {label: createLabelNode(missingNode, "mirror")});
+ g.addEdge(null, missingNode.id, el.id, { label: createLabelEdge(missingNode) });
+ }
}
}
}
[2/4] incubator-flink git commit: [FLINK-1315] [optimizer] Fix bug in
branch tracking logic.
Posted by se...@apache.org.
[FLINK-1315] [optimizer] Fix bug in branch tracking logic.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ac80458f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ac80458f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ac80458f
Branch: refs/heads/master
Commit: ac80458fc089b79f1e793f8760d331a988c407c1
Parents: 9d02f2a
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Dec 10 14:53:36 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Dec 10 15:42:48 2014 +0100
----------------------------------------------------------------------
.../compiler/plan/BulkIterationPlanNode.java | 8 +--
.../compiler/plan/SingleInputPlanNode.java | 11 +--
.../compiler/plan/WorksetIterationPlanNode.java | 6 +-
.../compiler/BranchingPlansCompilerTest.java | 75 ++++++++++++++++++++
.../flink/compiler/IterationsCompilerTest.java | 2 +-
5 files changed, 89 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac80458f/flink-compiler/src/main/java/org/apache/flink/compiler/plan/BulkIterationPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/BulkIterationPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/BulkIterationPlanNode.java
index 7fd8993..3f413b5 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/BulkIterationPlanNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/BulkIterationPlanNode.java
@@ -142,17 +142,17 @@ public class BulkIterationPlanNode extends SingleInputPlanNode implements Iterat
}
private void mergeBranchPlanMaps() {
- for(OptimizerNode.UnclosedBranchDescriptor desc: template.getOpenBranches()){
+ for (OptimizerNode.UnclosedBranchDescriptor desc: template.getOpenBranches()) {
OptimizerNode brancher = desc.getBranchingNode();
- if(branchPlan == null) {
+ if (branchPlan == null) {
branchPlan = new HashMap<OptimizerNode, PlanNode>(6);
}
- if(!branchPlan.containsKey(brancher)){
+ if (!branchPlan.containsKey(brancher)) {
PlanNode selectedCandidate = null;
- if(rootOfStepFunction.branchPlan != null){
+ if (rootOfStepFunction.branchPlan != null) {
selectedCandidate = rootOfStepFunction.branchPlan.get(brancher);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac80458f/flink-compiler/src/main/java/org/apache/flink/compiler/plan/SingleInputPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/SingleInputPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/SingleInputPlanNode.java
index 9b9202b..d77d82e 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/SingleInputPlanNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/SingleInputPlanNode.java
@@ -25,6 +25,7 @@ import static org.apache.flink.compiler.plan.PlanNode.SourceAndDamReport.NOT_FOU
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import org.apache.flink.api.common.operators.util.FieldList;
@@ -84,12 +85,14 @@ public class SingleInputPlanNode extends PlanNode {
}
final PlanNode predNode = input.getSource();
- if (this.branchPlan == null) {
- this.branchPlan = predNode.branchPlan;
- } else if (predNode.branchPlan != null) {
+
+ if (predNode.branchPlan != null && !predNode.branchPlan.isEmpty()) {
+
+ if (this.branchPlan == null) {
+ this.branchPlan = new HashMap<OptimizerNode, PlanNode>();
+ }
this.branchPlan.putAll(predNode.branchPlan);
}
-
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac80458f/flink-compiler/src/main/java/org/apache/flink/compiler/plan/WorksetIterationPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/WorksetIterationPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/WorksetIterationPlanNode.java
index 8d38814..47e9b69 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/WorksetIterationPlanNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/WorksetIterationPlanNode.java
@@ -200,11 +200,9 @@ public class WorksetIterationPlanNode extends DualInputPlanNode implements Itera
* because they can contain also some of the branching nodes.
*/
@Override
- protected void mergeBranchPlanMaps(Map<OptimizerNode, PlanNode> branchPlan1, Map<OptimizerNode,
- PlanNode> branchPlan2){
-
- }
+ protected void mergeBranchPlanMaps(Map<OptimizerNode, PlanNode> branchPlan1, Map<OptimizerNode,PlanNode> branchPlan2) {}
+
protected void mergeBranchPlanMaps() {
Map<OptimizerNode, PlanNode> branchPlan1 = input1.getSource().branchPlan;
Map<OptimizerNode, PlanNode> branchPlan2 = input2.getSource().branchPlan;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac80458f/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
index 2b4ff02..ae185f9 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
@@ -19,6 +19,8 @@
package org.apache.flink.compiler;
+import static org.junit.Assert.*;
+
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -30,7 +32,9 @@ import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichJoinFunction;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.record.operators.BulkIteration;
import org.apache.flink.api.java.record.operators.CoGroupOperator;
import org.apache.flink.api.java.record.operators.CrossOperator;
@@ -40,9 +44,11 @@ import org.apache.flink.api.java.record.operators.FileDataSource;
import org.apache.flink.api.java.record.operators.JoinOperator;
import org.apache.flink.api.java.record.operators.MapOperator;
import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.compiler.plan.OptimizedPlan;
import org.apache.flink.compiler.plan.SinkPlanNode;
import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.compiler.testfunctions.DummyFlatJoinFunction;
import org.apache.flink.compiler.testfunctions.IdentityGroupReducer;
import org.apache.flink.compiler.testfunctions.IdentityKeyExtractor;
import org.apache.flink.compiler.testfunctions.IdentityMapper;
@@ -1017,4 +1023,73 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
Assert.fail(e.getMessage());
}
}
+
+ @Test
+ public void testBranchesOnlyInBCVariables1() {
+ try{
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Long> input = env.generateSequence(1, 10);
+ DataSet<Long> bc_input = env.generateSequence(1, 10);
+
+ input
+ .map(new IdentityMapper<Long>()).withBroadcastSet(bc_input, "name1")
+ .map(new IdentityMapper<Long>()).withBroadcastSet(bc_input, "name2")
+ .print();
+
+ Plan plan = env.createProgramPlan();
+ compileNoStats(plan);
+ }
+ catch(Exception e){
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testBranchesOnlyInBCVariables2() {
+ try{
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple2<Long, Long>> input = env.generateSequence(1, 10).map(new Duplicator<Long>()).name("proper input");
+
+ DataSet<Long> bc_input1 = env.generateSequence(1, 10).name("BC input 1");
+ DataSet<Long> bc_input2 = env.generateSequence(1, 10).name("BC input 1");
+
+ DataSet<Tuple2<Long, Long>> joinInput1 =
+ input.map(new IdentityMapper<Tuple2<Long,Long>>())
+ .withBroadcastSet(bc_input1.map(new IdentityMapper<Long>()), "bc1")
+ .withBroadcastSet(bc_input2, "bc2");
+
+ DataSet<Tuple2<Long, Long>> joinInput2 =
+ input.map(new IdentityMapper<Tuple2<Long,Long>>())
+ .withBroadcastSet(bc_input1, "bc1")
+ .withBroadcastSet(bc_input2, "bc2");
+
+ DataSet<Tuple2<Long, Long>> joinResult = joinInput1
+ .join(joinInput2, JoinHint.REPARTITION_HASH_FIRST).where(0).equalTo(1)
+ .with(new DummyFlatJoinFunction<Tuple2<Long,Long>>());
+
+ input
+ .map(new IdentityMapper<Tuple2<Long,Long>>())
+ .withBroadcastSet(bc_input1, "bc1")
+ .union(joinResult)
+ .print();
+
+ Plan plan = env.createProgramPlan();
+ compileNoStats(plan);
+ }
+ catch(Exception e){
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ private static final class Duplicator<T> implements MapFunction<T, Tuple2<T, T>> {
+
+ @Override
+ public Tuple2<T, T> map(T value) {
+ return new Tuple2<T, T>(value, value);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac80458f/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
index 9ceb25c..23b7cfe 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
@@ -299,7 +299,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);
-
+ new NepheleJobGraphGenerator().compileJobGraph(op);
}
catch (Exception e) {
e.printStackTrace();
[3/4] incubator-flink git commit: [FLINK-1315] [optimizer] Extra test
reproducing the IllegalStateException bug in bulk iterations.
Posted by se...@apache.org.
[FLINK-1315] [optimizer] Extra test reproducing the IllegalStateException bug in bulk iterations.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/9d02f2a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/9d02f2a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/9d02f2a4
Branch: refs/heads/master
Commit: 9d02f2a479c49122f947364efee9f9f433e5ade2
Parents: 26820ea
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Dec 1 16:47:44 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Dec 10 15:42:48 2014 +0100
----------------------------------------------------------------------
.../flink/compiler/IterationsCompilerTest.java | 54 ++++++++++++++++++++
1 file changed, 54 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9d02f2a4/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
index 2794572..9ceb25c 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
@@ -28,6 +28,8 @@ import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.functions.RichJoinFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
@@ -41,6 +43,7 @@ import org.apache.flink.compiler.plan.OptimizedPlan;
import org.apache.flink.compiler.plan.WorksetIterationPlanNode;
import org.apache.flink.compiler.plandump.PlanJSONDumpGenerator;
import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.compiler.testfunctions.IdentityKeyExtractor;
import org.apache.flink.compiler.testfunctions.IdentityMapper;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.util.Collector;
@@ -260,6 +263,50 @@ public class IterationsCompilerTest extends CompilerTestBase {
}
}
+ @Test
+ public void testResetPartialSolution() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Long> width = env.generateSequence(1, 10);
+ DataSet<Long> update = env.generateSequence(1, 10);
+ DataSet<Long> lastGradient = env.generateSequence(1, 10);
+
+ DataSet<Long> init = width.union(update).union(lastGradient);
+
+ IterativeDataSet<Long> iteration = init.iterate(10);
+
+ width = iteration.filter(new IdFilter<Long>());
+ update = iteration.filter(new IdFilter<Long>());
+ lastGradient = iteration.filter(new IdFilter<Long>());
+
+ DataSet<Long> gradient = width.map(new IdentityMapper<Long>());
+ DataSet<Long> term = gradient.join(lastGradient)
+ .where(new IdentityKeyExtractor<Long>())
+ .equalTo(new IdentityKeyExtractor<Long>())
+ .with(new JoinFunction<Long, Long, Long>() {
+ public Long join(Long first, Long second) { return null; }
+ });
+
+ update = update.map(new RichMapFunction<Long, Long>() {
+ public Long map(Long value) { return null; }
+ }).withBroadcastSet(term, "some-name");
+
+ DataSet<Long> result = iteration.closeWith(width.union(update).union(lastGradient));
+
+ result.print();
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
// --------------------------------------------------------------------------------------------
public static DataSet<Tuple2<Long, Long>> doBulkIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, Long>> edges) {
@@ -352,4 +399,11 @@ public class IterationsCompilerTest extends CompilerTestBase {
return new Tuple2<T, T>(value, value);
}
}
+
+ public static final class IdFilter<T> implements FilterFunction<T> {
+ @Override
+ public boolean filter(T value) {
+ return true;
+ }
+ }
}
[4/4] incubator-flink git commit: [java8] Fix pom.xml to properly
work with Eclipse m2e
Posted by se...@apache.org.
[java8] Fix pom.xml to properly work with Eclipse m2e
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/6d69f697
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/6d69f697
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/6d69f697
Branch: refs/heads/master
Commit: 6d69f697fd1483a30a88bc10d2544a2240e77ed4
Parents: a79ea78
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Dec 10 15:45:40 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Dec 10 15:45:40 2014 +0100
----------------------------------------------------------------------
flink-java8/pom.xml | 17 +++++++++++++++--
1 file changed, 15 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6d69f697/flink-java8/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java8/pom.xml b/flink-java8/pom.xml
index 0aa06fa..ad1273c 100644
--- a/flink-java8/pom.xml
+++ b/flink-java8/pom.xml
@@ -188,7 +188,7 @@ under the License.
</goals>
</pluginExecutionFilter>
<action>
- <ignore></ignore>
+ <ignore/>
</action>
</pluginExecution>
<pluginExecution>
@@ -202,7 +202,20 @@ under the License.
</goals>
</pluginExecutionFilter>
<action>
- <ignore></ignore>
+ <ignore/>
+ </action>
+ </pluginExecution>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <versionRange>[2.9,)</versionRange>
+ <goals>
+ <goal>unpack</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore/>
</action>
</pluginExecution>
</pluginExecutions>