You are viewing a plain-text version of this content. The canonical link for it (shown as "here" in the original page) is not preserved in this plain-text rendering.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/04/21 01:37:55 UTC
[2/3] flink git commit: [FLINK-2998] [dataSet] Add support for explicit range partitioning for joins and coGroup.
[FLINK-2998] [dataSet] Add support for explicit range partitioning for joins and coGroup.
This closes #1838
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/605b6d87
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/605b6d87
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/605b6d87
Branch: refs/heads/master
Commit: 605b6d870d0da38fb5446675709c7243127cdff1
Parents: d8fb230
Author: gallenvara <ga...@126.com>
Authored: Tue Mar 29 22:36:21 2016 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Apr 20 22:55:49 2016 +0200
----------------------------------------------------------------------
.../api/java/operators/PartitionOperator.java | 5 +-
.../dataproperties/GlobalProperties.java | 3 +
.../operators/AbstractJoinDescriptor.java | 9 +-
.../optimizer/operators/CoGroupDescriptor.java | 11 +-
.../operators/OperatorDescriptorDual.java | 23 ++++
...oGroupGlobalPropertiesCompatibilityTest.java | 124 +++++++++++++++++++
.../operators/CoGroupWithDistributionTest.java | 99 +++++++++++++++
.../JoinGlobalPropertiesCompatibilityTest.java | 123 ++++++++++++++++++
.../operators/JoinWithDistributionTest.java | 97 +++++++++++++++
.../optimizer/operators/TestDistribution.java | 71 +++++++++++
.../operators/shipping/OutputEmitter.java | 4 +-
.../test/javaApiOperators/CoGroupITCase.java | 81 +++++++++++-
.../CustomDistributionITCase.java | 77 +++++++++++-
.../flink/test/javaApiOperators/JoinITCase.java | 85 ++++++++++++-
14 files changed, 792 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/605b6d87/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
index fb6d579..2ed0300 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
@@ -88,8 +88,9 @@ public class PartitionOperator<T> extends SingleInputOperator<T, T, PartitionOpe
Preconditions.checkArgument(distribution == null || pMethod == PartitionMethod.RANGE, "Customized data distribution is only neccessary for range partition.");
if (distribution != null) {
- Preconditions.checkArgument(distribution.getNumberOfFields() == pKeys.getNumberOfKeyFields(), "The number of key fields in the distribution and range partitioner should be the same.");
- Preconditions.checkArgument(Arrays.equals(distribution.getKeyTypes(), pKeys.getKeyFieldTypes()), "The types of key from the distribution and range partitioner are not equal.");
+ Preconditions.checkArgument(pKeys.getNumberOfKeyFields() <= distribution.getNumberOfFields(), "The distribution must provide at least as many fields as flat key fields are specified.");
+ Preconditions.checkArgument(Arrays.equals(pKeys.getKeyFieldTypes(), Arrays.copyOfRange(distribution.getKeyTypes(), 0, pKeys.getNumberOfKeyFields())),
+ "The types of the flat key fields must be equal to the types of the fields of the distribution.");
}
if (customPartitioner != null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/605b6d87/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
index ca17c2b..e64782f 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
@@ -311,6 +311,7 @@ public class GlobalProperties implements Cloneable {
gp.partitioning = PartitioningProperty.RANGE_PARTITIONED;
gp.ordering = newOrdering;
gp.partitioningFields = newOrdering.getInvolvedIndexes();
+ gp.distribution = this.distribution;
}
break;
case HASH_PARTITIONED:
@@ -436,6 +437,7 @@ public class GlobalProperties implements Cloneable {
throw new CompilerException("Unsupported partitioning strategy");
}
+ channel.setDataDistribution(this.distribution);
DataExchangeMode exMode = DataExchangeMode.select(exchangeMode, shipType, breakPipeline);
channel.setShipStrategy(shipType, partitionKeys, sortDirection, partitioner, exMode);
}
@@ -490,6 +492,7 @@ public class GlobalProperties implements Cloneable {
newProps.partitioning = this.partitioning;
newProps.partitioningFields = this.partitioningFields;
newProps.ordering = this.ordering;
+ newProps.distribution = this.distribution;
newProps.customPartitioner = this.customPartitioner;
newProps.uniqueFieldCombinations = this.uniqueFieldCombinations == null ? null : new HashSet<FieldSet>(this.uniqueFieldCombinations);
return newProps;
http://git-wip-us.apache.org/repos/asf/flink/blob/605b6d87/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AbstractJoinDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AbstractJoinDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AbstractJoinDescriptor.java
index 6c2776e..1f2f42a 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AbstractJoinDescriptor.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AbstractJoinDescriptor.java
@@ -140,11 +140,12 @@ public abstract class AbstractJoinDescriptor extends OperatorDescriptorDual {
}
else if(produced1.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED &&
- produced2.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) {
+ produced2.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED &&
+ produced1.getDataDistribution() != null && produced2.getDataDistribution() != null) {
- // Return false anyway now, we need both the partition key and data distribution
- // information to make sure whether the range partitions are equivalent.
- return false;
+ return produced1.getPartitioningFields().size() == produced2.getPartitioningFields().size() &&
+ checkSameOrdering(produced1, produced2, produced1.getPartitioningFields().size()) &&
+ produced1.getDataDistribution().equals(produced2.getDataDistribution());
}
else if(produced1.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING &&
http://git-wip-us.apache.org/repos/asf/flink/blob/605b6d87/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java
index b99b1c1..0e3fb55 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java
@@ -149,12 +149,13 @@ public class CoGroupDescriptor extends OperatorDescriptorDual {
}
else if(produced1.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED &&
- produced2.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) {
-
- // Return false anyway now, we need both the partition key and data distribution
- // information to make sure whether the range partitions are equivalent.
- return false;
+ produced2.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED &&
+ produced1.getDataDistribution() != null && produced2.getDataDistribution() != null) {
+ return produced1.getPartitioningFields().size() == produced2.getPartitioningFields().size() &&
+ checkSameOrdering(produced1, produced2, produced1.getPartitioningFields().size()) &&
+ produced1.getDataDistribution().equals(produced2.getDataDistribution());
+
}
else if(produced1.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING &&
produced2.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING) {
http://git-wip-us.apache.org/repos/asf/flink/blob/605b6d87/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java
index 17ea8a5..e5e43ca 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java
@@ -125,6 +125,29 @@ public abstract class OperatorDescriptorDual implements AbstractOperatorDescript
return true;
}
+ protected boolean checkSameOrdering(GlobalProperties produced1, GlobalProperties produced2, int numRelevantFields) {
+ Ordering prod1 = produced1.getPartitioningOrdering();
+ Ordering prod2 = produced2.getPartitioningOrdering();
+
+ if (prod1 == null || prod2 == null) {
+ throw new CompilerException("The given properties do not meet this operators requirements.");
+ }
+
+ // check that order of fields is equivalent
+ if (!checkEquivalentFieldPositionsInKeyFields(
+ prod1.getInvolvedIndexes(), prod2.getInvolvedIndexes(), numRelevantFields)) {
+ return false;
+ }
+
+ // check that both inputs have the same directions of order
+ for (int i = 0; i < numRelevantFields; i++) {
+ if (prod1.getOrder(i) != prod2.getOrder(i)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
protected boolean checkSameOrdering(LocalProperties produced1, LocalProperties produced2, int numRelevantFields) {
Ordering prod1 = produced1.getOrdering();
Ordering prod2 = produced2.getOrdering();
http://git-wip-us.apache.org/repos/asf/flink/blob/605b6d87/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupGlobalPropertiesCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupGlobalPropertiesCompatibilityTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupGlobalPropertiesCompatibilityTest.java
index 23f8897..76f48e9 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupGlobalPropertiesCompatibilityTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupGlobalPropertiesCompatibilityTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.optimizer.operators;
import static org.junit.Assert.*;
import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.optimizer.dataproperties.GlobalProperties;
import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
@@ -95,6 +97,52 @@ public class CoGroupGlobalPropertiesCompatibilityTest {
assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
}
+
+ TestDistribution dist1 = new TestDistribution(1);
+ TestDistribution dist2 = new TestDistribution(1);
+
+ // test compatible range partitioning with one ordering
+ {
+ Ordering ordering1 = new Ordering();
+ for (int field : keysLeft) {
+ ordering1.appendOrdering(field, null, Order.ASCENDING);
+ }
+ Ordering ordering2 = new Ordering();
+ for (int field : keysRight) {
+ ordering2.appendOrdering(field, null, Order.ASCENDING);
+ }
+
+ RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+ reqLeft.setRangePartitioned(ordering1, dist1);
+ RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+ reqRight.setRangePartitioned(ordering2, dist2);
+
+ GlobalProperties propsLeft = new GlobalProperties();
+ propsLeft.setRangePartitioned(ordering1, dist1);
+ GlobalProperties propsRight = new GlobalProperties();
+ propsRight.setRangePartitioned(ordering2, dist2);
+ assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+ }
+ // test compatible range partitioning with two orderings
+ {
+ Ordering ordering1 = new Ordering();
+ ordering1.appendOrdering(keysLeft.get(0), null, Order.DESCENDING);
+ ordering1.appendOrdering(keysLeft.get(1), null, Order.ASCENDING);
+ Ordering ordering2 = new Ordering();
+ ordering2.appendOrdering(keysRight.get(0), null, Order.DESCENDING);
+ ordering2.appendOrdering(keysRight.get(1), null, Order.ASCENDING);
+
+ RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+ reqLeft.setRangePartitioned(ordering1, dist1);
+ RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+ reqRight.setRangePartitioned(ordering2, dist2);
+
+ GlobalProperties propsLeft = new GlobalProperties();
+ propsLeft.setRangePartitioned(ordering1, dist1);
+ GlobalProperties propsRight = new GlobalProperties();
+ propsRight.setRangePartitioned(ordering2, dist2);
+ assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+ }
}
catch (Exception e) {
e.printStackTrace();
@@ -152,6 +200,82 @@ public class CoGroupGlobalPropertiesCompatibilityTest {
assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
}
+
+ TestDistribution dist1 = new TestDistribution(1);
+ TestDistribution dist2 = new TestDistribution(1);
+
+ // test incompatible range partitioning with different key size
+ {
+ Ordering ordering1 = new Ordering();
+ for (int field : keysLeft) {
+ ordering1.appendOrdering(field, null, Order.ASCENDING);
+ }
+ Ordering ordering2 = new Ordering();
+ for (int field : keysRight) {
+ ordering1.appendOrdering(field, null, Order.ASCENDING);
+ ordering2.appendOrdering(field, null, Order.ASCENDING);
+ }
+
+ RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+ reqLeft.setRangePartitioned(ordering1, dist1);
+ RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+ reqRight.setRangePartitioned(ordering2, dist2);
+
+ GlobalProperties propsLeft = new GlobalProperties();
+ propsLeft.setRangePartitioned(ordering1, dist1);
+ GlobalProperties propsRight = new GlobalProperties();
+ propsRight.setRangePartitioned(ordering2, dist2);
+ assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+ }
+
+ // test incompatible range partitioning with different ordering
+ {
+ Ordering ordering1 = new Ordering();
+ for (int field : keysLeft) {
+ ordering1.appendOrdering(field, null, Order.ASCENDING);
+ }
+ Ordering ordering2 = new Ordering();
+ for (int field : keysRight) {
+ ordering2.appendOrdering(field, null, Order.DESCENDING);
+ }
+
+ RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+ reqLeft.setRangePartitioned(ordering1, dist1);
+ RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+ reqRight.setRangePartitioned(ordering2, dist2);
+
+ GlobalProperties propsLeft = new GlobalProperties();
+ propsLeft.setRangePartitioned(ordering1, dist1);
+ GlobalProperties propsRight = new GlobalProperties();
+ propsRight.setRangePartitioned(ordering2, dist2);
+ assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+ }
+
+ TestDistribution dist3 = new TestDistribution(1);
+ TestDistribution dist4 = new TestDistribution(2);
+
+ // test incompatible range partitioning with different distribution
+ {
+ Ordering ordering1 = new Ordering();
+ for (int field : keysLeft) {
+ ordering1.appendOrdering(field, null, Order.ASCENDING);
+ }
+ Ordering ordering2 = new Ordering();
+ for (int field : keysRight) {
+ ordering2.appendOrdering(field, null, Order.ASCENDING);
+ }
+
+ RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+ reqLeft.setRangePartitioned(ordering1, dist3);
+ RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+ reqRight.setRangePartitioned(ordering2, dist4);
+
+ GlobalProperties propsLeft = new GlobalProperties();
+ propsLeft.setRangePartitioned(ordering1, dist3);
+ GlobalProperties propsRight = new GlobalProperties();
+ propsRight.setRangePartitioned(ordering2, dist4);
+ assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+ }
}
catch (Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/flink/blob/605b6d87/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupWithDistributionTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupWithDistributionTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupWithDistributionTest.java
new file mode 100644
index 0000000..2d37c7c
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupWithDistributionTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.operators;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.optimizer.plan.*;
+import org.apache.flink.optimizer.util.CompilerTestBase;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class CoGroupWithDistributionTest extends CompilerTestBase {
+
+ @Test
+ public void CoGroupWithSameDistributionTest() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+ DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+ TestDistribution testDistribution1 = new TestDistribution(3);
+ TestDistribution testDistribution2 = new TestDistribution(3);
+
+ DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = DataSetUtils.partitionByRange(set1, testDistribution1, 0)
+ .coGroup(DataSetUtils.partitionByRange(set2, testDistribution2, 0))
+ .where(0).equalTo(0).with(new CoGroupFunc());
+
+ coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
+ Plan plan = env.createProgramPlan();
+ OptimizedPlan oPlan = compileWithStats(plan);
+
+ SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+ DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource();
+ Channel input1 = coGroup.getInput1();
+ Channel input2 = coGroup.getInput2();
+ assertEquals(ShipStrategyType.FORWARD, input1.getShipStrategy());
+ assertEquals(ShipStrategyType.FORWARD, input2.getShipStrategy());
+
+ }
+
+ @Test
+ public void CoGroupWithDifferentDistributionTest() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+ DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+ TestDistribution testDistribution1 = new TestDistribution(3);
+ TestDistribution testDistribution2 = new TestDistribution(2);
+
+ DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = DataSetUtils.partitionByRange(set1, testDistribution1, 0)
+ .coGroup(DataSetUtils.partitionByRange(set2, testDistribution2, 0))
+ .where(0).equalTo(0).with(new CoGroupFunc());
+
+ coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
+ Plan plan = env.createProgramPlan();
+ OptimizedPlan oPlan = compileWithStats(plan);
+
+ SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+ DualInputPlanNode coGroup= (DualInputPlanNode)sink.getInput().getSource();
+ Channel input1 = coGroup.getInput1();
+ Channel input2 = coGroup.getInput2();
+ assertEquals(ShipStrategyType.PARTITION_HASH, input1.getShipStrategy());
+ assertEquals(ShipStrategyType.PARTITION_HASH, input2.getShipStrategy());
+
+ }
+
+ public static class CoGroupFunc implements CoGroupFunction<Tuple3<Integer, Integer, Integer>,
+ Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>> {
+
+ @Override
+ public void coGroup(Iterable<Tuple3<Integer, Integer, Integer>> first, Iterable<Tuple3<Integer, Integer, Integer>> second,
+ Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {
+
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/605b6d87/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java
index 1d559c2..610cbd8 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.optimizer.operators;
import static org.junit.Assert.*;
import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.optimizer.dataproperties.GlobalProperties;
import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
@@ -95,6 +97,51 @@ public class JoinGlobalPropertiesCompatibilityTest {
assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
}
+
+ TestDistribution dist1 = new TestDistribution(1);
+ TestDistribution dist2 = new TestDistribution(1);
+ // test compatible range partitioning with one ordering
+ {
+ Ordering ordering1 = new Ordering();
+ for (int field : keysLeft) {
+ ordering1.appendOrdering(field, null, Order.ASCENDING);
+ }
+ Ordering ordering2 = new Ordering();
+ for (int field : keysRight) {
+ ordering2.appendOrdering(field, null, Order.ASCENDING);
+ }
+
+ RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+ reqLeft.setRangePartitioned(ordering1, dist1);
+ RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+ reqRight.setRangePartitioned(ordering2, dist2);
+
+ GlobalProperties propsLeft = new GlobalProperties();
+ propsLeft.setRangePartitioned(ordering1, dist1);
+ GlobalProperties propsRight = new GlobalProperties();
+ propsRight.setRangePartitioned(ordering2, dist2);
+ assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+ }
+ // test compatible range partitioning with two orderings
+ {
+ Ordering ordering1 = new Ordering();
+ ordering1.appendOrdering(keysLeft.get(0), null, Order.DESCENDING);
+ ordering1.appendOrdering(keysLeft.get(1), null, Order.ASCENDING);
+ Ordering ordering2 = new Ordering();
+ ordering2.appendOrdering(keysRight.get(0), null, Order.DESCENDING);
+ ordering2.appendOrdering(keysRight.get(1), null, Order.ASCENDING);
+
+ RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+ reqLeft.setRangePartitioned(ordering1, dist1);
+ RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+ reqRight.setRangePartitioned(ordering2, dist2);
+
+ GlobalProperties propsLeft = new GlobalProperties();
+ propsLeft.setRangePartitioned(ordering1, dist1);
+ GlobalProperties propsRight = new GlobalProperties();
+ propsRight.setRangePartitioned(ordering2, dist2);
+ assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+ }
}
catch (Exception e) {
e.printStackTrace();
@@ -152,6 +199,82 @@ public class JoinGlobalPropertiesCompatibilityTest {
assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
}
+
+ TestDistribution dist1 = new TestDistribution(1);
+ TestDistribution dist2 = new TestDistribution(1);
+
+ // test incompatible range partitioning with different key size
+ {
+ Ordering ordering1 = new Ordering();
+ for (int field : keysLeft) {
+ ordering1.appendOrdering(field, null, Order.ASCENDING);
+ }
+ Ordering ordering2 = new Ordering();
+ for (int field : keysRight) {
+ ordering1.appendOrdering(field, null, Order.ASCENDING);
+ ordering2.appendOrdering(field, null, Order.ASCENDING);
+ }
+
+ RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+ reqLeft.setRangePartitioned(ordering1, dist1);
+ RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+ reqRight.setRangePartitioned(ordering2, dist2);
+
+ GlobalProperties propsLeft = new GlobalProperties();
+ propsLeft.setRangePartitioned(ordering1, dist1);
+ GlobalProperties propsRight = new GlobalProperties();
+ propsRight.setRangePartitioned(ordering2, dist2);
+ assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+ }
+
+ // test incompatible range partitioning with different ordering
+ {
+ Ordering ordering1 = new Ordering();
+ for (int field : keysLeft) {
+ ordering1.appendOrdering(field, null, Order.ASCENDING);
+ }
+ Ordering ordering2 = new Ordering();
+ for (int field : keysRight) {
+ ordering2.appendOrdering(field, null, Order.DESCENDING);
+ }
+
+ RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+ reqLeft.setRangePartitioned(ordering1, dist1);
+ RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+ reqRight.setRangePartitioned(ordering2, dist2);
+
+ GlobalProperties propsLeft = new GlobalProperties();
+ propsLeft.setRangePartitioned(ordering1, dist1);
+ GlobalProperties propsRight = new GlobalProperties();
+ propsRight.setRangePartitioned(ordering2, dist2);
+ assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+ }
+
+ TestDistribution dist3 = new TestDistribution(1);
+ TestDistribution dist4 = new TestDistribution(2);
+
+ // test incompatible range partitioning with different distribution
+ {
+ Ordering ordering1 = new Ordering();
+ for (int field : keysLeft) {
+ ordering1.appendOrdering(field, null, Order.ASCENDING);
+ }
+ Ordering ordering2 = new Ordering();
+ for (int field : keysRight) {
+ ordering2.appendOrdering(field, null, Order.ASCENDING);
+ }
+
+ RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+ reqLeft.setRangePartitioned(ordering1, dist3);
+ RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+ reqRight.setRangePartitioned(ordering2, dist4);
+
+ GlobalProperties propsLeft = new GlobalProperties();
+ propsLeft.setRangePartitioned(ordering1, dist3);
+ GlobalProperties propsRight = new GlobalProperties();
+ propsRight.setRangePartitioned(ordering2, dist4);
+ assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+ }
}
catch (Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/flink/blob/605b6d87/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinWithDistributionTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinWithDistributionTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinWithDistributionTest.java
new file mode 100644
index 0000000..0c579f6
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinWithDistributionTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.operators;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.optimizer.plan.*;
+import org.apache.flink.optimizer.util.CompilerTestBase;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class JoinWithDistributionTest extends CompilerTestBase {
+
+ @Test
+ public void JoinWithSameDistributionTest() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+ DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+ TestDistribution dist1 = new TestDistribution(3);
+ TestDistribution dist2 = new TestDistribution(3);
+
+ DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = DataSetUtils.partitionByRange(set1, dist1, 0)
+ .join(DataSetUtils.partitionByRange(set2, dist2, 0))
+ .where(0).equalTo(0).with(new JoinFunc());
+
+ coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
+ Plan plan = env.createProgramPlan();
+ OptimizedPlan oPlan = compileWithStats(plan);
+
+ SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+ DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+ Channel input1 = join.getInput1();
+ Channel input2 = join.getInput2();
+ assertEquals(ShipStrategyType.FORWARD, input1.getShipStrategy());
+ assertEquals(ShipStrategyType.FORWARD, input2.getShipStrategy());
+
+ }
+
+ @Test
+ public void JoinWithDifferentDistributionTest() throws Exception{
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+ DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+ TestDistribution dist1 = new TestDistribution(3);
+ TestDistribution dist2 = new TestDistribution(4);
+
+ DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = DataSetUtils.partitionByRange(set1, dist1, 0)
+ .join(DataSetUtils.partitionByRange(set2, dist2, 0))
+ .where(0).equalTo(0).with(new JoinFunc());
+
+ coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
+ Plan plan = env.createProgramPlan();
+ OptimizedPlan oPlan = compileWithStats(plan);
+
+ SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+ DualInputPlanNode join = (DualInputPlanNode)sink.getInput().getSource();
+ Channel input1 = join.getInput1();
+ Channel input2 = join.getInput2();
+ assertEquals(ShipStrategyType.PARTITION_HASH, input1.getShipStrategy());
+ assertEquals(ShipStrategyType.PARTITION_HASH, input2.getShipStrategy());
+
+ }
+
+ public static class JoinFunc implements JoinFunction<Tuple3<Integer, Integer, Integer>,
+ Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>> {
+
+ @Override
+ public Tuple3<Integer, Integer, Integer> join(Tuple3<Integer, Integer, Integer> first, Tuple3<Integer, Integer, Integer> second) throws Exception {
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/605b6d87/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/TestDistribution.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/TestDistribution.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/TestDistribution.java
new file mode 100644
index 0000000..d99930b
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/TestDistribution.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.operators;
+
+import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/**
+ * Minimal {@link DataDistribution} for optimizer tests. It carries a single integer
+ * ({@code boundary}) that only serves to tell instances apart via {@link #equals(Object)};
+ * it does not describe real bucket boundaries ({@link #getBucketBoundary} returns an empty array).
+ */
+public class TestDistribution implements DataDistribution {
+
+	/** Distinguishing value; two distributions are equal iff their values match. */
+	public int boundary;
+
+	public TestDistribution(int boundary) {
+		this.boundary = boundary;
+	}
+
+	@Override
+	public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
+		// No concrete boundaries are provided; the optimizer tests never need them.
+		return new Object[0];
+	}
+
+	@Override
+	public int getNumberOfFields() {
+		return 1;
+	}
+
+	@Override
+	public TypeInformation[] getKeyTypes() {
+		return new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO};
+	}
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		// Persist the only piece of state so that write/read round-trips the instance.
+		out.writeInt(boundary);
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		boundary = in.readInt();
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (this == obj) {
+			return true;
+		}
+		// Type check instead of a blind cast: null or foreign objects are simply unequal.
+		if (!(obj instanceof TestDistribution)) {
+			return false;
+		}
+		return this.boundary == ((TestDistribution) obj).boundary;
+	}
+
+	@Override
+	public int hashCode() {
+		// Must agree with equals(): equal distributions share the same boundary value.
+		return boundary;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/605b6d87/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java
index 4e20842..e6f3d26 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java
@@ -258,11 +258,11 @@ public class OutputEmitter<T> implements ChannelSelector<SerializationDelegate<T
private final int compareRecordAndBoundary(T record, Object[] boundary) {
this.comparator.extractKeys(record, keys, 0);
- if (flatComparators.length != keys.length || flatComparators.length != boundary.length) {
+ if (flatComparators.length != keys.length || flatComparators.length > boundary.length) {
throw new RuntimeException("Can not compare keys with boundary due to mismatched length.");
}
- for (int i=0; i<flatComparators.length; i++) {
+ for (int i = 0; i < flatComparators.length; i++) {
int result = flatComparators[i].compare(keys[i], boundary[i]);
if (result != 0) {
return result;
http://git-wip-us.apache.org/repos/asf/flink/blob/605b6d87/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
index 7bc8480..5b7caa7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
@@ -18,8 +18,11 @@
package org.apache.flink.test.javaApiOperators;
+import org.apache.flink.api.common.distributions.DataDistribution;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.RichCoGroupFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
@@ -28,7 +31,10 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO;
@@ -40,6 +46,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -286,7 +293,7 @@ public class CoGroupITCase extends MultipleProgramsTestBase {
DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
DataSet<Tuple3<Integer, Long, String>> coGrouped = ds1.coGroup(ds2).
- where(0,4).equalTo(0,1).with(new Tuple5Tuple3CoGroup());
+ where(0, 4).equalTo(0, 1).with(new Tuple5Tuple3CoGroup());
List<Tuple3<Integer, Long, String>> result = coGrouped.collect();
@@ -516,6 +523,37 @@ public class CoGroupITCase extends MultipleProgramsTestBase {
compareResultAsText(result, expected);
}
+	@Test
+	public void testCoGroupWithRangePartitioning() throws Exception {
+		/*
+		 * Co-groups two inputs that were range-partitioned with one shared custom
+		 * distribution on a composite (two-field) key, then checks the co-grouped output.
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> fiveTuples = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> threeTuples = CollectionDataSets.get3TupleDataSet(env);
+
+		env.setParallelism(4);
+
+		// Both sides are range-partitioned with the very same distribution instance.
+		TestDistribution distribution = new TestDistribution();
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> left = DataSetUtils.partitionByRange(fiveTuples, distribution, 0, 4);
+		DataSet<Tuple3<Integer, Long, String>> right = DataSetUtils.partitionByRange(threeTuples, distribution, 0, 1);
+
+		List<Tuple3<Integer, Long, String>> result = left.coGroup(right)
+				.where(0, 4)
+				.equalTo(0, 1)
+				.with(new Tuple5Tuple3CoGroup())
+				.collect();
+
+		String expected = "1,1,Hallo\n"
+				+ "2,2,Hallo Welt\n"
+				+ "3,2,Hallo Welt wie gehts?\n"
+				+ "3,2,ABC\n"
+				+ "5,3,HIJ\n"
+				+ "5,3,IJK\n";
+
+		compareResultAsTuples(result, expected);
+	}
// --------------------------------------------------------------------------------------------
@@ -797,4 +835,45 @@ public class CoGroupITCase extends MultipleProgramsTestBase {
}
}
}
+
+	/**
+	 * Fixed two-field (Integer, Long) range distribution used for both inputs of the
+	 * range-partitioned coGroup test. The boundaries are hard-coded, so every instance
+	 * compares equal to every other.
+	 */
+	public static class TestDistribution implements DataDistribution {
+		public Object[][] boundaries = new Object[][]{
+			new Object[]{2, 2L},
+			new Object[]{5, 4L},
+			new Object[]{10, 12L},
+			new Object[]{21, 6L}
+		};
+
+		public TestDistribution() {}
+
+		@Override
+		public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
+			// totalNumBuckets is ignored; assumes callers never request an index beyond the
+			// four boundaries above (TODO confirm against the test parallelism).
+			return boundaries[bucketNum];
+		}
+
+		@Override
+		public int getNumberOfFields() {
+			return 2;
+		}
+
+		@Override
+		public TypeInformation[] getKeyTypes() {
+			return new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO};
+		}
+
+		@Override
+		public void write(DataOutputView out) throws IOException {
+			// Nothing to write: the boundaries come from the field initializer.
+		}
+
+		@Override
+		public void read(DataInputView in) throws IOException {
+			// Nothing to read: the boundaries come from the field initializer.
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			return obj instanceof TestDistribution;
+		}
+
+		@Override
+		public int hashCode() {
+			// All instances are equal, so the hash must be a constant to honour the contract.
+			return TestDistribution.class.hashCode();
+		}
+	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/605b6d87/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
index 427e83e..c6bc08e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
@@ -49,13 +49,13 @@ public class CustomDistributionITCase {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
- DataSet<Tuple3<Integer, Long, String>> input1 = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
final TestDataDist1 dist = new TestDataDist1();
env.setParallelism(dist.getParallelism());
DataSet<Boolean> result = DataSetUtils
- .partitionByRange(input1, dist, 0)
+ .partitionByRange(input, dist, 0)
.mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean>() {
@Override
@@ -104,7 +104,7 @@ public class CustomDistributionITCase {
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
- DataSet<Tuple3<Integer, Integer, String>> input1 = env.fromElements(
+ DataSet<Tuple3<Integer, Integer, String>> input = env.fromElements(
new Tuple3<>(1, 5, "Hi"),
new Tuple3<>(1, 6, "Hi"),
new Tuple3<>(1, 7, "Hi"),
@@ -127,7 +127,7 @@ public class CustomDistributionITCase {
env.setParallelism(dist.getParallelism());
DataSet<Boolean> result = DataSetUtils
- .partitionByRange(input1, dist, 0, 1)
+ .partitionByRange(input, dist, 0, 1)
.mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Integer, String>, Boolean>() {
@Override
@@ -175,6 +175,75 @@ public class CustomDistributionITCase {
env.execute();
}
+ @Test
+ public void testPartitionKeyLessDistribution() throws Exception{
+ /*
+ * Test the number of partition keys less than the number of distribution fields
+ */
+ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+ final TestDataDist2 dist = new TestDataDist2();
+
+ env.setParallelism(dist.getParallelism());
+
+ DataSet<Boolean> result = DataSetUtils
+ .partitionByRange(input, dist, 0)
+ .mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean>() {
+
+ @Override
+ public void mapPartition(Iterable<Tuple3<Integer, Long, String>> values, Collector<Boolean> out) throws Exception {
+ int pIdx = getRuntimeContext().getIndexOfThisSubtask();
+
+ for (Tuple3<Integer, Long, String> s : values) {
+ boolean correctlyPartitioned = true;
+ if (pIdx == 0) {
+ Integer[] upper = dist.boundaries[0];
+ if (s.f0.compareTo(upper[0]) > 0) {
+ correctlyPartitioned = false;
+ }
+ }
+ else if (pIdx > 0 && pIdx < dist.getParallelism() - 1) {
+ Integer[] lower = dist.boundaries[pIdx - 1];
+ Integer[] upper = dist.boundaries[pIdx];
+ if (s.f0.compareTo(upper[0]) > 0 || (s.f0.compareTo(lower[0]) <= 0)) {
+ correctlyPartitioned = false;
+ }
+ }
+ else {
+ Integer[] lower = dist.boundaries[pIdx - 1];
+ if ((s.f0.compareTo(lower[0]) <= 0)) {
+ correctlyPartitioned = false;
+ }
+ }
+
+ if (!correctlyPartitioned) {
+ fail("Record was not correctly partitioned: " + s.toString());
+ }
+ }
+ }
+ }
+ );
+
+ result.output(new DiscardingOutputFormat<Boolean>());
+ env.execute();
+ }
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testPartitionMoreThanDistribution() throws Exception {
+		/*
+		 * Requests more partition keys (three) than the distribution has fields;
+		 * partitionByRange is expected to reject this with an IllegalArgumentException.
+		 */
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+		final TestDataDist2 dist = new TestDataDist2();
+
+		// Only building the partitioning is under test, so the returned data set is not kept.
+		DataSetUtils.partitionByRange(input, dist, 0, 1, 2);
+	}
+
/**
* The class is used to do the tests of range partition with one key.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/605b6d87/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
index 61e07fe..0d8c80b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
@@ -18,11 +18,15 @@
package org.apache.flink.test.javaApiOperators;
+import java.io.IOException;
import java.util.Collection;
import java.util.List;
+import org.apache.flink.api.common.distributions.DataDistribution;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.common.functions.RichFlatJoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -30,7 +34,10 @@ import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO;
@@ -87,8 +94,8 @@ public class JoinITCase extends MultipleProgramsTestBase {
DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
DataSet<Tuple2<String, String>> joinDs =
ds1.join(ds2)
- .where(0,1)
- .equalTo(0,4)
+ .where(0, 1)
+ .equalTo(0, 4)
.with(new T3T5FlatJoin());
List<Tuple2<String, String>> result = joinDs.collect();
@@ -680,6 +687,7 @@ public class JoinITCase extends MultipleProgramsTestBase {
compareResultAsTuples(result, expected);
}
+ @Test
public void testJoinWithAtomicType2() throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -696,6 +704,38 @@ public class JoinITCase extends MultipleProgramsTestBase {
compareResultAsTuples(result, expected);
}
+	@Test
+	public void testJoinWithRangePartitioning() throws Exception {
+		/*
+		 * Joins two inputs that were range-partitioned with one shared custom
+		 * distribution on a composite (two-field) key, then checks the joined pairs.
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> threeTuples = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> fiveTuples = CollectionDataSets.get5TupleDataSet(env);
+
+		env.setParallelism(4);
+
+		// Both sides are range-partitioned with the very same distribution instance.
+		TestDistribution distribution = new TestDistribution();
+		DataSet<Tuple3<Integer, Long, String>> left = DataSetUtils.partitionByRange(threeTuples, distribution, 0, 1);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> right = DataSetUtils.partitionByRange(fiveTuples, distribution, 0, 4);
+
+		List<Tuple2<String, String>> result = left.join(right)
+				.where(0, 1)
+				.equalTo(0, 4)
+				.with(new T3T5FlatJoin())
+				.collect();
+
+		String expected = "Hi,Hallo\n"
+				+ "Hello,Hallo Welt\n"
+				+ "Hello world,Hallo Welt wie gehts?\n"
+				+ "Hello world,ABC\n"
+				+ "I am fine.,HIJ\n"
+				+ "I am fine.,IJK\n";
+
+		compareResultAsTuples(result, expected);
+	}
+
public static class T3T5FlatJoin implements FlatJoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<String, String>> {
@Override
@@ -778,4 +818,45 @@ public class JoinITCase extends MultipleProgramsTestBase {
return new Tuple2<String, String>(first.myString, second.f2);
}
}
+
+	/**
+	 * Fixed two-field (Integer, Long) range distribution used for both inputs of the
+	 * range-partitioned join test. The boundaries are hard-coded, so every instance
+	 * compares equal to every other.
+	 */
+	public static class TestDistribution implements DataDistribution {
+		public Object[][] boundaries = new Object[][]{
+			new Object[]{2, 2L},
+			new Object[]{5, 4L},
+			new Object[]{10, 12L},
+			new Object[]{21, 6L}
+		};
+
+		public TestDistribution() {}
+
+		@Override
+		public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
+			// totalNumBuckets is ignored; assumes callers never request an index beyond the
+			// four boundaries above (TODO confirm against the test parallelism).
+			return boundaries[bucketNum];
+		}
+
+		@Override
+		public int getNumberOfFields() {
+			return 2;
+		}
+
+		@Override
+		public TypeInformation[] getKeyTypes() {
+			return new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO};
+		}
+
+		@Override
+		public void write(DataOutputView out) throws IOException {
+			// Nothing to write: the boundaries come from the field initializer.
+		}
+
+		@Override
+		public void read(DataInputView in) throws IOException {
+			// Nothing to read: the boundaries come from the field initializer.
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			return obj instanceof TestDistribution;
+		}
+
+		@Override
+		public int hashCode() {
+			// All instances are equal, so the hash must be a constant to honour the contract.
+			return TestDistribution.class.hashCode();
+		}
+	}
}