Posted to commits@flink.apache.org by se...@apache.org on 2014/11/27 18:55:58 UTC

[5/5] incubator-flink git commit: [FLINK-1290] Fix Optimizer to create plans when encountering incompatible partitionings.

[FLINK-1290] Fix Optimizer to create plans when encountering incompatible partitionings.
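
Some context on the failure mode: without a custom partitioner, the join and
coGroup descriptors previously requested only "any partitioning" on both
inputs. That requirement can be satisfied by each input individually while the
two existing partitionings are still mutually incompatible (for example, one
side custom-partitioned and the other hash-partitioned), in which case the
optimizer was left without a valid execution plan. The change below keeps the
"any" request for reuse and adds an explicit hash-partitioned request pair as
a fallback that is always satisfiable by repartitioning.

A minimal sketch of a program that runs into this situation, modeled on the
tests added in this commit (the class name is illustrative; the API is the
Java DataSet API used in this repository):

    import org.apache.flink.api.common.functions.Partitioner;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple3;

    public class IncompatiblePartitioningsSketch {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // one input carries a custom partitioning on field 0
            DataSet<Tuple3<Long, Long, Long>> custom = env
                .fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L))
                .partitionCustom(new Partitioner<Long>() {
                    @Override
                    public int partition(Long key, int numPartitions) { return 0; }
                }, 0);

            // distinct() hash-partitions its input, so this side arrives at
            // the join with a partitioning incompatible with the one above
            DataSet<Tuple3<Long, Long, Long>> hashed = custom.distinct(0, 1);

            // here the optimizer previously could fail to find a plan; with
            // this fix it falls back to hash-repartitioning both inputs
            custom.join(hashed).where(0).equalTo(0).print();

            env.execute();
        }
    }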


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/45fb6d82
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/45fb6d82
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/45fb6d82

Branch: refs/heads/master
Commit: 45fb6d82386260fef5222cff498583036db20855
Parents: a1100af
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 27 17:14:30 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 27 18:18:26 2014 +0100

----------------------------------------------------------------------
 .../operators/AbstractJoinDescriptor.java       |  28 ++++
 .../compiler/operators/CoGroupDescriptor.java   |  33 +++--
 .../flink/compiler/SortPartialReuseTest.java    | 130 +++++++++++++++++++
 .../CoGroupCustomPartitioningTest.java          |  45 +++++++
 .../JoinCustomPartitioningTest.java             |  46 +++++++
 5 files changed, 270 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/45fb6d82/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java
index cb0e61c..d8f7746 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java
@@ -19,6 +19,7 @@
 package org.apache.flink.compiler.operators;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.Partitioner;
@@ -62,6 +63,33 @@ public abstract class AbstractJoinDescriptor extends OperatorDescriptorDual {
 		
 		if (repartitionAllowed) {
 			// partition both (hash or custom)
+			if (this.customPartitioner == null) {
+				
+				// we accept compatible partitionings of any type
+				RequestedGlobalProperties partitioned_left_any = new RequestedGlobalProperties();
+				RequestedGlobalProperties partitioned_right_any = new RequestedGlobalProperties();
+				partitioned_left_any.setAnyPartitioning(this.keys1);
+				partitioned_right_any.setAnyPartitioning(this.keys2);
+				pairs.add(new GlobalPropertiesPair(partitioned_left_any, partitioned_right_any));
+				
+				// we also explicitly add hash partitioning, as a fallback, if the any-pairs do not match
+				RequestedGlobalProperties partitioned_left_hash = new RequestedGlobalProperties();
+				RequestedGlobalProperties partitioned_right_hash = new RequestedGlobalProperties();
+				partitioned_left_hash.setHashPartitioned(this.keys1);
+				partitioned_right_hash.setHashPartitioned(this.keys2);
+				pairs.add(new GlobalPropertiesPair(partitioned_left_hash, partitioned_right_hash));
+			}
+			else {
+				RequestedGlobalProperties partitioned_left = new RequestedGlobalProperties();
+				partitioned_left.setCustomPartitioned(this.keys1, this.customPartitioner);
+				
+				RequestedGlobalProperties partitioned_right = new RequestedGlobalProperties();
+				partitioned_right.setCustomPartitioned(this.keys2, this.customPartitioner);
+				
+				return Collections.singletonList(new GlobalPropertiesPair(partitioned_left, partitioned_right));
+			}
+			
+			
 			RequestedGlobalProperties partitioned1 = new RequestedGlobalProperties();
 			if (customPartitioner == null) {
 				partitioned1.setAnyPartitioning(this.keys1);
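
The hunk above is the heart of the fix, and the CoGroupDescriptor change below
follows the same pattern. With no custom partitioner, the descriptor now offers
the optimizer two candidate property pairs instead of one: the any/any pair
lets compatible existing partitionings be reused, while the hash/hash pair can
always be established by repartitioning, so plan enumeration can no longer
come up empty. With a custom partitioner, the method returns early with a
single pair, since no other distribution would honor the user's partitioner.
Condensed, the shared pattern looks roughly like this (a sketch of the method
body, not the verbatim file contents):

    // candidate global-property pairs handed to the optimizer
    List<GlobalPropertiesPair> pairs = new ArrayList<GlobalPropertiesPair>();

    if (customPartitioner == null) {
        // 1) accept any compatible pair of existing partitionings
        RequestedGlobalProperties anyLeft = new RequestedGlobalProperties();
        RequestedGlobalProperties anyRight = new RequestedGlobalProperties();
        anyLeft.setAnyPartitioning(keys1);
        anyRight.setAnyPartitioning(keys2);
        pairs.add(new GlobalPropertiesPair(anyLeft, anyRight));

        // 2) fall back to hash partitioning both sides when they clash
        RequestedGlobalProperties hashLeft = new RequestedGlobalProperties();
        RequestedGlobalProperties hashRight = new RequestedGlobalProperties();
        hashLeft.setHashPartitioned(keys1);
        hashRight.setHashPartitioned(keys2);
        pairs.add(new GlobalPropertiesPair(hashLeft, hashRight));
    }
    else {
        // a user-supplied partitioner admits exactly one distribution
        RequestedGlobalProperties customLeft = new RequestedGlobalProperties();
        RequestedGlobalProperties customRight = new RequestedGlobalProperties();
        customLeft.setCustomPartitioned(keys1, customPartitioner);
        customRight.setCustomPartitioned(keys2, customPartitioner);
        pairs.add(new GlobalPropertiesPair(customLeft, customRight));
    }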

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/45fb6d82/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java
index 14f40f3..bc83c51 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/CoGroupDescriptor.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.compiler.operators;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
@@ -98,21 +99,29 @@ public class CoGroupDescriptor extends OperatorDescriptorDual {
 
 	@Override
 	protected List<GlobalPropertiesPair> createPossibleGlobalProperties() {
-		RequestedGlobalProperties partitioned1 = new RequestedGlobalProperties();
 		if (this.customPartitioner == null) {
-			partitioned1.setAnyPartitioning(this.keys1);
-		} else {
-			partitioned1.setCustomPartitioned(this.keys1, this.customPartitioner);
+			RequestedGlobalProperties partitioned_left_any = new RequestedGlobalProperties();
+			RequestedGlobalProperties partitioned_left_hash = new RequestedGlobalProperties();
+			partitioned_left_any.setAnyPartitioning(this.keys1);
+			partitioned_left_hash.setHashPartitioned(this.keys1);
+			
+			RequestedGlobalProperties partitioned_right_any = new RequestedGlobalProperties();
+			RequestedGlobalProperties partitioned_right_hash = new RequestedGlobalProperties();
+			partitioned_right_any.setAnyPartitioning(this.keys2);
+			partitioned_right_hash.setHashPartitioned(this.keys2);
+			
+			return Arrays.asList(new GlobalPropertiesPair(partitioned_left_any, partitioned_right_any),
+					new GlobalPropertiesPair(partitioned_left_hash, partitioned_right_hash));
 		}
-		
-		RequestedGlobalProperties partitioned2 = new RequestedGlobalProperties();
-		if (this.customPartitioner == null) {
-			partitioned2.setAnyPartitioning(this.keys2);
-		} else {
-			partitioned2.setCustomPartitioned(this.keys2, this.customPartitioner);
+		else {
+			RequestedGlobalProperties partitioned_left = new RequestedGlobalProperties();
+			partitioned_left.setCustomPartitioned(this.keys1, this.customPartitioner);
+			
+			RequestedGlobalProperties partitioned_right = new RequestedGlobalProperties();
+			partitioned_right.setCustomPartitioned(this.keys2, this.customPartitioner);
+			
+			return Collections.singletonList(new GlobalPropertiesPair(partitioned_left, partitioned_right));
 		}
-		
-		return Collections.singletonList(new GlobalPropertiesPair(partitioned1, partitioned2));
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/45fb6d82/flink-compiler/src/test/java/org/apache/flink/compiler/SortPartialReuseTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/SortPartialReuseTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/SortPartialReuseTest.java
new file mode 100644
index 0000000..d4c0c33
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/SortPartialReuseTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.compiler.plan.SinkPlanNode;
+import org.apache.flink.compiler.testfunctions.IdentityGroupReducer;
+import org.apache.flink.compiler.testfunctions.IdentityMapper;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+
+@SuppressWarnings("serial")
+public class SortPartialReuseTest extends CompilerTestBase {
+
+	@Test
+	public void testPartialPartitioningReuse() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			@SuppressWarnings("unchecked")
+			DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
+			
+			input
+				.partitionByHash(0)
+				.map(new IdentityMapper<Tuple3<Long,Long,Long>>()).withConstantSet("0", "1", "2")
+				
+				.groupBy(0, 1)
+				.reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withConstantSet("0", "1", "2")
+				
+				.groupBy(0)
+				.reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>())
+				
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			SingleInputPlanNode reducer2 = (SingleInputPlanNode) sink.getInput().getSource();
+			SingleInputPlanNode reducer1 = (SingleInputPlanNode) reducer2.getInput().getSource();
+			
+			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+
+			// should be locally forwarding, reusing sort and partitioning
+			assertEquals(ShipStrategyType.FORWARD, reducer2.getInput().getShipStrategy());
+			assertEquals(LocalStrategy.NONE, reducer2.getInput().getLocalStrategy());
+			
+			assertEquals(ShipStrategyType.FORWARD, reducer1.getInput().getShipStrategy());
+			assertEquals(LocalStrategy.COMBININGSORT, reducer1.getInput().getLocalStrategy());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testCustomPartitioningNotReused() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			@SuppressWarnings("unchecked")
+			DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
+			
+			input
+				.partitionCustom(new Partitioner<Long>() {
+					@Override
+					public int partition(Long key, int numPartitions) { return 0; }
+				}, 0)
+				.map(new IdentityMapper<Tuple3<Long,Long,Long>>()).withConstantSet("0", "1", "2")
+				
+				.groupBy(0, 1)
+				.reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withConstantSet("0", "1", "2")
+				
+				.groupBy(1)
+				.reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>())
+				
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			SingleInputPlanNode reducer2 = (SingleInputPlanNode) sink.getInput().getSource();
+			SingleInputPlanNode combiner = (SingleInputPlanNode) reducer2.getInput().getSource();
+			SingleInputPlanNode reducer1 = (SingleInputPlanNode) combiner.getInput().getSource();
+			
+			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+
+			// the custom partitioning on field 0 says nothing about field 1, so the data must be re-partitioned and re-sorted
+			assertEquals(ShipStrategyType.PARTITION_HASH, reducer2.getInput().getShipStrategy());
+			assertEquals(LocalStrategy.COMBININGSORT, reducer2.getInput().getLocalStrategy());
+			
+			assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
+			assertEquals(LocalStrategy.NONE, combiner.getInput().getLocalStrategy());
+			
+			assertEquals(ShipStrategyType.FORWARD, reducer1.getInput().getShipStrategy());
+			assertEquals(LocalStrategy.COMBININGSORT, reducer1.getInput().getLocalStrategy());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}
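
A note on why the reuse asserted in the first test is legal at all: the
withConstantSet hints declare which tuple fields an operator forwards
unchanged, which is what allows the optimizer to propagate partitioning and
sort properties through the identity operators. Roughly (a sketch; the hint
strings name tuple field positions):

    input
        .partitionByHash(0)
        // fields 0, 1, 2 pass through untouched, so the hash partitioning
        // on field 0 survives the mapper
        .map(new IdentityMapper<Tuple3<Long, Long, Long>>())
        .withConstantSet("0", "1", "2")
        .groupBy(0, 1)
        // again all fields are constant, so the (0,1)-sorted output, still
        // partitioned on field 0, can be reused by a following groupBy(0)
        // without a new shuffle or sort
        .reduceGroup(new IdentityGroupReducer<Tuple3<Long, Long, Long>>())
        .withConstantSet("0", "1", "2");

Without these hints the optimizer must assume all fields may change and would
insert a fresh partitioning and sort in front of each grouping.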

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/45fb6d82/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/CoGroupCustomPartitioningTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/CoGroupCustomPartitioningTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/CoGroupCustomPartitioningTest.java
index c79e365..3508f42 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/CoGroupCustomPartitioningTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/CoGroupCustomPartitioningTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.*;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -33,6 +34,8 @@ import org.apache.flink.compiler.plan.DualInputPlanNode;
 import org.apache.flink.compiler.plan.OptimizedPlan;
 import org.apache.flink.compiler.plan.SinkPlanNode;
 import org.apache.flink.compiler.testfunctions.DummyCoGroupFunction;
+import org.apache.flink.compiler.testfunctions.IdentityGroupReducer;
+import org.apache.flink.compiler.testfunctions.IdentityMapper;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.junit.Test;
 
@@ -224,6 +227,48 @@ public class CoGroupCustomPartitioningTest extends CompilerTestBase {
 		}
 	}
 	
+	@Test
+	public void testIncompatibleHashAndCustomPartitioning() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
+			
+			DataSet<Tuple3<Long, Long, Long>> partitioned = input
+				.partitionCustom(new Partitioner<Long>() {
+					@Override
+					public int partition(Long key, int numPartitions) { return 0; }
+				}, 0)
+				.map(new IdentityMapper<Tuple3<Long,Long,Long>>()).withConstantSet("0", "1", "2");
+				
+			
+			DataSet<Tuple3<Long, Long, Long>> grouped = partitioned
+				.distinct(0, 1)
+				.groupBy(1)
+				.sortGroup(0, Order.ASCENDING)
+				.reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withConstantSet("0", "1");
+			
+			grouped
+				.coGroup(partitioned).where(0).equalTo(0)
+				.with(new DummyCoGroupFunction<Tuple3<Long,Long,Long>, Tuple3<Long,Long,Long>>())
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			DualInputPlanNode coGroup = (DualInputPlanNode) sink.getInput().getSource();
+
+			assertEquals(ShipStrategyType.PARTITION_HASH, coGroup.getInput1().getShipStrategy());
+			assertTrue(coGroup.getInput2().getShipStrategy() == ShipStrategyType.PARTITION_HASH || 
+						coGroup.getInput2().getShipStrategy() == ShipStrategyType.FORWARD);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
 	// --------------------------------------------------------------------------------------------
 	
 	private static class TestPartitionerInt implements Partitioner<Integer> {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/45fb6d82/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/JoinCustomPartitioningTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/JoinCustomPartitioningTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/JoinCustomPartitioningTest.java
index 0020c66..682cd5f 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/JoinCustomPartitioningTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/JoinCustomPartitioningTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.*;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -33,6 +34,9 @@ import org.apache.flink.compiler.CompilerTestBase;
 import org.apache.flink.compiler.plan.DualInputPlanNode;
 import org.apache.flink.compiler.plan.OptimizedPlan;
 import org.apache.flink.compiler.plan.SinkPlanNode;
+import org.apache.flink.compiler.testfunctions.DummyFlatJoinFunction;
+import org.apache.flink.compiler.testfunctions.IdentityGroupReducer;
+import org.apache.flink.compiler.testfunctions.IdentityMapper;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.junit.Test;
 
@@ -220,6 +224,48 @@ public class JoinCustomPartitioningTest extends CompilerTestBase {
 		}
 	}
 	
+	@Test
+	public void testIncompatibleHashAndCustomPartitioning() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
+			
+			DataSet<Tuple3<Long, Long, Long>> partitioned = input
+				.partitionCustom(new Partitioner<Long>() {
+					@Override
+					public int partition(Long key, int numPartitions) { return 0; }
+				}, 0)
+				.map(new IdentityMapper<Tuple3<Long,Long,Long>>()).withConstantSet("0", "1", "2");
+				
+			
+			DataSet<Tuple3<Long, Long, Long>> grouped = partitioned
+				.distinct(0, 1)
+				.groupBy(1)
+				.sortGroup(0, Order.ASCENDING)
+				.reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>()).withConstantSet("0", "1");
+			
+			grouped
+				.join(partitioned, JoinHint.REPARTITION_HASH_FIRST).where(0).equalTo(0)
+				.with(new DummyFlatJoinFunction<Tuple3<Long,Long,Long>>())
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			DualInputPlanNode join = (DualInputPlanNode) sink.getInput().getSource();
+
+			assertEquals(ShipStrategyType.PARTITION_HASH, join.getInput1().getShipStrategy());
+			assertTrue(join.getInput2().getShipStrategy() == ShipStrategyType.PARTITION_HASH || 
+						join.getInput2().getShipStrategy() == ShipStrategyType.FORWARD);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
 	// --------------------------------------------------------------------------------------------
 	
 	private static class TestPartitionerInt implements Partitioner<Integer> {