Posted to commits@flink.apache.org by fh...@apache.org on 2016/04/21 01:37:55 UTC

[2/3] flink git commit: [FLINK-2998] [dataSet] Add support for explicit range partitioning for joins and coGroup.

[FLINK-2998] [dataSet] Add support for explicit range partitioning for joins and coGroup.

This closes #1838
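
The pattern this change enables, as exercised by the new tests further down:
range-partition both inputs of a join or coGroup with equal data distributions,
and the optimizer will forward both inputs instead of re-shuffling them. Below
is a minimal, self-contained sketch of that usage (not part of this commit,
modeled on the new JoinWithDistributionTest and JoinITCase code; MyDist is a
hypothetical DataDistribution whose instances compare equal):

    import java.io.IOException;

    import org.apache.flink.api.common.distributions.DataDistribution;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.utils.DataSetUtils;
    import org.apache.flink.core.memory.DataInputView;
    import org.apache.flink.core.memory.DataOutputView;

    public class RangePartitionedJoinExample {

        // Hypothetical single-field distribution over an Integer key. All
        // instances compare equal, which is what lets the optimizer treat
        // the two range partitionings below as equivalent.
        public static class MyDist implements DataDistribution {
            private final Object[][] boundaries = new Object[][] {
                    new Object[]{1}, new Object[]{2}, new Object[]{3}, new Object[]{4}
            };

            @Override
            public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
                return boundaries[bucketNum];
            }

            @Override
            public int getNumberOfFields() {
                return 1;
            }

            @Override
            public TypeInformation[] getKeyTypes() {
                return new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO};
            }

            @Override
            public void write(DataOutputView out) throws IOException {}

            @Override
            public void read(DataInputView in) throws IOException {}

            @Override
            public boolean equals(Object obj) {
                return obj instanceof MyDist;
            }
        }

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(4);

            DataSet<Tuple2<Integer, String>> left = env.fromElements(
                    new Tuple2<>(1, "a"), new Tuple2<>(2, "b"), new Tuple2<>(3, "c"));
            DataSet<Tuple2<Integer, String>> right = env.fromElements(
                    new Tuple2<>(1, "x"), new Tuple2<>(2, "y"), new Tuple2<>(3, "z"));

            MyDist dist = new MyDist();

            // Both inputs are explicitly range-partitioned on field 0 with equal
            // distributions; the optimizer recognizes the partitionings as
            // compatible and ships both join inputs FORWARD instead of
            // re-partitioning them.
            DataSetUtils.partitionByRange(left, dist, 0)
                    .join(DataSetUtils.partitionByRange(right, dist, 0))
                    .where(0).equalTo(0)
                    .print();
        }
    }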


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/605b6d87
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/605b6d87
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/605b6d87

Branch: refs/heads/master
Commit: 605b6d870d0da38fb5446675709c7243127cdff1
Parents: d8fb230
Author: gallenvara <ga...@126.com>
Authored: Tue Mar 29 22:36:21 2016 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Apr 20 22:55:49 2016 +0200

----------------------------------------------------------------------
 .../api/java/operators/PartitionOperator.java   |   5 +-
 .../dataproperties/GlobalProperties.java        |   3 +
 .../operators/AbstractJoinDescriptor.java       |   9 +-
 .../optimizer/operators/CoGroupDescriptor.java  |  11 +-
 .../operators/OperatorDescriptorDual.java       |  23 ++++
 ...oGroupGlobalPropertiesCompatibilityTest.java | 124 +++++++++++++++++++
 .../operators/CoGroupWithDistributionTest.java  |  99 +++++++++++++++
 .../JoinGlobalPropertiesCompatibilityTest.java  | 123 ++++++++++++++++++
 .../operators/JoinWithDistributionTest.java     |  97 +++++++++++++++
 .../optimizer/operators/TestDistribution.java   |  71 +++++++++++
 .../operators/shipping/OutputEmitter.java       |   4 +-
 .../test/javaApiOperators/CoGroupITCase.java    |  81 +++++++++++-
 .../CustomDistributionITCase.java               |  77 +++++++++++-
 .../flink/test/javaApiOperators/JoinITCase.java |  85 ++++++++++++-
 14 files changed, 792 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/605b6d87/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
index fb6d579..2ed0300 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/PartitionOperator.java
@@ -88,8 +88,9 @@ public class PartitionOperator<T> extends SingleInputOperator<T, T, PartitionOpe
 		Preconditions.checkArgument(distribution == null || pMethod == PartitionMethod.RANGE, "Customized data distribution is only necessary for range partitioning.");
 		
 		if (distribution != null) {
-			Preconditions.checkArgument(distribution.getNumberOfFields() == pKeys.getNumberOfKeyFields(), "The number of key fields in the distribution and range partitioner should be the same.");
-			Preconditions.checkArgument(Arrays.equals(distribution.getKeyTypes(), pKeys.getKeyFieldTypes()), "The types of key from the distribution and range partitioner are not equal.");
+			Preconditions.checkArgument(pKeys.getNumberOfKeyFields() <= distribution.getNumberOfFields(), "The distribution must provide at least as many fields as flat key fields are specified.");
+			Preconditions.checkArgument(Arrays.equals(pKeys.getKeyFieldTypes(), Arrays.copyOfRange(distribution.getKeyTypes(), 0, pKeys.getNumberOfKeyFields())),
+					"The types of the flat key fields must be equal to the types of the fields of the distribution.");
 		}
 		
 		if (customPartitioner != null) {

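A note on the relaxed precondition above: a data distribution may now define more
fields than there are partition keys, as long as the flat key types match a prefix
of the distribution's field types. A minimal sketch of both sides of the check (not
part of this commit; TwoFieldDist is a hypothetical (INT, LONG) distribution,
analogous to TestDataDist2 in the updated CustomDistributionITCase below):

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple3<Integer, Long, String>> input = env.fromElements(
            new Tuple3<>(1, 1L, "a"), new Tuple3<>(2, 2L, "b"));

    // Accepted after this change: one flat key field (INT) against the
    // two-field (INT, LONG) distribution, since the key types match a
    // prefix of the distribution's field types.
    DataSetUtils.partitionByRange(input, new TwoFieldDist(), 0);

    // Still rejected, eagerly, with an IllegalArgumentException: three flat
    // key fields against a two-field distribution.
    DataSetUtils.partitionByRange(input, new TwoFieldDist(), 0, 1, 2);
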
http://git-wip-us.apache.org/repos/asf/flink/blob/605b6d87/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
index ca17c2b..e64782f 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dataproperties/GlobalProperties.java
@@ -311,6 +311,7 @@ public class GlobalProperties implements Cloneable {
 					gp.partitioning = PartitioningProperty.RANGE_PARTITIONED;
 					gp.ordering = newOrdering;
 					gp.partitioningFields = newOrdering.getInvolvedIndexes();
+					gp.distribution = this.distribution;
 				}
 				break;
 			case HASH_PARTITIONED:
@@ -436,6 +437,7 @@ public class GlobalProperties implements Cloneable {
 				throw new CompilerException("Unsupported partitioning strategy");
 		}
 
+		channel.setDataDistribution(this.distribution);
 		DataExchangeMode exMode = DataExchangeMode.select(exchangeMode, shipType, breakPipeline);
 		channel.setShipStrategy(shipType, partitionKeys, sortDirection, partitioner, exMode);
 	}
@@ -490,6 +492,7 @@ public class GlobalProperties implements Cloneable {
 		newProps.partitioning = this.partitioning;
 		newProps.partitioningFields = this.partitioningFields;
 		newProps.ordering = this.ordering;
+		newProps.distribution = this.distribution;
 		newProps.customPartitioner = this.customPartitioner;
 		newProps.uniqueFieldCombinations = this.uniqueFieldCombinations == null ? null : new HashSet<FieldSet>(this.uniqueFieldCombinations);
 		return newProps;

http://git-wip-us.apache.org/repos/asf/flink/blob/605b6d87/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AbstractJoinDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AbstractJoinDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AbstractJoinDescriptor.java
index 6c2776e..1f2f42a 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AbstractJoinDescriptor.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/AbstractJoinDescriptor.java
@@ -140,11 +140,12 @@ public abstract class AbstractJoinDescriptor extends OperatorDescriptorDual {
 
 			}
 			else if(produced1.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED &&
-					produced2.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) {
+					produced2.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED &&
+					produced1.getDataDistribution() != null && produced2.getDataDistribution() != null) {
 
-				// Return false anyway now, we need both the partition key and data distribution
-				// information to make sure whether the range partitions are equivalent.
-				return false;
+				return produced1.getPartitioningFields().size() == produced2.getPartitioningFields().size() &&
+						checkSameOrdering(produced1, produced2, produced1.getPartitioningFields().size()) &&
+						produced1.getDataDistribution().equals(produced2.getDataDistribution());
 
 			}
 			else if(produced1.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING &&

http://git-wip-us.apache.org/repos/asf/flink/blob/605b6d87/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java
index b99b1c1..0e3fb55 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CoGroupDescriptor.java
@@ -149,12 +149,13 @@ public class CoGroupDescriptor extends OperatorDescriptorDual {
 
 		}
 		else if(produced1.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED &&
-				produced2.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED) {
-
-			// Return false anyway now, we need both the partition key and data distribution
-			// information to make sure whether the range partitions are equivalent.
-			return false;
+				produced2.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED &&
+				produced1.getDataDistribution() != null && produced2.getDataDistribution() != null) {
 
+			return produced1.getPartitioningFields().size() == produced2.getPartitioningFields().size() &&
+					checkSameOrdering(produced1, produced2, produced1.getPartitioningFields().size()) &&
+					produced1.getDataDistribution().equals(produced2.getDataDistribution());
+			
 		}
 		else if(produced1.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING &&
 				produced2.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING) {

http://git-wip-us.apache.org/repos/asf/flink/blob/605b6d87/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java
index 17ea8a5..e5e43ca 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java
@@ -125,6 +125,29 @@ public abstract class OperatorDescriptorDual implements AbstractOperatorDescript
 		return true;
 	}
 
+	protected boolean checkSameOrdering(GlobalProperties produced1, GlobalProperties produced2, int numRelevantFields) {
+		Ordering prod1 = produced1.getPartitioningOrdering();
+		Ordering prod2 = produced2.getPartitioningOrdering();
+
+		if (prod1 == null || prod2 == null) {
+			throw new CompilerException("The given properties do not meet this operator's requirements.");
+		}
+
+		// check that order of fields is equivalent
+		if (!checkEquivalentFieldPositionsInKeyFields(
+				prod1.getInvolvedIndexes(), prod2.getInvolvedIndexes(), numRelevantFields)) {
+			return false;
+		}
+
+		// check that both inputs have the same directions of order
+		for (int i = 0; i < numRelevantFields; i++) {
+			if (prod1.getOrder(i) != prod2.getOrder(i)) {
+				return false;
+			}
+		}
+		return true;
+	}
+
 	protected boolean checkSameOrdering(LocalProperties produced1, LocalProperties produced2, int numRelevantFields) {
 		Ordering prod1 = produced1.getOrdering();
 		Ordering prod2 = produced2.getOrdering();

http://git-wip-us.apache.org/repos/asf/flink/blob/605b6d87/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupGlobalPropertiesCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupGlobalPropertiesCompatibilityTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupGlobalPropertiesCompatibilityTest.java
index 23f8897..76f48e9 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupGlobalPropertiesCompatibilityTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupGlobalPropertiesCompatibilityTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.optimizer.operators;
 import static org.junit.Assert.*;
 
 import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.optimizer.dataproperties.GlobalProperties;
 import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
@@ -95,6 +97,52 @@ public class CoGroupGlobalPropertiesCompatibilityTest {
 				
 				assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
 			}
+
+			TestDistribution dist1 = new TestDistribution(1);
+			TestDistribution dist2 = new TestDistribution(1);
+			
+			// test compatible range partitioning with one ordering
+			{
+				Ordering ordering1 = new Ordering();
+				for (int field : keysLeft) {
+					ordering1.appendOrdering(field, null, Order.ASCENDING);
+				}
+				Ordering ordering2 = new Ordering();
+				for (int field : keysRight) {
+					ordering2.appendOrdering(field, null, Order.ASCENDING);
+				}
+				
+				RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+				reqLeft.setRangePartitioned(ordering1, dist1);
+				RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+				reqRight.setRangePartitioned(ordering2, dist2);
+
+				GlobalProperties propsLeft = new GlobalProperties();
+				propsLeft.setRangePartitioned(ordering1, dist1);
+				GlobalProperties propsRight = new GlobalProperties();
+				propsRight.setRangePartitioned(ordering2, dist2);
+				assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+			}
+			// test compatible range partitioning with two orderings
+			{
+				Ordering ordering1 = new Ordering();
+				ordering1.appendOrdering(keysLeft.get(0), null, Order.DESCENDING);
+				ordering1.appendOrdering(keysLeft.get(1), null, Order.ASCENDING);
+				Ordering ordering2 = new Ordering();
+				ordering2.appendOrdering(keysRight.get(0), null, Order.DESCENDING);
+				ordering2.appendOrdering(keysRight.get(1), null, Order.ASCENDING);
+
+				RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+				reqLeft.setRangePartitioned(ordering1, dist1);
+				RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+				reqRight.setRangePartitioned(ordering2, dist2);
+
+				GlobalProperties propsLeft = new GlobalProperties();
+				propsLeft.setRangePartitioned(ordering1, dist1);
+				GlobalProperties propsRight = new GlobalProperties();
+				propsRight.setRangePartitioned(ordering2, dist2);
+				assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+			}
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -152,6 +200,82 @@ public class CoGroupGlobalPropertiesCompatibilityTest {
 				
 				assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
 			}
+
+			TestDistribution dist1 = new TestDistribution(1);
+			TestDistribution dist2 = new TestDistribution(1);
+
+			// test incompatible range partitioning with different key size
+			{
+				Ordering ordering1 = new Ordering();
+				for (int field : keysLeft) {
+					ordering1.appendOrdering(field, null, Order.ASCENDING);
+				}
+				Ordering ordering2 = new Ordering();
+				for (int field : keysRight) {
+					ordering1.appendOrdering(field, null, Order.ASCENDING);
+					ordering2.appendOrdering(field, null, Order.ASCENDING);
+				}
+
+				RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+				reqLeft.setRangePartitioned(ordering1, dist1);
+				RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+				reqRight.setRangePartitioned(ordering2, dist2);
+
+				GlobalProperties propsLeft = new GlobalProperties();
+				propsLeft.setRangePartitioned(ordering1, dist1);
+				GlobalProperties propsRight = new GlobalProperties();
+				propsRight.setRangePartitioned(ordering2, dist2);
+				assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+			}
+
+			// test incompatible range partitioning with different ordering
+			{
+				Ordering ordering1 = new Ordering();
+				for (int field : keysLeft) {
+					ordering1.appendOrdering(field, null, Order.ASCENDING);
+				}
+				Ordering ordering2 = new Ordering();
+				for (int field : keysRight) {
+					ordering2.appendOrdering(field, null, Order.DESCENDING);
+				}
+
+				RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+				reqLeft.setRangePartitioned(ordering1, dist1);
+				RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+				reqRight.setRangePartitioned(ordering2, dist2);
+
+				GlobalProperties propsLeft = new GlobalProperties();
+				propsLeft.setRangePartitioned(ordering1, dist1);
+				GlobalProperties propsRight = new GlobalProperties();
+				propsRight.setRangePartitioned(ordering2, dist2);
+				assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+			}
+
+			TestDistribution dist3 = new TestDistribution(1);
+			TestDistribution dist4 = new TestDistribution(2);
+
+			// test incompatible range partitioning with different distribution
+			{
+				Ordering ordering1 = new Ordering();
+				for (int field : keysLeft) {
+					ordering1.appendOrdering(field, null, Order.ASCENDING);
+				}
+				Ordering ordering2 = new Ordering();
+				for (int field : keysRight) {
+					ordering2.appendOrdering(field, null, Order.ASCENDING);
+				}
+
+				RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+				reqLeft.setRangePartitioned(ordering1, dist3);
+				RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+				reqRight.setRangePartitioned(ordering2, dist4);
+
+				GlobalProperties propsLeft = new GlobalProperties();
+				propsLeft.setRangePartitioned(ordering1, dist3);
+				GlobalProperties propsRight = new GlobalProperties();
+				propsRight.setRangePartitioned(ordering2, dist4);
+				assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+			}
 		}
 		catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/605b6d87/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupWithDistributionTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupWithDistributionTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupWithDistributionTest.java
new file mode 100644
index 0000000..2d37c7c
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/CoGroupWithDistributionTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.operators;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.optimizer.plan.*;
+import org.apache.flink.optimizer.util.CompilerTestBase;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class CoGroupWithDistributionTest extends CompilerTestBase {
+
+	@Test
+	public void coGroupWithSameDistributionTest() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		TestDistribution testDistribution1 = new TestDistribution(3);
+		TestDistribution testDistribution2 = new TestDistribution(3);
+
+		DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = DataSetUtils.partitionByRange(set1, testDistribution1, 0)
+				.coGroup(DataSetUtils.partitionByRange(set2, testDistribution2, 0))
+				.where(0).equalTo(0).with(new CoGroupFunc());
+
+		coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
+		Plan plan = env.createProgramPlan();
+		OptimizedPlan oPlan = compileWithStats(plan);
+
+		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+		DualInputPlanNode coGroup = (DualInputPlanNode) sink.getInput().getSource();
+		Channel input1 = coGroup.getInput1();
+		Channel input2 = coGroup.getInput2();
+		assertEquals(ShipStrategyType.FORWARD, input1.getShipStrategy());
+		assertEquals(ShipStrategyType.FORWARD, input2.getShipStrategy());
+
+	}
+
+	@Test
+	public void coGroupWithDifferentDistributionTest() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		TestDistribution testDistribution1 = new TestDistribution(3);
+		TestDistribution testDistribution2 = new TestDistribution(2);
+
+		DataSet<Tuple3<Integer, Integer, Integer>> coGrouped = DataSetUtils.partitionByRange(set1, testDistribution1, 0)
+				.coGroup(DataSetUtils.partitionByRange(set2, testDistribution2, 0))
+				.where(0).equalTo(0).with(new CoGroupFunc());
+
+		coGrouped.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
+		Plan plan = env.createProgramPlan();
+		OptimizedPlan oPlan = compileWithStats(plan);
+
+		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+		DualInputPlanNode coGroup = (DualInputPlanNode) sink.getInput().getSource();
+		Channel input1 = coGroup.getInput1();
+		Channel input2 = coGroup.getInput2();
+		assertEquals(ShipStrategyType.PARTITION_HASH, input1.getShipStrategy());
+		assertEquals(ShipStrategyType.PARTITION_HASH, input2.getShipStrategy());
+
+	}
+
+	public static class CoGroupFunc implements CoGroupFunction<Tuple3<Integer, Integer, Integer>,
+			Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>> {
+
+		@Override
+		public void coGroup(Iterable<Tuple3<Integer, Integer, Integer>> first, Iterable<Tuple3<Integer, Integer, Integer>> second,
+							Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {
+
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/605b6d87/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java
index 1d559c2..610cbd8 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinGlobalPropertiesCompatibilityTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.optimizer.operators;
 import static org.junit.Assert.*;
 
 import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.optimizer.dataproperties.GlobalProperties;
 import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
@@ -95,6 +97,51 @@ public class JoinGlobalPropertiesCompatibilityTest {
 				
 				assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
 			}
+
+			TestDistribution dist1 = new TestDistribution(1);
+			TestDistribution dist2 = new TestDistribution(1);
+			// test compatible range partitioning with one ordering
+			{
+				Ordering ordering1 = new Ordering();
+				for (int field : keysLeft) {
+					ordering1.appendOrdering(field, null, Order.ASCENDING);
+				}
+				Ordering ordering2 = new Ordering();
+				for (int field : keysRight) {
+					ordering2.appendOrdering(field, null, Order.ASCENDING);
+				}
+
+				RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+				reqLeft.setRangePartitioned(ordering1, dist1);
+				RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+				reqRight.setRangePartitioned(ordering2, dist2);
+
+				GlobalProperties propsLeft = new GlobalProperties();
+				propsLeft.setRangePartitioned(ordering1, dist1);
+				GlobalProperties propsRight = new GlobalProperties();
+				propsRight.setRangePartitioned(ordering2, dist2);
+				assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+			}
+			// test compatible range partitioning with two orderings
+			{
+				Ordering ordering1 = new Ordering();
+				ordering1.appendOrdering(keysLeft.get(0), null, Order.DESCENDING);
+				ordering1.appendOrdering(keysLeft.get(1), null, Order.ASCENDING);
+				Ordering ordering2 = new Ordering();
+				ordering2.appendOrdering(keysRight.get(0), null, Order.DESCENDING);
+				ordering2.appendOrdering(keysRight.get(1), null, Order.ASCENDING);
+
+				RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+				reqLeft.setRangePartitioned(ordering1, dist1);
+				RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+				reqRight.setRangePartitioned(ordering2, dist2);
+
+				GlobalProperties propsLeft = new GlobalProperties();
+				propsLeft.setRangePartitioned(ordering1, dist1);
+				GlobalProperties propsRight = new GlobalProperties();
+				propsRight.setRangePartitioned(ordering2, dist2);
+				assertTrue(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+			}
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -152,6 +199,82 @@ public class JoinGlobalPropertiesCompatibilityTest {
 				
 				assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
 			}
+
+			TestDistribution dist1 = new TestDistribution(1);
+			TestDistribution dist2 = new TestDistribution(1);
+
+			// test incompatible range partitioning with different key size
+			{
+				Ordering ordering1 = new Ordering();
+				for (int field : keysLeft) {
+					ordering1.appendOrdering(field, null, Order.ASCENDING);
+				}
+				Ordering ordering2 = new Ordering();
+				for (int field : keysRight) {
+					ordering1.appendOrdering(field, null, Order.ASCENDING);
+					ordering2.appendOrdering(field, null, Order.ASCENDING);
+				}
+
+				RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+				reqLeft.setRangePartitioned(ordering1, dist1);
+				RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+				reqRight.setRangePartitioned(ordering2, dist2);
+
+				GlobalProperties propsLeft = new GlobalProperties();
+				propsLeft.setRangePartitioned(ordering1, dist1);
+				GlobalProperties propsRight = new GlobalProperties();
+				propsRight.setRangePartitioned(ordering2, dist2);
+				assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+			}
+
+			// test incompatible range partitioning with different ordering
+			{
+				Ordering ordering1 = new Ordering();
+				for (int field : keysLeft) {
+					ordering1.appendOrdering(field, null, Order.ASCENDING);
+				}
+				Ordering ordering2 = new Ordering();
+				for (int field : keysRight) {
+					ordering2.appendOrdering(field, null, Order.DESCENDING);
+				}
+
+				RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+				reqLeft.setRangePartitioned(ordering1, dist1);
+				RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+				reqRight.setRangePartitioned(ordering2, dist2);
+
+				GlobalProperties propsLeft = new GlobalProperties();
+				propsLeft.setRangePartitioned(ordering1, dist1);
+				GlobalProperties propsRight = new GlobalProperties();
+				propsRight.setRangePartitioned(ordering2, dist2);
+				assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+			}
+
+			TestDistribution dist3 = new TestDistribution(1);
+			TestDistribution dist4 = new TestDistribution(2);
+
+			// test incompatible range partitioning with different distribution
+			{
+				Ordering ordering1 = new Ordering();
+				for (int field : keysLeft) {
+					ordering1.appendOrdering(field, null, Order.ASCENDING);
+				}
+				Ordering ordering2 = new Ordering();
+				for (int field : keysRight) {
+					ordering2.appendOrdering(field, null, Order.ASCENDING);
+				}
+
+				RequestedGlobalProperties reqLeft = new RequestedGlobalProperties();
+				reqLeft.setRangePartitioned(ordering1, dist3);
+				RequestedGlobalProperties reqRight = new RequestedGlobalProperties();
+				reqRight.setRangePartitioned(ordering2, dist4);
+
+				GlobalProperties propsLeft = new GlobalProperties();
+				propsLeft.setRangePartitioned(ordering1, dist3);
+				GlobalProperties propsRight = new GlobalProperties();
+				propsRight.setRangePartitioned(ordering2, dist4);
+				assertFalse(descr.areCompatible(reqLeft, reqRight, propsLeft, propsRight));
+			}
 		}
 		catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/605b6d87/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinWithDistributionTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinWithDistributionTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinWithDistributionTest.java
new file mode 100644
index 0000000..0c579f6
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/JoinWithDistributionTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.operators;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.optimizer.plan.*;
+import org.apache.flink.optimizer.util.CompilerTestBase;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class JoinWithDistributionTest extends CompilerTestBase {
+
+	@Test
+	public void joinWithSameDistributionTest() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		TestDistribution dist1 = new TestDistribution(3);
+		TestDistribution dist2 = new TestDistribution(3);
+
+		DataSet<Tuple3<Integer, Integer, Integer>> joined = DataSetUtils.partitionByRange(set1, dist1, 0)
+				.join(DataSetUtils.partitionByRange(set2, dist2, 0))
+				.where(0).equalTo(0).with(new JoinFunc());
+
+		joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
+		Plan plan = env.createProgramPlan();
+		OptimizedPlan oPlan = compileWithStats(plan);
+
+		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+		DualInputPlanNode join = (DualInputPlanNode) sink.getInput().getSource();
+		Channel input1 = join.getInput1();
+		Channel input2 = join.getInput2();
+		assertEquals(ShipStrategyType.FORWARD, input1.getShipStrategy());
+		assertEquals(ShipStrategyType.FORWARD, input2.getShipStrategy());
+
+	}
+
+	@Test
+	public void joinWithDifferentDistributionTest() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple3<Integer, Integer, Integer>> set1 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+		DataSet<Tuple3<Integer, Integer, Integer>> set2 = env.readCsvFile(IN_FILE).types(Integer.class, Integer.class, Integer.class);
+
+		TestDistribution dist1 = new TestDistribution(3);
+		TestDistribution dist2 = new TestDistribution(4);
+
+		DataSet<Tuple3<Integer, Integer, Integer>> joined = DataSetUtils.partitionByRange(set1, dist1, 0)
+				.join(DataSetUtils.partitionByRange(set2, dist2, 0))
+				.where(0).equalTo(0).with(new JoinFunc());
+
+		joined.output(new DiscardingOutputFormat<Tuple3<Integer, Integer, Integer>>());
+		Plan plan = env.createProgramPlan();
+		OptimizedPlan oPlan = compileWithStats(plan);
+
+		SinkPlanNode sink = oPlan.getDataSinks().iterator().next();
+		DualInputPlanNode join = (DualInputPlanNode) sink.getInput().getSource();
+		Channel input1 = join.getInput1();
+		Channel input2 = join.getInput2();
+		assertEquals(ShipStrategyType.PARTITION_HASH, input1.getShipStrategy());
+		assertEquals(ShipStrategyType.PARTITION_HASH, input2.getShipStrategy());
+
+	}
+
+	public static class JoinFunc implements JoinFunction<Tuple3<Integer, Integer, Integer>,
+			Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>> {
+
+		@Override
+		public Tuple3<Integer, Integer, Integer> join(Tuple3<Integer, Integer, Integer> first, Tuple3<Integer, Integer, Integer> second) throws Exception {
+			return null;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/605b6d87/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/TestDistribution.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/TestDistribution.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/TestDistribution.java
new file mode 100644
index 0000000..d99930b
--- /dev/null
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/operators/TestDistribution.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.operators;
+
+import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+public class TestDistribution implements DataDistribution {
+
+	public int boundary;
+
+	public TestDistribution(int boundary) {
+		this.boundary = boundary;
+	}
+
+	@Override
+	public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
+		return new Object[0];
+	}
+
+	@Override
+	public int getNumberOfFields() {
+		return 1;
+	}
+
+	@Override
+	public TypeInformation[] getKeyTypes() {
+		return new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO};
+	}
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		// guard against foreign types before casting
+		if (!(obj instanceof TestDistribution)) {
+			return false;
+		}
+		TestDistribution dist = (TestDistribution) obj;
+		return this.boundary == dist.boundary;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/605b6d87/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java
index 4e20842..e6f3d26 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputEmitter.java
@@ -258,11 +258,11 @@ public class OutputEmitter<T> implements ChannelSelector<SerializationDelegate<T
 	private final int compareRecordAndBoundary(T record, Object[] boundary) {
 		this.comparator.extractKeys(record, keys, 0);
 
-		if (flatComparators.length != keys.length || flatComparators.length != boundary.length) {
+		if (flatComparators.length != keys.length || flatComparators.length > boundary.length) {
 			throw new RuntimeException("Can not compare keys with boundary due to mismatched length.");
 		}
 
-		for (int i=0; i<flatComparators.length; i++) {
+		for (int i = 0; i < flatComparators.length; i++) {
 			int result = flatComparators[i].compare(keys[i], boundary[i]);
 			if (result != 0) {
 				return result;

http://git-wip-us.apache.org/repos/asf/flink/blob/605b6d87/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
index 7bc8480..5b7caa7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
@@ -18,8 +18,11 @@
 
 package org.apache.flink.test.javaApiOperators;
 
+import org.apache.flink.api.common.distributions.DataDistribution;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.RichCoGroupFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -28,7 +31,10 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO;
@@ -40,6 +46,7 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -286,7 +293,7 @@ public class CoGroupITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
 
 		DataSet<Tuple3<Integer, Long, String>> coGrouped = ds1.coGroup(ds2).
-				where(0,4).equalTo(0,1).with(new Tuple5Tuple3CoGroup());
+				where(0, 4).equalTo(0, 1).with(new Tuple5Tuple3CoGroup());
 
 		List<Tuple3<Integer, Long, String>> result = coGrouped.collect();
 		
@@ -516,6 +523,37 @@ public class CoGroupITCase extends MultipleProgramsTestBase {
 		compareResultAsText(result, expected);
 	}
 
+	@Test
+	public void testCoGroupWithRangePartitioning() throws Exception {
+		/*
+		 * Tests coGroup on tuples with multiple key field positions and the same customized data distribution.
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
+
+		env.setParallelism(4);
+		TestDistribution testDis = new TestDistribution();
+		DataSet<Tuple3<Integer, Long, String>> coGrouped =
+				DataSetUtils.partitionByRange(ds1, testDis, 0, 4)
+						.coGroup(DataSetUtils.partitionByRange(ds2, testDis, 0, 1))
+						.where(0, 4)
+						.equalTo(0, 1)
+						.with(new Tuple5Tuple3CoGroup());
+
+		List<Tuple3<Integer, Long, String>> result = coGrouped.collect();
+
+		String expected = "1,1,Hallo\n" +
+				"2,2,Hallo Welt\n" +
+				"3,2,Hallo Welt wie gehts?\n" +
+				"3,2,ABC\n" +
+				"5,3,HIJ\n" +
+				"5,3,IJK\n";
+
+		compareResultAsTuples(result, expected);
+	}
 
 
 	// --------------------------------------------------------------------------------------------
@@ -797,4 +835,45 @@ public class CoGroupITCase extends MultipleProgramsTestBase {
 			}
 		}
 	}
+
+	public static class TestDistribution implements DataDistribution {
+		public Object[][] boundaries = new Object[][]{
+				new Object[]{2, 2L},
+				new Object[]{5, 4L},
+				new Object[]{10, 12L},
+				new Object[]{21, 6L}
+		};
+
+		public TestDistribution() {}
+
+		@Override
+		public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
+			return boundaries[bucketNum];
+		}
+
+		@Override
+		public int getNumberOfFields() {
+			return 2;
+		}
+
+		@Override
+		public TypeInformation[] getKeyTypes() {
+			return new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO};
+		}
+
+		@Override
+		public void write(DataOutputView out) throws IOException {
+
+		}
+
+		@Override
+		public void read(DataInputView in) throws IOException {
+
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			return obj instanceof TestDistribution; 
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/605b6d87/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
index 427e83e..c6bc08e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CustomDistributionITCase.java
@@ -49,13 +49,13 @@ public class CustomDistributionITCase {
 
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
 
-		DataSet<Tuple3<Integer, Long, String>> input1 = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
 		final TestDataDist1 dist = new TestDataDist1();
 
 		env.setParallelism(dist.getParallelism());
 
 		DataSet<Boolean> result = DataSetUtils
-			.partitionByRange(input1, dist, 0)
+			.partitionByRange(input, dist, 0)
 			.mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean>() {
 
 				@Override
@@ -104,7 +104,7 @@ public class CustomDistributionITCase {
 
 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
 
-		DataSet<Tuple3<Integer, Integer, String>> input1 = env.fromElements(
+		DataSet<Tuple3<Integer, Integer, String>> input = env.fromElements(
 						new Tuple3<>(1, 5, "Hi"),
 						new Tuple3<>(1, 6, "Hi"),
 						new Tuple3<>(1, 7, "Hi"),
@@ -127,7 +127,7 @@ public class CustomDistributionITCase {
 		env.setParallelism(dist.getParallelism());
 
 		DataSet<Boolean> result = DataSetUtils
-			.partitionByRange(input1, dist, 0, 1)
+			.partitionByRange(input, dist, 0, 1)
 			.mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Integer, String>, Boolean>() {
 
 				@Override
@@ -175,6 +175,75 @@ public class CustomDistributionITCase {
 		env.execute();
 	}
 
+	@Test
+	public void testPartitionKeyLessDistribution() throws Exception {
+		/*
+		 * Tests range partitioning where the number of partition keys is smaller than the number of distribution fields.
+		 */
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+		final TestDataDist2 dist = new TestDataDist2();
+
+		env.setParallelism(dist.getParallelism());
+
+		DataSet<Boolean> result = DataSetUtils
+			.partitionByRange(input, dist, 0)
+			.mapPartition(new RichMapPartitionFunction<Tuple3<Integer, Long, String>, Boolean>() {
+
+				@Override
+				public void mapPartition(Iterable<Tuple3<Integer, Long, String>> values, Collector<Boolean> out) throws Exception {
+					int pIdx = getRuntimeContext().getIndexOfThisSubtask();
+
+					for (Tuple3<Integer, Long, String> s : values) {
+						boolean correctlyPartitioned = true;
+						if (pIdx == 0) {
+							Integer[] upper = dist.boundaries[0];
+							if (s.f0.compareTo(upper[0]) > 0) {
+								correctlyPartitioned = false;
+							}
+						}
+						else if (pIdx > 0 && pIdx < dist.getParallelism() - 1) {
+							Integer[] lower = dist.boundaries[pIdx - 1];
+							Integer[] upper = dist.boundaries[pIdx];
+							if (s.f0.compareTo(upper[0]) > 0 || (s.f0.compareTo(lower[0]) <= 0)) {
+								correctlyPartitioned = false;
+							}
+						}
+						else {
+							Integer[] lower = dist.boundaries[pIdx - 1];
+							if ((s.f0.compareTo(lower[0]) <= 0)) {
+								correctlyPartitioned = false;
+							}
+						}
+
+						if (!correctlyPartitioned) {
+							fail("Record was not correctly partitioned: " + s.toString());
+						}
+					}
+				}
+			}
+			);
+
+		result.output(new DiscardingOutputFormat<Boolean>());
+		env.execute();
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testPartitionMoreThanDistribution() throws Exception {
+		/*
+		 * Tests that range partitioning is rejected when the number of partition keys exceeds the number of distribution fields.
+		 */
+
+		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+		final TestDataDist2 dist = new TestDataDist2();
+
+		// expected to fail eagerly: three key fields, but the distribution defines only two
+		DataSetUtils.partitionByRange(input, dist, 0, 1, 2);
+	}
+	
 	/**
 	 * The class is used to do the tests of range partition with one key.
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/605b6d87/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
index 61e07fe..0d8c80b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
@@ -18,11 +18,15 @@
 
 package org.apache.flink.test.javaApiOperators;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 
+import org.apache.flink.api.common.distributions.DataDistribution;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.common.functions.RichFlatJoinFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -30,7 +34,10 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.tuple.Tuple6;
 import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO;
@@ -87,8 +94,8 @@ public class JoinITCase extends MultipleProgramsTestBase {
 		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
 		DataSet<Tuple2<String, String>> joinDs =
 				ds1.join(ds2)
-				.where(0,1)
-				.equalTo(0,4)
+				.where(0, 1)
+				.equalTo(0, 4)
 				.with(new T3T5FlatJoin());
 
 		List<Tuple2<String, String>> result = joinDs.collect();
@@ -680,6 +687,7 @@ public class JoinITCase extends MultipleProgramsTestBase {
 		compareResultAsTuples(result, expected);
 	}
 
+	@Test
 	public void testJoinWithAtomicType2() throws Exception {
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
@@ -696,6 +704,38 @@ public class JoinITCase extends MultipleProgramsTestBase {
 		compareResultAsTuples(result, expected);
 	}
 
+	@Test
+	public void testJoinWithRangePartitioning() throws Exception {
+		/*
+		 * Tests join on tuples with multiple key field positions and the same customized data distribution.
+		 */
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+		env.setParallelism(4);
+		TestDistribution testDis = new TestDistribution();
+		DataSet<Tuple2<String, String>> joinDs =
+				DataSetUtils.partitionByRange(ds1, testDis, 0, 1)
+						.join(DataSetUtils.partitionByRange(ds2, testDis, 0, 4))
+						.where(0, 1)
+						.equalTo(0, 4)
+						.with(new T3T5FlatJoin());
+
+		List<Tuple2<String, String>> result = joinDs.collect();
+
+		String expected = "Hi,Hallo\n" +
+				"Hello,Hallo Welt\n" +
+				"Hello world,Hallo Welt wie gehts?\n" +
+				"Hello world,ABC\n" +
+				"I am fine.,HIJ\n" +
+				"I am fine.,IJK\n";
+
+		compareResultAsTuples(result, expected);
+	}
+
 	public static class T3T5FlatJoin implements FlatJoinFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<String, String>> {
 
 		@Override
@@ -778,4 +818,45 @@ public class JoinITCase extends MultipleProgramsTestBase {
 			return new Tuple2<String, String>(first.myString, second.f2);
 		}
 	}
+	
+	public static class TestDistribution implements DataDistribution {
+	public Object[][] boundaries = new Object[][]{
+				new Object[]{2, 2L},
+				new Object[]{5, 4L},
+				new Object[]{10, 12L},
+				new Object[]{21, 6L}
+		};
+
+		public TestDistribution() {}
+
+		@Override
+		public Object[] getBucketBoundary(int bucketNum, int totalNumBuckets) {
+			return boundaries[bucketNum];
+		}
+
+		@Override
+		public int getNumberOfFields() {
+			return 2;
+		}
+
+		@Override
+		public TypeInformation[] getKeyTypes() {
+			return new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO};
+		}
+
+		@Override
+		public void write(DataOutputView out) throws IOException {
+
+		}
+
+		@Override
+		public void read(DataInputView in) throws IOException {
+
+		}
+		
+		@Override
+		public boolean equals(Object obj) {
+			return obj instanceof TestDistribution;
+		}
+	}
 }