You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/11/26 13:00:25 UTC

[2/3] incubator-flink git commit: [FLINK-658] [APIs] Add group sorting to CoGroup

[FLINK-658] [APIs] Add group sorting to CoGroup

This closes #234


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/392683f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/392683f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/392683f3

Branch: refs/heads/master
Commit: 392683f3767170c33e3fe2be59825bec5002a4ca
Parents: 606a6d4
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Nov 25 16:34:55 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Nov 25 21:02:10 2014 +0100

----------------------------------------------------------------------
 .../operators/base/CoGroupOperatorBase.java     |  73 +++++--
 .../api/java/operators/CoGroupOperator.java     | 218 ++++++++++++++++---
 .../translation/CoGroupSortTranslationTest.java | 132 +++++++++++
 .../apache/flink/api/scala/coGroupDataSet.scala | 113 +++++++++-
 .../apache/flink/api/scala/joinDataSet.scala    |   2 +-
 .../test/operators/CoGroupGroupSortITCase.java  | 122 +++++++++++
 .../CoGroupGroupSortTranslationTest.scala       | 171 +++++++++++++++
 7 files changed, 776 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/392683f3/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
index 7fe46eb..65b9d1c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CoGroupOperatorBase.java
@@ -192,25 +192,66 @@ public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends CoGroupFunction<IN1,
 		// --------------------------------------------------------------------
 		TypeInformation<IN1> inputType1 = getOperatorInfo().getFirstInputType();
 		TypeInformation<IN2> inputType2 = getOperatorInfo().getSecondInputType();
-
+		
+		// for the grouping / merging comparator
 		int[] inputKeys1 = getKeyColumns(0);
 		int[] inputKeys2 = getKeyColumns(1);
-
-		boolean[] inputSortDirections1 = new boolean[inputKeys1.length];
-		boolean[] inputSortDirections2 = new boolean[inputKeys2.length];
-
-		Arrays.fill(inputSortDirections1, true);
-		Arrays.fill(inputSortDirections2, true);
-
+		
+		boolean[] inputDirections1 = new boolean[inputKeys1.length];
+		boolean[] inputDirections2 = new boolean[inputKeys2.length];
+		Arrays.fill(inputDirections1, true);
+		Arrays.fill(inputDirections2, true);
+		
 		final TypeSerializer<IN1> inputSerializer1 = inputType1.createSerializer();
 		final TypeSerializer<IN2> inputSerializer2 = inputType2.createSerializer();
 		
-		final TypeComparator<IN1> inputComparator1 = getTypeComparator(inputType1, inputKeys1, inputSortDirections1);
-		final TypeComparator<IN2> inputComparator2 = getTypeComparator(inputType2, inputKeys2, inputSortDirections2);
+		final TypeComparator<IN1> inputComparator1 = getTypeComparator(inputType1, inputKeys1, inputDirections1);
+		final TypeComparator<IN2> inputComparator2 = getTypeComparator(inputType2, inputKeys2, inputDirections2);
+		
+		final TypeComparator<IN1> inputSortComparator1;
+		final TypeComparator<IN2> inputSortComparator2;
+		
+		if (groupOrder1 == null || groupOrder1.getNumberOfFields() == 0) {
+			// no group sorting
+			inputSortComparator1 = inputComparator1;
+		}
+		else {
+			// group sorting
+			int[] groupSortKeys = groupOrder1.getFieldPositions();
+			int[] allSortKeys = new int[inputKeys1.length + groupOrder1.getNumberOfFields()];
+			System.arraycopy(inputKeys1, 0, allSortKeys, 0, inputKeys1.length);
+			System.arraycopy(groupSortKeys, 0, allSortKeys, inputKeys1.length, groupSortKeys.length);
+			
+			boolean[] groupSortDirections = groupOrder1.getFieldSortDirections();
+			boolean[] allSortDirections = new boolean[inputKeys1.length + groupSortKeys.length];
+			Arrays.fill(allSortDirections, 0, inputKeys1.length, true);
+			System.arraycopy(groupSortDirections, 0, allSortDirections, inputKeys1.length, groupSortDirections.length);
+			
+			inputSortComparator1 = getTypeComparator(inputType1, allSortKeys, allSortDirections);
+		}
+		
+		if (groupOrder2 == null || groupOrder2.getNumberOfFields() == 0) {
+			// no group sorting
+			inputSortComparator2 = inputComparator2;
+		}
+		else {
+			// group sorting
+			int[] groupSortKeys = groupOrder2.getFieldPositions();
+			int[] allSortKeys = new int[inputKeys2.length + groupOrder2.getNumberOfFields()];
+			System.arraycopy(inputKeys2, 0, allSortKeys, 0, inputKeys2.length);
+			System.arraycopy(groupSortKeys, 0, allSortKeys, inputKeys2.length, groupSortKeys.length);
+			
+			boolean[] groupSortDirections = groupOrder2.getFieldSortDirections();
+			boolean[] allSortDirections = new boolean[inputKeys2.length + groupSortKeys.length];
+			Arrays.fill(allSortDirections, 0, inputKeys2.length, true);
+			System.arraycopy(groupSortDirections, 0, allSortDirections, inputKeys2.length, groupSortDirections.length);
+			
+			inputSortComparator2 = getTypeComparator(inputType2, allSortKeys, allSortDirections);
+		}
 
 		CoGroupSortListIterator<IN1, IN2> coGroupIterator =
-				new CoGroupSortListIterator<IN1, IN2>(input1, inputComparator1, inputSerializer1,
-						input2, inputComparator2, inputSerializer2, mutableObjectSafe);
+				new CoGroupSortListIterator<IN1, IN2>(input1, inputSortComparator1, inputComparator1, inputSerializer1,
+						input2, inputSortComparator2, inputComparator2, inputSerializer2, mutableObjectSafe);
 
 		// --------------------------------------------------------------------
 		// Run UDF
@@ -261,8 +302,8 @@ public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends CoGroupFunction<IN1,
 		private Iterable<IN2> secondReturn;
 
 		private CoGroupSortListIterator(
-				List<IN1> input1, final TypeComparator<IN1> inputComparator1, TypeSerializer<IN1> serializer1,
-				List<IN2> input2, final TypeComparator<IN2> inputComparator2, TypeSerializer<IN2> serializer2,
+				List<IN1> input1, final TypeComparator<IN1> inputSortComparator1, TypeComparator<IN1> inputComparator1, TypeSerializer<IN1> serializer1,
+				List<IN2> input2, final TypeComparator<IN2> inputSortComparator2, TypeComparator<IN2> inputComparator2, TypeSerializer<IN2> serializer2,
 				boolean copyElements)
 		{
 			this.pairComparator = new GenericPairComparator<IN1, IN2>(inputComparator1, inputComparator2);
@@ -276,14 +317,14 @@ public class CoGroupOperatorBase<IN1, IN2, OUT, FT extends CoGroupFunction<IN1,
 			Collections.sort(input1, new Comparator<IN1>() {
 				@Override
 				public int compare(IN1 o1, IN1 o2) {
-					return inputComparator1.compare(o1, o2);
+					return inputSortComparator1.compare(o1, o2);
 				}
 			});
 
 			Collections.sort(input2, new Comparator<IN2>() {
 				@Override
 				public int compare(IN2 o1, IN2 o2) {
-					return inputComparator2.compare(o1, o2);
+					return inputSortComparator2.compare(o1, o2);
 				}
 			});
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/392683f3/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
index 7394c18..b69f326 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CoGroupOperator.java
@@ -19,17 +19,25 @@
 package org.apache.flink.api.java.operators;
 
 import java.security.InvalidParameterException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.api.common.operators.BinaryOperatorInformation;
 import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
 import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.operators.DeltaIteration.SolutionSetPlaceHolder;
@@ -45,7 +53,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 
-
 /**
  * A {@link DataSet} that is the result of a CoGroup transformation. 
  * 
@@ -64,21 +71,32 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 	
 	private final String defaultName;
 	
+	private final List<Pair<Integer, Order>> groupSortKeyOrderFirst;
+	private final List<Pair<Integer, Order>> groupSortKeyOrderSecond;
+	
 	private Partitioner<?> customPartitioner;
 
 
-	public CoGroupOperator(DataSet<I1> input1, DataSet<I2> input2,
-							Keys<I1> keys1, Keys<I2> keys2,
-							CoGroupFunction<I1, I2, OUT> function,
-							TypeInformation<OUT> returnType,
-							Partitioner<?> customPartitioner,
-							String defaultName)
+	public CoGroupOperator(DataSet<I1> input1, DataSet<I2> input2, Keys<I1> keys1, Keys<I2> keys2,
+							CoGroupFunction<I1, I2, OUT> function, TypeInformation<OUT> returnType,
+							Partitioner<?> customPartitioner, String defaultName)
+	{
+		this(input1, input2, keys1, keys2, function, returnType, null, null, customPartitioner, defaultName);
+	}
+	
+	public CoGroupOperator(DataSet<I1> input1, DataSet<I2> input2, Keys<I1> keys1, Keys<I2> keys2,
+			CoGroupFunction<I1, I2, OUT> function, TypeInformation<OUT> returnType,
+			List<Pair<Integer, Order>> groupSortKeyOrderFirst, List<Pair<Integer, Order>> groupSortKeyOrderSecond,
+			Partitioner<?> customPartitioner, String defaultName)
 	{
 		super(input1, input2, returnType);
 
 		this.function = function;
 		this.customPartitioner = customPartitioner;
 		this.defaultName = defaultName;
+		
+		this.groupSortKeyOrderFirst = groupSortKeyOrderFirst == null ? Collections.<Pair<Integer, Order>>emptyList() : groupSortKeyOrderFirst;
+		this.groupSortKeyOrderSecond = groupSortKeyOrderSecond == null ? Collections.<Pair<Integer, Order>>emptyList() : groupSortKeyOrderSecond;
 
 		if (keys1 == null || keys2 == null) {
 			throw new NullPointerException();
@@ -147,12 +165,15 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 	@Override
 	protected org.apache.flink.api.common.operators.base.CoGroupOperatorBase<?, ?, OUT, ?> translateToDataFlow(Operator<I1> input1, Operator<I2> input2) {
 		
-		String name = getName() != null ? getName() : "CoGroup at "+defaultName;
+		String name = getName() != null ? getName() : "CoGroup at " + defaultName;
 		try {
 			keys1.areCompatible(keys2);
-		} catch (IncompatibleKeysException e) {
+		}
+		catch (IncompatibleKeysException e) {
 			throw new InvalidProgramException("The types of the key fields do not match.", e);
 		}
+		
+		final org.apache.flink.api.common.operators.base.CoGroupOperatorBase<?, ?, OUT, ?> po;
 
 		if (keys1 instanceof Keys.SelectorFunctionKeys
 				&& keys2 instanceof Keys.SelectorFunctionKeys) {
@@ -164,15 +185,11 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 			Keys.SelectorFunctionKeys<I2, ?> selectorKeys2 =
 					(Keys.SelectorFunctionKeys<I2, ?>) keys2;
 
-			PlanBothUnwrappingCoGroupOperator<I1, I2, OUT, ?> po =
-					translateSelectorFunctionCoGroup(selectorKeys1, selectorKeys2, function,
+			po = translateSelectorFunctionCoGroup(selectorKeys1, selectorKeys2, function,
 					getInput1Type(), getInput2Type(), getResultType(), name, input1, input2);
 
 			po.setDegreeOfParallelism(getParallelism());
 			po.setCustomPartitioner(customPartitioner);
-
-			return po;
-
 		}
 		else if (keys2 instanceof Keys.SelectorFunctionKeys) {
 
@@ -181,14 +198,11 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 			@SuppressWarnings("unchecked")
 			Keys.SelectorFunctionKeys<I2, ?> selectorKeys2 = (Keys.SelectorFunctionKeys<I2, ?>) keys2;
 
-			PlanRightUnwrappingCoGroupOperator<I1, I2, OUT, ?> po =
-					translateSelectorFunctionCoGroupRight(logicalKeyPositions1, selectorKeys2, function,
+			po = translateSelectorFunctionCoGroupRight(logicalKeyPositions1, selectorKeys2, function,
 							getInput1Type(), getInput2Type(), getResultType(), name, input1, input2);
 
 			po.setDegreeOfParallelism(getParallelism());
 			po.setCustomPartitioner(customPartitioner);
-
-			return po;
 		}
 		else if (keys1 instanceof Keys.SelectorFunctionKeys) {
 
@@ -197,14 +211,8 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 
 			int[] logicalKeyPositions2 = keys2.computeLogicalKeyPositions();
 
-			PlanLeftUnwrappingCoGroupOperator<I1, I2, OUT, ?> po =
-					translateSelectorFunctionCoGroupLeft(selectorKeys1, logicalKeyPositions2, function,
+			po = translateSelectorFunctionCoGroupLeft(selectorKeys1, logicalKeyPositions2, function,
 							getInput1Type(), getInput2Type(), getResultType(), name, input1, input2);
-
-			po.setDegreeOfParallelism(getParallelism());
-			po.setCustomPartitioner(customPartitioner);
-
-			return po;
 		}
 		else if ( keys1 instanceof Keys.ExpressionKeys && keys2 instanceof Keys.ExpressionKeys)
 			{
@@ -217,22 +225,39 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 			int[] logicalKeyPositions1 = keys1.computeLogicalKeyPositions();
 			int[] logicalKeyPositions2 = keys2.computeLogicalKeyPositions();
 			
-			CoGroupOperatorBase<I1, I2, OUT, CoGroupFunction<I1, I2, OUT>> po =
+			CoGroupOperatorBase<I1, I2, OUT, CoGroupFunction<I1, I2, OUT>> op =
 					new CoGroupOperatorBase<I1, I2, OUT, CoGroupFunction<I1, I2, OUT>>(
 							function, new BinaryOperatorInformation<I1, I2, OUT>(getInput1Type(), getInput2Type(), getResultType()),
 							logicalKeyPositions1, logicalKeyPositions2, name);
 			
-			// set inputs
-			po.setFirstInput(input1);
-			po.setSecondInput(input2);
-
-			po.setDegreeOfParallelism(getParallelism());
-			po.setCustomPartitioner(customPartitioner);
-			return po;
+			op.setFirstInput(input1);
+			op.setSecondInput(input2);
+			po = op;
 		}
 		else {
 			throw new UnsupportedOperationException("Unrecognized or incompatible key types.");
 		}
+		
+		// configure shared characteristics
+		po.setDegreeOfParallelism(getParallelism());
+		po.setCustomPartitioner(customPartitioner);
+		
+		if (groupSortKeyOrderFirst.size() > 0) {
+			Ordering o = new Ordering();
+			for (Pair<Integer, Order> entry : groupSortKeyOrderFirst) {
+				o.appendOrdering(entry.getLeft(), null, entry.getRight());
+			}
+			po.setGroupOrderForInputOne(o);
+		}
+		if (groupSortKeyOrderSecond.size() > 0) {
+			Ordering o = new Ordering();
+			for (Pair<Integer, Order> entry : groupSortKeyOrderSecond) {
+				o.appendOrdering(entry.getLeft(), null, entry.getRight());
+			}
+			po.setGroupOrderForInputTwo(o);
+		}
+		
+		return po;
 	}
 
 
@@ -524,23 +549,30 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 
 				return new CoGroupOperatorWithoutFunction(keys2);
 			}
+			
+			// ------------------------------------------------------------------------------------
 
 			public final class CoGroupOperatorWithoutFunction {
 				
 				private final Keys<I2> keys2;
 				
+				private final List<Pair<Integer, Order>> groupSortKeyOrderFirst;
+				private final List<Pair<Integer, Order>> groupSortKeyOrderSecond;
+				
 				private Partitioner<?> customPartitioner;
 
 				private CoGroupOperatorWithoutFunction(Keys<I2> keys2) {
 					if (keys2 == null) {
 						throw new NullPointerException();
 					}
-
 					if (keys2.isEmpty()) {
 						throw new InvalidProgramException("The co-group keys must not be empty.");
 					}
 
 					this.keys2 = keys2;
+					
+					this.groupSortKeyOrderFirst = new ArrayList<Pair<Integer, Order>>();
+					this.groupSortKeyOrderSecond = new ArrayList<Pair<Integer, Order>>();
 				}
 				
 				/**
@@ -586,9 +618,125 @@ public class CoGroupOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OU
 						throw new NullPointerException("CoGroup function must not be null.");
 					}
 					TypeInformation<R> returnType = TypeExtractor.getCoGroupReturnTypes(function, input1.getType(), input2.getType());
-					return new CoGroupOperator<I1, I2, R>(input1, input2, keys1, keys2, function, returnType, 
+					
+					return new CoGroupOperator<I1, I2, R>(input1, input2, keys1, keys2, function, returnType,
+							groupSortKeyOrderFirst, groupSortKeyOrderSecond,
 							customPartitioner, Utils.getCallLocationName());
 				}
+				
+				// --------------------------------------------------------------------------------
+				//  Group Operations
+				// --------------------------------------------------------------------------------
+				
+				/**
+				 * Sorts {@link org.apache.flink.api.java.tuple.Tuple} elements within a group in the first input on the
+				 * specified field in the specified {@link Order}.</br>
+				 * <b>Note: Only groups of Tuple elements and Pojos can be sorted.</b><br/>
+				 * Groups can be sorted by multiple fields by chaining {@link #sortFirstGroup(int, Order)} calls.
+				 * 
+				 * @param field The Tuple field on which the group is sorted.
+				 * @param order The Order in which the specified Tuple field is sorted.
+				 * @return A SortedGrouping with specified order of group element.
+				 * 
+				 * @see org.apache.flink.api.java.tuple.Tuple
+				 * @see Order
+				 */
+				public CoGroupOperatorWithoutFunction sortFirstGroup(int field, Order order) {
+					if (!input1.getType().isTupleType()) {
+						throw new InvalidProgramException("Specifying order keys via field positions is only valid for tuple data types");
+					}
+					if (field >= input1.getType().getArity()) {
+						throw new IllegalArgumentException("Order key out of tuple bounds.");
+					}
+					ExpressionKeys<I1> ek = new ExpressionKeys<I1>(new int[]{field}, input1.getType());
+					int[] groupOrderKeys = ek.computeLogicalKeyPositions();
+					
+					for (int key : groupOrderKeys) {
+						this.groupSortKeyOrderFirst.add(new ImmutablePair<Integer, Order>(key, order));
+					}
+					
+					return this;
+				}
+				
+				/**
+				 * Sorts {@link org.apache.flink.api.java.tuple.Tuple} elements within a group in the second input on the
+				 * specified field in the specified {@link Order}.</br>
+				 * <b>Note: Only groups of Tuple elements and Pojos can be sorted.</b><br/>
+				 * Groups can be sorted by multiple fields by chaining {@link #sortSecondGroup(int, Order)} calls.
+				 * 
+				 * @param field The Tuple field on which the group is sorted.
+				 * @param order The Order in which the specified Tuple field is sorted.
+				 * @return A SortedGrouping with specified order of group element.
+				 * 
+				 * @see org.apache.flink.api.java.tuple.Tuple
+				 * @see Order
+				 */
+				public CoGroupOperatorWithoutFunction sortSecondGroup(int field, Order order) {
+					if (!input2.getType().isTupleType()) {
+						throw new InvalidProgramException("Specifying order keys via field positions is only valid for tuple data types");
+					}
+					if (field >= input2.getType().getArity()) {
+						throw new IllegalArgumentException("Order key out of tuple bounds.");
+					}
+					ExpressionKeys<I2> ek = new ExpressionKeys<I2>(new int[]{field}, input2.getType());
+					int[] groupOrderKeys = ek.computeLogicalKeyPositions();
+					
+					for (int key : groupOrderKeys) {
+						this.groupSortKeyOrderSecond.add(new ImmutablePair<Integer, Order>(key, order));
+					}
+					
+					return this;
+				}
+				
+				/**
+				 * Sorts Pojo or {@link org.apache.flink.api.java.tuple.Tuple} elements within a group in the first input on the
+				 * specified field in the specified {@link Order}.</br>
+				 * Groups can be sorted by multiple fields by chaining {@link #sortFirstGroup(String, Order)} calls.
+				 * 
+				 * @param fieldExpression The expression to the field on which the group is to be sorted.
+				 * @param order The Order in which the specified Tuple field is sorted.
+				 * @return A SortedGrouping with specified order of group element.
+				 * 
+				 * @see Order
+				 */
+				public CoGroupOperatorWithoutFunction sortFirstGroup(String fieldExpression, Order order) {
+					if (! (input1.getType() instanceof CompositeType)) {
+						throw new InvalidProgramException("Specifying order keys via field positions is only valid for composite data types (pojo / tuple / case class)");
+					}
+					ExpressionKeys<I1> ek = new ExpressionKeys<I1>(new String[]{fieldExpression}, input1.getType());
+					int[] groupOrderKeys = ek.computeLogicalKeyPositions();
+					
+					for (int key : groupOrderKeys) {
+						this.groupSortKeyOrderFirst.add(new ImmutablePair<Integer, Order>(key, order));
+					}
+					
+					return this;
+				}
+				
+				/**
+				 * Sorts Pojo or {@link org.apache.flink.api.java.tuple.Tuple} elements within a group in the second input on the
+				 * specified field in the specified {@link Order}.</br>
+				 * Groups can be sorted by multiple fields by chaining {@link #sortSecondGroup(String, Order)} calls.
+				 * 
+				 * @param fieldExpression The expression to the field on which the group is to be sorted.
+				 * @param order The Order in which the specified Tuple field is sorted.
+				 * @return A SortedGrouping with specified order of group element.
+				 * 
+				 * @see Order
+				 */
+				public CoGroupOperatorWithoutFunction sortSecondGroup(String fieldExpression, Order order) {
+					if (! (input2.getType() instanceof CompositeType)) {
+						throw new InvalidProgramException("Specifying order keys via field positions is only valid for composite data types (pojo / tuple / case class)");
+					}
+					ExpressionKeys<I2> ek = new ExpressionKeys<I2>(new String[]{fieldExpression}, input2.getType());
+					int[] groupOrderKeys = ek.computeLogicalKeyPositions();
+					
+					for (int key : groupOrderKeys) {
+						this.groupSortKeyOrderSecond.add(new ImmutablePair<Integer, Order>(key, order));
+					}
+					
+					return this;
+				}
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/392683f3/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java
new file mode 100644
index 0000000..2fe9965
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/CoGroupSortTranslationTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.operators.translation;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.operators.GenericDataSinkBase;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+@SuppressWarnings({"serial", "unchecked"})
+public class CoGroupSortTranslationTest implements java.io.Serializable {
+
+	@Test
+	public void testGroupSortTuples() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+			DataSet<Tuple3<Long, Long, Long>> input2 = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
+			
+			input1.coGroup(input2)
+				.where(1).equalTo(2)
+				.sortFirstGroup(0, Order.DESCENDING)
+				.sortSecondGroup(1, Order.ASCENDING).sortSecondGroup(0, Order.DESCENDING)
+				
+				.with(new CoGroupFunction<Tuple2<Long, Long>, Tuple3<Long, Long, Long>, Long>() {
+					@Override
+					public void coGroup(Iterable<Tuple2<Long, Long>> first, Iterable<Tuple3<Long, Long, Long>> second,
+							Collector<Long> out) {}
+				})
+				
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			
+			GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
+			CoGroupOperatorBase<?, ?, ?, ?> coGroup = (CoGroupOperatorBase<?, ?, ?, ?>) sink.getInput();
+			
+			assertNotNull(coGroup.getGroupOrderForInputOne());
+			assertNotNull(coGroup.getGroupOrderForInputTwo());
+			
+			assertEquals(1, coGroup.getGroupOrderForInputOne().getNumberOfFields());
+			assertEquals(0, coGroup.getGroupOrderForInputOne().getFieldNumber(0).intValue());
+			assertEquals(Order.DESCENDING, coGroup.getGroupOrderForInputOne().getOrder(0));
+			
+			assertEquals(2, coGroup.getGroupOrderForInputTwo().getNumberOfFields());
+			assertEquals(1, coGroup.getGroupOrderForInputTwo().getFieldNumber(0).intValue());
+			assertEquals(0, coGroup.getGroupOrderForInputTwo().getFieldNumber(1).intValue());
+			assertEquals(Order.ASCENDING, coGroup.getGroupOrderForInputTwo().getOrder(0));
+			assertEquals(Order.DESCENDING, coGroup.getGroupOrderForInputTwo().getOrder(1));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testSortTuplesAndPojos() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Tuple2<Long, Long>> input1 = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+			DataSet<TestPoJo> input2 = env.fromElements(new TestPoJo());
+			
+			input1.coGroup(input2)
+				.where(1).equalTo("b")
+				.sortFirstGroup(0, Order.DESCENDING)
+				.sortSecondGroup("c", Order.ASCENDING).sortSecondGroup("a", Order.DESCENDING)
+				
+				.with(new CoGroupFunction<Tuple2<Long, Long>, TestPoJo, Long>() {
+					@Override
+					public void coGroup(Iterable<Tuple2<Long, Long>> first, Iterable<TestPoJo> second, Collector<Long> out) {}
+				})
+				
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			
+			GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
+			CoGroupOperatorBase<?, ?, ?, ?> coGroup = (CoGroupOperatorBase<?, ?, ?, ?>) sink.getInput();
+			
+			assertNotNull(coGroup.getGroupOrderForInputOne());
+			assertNotNull(coGroup.getGroupOrderForInputTwo());
+			
+			assertEquals(1, coGroup.getGroupOrderForInputOne().getNumberOfFields());
+			assertEquals(0, coGroup.getGroupOrderForInputOne().getFieldNumber(0).intValue());
+			assertEquals(Order.DESCENDING, coGroup.getGroupOrderForInputOne().getOrder(0));
+			
+			assertEquals(2, coGroup.getGroupOrderForInputTwo().getNumberOfFields());
+			assertEquals(2, coGroup.getGroupOrderForInputTwo().getFieldNumber(0).intValue());
+			assertEquals(0, coGroup.getGroupOrderForInputTwo().getFieldNumber(1).intValue());
+			assertEquals(Order.ASCENDING, coGroup.getGroupOrderForInputTwo().getOrder(0));
+			assertEquals(Order.DESCENDING, coGroup.getGroupOrderForInputTwo().getOrder(1));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	public static class TestPoJo {
+		public long a;
+		public long b;
+		public long c;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/392683f3/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala
index da71b6d..28e468c 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala
@@ -15,20 +15,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.api.scala
 
 import org.apache.commons.lang3.Validate
+import org.apache.commons.lang3.tuple.Pair
+import org.apache.commons.lang3.tuple.ImmutablePair
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.common.InvalidProgramException
 import org.apache.flink.api.common.functions.{RichCoGroupFunction, CoGroupFunction}
+import org.apache.flink.api.common.functions.Partitioner
+import org.apache.flink.api.common.operators.Order
 import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.api.java.operators._
 import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
 import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.util.Collector
+import scala.collection.mutable
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
-import org.apache.flink.api.common.functions.Partitioner
-
+import org.apache.flink.api.java.operators.Keys.ExpressionKeys
 
 /**
  * A specific [[DataSet]] that results from a `coGroup` operation. The result of a default coGroup
@@ -65,7 +72,12 @@ class CoGroupDataSet[L, R](
     rightKeys: Keys[R])
   extends DataSet(defaultCoGroup) {
 
-  var customPartitioner : Partitioner[_] = _
+  private val groupSortKeyPositionsFirst = mutable.MutableList[Either[Int, String]]()
+  private val groupSortKeyPositionsSecond = mutable.MutableList[Either[Int, String]]()
+  private val groupSortOrdersFirst = mutable.MutableList[Order]()
+  private val groupSortOrdersSecond = mutable.MutableList[Order]()
+  
+  private var customPartitioner : Partitioner[_] = _
   
   /**
    * Creates a new [[DataSet]] where the result for each pair of co-grouped element lists is the
@@ -86,6 +98,8 @@ class CoGroupDataSet[L, R](
       rightKeys,
       coGrouper,
       implicitly[TypeInformation[O]],
+      buildGroupSortList(leftInput.getType, groupSortKeyPositionsFirst, groupSortOrdersFirst),
+      buildGroupSortList(rightInput.getType, groupSortKeyPositionsSecond, groupSortOrdersSecond),
       customPartitioner,
       getCallLocationName())
 
@@ -113,6 +127,8 @@ class CoGroupDataSet[L, R](
       rightKeys,
       coGrouper,
       implicitly[TypeInformation[O]],
+      buildGroupSortList(leftInput.getType, groupSortKeyPositionsFirst, groupSortOrdersFirst),
+      buildGroupSortList(rightInput.getType, groupSortKeyPositionsSecond, groupSortOrdersSecond),
       customPartitioner,
       getCallLocationName())
 
@@ -136,6 +152,8 @@ class CoGroupDataSet[L, R](
       rightKeys,
       coGrouper,
       implicitly[TypeInformation[O]],
+      buildGroupSortList(leftInput.getType, groupSortKeyPositionsFirst, groupSortOrdersFirst),
+      buildGroupSortList(rightInput.getType, groupSortKeyPositionsSecond, groupSortOrdersSecond),
       customPartitioner,
       getCallLocationName())
 
@@ -165,6 +183,95 @@ class CoGroupDataSet[L, R](
   def getPartitioner[K]() : Partitioner[K] = {
     customPartitioner.asInstanceOf[Partitioner[K]]
   }
+  
+  /**
+   * Adds a secondary sort key to the first input of this [[CoGroupDataSet]].
+   *
+   * This only works on Tuple DataSets.
+   */
+  def sortFirstGroup(field: Int, order: Order): CoGroupDataSet[L, R] = {
+    if (!defaultCoGroup.getInput1Type().isTupleType) {
+      throw new InvalidProgramException("Specifying order keys via field positions is only valid " +
+        "for tuple data types.")
+    }
+    if (field >= defaultCoGroup.getInput1Type().getArity) {
+      throw new IllegalArgumentException("Order key out of tuple bounds.")
+    }
+    groupSortKeyPositionsFirst += Left(field)
+    groupSortOrdersFirst += order
+    this
+  }
+
+  /**
+   * Adds a secondary sort key to the first input of this [[CoGroupDataSet]].
+   */
+  def sortFirstGroup(field: String, order: Order): CoGroupDataSet[L, R] = {
+    groupSortKeyPositionsFirst += Right(field)
+    groupSortOrdersFirst += order
+    this
+  }
+  
+  /**
+   * Adds a secondary sort key to the second input of this [[CoGroupDataSet]].
+   *
+   * This only works on Tuple DataSets.
+   */
+  def sortSecondGroup(field: Int, order: Order): CoGroupDataSet[L, R] = {
+    if (!defaultCoGroup.getInput2Type().isTupleType) {
+      throw new InvalidProgramException("Specifying order keys via field positions is only valid " +
+        "for tuple data types.")
+    }
+    if (field >= defaultCoGroup.getInput2Type().getArity) {
+      throw new IllegalArgumentException("Order key out of tuple bounds.")
+    }
+    groupSortKeyPositionsSecond += Left(field)
+    groupSortOrdersSecond += order
+    this
+  }
+
+  /**
+   * Adds a secondary sort key to the second input of this [[CoGroupDataSet]].
+   */
+  def sortSecondGroup(field: String, order: Order): CoGroupDataSet[L, R] = {
+    groupSortKeyPositionsSecond += Right(field)
+    groupSortOrdersSecond += order
+    this
+  }
+  
+  private def buildGroupSortList[T](typeInfo: TypeInformation[T],
+                                    keys: mutable.MutableList[Either[Int, String]],
+                                    orders: mutable.MutableList[Order])
+          : java.util.List[Pair[java.lang.Integer, Order]] =
+  {
+    if (keys.isEmpty) {
+      null
+    }
+    else {
+      val result = new java.util.ArrayList[Pair[java.lang.Integer, Order]]
+      
+      keys.zip(orders).foreach {
+        case ( Left(position), order )  => result.add(
+                                      new ImmutablePair[java.lang.Integer, Order](position, order))
+        
+        case ( Right(expression), order ) => {
+          if (! (typeInfo.isInstanceOf[CompositeType[_]])) {
+            throw new InvalidProgramException("Specifying order keys via field positions is only "
+                                   + "valid for composite data types (pojo / tuple / case class)");
+          }
+          else {
+            val ek = new ExpressionKeys[T](Array[String](expression), typeInfo)
+            val groupOrderKeys : Array[Int] = ek.computeLogicalKeyPositions()
+            
+            for (k <- groupOrderKeys) {
+              result.add(new ImmutablePair[java.lang.Integer, Order](k, order))
+            }
+          }
+        }
+      }
+      
+      result
+    }
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/392683f3/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
index f5b0783..3e3dc9a 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
@@ -66,7 +66,7 @@ class JoinDataSet[L, R](
     rightKeys: Keys[R])
   extends DataSet(defaultJoin) {
 
-  var customPartitioner : Partitioner[_] = _
+  private var customPartitioner : Partitioner[_] = _
   
   /**
    * Creates a new [[DataSet]] where the result for each pair of joined elements is the result

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/392683f3/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupGroupSortITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupGroupSortITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupGroupSortITCase.java
new file mode 100644
index 0000000..77bfc6b
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/CoGroupGroupSortITCase.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.operators;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOuputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+
+@SuppressWarnings({"serial", "unchecked"})
+public class CoGroupGroupSortITCase extends JavaProgramTestBase {
+
+	@Override
+	protected void testProgram() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		
+		DataSet<Tuple2<Long, Long>> input1 = env.fromElements(
+				new Tuple2<Long, Long>(0L, 5L),
+				new Tuple2<Long, Long>(0L, 4L),
+				new Tuple2<Long, Long>(0L, 3L),
+				new Tuple2<Long, Long>(0L, 2L),
+				new Tuple2<Long, Long>(0L, 1L),
+				new Tuple2<Long, Long>(1L, 10L),
+				new Tuple2<Long, Long>(1L, 8L),
+				new Tuple2<Long, Long>(1L, 9L),
+				new Tuple2<Long, Long>(1L, 7L));
+		
+		DataSet<TestPojo> input2 = env.fromElements(
+				new TestPojo(0L, 10L, 3L),
+				new TestPojo(0L, 8L, 3L),
+				new TestPojo(0L, 10L, 1L),
+				new TestPojo(0L, 9L, 0L),
+				new TestPojo(0L, 8L, 2L),
+				new TestPojo(0L, 8L, 4L),
+				new TestPojo(1L, 10L, 3L),
+				new TestPojo(1L, 8L, 3L),
+				new TestPojo(1L, 10L, 1L),
+				new TestPojo(1L, 9L, 0L),
+				new TestPojo(1L, 8L, 2L),
+				new TestPojo(1L, 8L, 4L));
+		
+		input1.coGroup(input2)
+		.where(1).equalTo("b")
+		.sortFirstGroup(0, Order.DESCENDING)
+		.sortSecondGroup("c", Order.ASCENDING).sortSecondGroup("a", Order.DESCENDING)
+		
+		.with(new ValidatingCoGroup())
+		.output(new DiscardingOuputFormat<NullValue>());
+		
+		env.execute();
+	}
+	
+	
+	private static class ValidatingCoGroup implements CoGroupFunction<Tuple2<Long, Long>, TestPojo, NullValue> {
+
+		@Override
+		public void coGroup(Iterable<Tuple2<Long, Long>> first, Iterable<TestPojo> second, Collector<NullValue> out) throws Exception {
+			// validate the tuple input, field 1, descending
+			{
+				long lastValue = Long.MAX_VALUE;
+				
+				for (Tuple2<Long, Long> t : first) {
+					long current = t.f1;
+					Assert.assertTrue(current <= lastValue);
+					lastValue = current;
+				}
+			}
+			
+			
+			// validate the pojo input
+			{
+				TestPojo lastValue = new TestPojo(Long.MAX_VALUE, 0, Long.MIN_VALUE);
+				
+				for (TestPojo current : second) {
+					Assert.assertTrue(current.c >= lastValue.c);
+					Assert.assertTrue(current.c != lastValue.c || current.a <= lastValue.a);
+					
+					lastValue = current;
+				}
+			}
+			
+		}
+	}
+	
+	public static class TestPojo implements Cloneable {
+		public long a;
+		public long b;
+		public long c;
+		
+		
+		public TestPojo() {}
+		
+		public TestPojo(long a, long b, long c) {
+			this.a = a;
+			this.b = b;
+			this.c = c;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/392683f3/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala
new file mode 100644
index 0000000..7304310
--- /dev/null
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.operators.translation
+
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.flink.api.common.functions.Partitioner
+import org.apache.flink.api.scala._
+import org.apache.flink.test.compiler.util.CompilerTestBase
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType
+import org.apache.flink.compiler.plan.SingleInputPlanNode
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.api.common.InvalidProgramException
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
+import org.apache.flink.compiler.plan.DualInputPlanNode
+import org.apache.flink.api.common.operators.base.CoGroupOperatorBase
+import org.junit.Ignore
+
+class CoGroupGroupSortTranslationTest {
+
+  @Test
+  def testGroupSortTuples() {
+    try {
+      val env = ExecutionEnvironment.getExecutionEnvironment
+      
+      val input1 = env.fromElements( (0L, 0L) )
+      val input2 = env.fromElements( (0L, 0L, 0L) )
+      
+      input1
+          .coGroup(input2)
+          .where(1).equalTo(2)
+          .sortFirstGroup(0, Order.DESCENDING)
+          .sortSecondGroup(1, Order.ASCENDING).sortSecondGroup(0, Order.DESCENDING) {
+               (first, second) => first.buffered.head
+            }
+        .print()
+        
+      val p = env.createProgramPlan()
+      
+      val sink = p.getDataSinks.iterator().next()
+      val coGroup = sink.getInput.asInstanceOf[CoGroupOperatorBase[_, _, _, _]]
+      
+      assertNotNull(coGroup.getGroupOrderForInputOne)
+      assertNotNull(coGroup.getGroupOrderForInputTwo)
+      
+      assertEquals(1, coGroup.getGroupOrderForInputOne.getNumberOfFields)
+      assertEquals(0, coGroup.getGroupOrderForInputOne.getFieldNumber(0).intValue())
+      assertEquals(Order.DESCENDING, coGroup.getGroupOrderForInputOne.getOrder(0))
+      
+      assertEquals(2, coGroup.getGroupOrderForInputTwo.getNumberOfFields)
+      assertEquals(1, coGroup.getGroupOrderForInputTwo.getFieldNumber(0).intValue())
+      assertEquals(0, coGroup.getGroupOrderForInputTwo.getFieldNumber(1).intValue())
+      assertEquals(Order.ASCENDING, coGroup.getGroupOrderForInputTwo.getOrder(0))
+      assertEquals(Order.DESCENDING, coGroup.getGroupOrderForInputTwo.getOrder(1))
+    }
+    catch {
+      case e: Exception => {
+        e.printStackTrace()
+        fail(e.getMessage)
+      }
+    }
+  }
+  
+  @Test
+  def testSortTuplesAndPojos() {
+    try {
+      val env = ExecutionEnvironment.getExecutionEnvironment
+      
+      val input1 = env.fromElements(new Tuple2[Long, Long](0L, 0L))
+      val input2 = env.fromElements(new CoGroupTestPoJo())
+      
+      input1
+          .coGroup(input2)
+          .where(1).equalTo("b")
+          .sortFirstGroup(0, Order.DESCENDING)
+          .sortSecondGroup("c", Order.ASCENDING).sortSecondGroup("a", Order.DESCENDING) {
+               (first, second) => first.buffered.head
+            }
+          .print()
+          
+      val p = env.createProgramPlan()
+      
+      val sink = p.getDataSinks.iterator().next()
+      val coGroup = sink.getInput.asInstanceOf[CoGroupOperatorBase[_, _, _, _]]
+      
+      assertNotNull(coGroup.getGroupOrderForInputOne)
+      assertNotNull(coGroup.getGroupOrderForInputTwo)
+
+      assertEquals(1, coGroup.getGroupOrderForInputOne.getNumberOfFields)
+      assertEquals(0, coGroup.getGroupOrderForInputOne.getFieldNumber(0).intValue())
+      assertEquals(Order.DESCENDING, coGroup.getGroupOrderForInputOne.getOrder(0))
+      
+      assertEquals(2, coGroup.getGroupOrderForInputTwo.getNumberOfFields)
+      assertEquals(2, coGroup.getGroupOrderForInputTwo.getFieldNumber(0).intValue())
+      assertEquals(0, coGroup.getGroupOrderForInputTwo.getFieldNumber(1).intValue())
+      assertEquals(Order.ASCENDING, coGroup.getGroupOrderForInputTwo.getOrder(0))
+      assertEquals(Order.DESCENDING, coGroup.getGroupOrderForInputTwo.getOrder(1))
+    }
+    catch {
+      case e: Exception => {
+        e.printStackTrace()
+        fail(e.getMessage)
+      }
+    }
+  }
+  
+  @Test
+  @Ignore
+  def testGroupSortTuplesDefaultCoGroup() {
+    try {
+      val env = ExecutionEnvironment.getExecutionEnvironment
+      
+      val input1 = env.fromElements( (0L, 0L) )
+      val input2 = env.fromElements( (0L, 0L, 0L) )
+      
+      input1
+          .coGroup(input2)
+          .where(1).equalTo(2)
+          .sortFirstGroup(0, Order.DESCENDING)
+          .sortSecondGroup(1, Order.ASCENDING).sortSecondGroup(0, Order.DESCENDING)
+        .print()
+        
+      val p = env.createProgramPlan()
+      
+      val sink = p.getDataSinks.iterator().next()
+      val coGroup = sink.getInput.asInstanceOf[CoGroupOperatorBase[_, _, _, _]]
+      
+      assertNotNull(coGroup.getGroupOrderForInputOne)
+      assertNotNull(coGroup.getGroupOrderForInputTwo)
+      
+      assertEquals(1, coGroup.getGroupOrderForInputOne.getNumberOfFields)
+      assertEquals(0, coGroup.getGroupOrderForInputOne.getFieldNumber(0).intValue())
+      assertEquals(Order.DESCENDING, coGroup.getGroupOrderForInputOne.getOrder(0))
+      
+      assertEquals(2, coGroup.getGroupOrderForInputTwo.getNumberOfFields)
+      assertEquals(1, coGroup.getGroupOrderForInputTwo.getFieldNumber(0).intValue())
+      assertEquals(0, coGroup.getGroupOrderForInputTwo.getFieldNumber(1).intValue())
+      assertEquals(Order.ASCENDING, coGroup.getGroupOrderForInputTwo.getOrder(0))
+      assertEquals(Order.DESCENDING, coGroup.getGroupOrderForInputTwo.getOrder(1))
+    }
+    catch {
+      case e: Exception => {
+        e.printStackTrace()
+        fail(e.getMessage)
+      }
+    }
+  }
+}
+
+class CoGroupTestPoJo {
+  
+  var a: Long = _
+  var b: Long = _
+  var c: Long = _
+}