You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by uc...@apache.org on 2015/03/18 17:48:51 UTC

[02/13] flink git commit: [FLINK-1350] [runtime] Set result type to BLOCKING if data exchange mode is BATCH

[FLINK-1350] [runtime] Set result type to BLOCKING if data exchange mode is BATCH


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9c77f078
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9c77f078
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9c77f078

Branch: refs/heads/master
Commit: 9c77f0785e43326521da5e535f9ab1f05a9c6280
Parents: 9d7acf3
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Mar 17 10:42:19 2015 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Mar 18 17:44:40 2015 +0100

----------------------------------------------------------------------
 .../plantranslate/NepheleJobGraphGenerator.java | 52 +++++++++++++++-----
 .../executiongraph/ExecutionJobVertex.java      |  1 -
 .../runtime/jobgraph/AbstractJobVertex.java     | 27 +++++++---
 .../ExecutionGraphConstructionTest.java         | 15 +++---
 4 files changed, 67 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9c77f078/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
index 052d439..9a47f79 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -18,16 +18,6 @@
 
 package org.apache.flink.compiler.plantranslate;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.aggregators.AggregatorRegistry;
 import org.apache.flink.api.common.aggregators.AggregatorWithName;
@@ -58,6 +48,8 @@ import org.apache.flink.compiler.plan.WorksetPlanNode;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.iterative.convergence.WorksetEmptyConvergenceCriterion;
 import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
 import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask;
@@ -87,6 +79,16 @@ import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Visitor;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
 /**
  * This component translates the optimizer's resulting plan a nephele job graph. The
  * translation is a one to one mapping. All decisions are made by the optimizer, this class
@@ -1069,7 +1071,33 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 				throw new RuntimeException("Unknown runtime ship strategy: " + channel.getShipStrategy());
 		}
 
-		targetVertex.connectNewDataSetAsInput(sourceVertex, distributionPattern);
+		final ResultPartitionType resultType;
+
+		switch (channel.getDataExchangeMode()) {
+
+			case PIPELINED:
+				resultType = ResultPartitionType.PIPELINED;
+				break;
+
+			case BATCH:
+				// BLOCKING results are currently not supported in closed loop iterations
+				//
+				// See https://issues.apache.org/jira/browse/FLINK-1713 for details
+				resultType = channel.getSource().isOnDynamicPath()
+						? ResultPartitionType.PIPELINED
+						: ResultPartitionType.BLOCKING;
+				break;
+
+			case PIPELINE_WITH_BATCH_FALLBACK:
+				throw new UnsupportedOperationException("Data exchange mode " +
+						channel.getDataExchangeMode() + " currently not supported.");
+
+			default:
+				throw new UnsupportedOperationException("Unknown data exchange mode.");
+
+		}
+
+		targetVertex.connectNewDataSetAsInput(sourceVertex, distributionPattern, resultType);
 
 		// -------------- configure the source task's ship strategy strategies in task config --------------
 		final int outputIndex = sourceConfig.getNumOutputs();
@@ -1139,7 +1167,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 
 			boolean needsMemory = false;
 			// Don't add a pipeline breaker if the data exchange is already blocking.
-			if (tm.breaksPipeline()) {
+			if (tm.breaksPipeline() && channel.getDataExchangeMode() != DataExchangeMode.BATCH) {
 				config.setInputAsynchronouslyMaterialized(inputNum, true);
 				needsMemory = true;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/9c77f078/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 6fdc628..ad72d13 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -45,7 +45,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-
 public class ExecutionJobVertex implements Serializable {
 	
 	private static final long serialVersionUID = 42L;

http://git-wip-us.apache.org/repos/asf/flink/blob/9c77f078/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
index 609ed3e..8816a69 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.runtime.jobgraph;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import org.apache.commons.lang3.Validate;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplitSource;
@@ -29,6 +26,9 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * An abstract base class for a job vertex.
  */
@@ -316,12 +316,15 @@ public class AbstractJobVertex implements java.io.Serializable {
 	
 	// --------------------------------------------------------------------------------------------
 
-	public IntermediateDataSet createAndAddResultDataSet() {
-		return createAndAddResultDataSet(new IntermediateDataSetID());
+	public IntermediateDataSet createAndAddResultDataSet(ResultPartitionType partitionType) {
+		return createAndAddResultDataSet(new IntermediateDataSetID(), partitionType);
 	}
 
-	public IntermediateDataSet createAndAddResultDataSet(IntermediateDataSetID id) {
-		IntermediateDataSet result = new IntermediateDataSet(id, ResultPartitionType.PIPELINED, this);
+	public IntermediateDataSet createAndAddResultDataSet(
+			IntermediateDataSetID id,
+			ResultPartitionType partitionType) {
+
+		IntermediateDataSet result = new IntermediateDataSet(id, partitionType, this);
 		this.results.add(result);
 		return result;
 	}
@@ -333,7 +336,15 @@ public class AbstractJobVertex implements java.io.Serializable {
 	}
 
 	public void connectNewDataSetAsInput(AbstractJobVertex input, DistributionPattern distPattern) {
-		IntermediateDataSet dataSet = input.createAndAddResultDataSet();
+		connectNewDataSetAsInput(input, distPattern, ResultPartitionType.PIPELINED);
+	}
+
+	public void connectNewDataSetAsInput(
+			AbstractJobVertex input,
+			DistributionPattern distPattern,
+			ResultPartitionType partitionType) {
+
+		IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType);
 		JobEdge edge = new JobEdge(dataSet, this, distPattern);
 		this.inputs.add(edge);
 		dataSet.addConsumer(edge);

http://git-wip-us.apache.org/repos/asf/flink/blob/9c77f078/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
index b16109a..e0852c6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.junit.Test;
 import org.mockito.Matchers;
 
@@ -129,9 +130,9 @@ public class ExecutionGraphConstructionTest {
 		v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL);
 		
 		// create results for v2 and v3
-		IntermediateDataSet v2result = v2.createAndAddResultDataSet();
-		IntermediateDataSet v3result_1 = v3.createAndAddResultDataSet();
-		IntermediateDataSet v3result_2 = v3.createAndAddResultDataSet();
+		IntermediateDataSet v2result = v2.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
+		IntermediateDataSet v3result_1 = v3.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
+		IntermediateDataSet v3result_2 = v3.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
 		
 		
 		List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2, v3));
@@ -190,9 +191,9 @@ public class ExecutionGraphConstructionTest {
 		v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL);
 		
 		// create results for v2 and v3
-		IntermediateDataSet v2result = v2.createAndAddResultDataSet();
-		IntermediateDataSet v3result_1 = v3.createAndAddResultDataSet();
-		IntermediateDataSet v3result_2 = v3.createAndAddResultDataSet();
+		IntermediateDataSet v2result = v2.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
+		IntermediateDataSet v3result_1 = v3.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
+		IntermediateDataSet v3result_2 = v3.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
 		
 		
 		List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2, v3));
@@ -584,7 +585,7 @@ public class ExecutionGraphConstructionTest {
 			v2.setParallelism(7);
 			v3.setParallelism(2);
 
-			IntermediateDataSet result = v1.createAndAddResultDataSet();
+			IntermediateDataSet result = v1.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
 			v2.connectDataSetAsInput(result, DistributionPattern.ALL_TO_ALL);
 			v3.connectDataSetAsInput(result, DistributionPattern.ALL_TO_ALL);