Posted to commits@flink.apache.org by se...@apache.org on 2014/11/18 12:22:52 UTC

[4/4] incubator-flink git commit: [FLINK-1237] Add support for custom partitioners - Functions: GroupReduce, Reduce, Aggregate on UnsortedGrouping, SortedGrouping, Join (Java API & Scala API) - Manual partition on DataSet (Java API & S

[FLINK-1237] Add support for custom partitioners
  - Functions: GroupReduce, Reduce, Aggregate on UnsortedGrouping, SortedGrouping,
               Join (Java API & Scala API)
  - Manual partition on DataSet (Java API & Scala API), as sketched below
  - Distinct operations provide semantic properties for preservation of distinctified fields
  - Tests for pushdown (or prevented pushdown) of custom partitionings and forced rebalancing
  - Tests for GlobalProperties matching of partitionings
  - Caching of generated requested data properties for unary operators
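
A minimal usage sketch of the new manual-partitioning API. The Partitioner
interface and DataSet.partitionCustom(...) are taken from this commit's diff;
the modulo partitioner and the input data are purely illustrative:

    import org.apache.flink.api.common.functions.Partitioner;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class CustomPartitionExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<Tuple2<Long, String>> data = env.fromElements(
                    new Tuple2<Long, String>(1L, "a"),
                    new Tuple2<Long, String>(2L, "b"),
                    new Tuple2<Long, String>(3L, "c"));

            // ship each record according to the key in tuple field 0
            data.partitionCustom(new Partitioner<Long>() {
                    @Override
                    public int partition(Long key, int numPartitions) {
                        return (int) (key % numPartitions);
                    }
                }, 0)
                .print();

            env.execute();
        }
    }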

This closes #207


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/2000b45c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/2000b45c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/2000b45c

Branch: refs/heads/master
Commit: 2000b45ce3e71ed6eddecbb3f8658ebecec58230
Parents: 83d0256
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 13 16:26:07 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Nov 18 12:19:37 2014 +0100

----------------------------------------------------------------------
 .../org/apache/flink/compiler/PactCompiler.java |   8 +-
 .../flink/compiler/costs/CostEstimator.java     |   1 +
 .../flink/compiler/dag/BinaryUnionNode.java     |   1 -
 .../flink/compiler/dag/CollectorMapNode.java    |   6 +-
 .../apache/flink/compiler/dag/FilterNode.java   |   7 +-
 .../apache/flink/compiler/dag/FlatMapNode.java  |   8 +-
 .../flink/compiler/dag/GroupReduceNode.java     |  93 +++---
 .../org/apache/flink/compiler/dag/JoinNode.java | 187 ++++++++++++
 .../org/apache/flink/compiler/dag/MapNode.java  |  11 +-
 .../flink/compiler/dag/MapPartitionNode.java    |   6 +-
 .../flink/compiler/dag/PartitionNode.java       |  25 +-
 .../apache/flink/compiler/dag/ReduceNode.java   |  16 +-
 .../dataproperties/GlobalProperties.java        |  47 ++-
 .../dataproperties/PartitioningProperty.java    |  12 +-
 .../RequestedGlobalProperties.java              |  76 ++++-
 .../operators/AbstractJoinDescriptor.java       |  23 +-
 .../operators/GroupReduceProperties.java        |  29 +-
 .../GroupReduceWithCombineProperties.java       |  23 +-
 .../compiler/operators/ReduceProperties.java    |  14 +-
 .../operators/SortMergeJoinDescriptor.java      |   1 -
 .../org/apache/flink/compiler/plan/Channel.java |  30 +-
 .../plandump/PlanJSONDumpGenerator.java         |   3 +
 .../plantranslate/NepheleJobGraphGenerator.java |   1 +
 .../compiler/FeedbackPropertiesMatchTest.java   |   6 +-
 ...ustomPartitioningGlobalOptimizationTest.java |  93 ++++++
 .../custompartition/CustomPartitioningTest.java | 287 +++++++++++++++++++
 .../GroupingKeySelectorTranslationTest.java     | 268 +++++++++++++++++
 .../GroupingPojoTranslationTest.java            | 257 +++++++++++++++++
 .../GroupingTupleTranslationTest.java           | 270 +++++++++++++++++
 .../JoinCustomPartitioningTest.java             | 263 +++++++++++++++++
 .../GlobalPropertiesFilteringTest.java          |  55 ++++
 .../GlobalPropertiesMatchingTest.java           | 159 ++++++++++
 .../GlobalPropertiesPushdownTest.java           | 113 ++++++++
 .../dataproperties/MockPartitioner.java         |  31 ++
 .../java/DistinctAndGroupingOptimizerTest.java  | 112 ++++++++
 .../compiler/testfunctions/DummyReducer.java    |  31 ++
 .../IdentityPartitionerMapper.java              |  34 +++
 .../flink/api/common/functions/Partitioner.java |  36 +++
 .../operators/base/GroupReduceOperatorBase.java |  29 +-
 .../common/operators/base/JoinOperatorBase.java |  31 +-
 .../operators/base/PartitionOperatorBase.java   |  36 ++-
 .../operators/base/ReduceOperatorBase.java      |  23 ++
 .../api/common/typeutils/TypeComparator.java    |   2 +-
 .../operators/base/JoinOperatorBaseTest.java    |   2 +-
 .../common/operators/base/MapOperatorTest.java  |   2 +-
 .../base/PartitionMapOperatorTest.java          |   2 +-
 .../common/operators/util/FieldListTest.java    |   1 -
 .../api/common/operators/util/FieldSetTest.java |   1 -
 .../java/org/apache/flink/api/java/DataSet.java |  70 ++++-
 .../api/java/operators/AggregateOperator.java   |   3 +-
 .../api/java/operators/DistinctOperator.java    |  20 +-
 .../api/java/operators/GroupReduceOperator.java |  17 +-
 .../flink/api/java/operators/Grouping.java      |  16 +-
 .../flink/api/java/operators/JoinOperator.java  | 119 ++++----
 .../apache/flink/api/java/operators/Keys.java   |  57 +++-
 .../api/java/operators/PartitionOperator.java   |  94 ++++--
 .../api/java/operators/ReduceOperator.java      |  14 +-
 .../api/java/operators/SortedGrouping.java      |  19 +-
 .../api/java/operators/UnsortedGrouping.java    |  25 +-
 .../api/java/record/io/CsvInputFormat.java      |   2 +
 .../api/java/record/io/CsvOutputFormat.java     |   2 +-
 .../java/record/io/DelimitedOutputFormat.java   |   3 +-
 .../java/record/operators/ReduceOperator.java   |   1 +
 .../flink/api/java/typeutils/TypeExtractor.java |   5 +
 .../runtime/io/network/api/ChannelSelector.java |   1 -
 .../runtime/operators/RegularPactTask.java      |  17 +-
 .../shipping/HistogramPartitionFunction.java    |  58 ----
 .../operators/shipping/OutputEmitter.java       |  48 +++-
 .../operators/shipping/PartitionFunction.java   |  26 --
 .../operators/shipping/RecordOutputEmitter.java |  49 +++-
 .../operators/shipping/ShipStrategyType.java    |   9 +-
 .../runtime/operators/util/TaskConfig.java      |  24 ++
 .../scala/operators/ScalaAggregateOperator.java |   1 +
 .../org/apache/flink/api/scala/DataSet.scala    |  68 ++++-
 .../apache/flink/api/scala/GroupedDataSet.scala |  48 +++-
 .../apache/flink/api/scala/joinDataSet.scala    |  58 +++-
 .../test/cancelling/CancellingTestBase.java     |   2 +
 .../StaticlyNestedIterationsITCase.java         |   3 +-
 .../IterationWithChainingNepheleITCase.java     |   1 +
 .../translation/AggregateTranslationTest.scala  |   1 +
 ...tomPartitioningGroupingKeySelectorTest.scala | 249 ++++++++++++++++
 .../CustomPartitioningGroupingPojoTest.scala    | 255 ++++++++++++++++
 .../CustomPartitioningGroupingTupleTest.scala   | 262 +++++++++++++++++
 .../translation/CustomPartitioningTest.scala    | 243 ++++++++++++++++
 .../JoinCustomPartitioningTest.scala            | 252 ++++++++++++++++
 85 files changed, 4533 insertions(+), 381 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
index 2ce2495..d1d6343 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
@@ -67,7 +67,7 @@ import org.apache.flink.compiler.dag.GroupReduceNode;
 import org.apache.flink.compiler.dag.IterationNode;
 import org.apache.flink.compiler.dag.MapNode;
 import org.apache.flink.compiler.dag.MapPartitionNode;
-import org.apache.flink.compiler.dag.MatchNode;
+import org.apache.flink.compiler.dag.JoinNode;
 import org.apache.flink.compiler.dag.OptimizerNode;
 import org.apache.flink.compiler.dag.PactConnection;
 import org.apache.flink.compiler.dag.PartitionNode;
@@ -696,7 +696,7 @@ public class PactCompiler {
 				n = new GroupReduceNode((GroupReduceOperatorBase<?, ?, ?>) c);
 			}
 			else if (c instanceof JoinOperatorBase) {
-				n = new MatchNode((JoinOperatorBase<?, ?, ?, ?>) c);
+				n = new JoinNode((JoinOperatorBase<?, ?, ?, ?>) c);
 			}
 			else if (c instanceof CoGroupOperatorBase) {
 				n = new CoGroupNode((CoGroupOperatorBase<?, ?, ?, ?>) c);
@@ -883,9 +883,9 @@ public class PactCompiler {
 					for (PactConnection conn : solutionSetNode.getOutgoingConnections()) {
 						OptimizerNode successor = conn.getTarget();
 					
-						if (successor.getClass() == MatchNode.class) {
+						if (successor.getClass() == JoinNode.class) {
 							// find out which input to the match the solution set is
-							MatchNode mn = (MatchNode) successor;
+							JoinNode mn = (JoinNode) successor;
 							if (mn.getFirstPredecessorNode() == solutionSetNode) {
 								mn.makeJoinWithSolutionSet(0);
 							} else if (mn.getSecondPredecessorNode() == solutionSetNode) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-compiler/src/main/java/org/apache/flink/compiler/costs/CostEstimator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/costs/CostEstimator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/costs/CostEstimator.java
index 99a9b12..b13c1be 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/costs/CostEstimator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/costs/CostEstimator.java
@@ -105,6 +105,7 @@ public abstract class CostEstimator {
 				addRandomPartitioningCost(channel, costs);
 				break;
 			case PARTITION_HASH:
+			case PARTITION_CUSTOM:
 				addHashPartitioningCost(channel, costs);
 				break;
 			case PARTITION_RANGE:

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BinaryUnionNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BinaryUnionNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BinaryUnionNode.java
index b229a4e..9003c92 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BinaryUnionNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/BinaryUnionNode.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler.dag;
 
 import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/CollectorMapNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/CollectorMapNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/CollectorMapNode.java
index dbf97b5..53a760e 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/CollectorMapNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/CollectorMapNode.java
@@ -32,9 +32,13 @@ import org.apache.flink.compiler.operators.OperatorDescriptorSingle;
  */
 public class CollectorMapNode extends SingleInputNode {
 	
+	private final List<OperatorDescriptorSingle> possibleProperties;
 
+	
 	public CollectorMapNode(SingleInputOperator<?, ?, ?> operator) {
 		super(operator);
+		
+		this.possibleProperties = Collections.<OperatorDescriptorSingle>singletonList(new CollectorMapDescriptor());
 	}
 
 	@Override
@@ -44,7 +48,7 @@ public class CollectorMapNode extends SingleInputNode {
 
 	@Override
 	protected List<OperatorDescriptorSingle> getPossibleProperties() {
-		return Collections.<OperatorDescriptorSingle>singletonList(new CollectorMapDescriptor());
+		return this.possibleProperties;
 	}
 
 	/**
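
The change above is the "caching of generated requested data properties"
mentioned in the commit message: the descriptor list is now built once in the
constructor instead of being re-allocated on every getPossibleProperties()
call. A standalone sketch of the pattern (names are illustrative, not Flink
classes):

    import java.util.Collections;
    import java.util.List;

    class CachedPropertiesNode {
        // built once per node, returned on every call
        private final List<String> possibleProperties;

        CachedPropertiesNode() {
            this.possibleProperties = Collections.singletonList("descriptor");
        }

        List<String> getPossibleProperties() {
            return this.possibleProperties;  // no per-call allocation
        }
    }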

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/FilterNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/FilterNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/FilterNode.java
index fe12fbf..df304b1 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/FilterNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/FilterNode.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler.dag;
 
 import java.util.Collections;
@@ -32,9 +31,11 @@ import org.apache.flink.compiler.operators.OperatorDescriptorSingle;
  */
 public class FilterNode extends SingleInputNode {
 	
-
+	private final List<OperatorDescriptorSingle> possibleProperties;
+	
 	public FilterNode(FilterOperatorBase<?, ?> operator) {
 		super(operator);
+		this.possibleProperties = Collections.<OperatorDescriptorSingle>singletonList(new FilterDescriptor());
 	}
 
 	@Override
@@ -54,7 +55,7 @@ public class FilterNode extends SingleInputNode {
 
 	@Override
 	protected List<OperatorDescriptorSingle> getPossibleProperties() {
-		return Collections.<OperatorDescriptorSingle>singletonList(new FilterDescriptor());
+		return this.possibleProperties;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/FlatMapNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/FlatMapNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/FlatMapNode.java
index c1a86b3..234b26a 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/FlatMapNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/FlatMapNode.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler.dag;
 
 import java.util.Collections;
@@ -32,9 +31,12 @@ import org.apache.flink.compiler.operators.OperatorDescriptorSingle;
  */
 public class FlatMapNode extends SingleInputNode {
 	
-
+	private final List<OperatorDescriptorSingle> possibleProperties;
+	
 	public FlatMapNode(FlatMapOperatorBase<?, ?, ?> operator) {
 		super(operator);
+		
+		this.possibleProperties = Collections.<OperatorDescriptorSingle>singletonList(new FlatMapDescriptor());
 	}
 
 	@Override
@@ -49,7 +51,7 @@ public class FlatMapNode extends SingleInputNode {
 
 	@Override
 	protected List<OperatorDescriptorSingle> getPossibleProperties() {
-		return Collections.<OperatorDescriptorSingle>singletonList(new FlatMapDescriptor());
+		return this.possibleProperties;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/GroupReduceNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/GroupReduceNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/GroupReduceNode.java
index a6bb207..527adcc 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/GroupReduceNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/GroupReduceNode.java
@@ -16,12 +16,12 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler.dag;
 
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.compiler.CompilerException;
@@ -35,59 +35,37 @@ import org.apache.flink.compiler.operators.OperatorDescriptorSingle;
 import org.apache.flink.configuration.Configuration;
 
 /**
- * The Optimizer representation of a <i>Reduce</i> contract node.
+ * The optimizer representation of a <i>GroupReduce</i> operation.
  */
 public class GroupReduceNode extends SingleInputNode {
 	
+	private final List<OperatorDescriptorSingle> possibleProperties;
+	
 	private GroupReduceNode combinerUtilityNode;
 	
 	/**
-	 * Creates a new ReduceNode for the given contract.
+	 * Creates a new optimizer node for the given operator.
 	 * 
-	 * @param pactContract The reduce contract object.
+	 * @param operator The reduce operation.
 	 */
-	public GroupReduceNode(GroupReduceOperatorBase<?, ?, ?> pactContract) {
-		super(pactContract);
+	public GroupReduceNode(GroupReduceOperatorBase<?, ?, ?> operator) {
+		super(operator);
 		
 		if (this.keys == null) {
 			// case of a key-less reducer. force a parallelism of 1
 			setDegreeOfParallelism(1);
 		}
+		
+		this.possibleProperties = initPossibleProperties(operator.getCustomPartitioner());
 	}
 	
 	public GroupReduceNode(GroupReduceNode reducerToCopyForCombiner) {
 		super(reducerToCopyForCombiner);
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Gets the contract object for this reduce node.
-	 * 
-	 * @return The contract.
-	 */
-	@Override
-	public GroupReduceOperatorBase<?, ?, ?> getPactContract() {
-		return (GroupReduceOperatorBase<?, ?, ?>) super.getPactContract();
-	}
-
-	/**
-	 * Checks, whether a combiner function has been given for the function encapsulated
-	 * by this reduce contract.
-	 * 
-	 * @return True, if a combiner has been given, false otherwise.
-	 */
-	public boolean isCombineable() {
-		return getPactContract().isCombinable();
-	}
-
-	@Override
-	public String getName() {
-		return "GroupReduce";
+		
+		this.possibleProperties = Collections.emptyList();
 	}
 	
-	@Override
-	protected List<OperatorDescriptorSingle> getPossibleProperties() {
+	private List<OperatorDescriptorSingle> initPossibleProperties(Partitioner<?> customPartitioner) {
 		// see if an internal hint dictates the strategy to use
 		final Configuration conf = getPactContract().getParameters();
 		final String localStrategy = conf.getString(PactCompiler.HINT_LOCAL_STRATEGY, null);
@@ -96,10 +74,11 @@ public class GroupReduceNode extends SingleInputNode {
 		if (localStrategy != null) {
 			if (PactCompiler.HINT_LOCAL_STRATEGY_SORT.equals(localStrategy)) {
 				useCombiner = false;
-			} else if (PactCompiler.HINT_LOCAL_STRATEGY_COMBINING_SORT.equals(localStrategy)) {
+			}
+			else if (PactCompiler.HINT_LOCAL_STRATEGY_COMBINING_SORT.equals(localStrategy)) {
 				if (!isCombineable()) {
-					PactCompiler.LOG.warn("Strategy hint for Reduce Pact '" + getPactContract().getName() + 
-						"' desires combinable reduce, but user function is not marked combinable.");
+					PactCompiler.LOG.warn("Strategy hint for GroupReduce '" + getPactContract().getName() + 
+						"' requires combinable reduce, but user function is not marked combinable.");
 				}
 				useCombiner = true;
 			} else {
@@ -119,10 +98,42 @@ public class GroupReduceNode extends SingleInputNode {
 		}
 		
 		OperatorDescriptorSingle props = useCombiner ?
-			(this.keys == null ? new AllGroupWithPartialPreGroupProperties() : new GroupReduceWithCombineProperties(this.keys, groupOrder)) :
-			(this.keys == null ? new AllGroupReduceProperties() : new GroupReduceProperties(this.keys, groupOrder));
+			(this.keys == null ? new AllGroupWithPartialPreGroupProperties() : new GroupReduceWithCombineProperties(this.keys, groupOrder, customPartitioner)) :
+			(this.keys == null ? new AllGroupReduceProperties() : new GroupReduceProperties(this.keys, groupOrder, customPartitioner));
+
+		return Collections.singletonList(props);
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets the operator represented by this optimizer node.
+	 * 
+	 * @return The operator represented by this optimizer node.
+	 */
+	@Override
+	public GroupReduceOperatorBase<?, ?, ?> getPactContract() {
+		return (GroupReduceOperatorBase<?, ?, ?>) super.getPactContract();
+	}
+
+	/**
+	 * Checks, whether a combiner function has been given for the function encapsulated
+	 * by this reduce contract.
+	 * 
+	 * @return True, if a combiner has been given, false otherwise.
+	 */
+	public boolean isCombineable() {
+		return getPactContract().isCombinable();
+	}
 
-			return Collections.singletonList(props);
+	@Override
+	public String getName() {
+		return "GroupReduce";
+	}
+	
+	@Override
+	protected List<OperatorDescriptorSingle> getPossibleProperties() {
+		return this.possibleProperties;
 	}
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/JoinNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/JoinNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/JoinNode.java
new file mode 100644
index 0000000..19b753d
--- /dev/null
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/JoinNode.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.dag;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.compiler.CompilerException;
+import org.apache.flink.compiler.DataStatistics;
+import org.apache.flink.compiler.PactCompiler;
+import org.apache.flink.compiler.operators.AbstractJoinDescriptor;
+import org.apache.flink.compiler.operators.HashJoinBuildFirstProperties;
+import org.apache.flink.compiler.operators.HashJoinBuildSecondProperties;
+import org.apache.flink.compiler.operators.OperatorDescriptorDual;
+import org.apache.flink.compiler.operators.SortMergeJoinDescriptor;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * The Optimizer representation of a join operator.
+ */
+public class JoinNode extends TwoInputNode {
+	
+	private List<OperatorDescriptorDual> dataProperties;
+	
+	/**
+	 * Creates a new JoinNode for the given join operator.
+	 * 
+	 * @param joinOperatorBase The join operator object.
+	 */
+	public JoinNode(JoinOperatorBase<?, ?, ?, ?> joinOperatorBase) {
+		super(joinOperatorBase);
+		
+		this.dataProperties = getDataProperties(joinOperatorBase,
+				joinOperatorBase.getJoinHint(), joinOperatorBase.getCustomPartitioner());
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets the contract object for this match node.
+	 * 
+	 * @return The contract.
+	 */
+	@Override
+	public JoinOperatorBase<?, ?, ?, ?> getPactContract() {
+		return (JoinOperatorBase<?, ?, ?, ?>) super.getPactContract();
+	}
+
+	@Override
+	public String getName() {
+		return "Join";
+	}
+
+	@Override
+	protected List<OperatorDescriptorDual> getPossibleProperties() {
+		return this.dataProperties;
+	}
+	
+	public void makeJoinWithSolutionSet(int solutionsetInputIndex) {
+		OperatorDescriptorDual op;
+		if (solutionsetInputIndex == 0) {
+			op = new HashJoinBuildFirstProperties(this.keys1, this.keys2);
+		} else if (solutionsetInputIndex == 1) {
+			op = new HashJoinBuildSecondProperties(this.keys1, this.keys2);
+		} else {
+			throw new IllegalArgumentException();
+		}
+		
+		this.dataProperties = Collections.singletonList(op);
+	}
+	
+	/**
+	 * The default estimates build on the principle of inclusion: The smaller input key domain is included in the larger
+	 * input key domain. We also assume that every key from the larger input has one join partner in the smaller input.
+	 * The result cardinality is hence the larger one.
+	 */
+	@Override
+	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
+		long card1 = getFirstPredecessorNode().getEstimatedNumRecords();
+		long card2 = getSecondPredecessorNode().getEstimatedNumRecords();
+		this.estimatedNumRecords = (card1 < 0 || card2 < 0) ? -1 : Math.max(card1, card2);
+		
+		if (this.estimatedNumRecords >= 0) {
+			float width1 = getFirstPredecessorNode().getEstimatedAvgWidthPerOutputRecord();
+			float width2 = getSecondPredecessorNode().getEstimatedAvgWidthPerOutputRecord();
+			float width = (width1 <= 0 || width2 <= 0) ? -1 : width1 + width2;
+			
+			if (width > 0) {
+				this.estimatedOutputSize = (long) (width * this.estimatedNumRecords);
+			}
+		}
+	}
+	
+	private List<OperatorDescriptorDual> getDataProperties(JoinOperatorBase<?, ?, ?, ?> joinOperatorBase, JoinHint joinHint,
+			Partitioner<?> customPartitioner)
+	{
+		// see if an internal hint dictates the strategy to use
+		Configuration conf = joinOperatorBase.getParameters();
+		String localStrategy = conf.getString(PactCompiler.HINT_LOCAL_STRATEGY, null);
+
+		if (localStrategy != null) {
+			final AbstractJoinDescriptor fixedDriverStrat;
+			if (PactCompiler.HINT_LOCAL_STRATEGY_SORT_BOTH_MERGE.equals(localStrategy) ||
+				PactCompiler.HINT_LOCAL_STRATEGY_SORT_FIRST_MERGE.equals(localStrategy) ||
+				PactCompiler.HINT_LOCAL_STRATEGY_SORT_SECOND_MERGE.equals(localStrategy) ||
+				PactCompiler.HINT_LOCAL_STRATEGY_MERGE.equals(localStrategy) )
+			{
+				fixedDriverStrat = new SortMergeJoinDescriptor(this.keys1, this.keys2);
+			}
+			else if (PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST.equals(localStrategy)) {
+				fixedDriverStrat = new HashJoinBuildFirstProperties(this.keys1, this.keys2);
+			}
+			else if (PactCompiler.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND.equals(localStrategy)) {
+				fixedDriverStrat = new HashJoinBuildSecondProperties(this.keys1, this.keys2);
+			}
+			else {
+				throw new CompilerException("Invalid local strategy hint for match contract: " + localStrategy);
+			}
+			
+			if (customPartitioner != null) {
+				fixedDriverStrat.setCustomPartitioner(customPartitioner);
+			}
+			
+			ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
+			list.add(fixedDriverStrat);
+			return list;
+		}
+		else {
+			ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
+			
+			joinHint = joinHint == null ? JoinHint.OPTIMIZER_CHOOSES : joinHint;
+			
+			switch (joinHint) {
+				case BROADCAST_HASH_FIRST:
+					list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2, true, false, false));
+					break;
+				case BROADCAST_HASH_SECOND:
+					list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2, false, true, false));
+					break;
+				case REPARTITION_HASH_FIRST:
+					list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2, false, false, true));
+					break;
+				case REPARTITION_HASH_SECOND:
+					list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2, false, false, true));
+					break;
+				case REPARTITION_SORT_MERGE:
+					list.add(new SortMergeJoinDescriptor(this.keys1, this.keys2, false, false, true));
+					break;
+				case OPTIMIZER_CHOOSES:
+					list.add(new SortMergeJoinDescriptor(this.keys1, this.keys2));
+					list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2));
+					list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2));
+					break;
+				default:
+					throw new CompilerException("Unrecognized join hint: " + joinHint);
+			}
+			
+			if (customPartitioner != null) {
+				for (OperatorDescriptorDual descr : list) {
+					((AbstractJoinDescriptor) descr).setCustomPartitioner(customPartitioner);
+				}
+			}
+			
+			return list;
+		}
+	}
+}
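
The default estimates in computeOperatorSpecificDefaultEstimates(...) follow
the stated principle of inclusion. A standalone illustration of the
arithmetic, with invented input sizes:

    public class JoinEstimateSketch {
        public static void main(String[] args) {
            long card1 = 1000000L;  // records in the first input
            long card2 = 10000L;    // records in the second input

            // every key of the larger input finds one partner in the smaller one
            long numRecords = (card1 < 0 || card2 < 0) ? -1 : Math.max(card1, card2);

            float width1 = 16f, width2 = 24f;  // invented avg. bytes per record
            float width = (width1 <= 0 || width2 <= 0) ? -1 : width1 + width2;

            long outputSize = width > 0 ? (long) (width * numRecords) : -1;
            // prints: 1000000 records, ~40000000 bytes
            System.out.println(numRecords + " records, ~" + outputSize + " bytes");
        }
    }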

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/MapNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/MapNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/MapNode.java
index e65febb..f1e26cd 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/MapNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/MapNode.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler.dag;
 
 import java.util.Collections;
@@ -32,13 +31,17 @@ import org.apache.flink.compiler.operators.OperatorDescriptorSingle;
  */
 public class MapNode extends SingleInputNode {
 	
+	private final List<OperatorDescriptorSingle> possibleProperties;
+	
 	/**
-	 * Creates a new MapNode for the given contract.
+	 * Creates a new MapNode for the given operator.
 	 * 
-	 * @param operator The map contract object.
+	 * @param operator The map operator.
 	 */
 	public MapNode(SingleInputOperator<?, ?, ?> operator) {
 		super(operator);
+		
+		this.possibleProperties = Collections.<OperatorDescriptorSingle>singletonList(new MapDescriptor());
 	}
 
 	@Override
@@ -48,7 +51,7 @@ public class MapNode extends SingleInputNode {
 
 	@Override
 	protected List<OperatorDescriptorSingle> getPossibleProperties() {
-		return Collections.<OperatorDescriptorSingle>singletonList(new MapDescriptor());
+		return this.possibleProperties;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/MapPartitionNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/MapPartitionNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/MapPartitionNode.java
index a180968..e21b7fc 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/MapPartitionNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/MapPartitionNode.java
@@ -32,6 +32,8 @@ import org.apache.flink.compiler.operators.OperatorDescriptorSingle;
  */
 public class MapPartitionNode extends SingleInputNode {
 	
+	private final List<OperatorDescriptorSingle> possibleProperties;
+	
 	/**
 	 * Creates a new MapNode for the given contract.
 	 * 
@@ -39,6 +41,8 @@ public class MapPartitionNode extends SingleInputNode {
 	 */
 	public MapPartitionNode(SingleInputOperator<?, ?, ?> operator) {
 		super(operator);
+		
+		this.possibleProperties = Collections.<OperatorDescriptorSingle>singletonList(new MapPartitionDescriptor());
 	}
 
 	@Override
@@ -48,7 +52,7 @@ public class MapPartitionNode extends SingleInputNode {
 
 	@Override
 	protected List<OperatorDescriptorSingle> getPossibleProperties() {
-		return Collections.<OperatorDescriptorSingle>singletonList(new MapPartitionDescriptor());
+		return this.possibleProperties;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/PartitionNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/PartitionNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/PartitionNode.java
index ccd48c5..53b5dd9 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/PartitionNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/PartitionNode.java
@@ -22,6 +22,7 @@ package org.apache.flink.compiler.dag;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
 import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod;
 import org.apache.flink.api.common.operators.util.FieldSet;
@@ -40,8 +41,14 @@ import org.apache.flink.runtime.operators.DriverStrategy;
  */
 public class PartitionNode extends SingleInputNode {
 
+	private final List<OperatorDescriptorSingle> possibleProperties;
+	
 	public PartitionNode(PartitionOperatorBase<?> operator) {
 		super(operator);
+		
+		OperatorDescriptorSingle descr = new PartitionDescriptor(
+					this.getPactContract().getPartitionMethod(), this.keys, operator.getCustomPartitioner());
+		this.possibleProperties = Collections.singletonList(descr);
 	}
 
 	@Override
@@ -56,13 +63,14 @@ public class PartitionNode extends SingleInputNode {
 
 	@Override
 	protected List<OperatorDescriptorSingle> getPossibleProperties() {
-		return Collections.<OperatorDescriptorSingle>singletonList(new PartitionDescriptor(this.getPactContract().getPartitionMethod(), this.keys));
+		return this.possibleProperties;
 	}
 
 	@Override
 	protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
 		// partitioning does not change the number of records
 		this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords();
+		this.estimatedOutputSize = getPredecessorNode().getEstimatedOutputSize();
 	}
 	
 	@Override
@@ -71,15 +79,18 @@ public class PartitionNode extends SingleInputNode {
 		return true;
 	}
 	
+	// --------------------------------------------------------------------------------------------
 	
 	public static class PartitionDescriptor extends OperatorDescriptorSingle {
 
 		private final PartitionMethod pMethod;
-		private final FieldSet pKeys;
+		private final Partitioner<?> customPartitioner;
 		
-		public PartitionDescriptor(PartitionMethod pMethod, FieldSet pKeys) {
+		public PartitionDescriptor(PartitionMethod pMethod, FieldSet pKeys, Partitioner<?> customPartitioner) {
+			super(pKeys);
+			
 			this.pMethod = pMethod;
-			this.pKeys = pKeys;
+			this.customPartitioner = customPartitioner;
 		}
 		
 		@Override
@@ -98,11 +109,14 @@ public class PartitionNode extends SingleInputNode {
 			
 			switch (this.pMethod) {
 			case HASH:
-				rgps.setHashPartitioned(pKeys.toFieldList());
+				rgps.setHashPartitioned(this.keys);
 				break;
 			case REBALANCE:
 				rgps.setForceRebalancing();
 				break;
+			case CUSTOM:
+				rgps.setCustomPartitioned(this.keys, this.customPartitioner);
+				break;
 			case RANGE:
 				throw new UnsupportedOperationException("Not yet supported");
 			default:
@@ -130,5 +144,4 @@ public class PartitionNode extends SingleInputNode {
 			return lProps;
 		}
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-compiler/src/main/java/org/apache/flink/compiler/dag/ReduceNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/ReduceNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/ReduceNode.java
index 2abbfb9..defae04 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/ReduceNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/ReduceNode.java
@@ -33,6 +33,8 @@ import org.apache.flink.compiler.operators.ReduceProperties;
  */
 public class ReduceNode extends SingleInputNode {
 	
+	private final List<OperatorDescriptorSingle> possibleProperties;
+	
 	private ReduceNode preReduceUtilityNode;
 	
 
@@ -43,10 +45,18 @@ public class ReduceNode extends SingleInputNode {
 			// case of a key-less reducer. force a parallelism of 1
 			setDegreeOfParallelism(1);
 		}
+		
+		OperatorDescriptorSingle props = this.keys == null ?
+			new AllReduceProperties() :
+			new ReduceProperties(this.keys, operator.getCustomPartitioner());
+		
+		this.possibleProperties = Collections.singletonList(props);
 	}
 	
 	public ReduceNode(ReduceNode reducerToCopyForCombiner) {
 		super(reducerToCopyForCombiner);
+		
+		this.possibleProperties = Collections.emptyList();
 	}
 
 	// ------------------------------------------------------------------------
@@ -63,11 +73,7 @@ public class ReduceNode extends SingleInputNode {
 	
 	@Override
 	protected List<OperatorDescriptorSingle> getPossibleProperties() {
-		OperatorDescriptorSingle props = this.keys == null ?
-			new AllReduceProperties() :
-			new ReduceProperties(this.keys);
-		
-			return Collections.singletonList(props);
+		return this.possibleProperties;
 	}
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java
index f3d9c2d..7dedc53 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/GlobalProperties.java
@@ -16,13 +16,13 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler.dataproperties;
 
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
 
+import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.util.FieldList;
@@ -50,6 +50,8 @@ public class GlobalProperties implements Cloneable {
 	
 	private Set<FieldSet> uniqueFieldCombinations;
 	
+	private Partitioner<?> customPartitioner;
+	
 	// --------------------------------------------------------------------------------------------
 	
 	/**
@@ -67,6 +69,10 @@ public class GlobalProperties implements Cloneable {
 	 * @param partitionedFields 
 	 */
 	public void setHashPartitioned(FieldList partitionedFields) {
+		if (partitionedFields == null) {
+			throw new NullPointerException();
+		}
+		
 		this.partitioning = PartitioningProperty.HASH_PARTITIONED;
 		this.partitioningFields = partitionedFields;
 		this.ordering = null;
@@ -74,12 +80,20 @@ public class GlobalProperties implements Cloneable {
 	
 
 	public void setRangePartitioned(Ordering ordering) {
+		if (ordering == null) {
+			throw new NullPointerException();
+		}
+		
 		this.partitioning = PartitioningProperty.RANGE_PARTITIONED;
 		this.ordering = ordering;
 		this.partitioningFields = ordering.getInvolvedIndexes();
 	}
 	
 	public void setAnyPartitioning(FieldList partitionedFields) {
+		if (partitionedFields == null) {
+			throw new NullPointerException();
+		}
+		
 		this.partitioning = PartitioningProperty.ANY_PARTITIONING;
 		this.partitioningFields = partitionedFields;
 		this.ordering = null;
@@ -103,7 +117,21 @@ public class GlobalProperties implements Cloneable {
 		this.ordering = null;
 	}
 	
+	public void setCustomPartitioned(FieldList partitionedFields, Partitioner<?> partitioner) {
+		if (partitionedFields == null || partitioner == null) {
+			throw new NullPointerException();
+		}
+		
+		this.partitioning = PartitioningProperty.CUSTOM_PARTITIONING;
+		this.partitioningFields = partitionedFields;
+		this.ordering = null;
+		this.customPartitioner = partitioner;
+	}
+	
 	public void addUniqueFieldCombination(FieldSet fields) {
+		if (fields == null) {
+			return;
+		}
 		if (this.uniqueFieldCombinations == null) {
 			this.uniqueFieldCombinations = new HashSet<FieldSet>();
 		}
@@ -128,12 +156,16 @@ public class GlobalProperties implements Cloneable {
 		return this.ordering;
 	}
 	
-	// --------------------------------------------------------------------------------------------
-	
 	public PartitioningProperty getPartitioning() {
 		return this.partitioning;
 	}
 	
+	public Partitioner<?> getCustomPartitioner() {
+		return this.customPartitioner;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
 	public boolean isPartitionedOnFields(FieldSet fields) {
 		if (this.partitioning.isPartitionedOnKey() && fields.isValidSubset(this.partitioningFields)) {
 			return true;
@@ -267,8 +299,14 @@ public class GlobalProperties implements Cloneable {
 			case RANGE_PARTITIONED:
 				channel.setShipStrategy(ShipStrategyType.PARTITION_RANGE, this.ordering.getInvolvedIndexes(), this.ordering.getFieldSortDirections());
 				break;
+			case FORCED_REBALANCED:
+				channel.setShipStrategy(ShipStrategyType.PARTITION_RANDOM);
+				break;
+			case CUSTOM_PARTITIONING:
+				channel.setShipStrategy(ShipStrategyType.PARTITION_CUSTOM, this.partitioningFields, this.customPartitioner);
+				break;
 			default:
-				throw new CompilerException();
+				throw new CompilerException("Unsupported partitioning strategy");
 		}
 	}
 
@@ -322,6 +360,7 @@ public class GlobalProperties implements Cloneable {
 		newProps.partitioning = this.partitioning;
 		newProps.partitioningFields = this.partitioningFields;
 		newProps.ordering = this.ordering;
+		newProps.customPartitioner = this.customPartitioner;
 		newProps.uniqueFieldCombinations = this.uniqueFieldCombinations == null ? null : new HashSet<FieldSet>(this.uniqueFieldCombinations);
 		return newProps;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/PartitioningProperty.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/PartitioningProperty.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/PartitioningProperty.java
index f73f491..47cd6b8 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/PartitioningProperty.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/PartitioningProperty.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler.dataproperties;
 
 /**
@@ -50,9 +49,14 @@ public enum PartitioningProperty {
 	FULL_REPLICATION,
 
 	/**
-	 * Constant indicating a forced even rebalancing.
+	 * Constant indicating a forced even re-balancing.
+	 */
+	FORCED_REBALANCED,
+	
+	/**
+	 * A custom partitioning, accompanied by a {@link org.apache.flink.api.common.functions.Partitioner}.
 	 */
-	FORCED_REBALANCED;
+	CUSTOM_PARTITIONING;
 	
 	/**
 	 * Checks, if this property represents in fact a partitioning. That is,
@@ -95,6 +99,6 @@ public enum PartitioningProperty {
 	 * @return True, if this enum constant is a re-computable partitioning.
 	 */
 	public boolean isComputablyPartitioned() {
-		return this == HASH_PARTITIONED || this == RANGE_PARTITIONED;
+		return this == HASH_PARTITIONED || this == RANGE_PARTITIONED || this == CUSTOM_PARTITIONING;
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java
index dcf0afa..4e9d60a 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dataproperties/RequestedGlobalProperties.java
@@ -16,10 +16,10 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler.dataproperties;
 
 import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.compiler.CompilerException;
@@ -43,7 +43,9 @@ public final class RequestedGlobalProperties implements Cloneable {
 	
 	private Ordering ordering;					// order of the partitioned fields, if it is an ordered (range) range partitioning
 	
-	private DataDistribution dataDistribution; // optional data distribution, for a range partitioning
+	private DataDistribution dataDistribution;	// optional data distribution, for a range partitioning
+	
+	private Partitioner<?> customPartitioner;	// optional, partitioner for custom partitioning
 	
 	// --------------------------------------------------------------------------------------------
 	
@@ -112,6 +114,17 @@ public final class RequestedGlobalProperties implements Cloneable {
 		this.ordering = null;
 	}
 	
+	public void setCustomPartitioned(FieldSet partitionedFields, Partitioner<?> partitioner) {
+		if (partitionedFields == null || partitioner == null) {
+			throw new NullPointerException();
+		}
+		
+		this.partitioning = PartitioningProperty.CUSTOM_PARTITIONING;
+		this.partitioningFields = partitionedFields;
+		this.ordering = null;
+		this.customPartitioner = partitioner;
+	}
+	
 	/**
 	 * Gets the partitioning property.
 	 * 
@@ -147,6 +160,15 @@ public final class RequestedGlobalProperties implements Cloneable {
 	public DataDistribution getDataDistribution() {
 		return this.dataDistribution;
 	}
+	
+	/**
+	 * Gets the custom partitioner associated with these properties.
+	 * 
+	 * @return The custom partitioner associated with these properties.
+	 */
+	public Partitioner<?> getCustomPartitioner() {
+		return customPartitioner;
+	}
 
 	/**
 	 * Checks, if the properties in this object are trivial, i.e. only standard values.
@@ -162,6 +184,8 @@ public final class RequestedGlobalProperties implements Cloneable {
 		this.partitioning = PartitioningProperty.RANDOM;
 		this.ordering = null;
 		this.partitioningFields = null;
+		this.dataDistribution = null;
+		this.customPartitioner = null;
 	}
 
 	/**
@@ -188,7 +212,12 @@ public final class RequestedGlobalProperties implements Cloneable {
 			}
 		}
 		
-		if (this.partitioning == PartitioningProperty.FULL_REPLICATION) {
+		// make sure that certain properties are not pushed down
+		final PartitioningProperty partitioning = this.partitioning;
+		if (partitioning == PartitioningProperty.FULL_REPLICATION ||
+				partitioning == PartitioningProperty.FORCED_REBALANCED ||
+				partitioning == PartitioningProperty.CUSTOM_PARTITIONING)
+		{
 			return null;
 		}
 		
@@ -205,22 +234,34 @@ public final class RequestedGlobalProperties implements Cloneable {
 	public boolean isMetBy(GlobalProperties props) {
 		if (this.partitioning == PartitioningProperty.FULL_REPLICATION) {
 			return props.isFullyReplicated();
-		} else if (props.isFullyReplicated()) {
+		}
+		else if (props.isFullyReplicated()) {
 			return false;
-		} else if (this.partitioning == PartitioningProperty.RANDOM) {
+		}
+		else if (this.partitioning == PartitioningProperty.RANDOM) {
 			return true;
-		} else if (this.partitioning == PartitioningProperty.ANY_PARTITIONING) {
+		}
+		else if (this.partitioning == PartitioningProperty.ANY_PARTITIONING) {
 			return props.isPartitionedOnFields(this.partitioningFields);
-		} else if (this.partitioning == PartitioningProperty.HASH_PARTITIONED) {
+		}
+		else if (this.partitioning == PartitioningProperty.HASH_PARTITIONED) {
 			return props.getPartitioning() == PartitioningProperty.HASH_PARTITIONED &&
 					props.isPartitionedOnFields(this.partitioningFields);
-		} else if (this.partitioning == PartitioningProperty.RANGE_PARTITIONED) {
+		}
+		else if (this.partitioning == PartitioningProperty.RANGE_PARTITIONED) {
 			return props.getPartitioning() == PartitioningProperty.RANGE_PARTITIONED &&
 					props.matchesOrderedPartitioning(this.ordering);
-		} else if (this.partitioning == PartitioningProperty.FORCED_REBALANCED) {
+		}
+		else if (this.partitioning == PartitioningProperty.FORCED_REBALANCED) {
 			return props.getPartitioning() == PartitioningProperty.FORCED_REBALANCED;
-		} else {
-			throw new CompilerException("Bug in properties matching logic.");
+		}
+		else if (this.partitioning == PartitioningProperty.CUSTOM_PARTITIONING) {
+			return props.getPartitioning() == PartitioningProperty.CUSTOM_PARTITIONING &&
+					props.isPartitionedOnFields(this.partitioningFields) &&
+					props.getCustomPartitioner().equals(this.customPartitioner);
+		}
+		else {
+			throw new CompilerException("Properties matching logic leaves open cases.");
 		}
 	}
 	
@@ -250,22 +291,29 @@ public final class RequestedGlobalProperties implements Cloneable {
 			case FULL_REPLICATION:
 				channel.setShipStrategy(ShipStrategyType.BROADCAST);
 				break;
+			
 			case ANY_PARTITIONING:
 			case HASH_PARTITIONED:
 				channel.setShipStrategy(ShipStrategyType.PARTITION_HASH, Utils.createOrderedFromSet(this.partitioningFields));
 				break;
+			
 			case RANGE_PARTITIONED:
-
-				channel.setShipStrategy(ShipStrategyType.PARTITION_RANGE, this.ordering.getInvolvedIndexes(), this.ordering.getFieldSortDirections());				
+				channel.setShipStrategy(ShipStrategyType.PARTITION_RANGE, this.ordering.getInvolvedIndexes(), this.ordering.getFieldSortDirections());
 				if(this.dataDistribution != null) {
 					channel.setDataDistribution(this.dataDistribution);
 				}
 				break;
+			
 			case FORCED_REBALANCED:
 				channel.setShipStrategy(ShipStrategyType.PARTITION_FORCED_REBALANCE);
 				break;
+				
+			case CUSTOM_PARTITIONING:
+				channel.setShipStrategy(ShipStrategyType.PARTITION_CUSTOM, Utils.createOrderedFromSet(this.partitioningFields), this.customPartitioner);
+				break;
+				
 			default:
-				throw new CompilerException();
+				throw new CompilerException("Invalid partitioning to create through a data exchange: " + this.partitioning.name());
 		}
 	}
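
Note that the CUSTOM_PARTITIONING branch of isMetBy(...) above compares
partitioners via equals(). Partitioner implementations usually do not override
equals(), so reusing one instance is the reliable way to have an existing
custom partitioning recognized. A hypothetical sketch (withPartitioner on
joins is part of this commit's JoinOperator changes; MyPartitioner is
illustrative):

    import org.apache.flink.api.common.functions.Partitioner;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class PartitionerReuseSketch {

        // no equals() override: only the very same instance compares equal
        public static class MyPartitioner implements Partitioner<Long> {
            @Override
            public int partition(Long key, int numPartitions) {
                return (int) (key % numPartitions);
            }
        }

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<Tuple2<Long, String>> input1 = env.fromElements(new Tuple2<Long, String>(1L, "a"));
            DataSet<Tuple2<Long, Long>> input2 = env.fromElements(new Tuple2<Long, Long>(1L, 10L));

            MyPartitioner part = new MyPartitioner();  // one shared instance

            input1.partitionCustom(part, 0)            // pre-partition the first input
                  .join(input2).where(0).equalTo(0)
                  .withPartitioner(part)               // same instance: the existing custom
                  .print();                            // partitioning can satisfy the request

            env.execute();
        }
    }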
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java
index 47069e6..84af77c 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/AbstractJoinDescriptor.java
@@ -21,6 +21,7 @@ package org.apache.flink.compiler.operators;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.compiler.dataproperties.GlobalProperties;
 import org.apache.flink.compiler.dataproperties.PartitioningProperty;
@@ -35,6 +36,8 @@ public abstract class AbstractJoinDescriptor extends OperatorDescriptorDual {
 	private final boolean broadcastSecondAllowed;
 	private final boolean repartitionAllowed;
 	
+	private Partitioner<?> customPartitioner;
+	
 	protected AbstractJoinDescriptor(FieldList keys1, FieldList keys2) {
 		this(keys1, keys2, true, true, true);
 	}
@@ -49,16 +52,30 @@ public abstract class AbstractJoinDescriptor extends OperatorDescriptorDual {
 		this.repartitionAllowed = repartitionAllowed;
 	}
 	
+	public void setCustomPartitioner(Partitioner<?> partitioner) {
+		customPartitioner = partitioner;
+	}
+	
 	@Override
 	protected List<GlobalPropertiesPair> createPossibleGlobalProperties() {
 		ArrayList<GlobalPropertiesPair> pairs = new ArrayList<GlobalPropertiesPair>();
 		
 		if (repartitionAllowed) {
-			// partition both (hash)
+			// partition both (hash or custom)
 			RequestedGlobalProperties partitioned1 = new RequestedGlobalProperties();
-			partitioned1.setHashPartitioned(this.keys1);
+			if (customPartitioner == null) {
+				partitioned1.setHashPartitioned(this.keys1);
+			} else {
+				partitioned1.setCustomPartitioned(this.keys1, this.customPartitioner);
+			}
+			
 			RequestedGlobalProperties partitioned2 = new RequestedGlobalProperties();
-			partitioned2.setHashPartitioned(this.keys2);
+			if (customPartitioner == null) {
+				partitioned2.setHashPartitioned(this.keys2);
+			} else {
+				partitioned2.setCustomPartitioned(this.keys2, this.customPartitioner);
+			}
+			
 			pairs.add(new GlobalPropertiesPair(partitioned1, partitioned2));
 		}
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceProperties.java
index bf09bcc..ab93170 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceProperties.java
@@ -21,6 +21,7 @@ package org.apache.flink.compiler.operators;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.util.FieldSet;
@@ -38,12 +39,22 @@ public final class GroupReduceProperties extends OperatorDescriptorSingle {
 	
 	private final Ordering ordering;		// ordering that we need to use if an additional ordering is requested 
 
+	private final Partitioner<?> customPartitioner;
+	
 	
 	public GroupReduceProperties(FieldSet keys) {
-		this(keys, null);
+		this(keys, null, null);
+	}
+	
+	public GroupReduceProperties(FieldSet keys, Ordering additionalOrderKeys) {
+		this(keys, additionalOrderKeys, null);
 	}
 	
-	public GroupReduceProperties(FieldSet groupKeys, Ordering additionalOrderKeys) {
+	public GroupReduceProperties(FieldSet keys, Partitioner<?> customPartitioner) {
+		this(keys, null, customPartitioner);
+	}
+	
+	public GroupReduceProperties(FieldSet groupKeys, Ordering additionalOrderKeys, Partitioner<?> customPartitioner) {
 		super(groupKeys);
 		
 		// if we have an additional ordering, construct the ordering to have primarily the grouping fields
@@ -59,9 +70,12 @@ public final class GroupReduceProperties extends OperatorDescriptorSingle {
 				Order order = additionalOrderKeys.getOrder(i);
 				this.ordering.appendOrdering(field, additionalOrderKeys.getType(i), order);
 			}
-		} else {
+		}
+		else {
 			this.ordering = null;
 		}
+		
+		this.customPartitioner = customPartitioner;
 	}
 	
 	@Override
@@ -71,13 +85,18 @@ public final class GroupReduceProperties extends OperatorDescriptorSingle {
 
 	@Override
 	public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
-		return new SingleInputPlanNode(node, "Reduce("+node.getPactContract().getName()+")", in, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
+		return new SingleInputPlanNode(node, "GroupReduce ("+node.getPactContract().getName()+")", in, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
 	}
 
 	@Override
 	protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
 		RequestedGlobalProperties props = new RequestedGlobalProperties();
-		props.setAnyPartitioning(this.keys);
+		
+		if (customPartitioner == null) {
+			props.setAnyPartitioning(this.keys);
+		} else {
+			props.setCustomPartitioned(this.keys, this.customPartitioner);
+		}
 		return Collections.singletonList(props);
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceWithCombineProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceWithCombineProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceWithCombineProperties.java
index 92b2297..8604951 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceWithCombineProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceWithCombineProperties.java
@@ -21,6 +21,7 @@ package org.apache.flink.compiler.operators;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.util.FieldSet;
@@ -42,12 +43,22 @@ public final class GroupReduceWithCombineProperties extends OperatorDescriptorSi
 	
 	private final Ordering ordering;		// ordering that we need to use if an additional ordering is requested 
 	
+	private final Partitioner<?> customPartitioner;
 	
-	public GroupReduceWithCombineProperties(FieldSet keys) {
-		this(keys, null);
+	
+	public GroupReduceWithCombineProperties(FieldSet groupKeys) {
+		this(groupKeys, null, null);
 	}
 	
 	public GroupReduceWithCombineProperties(FieldSet groupKeys, Ordering additionalOrderKeys) {
+		this(groupKeys, additionalOrderKeys, null);
+	}
+	
+	public GroupReduceWithCombineProperties(FieldSet groupKeys, Partitioner<?> customPartitioner) {
+		this(groupKeys, null, customPartitioner);
+	}
+	
+	public GroupReduceWithCombineProperties(FieldSet groupKeys, Ordering additionalOrderKeys, Partitioner<?> customPartitioner) {
 		super(groupKeys);
 		
 		// if we have an additional ordering, construct the ordering to have primarily the grouping fields
@@ -66,6 +77,8 @@ public final class GroupReduceWithCombineProperties extends OperatorDescriptorSi
 		} else {
 			this.ordering = null;
 		}
+		
+		this.customPartitioner = customPartitioner;
 	}
 	
 	@Override
@@ -111,7 +124,11 @@ public final class GroupReduceWithCombineProperties extends OperatorDescriptorSi
 	@Override
 	protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
 		RequestedGlobalProperties props = new RequestedGlobalProperties();
-		props.setAnyPartitioning(this.keys);
+		if (customPartitioner == null) {
+			props.setAnyPartitioning(this.keys);
+		} else {
+			props.setCustomPartitioned(this.keys, this.customPartitioner);
+		}
 		return Collections.singletonList(props);
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-compiler/src/main/java/org/apache/flink/compiler/operators/ReduceProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/ReduceProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/ReduceProperties.java
index 9d2e86a..813af20 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/ReduceProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/ReduceProperties.java
@@ -21,6 +21,7 @@ package org.apache.flink.compiler.operators;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.compiler.costs.Costs;
 import org.apache.flink.compiler.dag.ReduceNode;
@@ -38,8 +39,15 @@ import org.apache.flink.runtime.operators.util.LocalStrategy;
 
 public final class ReduceProperties extends OperatorDescriptorSingle {
 	
+	private final Partitioner<?> customPartitioner;
+	
 	public ReduceProperties(FieldSet keys) {
+		this(keys, null);
+	}
+	
+	public ReduceProperties(FieldSet keys, Partitioner<?> customPartitioner) {
 		super(keys);
+		this.customPartitioner = customPartitioner;
 	}
 	
 	@Override
@@ -77,7 +85,11 @@ public final class ReduceProperties extends OperatorDescriptorSingle {
 	@Override
 	protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
 		RequestedGlobalProperties props = new RequestedGlobalProperties();
-		props.setAnyPartitioning(this.keys);
+		if (customPartitioner == null) {
+			props.setAnyPartitioning(this.keys);
+		} else {
+			props.setCustomPartitioned(this.keys, this.customPartitioner);
+		}
 		return Collections.singletonList(props);
 	}
 

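All three descriptors (GroupReduce, GroupReduce-with-combine, Reduce) forward the same kind of object: a Partitioner whose partition(key, numPartitions) must return a channel index in [0, numPartitions). A minimal illustrative implementation, not part of this commit, could look like:

    import org.apache.flink.api.common.functions.Partitioner;

    public class ModuloLongPartitioner implements Partitioner<Long> {

        private static final long serialVersionUID = 1L;

        @Override
        public int partition(Long key, int numPartitions) {
            // normalize to a non-negative index, also for negative keys
            return (int) (((key % numPartitions) + numPartitions) % numPartitions);
        }
    }

Note that a requested custom partitioning is only considered compatible with an existing one when the partitioner objects compare equal, which is why the tests below pass the very same partitioner instance to all operators that should share the distribution.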
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-compiler/src/main/java/org/apache/flink/compiler/operators/SortMergeJoinDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/SortMergeJoinDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/SortMergeJoinDescriptor.java
index 5c6de30..cd6094e 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/SortMergeJoinDescriptor.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/SortMergeJoinDescriptor.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler.operators;
 
 import java.util.Collections;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-compiler/src/main/java/org/apache/flink/compiler/plan/Channel.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/Channel.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/Channel.java
index 5fb03f5..e159481 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/Channel.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/Channel.java
@@ -16,10 +16,10 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.compiler.plan;
 
 import org.apache.flink.api.common.distributions.DataDistribution;
+import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
@@ -36,7 +36,7 @@ import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
 
 /**
- * 
+ * A Channel represents the data exchange between two operators, including its ship strategy.
  */
 public class Channel implements EstimateProvider, Cloneable, DumpableConnection<PlanNode> {
 	
@@ -72,6 +72,8 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
 	
 	private DataDistribution dataDistribution;
 	
+	private Partitioner<?> partitioner;
+	
 	private TempMode tempMode;
 	
 	private double relativeTempMemory;
@@ -125,17 +127,27 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
 	}
 	
 	public void setShipStrategy(ShipStrategyType strategy) {
-		setShipStrategy(strategy, null, null);
+		setShipStrategy(strategy, null, null, null);
 	}
 	
 	public void setShipStrategy(ShipStrategyType strategy, FieldList keys) {
-		setShipStrategy(strategy, keys, null);
+		setShipStrategy(strategy, keys, null, null);
 	}
 	
 	public void setShipStrategy(ShipStrategyType strategy, FieldList keys, boolean[] sortDirection) {
+		setShipStrategy(strategy, keys, sortDirection, null);
+	}
+	
+	public void setShipStrategy(ShipStrategyType strategy, FieldList keys, Partitioner<?> partitioner) {
+		setShipStrategy(strategy, keys, null, partitioner);
+	}
+	
+	public void setShipStrategy(ShipStrategyType strategy, FieldList keys, boolean[] sortDirection, Partitioner<?> partitioner) {
 		this.shipStrategy = strategy;
 		this.shipKeys = keys;
 		this.shipSortOrder = sortDirection;
+		this.partitioner = partitioner;
+		
 		this.globalProps = null;		// reset the global properties
 	}
 	
@@ -187,6 +199,10 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
 		return this.dataDistribution;
 	}
 	
+	public Partitioner<?> getPartitioner() {
+		return partitioner;
+	}
+	
 	public TempMode getTempMode() {
 		return this.tempMode;
 	}
@@ -245,7 +261,6 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
 	public TypeSerializerFactory<?> getSerializer() {
 		return serializer;
 	}
-
 	
 	/**
 	 * Sets the serializer for this Channel.
@@ -381,6 +396,9 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
 				case PARTITION_FORCED_REBALANCE:
 					this.globalProps.setForcedRebalanced();
 					break;
+				case PARTITION_CUSTOM:
+					this.globalProps.setCustomPartitioned(this.shipKeys, this.partitioner);
+					break;
 				case NONE:
 					throw new CompilerException("Cannot produce GlobalProperties before ship strategy is set.");
 			}
@@ -411,6 +429,7 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
 		switch (this.shipStrategy) {
 			case BROADCAST:
 			case PARTITION_HASH:
+			case PARTITION_CUSTOM:
 			case PARTITION_RANGE:
 			case PARTITION_RANDOM:
 			case PARTITION_FORCED_REBALANCE:
@@ -448,6 +467,7 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection<
 		case PARTITION_RANGE:
 		case PARTITION_RANDOM:
 		case PARTITION_FORCED_REBALANCE:
+		case PARTITION_CUSTOM:
 			return;
 		}
 		throw new CompilerException("Unrecognized Ship Strategy Type: " + this.shipStrategy);

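With the partitioner stored on the channel, a PARTITION_CUSTOM ship strategy can be established and later turned into matching GlobalProperties by the switch above. A hypothetical call site, assuming a Channel is constructed from its source PlanNode as elsewhere in the compiler (the helper and field choice are illustrative, not from this commit):

    import org.apache.flink.api.common.functions.Partitioner;
    import org.apache.flink.api.common.operators.util.FieldList;
    import org.apache.flink.compiler.plan.Channel;
    import org.apache.flink.compiler.plan.PlanNode;
    import org.apache.flink.runtime.operators.shipping.ShipStrategyType;

    public class CustomPartitionChannels {

        // Illustrative helper: connect "source" through a channel that
        // custom-partitions its data on key field 0.
        public static Channel customPartitioned(PlanNode source, Partitioner<?> partitioner) {
            Channel channel = new Channel(source);
            channel.setShipStrategy(ShipStrategyType.PARTITION_CUSTOM, new FieldList(0), partitioner);
            // channel.getGlobalProperties() now reflects
            // setCustomPartitioned(shipKeys, partitioner), per the switch above.
            return channel;
        }
    }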
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-compiler/src/main/java/org/apache/flink/compiler/plandump/PlanJSONDumpGenerator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plandump/PlanJSONDumpGenerator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plandump/PlanJSONDumpGenerator.java
index 41dfd9b..7728948 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plandump/PlanJSONDumpGenerator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plandump/PlanJSONDumpGenerator.java
@@ -336,6 +336,9 @@ public class PlanJSONDumpGenerator {
 					case PARTITION_FORCED_REBALANCE:
 						shipStrategy = "Rebalance";
 						break;
+					case PARTITION_CUSTOM:
+						shipStrategy = "Custom Partition";
+						break;
 					default:
 						throw new CompilerException("Unknown ship strategy '" + inConn.getShipStrategy().name()
 							+ "' in JSON generator.");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
index b717924..64eca7c 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -1046,6 +1046,7 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 			case PARTITION_RANDOM:
 			case BROADCAST:
 			case PARTITION_HASH:
+			case PARTITION_CUSTOM:
 			case PARTITION_RANGE:
 			case PARTITION_FORCED_REBALANCE:
 				distributionPattern = DistributionPattern.BIPARTITE;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java
index 5d45159..e3f5267 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/FeedbackPropertiesMatchTest.java
@@ -37,7 +37,7 @@ import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.compiler.dag.DataSourceNode;
 import org.apache.flink.compiler.dag.MapNode;
-import org.apache.flink.compiler.dag.MatchNode;
+import org.apache.flink.compiler.dag.JoinNode;
 import org.apache.flink.compiler.dataproperties.GlobalProperties;
 import org.apache.flink.compiler.dataproperties.LocalProperties;
 import org.apache.flink.compiler.dataproperties.RequestedGlobalProperties;
@@ -1429,7 +1429,7 @@ public class FeedbackPropertiesMatchTest {
 		return new MapNode(new MapOperatorBase<String, String, MapFunction<String,String>>(new IdentityMapper<String>(), new UnaryOperatorInformation<String, String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), "map op"));
 	}
 	
-	private static final MatchNode getJoinNode() {
-		return new MatchNode(new JoinOperatorBase<String, String, String, FlatJoinFunction<String, String, String>>(new DummyFlatJoinFunction<String>(), new BinaryOperatorInformation<String, String, String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), new int[] {1}, new int[] {2}, "join op"));
+	private static final JoinNode getJoinNode() {
+		return new JoinNode(new JoinOperatorBase<String, String, String, FlatJoinFunction<String, String, String>>(new DummyFlatJoinFunction<String>(), new BinaryOperatorInformation<String, String, String>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), new int[] {1}, new int[] {2}, "join op"));
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/CustomPartitioningGlobalOptimizationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/CustomPartitioningGlobalOptimizationTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/CustomPartitioningGlobalOptimizationTest.java
new file mode 100644
index 0000000..34484d7
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/CustomPartitioningGlobalOptimizationTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.custompartition;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.compiler.CompilerTestBase;
+import org.apache.flink.compiler.plan.DualInputPlanNode;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.compiler.plan.SinkPlanNode;
+import org.apache.flink.compiler.testfunctions.IdentityGroupReducer;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+
+
+@SuppressWarnings({"serial", "unchecked"})
+public class CustomPartitioningGlobalOptimizationTest extends CompilerTestBase {
+
+	@Test
+	public void testJoinReduceCombination() {
+		try {
+			final Partitioner<Long> partitioner = new TestPartitionerLong();
+			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+			DataSet<Tuple3<Long, Long, Long>> input2 = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
+			
+			DataSet<Tuple3<Long, Long, Long>> joined = input1.join(input2)
+				.where(1).equalTo(0)
+				.projectFirst(0,1).projectSecond(2).types(Long.class, Long.class, Long.class)
+				.withPartitioner(partitioner);
+				
+			joined.groupBy(1).withPartitioner(partitioner)
+				.reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>())
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			SingleInputPlanNode reducer = (SingleInputPlanNode) sink.getInput().getSource();
+			
+			assertTrue("Reduce is not chained, property reuse does not happen", 
+					reducer.getInput().getSource() instanceof DualInputPlanNode);
+			
+			DualInputPlanNode join = (DualInputPlanNode) reducer.getInput().getSource();
+			
+			assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput1().getShipStrategy());
+			assertEquals(ShipStrategyType.PARTITION_CUSTOM, join.getInput2().getShipStrategy());
+			assertEquals(partitioner, join.getInput1().getPartitioner());
+			assertEquals(partitioner, join.getInput2().getPartitioner());
+			
+			assertEquals(ShipStrategyType.FORWARD, reducer.getInput().getShipStrategy());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	private static class TestPartitionerLong implements Partitioner<Long> {
+		@Override
+		public int partition(Long key, int numPartitions) {
+			return 0;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2000b45c/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/CustomPartitioningTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/CustomPartitioningTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/CustomPartitioningTest.java
new file mode 100644
index 0000000..67505bf
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/CustomPartitioningTest.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.custompartition;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.compiler.CompilerTestBase;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.compiler.plan.SinkPlanNode;
+import org.apache.flink.compiler.testfunctions.IdentityPartitionerMapper;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.junit.Test;
+
+@SuppressWarnings({"serial", "unchecked"})
+public class CustomPartitioningTest extends CompilerTestBase {
+
+	@Test
+	public void testPartitionTuples() {
+		try {
+			final Partitioner<Integer> part = new TestPartitionerInt();
+			final int parallelism = 4;
+			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(parallelism);
+			
+			DataSet<Tuple2<Integer, Integer>> data = env.fromElements(new Tuple2<Integer,Integer>(0, 0))
+					.rebalance();
+			
+			data
+				.partitionCustom(part, 0)
+				.mapPartition(new IdentityPartitionerMapper<Tuple2<Integer,Integer>>())
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource();
+			SingleInputPlanNode partitioner = (SingleInputPlanNode) mapper.getInput().getSource();
+			SingleInputPlanNode balancer = (SingleInputPlanNode) partitioner.getInput().getSource();
+			
+			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+			assertEquals(parallelism, sink.getDegreeOfParallelism());
+			
+			assertEquals(ShipStrategyType.FORWARD, mapper.getInput().getShipStrategy());
+			assertEquals(parallelism, mapper.getDegreeOfParallelism());
+			
+			assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner.getInput().getShipStrategy());
+			assertEquals(part, partitioner.getInput().getPartitioner());
+			assertEquals(parallelism, partitioner.getDegreeOfParallelism());
+			
+			assertEquals(ShipStrategyType.PARTITION_FORCED_REBALANCE, balancer.getInput().getShipStrategy());
+			assertEquals(parallelism, balancer.getDegreeOfParallelism());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testPartitionTuplesInvalidType() {
+		try {
+			final int parallelism = 4;
+			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(parallelism);
+			
+			DataSet<Tuple2<Integer, Integer>> data = env.fromElements(new Tuple2<Integer,Integer>(0, 0))
+					.rebalance();
+			
+			try {
+				data
+					.partitionCustom(new TestPartitionerLong(), 0);
+				fail("Should throw an exception");
+			}
+			catch (InvalidProgramException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testPartitionPojo() {
+		try {
+			final Partitioner<Integer> part = new TestPartitionerInt();
+			final int parallelism = 4;
+			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(parallelism);
+			
+			DataSet<Pojo> data = env.fromElements(new Pojo())
+					.rebalance();
+			
+			data
+				.partitionCustom(part, "a")
+				.mapPartition(new IdentityPartitionerMapper<Pojo>())
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource();
+			SingleInputPlanNode partitioner = (SingleInputPlanNode) mapper.getInput().getSource();
+			SingleInputPlanNode balancer = (SingleInputPlanNode) partitioner.getInput().getSource();
+			
+			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+			assertEquals(parallelism, sink.getDegreeOfParallelism());
+			
+			assertEquals(ShipStrategyType.FORWARD, mapper.getInput().getShipStrategy());
+			assertEquals(parallelism, mapper.getDegreeOfParallelism());
+			
+			assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner.getInput().getShipStrategy());
+			assertEquals(part, partitioner.getInput().getPartitioner());
+			assertEquals(parallelism, partitioner.getDegreeOfParallelism());
+			
+			assertEquals(ShipStrategyType.PARTITION_FORCED_REBALANCE, balancer.getInput().getShipStrategy());
+			assertEquals(parallelism, balancer.getDegreeOfParallelism());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testPartitionPojoInvalidType() {
+		try {
+			final int parallelism = 4;
+			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(parallelism);
+			
+			DataSet<Pojo> data = env.fromElements(new Pojo())
+					.rebalance();
+			
+			try {
+				data
+					.partitionCustom(new TestPartitionerLong(), "a");
+				fail("Should throw an exception");
+			}
+			catch (InvalidProgramException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testPartitionKeySelector() {
+		try {
+			final Partitioner<Integer> part = new TestPartitionerInt();
+			final int parallelism = 4;
+			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(parallelism);
+			
+			DataSet<Pojo> data = env.fromElements(new Pojo())
+					.rebalance();
+			
+			data
+				.partitionCustom(part, new TestKeySelectorInt<Pojo>())
+				.mapPartition(new IdentityPartitionerMapper<Pojo>())
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource();
+			SingleInputPlanNode keyRemover = (SingleInputPlanNode) mapper.getInput().getSource();
+			SingleInputPlanNode partitioner = (SingleInputPlanNode) keyRemover.getInput().getSource();
+			SingleInputPlanNode keyExtractor = (SingleInputPlanNode) partitioner.getInput().getSource();
+			SingleInputPlanNode balancer = (SingleInputPlanNode) keyExtractor.getInput().getSource();
+			
+			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+			assertEquals(parallelism, sink.getDegreeOfParallelism());
+			
+			assertEquals(ShipStrategyType.FORWARD, mapper.getInput().getShipStrategy());
+			assertEquals(parallelism, mapper.getDegreeOfParallelism());
+			
+			assertEquals(ShipStrategyType.FORWARD, keyRemover.getInput().getShipStrategy());
+			assertEquals(parallelism, keyRemover.getDegreeOfParallelism());
+			
+			assertEquals(ShipStrategyType.PARTITION_CUSTOM, partitioner.getInput().getShipStrategy());
+			assertEquals(part, partitioner.getInput().getPartitioner());
+			assertEquals(parallelism, partitioner.getDegreeOfParallelism());
+			
+			assertEquals(ShipStrategyType.FORWARD, keyExtractor.getInput().getShipStrategy());
+			assertEquals(parallelism, keyExtractor.getDegreeOfParallelism());
+			
+			assertEquals(ShipStrategyType.PARTITION_FORCED_REBALANCE, balancer.getInput().getShipStrategy());
+			assertEquals(parallelism, balancer.getDegreeOfParallelism());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testPartitionKeySelectorInvalidType() {
+		try {
+			final Partitioner<Integer> part = (Partitioner<Integer>) (Partitioner<?>) new TestPartitionerLong();
+			final int parallelism = 4;
+			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(parallelism);
+			
+			DataSet<Pojo> data = env.fromElements(new Pojo())
+					.rebalance();
+			
+			try {
+				data
+					.partitionCustom(part, new TestKeySelectorInt<Pojo>());
+				fail("Should throw an exception");
+			}
+			catch (InvalidProgramException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public static class Pojo {
+		public int a;
+		public int b;
+	}
+	
+	private static class TestPartitionerInt implements Partitioner<Integer> {
+		@Override
+		public int partition(Integer key, int numPartitions) {
+			return 0;
+		}
+	}
+	
+	private static class TestPartitionerLong implements Partitioner<Long> {
+		@Override
+		public int partition(Long key, int numPartitions) {
+			return 0;
+		}
+	}
+	
+	private static class TestKeySelectorInt<T> implements KeySelector<T, Integer> {
+		@Override
+		public Integer getKey(T value) {
+			return null;
+		}
+	}
+}