You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/21 13:35:43 UTC

[2/2] flink git commit: [FLINK-2540] [optimizer] [runtime] Propagate union batch exchanges to union inputs

[FLINK-2540] [optimizer] [runtime] Propagate union batch exchanges to union inputs

The DataExchangeMode of union nodes was not respected when translating an OptimizedPlan
to a JobGraph. This could result in deadlocks, when a branched data flow was closed.

Union nodes with a batch exchange will propagate their exchange mode to all of
their inputs when the JobGraph is generated.

This closes #1036


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/58421b84
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/58421b84
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/58421b84

Branch: refs/heads/master
Commit: 58421b848ac9190db3b7c86b5ee4f8a9fc977d90
Parents: d05d386
Author: Ufuk Celebi <uc...@apache.org>
Authored: Thu Aug 20 00:38:37 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Aug 21 12:09:01 2015 +0200

----------------------------------------------------------------------
 .../apache/flink/optimizer/plan/Channel.java    |  14 +-
 .../plantranslate/JobGraphGenerator.java        |  10 +
 .../flink/test/UnionClosedBranchingTest.java    | 192 +++++++++++++++++++
 3 files changed, 215 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/58421b84/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java
index 4f8b1be..b139b62 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/Channel.java
@@ -36,6 +36,8 @@ import org.apache.flink.runtime.io.network.DataExchangeMode;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 /**
  * A Channel represents the result produced by an operator and the data exchange
  * before the consumption by the target operator.
@@ -181,7 +183,17 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
 	}
 
 	/**
-	 * Gets the data exchange mode (batch / streaming) to use for the data
+	 * Sets the data exchange mode (batch / pipelined) to use for the data
+	 * exchange of this channel.
+	 *
+	 * @param dataExchangeMode The data exchange mode of this channel; must not be null.
+	 */
+	public void setDataExchangeMode(DataExchangeMode dataExchangeMode) {
+		this.dataExchangeMode = checkNotNull(dataExchangeMode);
+	}
+
+	/**
+	 * Gets the data exchange mode (batch / pipelined) to use for the data
 	 * exchange of this channel.
 	 *
 	 * @return The data exchange mode of this channel.

http://git-wip-us.apache.org/repos/asf/flink/blob/58421b84/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index d440063..943ec2e 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -593,6 +593,16 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		
 		if (inputPlanNode instanceof NAryUnionPlanNode) {
 			allInChannels = ((NAryUnionPlanNode) inputPlanNode).getListOfInputs().iterator();
+
+			// If the union node has a batch data exchange, we have to set the exchange mode of
+			// the union's inputs to batch as well, because the optimizer has a separate union
+			// node, which does not exist in the JobGraph. Otherwise, this can result in
+			// deadlocks when closing a branching flow at runtime.
+			if (input.getDataExchangeMode().equals(DataExchangeMode.BATCH)) {
+				for (Channel in : inputPlanNode.getInputs()) {
+					in.setDataExchangeMode(DataExchangeMode.BATCH);
+				}
+			}
 		}
 		else if (inputPlanNode instanceof BulkPartialSolutionPlanNode) {
 			if (this.vertices.get(inputPlanNode) == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/58421b84/flink-tests/src/test/java/org/apache/flink/test/UnionClosedBranchingTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/UnionClosedBranchingTest.java b/flink-tests/src/test/java/org/apache/flink/test/UnionClosedBranchingTest.java
new file mode 100644
index 0000000..f7ea911
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/UnionClosedBranchingTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plan.SourcePlanNode;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.util.CompilerTestBase;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.apache.flink.runtime.io.network.DataExchangeMode.BATCH;
+import static org.apache.flink.runtime.io.network.DataExchangeMode.PIPELINED;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This tests a fix for FLINK-2540.
+ *
+ * <p> This test is necessary, because {@link NAryUnionPlanNode}s are not directly translated
+ * to runtime tasks by the {@link JobGraphGenerator}. Instead, the network stack unions the
+ * inputs by directly reading from multiple inputs (via {@link UnionInputGate}).
+ *
+ * <pre>
+ *   (source)-\        /-\
+ *            (union)-+  (join)
+ *   (source)-/        \-/
+ * </pre>
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/FLINK-2540">FLINK-2540</a>
+ */
+@RunWith(Parameterized.class)
+@SuppressWarnings({"serial","unchecked"})
+public class UnionClosedBranchingTest extends CompilerTestBase {
+
+	@Parameterized.Parameters
+	public static Collection<Object[]> params() {
+		Collection<Object[]> params = Arrays.asList(new Object[][]{
+				{ExecutionMode.PIPELINED, PIPELINED, BATCH},
+				{ExecutionMode.PIPELINED_FORCED, PIPELINED, PIPELINED},
+				{ExecutionMode.BATCH, BATCH, BATCH},
+				{ExecutionMode.BATCH_FORCED, BATCH, BATCH},
+		});
+
+		// Fail early if the number of ExecutionMode values no longer matches this table.
+		assertEquals(ExecutionMode.values().length, params.size());
+
+		return params;
+	}
+
+	private final ExecutionMode executionMode;
+
+	/** Expected {@link DataExchangeMode} from sources to union. */
+	private final DataExchangeMode sourceToUnion;
+
+	/** Expected {@link DataExchangeMode} from union to join. */
+	private final DataExchangeMode unionToJoin;
+
+	public UnionClosedBranchingTest(
+			ExecutionMode executionMode,
+			DataExchangeMode sourceToUnion,
+			DataExchangeMode unionToJoin) {
+
+		this.executionMode = executionMode;
+		this.sourceToUnion = sourceToUnion;
+		this.unionToJoin = unionToJoin;
+	}
+
+	@Test
+	public void testUnionClosedBranchingTest() throws Exception {
+
+		// -----------------------------------------------------------------------------------------
+		// Build test program
+		// -----------------------------------------------------------------------------------------
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().setExecutionMode(executionMode);
+		env.setParallelism(4);
+		
+		DataSet<Tuple1<Integer>> src1 = env.fromElements(new Tuple1<>(0), new Tuple1<>(1));
+
+		DataSet<Tuple1<Integer>> src2 = env.fromElements(new Tuple1<>(0), new Tuple1<>(1));
+
+		DataSet<Tuple1<Integer>> union = src1.union(src2);
+
+		DataSet<Tuple2<Integer, Integer>> join = union
+				.join(union).where(0).equalTo(0)
+				.projectFirst(0).projectSecond(0);
+
+		join.output(new DiscardingOutputFormat<Tuple2<Integer, Integer>>());
+
+		// -----------------------------------------------------------------------------------------
+		// Verify optimized plan
+		// -----------------------------------------------------------------------------------------
+
+		OptimizedPlan optimizedPlan = compileNoStats(env.createProgramPlan());
+
+		SinkPlanNode sinkNode = optimizedPlan.getDataSinks().iterator().next();
+
+		DualInputPlanNode joinNode = (DualInputPlanNode) sinkNode.getPredecessor();
+
+		// Verify that the compiler correctly sets the expected data exchange modes.
+		for (Channel channel : joinNode.getInputs()) {
+			assertEquals("Unexpected data exchange mode between union and join node.",
+					unionToJoin, channel.getDataExchangeMode());
+		}
+
+		for (SourcePlanNode src : optimizedPlan.getDataSources()) {
+			for (Channel channel : src.getOutgoingChannels()) {
+				assertEquals("Unexpected data exchange mode between source and union node.",
+						sourceToUnion, channel.getDataExchangeMode());
+			}
+		}
+
+		// -----------------------------------------------------------------------------------------
+		// Verify generated JobGraph
+		// -----------------------------------------------------------------------------------------
+
+		JobGraphGenerator jgg = new JobGraphGenerator();
+		JobGraph jobGraph = jgg.compileJobGraph(optimizedPlan);
+
+		List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
+
+		// Sanity check for the test setup
+		assertEquals("Unexpected number of vertices created.", 4, vertices.size());
+
+		// Verify all sources
+		JobVertex[] sources = new JobVertex[]{vertices.get(0), vertices.get(1)};
+
+		for (JobVertex src : sources) {
+			// Sanity check
+			assertTrue("Unexpected vertex type. Test setup is broken.", src.isInputVertex());
+
+			// The union is not translated to an extra union task, but the join uses a union
+			// input gate to read multiple inputs. Each source creates a single result per consumer.
+			assertEquals("Unexpected number of created results.", 2,
+					src.getNumberOfProducedIntermediateDataSets());
+
+			for (IntermediateDataSet dataSet : src.getProducedDataSets()) {
+				ResultPartitionType dsType = dataSet.getResultType();
+
+				// The result type is determined by the channel between the union and the join node
+				// and *not* the channel between source and union.
+				if (unionToJoin.equals(BATCH)) {
+					assertTrue("Expected batch exchange, but result type is " + dsType + ".",
+							dsType.isBlocking());
+				} else {
+					assertFalse("Expected non-batch exchange, but result type is " + dsType + ".",
+							dsType.isBlocking());
+				}
+			}
+		}
+	}
+
+}