Posted to commits@flink.apache.org by fh...@apache.org on 2018/04/04 16:05:43 UTC

flink git commit: [FLINK-9031] [optimizer] Fix DataSet Union operator translation bug.

Repository: flink
Updated Branches:
  refs/heads/release-1.3 b04f6d751 -> cb38b6def


[FLINK-9031] [optimizer] Fix DataSet Union operator translation bug.

- Adds a pass over the pre-optimized plan that fixes the output strategy of union nodes to FORWARD.
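
For context, a minimal sketch of the kind of program whose plan is affected
(this condenses the new testUnionForwardOutput added below and assumes the
usual DataSet API imports plus the IdentityMapper / IdentityGroupReducer test
helpers; it is illustrative only, not part of the patch):

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple2<Long, Long>> src = env.fromElements(new Tuple2<>(0L, 0L));

    // a union whose result is consumed by more than one downstream operator
    DataSet<Tuple2<Long, Long>> u = src.union(src)
        .map(new IdentityMapper<Tuple2<Long, Long>>());

    // unioned again in front of a keyed reduce: the partitioning required by
    // groupBy must not end up on a union's outgoing connection; the new pass
    // pins that connection to FORWARD
    u.union(u)
        .groupBy(0)
        .reduceGroup(new IdentityGroupReducer<Tuple2<Long, Long>>())
        .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());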

This closes #5742


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cb38b6de
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cb38b6de
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cb38b6de

Branch: refs/heads/release-1.3
Commit: cb38b6defbea5f92b6f3a5874acacb56523534f0
Parents: b04f6d7
Author: Fabian Hueske <fh...@apache.org>
Authored: Wed Mar 21 20:54:05 2018 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Apr 4 18:05:16 2018 +0200

----------------------------------------------------------------------
 .../org/apache/flink/optimizer/Optimizer.java   |   6 +
 .../flink/optimizer/dag/BinaryUnionNode.java    |   9 ++
 .../plantranslate/JobGraphGenerator.java        |   6 +
 .../traversals/GraphCreatingVisitor.java        |   6 +-
 .../UnionParallelismAndForwardEnforcer.java     |  60 +++++++
 .../flink/optimizer/UnionReplacementTest.java   | 159 +++++++++++++++++++
 .../optimizer/testfunctions/IdentityFilter.java |  31 ++++
 7 files changed, 276 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cb38b6de/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java
index f73abe2..8d81505 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java
@@ -43,6 +43,7 @@ import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.postpass.OptimizerPostPass;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.optimizer.traversals.RangePartitionRewriter;
+import org.apache.flink.optimizer.traversals.UnionParallelismAndForwardEnforcer;
 import org.apache.flink.util.InstantiationUtil;
 
 import org.slf4j.Logger;
@@ -476,6 +477,11 @@ public class Optimizer {
 		// guaranteed memory, for further cost estimations. We assume an equal distribution of memory among consumer tasks
 		rootNode.accept(new IdAndEstimatesVisitor(this.statistics));
 
+		// We need to enforce that union nodes always forward their output to their successor.
+		// Any partitioning must be either pushed before or done after the union, but not on the union's output.
+		UnionParallelismAndForwardEnforcer unionEnforcer = new UnionParallelismAndForwardEnforcer();
+		rootNode.accept(unionEnforcer);
+
 		// We are dealing with operator DAGs, rather than operator trees.
 		// That requires us to deviate at some points from the classical DB optimizer algorithms.
 		// This step builds auxiliary structures to help track branches and joins in the DAG
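
To illustrate the invariant described in the new comment above (a union's
output is always FORWARD; partitioning happens on the union's inputs or after
it), here is a sketch based on the new testUnionInputOutputDifferentDOP added
below. Parallelism values are illustrative; an ExecutionEnvironment env and
the usual DataSet imports are assumed:

    DataSet<Tuple2<Long, Long>> a = env.fromElements(new Tuple2<>(0L, 0L))
        .map(new IdentityMapper<Tuple2<Long, Long>>()).setParallelism(8);
    DataSet<Tuple2<Long, Long>> b = env.fromElements(new Tuple2<>(0L, 0L))
        .map(new IdentityMapper<Tuple2<Long, Long>>()).setParallelism(4);

    // the union adopts the parallelism of its single successor (8 here); the
    // parallelism change on the b -> union input is realized as
    // PARTITION_RANDOM on that input channel, while the union's own outgoing
    // connection to the map stays FORWARD
    a.union(b)
        .map(new IdentityMapper<Tuple2<Long, Long>>()).setParallelism(8)
        .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());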

http://git-wip-us.apache.org/repos/asf/flink/blob/cb38b6de/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
index cb496a2..dbb0df5 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
@@ -54,6 +54,15 @@ public class BinaryUnionNode extends TwoInputNode {
 	}
 
 	@Override
+	public void addOutgoingConnection(DagConnection connection) {
+	// ensure that union nodes have no more than one outgoing connection.
+		if (this.getOutgoingConnections() != null && this.getOutgoingConnections().size() > 0) {
+			throw new CompilerException("BinaryUnionNode may only have a single outgoing connection.");
+		}
+		super.addOutgoingConnection(connection);
+	}
+
+	@Override
 	public String getOperatorName() {
 		return "Union";
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/cb38b6de/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index a407bfe..672ae8a 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -636,6 +636,12 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 					in.setShipStrategy(ShipStrategyType.BROADCAST, in.getDataExchangeMode());
 				}
 			}
+
+			// The outgoing connection of an NAryUnion must be a forward connection.
+			if (input.getShipStrategy() != ShipStrategyType.FORWARD && !isBroadcast) {
+				throw new CompilerException("Optimized plan contains Union with non-forward outgoing ship strategy.");
+			}
+
 		}
 		else if (inputPlanNode instanceof BulkPartialSolutionPlanNode) {
 			if (this.vertices.get(inputPlanNode) == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/cb38b6de/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
index 3f3eae1..4872da5 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
@@ -244,7 +244,11 @@ public class GraphCreatingVisitor implements Visitor<Operator<?>> {
 		if (n.getParallelism() < 1) {
 			// set the parallelism
 			int par = c.getParallelism();
-			if (par > 0) {
+			if (n instanceof BinaryUnionNode) {
+				// Keep parallelism of union undefined for now.
+				// It will be determined based on the parallelism of its successor.
+				par = -1;
+			} else if (par > 0) {
 				if (this.forceParallelism && par != this.defaultParallelism) {
 					par = this.defaultParallelism;
 					Optimizer.LOG.warn("The parallelism of nested dataflows (such as step functions in iterations) is " +

http://git-wip-us.apache.org/repos/asf/flink/blob/cb38b6de/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/UnionParallelismAndForwardEnforcer.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/UnionParallelismAndForwardEnforcer.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/UnionParallelismAndForwardEnforcer.java
new file mode 100644
index 0000000..3b583e0
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/UnionParallelismAndForwardEnforcer.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.traversals;
+
+import org.apache.flink.optimizer.dag.BinaryUnionNode;
+import org.apache.flink.optimizer.dag.DagConnection;
+import org.apache.flink.optimizer.dag.IterationNode;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.util.Visitor;
+
+/**
+ * Enforces that all union nodes have the same parallelism as their successor (there must be only one!)
+ * and that the union node and its successor are connected by a forward ship strategy.
+ */
+public class UnionParallelismAndForwardEnforcer implements Visitor<OptimizerNode> {
+
+	@Override
+	public boolean preVisit(OptimizerNode node) {
+
+		// if the current node is a union
+		if (node instanceof BinaryUnionNode) {
+			int parallelism = -1;
+			// set ship strategy of all outgoing connections to FORWARD.
+			for (DagConnection conn : node.getOutgoingConnections()) {
+				parallelism = conn.getTarget().getParallelism();
+				conn.setShipStrategy(ShipStrategyType.FORWARD);
+			}
+			// adjust the parallelism to match that of the successor
+			node.setParallelism(parallelism);
+		}
+
+		// traverse the whole plan
+		return true;
+	}
+
+	@Override
+	public void postVisit(OptimizerNode node) {
+		// if required, recurse into the step function
+		if (node instanceof IterationNode) {
+			((IterationNode) node).acceptForStepFunction(this);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cb38b6de/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
index be6804b..76f2646 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
@@ -35,6 +35,9 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SourcePlanNode;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.testfunctions.IdentityFilter;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityMapper;
 import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
@@ -438,4 +441,160 @@ public class UnionReplacementTest extends CompilerTestBase {
 		}
 	}
 
+	/**
+	 * Tests that the outgoing connection of a Union node is FORWARD.
+	 * See FLINK-9031 for the bug report.
+	 *
+	 * The issue is quite hard to reproduce as the plan choice seems to depend on the enumeration
+	 * order due to the lack of plan costs. This test is a smaller variant of the job that was
+	 * reported to fail.
+	 *
+	 *       /-\           /- PreFilter1 -\-/- Union - PostFilter1 - Reducer1 -\
+	 * Src -<   >- Union -<                X                                    >- Union - Out
+	 *       \-/           \- PreFilter2 -/-\- Union - PostFilter2 - Reducer2 -/
+	 */
+	@Test
+	public void testUnionForwardOutput() throws Exception {
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+
+		DataSet<Tuple2<Long, Long>> src1 = env.fromElements(new Tuple2<>(0L, 0L));
+
+		DataSet<Tuple2<Long, Long>> u1 = src1.union(src1)
+			.map(new IdentityMapper<Tuple2<Long, Long>>());
+
+		DataSet<Tuple2<Long, Long>> s1 = u1
+			.filter(new IdentityFilter<Tuple2<Long, Long>>()).name("preFilter1");
+		DataSet<Tuple2<Long, Long>> s2 = u1
+			.filter(new IdentityFilter<Tuple2<Long, Long>>()).name("preFilter2");
+
+		DataSet<Tuple2<Long, Long>> reduced1 = s1
+			.union(s2)
+			.filter(new IdentityFilter<Tuple2<Long, Long>>()).name("postFilter1")
+			.groupBy(0)
+			.reduceGroup(new IdentityGroupReducer<Tuple2<Long, Long>>()).name("reducer1");
+		DataSet<Tuple2<Long, Long>> reduced2 = s1
+			.union(s2)
+			.filter(new IdentityFilter<Tuple2<Long, Long>>()).name("postFilter2")
+			.groupBy(1)
+			.reduceGroup(new IdentityGroupReducer<Tuple2<Long, Long>>()).name("reducer2");
+
+		reduced1
+			.union(reduced2)
+			.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+
+		// -----------------------------------------------------------------------------------------
+		// Verify optimized plan
+		// -----------------------------------------------------------------------------------------
+
+		OptimizedPlan optimizedPlan = compileNoStats(env.createProgramPlan());
+
+		OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(optimizedPlan);
+
+		SingleInputPlanNode unionOut1 = resolver.getNode("postFilter1");
+		SingleInputPlanNode unionOut2 = resolver.getNode("postFilter2");
+
+		assertEquals(ShipStrategyType.FORWARD, unionOut1.getInput().getShipStrategy());
+		assertEquals(ShipStrategyType.FORWARD, unionOut2.getInput().getShipStrategy());
+	}
+
+	/**
+	 * Tests the input and output ship strategies of union operators whose input and output
+	 * operators have different parallelisms.
+	 *
+	 * Src1 - Map(fullP) -\-/- Union - Map(fullP) - Out
+	 *                     X
+	 * Src2 - Map(halfP) -/-\- Union - Map(halfP) - Out
+	 *
+	 * The union operator must always have the same parallelism as its successor and connect to it
+	 * with a FORWARD strategy.
+	 * In this program, the input connections of the unions should be FORWARD for parallelism-preserving
+	 * connections and PARTITION_RANDOM for parallelism-changing connections.
+	 *
+	 */
+	@Test
+	public void testUnionInputOutputDifferentDOP() throws Exception {
+
+		int fullDop = DEFAULT_PARALLELISM;
+		int halfDop = DEFAULT_PARALLELISM / 2;
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(DEFAULT_PARALLELISM);
+
+		DataSet<Tuple2<Long, Long>> in1 = env.fromElements(new Tuple2<>(0L, 0L))
+			.map(new IdentityMapper<Tuple2<Long, Long>>()).setParallelism(fullDop).name("inDopFull");
+		DataSet<Tuple2<Long, Long>> in2 = env.fromElements(new Tuple2<>(0L, 0L))
+			.map(new IdentityMapper<Tuple2<Long, Long>>()).setParallelism(halfDop).name("inDopHalf");
+
+		DataSet<Tuple2<Long, Long>> union = in1.union(in2);
+
+		DataSet<Tuple2<Long, Long>> dopFullMap = union
+			.map(new IdentityMapper<Tuple2<Long, Long>>()).setParallelism(fullDop).name("outDopFull");
+		DataSet<Tuple2<Long, Long>> dopHalfMap = union
+			.map(new IdentityMapper<Tuple2<Long, Long>>()).setParallelism(halfDop).name("outDopHalf");
+
+		dopFullMap.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+		dopHalfMap.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+
+		// -----------------------------------------------------------------------------------------
+		// Verify optimized plan
+		// -----------------------------------------------------------------------------------------
+
+		OptimizedPlan optimizedPlan = compileNoStats(env.createProgramPlan());
+
+		OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(optimizedPlan);
+
+		SingleInputPlanNode inDopFull = resolver.getNode("inDopFull");
+		SingleInputPlanNode inDopHalf = resolver.getNode("inDopHalf");
+		SingleInputPlanNode outDopFull = resolver.getNode("outDopFull");
+		SingleInputPlanNode outDopHalf = resolver.getNode("outDopHalf");
+		NAryUnionPlanNode unionDopFull = (NAryUnionPlanNode) outDopFull.getInput().getSource();
+		NAryUnionPlanNode unionDopHalf = (NAryUnionPlanNode) outDopHalf.getInput().getSource();
+
+		// check in map nodes
+		assertEquals(2, inDopFull.getOutgoingChannels().size());
+		assertEquals(2, inDopHalf.getOutgoingChannels().size());
+		assertEquals(fullDop, inDopFull.getParallelism());
+		assertEquals(halfDop, inDopHalf.getParallelism());
+
+		// check union nodes
+		assertEquals(fullDop, unionDopFull.getParallelism());
+		assertEquals(halfDop, unionDopHalf.getParallelism());
+
+		// check out map nodes
+		assertEquals(fullDop, outDopFull.getParallelism());
+		assertEquals(halfDop, outDopHalf.getParallelism());
+
+		// check Union -> outMap ship strategies
+		assertEquals(ShipStrategyType.FORWARD, outDopHalf.getInput().getShipStrategy());
+		assertEquals(ShipStrategyType.FORWARD, outDopFull.getInput().getShipStrategy());
+
+		// check inMap -> Union ship strategies
+		Channel fullFull;
+		Channel fullHalf;
+		Channel halfFull;
+		Channel halfHalf;
+
+		if (inDopFull.getOutgoingChannels().get(0).getTarget() == unionDopFull) {
+			fullFull = inDopFull.getOutgoingChannels().get(0);
+			fullHalf = inDopFull.getOutgoingChannels().get(1);
+		} else {
+			fullFull = inDopFull.getOutgoingChannels().get(1);
+			fullHalf = inDopFull.getOutgoingChannels().get(0);
+		}
+		if (inDopHalf.getOutgoingChannels().get(0).getTarget() == unionDopFull) {
+			halfFull = inDopHalf.getOutgoingChannels().get(0);
+			halfHalf = inDopHalf.getOutgoingChannels().get(1);
+		} else {
+			halfFull = inDopHalf.getOutgoingChannels().get(1);
+			halfHalf = inDopHalf.getOutgoingChannels().get(0);
+		}
+
+		assertEquals(ShipStrategyType.FORWARD, fullFull.getShipStrategy());
+		assertEquals(ShipStrategyType.FORWARD, halfHalf.getShipStrategy());
+		assertEquals(ShipStrategyType.PARTITION_RANDOM, fullHalf.getShipStrategy());
+		assertEquals(ShipStrategyType.PARTITION_RANDOM, halfFull.getShipStrategy());
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cb38b6de/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityFilter.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityFilter.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityFilter.java
new file mode 100644
index 0000000..9af35ab
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityFilter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.testfunctions;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+
+public class IdentityFilter<T> implements FilterFunction<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public boolean filter(T value) {
+		return true;
+	}
+}