You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/12/10 15:50:37 UTC

[1/4] incubator-flink git commit: [FLINK-1316] [web client] Fix plan display for nodes referenced from multiple closures.

Repository: incubator-flink
Updated Branches:
  refs/heads/master 26820ea70 -> 6d69f697f


[FLINK-1316] [web client] Fix plan display for nodes referenced from multiple closures.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/a79ea784
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/a79ea784
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/a79ea784

Branch: refs/heads/master
Commit: a79ea784e6c5ee491cecf5c93aad2248029e3c4b
Parents: ac80458
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Dec 10 15:23:12 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Dec 10 15:42:48 2014 +0100

----------------------------------------------------------------------
 flink-clients/resources/web-docs/js/graphCreator.js | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a79ea784/flink-clients/resources/web-docs/js/graphCreator.js
----------------------------------------------------------------------
diff --git a/flink-clients/resources/web-docs/js/graphCreator.js b/flink-clients/resources/web-docs/js/graphCreator.js
index bbd71b1..86c8c81 100644
--- a/flink-clients/resources/web-docs/js/graphCreator.js
+++ b/flink-clients/resources/web-docs/js/graphCreator.js
@@ -175,8 +175,11 @@ function loadJsonToDagre(data){
 					g.addEdge(null, el.predecessors[j].id, el.id, { label: createLabelEdge(el.predecessors[j]) });	
 				} else {
 					var missingNode = searchForNode(el.predecessors[j].id);
-					g.addNode(missingNode.id, {label: createLabelNode(missingNode, "mirror")});
-					g.addEdge(null, missingNode.id, el.id, { label: createLabelEdge(missingNode) });
+					if (missingNode.alreadyAdded != true) {
+						missingNode.alreadyAdded = true;
+						g.addNode(missingNode.id, {label: createLabelNode(missingNode, "mirror")});
+						g.addEdge(null, missingNode.id, el.id, { label: createLabelEdge(missingNode) });
+					}
 				}
 			}
 		}


[2/4] incubator-flink git commit: [FLINK-1315] [optimizer] Fix bug in branch tracking logic.

Posted by se...@apache.org.
[FLINK-1315] [optimizer] Fix bug in branch tracking logic.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ac80458f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ac80458f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ac80458f

Branch: refs/heads/master
Commit: ac80458fc089b79f1e793f8760d331a988c407c1
Parents: 9d02f2a
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Dec 10 14:53:36 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Dec 10 15:42:48 2014 +0100

----------------------------------------------------------------------
 .../compiler/plan/BulkIterationPlanNode.java    |  8 +--
 .../compiler/plan/SingleInputPlanNode.java      | 11 +--
 .../compiler/plan/WorksetIterationPlanNode.java |  6 +-
 .../compiler/BranchingPlansCompilerTest.java    | 75 ++++++++++++++++++++
 .../flink/compiler/IterationsCompilerTest.java  |  2 +-
 5 files changed, 89 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac80458f/flink-compiler/src/main/java/org/apache/flink/compiler/plan/BulkIterationPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/BulkIterationPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/BulkIterationPlanNode.java
index 7fd8993..3f413b5 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/BulkIterationPlanNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/BulkIterationPlanNode.java
@@ -142,17 +142,17 @@ public class BulkIterationPlanNode extends SingleInputPlanNode implements Iterat
 	}
 
 	private void mergeBranchPlanMaps() {
-		for(OptimizerNode.UnclosedBranchDescriptor desc: template.getOpenBranches()){
+		for (OptimizerNode.UnclosedBranchDescriptor desc: template.getOpenBranches()) {
 			OptimizerNode brancher = desc.getBranchingNode();
 
-			if(branchPlan == null) {
+			if (branchPlan == null) {
 				branchPlan = new HashMap<OptimizerNode, PlanNode>(6);
 			}
 			
-			if(!branchPlan.containsKey(brancher)){
+			if (!branchPlan.containsKey(brancher)) {
 				PlanNode selectedCandidate = null;
 
-				if(rootOfStepFunction.branchPlan != null){
+				if (rootOfStepFunction.branchPlan != null) {
 					selectedCandidate = rootOfStepFunction.branchPlan.get(brancher);
 				}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac80458f/flink-compiler/src/main/java/org/apache/flink/compiler/plan/SingleInputPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/SingleInputPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/SingleInputPlanNode.java
index 9b9202b..d77d82e 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/SingleInputPlanNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/SingleInputPlanNode.java
@@ -25,6 +25,7 @@ import static org.apache.flink.compiler.plan.PlanNode.SourceAndDamReport.NOT_FOU
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 
 import org.apache.flink.api.common.operators.util.FieldList;
@@ -84,12 +85,14 @@ public class SingleInputPlanNode extends PlanNode {
 		}
 		
 		final PlanNode predNode = input.getSource();
-		if (this.branchPlan == null) {
-			this.branchPlan = predNode.branchPlan;
-		} else if (predNode.branchPlan != null) {
+		
+		if (predNode.branchPlan != null && !predNode.branchPlan.isEmpty()) {
+			
+			if (this.branchPlan == null) {
+				this.branchPlan = new HashMap<OptimizerNode, PlanNode>();
+			}
 			this.branchPlan.putAll(predNode.branchPlan);
 		}
-		
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac80458f/flink-compiler/src/main/java/org/apache/flink/compiler/plan/WorksetIterationPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/WorksetIterationPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/WorksetIterationPlanNode.java
index 8d38814..47e9b69 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/WorksetIterationPlanNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/WorksetIterationPlanNode.java
@@ -200,11 +200,9 @@ public class WorksetIterationPlanNode extends DualInputPlanNode implements Itera
 	 * because they can contain also some of the branching nodes.
 	 */
 	@Override
-	protected void mergeBranchPlanMaps(Map<OptimizerNode, PlanNode> branchPlan1, Map<OptimizerNode,
-			PlanNode> branchPlan2){
-
-	}
+	protected void mergeBranchPlanMaps(Map<OptimizerNode, PlanNode> branchPlan1, Map<OptimizerNode,PlanNode> branchPlan2) {}
 
+	
 	protected void mergeBranchPlanMaps() {
 		Map<OptimizerNode, PlanNode> branchPlan1 = input1.getSource().branchPlan;
 		Map<OptimizerNode, PlanNode> branchPlan2 = input2.getSource().branchPlan;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac80458f/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
index 2b4ff02..ae185f9 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/BranchingPlansCompilerTest.java
@@ -19,6 +19,8 @@
 
 package org.apache.flink.compiler;
 
+import static org.junit.Assert.*;
+
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -30,7 +32,9 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichJoinFunction;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.api.java.record.operators.BulkIteration;
 import org.apache.flink.api.java.record.operators.CoGroupOperator;
 import org.apache.flink.api.java.record.operators.CrossOperator;
@@ -40,9 +44,11 @@ import org.apache.flink.api.java.record.operators.FileDataSource;
 import org.apache.flink.api.java.record.operators.JoinOperator;
 import org.apache.flink.api.java.record.operators.MapOperator;
 import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.compiler.plan.OptimizedPlan;
 import org.apache.flink.compiler.plan.SinkPlanNode;
 import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.compiler.testfunctions.DummyFlatJoinFunction;
 import org.apache.flink.compiler.testfunctions.IdentityGroupReducer;
 import org.apache.flink.compiler.testfunctions.IdentityKeyExtractor;
 import org.apache.flink.compiler.testfunctions.IdentityMapper;
@@ -1017,4 +1023,73 @@ public class BranchingPlansCompilerTest extends CompilerTestBase {
 			Assert.fail(e.getMessage());
 		}
 	}
+	
+	@Test
+	public void testBranchesOnlyInBCVariables1() {
+		try{
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Long> input = env.generateSequence(1, 10);
+			DataSet<Long> bc_input = env.generateSequence(1, 10);
+			
+			input
+				.map(new IdentityMapper<Long>()).withBroadcastSet(bc_input, "name1")
+				.map(new IdentityMapper<Long>()).withBroadcastSet(bc_input, "name2")
+				.print();
+			
+			Plan plan = env.createProgramPlan();
+			compileNoStats(plan);
+		}
+		catch(Exception e){
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testBranchesOnlyInBCVariables2() {
+		try{
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Tuple2<Long, Long>> input = env.generateSequence(1, 10).map(new Duplicator<Long>()).name("proper input");
+			
+			DataSet<Long> bc_input1 = env.generateSequence(1, 10).name("BC input 1");
+			DataSet<Long> bc_input2 = env.generateSequence(1, 10).name("BC input 1");
+			
+			DataSet<Tuple2<Long, Long>> joinInput1 =
+					input.map(new IdentityMapper<Tuple2<Long,Long>>())
+						.withBroadcastSet(bc_input1.map(new IdentityMapper<Long>()), "bc1")
+						.withBroadcastSet(bc_input2, "bc2");
+			
+			DataSet<Tuple2<Long, Long>> joinInput2 =
+					input.map(new IdentityMapper<Tuple2<Long,Long>>())
+						.withBroadcastSet(bc_input1, "bc1")
+						.withBroadcastSet(bc_input2, "bc2");
+			
+			DataSet<Tuple2<Long, Long>> joinResult = joinInput1
+				.join(joinInput2, JoinHint.REPARTITION_HASH_FIRST).where(0).equalTo(1)
+				.with(new DummyFlatJoinFunction<Tuple2<Long,Long>>());
+			
+			input
+				.map(new IdentityMapper<Tuple2<Long,Long>>())
+					.withBroadcastSet(bc_input1, "bc1")
+				.union(joinResult)
+				.print();
+			
+			Plan plan = env.createProgramPlan();
+			compileNoStats(plan);
+		}
+		catch(Exception e){
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	private static final class Duplicator<T> implements MapFunction<T, Tuple2<T, T>> {
+		
+		@Override
+		public Tuple2<T, T> map(T value) {
+			return new Tuple2<T, T>(value, value);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ac80458f/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
index 9ceb25c..23b7cfe 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
@@ -299,7 +299,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
 			
-			
+			new NepheleJobGraphGenerator().compileJobGraph(op);
 		}
 		catch (Exception e) {
 			e.printStackTrace();


[3/4] incubator-flink git commit: [FLINK-1315] [optimizer] Extra test reproducing the IllegalStateException bug in bulk iterations.

Posted by se...@apache.org.
[FLINK-1315] [optimizer] Extra test reproducing the IllegalStateException bug in bulk iterations.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/9d02f2a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/9d02f2a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/9d02f2a4

Branch: refs/heads/master
Commit: 9d02f2a479c49122f947364efee9f9f433e5ade2
Parents: 26820ea
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Dec 1 16:47:44 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Dec 10 15:42:48 2014 +0100

----------------------------------------------------------------------
 .../flink/compiler/IterationsCompilerTest.java  | 54 ++++++++++++++++++++
 1 file changed, 54 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9d02f2a4/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
index 2794572..9ceb25c 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
@@ -28,6 +28,8 @@ import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.operators.IterativeDataSet;
 import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.api.common.functions.RichJoinFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
@@ -41,6 +43,7 @@ import org.apache.flink.compiler.plan.OptimizedPlan;
 import org.apache.flink.compiler.plan.WorksetIterationPlanNode;
 import org.apache.flink.compiler.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator;
+import org.apache.flink.compiler.testfunctions.IdentityKeyExtractor;
 import org.apache.flink.compiler.testfunctions.IdentityMapper;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.util.Collector;
@@ -260,6 +263,50 @@ public class IterationsCompilerTest extends CompilerTestBase {
 		}
 	}
 	
+	@Test
+	public void testResetPartialSolution() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Long> width = env.generateSequence(1, 10);
+			DataSet<Long> update = env.generateSequence(1, 10);
+			DataSet<Long> lastGradient = env.generateSequence(1, 10);
+			
+			DataSet<Long> init = width.union(update).union(lastGradient);
+			
+			IterativeDataSet<Long> iteration = init.iterate(10);
+			
+			width = iteration.filter(new IdFilter<Long>());
+			update = iteration.filter(new IdFilter<Long>());
+			lastGradient = iteration.filter(new IdFilter<Long>());
+			
+			DataSet<Long> gradient = width.map(new IdentityMapper<Long>());
+			DataSet<Long> term = gradient.join(lastGradient)
+								.where(new IdentityKeyExtractor<Long>())
+								.equalTo(new IdentityKeyExtractor<Long>())
+								.with(new JoinFunction<Long, Long, Long>() {
+									public Long join(Long first, Long second) { return null; }
+								});
+			
+			update = update.map(new RichMapFunction<Long, Long>() {
+				public Long map(Long value) { return null; }
+			}).withBroadcastSet(term, "some-name");
+			
+			DataSet<Long> result = iteration.closeWith(width.union(update).union(lastGradient));
+			
+			result.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
 	// --------------------------------------------------------------------------------------------
 	
 	public static DataSet<Tuple2<Long, Long>> doBulkIteration(DataSet<Tuple2<Long, Long>> vertices, DataSet<Tuple2<Long, Long>> edges) {
@@ -352,4 +399,11 @@ public class IterationsCompilerTest extends CompilerTestBase {
 			return new Tuple2<T, T>(value, value);
 		}
 	}
+	
+	public static final class IdFilter<T> implements FilterFunction<T> {
+		@Override
+		public boolean filter(T value) {
+			return true;
+		}
+	}
 }


[4/4] incubator-flink git commit: [java8] Fix pom.xml to properly work with Eclipse m2e

Posted by se...@apache.org.
[java8] Fix pom.xml to properly work with Eclipse m2e


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/6d69f697
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/6d69f697
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/6d69f697

Branch: refs/heads/master
Commit: 6d69f697fd1483a30a88bc10d2544a2240e77ed4
Parents: a79ea78
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Dec 10 15:45:40 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Dec 10 15:45:40 2014 +0100

----------------------------------------------------------------------
 flink-java8/pom.xml | 17 +++++++++++++++--
 1 file changed, 15 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6d69f697/flink-java8/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java8/pom.xml b/flink-java8/pom.xml
index 0aa06fa..ad1273c 100644
--- a/flink-java8/pom.xml
+++ b/flink-java8/pom.xml
@@ -188,7 +188,7 @@ under the License.
 										</goals>
 									</pluginExecutionFilter>
 									<action>
-										<ignore></ignore>
+										<ignore/>
 									</action>
 								</pluginExecution>
 								<pluginExecution>
@@ -202,7 +202,20 @@ under the License.
 										</goals>
 									</pluginExecutionFilter>
 									<action>
-										<ignore></ignore>
+										<ignore/>
+									</action>
+								</pluginExecution>
+								<pluginExecution>
+									<pluginExecutionFilter>
+										<groupId>org.apache.maven.plugins</groupId>
+										<artifactId>maven-dependency-plugin</artifactId>
+										<versionRange>[2.9,)</versionRange>
+										<goals>
+											<goal>unpack</goal>
+										</goals>
+									</pluginExecutionFilter>
+									<action>
+										<ignore/>
 									</action>
 								</pluginExecution>
 							</pluginExecutions>