You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2014/09/24 12:24:16 UTC
[2/3] [FLINK-1073] Enables sorted group input for GroupReduce
combiners. - GroupReduceCombineDriver uses separate comparators for sorting
and grouping. - Adding support for multiple comparators for a single input
driver required some refactoring.
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
index 43da7a3..a824f77 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
@@ -81,8 +81,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
final int valCnt2 = 2;
setOutput(this.outList);
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
@@ -115,8 +115,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
int valCnt2 = 1;
setOutput(this.outList);
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
@@ -151,8 +151,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
int valCnt2 = 20;
setOutput(this.outList);
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
@@ -187,8 +187,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
int valCnt2 = 1;
setOutput(this.outList);
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
@@ -223,8 +223,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
int valCnt2 = 20;
setOutput(this.outList);
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
@@ -259,8 +259,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
int valCnt2 = 20;
setOutput(this.outList);
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
@@ -295,8 +295,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
int valCnt2 = 20;
setOutput(this.outList);
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
@@ -330,8 +330,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
int valCnt2 = 20;
setOutput(this.outList);
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
@@ -366,8 +366,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
int valCnt2 = 20;
setOutput(new NirvanaOutputList());
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
@@ -395,8 +395,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
int valCnt = 20;
setOutput(new NirvanaOutputList());
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
@@ -446,8 +446,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
int valCnt = 20;
setOutput(new NirvanaOutputList());
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
@@ -497,8 +497,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
int valCnt = 20;
setOutput(new NirvanaOutputList());
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
@@ -547,8 +547,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
setOutput(this.outList);
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
@@ -578,8 +578,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
setOutput(this.outList);
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
@@ -609,8 +609,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
setOutput(this.outList);
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
@@ -640,8 +640,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
setOutput(this.outList);
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
@@ -671,8 +671,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
setOutput(this.outList);
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
@@ -702,8 +702,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
setOutput(new NirvanaOutputList());
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
@@ -732,8 +732,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
setOutput(new NirvanaOutputList());
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
@@ -760,8 +760,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
addInput(new DelayingInfinitiveInputIterator(100));
addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
@@ -807,8 +807,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
addInput(new DelayingInfinitiveInputIterator(100));
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
setOutput(new NirvanaOutputList());
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
@@ -851,8 +851,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
setOutput(new NirvanaOutputList());
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
@@ -895,8 +895,8 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
setOutput(new NirvanaOutputList());
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
index bf2a89e..eb26f5b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
@@ -60,7 +60,7 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc
setNumFileHandlesForSort(2);
- addInputComparator(this.comparator);
+ addDriverComparator(this.comparator);
setOutput(this.outList);
getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
@@ -92,7 +92,7 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc
setNumFileHandlesForSort(2);
- addInputComparator(this.comparator);
+ addDriverComparator(this.comparator);
setOutput(this.outList);
getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
@@ -123,7 +123,7 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc
final int keyCnt = 8192;
final int valCnt = 8;
- addInputComparator(this.comparator);
+ addDriverComparator(this.comparator);
setOutput(this.outList);
getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
@@ -169,7 +169,7 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc
int keyCnt = 32768;
int valCnt = 8;
- addInputComparator(this.comparator);
+ addDriverComparator(this.comparator);
setOutput(this.outList);
getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
index 2d56dab..c5a6762 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
@@ -61,7 +61,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
final int keyCnt = 100;
final int valCnt = 20;
- addInputComparator(this.comparator);
+ addDriverComparator(this.comparator);
setOutput(this.outList);
getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
@@ -91,7 +91,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
final int valCnt = 20;
addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
- addInputComparator(this.comparator);
+ addDriverComparator(this.comparator);
setOutput(this.outList);
getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
@@ -118,7 +118,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
final int keyCnt = 100;
final int valCnt = 20;
- addInputComparator(this.comparator);
+ addDriverComparator(this.comparator);
setOutput(this.outList);
getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
@@ -163,7 +163,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
final int valCnt = 20;
addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
- addInputComparator(this.comparator);
+ addDriverComparator(this.comparator);
setOutput(this.outList);
getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
@@ -185,7 +185,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
@Test
public void testCancelReduceTaskWhileSorting()
{
- addInputComparator(this.comparator);
+ addDriverComparator(this.comparator);
setOutput(new NirvanaOutputList());
getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
@@ -233,7 +233,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
final int valCnt = 2;
addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
- addInputComparator(this.comparator);
+ addDriverComparator(this.comparator);
setOutput(new NirvanaOutputList());
getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
index 6d4d03d..4c08ebd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
@@ -85,6 +85,7 @@ public class ChainTaskTest extends TaskTestBase {
// driver
combineConfig.setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
combineConfig.setDriverComparator(compFact, 0);
+ combineConfig.setDriverComparator(compFact, 1);
combineConfig.setRelativeMemoryDriver(memoryFraction);
// udf
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
index 4d3efe3..f0f51a6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
@@ -171,7 +171,7 @@ public class TestTaskContext<S, T> implements PactTaskContext<S, T> {
@Override
@SuppressWarnings("unchecked")
- public <X> TypeComparator<X> getInputComparator(int index) {
+ public <X> TypeComparator<X> getDriverComparator(int index) {
switch (index) {
case 0:
return (TypeComparator<X>) this.comparator1;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
index 7a7760f..4d04bf4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
@@ -118,7 +118,7 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
this.inputs.add(null);
}
- public void addInputComparator(RecordComparator comparator) {
+ public void addDriverComparator(RecordComparator comparator) {
this.comparators.add(comparator);
}
@@ -283,7 +283,7 @@ public class DriverTestBase<S extends Function> implements PactTaskContext<S, Re
}
@Override
- public <X> TypeComparator<X> getInputComparator(int index) {
+ public <X> TypeComparator<X> getDriverComparator(int index) {
@SuppressWarnings("unchecked")
TypeComparator<X> comparator = (TypeComparator<X>) this.comparators.get(index);
return comparator;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/KMeansSingleStepTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/KMeansSingleStepTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/KMeansSingleStepTest.java
index d3a8701..422a8ff 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/KMeansSingleStepTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/KMeansSingleStepTest.java
@@ -111,15 +111,16 @@ public class KMeansSingleStepTest extends CompilerTestBase {
assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combiner.getDriverStrategy());
assertNull(combiner.getInput().getLocalStrategyKeys());
assertNull(combiner.getInput().getLocalStrategySortOrder());
- assertEquals(set0, combiner.getKeys());
+ assertEquals(set0, combiner.getKeys(0));
+ assertEquals(set0, combiner.getKeys(1));
// check the reducer
assertEquals(ShipStrategyType.PARTITION_HASH, reducer.getInput().getShipStrategy());
assertEquals(LocalStrategy.COMBININGSORT, reducer.getInput().getLocalStrategy());
assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reducer.getDriverStrategy());
- assertEquals(set0, reducer.getKeys());
+ assertEquals(set0, reducer.getKeys(0));
assertEquals(set0, reducer.getInput().getLocalStrategyKeys());
- assertTrue(Arrays.equals(reducer.getInput().getLocalStrategySortOrder(), reducer.getSortOrders()));
+ assertTrue(Arrays.equals(reducer.getInput().getLocalStrategySortOrder(), reducer.getSortOrders(0)));
// check the sink
assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/RelationalQueryCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/RelationalQueryCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/RelationalQueryCompilerTest.java
index 05ec8c2..c429655 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/RelationalQueryCompilerTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/RelationalQueryCompilerTest.java
@@ -290,8 +290,8 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
// local strategy keys
Assert.assertEquals(set01, reducer.getInput().getLocalStrategyKeys());
- Assert.assertEquals(set01, reducer.getKeys());
- Assert.assertTrue(Arrays.equals(reducer.getInput().getLocalStrategySortOrder(), reducer.getSortOrders()));
+ Assert.assertEquals(set01, reducer.getKeys(0));
+ Assert.assertTrue(Arrays.equals(reducer.getInput().getLocalStrategySortOrder(), reducer.getSortOrders(0)));
return true;
} else {
return false;
@@ -314,8 +314,8 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
Assert.assertEquals(set0, join.getInput2().getLocalStrategyKeys());
Assert.assertTrue(Arrays.equals(join.getInput1().getLocalStrategySortOrder(), join.getInput2().getLocalStrategySortOrder()));
Assert.assertEquals(set01, reducer.getInput().getLocalStrategyKeys());
- Assert.assertEquals(set01, reducer.getKeys());
- Assert.assertTrue(Arrays.equals(reducer.getInput().getLocalStrategySortOrder(), reducer.getSortOrders()));
+ Assert.assertEquals(set01, reducer.getKeys(0));
+ Assert.assertTrue(Arrays.equals(reducer.getInput().getLocalStrategySortOrder(), reducer.getSortOrders(0)));
return true;
} else {
return false;
@@ -337,8 +337,8 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
Assert.assertEquals(set01, join.getInput1().getLocalStrategyKeys());
Assert.assertEquals(set0, join.getInput2().getLocalStrategyKeys());
Assert.assertTrue(join.getInput1().getLocalStrategySortOrder()[0] == join.getInput2().getLocalStrategySortOrder()[0]);
- Assert.assertEquals(set01, reducer.getKeys());
- Assert.assertTrue(Arrays.equals(join.getInput1().getLocalStrategySortOrder(), reducer.getSortOrders()));
+ Assert.assertEquals(set01, reducer.getKeys(0));
+ Assert.assertTrue(Arrays.equals(join.getInput1().getLocalStrategySortOrder(), reducer.getSortOrders(0)));
return true;
} else {
return false;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/WordCountCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/WordCountCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/WordCountCompilerTest.java
index f0c7f7d..b345a45 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/WordCountCompilerTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/compiler/examples/WordCountCompilerTest.java
@@ -91,12 +91,12 @@ public class WordCountCompilerTest extends CompilerTestBase {
FieldList l = new FieldList(0);
Assert.assertEquals(l, c.getShipStrategyKeys());
Assert.assertEquals(l, c.getLocalStrategyKeys());
- Assert.assertTrue(Arrays.equals(c.getLocalStrategySortOrder(), reducer.getSortOrders()));
+ Assert.assertTrue(Arrays.equals(c.getLocalStrategySortOrder(), reducer.getSortOrders(0)));
// check the combiner
SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getPredecessor();
Assert.assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combiner.getDriverStrategy());
- Assert.assertEquals(l, combiner.getKeys());
+ Assert.assertEquals(l, combiner.getKeys(0));
Assert.assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
} catch (Exception e) {
@@ -169,7 +169,8 @@ public class WordCountCompilerTest extends CompilerTestBase {
// check the combiner
SingleInputPlanNode combiner = (SingleInputPlanNode) reducer.getPredecessor();
Assert.assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combiner.getDriverStrategy());
- Assert.assertEquals(l, combiner.getKeys());
+ Assert.assertEquals(l, combiner.getKeys(0));
+ Assert.assertEquals(l, combiner.getKeys(1));
Assert.assertEquals(ShipStrategyType.FORWARD, combiner.getInput().getShipStrategy());
} catch (Exception e) {
e.printStackTrace();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/IterativeKMeansTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/IterativeKMeansTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/IterativeKMeansTest.java
index 981b00e..d7d429a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/IterativeKMeansTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/IterativeKMeansTest.java
@@ -143,15 +143,16 @@ public class IterativeKMeansTest extends CompilerTestBase {
assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combiner.getDriverStrategy());
assertNull(combiner.getInput().getLocalStrategyKeys());
assertNull(combiner.getInput().getLocalStrategySortOrder());
- assertEquals(set0, combiner.getKeys());
+ assertEquals(set0, combiner.getKeys(0));
+ assertEquals(set0, combiner.getKeys(1));
// check the reducer
assertEquals(ShipStrategyType.PARTITION_HASH, reducer.getInput().getShipStrategy());
assertTrue(reducer.getInput().isOnDynamicPath());
assertEquals(LocalStrategy.COMBININGSORT, reducer.getInput().getLocalStrategy());
assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reducer.getDriverStrategy());
- assertEquals(set0, reducer.getKeys());
+ assertEquals(set0, reducer.getKeys(0));
assertEquals(set0, reducer.getInput().getLocalStrategyKeys());
- assertTrue(Arrays.equals(reducer.getInput().getLocalStrategySortOrder(), reducer.getSortOrders()));
+ assertTrue(Arrays.equals(reducer.getInput().getLocalStrategySortOrder(), reducer.getSortOrders(0)));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
index 8255f81..8bede88 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/CustomCompensatableDanglingPageRankWithCombiner.java
@@ -231,6 +231,7 @@ public class CustomCompensatableDanglingPageRankWithCombiner {
combinerConfig.setInputSerializer(vertexWithRankSerializer, 0);
combinerConfig.setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
combinerConfig.setDriverComparator(vertexWithRankComparator, 0);
+ combinerConfig.setDriverComparator(vertexWithRankComparator, 1);
combinerConfig.setRelativeMemoryDriver((double)coGroupSortMemory/totalMemoryConsumption);
combinerConfig.setOutputSerializer(vertexWithRankSerializer);
combinerConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);