You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/11/27 18:55:58 UTC
[5/5] incubator-flink git commit: [FLINK-1290] Fix Optimizer to create plans when encountering incompatible partitionings.
[FLINK-1290] Fix Optimizer to create plans when encountering incompatible partitionings.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/45fb6d82
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/45fb6d82
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/45fb6d82
Branch: refs/heads/master
Commit: 45fb6d82386260fef5222cff498583036db20855
Parents: a1100af
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 27 17:14:30 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 27 18:18:26 2014 +0100
----------------------------------------------------------------------
.../operators/AbstractJoinDescriptor.java | 28 ++++
.../compiler/operators/CoGroupDescriptor.java | 33 +++--
.../flink/compiler/SortPartialReuseTest.java | 130 +++++++++++++++++++
.../CoGroupCustomPartitioningTest.java | 45 +++++++
.../JoinCustomPartitioningTest.java | 46 +++++++
5 files changed, 270 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/45fb6d82/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java
index cb0e61c..d8f7746 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java
@@ -19,6 +19,7 @@
package org.apache.flink.compiler.operators;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.functions.Partitioner;
@@ -62,6 +63,33 @@ public abstract class AbstractJoinDescriptor extends OperatorDescriptorDual {
if (repartitionAllowed) {
// partition both (hash or custom)
+ if (this.customPartitioner == null) {
+
+ // we accept compatible partitionings of any type
+ RequestedGlobalProperties partitioned_left_any = new RequestedGlobalProperties();
+ RequestedGlobalProperties partitioned_right_any = new RequestedGlobalProperties();
+ partitioned_left_any.setAnyPartitioning(this.keys1);
+ partitioned_right_any.setAnyPartitioning(this.keys2);
+ pairs.add(new GlobalPropertiesPair(partitioned_left_any, partitioned_right_any));
+
+ // we also explicitly add hash partitioning, as a fallback, if the any-pairs do not match
+ RequestedGlobalProperties partitioned_left_hash = new RequestedGlobalProperties();
+ RequestedGlobalProperties partitioned_right_hash = new RequestedGlobalProperties();
+ partitioned_left_hash.setHashPartitioned(this.keys1);
+ partitioned_right_hash.setHashPartitioned(this.keys2);
+ pairs.add(new GlobalPropertiesPair(partitioned_left_hash, partitioned_right_hash));
+ }
+ else {
+ RequestedGlobalProperties partitioned_left = new RequestedGlobalProperties();
+ partitioned_left.setCustomPartitioned(this.keys1, this.customPartitioner);
+
+ RequestedGlobalProperties partitioned_right = new RequestedGlobalProperties();
+ partitioned_right.setCustomPartitioned(this.keys2, this.customPartitioner);
+
+ return Collections.singletonList(new GlobalPropertiesPair(partitioned_left, partitioned_right));
+ }
+
+
RequestedGlobalProperties partitioned1 = new RequestedGlobalProperties();
if (customPartitioner == null) {
partitioned1.setAnyPartitioning(this.keys1);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/45fb6d82/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java
index 14f40f3..bc83c51 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java
@@ -18,6 +18,7 @@
package org.apache.flink.compiler.operators;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -98,21 +99,29 @@ public class CoGroupDescriptor extends OperatorDescriptorDual {
@Override
protected List<GlobalPropertiesPair> createPossibleGlobalProperties() {
- RequestedGlobalProperties partitioned1 = new RequestedGlobalProperties();
if (this.customPartitioner == null) {
- partitioned1.setAnyPartitioning(this.keys1);
- } else {
- partitioned1.setCustomPartitioned(this.keys1, this.customPartitioner);
+ RequestedGlobalProperties partitioned_left_any = new RequestedGlobalProperties();
+ RequestedGlobalProperties partitioned_left_hash = new RequestedGlobalProperties();
+ partitioned_left_any.setAnyPartitioning(this.keys1);
+ partitioned_left_hash.setHashPartitioned(this.keys1);
+
+ RequestedGlobalProperties partitioned_right_any = new RequestedGlobalProperties();
+ RequestedGlobalProperties partitioned_right_hash = new RequestedGlobalProperties();
+ partitioned_right_any.setAnyPartitioning(this.keys2);
+ partitioned_right_hash.setHashPartitioned(this.keys2);
+
+ return Arrays.asList(new GlobalPropertiesPair(partitioned_left_any, partitioned_right_any),
+ new GlobalPropertiesPair(partitioned_left_hash, partitioned_right_hash));
}
-
- RequestedGlobalProperties partitioned2 = new RequestedGlobalProperties();
- if (this.customPartitioner == null) {
- partitioned2.setAnyPartitioning(this.keys2);
- } else {
- partitioned2.setCustomPartitioned(this.keys2, this.customPartitioner);
+ else {
+ RequestedGlobalProperties partitioned_left = new RequestedGlobalProperties();
+ partitioned_left.setCustomPartitioned(this.keys1, this.customPartitioner);
+
+ RequestedGlobalProperties partitioned_right = new RequestedGlobalProperties();
+ partitioned_right.setCustomPartitioned(this.keys2, this.customPartitioner);
+
+ return Collections.singletonList(new GlobalPropertiesPair(partitioned_left, partitioned_right));
}
-
- return Collections.singletonList(new GlobalPropertiesPair(partitioned1, partitioned2));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/45fb6d82/flink-compiler/src/test/java/org/apache/flink/compiler/SortPartialReuseTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/SortPartialReuseTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/SortPartialReuseTest.java
new file mode 100644
index 0000000..d4c0c33
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/SortPartialReuseTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.compiler.plan.SinkPlanNode;
+import org.apache.flink.compiler.testfunctions.IdentityGroupReducer;
+import org.apache.flink.compiler.testfunctions.IdentityMapper;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+
+@SuppressWarnings("serial")
+public class SortPartialReuseTest extends CompilerTestBase {
+
+ @Test
+ public void testPartialPartitioningReuse() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ @SuppressWarnings("unchecked")
+ DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
+
+ input
+ .partitionByHash(0)
+ .map(new IdentityMapper<Tuple3<Long,Long,Long>>()).withConstantSet("0", "1", "2")
+
+ .groupBy(0, 1)
+ .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withConstantSet("0", "1", "2")
+
+ .groupBy(0)
+ .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>())
+
+ .print();
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ SinkPlanNode sink = op.getDataSinks().iterator().next();
+ SingleInputPlanNode reducer2 = (SingleInputPlanNode) sink.getInput().getSource();
+ SingleInputPlanNode reducer1 = (SingleInputPlanNode) reducer2.getInput().getSource();
+
+ assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+
+ // should be locally forwarding, reusing sort and partitioning
+ assertEquals(ShipStrategyType.FORWARD, reducer2.getInput().getShipStrategy());
+ assertEquals(LocalStrategy.NONE, reducer2.getInput().getLocalStrategy());
+
+ assertEquals(ShipStrategyType.FORWARD, reducer1.getInput().getShipStrategy());
+ assertEquals(LocalStrategy.COMBININGSORT, reducer1.getInput().getLocalStrategy());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testCustomPartitioningNotReused() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ @SuppressWarnings("unchecked")
+ DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
+
+ input
+ .partitionCustom(new Partitioner<Long>() {
+ @Override
+ public int partition(Long key, int numPartitions) { return 0; }
+ }, 0)
+ .map(new IdentityMapper<Tuple3<Long,Long,Long>>()).withConstantSet("0", "1", "2")
+
+ .groupBy(0, 1)
+ .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withConstantSet("0", "1", "2")
+
+ .groupBy(1)
+ .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>())
+
+ .print();
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ SinkPlanNode sink = op.getDataSinks().iterator().next();
+ SingleInputPlanNode reducer2 = (SingleInputPlanNode) sink.getInput().getSource();
+ SingleInputPlanNode combiner = (SingleInputPlanNode) reducer2.getInput().getSource();
+ SingleInputPlanNode reducer1 = (SingleInputPlanNode) combiner.getInput().getSource();
+
+ assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+
+ // should NOT reuse the existing partitioning and sort: expects a hash re-partitioning and a new combining sort
+ assertEquals(ShipStrategyType.PARTITION_HASH, reducer2.getInput().getShipStrategy());
+ assertEquals(LocalStrategy.COMBININGSORT, reducer2.getInput().getLocalStrategy());
+
+ assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
+ assertEquals(LocalStrategy.NONE, combiner.getInput().getLocalStrategy());
+
+ assertEquals(ShipStrategyType.FORWARD, reducer1.getInput().getShipStrategy());
+ assertEquals(LocalStrategy.COMBININGSORT, reducer1.getInput().getLocalStrategy());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/45fb6d82/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/CoGroupCustomPartitioningTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/CoGroupCustomPartitioningTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/CoGroupCustomPartitioningTest.java
index c79e365..3508f42 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/CoGroupCustomPartitioningTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/CoGroupCustomPartitioningTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.*;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
@@ -33,6 +34,8 @@ import org.apache.flink.compiler.plan.DualInputPlanNode;
import org.apache.flink.compiler.plan.OptimizedPlan;
import org.apache.flink.compiler.plan.SinkPlanNode;
import org.apache.flink.compiler.testfunctions.DummyCoGroupFunction;
+import org.apache.flink.compiler.testfunctions.IdentityGroupReducer;
+import org.apache.flink.compiler.testfunctions.IdentityMapper;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.junit.Test;
@@ -224,6 +227,48 @@ public class CoGroupCustomPartitioningTest extends CompilerTestBase {
}
}
+ @Test
+ public void testIncompatibleHashAndCustomPartitioning() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
+
+ DataSet<Tuple3<Long, Long, Long>> partitioned = input
+ .partitionCustom(new Partitioner<Long>() {
+ @Override
+ public int partition(Long key, int numPartitions) { return 0; }
+ }, 0)
+ .map(new IdentityMapper<Tuple3<Long,Long,Long>>()).withConstantSet("0", "1", "2");
+
+
+ DataSet<Tuple3<Long, Long, Long>> grouped = partitioned
+ .distinct(0, 1)
+ .groupBy(1)
+ .sortGroup(0, Order.ASCENDING)
+ .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withConstantSet("0", "1");
+
+ grouped
+ .coGroup(partitioned).where(0).equalTo(0)
+ .with(new DummyCoGroupFunction<Tuple3<Long,Long,Long>, Tuple3<Long,Long,Long>>())
+ .print();
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ SinkPlanNode sink = op.getDataSinks().iterator().next();
+ DualInputPlanNode coGroup = (DualInputPlanNode) sink.getInput().getSource();
+
+ assertEquals(ShipStrategyType.PARTITION_HASH, coGroup.getInput1().getShipStrategy());
+ assertTrue(coGroup.getInput2().getShipStrategy() == ShipStrategyType.PARTITION_HASH ||
+ coGroup.getInput2().getShipStrategy() == ShipStrategyType.FORWARD);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
// --------------------------------------------------------------------------------------------
private static class TestPartitionerInt implements Partitioner<Integer> {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/45fb6d82/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/JoinCustomPartitioningTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/JoinCustomPartitioningTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/JoinCustomPartitioningTest.java
index 0020c66..682cd5f 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/JoinCustomPartitioningTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/JoinCustomPartitioningTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.*;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -33,6 +34,9 @@ import org.apache.flink.compiler.CompilerTestBase;
import org.apache.flink.compiler.plan.DualInputPlanNode;
import org.apache.flink.compiler.plan.OptimizedPlan;
import org.apache.flink.compiler.plan.SinkPlanNode;
+import org.apache.flink.compiler.testfunctions.DummyFlatJoinFunction;
+import org.apache.flink.compiler.testfunctions.IdentityGroupReducer;
+import org.apache.flink.compiler.testfunctions.IdentityMapper;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.junit.Test;
@@ -220,6 +224,48 @@ public class JoinCustomPartitioningTest extends CompilerTestBase {
}
}
+ @Test
+ public void testIncompatibleHashAndCustomPartitioning() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
+
+ DataSet<Tuple3<Long, Long, Long>> partitioned = input
+ .partitionCustom(new Partitioner<Long>() {
+ @Override
+ public int partition(Long key, int numPartitions) { return 0; }
+ }, 0)
+ .map(new IdentityMapper<Tuple3<Long,Long,Long>>()).withConstantSet("0", "1", "2");
+
+
+ DataSet<Tuple3<Long, Long, Long>> grouped = partitioned
+ .distinct(0, 1)
+ .groupBy(1)
+ .sortGroup(0, Order.ASCENDING)
+ .reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withConstantSet("0", "1");
+
+ grouped
+ .join(partitioned, JoinHint.REPARTITION_HASH_FIRST).where(0).equalTo(0)
+ .with(new DummyFlatJoinFunction<Tuple3<Long,Long,Long>>())
+ .print();
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ SinkPlanNode sink = op.getDataSinks().iterator().next();
+ DualInputPlanNode coGroup = (DualInputPlanNode) sink.getInput().getSource();
+
+ assertEquals(ShipStrategyType.PARTITION_HASH, coGroup.getInput1().getShipStrategy());
+ assertTrue(coGroup.getInput2().getShipStrategy() == ShipStrategyType.PARTITION_HASH ||
+ coGroup.getInput2().getShipStrategy() == ShipStrategyType.FORWARD);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
// --------------------------------------------------------------------------------------------
private static class TestPartitionerInt implements Partitioner<Integer> {