You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2018/04/04 16:05:43 UTC
flink git commit: [FLINK-9031] [optimizer] Fix DataSet Union operator
translation bug.
Repository: flink
Updated Branches:
refs/heads/release-1.3 b04f6d751 -> cb38b6def
[FLINK-9031] [optimizer] Fix DataSet Union operator translation bug.
- Adds a pass over the pre-optimized plan that fixes the output strategy of union nodes to FORWARD.
This closes #5742
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cb38b6de
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cb38b6de
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cb38b6de
Branch: refs/heads/release-1.3
Commit: cb38b6defbea5f92b6f3a5874acacb56523534f0
Parents: b04f6d7
Author: Fabian Hueske <fh...@apache.org>
Authored: Wed Mar 21 20:54:05 2018 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Apr 4 18:05:16 2018 +0200
----------------------------------------------------------------------
.../org/apache/flink/optimizer/Optimizer.java | 6 +
.../flink/optimizer/dag/BinaryUnionNode.java | 9 ++
.../plantranslate/JobGraphGenerator.java | 6 +
.../traversals/GraphCreatingVisitor.java | 6 +-
.../UnionParallelismAndForwardEnforcer.java | 60 +++++++
.../flink/optimizer/UnionReplacementTest.java | 159 +++++++++++++++++++
.../optimizer/testfunctions/IdentityFilter.java | 31 ++++
7 files changed, 276 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/cb38b6de/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java
index f73abe2..8d81505 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/Optimizer.java
@@ -43,6 +43,7 @@ import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.postpass.OptimizerPostPass;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.optimizer.traversals.RangePartitionRewriter;
+import org.apache.flink.optimizer.traversals.UnionParallelismAndForwardEnforcer;
import org.apache.flink.util.InstantiationUtil;
import org.slf4j.Logger;
@@ -476,6 +477,11 @@ public class Optimizer {
// guaranteed memory, for further cost estimations. We assume an equal distribution of memory among consumer tasks
rootNode.accept(new IdAndEstimatesVisitor(this.statistics));
+ // We need to enforce that union nodes always forward their output to their successor.
+ // Any partitioning must be either pushed before or done after the union, but not on the union's output.
+ UnionParallelismAndForwardEnforcer unionEnforcer = new UnionParallelismAndForwardEnforcer();
+ rootNode.accept(unionEnforcer);
+
// We are dealing with operator DAGs, rather than operator trees.
// That requires us to deviate at some points from the classical DB optimizer algorithms.
// This step builds auxiliary structures to help track branches and joins in the DAG
http://git-wip-us.apache.org/repos/asf/flink/blob/cb38b6de/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
index cb496a2..dbb0df5 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
@@ -54,6 +54,15 @@ public class BinaryUnionNode extends TwoInputNode {
}
@Override
+ public void addOutgoingConnection(DagConnection connection) {
+ // ensure that union nodes have no more than one outgoing connection.
+ if (this.getOutgoingConnections() != null && this.getOutgoingConnections().size() > 0) {
+ throw new CompilerException("BinaryUnionNode may only have a single outgoing connection.");
+ }
+ super.addOutgoingConnection(connection);
+ }
+
+ @Override
public String getOperatorName() {
return "Union";
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cb38b6de/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index a407bfe..672ae8a 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -636,6 +636,12 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
in.setShipStrategy(ShipStrategyType.BROADCAST, in.getDataExchangeMode());
}
}
+
+ // The outgoing connection of an NAryUnion must be a forward connection.
+ if (input.getShipStrategy() != ShipStrategyType.FORWARD && !isBroadcast) {
+ throw new CompilerException("Optimized plan contains Union with non-forward outgoing ship strategy.");
+ }
+
}
else if (inputPlanNode instanceof BulkPartialSolutionPlanNode) {
if (this.vertices.get(inputPlanNode) == null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/cb38b6de/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
index 3f3eae1..4872da5 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
@@ -244,7 +244,11 @@ public class GraphCreatingVisitor implements Visitor<Operator<?>> {
if (n.getParallelism() < 1) {
// set the parallelism
int par = c.getParallelism();
- if (par > 0) {
+ if (n instanceof BinaryUnionNode) {
+ // Keep parallelism of union undefined for now.
+ // It will be determined based on the parallelism of its successor.
+ par = -1;
+ } else if (par > 0) {
if (this.forceParallelism && par != this.defaultParallelism) {
par = this.defaultParallelism;
Optimizer.LOG.warn("The parallelism of nested dataflows (such as step functions in iterations) is " +
http://git-wip-us.apache.org/repos/asf/flink/blob/cb38b6de/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/UnionParallelismAndForwardEnforcer.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/UnionParallelismAndForwardEnforcer.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/UnionParallelismAndForwardEnforcer.java
new file mode 100644
index 0000000..3b583e0
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/UnionParallelismAndForwardEnforcer.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.traversals;
+
+import org.apache.flink.optimizer.dag.BinaryUnionNode;
+import org.apache.flink.optimizer.dag.DagConnection;
+import org.apache.flink.optimizer.dag.IterationNode;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.util.Visitor;
+
+/**
+ * Enforces that all union nodes have the same parallelism as their successor (there must be only one!)
+ * and that the union node and its successor are connected by a forward ship strategy.
+ */
+public class UnionParallelismAndForwardEnforcer implements Visitor<OptimizerNode> {
+
+ @Override
+ public boolean preVisit(OptimizerNode node) {
+
+ // if the current node is a union
+ if (node instanceof BinaryUnionNode) {
+ int parallelism = -1;
+ // set ship strategy of all outgoing connections to FORWARD.
+ for (DagConnection conn : node.getOutgoingConnections()) {
+ parallelism = conn.getTarget().getParallelism();
+ conn.setShipStrategy(ShipStrategyType.FORWARD);
+ }
+ // adjust parallelism to be same as successor
+ node.setParallelism(parallelism);
+ }
+
+ // traverse the whole plan
+ return true;
+ }
+
+ @Override
+ public void postVisit(OptimizerNode node) {
+ // if required, recurse into the step function
+ if (node instanceof IterationNode) {
+ ((IterationNode) node).acceptForStepFunction(this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/cb38b6de/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
index be6804b..76f2646 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/UnionReplacementTest.java
@@ -35,6 +35,9 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SourcePlanNode;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.optimizer.testfunctions.IdentityFilter;
+import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
+import org.apache.flink.optimizer.testfunctions.IdentityMapper;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
@@ -438,4 +441,160 @@ public class UnionReplacementTest extends CompilerTestBase {
}
}
+ /**
+ * Tests that the outgoing connection of a Union node is FORWARD.
+ * See FLINK-9031 for a bug report.
+ *
+ * The issue is quite hard to reproduce as the plan choice seems to depend on the enumeration
+ * order due to lack of plan costs. This test is a smaller variant of the job that was reported
+ * to fail.
+ *
+ * /-\ /- PreFilter1 -\-/- Union - PostFilter1 - Reducer1 -\
+ * Src -< >- Union -< X >- Union - Out
+ * \-/ \- PreFilter2 -/-\- Union - PostFilter2 - Reducer2 -/
+ */
+ @Test
+ public void testUnionForwardOutput() throws Exception {
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
+
+ DataSet<Tuple2<Long, Long>> src1 = env.fromElements(new Tuple2<>(0L, 0L));
+
+ DataSet<Tuple2<Long, Long>> u1 = src1.union(src1)
+ .map(new IdentityMapper<Tuple2<Long, Long>>());
+
+ DataSet<Tuple2<Long, Long>> s1 = u1
+ .filter(new IdentityFilter<Tuple2<Long, Long>>()).name("preFilter1");
+ DataSet<Tuple2<Long, Long>> s2 = u1
+ .filter(new IdentityFilter<Tuple2<Long, Long>>()).name("preFilter2");
+
+ DataSet<Tuple2<Long, Long>> reduced1 = s1
+ .union(s2)
+ .filter(new IdentityFilter<Tuple2<Long, Long>>()).name("postFilter1")
+ .groupBy(0)
+ .reduceGroup(new IdentityGroupReducer<Tuple2<Long, Long>>()).name("reducer1");
+ DataSet<Tuple2<Long, Long>> reduced2 = s1
+ .union(s2)
+ .filter(new IdentityFilter<Tuple2<Long, Long>>()).name("postFilter2")
+ .groupBy(1)
+ .reduceGroup(new IdentityGroupReducer<Tuple2<Long, Long>>()).name("reducer2");
+
+ reduced1
+ .union(reduced2)
+ .output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+
+ // -----------------------------------------------------------------------------------------
+ // Verify optimized plan
+ // -----------------------------------------------------------------------------------------
+
+ OptimizedPlan optimizedPlan = compileNoStats(env.createProgramPlan());
+
+ OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(optimizedPlan);
+
+ SingleInputPlanNode unionOut1 = resolver.getNode("postFilter1");
+ SingleInputPlanNode unionOut2 = resolver.getNode("postFilter2");
+
+ assertEquals(ShipStrategyType.FORWARD, unionOut1.getInput().getShipStrategy());
+ assertEquals(ShipStrategyType.FORWARD, unionOut2.getInput().getShipStrategy());
+ }
+
+ /**
+ * Test the input and output shipping strategies for union operators with input and output
+ * operators with different parallelisms.
+ *
+ * Src1 - Map(fullP) -\-/- Union - Map(fullP) - Out
+ * X
+ * Src2 - Map(halfP) -/-\- Union - Map(halfP) - Out
+ *
+ * The union operator must always have the same parallelism as its successor and connect to it
+ * with a FORWARD strategy.
+ * In this program, the input connections for union should be FORWARD for parallelism-preserving
+ * connections and PARTITION_RANDOM for parallelism-changing connections.
+ *
+ */
+ @Test
+ public void testUnionInputOutputDifferentDOP() throws Exception {
+
+ int fullDop = DEFAULT_PARALLELISM;
+ int halfDop = DEFAULT_PARALLELISM / 2;
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
+
+ DataSet<Tuple2<Long, Long>> in1 = env.fromElements(new Tuple2<>(0L, 0L))
+ .map(new IdentityMapper<Tuple2<Long, Long>>()).setParallelism(fullDop).name("inDopFull");
+ DataSet<Tuple2<Long, Long>> in2 = env.fromElements(new Tuple2<>(0L, 0L))
+ .map(new IdentityMapper<Tuple2<Long, Long>>()).setParallelism(halfDop).name("inDopHalf");
+
+ DataSet<Tuple2<Long, Long>> union = in1.union(in2);
+
+ DataSet<Tuple2<Long, Long>> dopFullMap = union
+ .map(new IdentityMapper<Tuple2<Long, Long>>()).setParallelism(fullDop).name("outDopFull");
+ DataSet<Tuple2<Long, Long>> dopHalfMap = union
+ .map(new IdentityMapper<Tuple2<Long, Long>>()).setParallelism(halfDop).name("outDopHalf");
+
+ dopFullMap.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+ dopHalfMap.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+
+ // -----------------------------------------------------------------------------------------
+ // Verify optimized plan
+ // -----------------------------------------------------------------------------------------
+
+ OptimizedPlan optimizedPlan = compileNoStats(env.createProgramPlan());
+
+ OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(optimizedPlan);
+
+ SingleInputPlanNode inDopFull = resolver.getNode("inDopFull");
+ SingleInputPlanNode inDopHalf = resolver.getNode("inDopHalf");
+ SingleInputPlanNode outDopFull = resolver.getNode("outDopFull");
+ SingleInputPlanNode outDopHalf = resolver.getNode("outDopHalf");
+ NAryUnionPlanNode unionDopFull = (NAryUnionPlanNode) outDopFull.getInput().getSource();
+ NAryUnionPlanNode unionDopHalf = (NAryUnionPlanNode) outDopHalf.getInput().getSource();
+
+ // check in map nodes
+ assertEquals(2, inDopFull.getOutgoingChannels().size());
+ assertEquals(2, inDopHalf.getOutgoingChannels().size());
+ assertEquals(fullDop, inDopFull.getParallelism());
+ assertEquals(halfDop, inDopHalf.getParallelism());
+
+ // check union nodes
+ assertEquals(fullDop, unionDopFull.getParallelism());
+ assertEquals(halfDop, unionDopHalf.getParallelism());
+
+ // check out map nodes
+ assertEquals(fullDop, outDopFull.getParallelism());
+ assertEquals(halfDop, outDopHalf.getParallelism());
+
+ // check Union -> outMap ship strategies
+ assertEquals(ShipStrategyType.FORWARD, outDopHalf.getInput().getShipStrategy());
+ assertEquals(ShipStrategyType.FORWARD, outDopFull.getInput().getShipStrategy());
+
+ // check inMap -> Union ship strategies
+ Channel fullFull;
+ Channel fullHalf;
+ Channel halfFull;
+ Channel halfHalf;
+
+ if (inDopFull.getOutgoingChannels().get(0).getTarget() == unionDopFull) {
+ fullFull = inDopFull.getOutgoingChannels().get(0);
+ fullHalf = inDopFull.getOutgoingChannels().get(1);
+ } else {
+ fullFull = inDopFull.getOutgoingChannels().get(1);
+ fullHalf = inDopFull.getOutgoingChannels().get(0);
+ }
+ if (inDopHalf.getOutgoingChannels().get(0).getTarget() == unionDopFull) {
+ halfFull = inDopHalf.getOutgoingChannels().get(0);
+ halfHalf = inDopHalf.getOutgoingChannels().get(1);
+ } else {
+ halfFull = inDopHalf.getOutgoingChannels().get(1);
+ halfHalf = inDopHalf.getOutgoingChannels().get(0);
+ }
+
+ assertEquals(ShipStrategyType.FORWARD, fullFull.getShipStrategy());
+ assertEquals(ShipStrategyType.FORWARD, halfHalf.getShipStrategy());
+ assertEquals(ShipStrategyType.PARTITION_RANDOM, fullHalf.getShipStrategy());
+ assertEquals(ShipStrategyType.PARTITION_RANDOM, halfFull.getShipStrategy());
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cb38b6de/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityFilter.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityFilter.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityFilter.java
new file mode 100644
index 0000000..9af35ab
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/testfunctions/IdentityFilter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.testfunctions;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+
+public class IdentityFilter<T> implements FilterFunction<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean filter(T value) {
+ return true;
+ }
+}