You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2014/09/24 12:24:16 UTC

[2/3] [FLINK-1073] Enables sorted group input for GroupReduce combiners. - GroupReduceCombineDriver uses separate comparators for sorting and grouping. - Adding support for multiple comparators for a single-input driver required some refactoring.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
index 43da7a3..a824f77 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
@@ -81,8 +81,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 		final int valCnt2 = 2;
 		
 		setOutput(this.outList);
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
 		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
@@ -115,8 +115,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 		int valCnt2 = 1;
 		
 		setOutput(this.outList);
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
 		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
@@ -151,8 +151,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 		int valCnt2 = 20;
 		
 		setOutput(this.outList);
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
 		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
@@ -187,8 +187,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 		int valCnt2 = 1;
 		
 		setOutput(this.outList);
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
 		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
@@ -223,8 +223,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 		int valCnt2 = 20;
 		
 		setOutput(this.outList);
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
 		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
@@ -259,8 +259,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 		int valCnt2 = 20;
 		
 		setOutput(this.outList);
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
 		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
@@ -295,8 +295,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 		int valCnt2 = 20;
 		
 		setOutput(this.outList);
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
 		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
@@ -330,8 +330,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 		int valCnt2 = 20;
 		
 		setOutput(this.outList);
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
 		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
@@ -366,8 +366,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 		int valCnt2 = 20;
 		
 		setOutput(new NirvanaOutputList());
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
 		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
@@ -395,8 +395,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 		int valCnt = 20;
 		
 		setOutput(new NirvanaOutputList());
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
 		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
@@ -446,8 +446,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 		int valCnt = 20;
 		
 		setOutput(new NirvanaOutputList());
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
 		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
@@ -497,8 +497,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 		int valCnt = 20;
 		
 		setOutput(new NirvanaOutputList());
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
 		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
@@ -547,8 +547,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 				
 		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
 		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		setOutput(this.outList);
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
@@ -578,8 +578,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 		
 		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
 		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		setOutput(this.outList);
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
@@ -609,8 +609,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 		
 		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
 		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		setOutput(this.outList);
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
@@ -640,8 +640,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 		
 		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
 		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		setOutput(this.outList);
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
@@ -671,8 +671,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 		
 		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
 		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		setOutput(this.outList);
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
@@ -702,8 +702,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 		
 		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
 		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		setOutput(new NirvanaOutputList());
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
@@ -732,8 +732,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 		
 		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
 		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		setOutput(new NirvanaOutputList());
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
@@ -760,8 +760,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 		addInput(new DelayingInfinitiveInputIterator(100));
 		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
 		
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		
@@ -807,8 +807,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 		
 		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
 		addInput(new DelayingInfinitiveInputIterator(100));
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		setOutput(new NirvanaOutputList());
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
@@ -851,8 +851,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 		
 		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
 		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		setOutput(new NirvanaOutputList());
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
@@ -895,8 +895,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 		
 		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
 		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
-		addInputComparator(this.comparator1);
-		addInputComparator(this.comparator2);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
 		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
 		setOutput(new NirvanaOutputList());
 		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
index bf2a89e..eb26f5b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
@@ -60,7 +60,7 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc
 		
 		setNumFileHandlesForSort(2);
 		
-		addInputComparator(this.comparator);
+		addDriverComparator(this.comparator);
 		setOutput(this.outList);
 		getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
 		
@@ -92,7 +92,7 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc
 
 		setNumFileHandlesForSort(2);
 		
-		addInputComparator(this.comparator);
+		addDriverComparator(this.comparator);
 		setOutput(this.outList);
 		getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
 		
@@ -123,7 +123,7 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc
 		final int keyCnt = 8192;
 		final int valCnt = 8;
 		
-		addInputComparator(this.comparator);
+		addDriverComparator(this.comparator);
 		setOutput(this.outList);
 		getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
 		
@@ -169,7 +169,7 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc
 		int keyCnt = 32768;
 		int valCnt = 8;
 		
-		addInputComparator(this.comparator);
+		addDriverComparator(this.comparator);
 		setOutput(this.outList);
 		getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
index 2d56dab..c5a6762 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
@@ -61,7 +61,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
 		final int keyCnt = 100;
 		final int valCnt = 20;
 		
-		addInputComparator(this.comparator);
+		addDriverComparator(this.comparator);
 		setOutput(this.outList);
 		getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
 		
@@ -91,7 +91,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
 		final int valCnt = 20;
 		
 		addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
-		addInputComparator(this.comparator);
+		addDriverComparator(this.comparator);
 		setOutput(this.outList);
 		getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
 		
@@ -118,7 +118,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
 		final int keyCnt = 100;
 		final int valCnt = 20;
 		
-		addInputComparator(this.comparator);
+		addDriverComparator(this.comparator);
 		setOutput(this.outList);
 		getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
 		
@@ -163,7 +163,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
 		final int valCnt = 20;
 		
 		addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
-		addInputComparator(this.comparator);
+		addDriverComparator(this.comparator);
 		setOutput(this.outList);
 		getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
 		
@@ -185,7 +185,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
 	@Test
 	public void testCancelReduceTaskWhileSorting()
 	{
-		addInputComparator(this.comparator);
+		addDriverComparator(this.comparator);
 		setOutput(new NirvanaOutputList());
 		getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
 		
@@ -233,7 +233,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
 		final int valCnt = 2;
 		
 		addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
-		addInputComparator(this.comparator);
+		addDriverComparator(this.comparator);
 		setOutput(new NirvanaOutputList());
 		getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
index 6d4d03d..4c08ebd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
@@ -85,6 +85,7 @@ public class ChainTaskTest extends TaskTestBase {
 				// driver
 				combineConfig.setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
 				combineConfig.setDriverComparator(compFact, 0);
+				combineConfig.setDriverComparator(compFact, 1);
 				combineConfig.setRelativeMemoryDriver(memoryFraction);
 				
 				// udf

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
index 4d3efe3..f0f51a6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
@@ -171,7 +171,7 @@ public class TestTaskContext<S, T> implements PactTaskContext<S, T> {
 
 	@Override
 	@SuppressWarnings("unchecked")
-	public <X> TypeComparator<X> getInputComparator(int index) {
+	public <X> TypeComparator<X> getDriverComparator(int index) {
 		switch (index) {
 		case 0:
 			return (TypeComparator<X>) this.comparator1;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
index 7a7760f..4d04bf4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
@@ -118,7 +118,7 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
 		this.inputs.add(null);
 	}
 	
-	public void addInputComparator(RecordComparator comparator) {
+	public void addDriverComparator(RecordComparator comparator) {
 		this.comparators.add(comparator);
 	}
 
@@ -283,7 +283,7 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
 	}
 
 	@Override
-	public <X> TypeComparator<X> getInputComparator(int index) {
+	public <X> TypeComparator<X> getDriverComparator(int index) {
 		@SuppressWarnings("unchecked")
 		TypeComparator<X> comparator = (TypeComparator<X>) this.comparators.get(index);
 		return comparator;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/KMeansSingleStepTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/KMeansSingleStepTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/KMeansSingleStepTest.java
index d3a8701..422a8ff 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/KMeansSingleStepTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/KMeansSingleStepTest.java
@@ -111,15 +111,16 @@ public class KMeansSingleStepTest extends CompilerTestBase {
 		assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combiner.getDriverStrategy());
 		assertNull(combiner.getInput().getLocalStrategyKeys());
 		assertNull(combiner.getInput().getLocalStrategySortOrder());
-		assertEquals(set0, combiner.getKeys());
+		assertEquals(set0, combiner.getKeys(0));
+		assertEquals(set0, combiner.getKeys(1));
 		
 		// check the reducer
 		assertEquals(ShipStrategyType.PARTITION_HASH, reducer.getInput().getShipStrategy());
 		assertEquals(LocalStrategy.COMBININGSORT, reducer.getInput().getLocalStrategy());
 		assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reducer.getDriverStrategy());
-		assertEquals(set0, reducer.getKeys());
+		assertEquals(set0, reducer.getKeys(0));
 		assertEquals(set0, reducer.getInput().getLocalStrategyKeys());
-		assertTrue(Arrays.equals(reducer.getInput().getLocalStrategySortOrder(), reducer.getSortOrders()));
+		assertTrue(Arrays.equals(reducer.getInput().getLocalStrategySortOrder(), reducer.getSortOrders(0)));
 		
 		// check the sink
 		assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/RelationalQueryCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/RelationalQueryCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/RelationalQueryCompilerTest.java
index 05ec8c2..c429655 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/RelationalQueryCompilerTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/RelationalQueryCompilerTest.java
@@ -290,8 +290,8 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
 			
 			// local strategy keys
 			Assert.assertEquals(set01, reducer.getInput().getLocalStrategyKeys());
-			Assert.assertEquals(set01, reducer.getKeys());
-			Assert.assertTrue(Arrays.equals(reducer.getInput().getLocalStrategySortOrder(), reducer.getSortOrders()));
+			Assert.assertEquals(set01, reducer.getKeys(0));
+			Assert.assertTrue(Arrays.equals(reducer.getInput().getLocalStrategySortOrder(), reducer.getSortOrders(0)));
 			return true;
 		} else {
 			return false;
@@ -314,8 +314,8 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
 			Assert.assertEquals(set0, join.getInput2().getLocalStrategyKeys());
 			Assert.assertTrue(Arrays.equals(join.getInput1().getLocalStrategySortOrder(), join.getInput2().getLocalStrategySortOrder()));
 			Assert.assertEquals(set01, reducer.getInput().getLocalStrategyKeys());
-			Assert.assertEquals(set01, reducer.getKeys());
-			Assert.assertTrue(Arrays.equals(reducer.getInput().getLocalStrategySortOrder(), reducer.getSortOrders()));
+			Assert.assertEquals(set01, reducer.getKeys(0));
+			Assert.assertTrue(Arrays.equals(reducer.getInput().getLocalStrategySortOrder(), reducer.getSortOrders(0)));
 			return true;
 		} else {
 			return false;
@@ -337,8 +337,8 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
 			Assert.assertEquals(set01, join.getInput1().getLocalStrategyKeys());
 			Assert.assertEquals(set0, join.getInput2().getLocalStrategyKeys());
 			Assert.assertTrue(join.getInput1().getLocalStrategySortOrder()[0] == join.getInput2().getLocalStrategySortOrder()[0]);
-			Assert.assertEquals(set01, reducer.getKeys());
-			Assert.assertTrue(Arrays.equals(join.getInput1().getLocalStrategySortOrder(), reducer.getSortOrders()));
+			Assert.assertEquals(set01, reducer.getKeys(0));
+			Assert.assertTrue(Arrays.equals(join.getInput1().getLocalStrategySortOrder(), reducer.getSortOrders(0)));
 			return true;
 		} else {
 			return false;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/WordCountCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/WordCountCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/WordCountCompilerTest.java
index f0c7f7d..b345a45 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/WordCountCompilerTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/WordCountCompilerTest.java
@@ -91,12 +91,12 @@ public class WordCountCompilerTest extends CompilerTestBase {
 			FieldList l = new FieldList(0);
 			Assert.assertEquals(l, c.getShipStrategyKeys());
 			Assert.assertEquals(l, c.getLocalStrategyKeys());
-			Assert.assertTrue(Arrays.equals(c.getLocalStrategySortOrder(), reducer.getSortOrders()));
+			Assert.assertTrue(Arrays.equals(c.getLocalStrategySortOrder(), reducer.getSortOrders(0)));
 			
 			// check the combiner
 			SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getPredecessor();
 			Assert.assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combiner.getDriverStrategy());
-			Assert.assertEquals(l, combiner.getKeys());
+			Assert.assertEquals(l, combiner.getKeys(0));
 			Assert.assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
 			
 		} catch (Exception e) {
@@ -169,7 +169,8 @@ public class WordCountCompilerTest extends CompilerTestBase {
 			// check the combiner
 			SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getPredecessor();
 			Assert.assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combiner.getDriverStrategy());
-			Assert.assertEquals(l, combiner.getKeys());
+			Assert.assertEquals(l, combiner.getKeys(0));
+			Assert.assertEquals(l, combiner.getKeys(1));
 			Assert.assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
 		} catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/IterativeKMeansTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/IterativeKMeansTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/IterativeKMeansTest.java
index 981b00e..d7d429a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/IterativeKMeansTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/IterativeKMeansTest.java
@@ -143,15 +143,16 @@ public class IterativeKMeansTest extends CompilerTestBase {
 		assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combiner.getDriverStrategy());
 		assertNull(combiner.getInput().getLocalStrategyKeys());
 		assertNull(combiner.getInput().getLocalStrategySortOrder());
-		assertEquals(set0, combiner.getKeys());
+		assertEquals(set0, combiner.getKeys(0));
+		assertEquals(set0, combiner.getKeys(1));
 		
 		// check the reducer
 		assertEquals(ShipStrategyType.PARTITION_HASH, reducer.getInput().getShipStrategy());
 		assertTrue(reducer.getInput().isOnDynamicPath());
 		assertEquals(LocalStrategy.COMBININGSORT, reducer.getInput().getLocalStrategy());
 		assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reducer.getDriverStrategy());
-		assertEquals(set0, reducer.getKeys());
+		assertEquals(set0, reducer.getKeys(0));
 		assertEquals(set0, reducer.getInput().getLocalStrategyKeys());
-		assertTrue(Arrays.equals(reducer.getInput().getLocalStrategySortOrder(), reducer.getSortOrders()));
+		assertTrue(Arrays.equals(reducer.getInput().getLocalStrategySortOrder(), reducer.getSortOrders(0)));
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
index 8255f81..8bede88 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
@@ -231,6 +231,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
 		combinerConfig.setInputSerializer(vertexWithRankSerializer, 0);
 		combinerConfig.setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
 		combinerConfig.setDriverComparator(vertexWithRankComparator, 0);
+		combinerConfig.setDriverComparator(vertexWithRankComparator, 1);
 		combinerConfig.setRelativeMemoryDriver((double)coGroupSortMemory/totalMemoryConsumption);
 		combinerConfig.setOutputSerializer(vertexWithRankSerializer);
 		combinerConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);