You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2014/09/24 12:24:17 UTC
[3/3] git commit: [FLINK-1073] Enables sorted group input for
GroupReduce combiners. - GroupReduceCombineDriver uses separate comparators
for sorting and grouping. - Adding support for multiple comparators for a
single input driver required some refactoring.
[FLINK-1073] Enables sorted group input for GroupReduce combiners.
- GroupReduceCombineDriver uses separate comparators for sorting and grouping.
- Adding support for multiple comparators for a single input driver required some refactoring.
This closes #109
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e5731e0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e5731e0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e5731e0e
Branch: refs/heads/master
Commit: e5731e0ed7f7294e6f73970fed8315cb572c9cfd
Parents: 4b6d2c5
Author: Fabian Hueske <fh...@apache.org>
Authored: Tue Sep 2 15:51:44 2014 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed Sep 24 12:22:47 2014 +0200
----------------------------------------------------------------------
.../GroupReduceWithCombineProperties.java | 6 +-
.../operators/PartialGroupProperties.java | 10 ++-
.../compiler/plan/SingleInputPlanNode.java | 82 +++++++++++++----
.../plantranslate/NepheleJobGraphGenerator.java | 5 +-
.../postpass/GenericFlatTypePostPass.java | 6 +-
.../compiler/postpass/JavaApiPostPass.java | 9 +-
.../apache/flink/compiler/GroupOrderTest.java | 2 +-
.../compiler/GroupReduceCompilationTest.java | 14 +--
.../flink/compiler/ReduceCompilationTest.java | 8 +-
.../WorksetIterationsJavaApiCompilerTest.java | 6 +-
.../WorksetIterationsRecordApiCompilerTest.java | 6 +-
.../AbstractCachedBuildSideMatchDriver.java | 4 +-
.../runtime/operators/AllGroupReduceDriver.java | 4 +-
.../runtime/operators/AllReduceDriver.java | 4 +-
.../flink/runtime/operators/CoGroupDriver.java | 8 +-
.../CoGroupWithSolutionSetFirstDriver.java | 6 +-
.../CoGroupWithSolutionSetSecondDriver.java | 6 +-
.../runtime/operators/CollectorMapDriver.java | 4 +-
.../flink/runtime/operators/CrossDriver.java | 4 +-
.../flink/runtime/operators/DriverStrategy.java | 64 +++++++-------
.../flink/runtime/operators/FlatMapDriver.java | 4 +-
.../operators/GroupReduceCombineDriver.java | 29 +++---
.../runtime/operators/GroupReduceDriver.java | 6 +-
.../JoinWithSolutionSetFirstDriver.java | 4 +-
.../JoinWithSolutionSetSecondDriver.java | 4 +-
.../flink/runtime/operators/MapDriver.java | 4 +-
.../runtime/operators/MapPartitionDriver.java | 4 +-
.../flink/runtime/operators/MatchDriver.java | 8 +-
.../flink/runtime/operators/NoOpDriver.java | 4 +-
.../flink/runtime/operators/PactDriver.java | 14 +--
.../runtime/operators/PactTaskContext.java | 2 +-
.../runtime/operators/ReduceCombineDriver.java | 6 +-
.../flink/runtime/operators/ReduceDriver.java | 6 +-
.../runtime/operators/RegularPactTask.java | 23 +++--
.../SynchronousChainedCombineDriver.java | 18 ++--
.../runtime/operators/CachedMatchTaskTest.java | 44 +++++-----
.../operators/CoGroupTaskExternalITCase.java | 4 +-
.../runtime/operators/CoGroupTaskTest.java | 36 ++++----
.../operators/CombineTaskExternalITCase.java | 6 +-
.../runtime/operators/CombineTaskTest.java | 9 +-
.../operators/MatchTaskExternalITCase.java | 12 +--
.../flink/runtime/operators/MatchTaskTest.java | 92 ++++++++++----------
.../operators/ReduceTaskExternalITCase.java | 8 +-
.../flink/runtime/operators/ReduceTaskTest.java | 12 +--
.../operators/chaining/ChainTaskTest.java | 1 +
.../operators/drivers/TestTaskContext.java | 2 +-
.../operators/testutils/DriverTestBase.java | 4 +-
.../compiler/examples/KMeansSingleStepTest.java | 7 +-
.../examples/RelationalQueryCompilerTest.java | 12 +--
.../examples/WordCountCompilerTest.java | 7 +-
.../iterations/IterativeKMeansTest.java | 7 +-
...mpensatableDanglingPageRankWithCombiner.java | 1 +
52 files changed, 372 insertions(+), 286 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceWithCombineProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceWithCombineProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceWithCombineProperties.java
index f8cd87a..92b2297 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceWithCombineProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/GroupReduceWithCombineProperties.java
@@ -93,9 +93,13 @@ public final class GroupReduceWithCombineProperties extends OperatorDescriptorSi
combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism());
SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getPactContract()
- .getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE, this.keyList);
+ .getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE);
combiner.setCosts(new Costs(0, 0));
combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties());
+ // set sorting comparator key info
+ combiner.setDriverKeyInfo(in.getLocalStrategyKeys(), in.getLocalStrategySortOrder(), 0);
+ // set grouping comparator key info
+ combiner.setDriverKeyInfo(this.keyList, 1);
Channel toReducer = new Channel(combiner);
toReducer.setShipStrategy(in.getShipStrategy(), in.getShipStrategyKeys(), in.getShipStrategySortOrder());
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-compiler/src/main/java/org/apache/flink/compiler/operators/PartialGroupProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/PartialGroupProperties.java b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/PartialGroupProperties.java
index 60ec090..cf33bbe 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/operators/PartialGroupProperties.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/operators/PartialGroupProperties.java
@@ -51,8 +51,14 @@ public final class PartialGroupProperties extends OperatorDescriptorSingle {
GroupReduceNode combinerNode = new GroupReduceNode((GroupReduceOperatorBase<?, ?, ?>) node.getPactContract());
combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism());
- return new SingleInputPlanNode(combinerNode, "Combine("+node.getPactContract().getName()+")", in,
- DriverStrategy.SORTED_GROUP_COMBINE, this.keyList);
+ SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getPactContract().getName()+")", in,
+ DriverStrategy.SORTED_GROUP_COMBINE);
+ // sorting key info
+ combiner.setDriverKeyInfo(in.getLocalStrategyKeys(), in.getLocalStrategySortOrder(), 0);
+ // set grouping comparator key info
+ combiner.setDriverKeyInfo(this.keyList, 1);
+
+ return combiner;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-compiler/src/main/java/org/apache/flink/compiler/plan/SingleInputPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/SingleInputPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/SingleInputPlanNode.java
index 485687e..9b9202b 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/SingleInputPlanNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/SingleInputPlanNode.java
@@ -29,6 +29,7 @@ import java.util.List;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
+import org.apache.flink.compiler.CompilerException;
import org.apache.flink.compiler.dag.OptimizerNode;
import org.apache.flink.compiler.dag.SingleInputNode;
import org.apache.flink.runtime.operators.DamBehavior;
@@ -43,11 +44,11 @@ public class SingleInputPlanNode extends PlanNode {
protected final Channel input;
- protected final FieldList keys;
+ protected final FieldList[] driverKeys;
- protected final boolean[] sortOrders;
+ protected final boolean[][] driverSortOrders;
- private TypeComparatorFactory<?> comparator;
+ private TypeComparatorFactory<?>[] comparators;
public Object postPassHelper;
@@ -68,8 +69,15 @@ public class SingleInputPlanNode extends PlanNode {
{
super(template, nodeName, driverStrategy);
this.input = input;
- this.keys = driverKeyFields;
- this.sortOrders = driverSortOrders;
+
+ this.comparators = new TypeComparatorFactory<?>[driverStrategy.getNumRequiredComparators()];
+ this.driverKeys = new FieldList[driverStrategy.getNumRequiredComparators()];
+ this.driverSortOrders = new boolean[driverStrategy.getNumRequiredComparators()][];
+
+ if(driverStrategy.getNumRequiredComparators() > 0) {
+ this.driverKeys[0] = driverKeyFields;
+ this.driverSortOrders[0] = driverSortOrders;
+ }
if (this.input.getShipStrategy() == ShipStrategyType.BROADCAST) {
this.input.setReplicationFactor(getDegreeOfParallelism());
@@ -81,6 +89,7 @@ public class SingleInputPlanNode extends PlanNode {
} else if (predNode.branchPlan != null) {
this.branchPlan.putAll(predNode.branchPlan);
}
+
}
// --------------------------------------------------------------------------------------------
@@ -111,30 +120,71 @@ public class SingleInputPlanNode extends PlanNode {
return this.input.getSource();
}
- public FieldList getKeys() {
- return this.keys;
+ /**
+ * Sets the key field indexes for the specified driver comparator.
+ *
+ * @param keys The key field indexes for the specified driver comparator.
+ * @param id The ID of the driver comparator.
+ */
+ public void setDriverKeyInfo(FieldList keys, int id) {
+ this.setDriverKeyInfo(keys, getTrueArray(keys.size()), id);
+ }
+
+ /**
+ * Sets the key field information for the specified driver comparator.
+ *
+ * @param keys The key field indexes for the specified driver comparator.
+ * @param sortOrder The key sort order for the specified driver comparator.
+ * @param id The ID of the driver comparator.
+ */
+ public void setDriverKeyInfo(FieldList keys, boolean[] sortOrder, int id) {
+ if(id < 0 || id >= driverKeys.length) {
+ throw new CompilerException("Invalid id for driver key information. DriverStrategy requires only "
+ +super.getDriverStrategy().getNumRequiredComparators()+" comparators.");
+ }
+ this.driverKeys[id] = keys;
+ this.driverSortOrders[id] = sortOrder;
+ }
+
+ /**
+ * Gets the key field indexes for the specified driver comparator.
+ *
+ * @param id The id of the driver comparator for which the key field indexes are requested.
+ * @return The key field indexes of the specified driver comparator.
+ */
+ public FieldList getKeys(int id) {
+ return this.driverKeys[id];
}
- public boolean[] getSortOrders() {
- return sortOrders;
+ /**
+ * Gets the sort order for the specified driver comparator.
+ *
+ * @param id The id of the driver comparator for which the sort order is requested.
+ * @return The sort order of the specified driver comparator.
+ */
+ public boolean[] getSortOrders(int id) {
+ return driverSortOrders[id];
}
/**
- * Gets the comparator from this PlanNode.
+ * Gets the specified comparator from this PlanNode.
+ *
+ * @param id The ID of the requested comparator.
*
- * @return The comparator.
+ * @return The specified comparator.
*/
- public TypeComparatorFactory<?> getComparator() {
- return comparator;
+ public TypeComparatorFactory<?> getComparator(int id) {
+ return comparators[id];
}
/**
- * Sets the comparator for this PlanNode.
+ * Sets the specified comparator for this PlanNode.
*
* @param comparator The comparator to set.
+ * @param id The ID of the comparator to set.
*/
- public void setComparator(TypeComparatorFactory<?> comparator) {
- this.comparator = comparator;
+ public void setComparator(TypeComparatorFactory<?> comparator, int id) {
+ this.comparators[id] = comparator;
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
index 78ec671..da7e0a2 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plantranslate/NepheleJobGraphGenerator.java
@@ -770,10 +770,9 @@ public class NepheleJobGraphGenerator implements Visitor<PlanNode> {
// set the driver strategy
config.setDriverStrategy(ds);
- if (node.getComparator() != null) {
- config.setDriverComparator(node.getComparator(), 0);
+ for(int i=0;i<ds.getNumRequiredComparators();i++) {
+ config.setDriverComparator(node.getComparator(i), i);
}
-
// assign memory, file-handles, etc.
assignDriverResources(node, config);
return vertex;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/GenericFlatTypePostPass.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/GenericFlatTypePostPass.java b/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/GenericFlatTypePostPass.java
index 2697621..717a0c2 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/GenericFlatTypePostPass.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/GenericFlatTypePostPass.java
@@ -302,9 +302,9 @@ public abstract class GenericFlatTypePostPass<X, T extends AbstractSchema<X>> im
if (createUtilities) {
// parameterize the node's driver strategy
- if (sn.getDriverStrategy().requiresComparator()) {
+ for(int i=0;i<sn.getDriverStrategy().getNumRequiredComparators();i++) {
try {
- sn.setComparator(createComparator(sn.getKeys(), sn.getSortOrders(), schema));
+ sn.setComparator(createComparator(sn.getKeys(i), sn.getSortOrders(i), schema),i);
} catch (MissingFieldTypeInfoException e) {
throw new CompilerPostPassException("Could not set up runtime strategy for node '" +
optNode.getPactContract().getName() + "'. Missing type information for key field " +
@@ -371,7 +371,7 @@ public abstract class GenericFlatTypePostPass<X, T extends AbstractSchema<X>> im
// parameterize the node's driver strategy
if (createUtilities) {
- if (dn.getDriverStrategy().requiresComparator()) {
+ if (dn.getDriverStrategy().getNumRequiredComparators() > 0) {
// set the individual comparators
try {
dn.setComparator1(createComparator(dn.getKeysForInput1(), dn.getSortOrders(), schema1));
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java b/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java
index b1c088d..ea3cc87 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java
@@ -161,11 +161,10 @@ public class JavaApiPostPass implements OptimizerPostPass {
SingleInputOperator<?, ?, ?> singleInputOperator = (SingleInputOperator<?, ?, ?>) sn.getOptimizerNode().getPactContract();
// parameterize the node's driver strategy
- if (sn.getDriverStrategy().requiresComparator()) {
- sn.setComparator(createComparator(singleInputOperator.getOperatorInfo().getInputType(), sn.getKeys(),
- getSortOrders(sn.getKeys(), sn.getSortOrders())));
+ for(int i=0;i<sn.getDriverStrategy().getNumRequiredComparators();i++) {
+ sn.setComparator(createComparator(singleInputOperator.getOperatorInfo().getInputType(), sn.getKeys(i),
+ getSortOrders(sn.getKeys(i), sn.getSortOrders(i))), i);
}
-
// done, we can now propagate our info down
traverseChannel(sn.getInput());
@@ -184,7 +183,7 @@ public class JavaApiPostPass implements OptimizerPostPass {
DualInputOperator<?, ?, ?, ?> dualInputOperator = (DualInputOperator<?, ?, ?, ?>) dn.getOptimizerNode().getPactContract();
// parameterize the node's driver strategy
- if (dn.getDriverStrategy().requiresComparator()) {
+ if (dn.getDriverStrategy().getNumRequiredComparators() > 0) {
dn.setComparator1(createComparator(dualInputOperator.getOperatorInfo().getFirstInputType(), dn.getKeysForInput1(),
getSortOrders(dn.getKeysForInput1(), dn.getSortOrders())));
dn.setComparator2(createComparator(dualInputOperator.getOperatorInfo().getSecondInputType(), dn.getKeysForInput2(),
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-compiler/src/test/java/org/apache/flink/compiler/GroupOrderTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/GroupOrderTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/GroupOrderTest.java
index 8c3a836..5009edd 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/GroupOrderTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/GroupOrderTest.java
@@ -95,7 +95,7 @@ public class GroupOrderTest extends CompilerTestBase {
FieldList local = new FieldList(2, 5);
Assert.assertEquals(ship, c.getShipStrategyKeys());
Assert.assertEquals(local, c.getLocalStrategyKeys());
- Assert.assertTrue(c.getLocalStrategySortOrder()[0] == reducer.getSortOrders()[0]);
+ Assert.assertTrue(c.getLocalStrategySortOrder()[0] == reducer.getSortOrders(0)[0]);
// check that we indeed sort descending
Assert.assertTrue(c.getLocalStrategySortOrder()[1] == groupOrder.getFieldSortDirections()[0]);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
index 0921364..29fc6b9 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/GroupReduceCompilationTest.java
@@ -167,7 +167,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
// check the keys
- assertEquals(new FieldList(1), reduceNode.getKeys());
+ assertEquals(new FieldList(1), reduceNode.getKeys(0));
assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
// check DOP
@@ -222,8 +222,9 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
// check the keys
- assertEquals(new FieldList(1), reduceNode.getKeys());
- assertEquals(new FieldList(1), combineNode.getKeys());
+ assertEquals(new FieldList(1), reduceNode.getKeys(0));
+ assertEquals(new FieldList(1), combineNode.getKeys(0));
+ assertEquals(new FieldList(1), combineNode.getKeys(1));
assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
// check DOP
@@ -279,7 +280,7 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
// check the keys
- assertEquals(new FieldList(0), reduceNode.getKeys());
+ assertEquals(new FieldList(0), reduceNode.getKeys(0));
assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
// check DOP
@@ -343,8 +344,9 @@ public class GroupReduceCompilationTest extends CompilerTestBase implements java
assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
// check the keys
- assertEquals(new FieldList(0), reduceNode.getKeys());
- assertEquals(new FieldList(0), combineNode.getKeys());
+ assertEquals(new FieldList(0), reduceNode.getKeys(0));
+ assertEquals(new FieldList(0), combineNode.getKeys(0));
+ assertEquals(new FieldList(0), combineNode.getKeys(1));
assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
// check DOP
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java
index 2e09810..2ec32e6 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/ReduceCompilationTest.java
@@ -175,8 +175,8 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
assertEquals(DriverStrategy.SORTED_PARTIAL_REDUCE, combineNode.getDriverStrategy());
// check the keys
- assertEquals(new FieldList(1), reduceNode.getKeys());
- assertEquals(new FieldList(1), combineNode.getKeys());
+ assertEquals(new FieldList(1), reduceNode.getKeys(0));
+ assertEquals(new FieldList(1), combineNode.getKeys(0));
assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
// check DOP
@@ -239,8 +239,8 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
assertEquals(DriverStrategy.SORTED_PARTIAL_REDUCE, combineNode.getDriverStrategy());
// check the keys
- assertEquals(new FieldList(0), reduceNode.getKeys());
- assertEquals(new FieldList(0), combineNode.getKeys());
+ assertEquals(new FieldList(0), reduceNode.getKeys(0));
+ assertEquals(new FieldList(0), combineNode.getKeys(0));
assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
// check DOP
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
index ca47443..d07743e 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsJavaApiCompilerTest.java
@@ -83,7 +83,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
// verify reducer
assertEquals(ShipStrategyType.PARTITION_HASH, worksetReducer.getInput().getShipStrategy());
- assertEquals(new FieldList(1, 2), worksetReducer.getKeys());
+ assertEquals(new FieldList(1, 2), worksetReducer.getKeys(0));
// currently, the system may partition before or after the mapper
ShipStrategyType ss1 = deltaMapper.getInput().getShipStrategy();
@@ -129,7 +129,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
// verify reducer
assertEquals(ShipStrategyType.PARTITION_HASH, worksetReducer.getInput().getShipStrategy());
- assertEquals(new FieldList(1, 2), worksetReducer.getKeys());
+ assertEquals(new FieldList(1, 2), worksetReducer.getKeys(0));
// verify solution delta
assertEquals(2, joinWithSolutionSetNode.getOutgoingChannels().size());
@@ -174,7 +174,7 @@ public class WorksetIterationsJavaApiCompilerTest extends CompilerTestBase {
// verify reducer
assertEquals(ShipStrategyType.FORWARD, worksetReducer.getInput().getShipStrategy());
- assertEquals(new FieldList(1, 2), worksetReducer.getKeys());
+ assertEquals(new FieldList(1, 2), worksetReducer.getKeys(0));
// verify solution delta
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsRecordApiCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsRecordApiCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsRecordApiCompilerTest.java
index fbdcd3d..62c5d84 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsRecordApiCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/WorksetIterationsRecordApiCompilerTest.java
@@ -97,7 +97,7 @@ public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase {
// verify reducer
assertEquals(ShipStrategyType.PARTITION_HASH, worksetReducer.getInput().getShipStrategy());
- assertEquals(list0, worksetReducer.getKeys());
+ assertEquals(list0, worksetReducer.getKeys(0));
// currently, the system may partition before or after the mapper
ShipStrategyType ss1 = deltaMapper.getInput().getShipStrategy();
@@ -142,7 +142,7 @@ public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase {
// verify reducer
assertEquals(ShipStrategyType.PARTITION_HASH, worksetReducer.getInput().getShipStrategy());
- assertEquals(list0, worksetReducer.getKeys());
+ assertEquals(list0, worksetReducer.getKeys(0));
// verify solution delta
@@ -186,7 +186,7 @@ public class WorksetIterationsRecordApiCompilerTest extends CompilerTestBase {
// verify reducer
assertEquals(ShipStrategyType.FORWARD, worksetReducer.getInput().getShipStrategy());
- assertEquals(list0, worksetReducer.getKeys());
+ assertEquals(list0, worksetReducer.getKeys(0));
// verify solution delta
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java
index 3bbd768..286d830 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideMatchDriver.java
@@ -59,8 +59,8 @@ public abstract class AbstractCachedBuildSideMatchDriver<IT1, IT2, OT> extends M
TypeSerializer<IT1> serializer1 = this.taskContext.<IT1>getInputSerializer(0).getSerializer();
TypeSerializer<IT2> serializer2 = this.taskContext.<IT2>getInputSerializer(1).getSerializer();
- TypeComparator<IT1> comparator1 = this.taskContext.getInputComparator(0);
- TypeComparator<IT2> comparator2 = this.taskContext.getInputComparator(1);
+ TypeComparator<IT1> comparator1 = this.taskContext.getDriverComparator(0);
+ TypeComparator<IT2> comparator2 = this.taskContext.getDriverComparator(1);
MutableObjectIterator<IT1> input1 = this.taskContext.getInput(0);
MutableObjectIterator<IT2> input2 = this.taskContext.getInput(1);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
index 14e0b57..5e68bab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
@@ -71,8 +71,8 @@ public class AllGroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunct
}
@Override
- public boolean requiresComparatorOnInput() {
- return false;
+ public int getNumberOfDriverComparators() {
+ return 0;
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
index 60a79f1..e26f4eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
@@ -70,8 +70,8 @@ public class AllReduceDriver<T> implements PactDriver<ReduceFunction<T>, T> {
}
@Override
- public boolean requiresComparatorOnInput() {
- return false;
+ public int getNumberOfDriverComparators() {
+ return 0;
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java
index 4a65ea8..55498fb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupDriver.java
@@ -77,8 +77,8 @@ public class CoGroupDriver<IT1, IT2, OT> implements PactDriver<CoGroupFunction<I
@Override
- public boolean requiresComparatorOnInput() {
- return true;
+ public int getNumberOfDriverComparators() {
+ return 2;
}
@@ -96,8 +96,8 @@ public class CoGroupDriver<IT1, IT2, OT> implements PactDriver<CoGroupFunction<I
// get the key positions and types
final TypeSerializer<IT1> serializer1 = this.taskContext.<IT1>getInputSerializer(0).getSerializer();
final TypeSerializer<IT2> serializer2 = this.taskContext.<IT2>getInputSerializer(1).getSerializer();
- final TypeComparator<IT1> groupComparator1 = this.taskContext.getInputComparator(0);
- final TypeComparator<IT2> groupComparator2 = this.taskContext.getInputComparator(1);
+ final TypeComparator<IT1> groupComparator1 = this.taskContext.getDriverComparator(0);
+ final TypeComparator<IT2> groupComparator2 = this.taskContext.getDriverComparator(1);
final TypePairComparatorFactory<IT1, IT2> pairComparatorFactory = config.getPairComparatorFactory(
this.taskContext.getUserCodeClassLoader());
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
index 6dda3f0..bc1a4bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetFirstDriver.java
@@ -69,8 +69,8 @@ public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, OT> implements Resettab
}
@Override
- public boolean requiresComparatorOnInput() {
- return true;
+ public int getNumberOfDriverComparators() {
+ return 1;
}
@Override
@@ -103,7 +103,7 @@ public class CoGroupWithSolutionSetFirstDriver<IT1, IT2, OT> implements Resettab
TypeComparator<IT1> buildSideComparator = hashTable.getBuildSideComparator().duplicate();
probeSideSerializer = taskContext.<IT2>getInputSerializer(0).getSerializer();
- probeSideComparator = taskContext.getInputComparator(0);
+ probeSideComparator = taskContext.getDriverComparator(0);
solutionSideRecord = buildSideSerializer.createInstance();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
index 35fc1b7..3359bb2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CoGroupWithSolutionSetSecondDriver.java
@@ -68,8 +68,8 @@ public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resetta
}
@Override
- public boolean requiresComparatorOnInput() {
- return true;
+ public int getNumberOfDriverComparators() {
+ return 1;
}
@Override
@@ -102,7 +102,7 @@ public class CoGroupWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resetta
TypeComparator<IT2> buildSideComparator = hashTable.getBuildSideComparator().duplicate();
probeSideSerializer = taskContext.<IT1>getInputSerializer(0).getSerializer();
- probeSideComparator = taskContext.getInputComparator(0);
+ probeSideComparator = taskContext.getDriverComparator(0);
solutionSideRecord = buildSideSerializer.createInstance();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CollectorMapDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CollectorMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CollectorMapDriver.java
index 8a398d2..3858864 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CollectorMapDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CollectorMapDriver.java
@@ -63,8 +63,8 @@ public class CollectorMapDriver<IT, OT> implements PactDriver<GenericCollectorMa
}
@Override
- public boolean requiresComparatorOnInput() {
- return false;
+ public int getNumberOfDriverComparators() {
+ return 0;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
index 6856702..4e6745a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/CrossDriver.java
@@ -88,8 +88,8 @@ public class CrossDriver<T1, T2, OT> implements PactDriver<CrossFunction<T1, T2,
@Override
- public boolean requiresComparatorOnInput() {
- return false;
+ public int getNumberOfDriverComparators() {
+ return 0;
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
index 49e34c9..a133c6c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java
@@ -34,66 +34,66 @@ import org.apache.flink.runtime.operators.chaining.SynchronousChainedCombineDriv
*/
public enum DriverStrategy {
// no local strategy, as for sources and sinks
- NONE(null, null, PIPELINED, false),
+ NONE(null, null, PIPELINED, 0),
// a unary no-op operator
- UNARY_NO_OP(NoOpDriver.class, null, PIPELINED, PIPELINED, false),
+ UNARY_NO_OP(NoOpDriver.class, null, PIPELINED, PIPELINED, 0),
// a binary no-op operator. no implementation available
- BINARY_NO_OP(null, null, PIPELINED, PIPELINED, false),
+ BINARY_NO_OP(null, null, PIPELINED, PIPELINED, 0),
// the old mapper
- COLLECTOR_MAP(CollectorMapDriver.class, ChainedCollectorMapDriver.class, PIPELINED, false),
+ COLLECTOR_MAP(CollectorMapDriver.class, ChainedCollectorMapDriver.class, PIPELINED, 0),
// the proper mapper
- MAP(MapDriver.class, ChainedMapDriver.class, PIPELINED, false),
+ MAP(MapDriver.class, ChainedMapDriver.class, PIPELINED, 0),
// the proper map partition
- MAP_PARTITION(MapPartitionDriver.class, null, PIPELINED, false),
+ MAP_PARTITION(MapPartitionDriver.class, null, PIPELINED, 0),
// the flat mapper
- FLAT_MAP(FlatMapDriver.class, ChainedFlatMapDriver.class, PIPELINED, false),
+ FLAT_MAP(FlatMapDriver.class, ChainedFlatMapDriver.class, PIPELINED, 0),
// group everything together into one group and apply the Reduce function
- ALL_REDUCE(AllReduceDriver.class, null, PIPELINED, false),
+ ALL_REDUCE(AllReduceDriver.class, null, PIPELINED, 0),
// group everything together into one group and apply the GroupReduce function
- ALL_GROUP_REDUCE(AllGroupReduceDriver.class, null, PIPELINED, false),
+ ALL_GROUP_REDUCE(AllGroupReduceDriver.class, null, PIPELINED, 0),
// group everything together into one group and apply the GroupReduce's combine function
- ALL_GROUP_COMBINE(AllGroupReduceDriver.class, null, PIPELINED, false),
+ ALL_GROUP_COMBINE(AllGroupReduceDriver.class, null, PIPELINED, 0),
// grouping the inputs and apply the Reduce Function
- SORTED_REDUCE(ReduceDriver.class, null, PIPELINED, true),
+ SORTED_REDUCE(ReduceDriver.class, null, PIPELINED, 1),
// sorted partial reduce is the combiner for the Reduce. same function, but potentially not fully sorted
- SORTED_PARTIAL_REDUCE(ReduceCombineDriver.class, null, MATERIALIZING, true),
+ SORTED_PARTIAL_REDUCE(ReduceCombineDriver.class, null, MATERIALIZING, 1),
// grouping the inputs and apply the GroupReduce function
- SORTED_GROUP_REDUCE(GroupReduceDriver.class, null, PIPELINED, true),
+ SORTED_GROUP_REDUCE(GroupReduceDriver.class, null, PIPELINED, 1),
// partially grouping inputs (best effort resulting possibly in duplicates --> combiner)
- SORTED_GROUP_COMBINE(GroupReduceCombineDriver.class, SynchronousChainedCombineDriver.class, MATERIALIZING, true),
+ SORTED_GROUP_COMBINE(GroupReduceCombineDriver.class, SynchronousChainedCombineDriver.class, MATERIALIZING, 2),
// both inputs are merged, but materialized to the side for block-nested-loop-join among values with equal key
- MERGE(MatchDriver.class, null, MATERIALIZING, MATERIALIZING, true),
+ MERGE(MatchDriver.class, null, MATERIALIZING, MATERIALIZING, 2),
// co-grouping inputs
- CO_GROUP(CoGroupDriver.class, null, PIPELINED, PIPELINED, true),
+ CO_GROUP(CoGroupDriver.class, null, PIPELINED, PIPELINED, 2),
// the first input is build side, the second side is probe side of a hybrid hash table
- HYBRIDHASH_BUILD_FIRST(MatchDriver.class, null, FULL_DAM, MATERIALIZING, true),
+ HYBRIDHASH_BUILD_FIRST(MatchDriver.class, null, FULL_DAM, MATERIALIZING, 2),
// the second input is build side, the first side is probe side of a hybrid hash table
- HYBRIDHASH_BUILD_SECOND(MatchDriver.class, null, MATERIALIZING, FULL_DAM, true),
+ HYBRIDHASH_BUILD_SECOND(MatchDriver.class, null, MATERIALIZING, FULL_DAM, 2),
// a cached variant of HYBRIDHASH_BUILD_FIRST, that can only be used inside of iterations
- HYBRIDHASH_BUILD_FIRST_CACHED(BuildFirstCachedMatchDriver.class, null, FULL_DAM, MATERIALIZING, true),
+ HYBRIDHASH_BUILD_FIRST_CACHED(BuildFirstCachedMatchDriver.class, null, FULL_DAM, MATERIALIZING, 2),
// cached variant of HYBRIDHASH_BUILD_SECOND, that can only be used inside of iterations
- HYBRIDHASH_BUILD_SECOND_CACHED(BuildSecondCachedMatchDriver.class, null, MATERIALIZING, FULL_DAM, true),
+ HYBRIDHASH_BUILD_SECOND_CACHED(BuildSecondCachedMatchDriver.class, null, MATERIALIZING, FULL_DAM, 2),
// the second input is inner loop, the first input is outer loop and block-wise processed
- NESTEDLOOP_BLOCKED_OUTER_FIRST(CrossDriver.class, null, MATERIALIZING, FULL_DAM, false),
+ NESTEDLOOP_BLOCKED_OUTER_FIRST(CrossDriver.class, null, MATERIALIZING, FULL_DAM, 0),
// the first input is inner loop, the second input is outer loop and block-wise processed
- NESTEDLOOP_BLOCKED_OUTER_SECOND(CrossDriver.class, null, FULL_DAM, MATERIALIZING, false),
+ NESTEDLOOP_BLOCKED_OUTER_SECOND(CrossDriver.class, null, FULL_DAM, MATERIALIZING, 0),
// the second input is inner loop, the first input is outer loop and stream-processed
- NESTEDLOOP_STREAMED_OUTER_FIRST(CrossDriver.class, null, PIPELINED, FULL_DAM, false),
+ NESTEDLOOP_STREAMED_OUTER_FIRST(CrossDriver.class, null, PIPELINED, FULL_DAM, 0),
// the first input is inner loop, the second input is outer loop and stream-processed
- NESTEDLOOP_STREAMED_OUTER_SECOND(CrossDriver.class, null, FULL_DAM, PIPELINED, false),
+ NESTEDLOOP_STREAMED_OUTER_SECOND(CrossDriver.class, null, FULL_DAM, PIPELINED, 0),
// union utility op. unions happen implicitly on the network layer (in the readers) when bundling streams
- UNION(null, null, FULL_DAM, FULL_DAM, false);
+ UNION(null, null, FULL_DAM, FULL_DAM, 0);
// explicit binary union between a streamed and a cached input
// UNION_WITH_CACHED(UnionWithTempOperator.class, null, FULL_DAM, PIPELINED, false);
@@ -108,35 +108,35 @@ public enum DriverStrategy {
private final int numInputs;
- private final boolean requiresComparator;
+ private final int numRequiredComparators;
@SuppressWarnings("unchecked")
private DriverStrategy(
@SuppressWarnings("rawtypes") Class<? extends PactDriver> driverClass,
@SuppressWarnings("rawtypes") Class<? extends ChainedDriver> pushChainDriverClass,
- DamBehavior dam, boolean comparator)
+ DamBehavior dam, int numComparator)
{
this.driverClass = (Class<? extends PactDriver<?, ?>>) driverClass;
this.pushChainDriver = (Class<? extends ChainedDriver<?, ?>>) pushChainDriverClass;
this.numInputs = 1;
this.dam1 = dam;
this.dam2 = null;
- this.requiresComparator = comparator;
+ this.numRequiredComparators = numComparator;
}
@SuppressWarnings("unchecked")
private DriverStrategy(
@SuppressWarnings("rawtypes") Class<? extends PactDriver> driverClass,
@SuppressWarnings("rawtypes") Class<? extends ChainedDriver> pushChainDriverClass,
- DamBehavior firstDam, DamBehavior secondDam, boolean comparator)
+ DamBehavior firstDam, DamBehavior secondDam, int numComparator)
{
this.driverClass = (Class<? extends PactDriver<?, ?>>) driverClass;
this.pushChainDriver = (Class<? extends ChainedDriver<?, ?>>) pushChainDriverClass;
this.numInputs = 2;
this.dam1 = firstDam;
this.dam2 = secondDam;
- this.requiresComparator = comparator;
+ this.numRequiredComparators = numComparator;
}
// --------------------------------------------------------------------------------------------
@@ -180,7 +180,7 @@ public enum DriverStrategy {
return this.dam1.isMaterializing() || (this.dam2 != null && this.dam2.isMaterializing());
}
- public boolean requiresComparator() {
- return this.requiresComparator;
+ public int getNumRequiredComparators() {
+ return this.numRequiredComparators;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java
index b3f5e96..f4ae62d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FlatMapDriver.java
@@ -62,8 +62,8 @@ public class FlatMapDriver<IT, OT> implements PactDriver<FlatMapFunction<IT, OT>
}
@Override
- public boolean requiresComparatorOnInput() {
- return false;
+ public int getNumberOfDriverComparators() {
+ return 0;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
index ffa3bf9..4323eae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
@@ -60,7 +60,9 @@ public class GroupReduceCombineDriver<T> implements PactDriver<FlatCombineFuncti
private TypeSerializer<T> serializer;
- private TypeComparator<T> comparator;
+ private TypeComparator<T> sortingComparator;
+
+ private TypeComparator<T> groupingComparator;
private QuickSort sortAlgo = new QuickSort();
@@ -82,7 +84,7 @@ public class GroupReduceCombineDriver<T> implements PactDriver<FlatCombineFuncti
public int getNumberOfInputs() {
return 1;
}
-
+
@Override
public Class<FlatCombineFunction<T>> getStubType() {
@SuppressWarnings("unchecked")
@@ -91,8 +93,8 @@ public class GroupReduceCombineDriver<T> implements PactDriver<FlatCombineFuncti
}
@Override
- public boolean requiresComparatorOnInput() {
- return true;
+ public int getNumberOfDriverComparators() {
+ return 2;
}
@Override
@@ -107,7 +109,8 @@ public class GroupReduceCombineDriver<T> implements PactDriver<FlatCombineFuncti
final TypeSerializerFactory<T> serializerFactory = this.taskContext.getInputSerializer(0);
this.serializer = serializerFactory.getSerializer();
- this.comparator = this.taskContext.getInputComparator(0);
+ this.sortingComparator = this.taskContext.getDriverComparator(0);
+ this.groupingComparator = this.taskContext.getDriverComparator(1);
this.combiner = this.taskContext.getStub();
this.output = this.taskContext.getOutputCollector();
@@ -115,12 +118,12 @@ public class GroupReduceCombineDriver<T> implements PactDriver<FlatCombineFuncti
numMemoryPages);
// instantiate a fix-length in-place sorter, if possible, otherwise the out-of-place sorter
- if (this.comparator.supportsSerializationWithKeyNormalization() &&
+ if (this.sortingComparator.supportsSerializationWithKeyNormalization() &&
this.serializer.getLength() > 0 && this.serializer.getLength() <= THRESHOLD_FOR_IN_PLACE_SORTING)
{
- this.sorter = new FixedLengthRecordSorter<T>(this.serializer, this.comparator, memory);
+ this.sorter = new FixedLengthRecordSorter<T>(this.serializer, this.sortingComparator, memory);
} else {
- this.sorter = new NormalizedKeySorter<T>(this.serializer, this.comparator.duplicate(), memory);
+ this.sorter = new NormalizedKeySorter<T>(this.serializer, this.sortingComparator.duplicate(), memory);
}
}
@@ -163,7 +166,7 @@ public class GroupReduceCombineDriver<T> implements PactDriver<FlatCombineFuncti
this.sortAlgo.sort(sorter);
final KeyGroupedIterator<T> keyIter = new KeyGroupedIterator<T>(sorter.getIterator(), this.serializer,
- this.comparator);
+ this.groupingComparator);
final FlatCombineFunction<T> combiner = this.combiner;
final Collector<T> output = this.output;
@@ -177,12 +180,16 @@ public class GroupReduceCombineDriver<T> implements PactDriver<FlatCombineFuncti
@Override
public void cleanup() throws Exception {
- this.memManager.release(this.sorter.dispose());
+ if(this.sorter != null) {
+ this.memManager.release(this.sorter.dispose());
+ }
}
@Override
public void cancel() {
this.running = false;
- this.memManager.release(this.sorter.dispose());
+ if(this.sorter != null) {
+ this.memManager.release(this.sorter.dispose());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
index 28c1761..feeceb0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceDriver.java
@@ -74,8 +74,8 @@ public class GroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunction
}
@Override
- public boolean requiresComparatorOnInput() {
- return true;
+ public int getNumberOfDriverComparators() {
+ return 1;
}
// --------------------------------------------------------------------------------------------
@@ -87,7 +87,7 @@ public class GroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunction
throw new Exception("Unrecognized driver strategy for GroupReduce driver: " + config.getDriverStrategy().name());
}
this.serializer = this.taskContext.<IT>getInputSerializer(0).getSerializer();
- this.comparator = this.taskContext.getInputComparator(0);
+ this.comparator = this.taskContext.getDriverComparator(0);
this.input = this.taskContext.getInput(0);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
index 29461c8..8740ab7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetFirstDriver.java
@@ -68,8 +68,8 @@ public class JoinWithSolutionSetFirstDriver<IT1, IT2, OT> implements ResettableP
}
@Override
- public boolean requiresComparatorOnInput() {
- return true;
+ public int getNumberOfDriverComparators() {
+ return 1;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
index f2ba9ec..f762602 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinWithSolutionSetSecondDriver.java
@@ -68,8 +68,8 @@ public class JoinWithSolutionSetSecondDriver<IT1, IT2, OT> implements Resettable
}
@Override
- public boolean requiresComparatorOnInput() {
- return true;
+ public int getNumberOfDriverComparators() {
+ return 1;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
index f4499c1..2d98954 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapDriver.java
@@ -62,8 +62,8 @@ public class MapDriver<IT, OT> implements PactDriver<MapFunction<IT, OT>, OT> {
}
@Override
- public boolean requiresComparatorOnInput() {
- return false;
+ public int getNumberOfDriverComparators() {
+ return 0;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java
index 9c5094d..41aa312 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MapPartitionDriver.java
@@ -59,8 +59,8 @@ public class MapPartitionDriver<IT, OT> implements PactDriver<MapPartitionFuncti
}
@Override
- public boolean requiresComparatorOnInput() {
- return false;
+ public int getNumberOfDriverComparators() {
+ return 0;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
index f9ede1c..82f24d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/MatchDriver.java
@@ -75,8 +75,8 @@ public class MatchDriver<IT1, IT2, OT> implements PactDriver<FlatJoinFunction<IT
}
@Override
- public boolean requiresComparatorOnInput() {
- return true;
+ public int getNumberOfDriverComparators() {
+ return 2;
}
@Override
@@ -100,8 +100,8 @@ public class MatchDriver<IT1, IT2, OT> implements PactDriver<FlatJoinFunction<IT
// get the key positions and types
final TypeSerializer<IT1> serializer1 = this.taskContext.<IT1>getInputSerializer(0).getSerializer();
final TypeSerializer<IT2> serializer2 = this.taskContext.<IT2>getInputSerializer(1).getSerializer();
- final TypeComparator<IT1> comparator1 = this.taskContext.getInputComparator(0);
- final TypeComparator<IT2> comparator2 = this.taskContext.getInputComparator(1);
+ final TypeComparator<IT1> comparator1 = this.taskContext.getDriverComparator(0);
+ final TypeComparator<IT2> comparator2 = this.taskContext.getDriverComparator(1);
final TypePairComparatorFactory<IT1, IT2> pairComparatorFactory = config.getPairComparatorFactory(
this.taskContext.getUserCodeClassLoader());
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
index 82bc7b7..3eeb2d5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
@@ -52,8 +52,8 @@ public class NoOpDriver<T> implements PactDriver<AbstractRichFunction, T> {
}
@Override
- public boolean requiresComparatorOnInput() {
- return false;
+ public int getNumberOfDriverComparators() {
+ return 0;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java
index 6fe2770..d0f4116 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactDriver.java
@@ -44,19 +44,19 @@ public interface PactDriver<S extends Function, OT> {
int getNumberOfInputs();
/**
- * Gets the class of the stub type that is run by this task. For example, a <tt>MapTask</tt> should return
- * <code>MapFunction.class</code>.
+ * Gets the number of comparators required for this driver.
*
- * @return The class of the stub type run by the task.
+ * @return The number of comparators required for this driver.
*/
- Class<S> getStubType();
+ int getNumberOfDriverComparators();
/**
- * Flag indicating whether the inputs require always comparators or not.
+ * Gets the class of the stub type that is run by this task. For example, a <tt>MapTask</tt> should return
+ * <code>MapFunction.class</code>.
*
- * @return True, if the initialization should look for and create comparators, false otherwise.
+ * @return The class of the stub type run by the task.
*/
- boolean requiresComparatorOnInput();
+ Class<S> getStubType();
/**
* This method is called before the user code is opened. An exception thrown by this method
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
index bd2f178..c3a2d59 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/PactTaskContext.java
@@ -54,7 +54,7 @@ public interface PactTaskContext<S, OT> {
<X> TypeSerializerFactory<X> getInputSerializer(int index);
- <X> TypeComparator<X> getInputComparator(int index);
+ <X> TypeComparator<X> getDriverComparator(int index);
S getStub();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
index 658ca46..35fb739 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
@@ -93,8 +93,8 @@ public class ReduceCombineDriver<T> implements PactDriver<ReduceFunction<T>, T>
}
@Override
- public boolean requiresComparatorOnInput() {
- return true;
+ public int getNumberOfDriverComparators() {
+ return 1;
}
@Override
@@ -109,7 +109,7 @@ public class ReduceCombineDriver<T> implements PactDriver<ReduceFunction<T>, T>
// instantiate the serializer / comparator
final TypeSerializerFactory<T> serializerFactory = this.taskContext.getInputSerializer(0);
- this.comparator = this.taskContext.getInputComparator(0);
+ this.comparator = this.taskContext.getDriverComparator(0);
this.serializer = serializerFactory.getSerializer();
this.reducer = this.taskContext.getStub();
this.output = this.taskContext.getOutputCollector();
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
index 875e744..9cdcdda 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
@@ -73,8 +73,8 @@ public class ReduceDriver<T> implements PactDriver<ReduceFunction<T>, T> {
}
@Override
- public boolean requiresComparatorOnInput() {
- return true;
+ public int getNumberOfDriverComparators() {
+ return 1;
}
// --------------------------------------------------------------------------------------------
@@ -86,7 +86,7 @@ public class ReduceDriver<T> implements PactDriver<ReduceFunction<T>, T> {
throw new Exception("Unrecognized driver strategy for Reduce driver: " + config.getDriverStrategy().name());
}
this.serializer = this.taskContext.<T>getInputSerializer(0).getSerializer();
- this.comparator = this.taskContext.getInputComparator(0);
+ this.comparator = this.taskContext.getDriverComparator(0);
this.input = this.taskContext.getInput(0);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index 4b77909..72d0dc0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -299,9 +299,10 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
// the local processing includes building the dams / caches
try {
int numInputs = driver.getNumberOfInputs();
+ int numComparators = driver.getNumberOfDriverComparators();
int numBroadcastInputs = this.config.getNumBroadcastInputs();
- initInputsSerializersAndComparators(numInputs);
+ initInputsSerializersAndComparators(numInputs, numComparators);
initBroadcastInputsSerializers(numBroadcastInputs);
// set the iterative status for inputs and broadcast inputs
@@ -781,23 +782,27 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
/**
* Creates all the serializers and comparators.
*/
- protected void initInputsSerializersAndComparators(int numInputs) throws Exception {
+ protected void initInputsSerializersAndComparators(int numInputs, int numComparators) throws Exception {
this.inputSerializers = new TypeSerializerFactory<?>[numInputs];
- this.inputComparators = this.driver.requiresComparatorOnInput() ? new TypeComparator[numInputs] : null;
+ this.inputComparators = numComparators > 0 ? new TypeComparator[numComparators] : null;
this.inputIterators = new MutableObjectIterator[numInputs];
+ // ---------------- create the input serializers ---------------------
for (int i = 0; i < numInputs; i++) {
- // ---------------- create the serializer first ---------------------
+
final TypeSerializerFactory<?> serializerFactory = this.config.getInputSerializer(i, this.userCodeClassLoader);
this.inputSerializers[i] = serializerFactory;
- // ---------------- create the driver's comparator ---------------------
+ this.inputIterators[i] = createInputIterator(this.inputReaders[i], this.inputSerializers[i]);
+ }
+
+ // ---------------- create the driver's comparators ---------------------
+ for (int i = 0; i < numComparators; i++) {
+
if (this.inputComparators != null) {
final TypeComparatorFactory<?> comparatorFactory = this.config.getDriverComparator(i, this.userCodeClassLoader);
this.inputComparators[i] = comparatorFactory.createComparator();
}
-
- this.inputIterators[i] = createInputIterator(this.inputReaders[i], this.inputSerializers[i]);
}
}
@@ -1157,11 +1162,11 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
@Override
- public <X> TypeComparator<X> getInputComparator(int index) {
+ public <X> TypeComparator<X> getDriverComparator(int index) {
if (this.inputComparators == null) {
throw new IllegalStateException("Comparators have not been created!");
}
- else if (index < 0 || index >= this.driver.getNumberOfInputs()) {
+ else if (index < 0 || index >= this.driver.getNumberOfDriverComparators()) {
throw new IndexOutOfBoundsException();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
index 69ea360..5492635 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/SynchronousChainedCombineDriver.java
@@ -56,7 +56,9 @@ public class SynchronousChainedCombineDriver<T> extends ChainedDriver<T, T> {
private TypeSerializer<T> serializer;
- private TypeComparator<T> comparator;
+ private TypeComparator<T> sortingComparator;
+
+ private TypeComparator<T> groupingComparator;
private AbstractInvokable parent;
@@ -92,19 +94,21 @@ public class SynchronousChainedCombineDriver<T> extends ChainedDriver<T, T> {
// instantiate the serializer / comparator
final TypeSerializerFactory<T> serializerFactory = this.config.getInputSerializer(0, this.userCodeClassLoader);
- final TypeComparatorFactory<T> comparatorFactory = this.config.getDriverComparator(0, this.userCodeClassLoader);
+ final TypeComparatorFactory<T> sortingComparatorFactory = this.config.getDriverComparator(0, this.userCodeClassLoader);
+ final TypeComparatorFactory<T> groupingComparatorFactory = this.config.getDriverComparator(1, this.userCodeClassLoader);
this.serializer = serializerFactory.getSerializer();
- this.comparator = comparatorFactory.createComparator();
+ this.sortingComparator = sortingComparatorFactory.createComparator();
+ this.groupingComparator = groupingComparatorFactory.createComparator();
final List<MemorySegment> memory = this.memManager.allocatePages(this.parent, numMemoryPages);
// instantiate a fix-length in-place sorter, if possible, otherwise the out-of-place sorter
- if (this.comparator.supportsSerializationWithKeyNormalization() &&
+ if (this.sortingComparator.supportsSerializationWithKeyNormalization() &&
this.serializer.getLength() > 0 && this.serializer.getLength() <= THRESHOLD_FOR_IN_PLACE_SORTING)
{
- this.sorter = new FixedLengthRecordSorter<T>(this.serializer, this.comparator, memory);
+ this.sorter = new FixedLengthRecordSorter<T>(this.serializer, this.sortingComparator, memory);
} else {
- this.sorter = new NormalizedKeySorter<T>(this.serializer, this.comparator.duplicate(), memory);
+ this.sorter = new NormalizedKeySorter<T>(this.serializer, this.sortingComparator.duplicate(), memory);
}
}
@@ -183,7 +187,7 @@ public class SynchronousChainedCombineDriver<T> extends ChainedDriver<T, T> {
this.sortAlgo.sort(sorter);
// run the combiner
final KeyGroupedIterator<T> keyIter = new KeyGroupedIterator<T>(sorter.getIterator(), this.serializer,
- this.comparator);
+ this.groupingComparator);
// cache references on the stack
final FlatCombineFunction<T> stub = this.combiner;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
index f667ce0..262a4e9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CachedMatchTaskTest.java
@@ -70,8 +70,8 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
setOutput(this.outList);
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED);
@@ -101,8 +101,8 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
setOutput(this.outList);
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED);
@@ -132,8 +132,8 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
setOutput(this.outList);
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED);
@@ -163,8 +163,8 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
setOutput(this.outList);
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED);
@@ -194,8 +194,8 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
setOutput(this.outList);
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED);
@@ -225,8 +225,8 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
setOutput(new NirvanaOutputList());
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED);
@@ -255,8 +255,8 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
setOutput(new NirvanaOutputList());
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED);
@@ -283,8 +283,8 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
addInput(new DelayingInfinitiveInputIterator(100));
addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
@@ -330,8 +330,8 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
addInput(new DelayingInfinitiveInputIterator(100));
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
setOutput(new NirvanaOutputList());
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED);
@@ -374,8 +374,8 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
setOutput(new NirvanaOutputList());
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
@@ -418,8 +418,8 @@ public class CachedMatchTaskTest extends DriverTestBase<FlatJoinFunction<Record,
addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
setOutput(new NirvanaOutputList());
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java
index 01775f3..8e7d9d3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskExternalITCase.java
@@ -62,8 +62,8 @@ public class CoGroupTaskExternalITCase extends DriverTestBase<CoGroupFunction<Re
(keyCnt1 > keyCnt2 ? (keyCnt1 - keyCnt2) * valCnt1 : (keyCnt2 - keyCnt1) * valCnt2);
setOutput(this.output);
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
getTaskConfig().setDriverStrategy(DriverStrategy.CO_GROUP);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java
index da04cc3..46007ab 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CoGroupTaskTest.java
@@ -68,8 +68,8 @@ public class CoGroupTaskTest extends DriverTestBase<CoGroupFunction<Record, Reco
(keyCnt1 > keyCnt2 ? (keyCnt1 - keyCnt2) * valCnt1 : (keyCnt2 - keyCnt1) * valCnt2);
setOutput(this.output);
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
getTaskConfig().setDriverStrategy(DriverStrategy.CO_GROUP);
@@ -99,8 +99,8 @@ public class CoGroupTaskTest extends DriverTestBase<CoGroupFunction<Record, Reco
(keyCnt1 > keyCnt2 ? (keyCnt1 - keyCnt2) * valCnt1 : (keyCnt2 - keyCnt1) * valCnt2);
setOutput(this.output);
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
getTaskConfig().setDriverStrategy(DriverStrategy.CO_GROUP);
@@ -130,8 +130,8 @@ public class CoGroupTaskTest extends DriverTestBase<CoGroupFunction<Record, Reco
(keyCnt1 > keyCnt2 ? (keyCnt1 - keyCnt2) * valCnt1 : (keyCnt2 - keyCnt1) * valCnt2);
setOutput(this.output);
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
getTaskConfig().setDriverStrategy(DriverStrategy.CO_GROUP);
@@ -161,8 +161,8 @@ public class CoGroupTaskTest extends DriverTestBase<CoGroupFunction<Record, Reco
(keyCnt1 > keyCnt2 ? (keyCnt1 - keyCnt2) * valCnt1 : (keyCnt2 - keyCnt1) * valCnt2);
setOutput(this.output);
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
getTaskConfig().setDriverStrategy(DriverStrategy.CO_GROUP);
@@ -195,8 +195,8 @@ public class CoGroupTaskTest extends DriverTestBase<CoGroupFunction<Record, Reco
addInput(new UniformRecordGenerator(keyCnt1, valCnt1, true));
addInput(new UniformRecordGenerator(keyCnt2, valCnt2, true));
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
getTaskConfig().setDriverStrategy(DriverStrategy.CO_GROUP);
@@ -225,8 +225,8 @@ public class CoGroupTaskTest extends DriverTestBase<CoGroupFunction<Record, Reco
addInput(new UniformRecordGenerator(keyCnt1, valCnt1, true));
addInput(new UniformRecordGenerator(keyCnt2, valCnt2, true));
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
getTaskConfig().setDriverStrategy(DriverStrategy.CO_GROUP);
@@ -251,8 +251,8 @@ public class CoGroupTaskTest extends DriverTestBase<CoGroupFunction<Record, Reco
setOutput(this.output);
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
getTaskConfig().setDriverStrategy(DriverStrategy.CO_GROUP);
@@ -302,8 +302,8 @@ public class CoGroupTaskTest extends DriverTestBase<CoGroupFunction<Record, Reco
setOutput(this.output);
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
getTaskConfig().setDriverStrategy(DriverStrategy.CO_GROUP);
@@ -354,8 +354,8 @@ public class CoGroupTaskTest extends DriverTestBase<CoGroupFunction<Record, Reco
setOutput(this.output);
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
getTaskConfig().setDriverStrategy(DriverStrategy.CO_GROUP);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
index 934608a..decf358 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
@@ -59,7 +59,8 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun
final int valCnt = 8;
addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
- addInputComparator(this.comparator);
+ addDriverComparator(this.comparator);
+ addDriverComparator(this.comparator);
setOutput(this.outList);
getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
@@ -112,7 +113,8 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun
final int valCnt = 8;
addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
- addInputComparator(this.comparator);
+ addDriverComparator(this.comparator);
+ addDriverComparator(this.comparator);
setOutput(this.outList);
getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
index 0a11ff3..1f18917 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
@@ -61,7 +61,8 @@ public class CombineTaskTest extends DriverTestBase<RichGroupReduceFunction<Reco
int valCnt = 20;
addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
- addInputComparator(this.comparator);
+ addDriverComparator(this.comparator);
+ addDriverComparator(this.comparator);
setOutput(this.outList);
getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
@@ -97,7 +98,8 @@ public class CombineTaskTest extends DriverTestBase<RichGroupReduceFunction<Reco
int valCnt = 20;
addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
- addInputComparator(this.comparator);
+ addDriverComparator(this.comparator);
+ addDriverComparator(this.comparator);
setOutput(new DiscardingOutputCollector<Record>());
getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
@@ -121,7 +123,8 @@ public class CombineTaskTest extends DriverTestBase<RichGroupReduceFunction<Reco
public void testCancelCombineTaskSorting()
{
addInput(new DelayingInfinitiveInputIterator(100));
- addInputComparator(this.comparator);
+ addDriverComparator(this.comparator);
+ addDriverComparator(this.comparator);
setOutput(new DiscardingOutputCollector<Record>());
getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e5731e0e/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
index fd422ab..6e66e72 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
@@ -71,8 +71,8 @@ public class MatchTaskExternalITCase extends DriverTestBase<FlatJoinFunction<Rec
final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
setOutput(this.output);
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
@@ -104,8 +104,8 @@ public class MatchTaskExternalITCase extends DriverTestBase<FlatJoinFunction<Rec
addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
setOutput(this.output);
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
@@ -135,8 +135,8 @@ public class MatchTaskExternalITCase extends DriverTestBase<FlatJoinFunction<Rec
addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
- addInputComparator(this.comparator1);
- addInputComparator(this.comparator2);
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
setOutput(this.output);
getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);