You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/11/26 13:00:25 UTC
[2/3] incubator-flink git commit: [FLINK-658] [APIs] Add group sorting to CoGroup
[FLINK-658] [APIs] Add group sorting to CoGroup
This closes #234
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/392683f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/392683f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/392683f3
Branch: refs/heads/master
Commit: 392683f3767170c33e3fe2be59825bec5002a4ca
Parents: 606a6d4
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Nov 25 16:34:55 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Nov 25 21:02:10 2014 +0100
----------------------------------------------------------------------
.../operators/base/CoGroupOperatorBase.java | 73 +++++--
.../api/java/operators/CoGroupOperator.java | 218 ++++++++++++++++---
.../translation/CoGroupSortTranslationTest.java | 132 +++++++++++
.../apache/flink/api/scala/coGroupDataSet.scala | 113 +++++++++-
.../apache/flink/api/scala/joinDataSet.scala | 2 +-
.../test/operators/CoGroupGroupSortITCase.java | 122 +++++++++++
.../CoGroupGroupSortTranslationTest.scala | 171 +++++++++++++++
7 files changed, 776 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/392683f3/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
index 7fe46eb..65b9d1c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
@@ -192,25 +192,66 @@ public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends CoGroupFunction<IN1,
// --------------------------------------------------------------------
TypeInformation<IN1> inputType1 = getOperatorInfo().getFirstInputType();
TypeInformation<IN2> inputType2 = getOperatorInfo().getSecondInputType();
-
+
+ // for the grouping / merging comparator
int[] inputKeys1 = getKeyColumns(0);
int[] inputKeys2 = getKeyColumns(1);
-
- boolean[] inputSortDirections1 = new boolean[inputKeys1.length];
- boolean[] inputSortDirections2 = new boolean[inputKeys2.length];
-
- Arrays.fill(inputSortDirections1, true);
- Arrays.fill(inputSortDirections2, true);
-
+
+ boolean[] inputDirections1 = new boolean[inputKeys1.length];
+ boolean[] inputDirections2 = new boolean[inputKeys2.length];
+ Arrays.fill(inputDirections1, true);
+ Arrays.fill(inputDirections2, true);
+
final TypeSerializer<IN1> inputSerializer1 = inputType1.createSerializer();
final TypeSerializer<IN2> inputSerializer2 = inputType2.createSerializer();
- final TypeComparator<IN1> inputComparator1 = getTypeComparator(inputType1, inputKeys1, inputSortDirections1);
- final TypeComparator<IN2> inputComparator2 = getTypeComparator(inputType2, inputKeys2, inputSortDirections2);
+ final TypeComparator<IN1> inputComparator1 = getTypeComparator(inputType1, inputKeys1, inputDirections1);
+ final TypeComparator<IN2> inputComparator2 = getTypeComparator(inputType2, inputKeys2, inputDirections2);
+
+ final TypeComparator<IN1> inputSortComparator1;
+ final TypeComparator<IN2> inputSortComparator2;
+
+ if (groupOrder1 == null || groupOrder1.getNumberOfFields() == 0) {
+ // no group sorting
+ inputSortComparator1 = inputComparator1;
+ }
+ else {
+ // group sorting
+ int[] groupSortKeys = groupOrder1.getFieldPositions();
+ int[] allSortKeys = new int[inputKeys1.length + groupOrder1.getNumberOfFields()];
+ System.arraycopy(inputKeys1, 0, allSortKeys, 0, inputKeys1.length);
+ System.arraycopy(groupSortKeys, 0, allSortKeys, inputKeys1.length, groupSortKeys.length);
+
+ boolean[] groupSortDirections = groupOrder1.getFieldSortDirections();
+ boolean[] allSortDirections = new boolean[inputKeys1.length + groupSortKeys.length];
+ Arrays.fill(allSortDirections, 0, inputKeys1.length, true);
+ System.arraycopy(groupSortDirections, 0, allSortDirections, inputKeys1.length, groupSortDirections.length);
+
+ inputSortComparator1 = getTypeComparator(inputType1, allSortKeys, allSortDirections);
+ }
+
+ if (groupOrder2 == null || groupOrder2.getNumberOfFields() == 0) {
+ // no group sorting
+ inputSortComparator2 = inputComparator2;
+ }
+ else {
+ // group sorting
+ int[] groupSortKeys = groupOrder2.getFieldPositions();
+ int[] allSortKeys = new int[inputKeys2.length + groupOrder2.getNumberOfFields()];
+ System.arraycopy(inputKeys2, 0, allSortKeys, 0, inputKeys2.length);
+ System.arraycopy(groupSortKeys, 0, allSortKeys, inputKeys2.length, groupSortKeys.length);
+
+ boolean[] groupSortDirections = groupOrder2.getFieldSortDirections();
+ boolean[] allSortDirections = new boolean[inputKeys2.length + groupSortKeys.length];
+ Arrays.fill(allSortDirections, 0, inputKeys2.length, true);
+ System.arraycopy(groupSortDirections, 0, allSortDirections, inputKeys2.length, groupSortDirections.length);
+
+ inputSortComparator2 = getTypeComparator(inputType2, allSortKeys, allSortDirections);
+ }
CoGroupSortListIterator<IN1, IN2> coGroupIterator =
- new CoGroupSortListIterator<IN1, IN2>(input1, inputComparator1, inputSerializer1,
- input2, inputComparator2, inputSerializer2, mutableObjectSafe);
+ new CoGroupSortListIterator<IN1, IN2>(input1, inputSortComparator1, inputComparator1, inputSerializer1,
+ input2, inputSortComparator2, inputComparator2, inputSerializer2, mutableObjectSafe);
// --------------------------------------------------------------------
// Run UDF
@@ -261,8 +302,8 @@ public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends CoGroupFunction<IN1,
private Iterable<IN2> secondReturn;
private CoGroupSortListIterator(
- List<IN1> input1, final TypeComparator<IN1> inputComparator1, TypeSerializer<IN1> serializer1,
- List<IN2> input2, final TypeComparator<IN2> inputComparator2, TypeSerializer<IN2> serializer2,
+ List<IN1> input1, final TypeComparator<IN1> inputSortComparator1, TypeComparator<IN1> inputComparator1, TypeSerializer<IN1> serializer1,
+ List<IN2> input2, final TypeComparator<IN2> inputSortComparator2, TypeComparator<IN2> inputComparator2, TypeSerializer<IN2> serializer2,
boolean copyElements)
{
this.pairComparator = new GenericPairComparator<IN1, IN2>(inputComparator1, inputComparator2);
@@ -276,14 +317,14 @@ public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends CoGroupFunction<IN1,
Collections.sort(input1, new Comparator<IN1>() {
@Override
public int compare(IN1 o1, IN1 o2) {
- return inputComparator1.compare(o1, o2);
+ return inputSortComparator1.compare(o1, o2);
}
});
Collections.sort(input2, new Comparator<IN2>() {
@Override
public int compare(IN2 o1, IN2 o2) {
- return inputComparator2.compare(o1, o2);
+ return inputSortComparator2.compare(o1, o2);
}
});
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/392683f3/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
index 7394c18..b69f326 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
@@ -19,17 +19,25 @@
package org.apache.flink.api.java.operators;
import java.security.InvalidParameterException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.operators.BinaryOperatorInformation;
import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.operators.DeltaIteration.SolutionSetPlaceHolder;
@@ -45,7 +53,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
-
/**
* A {@link DataSet} that is the result of a CoGroup transformation.
*
@@ -64,21 +71,32 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
private final String defaultName;
+ private final List<Pair<Integer, Order>> groupSortKeyOrderFirst;
+ private final List<Pair<Integer, Order>> groupSortKeyOrderSecond;
+
private Partitioner<?> customPartitioner;
- public CoGroupOperator(DataSet<I1> input1, DataSet<I2> input2,
- Keys<I1> keys1, Keys<I2> keys2,
- CoGroupFunction<I1, I2, OUT> function,
- TypeInformation<OUT> returnType,
- Partitioner<?> customPartitioner,
- String defaultName)
+ public CoGroupOperator(DataSet<I1> input1, DataSet<I2> input2, Keys<I1> keys1, Keys<I2> keys2,
+ CoGroupFunction<I1, I2, OUT> function, TypeInformation<OUT> returnType,
+ Partitioner<?> customPartitioner, String defaultName)
+ {
+ this(input1, input2, keys1, keys2, function, returnType, null, null, customPartitioner, defaultName);
+ }
+
+ public CoGroupOperator(DataSet<I1> input1, DataSet<I2> input2, Keys<I1> keys1, Keys<I2> keys2,
+ CoGroupFunction<I1, I2, OUT> function, TypeInformation<OUT> returnType,
+ List<Pair<Integer, Order>> groupSortKeyOrderFirst, List<Pair<Integer, Order>> groupSortKeyOrderSecond,
+ Partitioner<?> customPartitioner, String defaultName)
{
super(input1, input2, returnType);
this.function = function;
this.customPartitioner = customPartitioner;
this.defaultName = defaultName;
+
+ this.groupSortKeyOrderFirst = groupSortKeyOrderFirst == null ? Collections.<Pair<Integer, Order>>emptyList() : groupSortKeyOrderFirst;
+ this.groupSortKeyOrderSecond = groupSortKeyOrderSecond == null ? Collections.<Pair<Integer, Order>>emptyList() : groupSortKeyOrderSecond;
if (keys1 == null || keys2 == null) {
throw new NullPointerException();
@@ -147,12 +165,15 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
@Override
protected org.apache.flink.api.common.operators.base.CoGroupOperatorBase<?, ?, OUT, ?> translateToDataFlow(Operator<I1> input1, Operator<I2> input2) {
- String name = getName() != null ? getName() : "CoGroup at "+defaultName;
+ String name = getName() != null ? getName() : "CoGroup at " + defaultName;
try {
keys1.areCompatible(keys2);
- } catch (IncompatibleKeysException e) {
+ }
+ catch (IncompatibleKeysException e) {
throw new InvalidProgramException("The types of the key fields do not match.", e);
}
+
+ final org.apache.flink.api.common.operators.base.CoGroupOperatorBase<?, ?, OUT, ?> po;
if (keys1 instanceof Keys.SelectorFunctionKeys
&& keys2 instanceof Keys.SelectorFunctionKeys) {
@@ -164,15 +185,11 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
Keys.SelectorFunctionKeys<I2, ?> selectorKeys2 =
(Keys.SelectorFunctionKeys<I2, ?>) keys2;
- PlanBothUnwrappingCoGroupOperator<I1, I2, OUT, ?> po =
- translateSelectorFunctionCoGroup(selectorKeys1, selectorKeys2, function,
+ po = translateSelectorFunctionCoGroup(selectorKeys1, selectorKeys2, function,
getInput1Type(), getInput2Type(), getResultType(), name, input1, input2);
po.setDegreeOfParallelism(getParallelism());
po.setCustomPartitioner(customPartitioner);
-
- return po;
-
}
else if (keys2 instanceof Keys.SelectorFunctionKeys) {
@@ -181,14 +198,11 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
@SuppressWarnings("unchecked")
Keys.SelectorFunctionKeys<I2, ?> selectorKeys2 = (Keys.SelectorFunctionKeys<I2, ?>) keys2;
- PlanRightUnwrappingCoGroupOperator<I1, I2, OUT, ?> po =
- translateSelectorFunctionCoGroupRight(logicalKeyPositions1, selectorKeys2, function,
+ po = translateSelectorFunctionCoGroupRight(logicalKeyPositions1, selectorKeys2, function,
getInput1Type(), getInput2Type(), getResultType(), name, input1, input2);
po.setDegreeOfParallelism(getParallelism());
po.setCustomPartitioner(customPartitioner);
-
- return po;
}
else if (keys1 instanceof Keys.SelectorFunctionKeys) {
@@ -197,14 +211,8 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
int[] logicalKeyPositions2 = keys2.computeLogicalKeyPositions();
- PlanLeftUnwrappingCoGroupOperator<I1, I2, OUT, ?> po =
- translateSelectorFunctionCoGroupLeft(selectorKeys1, logicalKeyPositions2, function,
+ po = translateSelectorFunctionCoGroupLeft(selectorKeys1, logicalKeyPositions2, function,
getInput1Type(), getInput2Type(), getResultType(), name, input1, input2);
-
- po.setDegreeOfParallelism(getParallelism());
- po.setCustomPartitioner(customPartitioner);
-
- return po;
}
else if ( keys1 instanceof Keys.ExpressionKeys && keys2 instanceof Keys.ExpressionKeys)
{
@@ -217,22 +225,39 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
int[] logicalKeyPositions1 = keys1.computeLogicalKeyPositions();
int[] logicalKeyPositions2 = keys2.computeLogicalKeyPositions();
- CoGroupOperatorBase<I1, I2, OUT, CoGroupFunction<I1, I2, OUT>> po =
+ CoGroupOperatorBase<I1, I2, OUT, CoGroupFunction<I1, I2, OUT>> op =
new CoGroupOperatorBase<I1, I2, OUT, CoGroupFunction<I1, I2, OUT>>(
function, new BinaryOperatorInformation<I1, I2, OUT>(getInput1Type(), getInput2Type(), getResultType()),
logicalKeyPositions1, logicalKeyPositions2, name);
- // set inputs
- po.setFirstInput(input1);
- po.setSecondInput(input2);
-
- po.setDegreeOfParallelism(getParallelism());
- po.setCustomPartitioner(customPartitioner);
- return po;
+ op.setFirstInput(input1);
+ op.setSecondInput(input2);
+ po = op;
}
else {
throw new UnsupportedOperationException("Unrecognized or incompatible key types.");
}
+
+ // configure shared characteristics
+ po.setDegreeOfParallelism(getParallelism());
+ po.setCustomPartitioner(customPartitioner);
+
+ if (groupSortKeyOrderFirst.size() > 0) {
+ Ordering o = new Ordering();
+ for (Pair<Integer, Order> entry : groupSortKeyOrderFirst) {
+ o.appendOrdering(entry.getLeft(), null, entry.getRight());
+ }
+ po.setGroupOrderForInputOne(o);
+ }
+ if (groupSortKeyOrderSecond.size() > 0) {
+ Ordering o = new Ordering();
+ for (Pair<Integer, Order> entry : groupSortKeyOrderSecond) {
+ o.appendOrdering(entry.getLeft(), null, entry.getRight());
+ }
+ po.setGroupOrderForInputTwo(o);
+ }
+
+ return po;
}
@@ -524,23 +549,30 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
return new CoGroupOperatorWithoutFunction(keys2);
}
+
+ // ------------------------------------------------------------------------------------
public final class CoGroupOperatorWithoutFunction {
private final Keys<I2> keys2;
+ private final List<Pair<Integer, Order>> groupSortKeyOrderFirst;
+ private final List<Pair<Integer, Order>> groupSortKeyOrderSecond;
+
private Partitioner<?> customPartitioner;
private CoGroupOperatorWithoutFunction(Keys<I2> keys2) {
if (keys2 == null) {
throw new NullPointerException();
}
-
if (keys2.isEmpty()) {
throw new InvalidProgramException("The co-group keys must not be empty.");
}
this.keys2 = keys2;
+
+ this.groupSortKeyOrderFirst = new ArrayList<Pair<Integer, Order>>();
+ this.groupSortKeyOrderSecond = new ArrayList<Pair<Integer, Order>>();
}
/**
@@ -586,9 +618,125 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
throw new NullPointerException("CoGroup function must not be null.");
}
TypeInformation<R> returnType = TypeExtractor.getCoGroupReturnTypes(function, input1.getType(), input2.getType());
- return new CoGroupOperator<I1, I2, R>(input1, input2, keys1, keys2, function, returnType,
+
+ return new CoGroupOperator<I1, I2, R>(input1, input2, keys1, keys2, function, returnType,
+ groupSortKeyOrderFirst, groupSortKeyOrderSecond,
customPartitioner, Utils.getCallLocationName());
}
+
+ // --------------------------------------------------------------------------------
+ // Group Operations
+ // --------------------------------------------------------------------------------
+
+ /**
+ * Sorts {@link org.apache.flink.api.java.tuple.Tuple} elements within a group in the first input on the
+ * specified field in the specified {@link Order}.<br/>
+ * <b>Note: Field positions are only accepted if the first input is a Tuple type; otherwise an InvalidProgramException is thrown.</b><br/>
+ * Groups can be sorted by multiple fields by chaining {@link #sortFirstGroup(int, Order)} calls.
+ *
+ * @param field The Tuple field on which the group is sorted; must satisfy {@code 0 <= field < arity} of the first input.
+ * @param order The Order in which the specified Tuple field is sorted.
+ * @return This CoGroupOperatorWithoutFunction, so that further calls can be chained.
+ * @throws IllegalArgumentException If the field position is negative or not smaller than the tuple arity.
+ * @see org.apache.flink.api.java.tuple.Tuple
+ * @see Order
+ */
+ public CoGroupOperatorWithoutFunction sortFirstGroup(int field, Order order) {
+ if (!input1.getType().isTupleType()) {
+ throw new InvalidProgramException("Specifying order keys via field positions is only valid for tuple data types");
+ }
+ if (field < 0 || field >= input1.getType().getArity()) { // reject positions on either side of the tuple bounds
+ throw new IllegalArgumentException("Order key out of tuple bounds.");
+ }
+ ExpressionKeys<I1> ek = new ExpressionKeys<I1>(new int[]{field}, input1.getType());
+ int[] groupOrderKeys = ek.computeLogicalKeyPositions();
+
+ for (int key : groupOrderKeys) { // one sort entry per logical key position returned for the field
+ this.groupSortKeyOrderFirst.add(new ImmutablePair<Integer, Order>(key, order));
+ }
+
+ return this;
+ }
+
+ /**
+ * Sorts {@link org.apache.flink.api.java.tuple.Tuple} elements within a group in the second input on the
+ * specified field in the specified {@link Order}.<br/>
+ * <b>Note: Field positions are only accepted if the second input is a Tuple type; otherwise an InvalidProgramException is thrown.</b><br/>
+ * Groups can be sorted by multiple fields by chaining {@link #sortSecondGroup(int, Order)} calls.
+ *
+ * @param field The Tuple field on which the group is sorted; must satisfy {@code 0 <= field < arity} of the second input.
+ * @param order The Order in which the specified Tuple field is sorted.
+ * @return This CoGroupOperatorWithoutFunction, so that further calls can be chained.
+ * @throws IllegalArgumentException If the field position is negative or not smaller than the tuple arity.
+ * @see org.apache.flink.api.java.tuple.Tuple
+ * @see Order
+ */
+ public CoGroupOperatorWithoutFunction sortSecondGroup(int field, Order order) {
+ if (!input2.getType().isTupleType()) {
+ throw new InvalidProgramException("Specifying order keys via field positions is only valid for tuple data types");
+ }
+ if (field < 0 || field >= input2.getType().getArity()) { // reject positions on either side of the tuple bounds
+ throw new IllegalArgumentException("Order key out of tuple bounds.");
+ }
+ ExpressionKeys<I2> ek = new ExpressionKeys<I2>(new int[]{field}, input2.getType());
+ int[] groupOrderKeys = ek.computeLogicalKeyPositions();
+
+ for (int key : groupOrderKeys) { // one sort entry per logical key position returned for the field
+ this.groupSortKeyOrderSecond.add(new ImmutablePair<Integer, Order>(key, order));
+ }
+
+ return this;
+ }
+
+ /**
+ * Sorts Pojo or {@link org.apache.flink.api.java.tuple.Tuple} elements within a group in the first input on the
+ * specified field in the specified {@link Order}.<br/>
+ * Groups can be sorted by multiple fields by chaining {@link #sortFirstGroup(String, Order)} calls.
+ *
+ * @param fieldExpression The expression to the field on which the group is to be sorted.
+ * @param order The Order in which the specified field is sorted.
+ * @return This CoGroupOperatorWithoutFunction, so that further calls can be chained.
+ * @throws InvalidProgramException If the type of the first input is not a CompositeType.
+ * @see Order
+ */
+ public CoGroupOperatorWithoutFunction sortFirstGroup(String fieldExpression, Order order) {
+ if (! (input1.getType() instanceof CompositeType)) {
+ throw new InvalidProgramException("Specifying order keys via field expressions is only valid for composite data types (pojo / tuple / case class)");
+ }
+ ExpressionKeys<I1> ek = new ExpressionKeys<I1>(new String[]{fieldExpression}, input1.getType());
+ int[] groupOrderKeys = ek.computeLogicalKeyPositions();
+
+ for (int key : groupOrderKeys) { // one sort entry per logical key position returned for the expression
+ this.groupSortKeyOrderFirst.add(new ImmutablePair<Integer, Order>(key, order));
+ }
+
+ return this;
+ }
+
+ /**
+ * Sorts Pojo or {@link org.apache.flink.api.java.tuple.Tuple} elements within a group in the second input on the
+ * specified field in the specified {@link Order}.<br/>
+ * Groups can be sorted by multiple fields by chaining {@link #sortSecondGroup(String, Order)} calls.
+ *
+ * @param fieldExpression The expression to the field on which the group is to be sorted.
+ * @param order The Order in which the specified field is sorted.
+ * @return This CoGroupOperatorWithoutFunction, so that further calls can be chained.
+ * @throws InvalidProgramException If the type of the second input is not a CompositeType.
+ * @see Order
+ */
+ public CoGroupOperatorWithoutFunction sortSecondGroup(String fieldExpression, Order order) {
+ if (! (input2.getType() instanceof CompositeType)) {
+ throw new InvalidProgramException("Specifying order keys via field expressions is only valid for composite data types (pojo / tuple / case class)");
+ }
+ ExpressionKeys<I2> ek = new ExpressionKeys<I2>(new String[]{fieldExpression}, input2.getType());
+ int[] groupOrderKeys = ek.computeLogicalKeyPositions();
+
+ for (int key : groupOrderKeys) { // one sort entry per logical key position returned for the expression
+ this.groupSortKeyOrderSecond.add(new ImmutablePair<Integer, Order>(key, order));
+ }
+
+ return this;
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/392683f3/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java
new file mode 100644
index 0000000..2fe9965
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators.translation;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.operators.GenericDataSinkBase;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+@SuppressWarnings({"serial", "unchecked"})
+public class CoGroupSortTranslationTest implements java.io.Serializable {
+ // Checks that sortFirstGroup / sortSecondGroup settings show up as group orders on the translated CoGroupOperatorBase.
+ @Test
+ public void testGroupSortTuples() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ // single-element inputs suffice: the test only builds the plan and never calls execute()
+ DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+ DataSet<Tuple3<Long, Long, Long>> input2 = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
+
+ input1.coGroup(input2)
+ .where(1).equalTo(2)
+ .sortFirstGroup(0, Order.DESCENDING)
+ .sortSecondGroup(1, Order.ASCENDING).sortSecondGroup(0, Order.DESCENDING) // two chained calls, asserted below in call order
+
+ .with(new CoGroupFunction<Tuple2<Long, Long>, Tuple3<Long, Long, Long>, Long>() {
+ @Override
+ public void coGroup(Iterable<Tuple2<Long, Long>> first, Iterable<Tuple3<Long, Long, Long>> second,
+ Collector<Long> out) {}
+ })
+
+ .print();
+
+ Plan p = env.createProgramPlan();
+ // the test takes the first sink of the plan and expects its input to be the co-group operator
+ GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
+ CoGroupOperatorBase<?, ?, ?, ?> coGroup = (CoGroupOperatorBase<?, ?, ?, ?>) sink.getInput();
+
+ assertNotNull(coGroup.getGroupOrderForInputOne());
+ assertNotNull(coGroup.getGroupOrderForInputTwo());
+ // first input: a single sort field, position 0, descending
+ assertEquals(1, coGroup.getGroupOrderForInputOne().getNumberOfFields());
+ assertEquals(0, coGroup.getGroupOrderForInputOne().getFieldNumber(0).intValue());
+ assertEquals(Order.DESCENDING, coGroup.getGroupOrderForInputOne().getOrder(0));
+ // second input: positions 1 then 0, ascending then descending
+ assertEquals(2, coGroup.getGroupOrderForInputTwo().getNumberOfFields());
+ assertEquals(1, coGroup.getGroupOrderForInputTwo().getFieldNumber(0).intValue());
+ assertEquals(0, coGroup.getGroupOrderForInputTwo().getFieldNumber(1).intValue());
+ assertEquals(Order.ASCENDING, coGroup.getGroupOrderForInputTwo().getOrder(0));
+ assertEquals(Order.DESCENDING, coGroup.getGroupOrderForInputTwo().getOrder(1));
+ }
+ catch (Exception e) { // any exception raised while building or inspecting the plan fails the test
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testSortTuplesAndPojos() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ // tuple input co-grouped with a POJO input; the POJO side uses field expressions
+ DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+ DataSet<TestPoJo> input2 = env.fromElements(new TestPoJo());
+
+ input1.coGroup(input2)
+ .where(1).equalTo("b")
+ .sortFirstGroup(0, Order.DESCENDING)
+ .sortSecondGroup("c", Order.ASCENDING).sortSecondGroup("a", Order.DESCENDING) // two chained calls, asserted below in call order
+
+ .with(new CoGroupFunction<Tuple2<Long, Long>, TestPoJo, Long>() {
+ @Override
+ public void coGroup(Iterable<Tuple2<Long, Long>> first, Iterable<TestPoJo> second, Collector<Long> out) {}
+ })
+
+ .print();
+
+ Plan p = env.createProgramPlan();
+ // the test takes the first sink of the plan and expects its input to be the co-group operator
+ GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
+ CoGroupOperatorBase<?, ?, ?, ?> coGroup = (CoGroupOperatorBase<?, ?, ?, ?>) sink.getInput();
+
+ assertNotNull(coGroup.getGroupOrderForInputOne());
+ assertNotNull(coGroup.getGroupOrderForInputTwo());
+ // first input: a single sort field, position 0, descending
+ assertEquals(1, coGroup.getGroupOrderForInputOne().getNumberOfFields());
+ assertEquals(0, coGroup.getGroupOrderForInputOne().getFieldNumber(0).intValue());
+ assertEquals(Order.DESCENDING, coGroup.getGroupOrderForInputOne().getOrder(0));
+ // second input: expressions "c" and "a" are expected at field numbers 2 and 0
+ assertEquals(2, coGroup.getGroupOrderForInputTwo().getNumberOfFields());
+ assertEquals(2, coGroup.getGroupOrderForInputTwo().getFieldNumber(0).intValue());
+ assertEquals(0, coGroup.getGroupOrderForInputTwo().getFieldNumber(1).intValue());
+ assertEquals(Order.ASCENDING, coGroup.getGroupOrderForInputTwo().getOrder(0));
+ assertEquals(Order.DESCENDING, coGroup.getGroupOrderForInputTwo().getOrder(1));
+ }
+ catch (Exception e) { // any exception raised while building or inspecting the plan fails the test
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+ // POJO with three public long fields; testSortTuplesAndPojos uses "b" as co-group key and "c" / "a" as sort fields
+ public static class TestPoJo {
+ public long a;
+ public long b;
+ public long c;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/392683f3/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala
index da71b6d..28e468c 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala
@@ -15,20 +15,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.api.scala
import org.apache.commons.lang3.Validate
+import org.apache.commons.lang3.tuple.Pair
+import org.apache.commons.lang3.tuple.ImmutablePair
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.common.InvalidProgramException
import org.apache.flink.api.common.functions.{RichCoGroupFunction, CoGroupFunction}
+import org.apache.flink.api.common.functions.Partitioner
+import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.java.operators._
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.util.Collector
+import scala.collection.mutable
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
-import org.apache.flink.api.common.functions.Partitioner
-
+import org.apache.flink.api.java.operators.Keys.ExpressionKeys
/**
* A specific [[DataSet]] that results from a `coGroup` operation. The result of a default coGroup
@@ -65,7 +72,12 @@ class CoGroupDataSet[L, R](
rightKeys: Keys[R])
extends DataSet(defaultCoGroup) {
- var customPartitioner : Partitioner[_] = _
+ private val groupSortKeyPositionsFirst = mutable.MutableList[Either[Int, String]]()
+ private val groupSortKeyPositionsSecond = mutable.MutableList[Either[Int, String]]()
+ private val groupSortOrdersFirst = mutable.MutableList[Order]()
+ private val groupSortOrdersSecond = mutable.MutableList[Order]()
+
+ private var customPartitioner : Partitioner[_] = _
/**
* Creates a new [[DataSet]] where the result for each pair of co-grouped element lists is the
@@ -86,6 +98,8 @@ class CoGroupDataSet[L, R](
rightKeys,
coGrouper,
implicitly[TypeInformation[O]],
+ buildGroupSortList(leftInput.getType, groupSortKeyPositionsFirst, groupSortOrdersFirst),
+ buildGroupSortList(rightInput.getType, groupSortKeyPositionsSecond, groupSortOrdersSecond),
customPartitioner,
getCallLocationName())
@@ -113,6 +127,8 @@ class CoGroupDataSet[L, R](
rightKeys,
coGrouper,
implicitly[TypeInformation[O]],
+ buildGroupSortList(leftInput.getType, groupSortKeyPositionsFirst, groupSortOrdersFirst),
+ buildGroupSortList(rightInput.getType, groupSortKeyPositionsSecond, groupSortOrdersSecond),
customPartitioner,
getCallLocationName())
@@ -136,6 +152,8 @@ class CoGroupDataSet[L, R](
rightKeys,
coGrouper,
implicitly[TypeInformation[O]],
+ buildGroupSortList(leftInput.getType, groupSortKeyPositionsFirst, groupSortOrdersFirst),
+ buildGroupSortList(rightInput.getType, groupSortKeyPositionsSecond, groupSortOrdersSecond),
customPartitioner,
getCallLocationName())
@@ -165,6 +183,95 @@ class CoGroupDataSet[L, R](
def getPartitioner[K]() : Partitioner[K] = {
customPartitioner.asInstanceOf[Partitioner[K]]
}
+
+  /**
+   * Adds a secondary sort key to the first input of this [[CoGroupDataSet]], identified by
+   * its tuple field position. Keys take effect in the order in which they are added.
+   *
+   * This only works on Tuple DataSets.
+   *
+   * @param field zero-based position of the tuple field to sort each group on
+   * @param order sort order to apply to that field
+   * @return this [[CoGroupDataSet]], so that further calls can be chained
+   * @throws InvalidProgramException if the first input is not a tuple type
+   * @throws IllegalArgumentException if `field` is negative or not smaller than the tuple arity
+   */
+  def sortFirstGroup(field: Int, order: Order): CoGroupDataSet[L, R] = {
+    val inputType = defaultCoGroup.getInput1Type()
+    if (!inputType.isTupleType) {
+      throw new InvalidProgramException("Specifying order keys via field positions is only " +
+        "valid for tuple data types.")
+    }
+    // Reject negative positions too, so an invalid key fails here instead of being recorded.
+    if (field < 0 || field >= inputType.getArity) {
+      throw new IllegalArgumentException("Order key out of tuple bounds.")
+    }
+    groupSortKeyPositionsFirst += Left(field)
+    groupSortOrdersFirst += order
+    this
+  }
+
+  /**
+   * Adds a secondary sort key, given as a field expression, to the first input of this
+   * [[CoGroupDataSet]]. The expression and its order are only recorded by this call.
+   *
+   * @param field field expression naming the field to sort each group on
+   * @param order sort order to apply to that field
+   * @return this [[CoGroupDataSet]], so that further calls can be chained
+   */
+  def sortFirstGroup(field: String, order: Order): CoGroupDataSet[L, R] = {
+    val sortKey: Either[Int, String] = Right(field)
+    groupSortKeyPositionsFirst += sortKey
+    groupSortOrdersFirst += order
+    this
+  }
+
+  /**
+   * Adds a secondary sort key to the second input of this [[CoGroupDataSet]], identified by
+   * its tuple field position. Keys take effect in the order in which they are added.
+   *
+   * This only works on Tuple DataSets.
+   *
+   * @param field zero-based position of the tuple field to sort each group on
+   * @param order sort order to apply to that field
+   * @return this [[CoGroupDataSet]], so that further calls can be chained
+   * @throws InvalidProgramException if the second input is not a tuple type
+   * @throws IllegalArgumentException if `field` is negative or not smaller than the tuple arity
+   */
+  def sortSecondGroup(field: Int, order: Order): CoGroupDataSet[L, R] = {
+    val inputType = defaultCoGroup.getInput2Type()
+    if (!inputType.isTupleType) {
+      throw new InvalidProgramException("Specifying order keys via field positions is only " +
+        "valid for tuple data types.")
+    }
+    // Reject negative positions too, so an invalid key fails here instead of being recorded.
+    if (field < 0 || field >= inputType.getArity) {
+      throw new IllegalArgumentException("Order key out of tuple bounds.")
+    }
+    groupSortKeyPositionsSecond += Left(field)
+    groupSortOrdersSecond += order
+    this
+  }
+
+  /**
+   * Adds a secondary sort key, given as a field expression, to the second input of this
+   * [[CoGroupDataSet]]. The expression and its order are only recorded by this call.
+   *
+   * @param field field expression naming the field to sort each group on
+   * @param order sort order to apply to that field
+   * @return this [[CoGroupDataSet]], so that further calls can be chained
+   */
+  def sortSecondGroup(field: String, order: Order): CoGroupDataSet[L, R] = {
+    val sortKey: Either[Int, String] = Right(field)
+    groupSortKeyPositionsSecond += sortKey
+    groupSortOrdersSecond += order
+    this
+  }
+
+  /**
+   * Translates the sort keys recorded for one input into a list of
+   * (logical field position, order) pairs.
+   *
+   * Positional keys are used as given. Field expression keys are resolved to logical key
+   * positions through [[ExpressionKeys]]; every resolved position receives the order that was
+   * given for its expression.
+   *
+   * @param typeInfo type information of the input the keys refer to
+   * @param keys recorded sort keys: a `Left` is a field position, a `Right` a field expression
+   * @param orders sort orders, pairwise matching `keys`
+   * @return the resolved sort keys in the order they were added, or `null` if `keys` is empty
+   * @throws InvalidProgramException if a field expression is used on a non-composite type
+   */
+  private def buildGroupSortList[T](
+      typeInfo: TypeInformation[T],
+      keys: mutable.MutableList[Either[Int, String]],
+      orders: mutable.MutableList[Order])
+    : java.util.List[Pair[java.lang.Integer, Order]] = {
+    if (keys.isEmpty) {
+      // No group sorting was requested for this input.
+      null
+    } else {
+      val result = new java.util.ArrayList[Pair[java.lang.Integer, Order]]
+
+      keys.zip(orders).foreach {
+        case (Left(position), order) =>
+          result.add(new ImmutablePair[java.lang.Integer, Order](position, order))
+
+        case (Right(expression), order) =>
+          // This branch handles field expressions, so the message must not talk about positions.
+          if (!typeInfo.isInstanceOf[CompositeType[_]]) {
+            throw new InvalidProgramException("Specifying order keys via field expressions is " +
+              "only valid for composite data types (pojo / tuple / case class)")
+          }
+
+          val expressionKeys = new ExpressionKeys[T](Array[String](expression), typeInfo)
+          val groupOrderKeys: Array[Int] = expressionKeys.computeLogicalKeyPositions()
+
+          // One expression may resolve to several positions; all of them share its order.
+          for (k <- groupOrderKeys) {
+            result.add(new ImmutablePair[java.lang.Integer, Order](k, order))
+          }
+      }
+
+      result
+    }
+  }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/392683f3/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
index f5b0783..3e3dc9a 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
@@ -66,7 +66,7 @@ class JoinDataSet[L, R](
rightKeys: Keys[R])
extends DataSet(defaultJoin) {
- var customPartitioner : Partitioner[_] = _
+ private var customPartitioner : Partitioner[_] = _
/**
* Creates a new [[DataSet]] where the result for each pair of joined elements is the result
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/392683f3/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupGroupSortITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupGroupSortITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupGroupSortITCase.java
new file mode 100644
index 0000000..77bfc6b
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupGroupSortITCase.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.operators;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOuputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+
+/**
+ * Integration test that runs a co-group with secondary sort orders on both inputs and
+ * validates, inside the co-group function, that every group arrives in the requested order.
+ */
+@SuppressWarnings({"serial", "unchecked"})
+public class CoGroupGroupSortITCase extends JavaProgramTestBase {
+
+    /**
+     * Co-groups a tuple data set (keyed on field 1, groups sorted on field 0 descending) with a
+     * POJO data set (keyed on "b", groups sorted on "c" ascending, then "a" descending).
+     */
+    @Override
+    protected void testProgram() throws Exception {
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        DataSet<Tuple2<Long, Long>> input1 = env.fromElements(
+                new Tuple2<Long, Long>(0L, 5L),
+                new Tuple2<Long, Long>(0L, 4L),
+                new Tuple2<Long, Long>(0L, 3L),
+                new Tuple2<Long, Long>(0L, 2L),
+                new Tuple2<Long, Long>(0L, 1L),
+                new Tuple2<Long, Long>(1L, 10L),
+                new Tuple2<Long, Long>(1L, 8L),
+                new Tuple2<Long, Long>(1L, 9L),
+                new Tuple2<Long, Long>(1L, 7L));
+
+        DataSet<TestPojo> input2 = env.fromElements(
+                new TestPojo(0L, 10L, 3L),
+                new TestPojo(0L, 8L, 3L),
+                new TestPojo(0L, 10L, 1L),
+                new TestPojo(0L, 9L, 0L),
+                new TestPojo(0L, 8L, 2L),
+                new TestPojo(0L, 8L, 4L),
+                new TestPojo(1L, 10L, 3L),
+                new TestPojo(1L, 8L, 3L),
+                new TestPojo(1L, 10L, 1L),
+                new TestPojo(1L, 9L, 0L),
+                new TestPojo(1L, 8L, 2L),
+                new TestPojo(1L, 8L, 4L));
+
+        input1.coGroup(input2)
+            .where(1).equalTo("b")
+            .sortFirstGroup(0, Order.DESCENDING)
+            .sortSecondGroup("c", Order.ASCENDING).sortSecondGroup("a", Order.DESCENDING)
+
+            .with(new ValidatingCoGroup())
+            .output(new DiscardingOuputFormat<NullValue>());
+
+        env.execute();
+    }
+
+    /**
+     * Co-group function that emits nothing and only asserts the order of the elements in
+     * both groups it is called with.
+     */
+    private static class ValidatingCoGroup implements CoGroupFunction<Tuple2<Long, Long>, TestPojo, NullValue> {
+
+        @Override
+        public void coGroup(Iterable<Tuple2<Long, Long>> first, Iterable<TestPojo> second, Collector<NullValue> out) throws Exception {
+            // validate the tuple input: sorted on field 0, descending
+            // (field 1 is the grouping key, so checking it would not test the sort order)
+            {
+                long lastValue = Long.MAX_VALUE;
+
+                for (Tuple2<Long, Long> t : first) {
+                    long current = t.f0;
+                    Assert.assertTrue(current <= lastValue);
+                    lastValue = current;
+                }
+            }
+
+            // validate the pojo input: sorted on "c" ascending, ties ordered on "a" descending
+            {
+                // sentinel that every real element may follow: smallest "c", largest "a"
+                TestPojo lastValue = new TestPojo(Long.MAX_VALUE, 0, Long.MIN_VALUE);
+
+                for (TestPojo current : second) {
+                    Assert.assertTrue(current.c >= lastValue.c);
+                    Assert.assertTrue(current.c != lastValue.c || current.a <= lastValue.a);
+
+                    lastValue = current;
+                }
+            }
+        }
+    }
+
+    /**
+     * POJO with three public long fields, used as the element type of the second input.
+     */
+    public static class TestPojo implements Cloneable {
+        public long a;
+        public long b;
+        public long c;
+
+        /** Creates a POJO whose fields all keep their default value. */
+        public TestPojo() {}
+
+        /** Creates a POJO with the given field values. */
+        public TestPojo(long a, long b, long c) {
+            this.a = a;
+            this.b = b;
+            this.c = c;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/392683f3/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala
new file mode 100644
index 0000000..7304310
--- /dev/null
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.operators.translation
+
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.flink.api.common.functions.Partitioner
+import org.apache.flink.api.scala._
+import org.apache.flink.test.compiler.util.CompilerTestBase
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType
+import org.apache.flink.compiler.plan.SingleInputPlanNode
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
+import org.apache.flink.compiler.plan.DualInputPlanNode
+import org.apache.flink.api.common.operators.base.CoGroupOperatorBase
+import org.junit.Ignore
+
+/**
+ * Checks that the group sort orders declared on a Scala co-group are present on the
+ * [[CoGroupOperatorBase]] of the program plan created from it.
+ */
+class CoGroupGroupSortTranslationTest {
+
+  /** Tuple inputs on both sides, sort keys given as field positions. */
+  @Test
+  def testGroupSortTuples(): Unit = {
+    try {
+      val env = ExecutionEnvironment.getExecutionEnvironment
+
+      val pairs = env.fromElements( (0L, 0L) )
+      val triples = env.fromElements( (0L, 0L, 0L) )
+
+      pairs
+        .coGroup(triples)
+        .where(1).equalTo(2)
+        .sortFirstGroup(0, Order.DESCENDING)
+        .sortSecondGroup(1, Order.ASCENDING).sortSecondGroup(0, Order.DESCENDING) {
+          (first, second) => first.buffered.head
+        }
+        .print()
+
+      verifyGroupOrders(extractCoGroup(env), 1)
+    }
+    catch {
+      case e: Exception => {
+        e.printStackTrace()
+        fail(e.getMessage)
+      }
+    }
+  }
+
+  /** Tuple input sorted by position, POJO input sorted by field expressions. */
+  @Test
+  def testSortTuplesAndPojos(): Unit = {
+    try {
+      val env = ExecutionEnvironment.getExecutionEnvironment
+
+      val pairs = env.fromElements(new Tuple2[Long, Long](0L, 0L))
+      val pojos = env.fromElements(new CoGroupTestPoJo())
+
+      pairs
+        .coGroup(pojos)
+        .where(1).equalTo("b")
+        .sortFirstGroup(0, Order.DESCENDING)
+        .sortSecondGroup("c", Order.ASCENDING).sortSecondGroup("a", Order.DESCENDING) {
+          (first, second) => first.buffered.head
+        }
+        .print()
+
+      verifyGroupOrders(extractCoGroup(env), 2)
+    }
+    catch {
+      case e: Exception => {
+        e.printStackTrace()
+        fail(e.getMessage)
+      }
+    }
+  }
+
+  /** Same as the tuple test, but without a user function on the co-group; currently ignored. */
+  @Test
+  @Ignore
+  def testGroupSortTuplesDefaultCoGroup(): Unit = {
+    try {
+      val env = ExecutionEnvironment.getExecutionEnvironment
+
+      val pairs = env.fromElements( (0L, 0L) )
+      val triples = env.fromElements( (0L, 0L, 0L) )
+
+      pairs
+        .coGroup(triples)
+        .where(1).equalTo(2)
+        .sortFirstGroup(0, Order.DESCENDING)
+        .sortSecondGroup(1, Order.ASCENDING).sortSecondGroup(0, Order.DESCENDING)
+        .print()
+
+      verifyGroupOrders(extractCoGroup(env), 1)
+    }
+    catch {
+      case e: Exception => {
+        e.printStackTrace()
+        fail(e.getMessage)
+      }
+    }
+  }
+
+  /** Creates the program plan and returns the input of its first data sink as a co-group. */
+  private def extractCoGroup(env: ExecutionEnvironment): CoGroupOperatorBase[_, _, _, _] = {
+    val plan = env.createProgramPlan()
+    val firstSink = plan.getDataSinks.iterator().next()
+    firstSink.getInput.asInstanceOf[CoGroupOperatorBase[_, _, _, _]]
+  }
+
+  /**
+   * Asserts the group orders of both inputs: input one is sorted on field 0 descending,
+   * input two on `secondInputLeadField` ascending followed by field 0 descending.
+   */
+  private def verifyGroupOrders(
+      coGroup: CoGroupOperatorBase[_, _, _, _],
+      secondInputLeadField: Int): Unit = {
+    assertNotNull(coGroup.getGroupOrderForInputOne)
+    assertNotNull(coGroup.getGroupOrderForInputTwo)
+
+    val orderOne = coGroup.getGroupOrderForInputOne
+    assertEquals(1, orderOne.getNumberOfFields)
+    assertEquals(0, orderOne.getFieldNumber(0).intValue())
+    assertEquals(Order.DESCENDING, orderOne.getOrder(0))
+
+    val orderTwo = coGroup.getGroupOrderForInputTwo
+    assertEquals(2, orderTwo.getNumberOfFields)
+    assertEquals(secondInputLeadField, orderTwo.getFieldNumber(0).intValue())
+    assertEquals(0, orderTwo.getFieldNumber(1).intValue())
+    assertEquals(Order.ASCENDING, orderTwo.getOrder(0))
+    assertEquals(Order.DESCENDING, orderTwo.getOrder(1))
+  }
+}
+
+/**
+ * Simple mutable class with three `Long` fields, used as the POJO input type in
+ * [[CoGroupGroupSortTranslationTest]].
+ */
+class CoGroupTestPoJo {
+
+  var a: Long = 0L
+  var b: Long = 0L
+  var c: Long = 0L
+}