You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/02 15:00:31 UTC

git commit: [FLINK-1020] Introduce minBy and maxBy

Repository: incubator-flink
Updated Branches:
  refs/heads/master 4d4151d53 -> d60a3169f


[FLINK-1020] Introduce minBy and maxBy

This closes #101


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d60a3169
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d60a3169
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d60a3169

Branch: refs/heads/master
Commit: d60a3169fec671f9176f29c5a9a5c66142d1425f
Parents: 4d4151d
Author: TobiasWiens <to...@gmail.com>
Authored: Fri Aug 22 12:10:37 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 2 12:59:45 2014 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/api/java/DataSet.java |  46 ++++
 .../api/java/functions/SelectByMaxFunction.java |  91 ++++++++
 .../api/java/functions/SelectByMinFunction.java |  91 ++++++++
 .../api/java/operators/UnsortedGrouping.java    |  47 +++-
 .../java/functions/SelectByFunctionsTest.java   | 184 +++++++++++++++
 .../api/java/operator/MaxByOperatorTest.java    | 229 +++++++++++++++++++
 .../api/java/operator/MinByOperatorTest.java    | 229 +++++++++++++++++++
 7 files changed, 915 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d60a3169/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index d25e64b..ca2a5e9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.java;
 
 import org.apache.commons.lang3.Validate;
+import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
@@ -31,6 +32,8 @@ import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.java.functions.FormattingMapper;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.functions.SelectByMaxFunction;
+import org.apache.flink.api.java.functions.SelectByMinFunction;
 import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
 import org.apache.flink.api.java.io.CsvOutputFormat;
 import org.apache.flink.api.java.io.PrintingOutputFormat;
@@ -59,6 +62,7 @@ import org.apache.flink.api.java.operators.UnsortedGrouping;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.types.TypeInformation;
@@ -339,6 +343,48 @@ public abstract class DataSet<T> {
 		return new GroupReduceOperator<T, R>(this, reducer);
 	}
 
+/**
+	 * Applies a special case of a reduce transformation (minBy) on a non-grouped {@link DataSet}.<br/>
+	 * The transformation consecutively calls a {@link ReduceFunction} 
+	 * until only a single element remains which is the result of the transformation.
+	 * A ReduceFunction combines two elements into one new element of the same type.
+	 *  
+	 * @param fields Keys taken into account for finding the minimum.
+	 * @return A {@link ReduceOperator} representing the minimum.
+	 */
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	public ReduceOperator<T> minBy(int... fields)  {
+		
+		// Check for using a tuple
+		if(!this.type.isTupleType()) {
+			throw new InvalidProgramException("Method minBy(int) only works on tuples.");
+		}
+			
+		return new ReduceOperator<T>(this, new SelectByMinFunction(
+				(TupleTypeInfo) this.type, fields));
+	}
+	
+	/**
+	 * Applies a special case of a reduce transformation (maxBy) on a non-grouped {@link DataSet}.<br/>
+	 * The transformation consecutively calls a {@link ReduceFunction} 
+	 * until only a single element remains which is the result of the transformation.
+	 * A ReduceFunction combines two elements into one new element of the same type.
+	 *  
+	 * @param fields Keys taken into account for finding the minimum.
+	 * @return A {@link ReduceOperator} representing the minimum.
+	 */
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	public ReduceOperator<T> maxBy(int... fields)  {
+		
+		// Check for using a tuple
+		if(!this.type.isTupleType()) {
+			throw new InvalidProgramException("Method maxBy(int) only works on tuples.");
+		}
+			
+		return new ReduceOperator<T>(this, new SelectByMaxFunction(
+				(TupleTypeInfo) this.type, fields));
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  distinct
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d60a3169/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMaxFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMaxFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMaxFunction.java
new file mode 100644
index 0000000..614676e
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMaxFunction.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.functions;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+
+
+public class SelectByMaxFunction<T extends Tuple> implements ReduceFunction<T> {
+	private static final long serialVersionUID = 1L;
+	
+	// Fields which are used as KEYS
+	private int[] fields;
+	
+	/**
+	 * Constructor which is overwriting the default constructor.
+	 * @param type Types of tuple whether to check if given fields are key types.
+	 * @param fields Array of integers which are used as key for comparison. The order of indexes
+	 * is regarded in the reduce function. First index has highest priority and last index has
+	 * least priority.
+	 */
+	public SelectByMaxFunction(TupleTypeInfo<T> type, int... fields) {
+		this.fields = fields;
+		
+		// Check correctness of each position
+		for (int field : fields) {
+			// Is field inside array
+			if (field < 0 || field >= type.getArity()) {
+				throw new IndexOutOfBoundsException(
+						"MinReduceFunction field position " + field + " is out of range.");
+			}
+
+			// Check whether type is comparable
+			if (!type.getTypeAt(field).isKeyType()) {
+				throw new java.lang.IllegalArgumentException(
+						"MinReduceFunction supports only key(Comparable) types.");
+			}
+
+		}
+	}
+
+	/**
+	 * Reduce implementation, returns bigger tuple or value1 if both tuples are 
+	 * equal. Comparison highly depends on the order and amount of fields chosen
+	 * as indices. All given fields (at construction time) are checked in the same
+	 * order as defined (at construction time). If both tuples are equal in one 
+	 * index, the next index is compared. Or if no next index is available value1
+	 * is returned. 
+	 * The tuple which has a bigger value at one index will be returned.
+	 */
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	@Override
+	public T reduce(T value1, T value2) throws Exception {
+
+		for (int index = 0; index < fields.length; index++) {
+			// Save position of compared key
+			int position = this.fields[index];
+
+			// Get both values - both implement comparable
+			Comparable comparable1 = value1.getField(position);
+			Comparable comparable2 = value2.getField(position);
+
+			// Compare values
+			int comp = comparable1.compareTo(comparable2);
+			// If comp is bigger than 0 comparable 1 is bigger.
+			// Return the smaller value.
+			if (comp > 0) {
+				return value1;
+			} else if (comp < 0) {
+				return value2;
+			}
+		}
+		return value1;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d60a3169/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java
new file mode 100644
index 0000000..4b0a7bf
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.functions;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+
+
+public class SelectByMinFunction<T extends Tuple> implements ReduceFunction<T> {
+	private static final long serialVersionUID = 1L;
+	
+	// Fields which are used as KEYS
+	private int[] fields;
+	
+	/**
+	 * Constructor which is overwriting the default constructor.
+	 * @param type Types of tuple whether to check if given fields are key types.
+	 * @param fields Array of integers which are used as key for comparison. The order of indexes
+	 * is regarded in the reduce function. First index has highest priority and last index has
+	 * least priority.
+	 */
+	public SelectByMinFunction(TupleTypeInfo<T> type, int... fields) {
+		this.fields = fields;
+		
+		// Check correctness of each position
+		for (int field : fields) {
+			// Is field inside array
+			if (field < 0 || field >= type.getArity()) {
+				throw new java.lang.IndexOutOfBoundsException(
+						"MinReduceFunction field position " + field + " is out of range.");
+			}
+
+			// Check whether type is comparable
+			if (!type.getTypeAt(field).isKeyType()) {
+				throw new java.lang.IllegalArgumentException(
+						"MinReduceFunction supports only key(Comparable) types.");
+			}
+
+		}
+	}
+	
+	/**
+	 * Reduce implementation, returns smaller tuple or value1 if both tuples are 
+	 * equal. Comparison highly depends on the order and amount of fields chosen
+	 * as indices. All given fields (at construction time) are checked in the same
+	 * order as defined (at construction time). If both tuples are equal in one 
+	 * index, the next index is compared. Or if no next index is available value1
+	 * is returned. 
+	 * The tuple which has a smaller value at one index will be returned.
+	 */
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	@Override
+	public T reduce(T value1, T value2) throws Exception {
+
+		for (int index = 0; index < fields.length; index++) {
+			// Save position of compared key
+			int position = this.fields[index];
+
+			// Get both values - both implement comparable
+			Comparable comparable1 = value1.getField(position);
+			Comparable comparable2 = value2.getField(position);
+
+			// Compare values
+			int comp = comparable1.compareTo(comparable2);
+			// If comp is smaller than 0 comparable 1 is smaller.
+			// Return the smaller value.
+			if (comp < 0) {
+				return value1;
+			} else if (comp > 0) {
+				return value2;
+			}
+		}
+		return value1;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d60a3169/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
index 87b1454..702f149 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
@@ -18,14 +18,17 @@
 
 package org.apache.flink.api.java.operators;
 
+import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.java.aggregation.Aggregations;
-
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.SelectByMaxFunction;
+import org.apache.flink.api.java.functions.SelectByMinFunction;
 import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 
 public class UnsortedGrouping<T> extends Grouping<T> {
 
@@ -133,7 +136,47 @@ public class UnsortedGrouping<T> extends Grouping<T> {
 		return new GroupReduceOperator<T, R>(this, reducer);
 	}
 
-
+	/**
+	 * Applies a special case of a reduce transformation (minBy) on a grouped {@link DataSet}.<br/>
+	 * The transformation consecutively calls a {@link ReduceFunction} 
+	 * until only a single element remains which is the result of the transformation.
+	 * A ReduceFunction combines two elements into one new element of the same type.
+	 *  
+	 * @param fields Keys taken into account for finding the minimum.
+	 * @return A {@link ReduceOperator} representing the minimum.
+	 */
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	public ReduceOperator<T> minBy(int... fields)  {
+		
+		// Check for using a tuple
+		if(!this.dataSet.getType().isTupleType()) {
+			throw new InvalidProgramException("Method minBy(int) only works on tuples.");
+		}
+			
+		return new ReduceOperator<T>(this, new SelectByMinFunction(
+				(TupleTypeInfo) this.dataSet.getType(), fields));
+	}
+	
+	/**
+	 * Applies a special case of a reduce transformation (maxBy) on a grouped {@link DataSet}.<br/>
+	 * The transformation consecutively calls a {@link ReduceFunction} 
+	 * until only a single element remains which is the result of the transformation.
+	 * A ReduceFunction combines two elements into one new element of the same type.
+	 *  
+	 * @param fields Keys taken into account for finding the minimum.
+	 * @return A {@link ReduceOperator} representing the minimum.
+	 */
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	public ReduceOperator<T> maxBy(int... fields)  {
+		
+		// Check for using a tuple
+		if(!this.dataSet.getType().isTupleType()) {
+			throw new InvalidProgramException("Method maxBy(int) only works on tuples.");
+		}
+			
+		return new ReduceOperator<T>(this, new SelectByMaxFunction(
+				(TupleTypeInfo) this.dataSet.getType(), fields));
+	}
 	// --------------------------------------------------------------------------------------------
 	//  Group Operations
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d60a3169/flink-java/src/test/java/org/apache/flink/api/java/functions/SelectByFunctionsTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/functions/SelectByFunctionsTest.java b/flink-java/src/test/java/org/apache/flink/api/java/functions/SelectByFunctionsTest.java
new file mode 100644
index 0000000..8bd9b04
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/functions/SelectByFunctionsTest.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.functions;
+
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.BasicTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SelectByFunctionsTest {
+	
+	private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>(
+			BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO,
+			BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO);
+	
+	private final Tuple5<Integer, Long, String, Long, Integer> bigger = new Tuple5<Integer, Long, String, Long, Integer>(10, 100L, "HelloWorld", 200L, 20);
+	private final Tuple5<Integer, Long, String, Long, Integer> smaller = new Tuple5<Integer, Long, String, Long, Integer>(5, 50L, "Hello", 50L, 15);
+	
+	//Special case where only the last value determines if bigger or smaller
+	private final Tuple5<Integer, Long, String, Long, Integer> specialCaseBigger = new Tuple5<Integer, Long, String, Long, Integer>(10, 100L, "HelloWorld", 200L, 17);
+	private final Tuple5<Integer, Long, String, Long, Integer> specialCaseSmaller = new Tuple5<Integer, Long, String, Long, Integer>(5, 50L, "Hello", 50L, 17);
+	
+	
+	/**
+	 * This test validates whether the order of tuples has any impact on the outcome and if the bigger tuple is returned.
+	 */
+	@Test
+	public void testMaxByComparison() {
+		SelectByMaxFunction<Tuple5<Integer, Long, String, Long, Integer>> maxByTuple = new SelectByMaxFunction<Tuple5<Integer, Long, String, Long, Integer>>(tupleTypeInfo, new int[] {0});
+		
+		try {
+			Assert.assertSame("SelectByMax must return bigger tuple", bigger, maxByTuple.reduce(smaller, bigger));
+			Assert.assertSame("SelectByMax must return bigger tuple", bigger, maxByTuple.reduce(bigger, smaller));
+		} catch (Exception e) {
+			Assert.fail("No exception should be thrown while comapring both tuples");
+		}
+	}
+	
+	// ----------------------- MAXIMUM FUNCTION TEST BELOW --------------------------
+	
+	/**
+	 * This test cases checks when two tuples only differ in one value, but this value is not
+	 * in the fields list. In that case it should be seen as equal and then the first given tuple (value1) should be returned by reduce().
+	 */
+	@Test
+	public void testMaxByComparisonSpecialCase1() {
+		SelectByMaxFunction<Tuple5<Integer, Long, String, Long, Integer>> maxByTuple = new SelectByMaxFunction<Tuple5<Integer, Long, String, Long, Integer>>(tupleTypeInfo, new int[] {0,3});
+		
+		try {
+			Assert.assertSame("SelectByMax must return the first given tuple", specialCaseBigger, maxByTuple.reduce(specialCaseBigger, bigger));
+			Assert.assertSame("SelectByMax must return the first given tuple", bigger, maxByTuple.reduce(bigger, specialCaseBigger));
+		} catch (Exception e) {
+			Assert.fail("No exception should be thrown while comapring both tuples");
+		}
+	}
+	
+	/**
+	 * This test cases checks when two tuples only differ in one value.
+	 */
+	@Test
+	public void testMaxByComparisonSpecialCase2() {
+		SelectByMaxFunction<Tuple5<Integer, Long, String, Long, Integer>> maxByTuple = new SelectByMaxFunction<Tuple5<Integer, Long, String, Long, Integer>>(tupleTypeInfo, new int[] {0,2,1,4,3});
+		
+		try {
+			Assert.assertSame("SelectByMax must return bigger tuple", bigger, maxByTuple.reduce(specialCaseBigger, bigger));
+			Assert.assertSame("SelectByMax must return bigger tuple", bigger, maxByTuple.reduce(bigger, specialCaseBigger));
+		} catch (Exception e) {
+			Assert.fail("No exception should be thrown while comapring both tuples");
+		}
+	}
+	
+	/**
+	 * This test validates that equality is independent of the amount of used indices.
+	 */
+	@Test
+	public void testMaxByComparisonMultiple() {
+		SelectByMaxFunction<Tuple5<Integer, Long, String, Long, Integer>> maxByTuple = new SelectByMaxFunction<Tuple5<Integer, Long, String, Long, Integer>>(tupleTypeInfo, new int[] {0,1,2,3,4});
+		
+		try {
+			Assert.assertSame("SelectByMax must return bigger tuple", bigger, maxByTuple.reduce(smaller, bigger));
+			Assert.assertSame("SelectByMax must return bigger tuple", bigger, maxByTuple.reduce(bigger, smaller));
+		} catch (Exception e) {
+			Assert.fail("No exception should be thrown while comapring both tuples");
+		}
+	}
+	
+	/**
+	 * Checks whether reduce does behave as expected if both values are the same object. 
+	 */
+	@Test
+	public void testMaxByComparisonMustReturnATuple() {
+		SelectByMaxFunction<Tuple5<Integer, Long, String, Long, Integer>> maxByTuple = new SelectByMaxFunction<Tuple5<Integer, Long, String, Long, Integer>>(tupleTypeInfo, new int[] {0});
+		
+		try {
+			Assert.assertSame("SelectByMax must return bigger tuple", bigger, maxByTuple.reduce(bigger, bigger));
+			Assert.assertSame("SelectByMax must return smaller tuple", smaller, maxByTuple.reduce(smaller, smaller));
+		} catch (Exception e) {
+			Assert.fail("No exception should be thrown while comapring both tuples");
+		}
+	}
+	
+	// ----------------------- MINIMUM FUNCTION TEST BELOW --------------------------
+	
+	/**
+	 * This test validates whether the order of tuples has any impact on the outcome and if the smaller tuple is returned.
+	 */
+	@Test
+	public void testMinByComparison() {
+		SelectByMinFunction<Tuple5<Integer, Long, String, Long, Integer>> minByTuple = new SelectByMinFunction<Tuple5<Integer, Long, String, Long, Integer>>(tupleTypeInfo, new int[] {0});
+		
+		try {
+			Assert.assertSame("SelectByMin must return smaller tuple", smaller, minByTuple.reduce(smaller, bigger));
+			Assert.assertSame("SelectByMin must return smaller tuple", smaller, minByTuple.reduce(bigger, smaller));
+		} catch (Exception e) {
+			Assert.fail("No exception should be thrown while comapring both tuples");
+		}
+	}
+	
+	/**
+	 * This test cases checks when two tuples only differ in one value, but this value is not
+	 * in the fields list. In that case it should be seen as equal and then the first given tuple (value1) should be returned by reduce().
+	 */
+	@Test
+	public void testMinByComparisonSpecialCase1() {
+		SelectByMinFunction<Tuple5<Integer, Long, String, Long, Integer>> minByTuple = new SelectByMinFunction<Tuple5<Integer, Long, String, Long, Integer>>(tupleTypeInfo, new int[] {0,3});
+		
+		try {
+			Assert.assertSame("SelectByMin must return the first given tuple", specialCaseBigger, minByTuple.reduce(specialCaseBigger, bigger));
+			Assert.assertSame("SelectByMin must return the first given tuple", bigger, minByTuple.reduce(bigger, specialCaseBigger));
+		} catch (Exception e) {
+			Assert.fail("No exception should be thrown while comapring both tuples");
+		}
+	}
+	
+	/**
+	 * This test validates that when two tuples only differ in one value and that value's index is given
+	 * at construction time. The smaller tuple must be returned then.
+	 */
+	@Test
+	public void testMinByComparisonSpecialCase2() {
+		SelectByMinFunction<Tuple5<Integer, Long, String, Long, Integer>> minByTuple = new SelectByMinFunction<Tuple5<Integer, Long, String, Long, Integer>>(tupleTypeInfo, new int[] {0,2,1,4,3});
+		
+		try {
+			Assert.assertSame("SelectByMin must return smaller tuple", smaller, minByTuple.reduce(specialCaseSmaller, smaller));
+			Assert.assertSame("SelectByMin must return smaller tuple", smaller, minByTuple.reduce(smaller, specialCaseSmaller));
+		} catch (Exception e) {
+			Assert.fail("No exception should be thrown while comapring both tuples");
+		}
+	}
+	
+	/**
+	 * Checks whether reduce does behave as expected if both values are the same object. 
+	 */
+	@Test
+	public void testMinByComparisonMultiple() {
+		SelectByMinFunction<Tuple5<Integer, Long, String, Long, Integer>> minByTuple = new SelectByMinFunction<Tuple5<Integer, Long, String, Long, Integer>>(tupleTypeInfo, new int[] {0,1,2,3,4});
+		
+		try {
+			Assert.assertSame("SelectByMin must return smaller tuple", smaller, minByTuple.reduce(smaller, bigger));
+			Assert.assertSame("SelectByMin must return smaller tuple", smaller, minByTuple.reduce(bigger, smaller));
+		} catch (Exception e) {
+			Assert.fail("No exception should be thrown while comapring both tuples");
+		}
+	}
+	
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d60a3169/flink-java/src/test/java/org/apache/flink/api/java/operator/MaxByOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/MaxByOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/MaxByOperatorTest.java
new file mode 100644
index 0000000..85bc2e5
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/MaxByOperatorTest.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.operator;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.UnsortedGrouping;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.BasicTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.junit.Test;
+
+public class MaxByOperatorTest {
+
+	// TUPLE DATA
+	private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData = new ArrayList<Tuple5<Integer, Long, String, Long, Integer>>();
+
+	private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>(
+			BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO,
+			BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO);
+
+	/**
+	 * This test validates that no exceptions is thrown when an empty dataset 
+	 * calls maxBy().
+	 */
+	@Test
+	public void testMaxByKeyFieldsDataset() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment
+				.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env
+				.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work
+		try {
+			tupleDs.maxBy(4, 0, 1, 2, 3);
+		} catch (Exception e) {
+			Assert.fail();
+		}
+	}
+
+	private final List<CustomType> customTypeData = new ArrayList<CustomType>();
+	
+	/**
+	 * This test validates that an InvalidProgrammException is thrown when maxBy 
+	 * is used on a custom data type.
+	 */
+	@Test(expected = InvalidProgramException.class)
+	public void testCustomKeyFieldsDataset() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		this.customTypeData.add(new CustomType());
+		
+		DataSet<CustomType> customDs = env.fromCollection(customTypeData);
+		// should not work: groups on custom type
+		customDs.maxBy(0);
+	}
+	
+	/**
+	 * This test validates that an index which is out of bounds throws an
+	 * IndexOutOfBOundsExcpetion.
+	 */
+	@Test(expected = IndexOutOfBoundsException.class)
+	public void testOutOfTupleBoundsDataset1() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should not work, key out of tuple bounds
+		tupleDs.maxBy(5);
+	}
+	
+	/**
+	 * This test validates that an index which is out of bounds throws an
+	 * IndexOutOfBOundsExcpetion.
+	 */
+	@Test(expected = IndexOutOfBoundsException.class)
+	public void testOutOfTupleBoundsDataset2() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should not work, key out of tuple bounds
+		tupleDs.maxBy(-1);
+	}
+	
+	/**
+	 * This test validates that an index which is out of bounds throws an
+	 * IndexOutOfBOundsExcpetion.
+	 */
+	@Test(expected = IndexOutOfBoundsException.class)
+	public void testOutOfTupleBoundsDataset3() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should not work, key out of tuple bounds
+		tupleDs.maxBy(1,2,3,4,-1);
+	}
+	
+	//---------------------------- GROUPING TESTS BELOW --------------------------------------
+	
+	/**
+	 * This test validates that no exceptions is thrown when an empty grouping 
+	 * calls maxBy().
+	 */
+	@Test
+	public void testMaxByKeyFieldsGrouping() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		UnsortedGrouping<Tuple5<Integer, Long, String, Long, Integer>> groupDs = env.fromCollection(emptyTupleData, tupleTypeInfo).groupBy(0);
+
+		// should work
+		try {
+			groupDs.maxBy(4, 0, 1, 2, 3);
+		} catch (Exception e) {
+			Assert.fail();
+		}
+	}
+
+	/**
+	 * This test validates that an InvalidProgrammException is thrown when maxBy 
+	 * is used on a custom data type.
+	 */
+	@Test(expected = InvalidProgramException.class)
+	public void testCustomKeyFieldsGrouping() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		this.customTypeData.add(new CustomType());
+		
+		UnsortedGrouping<CustomType> groupDs = env.fromCollection(customTypeData).groupBy(0);
+		// should not work: groups on custom type
+		groupDs.maxBy(0);
+	}
+	
+	/**
+	 * This test validates that an index which is out of bounds throws an
+	 * IndexOutOfBOundsExcpetion.
+	 */
+	@Test(expected = IndexOutOfBoundsException.class)
+	public void testOutOfTupleBoundsGrouping1() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		UnsortedGrouping<Tuple5<Integer, Long, String, Long, Integer>> groupDs = env.fromCollection(emptyTupleData, tupleTypeInfo).groupBy(0);
+
+		// should not work, key out of tuple bounds
+		groupDs.maxBy(5);
+	}
+	
+	/**
+	 * This test validates that an index which is out of bounds throws an
+	 * IndexOutOfBOundsExcpetion.
+	 */
+	@Test(expected = IndexOutOfBoundsException.class)
+	public void testOutOfTupleBoundsGrouping2() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		UnsortedGrouping<Tuple5<Integer, Long, String, Long, Integer>> groupDs = env.fromCollection(emptyTupleData, tupleTypeInfo).groupBy(0);
+
+		// should not work, key out of tuple bounds
+		groupDs.maxBy(-1);
+	}
+	
+	/**
+	 * This test validates that an index which is out of bounds throws an
+	 * IndexOutOfBOundsExcpetion.
+	 */
+	@Test(expected = IndexOutOfBoundsException.class)
+	public void testOutOfTupleBoundsGrouping3() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		UnsortedGrouping<Tuple5<Integer, Long, String, Long, Integer>> groupDs = env.fromCollection(emptyTupleData, tupleTypeInfo).groupBy(0);
+
+		// should not work, key out of tuple bounds
+		groupDs.maxBy(1,2,3,4,-1);
+	}
+
+	/**
+	 * Custom data type, for testing purposes.
+	 */
+	public static class CustomType implements Serializable {
+
+		private static final long serialVersionUID = 1L;
+
+		public int myInt;
+		public long myLong;
+		public String myString;
+
+		public CustomType() {
+		};
+
+		public CustomType(int i, long l, String s) {
+			myInt = i;
+			myLong = l;
+			myString = s;
+		}
+
+		@Override
+		public String toString() {
+			return myInt + "," + myLong + "," + myString;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d60a3169/flink-java/src/test/java/org/apache/flink/api/java/operator/MinByOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/MinByOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/MinByOperatorTest.java
new file mode 100644
index 0000000..ff6a851
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/MinByOperatorTest.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.operator;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.UnsortedGrouping;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.BasicTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.junit.Test;
+
+public class MinByOperatorTest {
+
+	// TUPLE DATA
+	private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData = new ArrayList<Tuple5<Integer, Long, String, Long, Integer>>();
+
+	private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>(
+			BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO,
+			BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO);
+
+	/**
+	 * This test validates that no exceptions is thrown when an empty dataset 
+	 * calls minBy().
+	 */
+	@Test
+	public void testMinByKeyFieldsDataset() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment
+				.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env
+				.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work
+		try {
+			tupleDs.minBy(4, 0, 1, 2, 3);
+		} catch (Exception e) {
+			Assert.fail();
+		}
+	}
+
+	private final List<CustomType> customTypeData = new ArrayList<CustomType>();
+	
+	/**
+	 * This test validates that an InvalidProgrammException is thrown when minBy 
+	 * is used on a custom data type.
+	 */
+	@Test(expected = InvalidProgramException.class)
+	public void testCustomKeyFieldsDataset() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		this.customTypeData.add(new CustomType());
+		
+		DataSet<CustomType> customDs = env.fromCollection(customTypeData);
+		// should not work: groups on custom type
+		customDs.minBy(0);
+	}
+	
+	/**
+	 * This test validates that an index which is out of bounds throws an
+	 * IndexOutOfBOundsExcpetion.
+	 */
+	@Test(expected = IndexOutOfBoundsException.class)
+	public void testOutOfTupleBoundsDataset1() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should not work, key out of tuple bounds
+		tupleDs.minBy(5);
+	}
+	
+	/**
+	 * This test validates that an index which is out of bounds throws an
+	 * IndexOutOfBOundsExcpetion.
+	 */
+	@Test(expected = IndexOutOfBoundsException.class)
+	public void testOutOfTupleBoundsDataset2() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should not work, key out of tuple bounds
+		tupleDs.minBy(-1);
+	}
+	
+	/**
+	 * This test validates that an index which is out of bounds throws an
+	 * IndexOutOfBOundsExcpetion.
+	 */
+	@Test(expected = IndexOutOfBoundsException.class)
+	public void testOutOfTupleBoundsDataset3() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should not work, key out of tuple bounds
+		tupleDs.minBy(1,2,3,4,-1);
+	}
+	
+	//---------------------------- GROUPING TESTS BELOW --------------------------------------
+	
+	/**
+	 * This test validates that no exceptions is thrown when an empty grouping 
+	 * calls minBy().
+	 */
+	@Test
+	public void testMinByKeyFieldsGrouping() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		UnsortedGrouping<Tuple5<Integer, Long, String, Long, Integer>> groupDs = env.fromCollection(emptyTupleData, tupleTypeInfo).groupBy(0);
+
+		// should work
+		try {
+			groupDs.minBy(4, 0, 1, 2, 3);
+		} catch (Exception e) {
+			Assert.fail();
+		}
+	}
+
+	/**
+	 * This test validates that an InvalidProgrammException is thrown when minBy 
+	 * is used on a custom data type.
+	 */
+	@Test(expected = InvalidProgramException.class)
+	public void testCustomKeyFieldsGrouping() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		this.customTypeData.add(new CustomType());
+		
+		UnsortedGrouping<CustomType> groupDs = env.fromCollection(customTypeData).groupBy(0);
+		// should not work: groups on custom type
+		groupDs.minBy(0);
+	}
+	
+	/**
+	 * This test validates that an index which is out of bounds throws an
+	 * IndexOutOfBOundsExcpetion.
+	 */
+	@Test(expected = IndexOutOfBoundsException.class)
+	public void testOutOfTupleBoundsGrouping1() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		UnsortedGrouping<Tuple5<Integer, Long, String, Long, Integer>> groupDs = env.fromCollection(emptyTupleData, tupleTypeInfo).groupBy(0);
+
+		// should not work, key out of tuple bounds
+		groupDs.minBy(5);
+	}
+	
+	/**
+	 * This test validates that an index which is out of bounds throws an
+	 * IndexOutOfBOundsExcpetion.
+	 */
+	@Test(expected = IndexOutOfBoundsException.class)
+	public void testOutOfTupleBoundsGrouping2() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		UnsortedGrouping<Tuple5<Integer, Long, String, Long, Integer>> groupDs = env.fromCollection(emptyTupleData, tupleTypeInfo).groupBy(0);
+
+		// should not work, key out of tuple bounds
+		groupDs.minBy(-1);
+	}
+	
+	/**
+	 * This test validates that an index which is out of bounds throws an
+	 * IndexOutOfBOundsExcpetion.
+	 */
+	@Test(expected = IndexOutOfBoundsException.class)
+	public void testOutOfTupleBoundsGrouping3() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		UnsortedGrouping<Tuple5<Integer, Long, String, Long, Integer>> groupDs = env.fromCollection(emptyTupleData, tupleTypeInfo).groupBy(0);
+
+		// should not work, key out of tuple bounds
+		groupDs.minBy(1,2,3,4,-1);
+	}
+
+	/**
+	 * Custom data type, for testing purposes.
+	 */
+	public static class CustomType implements Serializable {
+
+		private static final long serialVersionUID = 1L;
+
+		public int myInt;
+		public long myLong;
+		public String myString;
+
+		public CustomType() {
+		};
+
+		public CustomType(int i, long l, String s) {
+			myInt = i;
+			myLong = l;
+			myString = s;
+		}
+
+		@Override
+		public String toString() {
+			return myInt + "," + myLong + "," + myString;
+		}
+	}
+
+}