You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/06/18 18:41:49 UTC

[1/2] git commit: [FLINK-941] Place pipeline breakers for SingleInput nodes with broadcast variables, if needed.

Repository: incubator-flink
Updated Branches:
  refs/heads/master 1a1a13e06 -> 3a452e5bb


[FLINK-941] Place pipeline breakers for SingleInput nodes with broadcast variables, if needed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/3a452e5b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/3a452e5b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/3a452e5b

Branch: refs/heads/master
Commit: 3a452e5bbabe1967ffc04aa34f2959d1efece282
Parents: 7c4c4a9
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Jun 18 17:07:25 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jun 18 18:29:11 2014 +0200

----------------------------------------------------------------------
 .../compiler/dag/SingleInputNode.java           | 34 +++++++++
 .../BroadcastVariablePipelinebreakerTest.java   | 80 ++++++++++++++++++++
 2 files changed, 114 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3a452e5b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
index e1727f9..8bf3f16 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java
@@ -13,6 +13,10 @@
 
 package eu.stratosphere.compiler.dag;
 
+import static eu.stratosphere.compiler.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
+import static eu.stratosphere.compiler.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
+import static eu.stratosphere.compiler.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -39,6 +43,7 @@ import eu.stratosphere.compiler.plan.Channel;
 import eu.stratosphere.compiler.plan.NamedChannel;
 import eu.stratosphere.compiler.plan.PlanNode;
 import eu.stratosphere.compiler.plan.SingleInputPlanNode;
+import eu.stratosphere.compiler.plan.PlanNode.SourceAndDamReport;
 import eu.stratosphere.compiler.util.NoOpUnaryUdfOp;
 import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
@@ -383,6 +388,7 @@ public abstract class SingleInputNode extends OptimizerNode {
 		for (List<NamedChannel> broadcastChannelsCombination: Sets.cartesianProduct(broadcastPlanChannels)) {
 			
 			boolean validCombination = true;
+			boolean requiresPipelinebreaker = false;
 			
 			// check whether the broadcast inputs use the same plan candidate at the branching point
 			for (int i = 0; i < broadcastChannelsCombination.size(); i++) {
@@ -404,12 +410,40 @@ public abstract class SingleInputNode extends OptimizerNode {
 						break;
 					}
 				}
+				
+				// for every common predecessor in hereJoinedBranches, check whether the input's path down to its candidate contains a dam; if a path has none, the input needs a pipeline breaker (NOTE(review): this does not depend on broadcast channel i, so it is re-evaluated for every broadcast input)
+				if (this.hereJoinedBranches != null) {
+					for (OptimizerNode brancher : this.hereJoinedBranches) {
+						PlanNode candAtBrancher = in.getSource().getCandidateAtBranchPoint(brancher);
+						
+						if (candAtBrancher == null) {
+							// no candidate at this branch point on the input's path (branch closed between two broadcast variables), so skip it
+							continue;
+						}
+						
+						SourceAndDamReport res = in.getSource().hasDamOnPathDownTo(candAtBrancher);
+						if (res == NOT_FOUND) {
+							throw new CompilerException("Bug: Tracing dams for deadlock detection is broken.");
+						} else if (res == FOUND_SOURCE) {
+							requiresPipelinebreaker = true;
+							break;
+						} else if (res == FOUND_SOURCE_AND_DAM) {
+							// a dam separates the input from this predecessor, so it needs no pipeline breaker
+						} else {
+							throw new CompilerException();
+						}
+					}
+				}
 			}
 			
 			if (!validCombination) {
 				continue;
 			}
 			
+			if (requiresPipelinebreaker) {
+				in.setTempMode(in.getTempMode().makePipelineBreaker());
+			}
+			
 			final SingleInputPlanNode node = dps.instantiate(in, this);
 			node.setBroadcastInputs(broadcastChannelsCombination);
 			

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/3a452e5b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/BroadcastVariablePipelinebreakerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/BroadcastVariablePipelinebreakerTest.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/BroadcastVariablePipelinebreakerTest.java
new file mode 100644
index 0000000..ec65e4b
--- /dev/null
+++ b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/BroadcastVariablePipelinebreakerTest.java
@@ -0,0 +1,80 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.pact.compiler;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+import eu.stratosphere.api.common.Plan;
+import eu.stratosphere.api.java.DataSet;
+import eu.stratosphere.api.java.ExecutionEnvironment;
+import eu.stratosphere.compiler.dag.TempMode;
+import eu.stratosphere.compiler.plan.OptimizedPlan;
+import eu.stratosphere.compiler.plan.SingleInputPlanNode;
+import eu.stratosphere.compiler.plan.SinkPlanNode;
+import eu.stratosphere.pact.compiler.testfunctions.IdentityMapper;
+
+/**
+ * Tests that the optimizer turns the regular input of a single-input operator into a pipeline breaker
+ * when that input and one of the operator's broadcast sets stem from the same source, and leaves the
+ * input pipelined when they stem from independent sources.
+ */
+@SuppressWarnings("serial")
+public class BroadcastVariablePipelinebreakerTest extends CompilerTestBase {
+
+	/** The broadcast set comes from a separate source, so the mapper's input needs no breaker. */
+	@Test
+	public void testNoBreakerForIndependentVariable() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		DataSet<String> source1 = env.fromElements("test");
+		DataSet<String> source2 = env.fromElements("test");
+		
+		source1.map(new IdentityMapper<String>()).withBroadcastSet(source2, "some name").print();
+		
+		Plan p = env.createProgramPlan();
+		OptimizedPlan op = compileNoStats(p);
+		
+		SinkPlanNode sink = op.getDataSinks().iterator().next();
+		SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource();
+		
+		assertEquals(TempMode.NONE, mapper.getInput().getTempMode());
+	}
+	
+	/**
+	 * The second mapper reads its regular input (through the first mapper) and its broadcast set
+	 * from the same source, so its regular input must be turned into a pipeline breaker.
+	 */
+	@Test
+	public void testBreakerForDependentVariable() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		DataSet<String> source1 = env.fromElements("test");
+		
+		source1.map(new IdentityMapper<String>())
+			.map(new IdentityMapper<String>()).withBroadcastSet(source1, "some name")
+			.print();
+		
+		Plan p = env.createProgramPlan();
+		OptimizedPlan op = compileNoStats(p);
+		
+		SinkPlanNode sink = op.getDataSinks().iterator().next();
+		SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource();
+		
+		assertEquals(TempMode.PIPELINE_BREAKER, mapper.getInput().getTempMode());
+	}
+}


[2/2] git commit: [FLINK-940] Added additional test for cogroup with delta iteration solution set on second input.

Posted by se...@apache.org.
[FLINK-940] Added additional test for cogroup with delta iteration solution set on second input.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/7c4c4a9d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/7c4c4a9d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/7c4c4a9d

Branch: refs/heads/master
Commit: 7c4c4a9d2fbc7b2ffdbd4668a36c21d503c672b2
Parents: 1a1a13e
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Jun 18 01:28:34 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jun 18 18:29:11 2014 +0200

----------------------------------------------------------------------
 .../test/testdata/ConnectedComponentsData.java  |  19 +++
 .../CoGroupConnectedComponentsSecondITCase.java | 139 +++++++++++++++++++
 2 files changed, 158 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7c4c4a9d/stratosphere-test-utils/src/main/java/eu/stratosphere/test/testdata/ConnectedComponentsData.java
----------------------------------------------------------------------
diff --git a/stratosphere-test-utils/src/main/java/eu/stratosphere/test/testdata/ConnectedComponentsData.java b/stratosphere-test-utils/src/main/java/eu/stratosphere/test/testdata/ConnectedComponentsData.java
index 76c7099..5bb8f9e 100644
--- a/stratosphere-test-utils/src/main/java/eu/stratosphere/test/testdata/ConnectedComponentsData.java
+++ b/stratosphere-test-utils/src/main/java/eu/stratosphere/test/testdata/ConnectedComponentsData.java
@@ -16,11 +16,14 @@ package eu.stratosphere.test.testdata;
 
 import java.io.BufferedReader;
 import java.io.IOException;
+import java.util.List;
 import java.util.Random;
 import java.util.regex.Pattern;
 
 import org.junit.Assert;
 
+import eu.stratosphere.api.java.tuple.Tuple2;
+
 
 public class ConnectedComponentsData {
 
@@ -106,6 +109,22 @@ public class ConnectedComponentsData {
 			}
 		}
 	}
+	
+	/**
+	 * Asserts that each (vertex, component) pair has component {@code vertex % 2}, with 0 mapped to 2
+	 * (odd ids in component 1, even ids in component 2 for non-negative ids). IOException is never thrown.
+	 */
+	public static void checkOddEvenResult(List<Tuple2<Long, Long>> lines) throws IOException {
+		for (Tuple2<Long, Long> line : lines) {
+			long vertex = line.f0;
+			long component = line.f1;
+			long should = vertex % 2;
+			if (should == 0) {
+				should = 2;
+			}
+			Assert.assertEquals("Vertex is in wrong component.", should, component);
+		}
+	}
 
 	private ConnectedComponentsData() {}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/7c4c4a9d/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/CoGroupConnectedComponentsSecondITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/CoGroupConnectedComponentsSecondITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/CoGroupConnectedComponentsSecondITCase.java
new file mode 100644
index 0000000..1a0f443
--- /dev/null
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/iterative/CoGroupConnectedComponentsSecondITCase.java
@@ -0,0 +1,139 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.test.iterative;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import eu.stratosphere.api.java.DataSet;
+import eu.stratosphere.api.java.DeltaIteration;
+import eu.stratosphere.api.java.ExecutionEnvironment;
+import eu.stratosphere.api.java.functions.CoGroupFunction;
+import eu.stratosphere.api.java.functions.FlatMapFunction;
+import eu.stratosphere.api.java.functions.FunctionAnnotation.ConstantFieldsFirst;
+import eu.stratosphere.api.java.functions.FunctionAnnotation.ConstantFieldsSecond;
+import eu.stratosphere.api.java.functions.MapFunction;
+import eu.stratosphere.api.java.io.LocalCollectionOutputFormat;
+import eu.stratosphere.api.java.tuple.Tuple2;
+import eu.stratosphere.example.java.graph.ConnectedComponents.DuplicateValue;
+import eu.stratosphere.example.java.graph.ConnectedComponents.NeighborWithComponentIDJoin;
+import eu.stratosphere.test.testdata.ConnectedComponentsData;
+import eu.stratosphere.test.util.JavaProgramTestBase;
+import eu.stratosphere.util.Collector;
+import eu.stratosphere.util.LogUtils;
+
+
+/**
+ * Connected components as a delta iteration whose solution set is the SECOND input of the co-group
+ * that computes the component updates. The result is verified with checkOddEvenResult.
+ */
+@SuppressWarnings("serial")
+public class CoGroupConnectedComponentsSecondITCase extends JavaProgramTestBase {
+	
+	private static final long SEED = 0xBADC0FFEEBEEFL;
+	private static final int NUM_VERTICES = 1000;
+	private static final int NUM_EDGES = 10000;
+	
+	@Override
+	protected void testProgram() throws Exception {
+		// set up execution environment
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		LogUtils.initializeDefaultConsoleLogger();
+		
+		// read vertex and edge data
+		DataSet<Long> vertices = env.fromElements(ConnectedComponentsData.getEnumeratingVertices(NUM_VERTICES).split("\n"))
+				.map(new VertexParser());
+		
+		DataSet<Tuple2<Long, Long>> edges = env.fromElements(ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED).split("\n"))
+				.flatMap(new EdgeParser());
+		
+		// assign the initial components (equal to the vertex id)
+		DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new DuplicateValue<Long>());
+		
+		// open a delta iteration
+		DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
+				verticesWithInitialId.iterateDelta(verticesWithInitialId, 100, 0);
+		
+		// apply the step logic: join with the edges, select the minimum neighbor, update if the component of the candidate is smaller
+		DataSet<Tuple2<Long, Long>> changes = iteration
+				.getWorkset().join(edges).where(0).equalTo(0).with(new NeighborWithComponentIDJoin())
+				.coGroup(iteration.getSolutionSet()).where(0).equalTo(0)
+				.with(new MinIdAndUpdate());
+		
+		// close the delta iteration (delta and new workset are identical)
+		DataSet<Tuple2<Long, Long>> result = iteration.closeWith(changes, changes);
+		
+		// collect the result locally so that it can be verified after execution
+		List<Tuple2<Long,Long>> resultTuples = new ArrayList<Tuple2<Long,Long>>();
+		result.output(new LocalCollectionOutputFormat<Tuple2<Long,Long>>(resultTuples));
+		
+		env.execute();
+		
+		ConnectedComponentsData.checkOddEvenResult(resultTuples);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  User functions of the test program
+	// --------------------------------------------------------------------------------------------
+	
+	/** Parses a vertex id from a line of text. */
+	public static final class VertexParser extends MapFunction<String, Long> {
+		@Override
+		public Long map(String value) throws Exception {
+			return Long.parseLong(value);
+		}
+	}
+	
+	/** Parses a space-separated "v1 v2" line and emits the edge in both directions. */
+	public static final class EdgeParser extends FlatMapFunction<String, Tuple2<Long, Long>> {
+		@Override
+		public void flatMap(String value, Collector<Tuple2<Long, Long>> out) throws Exception {
+			String[] parts = value.split(" ");
+			long v1 = Long.parseLong(parts[0]);
+			long v2 = Long.parseLong(parts[1]);
+			
+			out.collect(new Tuple2<Long, Long>(v1, v2));
+			out.collect(new Tuple2<Long, Long>(v2, v1));
+		}
+	}
+	
+	/** Emits the vertex with its smallest candidate component if that is below its current one (second input). */
+	@ConstantFieldsFirst("0")
+	@ConstantFieldsSecond("0")
+	public static final class MinIdAndUpdate extends CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
+		@Override
+		public void coGroup(Iterator<Tuple2<Long, Long>> candidates, Iterator<Tuple2<Long, Long>> current, Collector<Tuple2<Long, Long>> out) {
+			if (!current.hasNext()) {
+				throw new RuntimeException("Error: Id not encountered before.");
+			}
+			
+			Tuple2<Long, Long> old = current.next();
+			
+			long minimumComponentID = Long.MAX_VALUE;
+			while (candidates.hasNext()) {
+				long candidateComponentID = candidates.next().f1;
+				if (candidateComponentID < minimumComponentID) {
+					minimumComponentID = candidateComponentID;
+				}
+			}
+			
+			if (minimumComponentID < old.f1) {
+				old.f1 = minimumComponentID;
+				out.collect(old);
+			}
+		}
+	}
+}