You are viewing a plain-text version of this content. (The canonical link was not preserved in this copy.)
Posted to commits@flink.apache.org by uc...@apache.org on 2015/03/18 17:48:51 UTC
[02/13] flink git commit: [FLINK-1350] [runtime] Set result type to BLOCKING if data exchange mode is BATCH
[FLINK-1350] [runtime] Set result type to BLOCKING if data exchange mode is BATCH
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9c77f078
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9c77f078
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9c77f078
Branch: refs/heads/master
Commit: 9c77f0785e43326521da5e535f9ab1f05a9c6280
Parents: 9d7acf3
Author: Ufuk Celebi <uc...@apache.org>
Authored: Tue Mar 17 10:42:19 2015 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Wed Mar 18 17:44:40 2015 +0100
----------------------------------------------------------------------
.../plantranslate/NepheleJobGraphGenerator.java | 52 +++++++++++++++-----
.../executiongraph/ExecutionJobVertex.java | 1 -
.../runtime/jobgraph/AbstractJobVertex.java | 27 +++++++---
.../ExecutionGraphConstructionTest.java | 15 +++---
4 files changed, 67 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9c77f078/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
index 052d439..9a47f79 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -18,16 +18,6 @@
package org.apache.flink.compiler.plantranslate;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.aggregators.AggregatorRegistry;
import org.apache.flink.api.common.aggregators.AggregatorWithName;
@@ -58,6 +48,8 @@ import org.apache.flink.compiler.plan.WorksetPlanNode;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.iterative.convergence.WorksetEmptyConvergenceCriterion;
import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask;
@@ -87,6 +79,16 @@ import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Visitor;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
/**
* This component translates the optimizer's resulting plan a nephele job graph. The
* translation is a one to one mapping. All decisions are made by the optimizer, this class
@@ -1069,7 +1071,33 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
throw new RuntimeException("Unknown runtime ship strategy: " + channel.getShipStrategy());
}
- targetVertex.connectNewDataSetAsInput(sourceVertex, distributionPattern);
+ final ResultPartitionType resultType;
+
+ switch (channel.getDataExchangeMode()) {
+
+ case PIPELINED:
+ resultType = ResultPartitionType.PIPELINED;
+ break;
+
+ case BATCH:
+ // BLOCKING results are currently not supported in closed loop iterations
+ //
+ // See https://issues.apache.org/jira/browse/FLINK-1713 for details
+ resultType = channel.getSource().isOnDynamicPath()
+ ? ResultPartitionType.PIPELINED
+ : ResultPartitionType.BLOCKING;
+ break;
+
+ case PIPELINE_WITH_BATCH_FALLBACK:
+ throw new UnsupportedOperationException("Data exchange mode " +
+ channel.getDataExchangeMode() + " currently not supported.");
+
+ default:
+ throw new UnsupportedOperationException("Unknown data exchange mode.");
+
+ }
+
+ targetVertex.connectNewDataSetAsInput(sourceVertex, distributionPattern, resultType);
// -------------- configure the source task's ship strategy strategies in task config --------------
final int outputIndex = sourceConfig.getNumOutputs();
@@ -1139,7 +1167,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
boolean needsMemory = false;
// Don't add a pipeline breaker if the data exchange is already blocking.
- if (tm.breaksPipeline()) {
+ if (tm.breaksPipeline() && channel.getDataExchangeMode() != DataExchangeMode.BATCH) {
config.setInputAsynchronouslyMaterialized(inputNum, true);
needsMemory = true;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9c77f078/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 6fdc628..ad72d13 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -45,7 +45,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
public class ExecutionJobVertex implements Serializable {
private static final long serialVersionUID = 42L;
http://git-wip-us.apache.org/repos/asf/flink/blob/9c77f078/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
index 609ed3e..8816a69 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/AbstractJobVertex.java
@@ -18,9 +18,6 @@
package org.apache.flink.runtime.jobgraph;
-import java.util.ArrayList;
-import java.util.List;
-
import org.apache.commons.lang3.Validate;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplitSource;
@@ -29,6 +26,9 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import java.util.ArrayList;
+import java.util.List;
+
/**
* An abstract base class for a job vertex.
*/
@@ -316,12 +316,15 @@ public class AbstractJobVertex implements java.io.Serializable {
// --------------------------------------------------------------------------------------------
- public IntermediateDataSet createAndAddResultDataSet() {
- return createAndAddResultDataSet(new IntermediateDataSetID());
+ public IntermediateDataSet createAndAddResultDataSet(ResultPartitionType partitionType) {
+ return createAndAddResultDataSet(new IntermediateDataSetID(), partitionType);
}
- public IntermediateDataSet createAndAddResultDataSet(IntermediateDataSetID id) {
- IntermediateDataSet result = new IntermediateDataSet(id, ResultPartitionType.PIPELINED, this);
+ public IntermediateDataSet createAndAddResultDataSet(
+ IntermediateDataSetID id,
+ ResultPartitionType partitionType) {
+
+ IntermediateDataSet result = new IntermediateDataSet(id, partitionType, this);
this.results.add(result);
return result;
}
@@ -333,7 +336,15 @@ public class AbstractJobVertex implements java.io.Serializable {
}
public void connectNewDataSetAsInput(AbstractJobVertex input, DistributionPattern distPattern) {
- IntermediateDataSet dataSet = input.createAndAddResultDataSet();
+ connectNewDataSetAsInput(input, distPattern, ResultPartitionType.PIPELINED);
+ }
+
+ public void connectNewDataSetAsInput(
+ AbstractJobVertex input,
+ DistributionPattern distPattern,
+ ResultPartitionType partitionType) {
+
+ IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType);
JobEdge edge = new JobEdge(dataSet, this, distPattern);
this.inputs.add(edge);
dataSet.addConsumer(edge);
http://git-wip-us.apache.org/repos/asf/flink/blob/9c77f078/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
index b16109a..e0852c6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.junit.Test;
import org.mockito.Matchers;
@@ -129,9 +130,9 @@ public class ExecutionGraphConstructionTest {
v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL);
// create results for v2 and v3
- IntermediateDataSet v2result = v2.createAndAddResultDataSet();
- IntermediateDataSet v3result_1 = v3.createAndAddResultDataSet();
- IntermediateDataSet v3result_2 = v3.createAndAddResultDataSet();
+ IntermediateDataSet v2result = v2.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
+ IntermediateDataSet v3result_1 = v3.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
+ IntermediateDataSet v3result_2 = v3.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2, v3));
@@ -190,9 +191,9 @@ public class ExecutionGraphConstructionTest {
v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL);
// create results for v2 and v3
- IntermediateDataSet v2result = v2.createAndAddResultDataSet();
- IntermediateDataSet v3result_1 = v3.createAndAddResultDataSet();
- IntermediateDataSet v3result_2 = v3.createAndAddResultDataSet();
+ IntermediateDataSet v2result = v2.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
+ IntermediateDataSet v3result_1 = v3.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
+ IntermediateDataSet v3result_2 = v3.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2, v3));
@@ -584,7 +585,7 @@ public class ExecutionGraphConstructionTest {
v2.setParallelism(7);
v3.setParallelism(2);
- IntermediateDataSet result = v1.createAndAddResultDataSet();
+ IntermediateDataSet result = v1.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
v2.connectDataSetAsInput(result, DistributionPattern.ALL_TO_ALL);
v3.connectDataSetAsInput(result, DistributionPattern.ALL_TO_ALL);