You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/21 13:35:43 UTC
[2/2] flink git commit: [FLINK-2540] [optimizer] [runtime] Propagate
union batch exchanges to union inputs
[FLINK-2540] [optimizer] [runtime] Propagate union batch exchanges to union inputs
The DataExchangeMode of union nodes was not respected when translating an OptimizedPlan
to a JobGraph. This could result in deadlocks when a branched data flow was closed.
Union nodes with a batch exchange now propagate their exchange mode to all of
their inputs when the JobGraph is generated.
This closes #1036
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/58421b84
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/58421b84
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/58421b84
Branch: refs/heads/master
Commit: 58421b848ac9190db3b7c86b5ee4f8a9fc977d90
Parents: d05d386
Author: Ufuk Celebi <uc...@apache.org>
Authored: Thu Aug 20 00:38:37 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Aug 21 12:09:01 2015 +0200
----------------------------------------------------------------------
.../apache/flink/optimizer/plan/Channel.java | 14 +-
.../plantranslate/JobGraphGenerator.java | 10 +
.../flink/test/UnionClosedBranchingTest.java | 192 +++++++++++++++++++
3 files changed, 215 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/58421b84/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java
index 4f8b1be..b139b62 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java
@@ -36,6 +36,8 @@ import org.apache.flink.runtime.io.network.DataExchangeMode;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.LocalStrategy;
+import static com.google.common.base.Preconditions.checkNotNull;
+
/**
* A Channel represents the result produced by an operator and the data exchange
* before the consumption by the target operator.
@@ -181,7 +183,17 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
}
/**
- * Gets the data exchange mode (batch / streaming) to use for the data
+ * Sets the data exchange mode (batch / pipelined) to use for the data
+ * exchange of this channel.
+ *
+ * @param dataExchangeMode The data exchange mode to set for this channel; must not be null.
+ */
+ public void setDataExchangeMode(DataExchangeMode dataExchangeMode) {
+ this.dataExchangeMode = checkNotNull(dataExchangeMode);
+ }
+
+ /**
+ * Gets the data exchange mode (batch / pipelined) to use for the data
* exchange of this channel.
*
* @return The data exchange mode of this channel.
http://git-wip-us.apache.org/repos/asf/flink/blob/58421b84/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index d440063..943ec2e 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -593,6 +593,16 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
if (inputPlanNode instanceof NAryUnionPlanNode) {
allInChannels = ((NAryUnionPlanNode) inputPlanNode).getListOfInputs().iterator();
+
+ // If the union node has a batch data exchange, we have to apply that exchange mode to
+ // the inputs of the union as well, because the optimizer has a separate union
+ // node, which does not exist in the JobGraph. Otherwise, this can result in
+ // deadlocks when closing a branching flow at runtime.
+ if (input.getDataExchangeMode().equals(DataExchangeMode.BATCH)) {
+ for (Channel in : inputPlanNode.getInputs()) {
+ in.setDataExchangeMode(DataExchangeMode.BATCH);
+ }
+ }
}
else if (inputPlanNode instanceof BulkPartialSolutionPlanNode) {
if (this.vertices.get(inputPlanNode) == null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/58421b84/flink-tests/src/test/java/org/apache/flink/test/UnionClosedBranchingTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/UnionClosedBranchingTest.java b/flink-tests/src/test/java/org/apache/flink/test/UnionClosedBranchingTest.java
new file mode 100644
index 0000000..f7ea911
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/UnionClosedBranchingTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plan.SourcePlanNode;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.util.CompilerTestBase;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.apache.flink.runtime.io.network.DataExchangeMode.BATCH;
+import static org.apache.flink.runtime.io.network.DataExchangeMode.PIPELINED;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This tests a fix for FLINK-2540.
+ *
+ * <p> This test is necessary, because {@link NAryUnionPlanNode}s are not directly translated
+ * to runtime tasks by the {@link JobGraphGenerator}. Instead, the network stack unions the
+ * inputs by directly reading from multiple inputs (via {@link UnionInputGate}).
+ *
+ * <pre>
+ * (source)-\ /-\
+ * (union)-+ (join)
+ * (source)-/ \-/
+ * </pre>
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/FLINK-2540">FLINK-2540</a>
+ */
+@RunWith(Parameterized.class)
+@SuppressWarnings({"serial","unchecked"})
+public class UnionClosedBranchingTest extends CompilerTestBase {
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> params() {
+ Collection<Object[]> params = Arrays.asList(new Object[][]{
+ {ExecutionMode.PIPELINED, PIPELINED, BATCH},
+ {ExecutionMode.PIPELINED_FORCED, PIPELINED, PIPELINED},
+ {ExecutionMode.BATCH, BATCH, BATCH},
+ {ExecutionMode.BATCH_FORCED, BATCH, BATCH},
+ });
+
+ // Make sure that changes to ExecutionMode are reflected in this test.
+ assertEquals(ExecutionMode.values().length, params.size());
+
+ return params;
+ }
+
+ private final ExecutionMode executionMode;
+
+ /** Expected {@link DataExchangeMode} from sources to union. */
+ private final DataExchangeMode sourceToUnion;
+
+ /** Expected {@link DataExchangeMode} from union to join. */
+ private final DataExchangeMode unionToJoin;
+
+ public UnionClosedBranchingTest(
+ ExecutionMode executionMode,
+ DataExchangeMode sourceToUnion,
+ DataExchangeMode unionToJoin) {
+
+ this.executionMode = executionMode;
+ this.sourceToUnion = sourceToUnion;
+ this.unionToJoin = unionToJoin;
+ }
+
+ @Test
+ public void testUnionClosedBranchingTest() throws Exception {
+
+ // -----------------------------------------------------------------------------------------
+ // Build test program
+ // -----------------------------------------------------------------------------------------
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().setExecutionMode(executionMode);
+ env.setParallelism(4);
+
+ DataSet<Tuple1<Integer>> src1 = env.fromElements(new Tuple1<>(0), new Tuple1<>(1));
+
+ DataSet<Tuple1<Integer>> src2 = env.fromElements(new Tuple1<>(0), new Tuple1<>(1));
+
+ DataSet<Tuple1<Integer>> union = src1.union(src2);
+
+ DataSet<Tuple2<Integer, Integer>> join = union
+ .join(union).where(0).equalTo(0)
+ .projectFirst(0).projectSecond(0);
+
+ join.output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());
+
+ // -----------------------------------------------------------------------------------------
+ // Verify optimized plan
+ // -----------------------------------------------------------------------------------------
+
+ OptimizedPlan optimizedPlan = compileNoStats(env.createProgramPlan());
+
+ SinkPlanNode sinkNode = optimizedPlan.getDataSinks().iterator().next();
+
+ DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getPredecessor();
+
+ // Verify that the compiler correctly sets the expected data exchange modes.
+ for (Channel channel : joinNode.getInputs()) {
+ assertEquals("Unexpected data exchange mode between union and join node.",
+ unionToJoin, channel.getDataExchangeMode());
+ }
+
+ for (SourcePlanNode src : optimizedPlan.getDataSources()) {
+ for (Channel channel : src.getOutgoingChannels()) {
+ assertEquals("Unexpected data exchange mode between source and union node.",
+ sourceToUnion, channel.getDataExchangeMode());
+ }
+ }
+
+ // -----------------------------------------------------------------------------------------
+ // Verify generated JobGraph
+ // -----------------------------------------------------------------------------------------
+
+ JobGraphGenerator jgg = new JobGraphGenerator();
+ JobGraph jobGraph = jgg.compileJobGraph(optimizedPlan);
+
+ List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
+
+ // Sanity check for the test setup
+ assertEquals("Unexpected number of vertices created.", 4, vertices.size());
+
+ // Verify all sources
+ JobVertex[] sources = new JobVertex[]{vertices.get(0), vertices.get(1)};
+
+ for (JobVertex src : sources) {
+ // Sanity check
+ assertTrue("Unexpected vertex type. Test setup is broken.", src.isInputVertex());
+
+ // The union is not translated to an extra union task, but the join uses a union
+ // input gate to read multiple inputs. Each source creates a single result per consumer.
+ assertEquals("Unexpected number of created results.", 2,
+ src.getNumberOfProducedIntermediateDataSets());
+
+ for (IntermediateDataSet dataSet : src.getProducedDataSets()) {
+ ResultPartitionType dsType = dataSet.getResultType();
+
+ // The result type is determined by the channel between the union and the join node
+ // and *not* the channel between source and union.
+ if (unionToJoin.equals(BATCH)) {
+ assertTrue("Expected batch exchange, but result type is " + dsType + ".",
+ dsType.isBlocking());
+ } else {
+ assertFalse("Expected non-batch exchange, but result type is " + dsType + ".",
+ dsType.isBlocking());
+ }
+ }
+ }
+ }
+
+}