You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2014/09/24 12:24:17 UTC

[3/3] git commit: [FLINK-1073] Enables sorted group input for GroupReduce combiners. - GroupReduceCombineDriver uses separate comparators for sorting and grouping. - Adding support for multiple comparators for a single input driver required some refactoring.

[FLINK-1073] Enables sorted group input for GroupReduce combiners.
- GroupReduceCombineDriver uses separate comparators for sorting and grouping.
- Adding support for multiple comparators for a single input driver required some refactoring.

This closes #109


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e5731e0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e5731e0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e5731e0e

Branch: refs/heads/master
Commit: e5731e0ed7f7294e6f73970fed8315cb572c9cfd
Parents: 4b6d2c5
Author: Fabian Hueske <fh...@apache.org>
Authored: Tue Sep 2 15:51:44 2014 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Sep 24 12:22:47 2014 +0200

----------------------------------------------------------------------
 .../GroupReduceWithCombineProperties.java       |  6 +-
 .../operators/PartialGroupProperties.java       | 10 ++-
 .../compiler/plan/SingleInputPlanNode.java      | 82 +++++++++++++----
 .../plantranslate/NepheleJobGraphGenerator.java |  5 +-
 .../postpass/GenericFlatTypePostPass.java       |  6 +-
 .../compiler/postpass/JavaApiPostPass.java      |  9 +-
 .../apache/flink/compiler/GroupOrderTest.java   |  2 +-
 .../compiler/GroupReduceCompilationTest.java    | 14 +--
 .../flink/compiler/ReduceCompilationTest.java   |  8 +-
 .../WorksetIterationsJavaApiCompilerTest.java   |  6 +-
 .../WorksetIterationsRecordApiCompilerTest.java |  6 +-
 .../AbstractCachedBuildSideMatchDriver.java     |  4 +-
 .../runtime/operators/AllGroupReduceDriver.java |  4 +-
 .../runtime/operators/AllReduceDriver.java      |  4 +-
 .../flink/runtime/operators/CoGroupDriver.java  |  8 +-
 .../CoGroupWithSolutionSetFirstDriver.java      |  6 +-
 .../CoGroupWithSolutionSetSecondDriver.java     |  6 +-
 .../runtime/operators/CollectorMapDriver.java   |  4 +-
 .../flink/runtime/operators/CrossDriver.java    |  4 +-
 .../flink/runtime/operators/DriverStrategy.java | 64 +++++++-------
 .../flink/runtime/operators/FlatMapDriver.java  |  4 +-
 .../operators/GroupReduceCombineDriver.java     | 29 +++---
 .../runtime/operators/GroupReduceDriver.java    |  6 +-
 .../JoinWithSolutionSetFirstDriver.java         |  4 +-
 .../JoinWithSolutionSetSecondDriver.java        |  4 +-
 .../flink/runtime/operators/MapDriver.java      |  4 +-
 .../runtime/operators/MapPartitionDriver.java   |  4 +-
 .../flink/runtime/operators/MatchDriver.java    |  8 +-
 .../flink/runtime/operators/NoOpDriver.java     |  4 +-
 .../flink/runtime/operators/PactDriver.java     | 14 +--
 .../runtime/operators/PactTaskContext.java      |  2 +-
 .../runtime/operators/ReduceCombineDriver.java  |  6 +-
 .../flink/runtime/operators/ReduceDriver.java   |  6 +-
 .../runtime/operators/RegularPactTask.java      | 23 +++--
 .../SynchronousChainedCombineDriver.java        | 18 ++--
 .../runtime/operators/CachedMatchTaskTest.java  | 44 +++++-----
 .../operators/CoGroupTaskExternalITCase.java    |  4 +-
 .../runtime/operators/CoGroupTaskTest.java      | 36 ++++----
 .../operators/CombineTaskExternalITCase.java    |  6 +-
 .../runtime/operators/CombineTaskTest.java      |  9 +-
 .../operators/MatchTaskExternalITCase.java      | 12 +--
 .../flink/runtime/operators/MatchTaskTest.java  | 92 ++++++++++----------
 .../operators/ReduceTaskExternalITCase.java     |  8 +-
 .../flink/runtime/operators/ReduceTaskTest.java | 12 +--
 .../operators/chaining/ChainTaskTest.java       |  1 +
 .../operators/drivers/TestTaskContext.java      |  2 +-
 .../operators/testutils/DriverTestBase.java     |  4 +-
 .../compiler/examples/KMeansSingleStepTest.java |  7 +-
 .../examples/RelationalQueryCompilerTest.java   | 12 +--
 .../examples/WordCountCompilerTest.java         |  7 +-
 .../iterations/IterativeKMeansTest.java         |  7 +-
 ...mpensatableDanglingPageRankWithCombiner.java |  1 +
 52 files changed, 372 insertions(+), 286 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceWithCombineProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceWithCombineProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceWithCombineProperties.java
index f8cd87a..92b2297 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceWithCombineProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceWithCombineProperties.java
@@ -93,9 +93,13 @@ public final class GroupReduceWithCombineProperties extends OperatorDescriptorSi
 			combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism());
 
 			SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getPactContract()
-					.getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE, this.keyList);
+					.getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE);
 			combiner.setCosts(new Costs(0, 0));
 			combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties());
+			// set sorting comparator key info
+			combiner.setDriverKeyInfo(in.getLocalStrategyKeys(), in.getLocalStrategySortOrder(), 0);
+			// set grouping comparator key info
+			combiner.setDriverKeyInfo(this.keyList, 1);
 			
 			Channel toReducer = new Channel(combiner);
 			toReducer.setShipStrategy(in.getShipStrategy(), in.getShipStrategyKeys(), in.getShipStrategySortOrder());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-compiler/src/main/java/org/apache/flink/compiler/operators/PartialGroupProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/PartialGroupProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/PartialGroupProperties.java
index 60ec090..cf33bbe 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/PartialGroupProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/PartialGroupProperties.java
@@ -51,8 +51,14 @@ public final class PartialGroupProperties extends OperatorDescriptorSingle {
 		GroupReduceNode combinerNode = new GroupReduceNode((GroupReduceOperatorBase<?, ?, ?>) node.getPactContract());
 		combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism());
 
-		return new SingleInputPlanNode(combinerNode, "Combine("+node.getPactContract().getName()+")", in,
-				DriverStrategy.SORTED_GROUP_COMBINE, this.keyList);
+		SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getPactContract().getName()+")", in,
+				DriverStrategy.SORTED_GROUP_COMBINE);
+		// sorting key info
+		combiner.setDriverKeyInfo(in.getLocalStrategyKeys(), in.getLocalStrategySortOrder(), 0);
+		// set grouping comparator key info
+		combiner.setDriverKeyInfo(this.keyList, 1);
+		
+		return combiner;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-compiler/src/main/java/org/apache/flink/compiler/plan/SingleInputPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/SingleInputPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/SingleInputPlanNode.java
index 485687e..9b9202b 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/SingleInputPlanNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/SingleInputPlanNode.java
@@ -29,6 +29,7 @@ import java.util.List;
 
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
+import org.apache.flink.compiler.CompilerException;
 import org.apache.flink.compiler.dag.OptimizerNode;
 import org.apache.flink.compiler.dag.SingleInputNode;
 import org.apache.flink.runtime.operators.DamBehavior;
@@ -43,11 +44,11 @@ public class SingleInputPlanNode extends PlanNode {
 	
 	protected final Channel input;
 	
-	protected final FieldList keys;
+	protected final FieldList[] driverKeys;
 	
-	protected final boolean[] sortOrders;
+	protected final boolean[][] driverSortOrders;
 	
-	private TypeComparatorFactory<?> comparator;
+	private TypeComparatorFactory<?>[] comparators;
 	
 	public Object postPassHelper;
 	
@@ -68,8 +69,15 @@ public class SingleInputPlanNode extends PlanNode {
 	{
 		super(template, nodeName, driverStrategy);
 		this.input = input;
-		this.keys = driverKeyFields;
-		this.sortOrders = driverSortOrders;
+		
+		this.comparators = new TypeComparatorFactory<?>[driverStrategy.getNumRequiredComparators()];
+		this.driverKeys = new FieldList[driverStrategy.getNumRequiredComparators()];
+		this.driverSortOrders = new boolean[driverStrategy.getNumRequiredComparators()][];
+		
+		if(driverStrategy.getNumRequiredComparators() > 0) {
+			this.driverKeys[0] = driverKeyFields;
+			this.driverSortOrders[0] = driverSortOrders;
+		}
 		
 		if (this.input.getShipStrategy() == ShipStrategyType.BROADCAST) {
 			this.input.setReplicationFactor(getDegreeOfParallelism());
@@ -81,6 +89,7 @@ public class SingleInputPlanNode extends PlanNode {
 		} else if (predNode.branchPlan != null) {
 			this.branchPlan.putAll(predNode.branchPlan);
 		}
+		
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -111,30 +120,71 @@ public class SingleInputPlanNode extends PlanNode {
 		return this.input.getSource();
 	}
 	
-	public FieldList getKeys() {
-		return this.keys;
+	/**
+	 * Sets the key field indexes for the specified driver comparator.
+	 * 
+	 * @param keys The key field indexes for the specified driver comparator.
+	 * @param id The ID of the driver comparator.
+	 */
+	public void setDriverKeyInfo(FieldList keys, int id) {
+		this.setDriverKeyInfo(keys, getTrueArray(keys.size()), id);
+	}
+	
+	/**
+	 * Sets the key field information for the specified driver comparator.
+	 * 
+	 * @param keys The key field indexes for the specified driver comparator.
+	 * @param sortOrder The key sort order for the specified driver comparator.
+	 * @param id The ID of the driver comparator.
+	 */
+	public void setDriverKeyInfo(FieldList keys, boolean[] sortOrder, int id) {
+		if(id < 0 || id >= driverKeys.length) {
+			throw new CompilerException("Invalid id for driver key information. DriverStrategy requires only "
+											+super.getDriverStrategy().getNumRequiredComparators()+" comparators.");
+		}
+		this.driverKeys[id] = keys;
+		this.driverSortOrders[id] = sortOrder;
+	}
+	
+	/**
+	 * Gets the key field indexes for the specified driver comparator.
+	 * 
+	 * @param id The id of the driver comparator for which the key field indexes are requested.
+	 * @return The key field indexes of the specified driver comparator.
+	 */
+	public FieldList getKeys(int id) {
+		return this.driverKeys[id];
 	}
 	
-	public boolean[] getSortOrders() {
-		return sortOrders;
+	/**
+	 * Gets the sort order for the specified driver comparator.
+	 * 
+	 * @param id The id of the driver comparator for which the sort order is requested.
+	 * @return The sort order of the specified driver comparator.
+	 */
+	public boolean[] getSortOrders(int id) {
+		return driverSortOrders[id];
 	}
 	
 	/**
-	 * Gets the comparator from this PlanNode.
+	 * Gets the specified comparator from this PlanNode.
+	 * 
+	 * @param id The ID of the requested comparator.
 	 *
-	 * @return The comparator.
+	 * @return The specified comparator.
 	 */
-	public TypeComparatorFactory<?> getComparator() {
-		return comparator;
+	public TypeComparatorFactory<?> getComparator(int id) {
+		return comparators[id];
 	}
 	
 	/**
-	 * Sets the comparator for this PlanNode.
+	 * Sets the specified comparator for this PlanNode.
 	 *
 	 * @param comparator The comparator to set.
+	 * @param id The ID of the comparator to set.
 	 */
-	public void setComparator(TypeComparatorFactory<?> comparator) {
-		this.comparator = comparator;
+	public void setComparator(TypeComparatorFactory<?> comparator, int id) {
+		this.comparators[id] = comparator;
 	}
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
index 78ec671..da7e0a2 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -770,10 +770,9 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
 		
 		// set the driver strategy
 		config.setDriverStrategy(ds);
-		if (node.getComparator() != null) {
-			config.setDriverComparator(node.getComparator(), 0);
+		for(int i=0;i<ds.getNumRequiredComparators();i++) {
+			config.setDriverComparator(node.getComparator(i), i);
 		}
-		
 		// assign memory, file-handles, etc.
 		assignDriverResources(node, config);
 		return vertex;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/GenericFlatTypePostPass.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/GenericFlatTypePostPass.java b/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/GenericFlatTypePostPass.java
index 2697621..717a0c2 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/GenericFlatTypePostPass.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/GenericFlatTypePostPass.java
@@ -302,9 +302,9 @@ public abstract class GenericFlatTypePostPass<X, T extends AbstractSchema<X>> im
 			
 			if (createUtilities) {
 				// parameterize the node's driver strategy
-				if (sn.getDriverStrategy().requiresComparator()) {
+				for(int i=0;i<sn.getDriverStrategy().getNumRequiredComparators();i++) {
 					try {
-						sn.setComparator(createComparator(sn.getKeys(), sn.getSortOrders(), schema));
+						sn.setComparator(createComparator(sn.getKeys(i), sn.getSortOrders(i), schema),i);
 					} catch (MissingFieldTypeInfoException e) {
 						throw new CompilerPostPassException("Could not set up runtime strategy for node '" + 
 								optNode.getPactContract().getName() + "'. Missing type information for key field " +
@@ -371,7 +371,7 @@ public abstract class GenericFlatTypePostPass<X, T extends AbstractSchema<X>> im
 			
 			// parameterize the node's driver strategy
 			if (createUtilities) {
-				if (dn.getDriverStrategy().requiresComparator()) {
+				if (dn.getDriverStrategy().getNumRequiredComparators() > 0) {
 					// set the individual comparators
 					try {
 						dn.setComparator1(createComparator(dn.getKeysForInput1(), dn.getSortOrders(), schema1));

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java b/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java
index b1c088d..ea3cc87 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java
@@ -161,11 +161,10 @@ public class JavaApiPostPass implements OptimizerPostPass {
 			SingleInputOperator<?, ?, ?> singleInputOperator = (SingleInputOperator<?, ?, ?>) sn.getOptimizerNode().getPactContract();
 			
 			// parameterize the node's driver strategy
-			if (sn.getDriverStrategy().requiresComparator()) {
-				sn.setComparator(createComparator(singleInputOperator.getOperatorInfo().getInputType(), sn.getKeys(),
-					getSortOrders(sn.getKeys(), sn.getSortOrders())));
+			for(int i=0;i<sn.getDriverStrategy().getNumRequiredComparators();i++) {
+				sn.setComparator(createComparator(singleInputOperator.getOperatorInfo().getInputType(), sn.getKeys(i),
+						getSortOrders(sn.getKeys(i), sn.getSortOrders(i))), i);
 			}
-			
 			// done, we can now propagate our info down
 			traverseChannel(sn.getInput());
 			
@@ -184,7 +183,7 @@ public class JavaApiPostPass implements OptimizerPostPass {
 			DualInputOperator<?, ?, ?, ?> dualInputOperator = (DualInputOperator<?, ?, ?, ?>) dn.getOptimizerNode().getPactContract();
 			
 			// parameterize the node's driver strategy
-			if (dn.getDriverStrategy().requiresComparator()) {
+			if (dn.getDriverStrategy().getNumRequiredComparators() > 0) {
 				dn.setComparator1(createComparator(dualInputOperator.getOperatorInfo().getFirstInputType(), dn.getKeysForInput1(),
 					getSortOrders(dn.getKeysForInput1(), dn.getSortOrders())));
 				dn.setComparator2(createComparator(dualInputOperator.getOperatorInfo().getSecondInputType(), dn.getKeysForInput2(),

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-compiler/src/test/java/org/apache/flink/compiler/GroupOrderTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/GroupOrderTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/GroupOrderTest.java
index 8c3a836..5009edd 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/GroupOrderTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/GroupOrderTest.java
@@ -95,7 +95,7 @@ public class GroupOrderTest extends CompilerTestBase  {
 		FieldList local = new FieldList(2, 5);
 		Assert.assertEquals(ship, c.getShipStrategyKeys());
 		Assert.assertEquals(local, c.getLocalStrategyKeys());
-		Assert.assertTrue(c.getLocalStrategySortOrder()[0] == reducer.getSortOrders()[0]);
+		Assert.assertTrue(c.getLocalStrategySortOrder()[0] == reducer.getSortOrders(0)[0]);
 		
 		// check that we indeed sort descending
 		Assert.assertTrue(c.getLocalStrategySortOrder()[1] == groupOrder.getFieldSortDirections()[0]);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
index 0921364..29fc6b9 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
@@ -167,7 +167,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 			assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
 			
 			// check the keys
-			assertEquals(new FieldList(1), reduceNode.getKeys());
+			assertEquals(new FieldList(1), reduceNode.getKeys(0));
 			assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
 			
 			// check DOP
@@ -222,8 +222,9 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 			assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
 			
 			// check the keys
-			assertEquals(new FieldList(1), reduceNode.getKeys());
-			assertEquals(new FieldList(1), combineNode.getKeys());
+			assertEquals(new FieldList(1), reduceNode.getKeys(0));
+			assertEquals(new FieldList(1), combineNode.getKeys(0));
+			assertEquals(new FieldList(1), combineNode.getKeys(1));
 			assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
 			
 			// check DOP
@@ -279,7 +280,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 			assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
 			
 			// check the keys
-			assertEquals(new FieldList(0), reduceNode.getKeys());
+			assertEquals(new FieldList(0), reduceNode.getKeys(0));
 			assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
 			
 			// check DOP
@@ -343,8 +344,9 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
 			assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
 			
 			// check the keys
-			assertEquals(new FieldList(0), reduceNode.getKeys());
-			assertEquals(new FieldList(0), combineNode.getKeys());
+			assertEquals(new FieldList(0), reduceNode.getKeys(0));
+			assertEquals(new FieldList(0), combineNode.getKeys(0));
+			assertEquals(new FieldList(0), combineNode.getKeys(1));
 			assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
 			
 			// check DOP

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java
index 2e09810..2ec32e6 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java
@@ -175,8 +175,8 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
 			assertEquals(DriverStrategy.SORTED_PARTIAL_REDUCE, combineNode.getDriverStrategy());
 			
 			// check the keys
-			assertEquals(new FieldList(1), reduceNode.getKeys());
-			assertEquals(new FieldList(1), combineNode.getKeys());
+			assertEquals(new FieldList(1), reduceNode.getKeys(0));
+			assertEquals(new FieldList(1), combineNode.getKeys(0));
 			assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
 			
 			// check DOP
@@ -239,8 +239,8 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
 			assertEquals(DriverStrategy.SORTED_PARTIAL_REDUCE, combineNode.getDriverStrategy());
 			
 			// check the keys
-			assertEquals(new FieldList(0), reduceNode.getKeys());
-			assertEquals(new FieldList(0), combineNode.getKeys());
+			assertEquals(new FieldList(0), reduceNode.getKeys(0));
+			assertEquals(new FieldList(0), combineNode.getKeys(0));
 			assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
 			
 			// check DOP

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
index ca47443..d07743e 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
@@ -83,7 +83,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
 			
 			// verify reducer
 			assertEquals(ShipStrategyType.PARTITION_HASH, worksetReducer.getInput().getShipStrategy());
-			assertEquals(new FieldList(1, 2), worksetReducer.getKeys());
+			assertEquals(new FieldList(1, 2), worksetReducer.getKeys(0));
 			
 			// currently, the system may partition before or after the mapper
 			ShipStrategyType ss1 = deltaMapper.getInput().getShipStrategy();
@@ -129,7 +129,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
 			
 			// verify reducer
 			assertEquals(ShipStrategyType.PARTITION_HASH, worksetReducer.getInput().getShipStrategy());
-			assertEquals(new FieldList(1, 2), worksetReducer.getKeys());
+			assertEquals(new FieldList(1, 2), worksetReducer.getKeys(0));
 			
 			// verify solution delta
 			assertEquals(2, joinWithSolutionSetNode.getOutgoingChannels().size());
@@ -174,7 +174,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
 			
 			// verify reducer
 			assertEquals(ShipStrategyType.FORWARD, worksetReducer.getInput().getShipStrategy());
-			assertEquals(new FieldList(1, 2), worksetReducer.getKeys());
+			assertEquals(new FieldList(1, 2), worksetReducer.getKeys(0));
 			
 			
 			// verify solution delta

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsRecordApiCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsRecordApiCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsRecordApiCompilerTest.java
index fbdcd3d..62c5d84 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsRecordApiCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsRecordApiCompilerTest.java
@@ -97,7 +97,7 @@ public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase {
 		
 		// verify reducer
 		assertEquals(ShipStrategyType.PARTITION_HASH, worksetReducer.getInput().getShipStrategy());
-		assertEquals(list0, worksetReducer.getKeys());
+		assertEquals(list0, worksetReducer.getKeys(0));
 		
 		// currently, the system may partition before or after the mapper
 		ShipStrategyType ss1 = deltaMapper.getInput().getShipStrategy();
@@ -142,7 +142,7 @@ public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase {
 		
 		// verify reducer
 		assertEquals(ShipStrategyType.PARTITION_HASH, worksetReducer.getInput().getShipStrategy());
-		assertEquals(list0, worksetReducer.getKeys());
+		assertEquals(list0, worksetReducer.getKeys(0));
 		
 		
 		// verify solution delta
@@ -186,7 +186,7 @@ public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase {
 		
 		// verify reducer
 		assertEquals(ShipStrategyType.FORWARD, worksetReducer.getInput().getShipStrategy());
-		assertEquals(list0, worksetReducer.getKeys());
+		assertEquals(list0, worksetReducer.getKeys(0));
 		
 		
 		// verify solution delta

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java
index 3bbd768..286d830 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java
@@ -59,8 +59,8 @@ public abstract class AbstractCachedBuildSideMatchDriver<IT1, IT2, OT> extends M
 		
 		TypeSerializer<IT1> serializer1 = this.taskContext.<IT1>getInputSerializer(0).getSerializer();
 		TypeSerializer<IT2> serializer2 = this.taskContext.<IT2>getInputSerializer(1).getSerializer();
-		TypeComparator<IT1> comparator1 = this.taskContext.getInputComparator(0);
-		TypeComparator<IT2> comparator2 = this.taskContext.getInputComparator(1);
+		TypeComparator<IT1> comparator1 = this.taskContext.getDriverComparator(0);
+		TypeComparator<IT2> comparator2 = this.taskContext.getDriverComparator(1);
 		MutableObjectIterator<IT1> input1 = this.taskContext.getInput(0);
 		MutableObjectIterator<IT2> input2 = this.taskContext.getInput(1);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
index 14e0b57..5e68bab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
@@ -71,8 +71,8 @@ public class AllGroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunct
 	}
 
 	@Override
-	public boolean requiresComparatorOnInput() {
-		return false;
+	public int getNumberOfDriverComparators() {
+		return 0;
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
index 60a79f1..e26f4eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
@@ -70,8 +70,8 @@ public class AllReduceDriver<T> implements PactDriver<ReduceFunction<T>, T> {
 	}
 
 	@Override
-	public boolean requiresComparatorOnInput() {
-		return false;
+	public int getNumberOfDriverComparators() {
+		return 0;
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java
index 4a65ea8..55498fb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java
@@ -77,8 +77,8 @@ public class CoGroupDriver<IT1, IT2, OT> implements PactDriver<CoGroupFunction<I
 	
 
 	@Override
-	public boolean requiresComparatorOnInput() {
-		return true;
+	public int getNumberOfDriverComparators() {
+		return 2;
 	}
 	
 
@@ -96,8 +96,8 @@ public class CoGroupDriver<IT1, IT2, OT> implements PactDriver<CoGroupFunction<I
 		// get the key positions and types
 		final TypeSerializer<IT1> serializer1 = this.taskContext.<IT1>getInputSerializer(0).getSerializer();
 		final TypeSerializer<IT2> serializer2 = this.taskContext.<IT2>getInputSerializer(1).getSerializer();
-		final TypeComparator<IT1> groupComparator1 = this.taskContext.getInputComparator(0);
-		final TypeComparator<IT2> groupComparator2 = this.taskContext.getInputComparator(1);
+		final TypeComparator<IT1> groupComparator1 = this.taskContext.getDriverComparator(0);
+		final TypeComparator<IT2> groupComparator2 = this.taskContext.getDriverComparator(1);
 		
 		final TypePairComparatorFactory<IT1, IT2> pairComparatorFactory = config.getPairComparatorFactory(
 					this.taskContext.getUserCodeClassLoader());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
index 6dda3f0..bc1a4bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
@@ -69,8 +69,8 @@ public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, OT> implements Resettab
 	}
 	
 	@Override
-	public boolean requiresComparatorOnInput() {
-		return true;
+	public int getNumberOfDriverComparators() {
+		return 1;
 	}
 	
 	@Override
@@ -103,7 +103,7 @@ public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, OT> implements Resettab
 		TypeComparator<IT1> buildSideComparator = hashTable.getBuildSideComparator().duplicate();
 		
 		probeSideSerializer = taskContext.<IT2>getInputSerializer(0).getSerializer();
-		probeSideComparator = taskContext.getInputComparator(0);
+		probeSideComparator = taskContext.getDriverComparator(0);
 		
 		solutionSideRecord = buildSideSerializer.createInstance();
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
index 35fc1b7..3359bb2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
@@ -68,8 +68,8 @@ public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resetta
 	}
 	
 	@Override
-	public boolean requiresComparatorOnInput() {
-		return true;
+	public int getNumberOfDriverComparators() {
+		return 1;
 	}
 	
 	@Override
@@ -102,7 +102,7 @@ public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resetta
 		TypeComparator<IT2> buildSideComparator = hashTable.getBuildSideComparator().duplicate();
 		
 		probeSideSerializer = taskContext.<IT1>getInputSerializer(0).getSerializer();
-		probeSideComparator = taskContext.getInputComparator(0);
+		probeSideComparator = taskContext.getDriverComparator(0);
 		
 		solutionSideRecord = buildSideSerializer.createInstance();
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CollectorMapDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CollectorMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CollectorMapDriver.java
index 8a398d2..3858864 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CollectorMapDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CollectorMapDriver.java
@@ -63,8 +63,8 @@ public class CollectorMapDriver<IT, OT> implements PactDriver<GenericCollectorMa
 	}
 
 	@Override
-	public boolean requiresComparatorOnInput() {
-		return false;
+	public int getNumberOfDriverComparators() {
+		return 0;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
index 6856702..4e6745a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
@@ -88,8 +88,8 @@ public class CrossDriver<T1, T2, OT> implements PactDriver<CrossFunction<T1, T2,
 	
 
 	@Override
-	public boolean requiresComparatorOnInput() {
-		return false;
+	public int getNumberOfDriverComparators() {
+		return 0;
 	}
 
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
index 49e34c9..a133c6c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
@@ -34,66 +34,66 @@ import org.apache.flink.runtime.operators.chaining.SynchronousChainedCombineDriv
  */
 public enum DriverStrategy {
 	// no local strategy, as for sources and sinks
-	NONE(null, null, PIPELINED, false),
+	NONE(null, null, PIPELINED, 0),
 	// a unary no-op operator
-	UNARY_NO_OP(NoOpDriver.class, null, PIPELINED, PIPELINED, false),
+	UNARY_NO_OP(NoOpDriver.class, null, PIPELINED, PIPELINED, 0),
 	// a binary no-op operator. non implementation available
-	BINARY_NO_OP(null, null, PIPELINED, PIPELINED, false),
+	BINARY_NO_OP(null, null, PIPELINED, PIPELINED, 0),
 
 	// the old mapper
-	COLLECTOR_MAP(CollectorMapDriver.class, ChainedCollectorMapDriver.class, PIPELINED, false),
+	COLLECTOR_MAP(CollectorMapDriver.class, ChainedCollectorMapDriver.class, PIPELINED, 0),
 	// the proper mapper
-	MAP(MapDriver.class, ChainedMapDriver.class, PIPELINED, false),
+	MAP(MapDriver.class, ChainedMapDriver.class, PIPELINED, 0),
 
 	// the proper map partition
-	MAP_PARTITION(MapPartitionDriver.class, null, PIPELINED, false),
+	MAP_PARTITION(MapPartitionDriver.class, null, PIPELINED, 0),
 
 	// the flat mapper
-	FLAT_MAP(FlatMapDriver.class, ChainedFlatMapDriver.class, PIPELINED, false),
+	FLAT_MAP(FlatMapDriver.class, ChainedFlatMapDriver.class, PIPELINED, 0),
 
 	// group everything together into one group and apply the Reduce function
-	ALL_REDUCE(AllReduceDriver.class, null, PIPELINED, false),
+	ALL_REDUCE(AllReduceDriver.class, null, PIPELINED, 0),
 	// group everything together into one group and apply the GroupReduce function
-	ALL_GROUP_REDUCE(AllGroupReduceDriver.class, null, PIPELINED, false),
+	ALL_GROUP_REDUCE(AllGroupReduceDriver.class, null, PIPELINED, 0),
 	// group everything together into one group and apply the GroupReduce's combine function
-	ALL_GROUP_COMBINE(AllGroupReduceDriver.class, null, PIPELINED, false),
+	ALL_GROUP_COMBINE(AllGroupReduceDriver.class, null, PIPELINED, 0),
 
 	// grouping the inputs and apply the Reduce Function
-	SORTED_REDUCE(ReduceDriver.class, null, PIPELINED, true),
+	SORTED_REDUCE(ReduceDriver.class, null, PIPELINED, 1),
 	// sorted partial reduce is the combiner for the Reduce. same function, but potentially not fully sorted
-	SORTED_PARTIAL_REDUCE(ReduceCombineDriver.class, null, MATERIALIZING, true),
+	SORTED_PARTIAL_REDUCE(ReduceCombineDriver.class, null, MATERIALIZING, 1),
 	
 	// grouping the inputs and apply the GroupReduce function
-	SORTED_GROUP_REDUCE(GroupReduceDriver.class, null, PIPELINED, true),
+	SORTED_GROUP_REDUCE(GroupReduceDriver.class, null, PIPELINED, 1),
 	// partially grouping inputs (best effort resulting possibly in duplicates --> combiner)
-	SORTED_GROUP_COMBINE(GroupReduceCombineDriver.class, SynchronousChainedCombineDriver.class, MATERIALIZING, true),
+	SORTED_GROUP_COMBINE(GroupReduceCombineDriver.class, SynchronousChainedCombineDriver.class, MATERIALIZING, 2),
 
 	// both inputs are merged, but materialized to the side for block-nested-loop-join among values with equal key
-	MERGE(MatchDriver.class, null, MATERIALIZING, MATERIALIZING, true),
+	MERGE(MatchDriver.class, null, MATERIALIZING, MATERIALIZING, 2),
 
 	// co-grouping inputs
-	CO_GROUP(CoGroupDriver.class, null, PIPELINED, PIPELINED, true),
+	CO_GROUP(CoGroupDriver.class, null, PIPELINED, PIPELINED, 2),
 	
 	// the first input is build side, the second side is probe side of a hybrid hash table
-	HYBRIDHASH_BUILD_FIRST(MatchDriver.class, null, FULL_DAM, MATERIALIZING, true),
+	HYBRIDHASH_BUILD_FIRST(MatchDriver.class, null, FULL_DAM, MATERIALIZING, 2),
 	// the second input is build side, the first side is probe side of a hybrid hash table
-	HYBRIDHASH_BUILD_SECOND(MatchDriver.class, null, MATERIALIZING, FULL_DAM, true),
+	HYBRIDHASH_BUILD_SECOND(MatchDriver.class, null, MATERIALIZING, FULL_DAM, 2),
 	// a cached variant of HYBRIDHASH_BUILD_FIRST, that can only be used inside of iterations
-	HYBRIDHASH_BUILD_FIRST_CACHED(BuildFirstCachedMatchDriver.class, null, FULL_DAM, MATERIALIZING, true),
+	HYBRIDHASH_BUILD_FIRST_CACHED(BuildFirstCachedMatchDriver.class, null, FULL_DAM, MATERIALIZING, 2),
 	//  cached variant of HYBRIDHASH_BUILD_SECOND, that can only be used inside of iterations
-	HYBRIDHASH_BUILD_SECOND_CACHED(BuildSecondCachedMatchDriver.class, null, MATERIALIZING, FULL_DAM, true),
+	HYBRIDHASH_BUILD_SECOND_CACHED(BuildSecondCachedMatchDriver.class, null, MATERIALIZING, FULL_DAM, 2),
 	
 	// the second input is inner loop, the first input is outer loop and block-wise processed
-	NESTEDLOOP_BLOCKED_OUTER_FIRST(CrossDriver.class, null, MATERIALIZING, FULL_DAM, false),
+	NESTEDLOOP_BLOCKED_OUTER_FIRST(CrossDriver.class, null, MATERIALIZING, FULL_DAM, 0),
 	// the first input is inner loop, the second input is outer loop and block-wise processed
-	NESTEDLOOP_BLOCKED_OUTER_SECOND(CrossDriver.class, null, FULL_DAM, MATERIALIZING, false),
+	NESTEDLOOP_BLOCKED_OUTER_SECOND(CrossDriver.class, null, FULL_DAM, MATERIALIZING, 0),
 	// the second input is inner loop, the first input is outer loop and stream-processed
-	NESTEDLOOP_STREAMED_OUTER_FIRST(CrossDriver.class, null, PIPELINED, FULL_DAM, false),
+	NESTEDLOOP_STREAMED_OUTER_FIRST(CrossDriver.class, null, PIPELINED, FULL_DAM, 0),
 	// the first input is inner loop, the second input is outer loop and stream-processed
-	NESTEDLOOP_STREAMED_OUTER_SECOND(CrossDriver.class, null, FULL_DAM, PIPELINED, false),
+	NESTEDLOOP_STREAMED_OUTER_SECOND(CrossDriver.class, null, FULL_DAM, PIPELINED, 0),
 	
 	// union utility op. unions happen implicitly on the network layer (in the readers) when bundeling streams
-	UNION(null, null, FULL_DAM, FULL_DAM, false);
+	UNION(null, null, FULL_DAM, FULL_DAM, 0);
 	// explicit binary union between a streamed and a cached input
 //	UNION_WITH_CACHED(UnionWithTempOperator.class, null, FULL_DAM, PIPELINED, false);
 	
@@ -108,35 +108,35 @@ public enum DriverStrategy {
 	
 	private final int numInputs;
 	
-	private final boolean requiresComparator;
+	private final int numRequiredComparators;
 	
 
 	@SuppressWarnings("unchecked")
 	private DriverStrategy(
 			@SuppressWarnings("rawtypes") Class<? extends PactDriver> driverClass, 
 			@SuppressWarnings("rawtypes") Class<? extends ChainedDriver> pushChainDriverClass, 
-			DamBehavior dam, boolean comparator)
+			DamBehavior dam, int numComparator)
 	{
 		this.driverClass = (Class<? extends PactDriver<?, ?>>) driverClass;
 		this.pushChainDriver = (Class<? extends ChainedDriver<?, ?>>) pushChainDriverClass;
 		this.numInputs = 1;
 		this.dam1 = dam;
 		this.dam2 = null;
-		this.requiresComparator = comparator;
+		this.numRequiredComparators = numComparator;
 	}
 	
 	@SuppressWarnings("unchecked")
 	private DriverStrategy(
 			@SuppressWarnings("rawtypes") Class<? extends PactDriver> driverClass, 
 			@SuppressWarnings("rawtypes") Class<? extends ChainedDriver> pushChainDriverClass, 
-			DamBehavior firstDam, DamBehavior secondDam, boolean comparator)
+			DamBehavior firstDam, DamBehavior secondDam, int numComparator)
 	{
 		this.driverClass = (Class<? extends PactDriver<?, ?>>) driverClass;
 		this.pushChainDriver = (Class<? extends ChainedDriver<?, ?>>) pushChainDriverClass;
 		this.numInputs = 2;
 		this.dam1 = firstDam;
 		this.dam2 = secondDam;
-		this.requiresComparator = comparator;
+		this.numRequiredComparators = numComparator;
 	}
 	
 	// --------------------------------------------------------------------------------------------
@@ -180,7 +180,7 @@ public enum DriverStrategy {
 		return this.dam1.isMaterializing() || (this.dam2 != null && this.dam2.isMaterializing());
 	}
 	
-	public boolean requiresComparator() {
-		return this.requiresComparator;
+	public int getNumRequiredComparators() {
+		return this.numRequiredComparators;
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java
index b3f5e96..f4ae62d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java
@@ -62,8 +62,8 @@ public class FlatMapDriver<IT, OT> implements PactDriver<FlatMapFunction<IT, OT>
 	}
 
 	@Override
-	public boolean requiresComparatorOnInput() {
-		return false;
+	public int getNumberOfDriverComparators() {
+		return 0;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
index ffa3bf9..4323eae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
@@ -60,7 +60,9 @@ public class GroupReduceCombineDriver<T> implements PactDriver<FlatCombineFuncti
 
 	private TypeSerializer<T> serializer;
 
-	private TypeComparator<T> comparator;
+	private TypeComparator<T> sortingComparator;
+	
+	private TypeComparator<T> groupingComparator;
 
 	private QuickSort sortAlgo = new QuickSort();
 
@@ -82,7 +84,7 @@ public class GroupReduceCombineDriver<T> implements PactDriver<FlatCombineFuncti
 	public int getNumberOfInputs() {
 		return 1;
 	}
-
+	
 	@Override
 	public Class<FlatCombineFunction<T>> getStubType() {
 		@SuppressWarnings("unchecked")
@@ -91,8 +93,8 @@ public class GroupReduceCombineDriver<T> implements PactDriver<FlatCombineFuncti
 	}
 
 	@Override
-	public boolean requiresComparatorOnInput() {
-		return true;
+	public int getNumberOfDriverComparators() {
+		return 2;
 	}
 
 	@Override
@@ -107,7 +109,8 @@ public class GroupReduceCombineDriver<T> implements PactDriver<FlatCombineFuncti
 
 		final TypeSerializerFactory<T> serializerFactory = this.taskContext.getInputSerializer(0);
 		this.serializer = serializerFactory.getSerializer();
-		this.comparator = this.taskContext.getInputComparator(0);
+		this.sortingComparator = this.taskContext.getDriverComparator(0);
+		this.groupingComparator = this.taskContext.getDriverComparator(1);
 		this.combiner = this.taskContext.getStub();
 		this.output = this.taskContext.getOutputCollector();
 
@@ -115,12 +118,12 @@ public class GroupReduceCombineDriver<T> implements PactDriver<FlatCombineFuncti
 				numMemoryPages);
 
 		// instantiate a fix-length in-place sorter, if possible, otherwise the out-of-place sorter
-		if (this.comparator.supportsSerializationWithKeyNormalization() &&
+		if (this.sortingComparator.supportsSerializationWithKeyNormalization() &&
 				this.serializer.getLength() > 0 && this.serializer.getLength() <= THRESHOLD_FOR_IN_PLACE_SORTING)
 		{
-			this.sorter = new FixedLengthRecordSorter<T>(this.serializer, this.comparator, memory);
+			this.sorter = new FixedLengthRecordSorter<T>(this.serializer, this.sortingComparator, memory);
 		} else {
-			this.sorter = new NormalizedKeySorter<T>(this.serializer, this.comparator.duplicate(), memory);
+			this.sorter = new NormalizedKeySorter<T>(this.serializer, this.sortingComparator.duplicate(), memory);
 		}
 	}
 
@@ -163,7 +166,7 @@ public class GroupReduceCombineDriver<T> implements PactDriver<FlatCombineFuncti
 			this.sortAlgo.sort(sorter);
 
 			final KeyGroupedIterator<T> keyIter = new KeyGroupedIterator<T>(sorter.getIterator(), this.serializer,
-					this.comparator);
+					this.groupingComparator);
 
 			final FlatCombineFunction<T> combiner = this.combiner;
 			final Collector<T> output = this.output;
@@ -177,12 +180,16 @@ public class GroupReduceCombineDriver<T> implements PactDriver<FlatCombineFuncti
 
 	@Override
 	public void cleanup() throws Exception {
-		this.memManager.release(this.sorter.dispose());
+		if(this.sorter != null) {
+			this.memManager.release(this.sorter.dispose());
+		}
 	}
 
 	@Override
 	public void cancel() {
 		this.running = false;
-		this.memManager.release(this.sorter.dispose());
+		if(this.sorter != null) {
+			this.memManager.release(this.sorter.dispose());
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
index 28c1761..feeceb0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
@@ -74,8 +74,8 @@ public class GroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunction
 	}
 
 	@Override
-	public boolean requiresComparatorOnInput() {
-		return true;
+	public int getNumberOfDriverComparators() {
+		return 1;
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -87,7 +87,7 @@ public class GroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunction
 			throw new Exception("Unrecognized driver strategy for GroupReduce driver: " + config.getDriverStrategy().name());
 		}
 		this.serializer = this.taskContext.<IT>getInputSerializer(0).getSerializer();
-		this.comparator = this.taskContext.getInputComparator(0);
+		this.comparator = this.taskContext.getDriverComparator(0);
 		this.input = this.taskContext.getInput(0);
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
index 29461c8..8740ab7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
@@ -68,8 +68,8 @@ public class JoinWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettableP
 	}
 	
 	@Override
-	public boolean requiresComparatorOnInput() {
-		return true;
+	public int getNumberOfDriverComparators() {
+		return 1;
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
index f2ba9ec..f762602 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
@@ -68,8 +68,8 @@ public class JoinWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resettable
 	}
 	
 	@Override
-	public boolean requiresComparatorOnInput() {
-		return true;
+	public int getNumberOfDriverComparators() {
+		return 1;
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
index f4499c1..2d98954 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
@@ -62,8 +62,8 @@ public class MapDriver<IT, OT> implements PactDriver<MapFunction<IT, OT>, OT> {
 	}
 
 	@Override
-	public boolean requiresComparatorOnInput() {
-		return false;
+	public int getNumberOfDriverComparators() {
+		return 0;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java
index 9c5094d..41aa312 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java
@@ -59,8 +59,8 @@ public class MapPartitionDriver<IT, OT> implements PactDriver<MapPartitionFuncti
 	}
 
 	@Override
-	public boolean requiresComparatorOnInput() {
-		return false;
+	public int getNumberOfDriverComparators() {
+		return 0;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
index f9ede1c..82f24d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
@@ -75,8 +75,8 @@ public class MatchDriver<IT1, IT2, OT> implements PactDriver<FlatJoinFunction<IT
 	}
 	
 	@Override
-	public boolean requiresComparatorOnInput() {
-		return true;
+	public int getNumberOfDriverComparators() {
+		return 2;
 	}
 
 	@Override
@@ -100,8 +100,8 @@ public class MatchDriver<IT1, IT2, OT> implements PactDriver<FlatJoinFunction<IT
 		// get the key positions and types
 		final TypeSerializer<IT1> serializer1 = this.taskContext.<IT1>getInputSerializer(0).getSerializer();
 		final TypeSerializer<IT2> serializer2 = this.taskContext.<IT2>getInputSerializer(1).getSerializer();
-		final TypeComparator<IT1> comparator1 = this.taskContext.getInputComparator(0);
-		final TypeComparator<IT2> comparator2 = this.taskContext.getInputComparator(1);
+		final TypeComparator<IT1> comparator1 = this.taskContext.getDriverComparator(0);
+		final TypeComparator<IT2> comparator2 = this.taskContext.getDriverComparator(1);
 		
 		final TypePairComparatorFactory<IT1, IT2> pairComparatorFactory = config.getPairComparatorFactory(
 				this.taskContext.getUserCodeClassLoader());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
index 82bc7b7..3eeb2d5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
@@ -52,8 +52,8 @@ public class NoOpDriver<T> implements PactDriver<AbstractRichFunction, T> {
 	}
 
 	@Override
-	public boolean requiresComparatorOnInput() {
-		return false;
+	public int getNumberOfDriverComparators() {
+		return 0;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java
index 6fe2770..d0f4116 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java
@@ -44,19 +44,19 @@ public interface PactDriver<S extends Function, OT> {
 	int getNumberOfInputs();
 	
 	/**
-	 * Gets the class of the stub type that is run by this task. For example, a <tt>MapTask</tt> should return
-	 * <code>MapFunction.class</code>.   
+	 * Gets the number of comparators required for this driver.
 	 * 
-	 * @return The class of the stub type run by the task.
+	 * @return The number of comparators required for this driver.
 	 */
-	Class<S> getStubType();
+	int getNumberOfDriverComparators();
 	
 	/**
-	 * Flag indicating whether the inputs require always comparators or not.
+	 * Gets the class of the stub type that is run by this task. For example, a <tt>MapTask</tt> should return
+	 * <code>MapFunction.class</code>.   
 	 * 
-	 * @return True, if the initialization should look for and create comparators, false otherwise.
+	 * @return The class of the stub type run by the task.
 	 */
-	boolean requiresComparatorOnInput();
+	Class<S> getStubType();
 	
 	/**
 	 * This method is called before the user code is opened. An exception thrown by this method

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
index bd2f178..c3a2d59 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
@@ -54,7 +54,7 @@ public interface PactTaskContext<S, OT> {
 	
 	<X> TypeSerializerFactory<X> getInputSerializer(int index);
 	
-	<X> TypeComparator<X> getInputComparator(int index);
+	<X> TypeComparator<X> getDriverComparator(int index);
 	
 	S getStub();
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
index 658ca46..35fb739 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
@@ -93,8 +93,8 @@ public class ReduceCombineDriver<T> implements PactDriver<ReduceFunction<T>, T>
 	}
 
 	@Override
-	public boolean requiresComparatorOnInput() {
-		return true;
+	public int getNumberOfDriverComparators() {
+		return 1;
 	}
 
 	@Override
@@ -109,7 +109,7 @@ public class ReduceCombineDriver<T> implements PactDriver<ReduceFunction<T>, T>
 		
 		// instantiate the serializer / comparator
 		final TypeSerializerFactory<T> serializerFactory = this.taskContext.getInputSerializer(0);
-		this.comparator = this.taskContext.getInputComparator(0);
+		this.comparator = this.taskContext.getDriverComparator(0);
 		this.serializer = serializerFactory.getSerializer();
 		this.reducer = this.taskContext.getStub();
 		this.output = this.taskContext.getOutputCollector();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
index 875e744..9cdcdda 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
@@ -73,8 +73,8 @@ public class ReduceDriver<T> implements PactDriver<ReduceFunction<T>, T> {
 	}
 
 	@Override
-	public boolean requiresComparatorOnInput() {
-		return true;
+	public int getNumberOfDriverComparators() {
+		return 1;
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -86,7 +86,7 @@ public class ReduceDriver<T> implements PactDriver<ReduceFunction<T>, T> {
 			throw new Exception("Unrecognized driver strategy for Reduce driver: " + config.getDriverStrategy().name());
 		}
 		this.serializer = this.taskContext.<T>getInputSerializer(0).getSerializer();
-		this.comparator = this.taskContext.getInputComparator(0);
+		this.comparator = this.taskContext.getDriverComparator(0);
 		this.input = this.taskContext.getInput(0);
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index 4b77909..72d0dc0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -299,9 +299,10 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 			// the local processing includes building the dams / caches
 			try {
 				int numInputs = driver.getNumberOfInputs();
+				int numComparators = driver.getNumberOfDriverComparators();
 				int numBroadcastInputs = this.config.getNumBroadcastInputs();
 				
-				initInputsSerializersAndComparators(numInputs);
+				initInputsSerializersAndComparators(numInputs, numComparators);
 				initBroadcastInputsSerializers(numBroadcastInputs);
 				
 				// set the iterative status for inputs and broadcast inputs
@@ -781,23 +782,27 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 	/**
 	 * Creates all the serializers and comparators.
 	 */
-	protected void initInputsSerializersAndComparators(int numInputs) throws Exception {
+	protected void initInputsSerializersAndComparators(int numInputs, int numComparators) throws Exception {
 		this.inputSerializers = new TypeSerializerFactory<?>[numInputs];
-		this.inputComparators = this.driver.requiresComparatorOnInput() ? new TypeComparator[numInputs] : null;
+		this.inputComparators = numComparators > 0 ? new TypeComparator[numComparators] : null;
 		this.inputIterators = new MutableObjectIterator[numInputs];
 		
+		//  ---------------- create the input serializers  ---------------------
 		for (int i = 0; i < numInputs; i++) {
-			//  ---------------- create the serializer first ---------------------
+			
 			final TypeSerializerFactory<?> serializerFactory = this.config.getInputSerializer(i, this.userCodeClassLoader);
 			this.inputSerializers[i] = serializerFactory;
 			
-			//  ---------------- create the driver's comparator ---------------------
+			this.inputIterators[i] = createInputIterator(this.inputReaders[i], this.inputSerializers[i]);
+		}
+		
+		//  ---------------- create the driver's comparators ---------------------
+		for (int i = 0; i < numComparators; i++) {
+			
 			if (this.inputComparators != null) {
 				final TypeComparatorFactory<?> comparatorFactory = this.config.getDriverComparator(i, this.userCodeClassLoader);
 				this.inputComparators[i] = comparatorFactory.createComparator();
 			}
-
-			this.inputIterators[i] = createInputIterator(this.inputReaders[i], this.inputSerializers[i]);
 		}
 	}
 	
@@ -1157,11 +1162,11 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 
 
 	@Override
-	public <X> TypeComparator<X> getInputComparator(int index) {
+	public <X> TypeComparator<X> getDriverComparator(int index) {
 		if (this.inputComparators == null) {
 			throw new IllegalStateException("Comparators have not been created!");
 		}
-		else if (index < 0 || index >= this.driver.getNumberOfInputs()) {
+		else if (index < 0 || index >= this.driver.getNumberOfDriverComparators()) {
 			throw new IndexOutOfBoundsException();
 		}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
index 69ea360..5492635 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
@@ -56,7 +56,9 @@ public class SynchronousChainedCombineDriver<T> extends ChainedDriver<T, T> {
 
 	private TypeSerializer<T> serializer;
 
-	private TypeComparator<T> comparator;
+	private TypeComparator<T> sortingComparator;
+
+	private TypeComparator<T> groupingComparator;
 
 	private AbstractInvokable parent;
 
@@ -92,19 +94,21 @@ public class SynchronousChainedCombineDriver<T> extends ChainedDriver<T, T> {
 
 		// instantiate the serializer / comparator
 		final TypeSerializerFactory<T> serializerFactory = this.config.getInputSerializer(0, this.userCodeClassLoader);
-		final TypeComparatorFactory<T> comparatorFactory = this.config.getDriverComparator(0, this.userCodeClassLoader);
+		final TypeComparatorFactory<T> sortingComparatorFactory = this.config.getDriverComparator(0, this.userCodeClassLoader);
+		final TypeComparatorFactory<T> groupingComparatorFactory = this.config.getDriverComparator(1, this.userCodeClassLoader);
 		this.serializer = serializerFactory.getSerializer();
-		this.comparator = comparatorFactory.createComparator();
+		this.sortingComparator = sortingComparatorFactory.createComparator();
+		this.groupingComparator = groupingComparatorFactory.createComparator();
 
 		final List<MemorySegment> memory = this.memManager.allocatePages(this.parent, numMemoryPages);
 
 		// instantiate a fix-length in-place sorter, if possible, otherwise the out-of-place sorter
-		if (this.comparator.supportsSerializationWithKeyNormalization() &&
+		if (this.sortingComparator.supportsSerializationWithKeyNormalization() &&
 			this.serializer.getLength() > 0 && this.serializer.getLength() <= THRESHOLD_FOR_IN_PLACE_SORTING)
 		{
-			this.sorter = new FixedLengthRecordSorter<T>(this.serializer, this.comparator, memory);
+			this.sorter = new FixedLengthRecordSorter<T>(this.serializer, this.sortingComparator, memory);
 		} else {
-			this.sorter = new NormalizedKeySorter<T>(this.serializer, this.comparator.duplicate(), memory);
+			this.sorter = new NormalizedKeySorter<T>(this.serializer, this.sortingComparator.duplicate(), memory);
 		}
 	}
 
@@ -183,7 +187,7 @@ public class SynchronousChainedCombineDriver<T> extends ChainedDriver<T, T> {
 			this.sortAlgo.sort(sorter);
 			// run the combiner
 			final KeyGroupedIterator<T> keyIter = new KeyGroupedIterator<T>(sorter.getIterator(), this.serializer,
-				this.comparator);
+				this.groupingComparator);
 
 			// cache references on the stack
 			final FlatCombineFunction<T> stub = this.combiner;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
index f667ce0..262a4e9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
@@ -70,8 +70,8 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
 				
 		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
 		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		setOutput(this.outList);
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED);
@@ -101,8 +101,8 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
 		
 		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
 		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		setOutput(this.outList);
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED);
@@ -132,8 +132,8 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
 		
 		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
 		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		setOutput(this.outList);
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED);
@@ -163,8 +163,8 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
 		
 		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
 		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		setOutput(this.outList);
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED);
@@ -194,8 +194,8 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
 		
 		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
 		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		setOutput(this.outList);
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED);
@@ -225,8 +225,8 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
 		
 		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
 		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		setOutput(new NirvanaOutputList());
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED);
@@ -255,8 +255,8 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
 		
 		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
 		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		setOutput(new NirvanaOutputList());
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED);
@@ -283,8 +283,8 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
 		addInput(new DelayingInfinitiveInputIterator(100));
 		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
 		
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		
@@ -330,8 +330,8 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
 		
 		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
 		addInput(new DelayingInfinitiveInputIterator(100));
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		setOutput(new NirvanaOutputList());
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED);
@@ -374,8 +374,8 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
 		
 		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
 		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		setOutput(new NirvanaOutputList());
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
@@ -418,8 +418,8 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
 		
 		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
 		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		setOutput(new NirvanaOutputList());
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java
index 01775f3..8e7d9d3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java
@@ -62,8 +62,8 @@ public class CoGroupTaskExternalITCase extends DriverTestBase<CoGroupFunction<Re
 			(keyCnt1 > keyCnt2 ? (keyCnt1 - keyCnt2) * valCnt1 : (keyCnt2 - keyCnt1) * valCnt2);
 		
 		setOutput(this.output);
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		getTaskConfig().setDriverStrategy(DriverStrategy.CO_GROUP);
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java
index da04cc3..46007ab 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java
@@ -68,8 +68,8 @@ public class CoGroupTaskTest extends DriverTestBase<CoGroupFunction<Record, Reco
 			(keyCnt1 > keyCnt2 ? (keyCnt1 - keyCnt2) * valCnt1 : (keyCnt2 - keyCnt1) * valCnt2);
 		
 		setOutput(this.output);
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		getTaskConfig().setDriverStrategy(DriverStrategy.CO_GROUP);
 		
@@ -99,8 +99,8 @@ public class CoGroupTaskTest extends DriverTestBase<CoGroupFunction<Record, Reco
 			(keyCnt1 > keyCnt2 ? (keyCnt1 - keyCnt2) * valCnt1 : (keyCnt2 - keyCnt1) * valCnt2);
 		
 		setOutput(this.output);
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		getTaskConfig().setDriverStrategy(DriverStrategy.CO_GROUP);
 		
@@ -130,8 +130,8 @@ public class CoGroupTaskTest extends DriverTestBase<CoGroupFunction<Record, Reco
 			(keyCnt1 > keyCnt2 ? (keyCnt1 - keyCnt2) * valCnt1 : (keyCnt2 - keyCnt1) * valCnt2);
 		
 		setOutput(this.output);
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		getTaskConfig().setDriverStrategy(DriverStrategy.CO_GROUP);
 		
@@ -161,8 +161,8 @@ public class CoGroupTaskTest extends DriverTestBase<CoGroupFunction<Record, Reco
 			(keyCnt1 > keyCnt2 ? (keyCnt1 - keyCnt2) * valCnt1 : (keyCnt2 - keyCnt1) * valCnt2);
 		
 		setOutput(this.output);
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		getTaskConfig().setDriverStrategy(DriverStrategy.CO_GROUP);
 		
@@ -195,8 +195,8 @@ public class CoGroupTaskTest extends DriverTestBase<CoGroupFunction<Record, Reco
 		
 		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, true));
 		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, true));
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		getTaskConfig().setDriverStrategy(DriverStrategy.CO_GROUP);
@@ -225,8 +225,8 @@ public class CoGroupTaskTest extends DriverTestBase<CoGroupFunction<Record, Reco
 		
 		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, true));
 		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, true));
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		getTaskConfig().setDriverStrategy(DriverStrategy.CO_GROUP);
@@ -251,8 +251,8 @@ public class CoGroupTaskTest extends DriverTestBase<CoGroupFunction<Record, Reco
 		
 		setOutput(this.output);
 		
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		getTaskConfig().setDriverStrategy(DriverStrategy.CO_GROUP);
@@ -302,8 +302,8 @@ public class CoGroupTaskTest extends DriverTestBase<CoGroupFunction<Record, Reco
 		
 		setOutput(this.output);
 		
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		getTaskConfig().setDriverStrategy(DriverStrategy.CO_GROUP);
@@ -354,8 +354,8 @@ public class CoGroupTaskTest extends DriverTestBase<CoGroupFunction<Record, Reco
 		
 		setOutput(this.output);
 		
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		getTaskConfig().setDriverStrategy(DriverStrategy.CO_GROUP);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
index 934608a..decf358 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
@@ -59,7 +59,8 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun
 		final int valCnt = 8;
 		
 		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
-		addInputComparator(this.comparator);
+		addDriverComparator(this.comparator);
+		addDriverComparator(this.comparator);
 		setOutput(this.outList);
 		
 		getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
@@ -112,7 +113,8 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun
 		final int valCnt = 8;
 		
 		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
-		addInputComparator(this.comparator);
+		addDriverComparator(this.comparator);
+		addDriverComparator(this.comparator);
 		setOutput(this.outList);
 		
 		getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
index 0a11ff3..1f18917 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
@@ -61,7 +61,8 @@ public class CombineTaskTest extends DriverTestBase<RichGroupReduceFunction<Reco
 		int valCnt = 20;
 		
 		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
-		addInputComparator(this.comparator);
+		addDriverComparator(this.comparator);
+		addDriverComparator(this.comparator);
 		setOutput(this.outList);
 		
 		getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
@@ -97,7 +98,8 @@ public class CombineTaskTest extends DriverTestBase<RichGroupReduceFunction<Reco
 		int valCnt = 20;
 		
 		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
-		addInputComparator(this.comparator);
+		addDriverComparator(this.comparator);
+		addDriverComparator(this.comparator);
 		setOutput(new DiscardingOutputCollector<Record>());
 		
 		getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
@@ -121,7 +123,8 @@ public class CombineTaskTest extends DriverTestBase<RichGroupReduceFunction<Reco
 	public void testCancelCombineTaskSorting()
 	{
 		addInput(new DelayingInfinitiveInputIterator(100));
-		addInputComparator(this.comparator);
+		addDriverComparator(this.comparator);
+		addDriverComparator(this.comparator);
 		setOutput(new DiscardingOutputCollector<Record>());
 		
 		getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
index fd422ab..6e66e72 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
@@ -71,8 +71,8 @@ public class MatchTaskExternalITCase extends DriverTestBase<FlatJoinFunction<Rec
 		final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
 		
 		setOutput(this.output);
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
 		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
@@ -104,8 +104,8 @@ public class MatchTaskExternalITCase extends DriverTestBase<FlatJoinFunction<Rec
 		
 		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
 		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		setOutput(this.output);
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
@@ -135,8 +135,8 @@ public class MatchTaskExternalITCase extends DriverTestBase<FlatJoinFunction<Rec
 		
 		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
 		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		setOutput(this.output);
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);