You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/09/02 15:00:31 UTC
git commit: [FLINK-1020] Introduce minBy and maxBy
Repository: incubator-flink
Updated Branches:
refs/heads/master 4d4151d53 -> d60a3169f
[FLINK-1020] Introduce minBy and maxBy
This closes #101
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d60a3169
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d60a3169
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d60a3169
Branch: refs/heads/master
Commit: d60a3169fec671f9176f29c5a9a5c66142d1425f
Parents: 4d4151d
Author: TobiasWiens <to...@gmail.com>
Authored: Fri Aug 22 12:10:37 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Sep 2 12:59:45 2014 +0200
----------------------------------------------------------------------
.../java/org/apache/flink/api/java/DataSet.java | 46 ++++
.../api/java/functions/SelectByMaxFunction.java | 91 ++++++++
.../api/java/functions/SelectByMinFunction.java | 91 ++++++++
.../api/java/operators/UnsortedGrouping.java | 47 +++-
.../java/functions/SelectByFunctionsTest.java | 184 +++++++++++++++
.../api/java/operator/MaxByOperatorTest.java | 229 +++++++++++++++++++
.../api/java/operator/MinByOperatorTest.java | 229 +++++++++++++++++++
7 files changed, 915 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d60a3169/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index d25e64b..ca2a5e9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -19,6 +19,7 @@
package org.apache.flink.api.java;
import org.apache.commons.lang3.Validate;
+import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
@@ -31,6 +32,8 @@ import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.functions.FormattingMapper;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.functions.SelectByMaxFunction;
+import org.apache.flink.api.java.functions.SelectByMinFunction;
import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
import org.apache.flink.api.java.io.CsvOutputFormat;
import org.apache.flink.api.java.io.PrintingOutputFormat;
@@ -59,6 +62,7 @@ import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.types.TypeInformation;
@@ -339,6 +343,48 @@ public abstract class DataSet<T> {
return new GroupReduceOperator<T, R>(this, reducer);
}
+ /**
+ * Selects, from a non-grouped {@link DataSet} of tuples, the element with the minimum values
+ * in the given fields. This is a special case of a reduce transformation: a {@link ReduceFunction}
+ * is called consecutively until only a single element remains, which is the result.
+ * <p>
+ * The fields are compared in the order in which they are given. The first field has the highest
+ * priority; later fields are only consulted when all previous fields are equal.
+ *
+ * @param fields Positions of the tuple fields taken into account for finding the minimum.
+ * @return A {@link ReduceOperator} representing the minimum.
+ * @throws InvalidProgramException if the element type of this DataSet is not a tuple type.
+ * @throws IndexOutOfBoundsException if a field position is outside the tuple's arity.
+ * @throws IllegalArgumentException if a selected field is not a key (comparable) type.
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public ReduceOperator<T> minBy(int... fields) {
+
+ // Fields are addressed by position, which is only defined for tuple types
+ if (!this.type.isTupleType()) {
+ throw new InvalidProgramException("Method minBy(int) only works on tuples.");
+ }
+
+ return new ReduceOperator<T>(this, new SelectByMinFunction(
+ (TupleTypeInfo) this.type, fields));
+ }
+
+ /**
+ * Selects, from a non-grouped {@link DataSet} of tuples, the element with the maximum values
+ * in the given fields. This is a special case of a reduce transformation: a {@link ReduceFunction}
+ * is called consecutively until only a single element remains, which is the result.
+ * <p>
+ * The fields are compared in the order in which they are given. The first field has the highest
+ * priority; later fields are only consulted when all previous fields are equal.
+ *
+ * @param fields Positions of the tuple fields taken into account for finding the maximum.
+ * @return A {@link ReduceOperator} representing the maximum.
+ * @throws InvalidProgramException if the element type of this DataSet is not a tuple type.
+ * @throws IndexOutOfBoundsException if a field position is outside the tuple's arity.
+ * @throws IllegalArgumentException if a selected field is not a key (comparable) type.
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public ReduceOperator<T> maxBy(int... fields) {
+
+ // Fields are addressed by position, which is only defined for tuple types
+ if (!this.type.isTupleType()) {
+ throw new InvalidProgramException("Method maxBy(int) only works on tuples.");
+ }
+
+ return new ReduceOperator<T>(this, new SelectByMaxFunction(
+ (TupleTypeInfo) this.type, fields));
+ }
+
// --------------------------------------------------------------------------------------------
// distinct
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d60a3169/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMaxFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMaxFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMaxFunction.java
new file mode 100644
index 0000000..614676e
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMaxFunction.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.functions;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+
+
+/**
+ * {@link ReduceFunction} that, of two tuples, keeps the one with the bigger values in a configured
+ * list of key fields. The fields are compared in the configured order: the first position has the
+ * highest priority and later positions only break ties.
+ *
+ * @param <T> The tuple type reduced by this function.
+ */
+public class SelectByMaxFunction<T extends Tuple> implements ReduceFunction<T> {
+ private static final long serialVersionUID = 1L;
+
+ // Positions of the tuple fields used as keys, highest priority first
+ private final int[] fields;
+
+ /**
+ * Creates a function that selects the bigger of two tuples by the given key fields.
+ *
+ * @param type Type information of the tuple, used to validate the given field positions.
+ * @param fields Positions of the fields used as keys for comparison. The order of the positions
+ * is regarded in the reduce function: the first position has the highest priority and the last
+ * position has the least priority.
+ * @throws IndexOutOfBoundsException if a position is negative or not smaller than the tuple arity.
+ * @throws IllegalArgumentException if the field at a given position is not a key type.
+ */
+ public SelectByMaxFunction(TupleTypeInfo<T> type, int... fields) {
+ this.fields = fields;
+
+ // Validate every position here, so a bad configuration is rejected when the function
+ // is created rather than when reduce() is called
+ for (int field : fields) {
+ if (field < 0 || field >= type.getArity()) {
+ throw new IndexOutOfBoundsException(
+ "SelectByMaxFunction field position " + field + " is out of range.");
+ }
+
+ // reduce() compares the fields via Comparable, so only key types are accepted
+ if (!type.getTypeAt(field).isKeyType()) {
+ throw new java.lang.IllegalArgumentException(
+ "SelectByMaxFunction supports only key(Comparable) types.");
+ }
+ }
+ }
+
+ /**
+ * Returns the bigger of the two tuples, or value1 if both tuples are equal in all key fields.
+ * The key fields are checked in the order given at construction time. If both tuples are equal
+ * at one position, the next position is compared. The tuple that has the bigger value at the
+ * first differing position is returned.
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Override
+ public T reduce(T value1, T value2) throws Exception {
+
+ for (int position : this.fields) {
+ // Key types are assumed to implement Comparable (checked via isKeyType() in the constructor)
+ Comparable comparable1 = value1.getField(position);
+ Comparable comparable2 = value2.getField(position);
+
+ int comp = comparable1.compareTo(comparable2);
+ // A positive result means comparable1 is bigger: return the bigger value.
+ if (comp > 0) {
+ return value1;
+ } else if (comp < 0) {
+ return value2;
+ }
+ // Equal at this position: continue with the next key field
+ }
+ // Equal in all key fields: keep the first value
+ return value1;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d60a3169/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java
new file mode 100644
index 0000000..4b0a7bf
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.functions;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+
+
+/**
+ * {@link ReduceFunction} that, of two tuples, keeps the one with the smaller values in a configured
+ * list of key fields. The fields are compared in the configured order: the first position has the
+ * highest priority and later positions only break ties.
+ *
+ * @param <T> The tuple type reduced by this function.
+ */
+public class SelectByMinFunction<T extends Tuple> implements ReduceFunction<T> {
+ private static final long serialVersionUID = 1L;
+
+ // Positions of the tuple fields used as keys, highest priority first
+ private final int[] fields;
+
+ /**
+ * Creates a function that selects the smaller of two tuples by the given key fields.
+ *
+ * @param type Type information of the tuple, used to validate the given field positions.
+ * @param fields Positions of the fields used as keys for comparison. The order of the positions
+ * is regarded in the reduce function: the first position has the highest priority and the last
+ * position has the least priority.
+ * @throws IndexOutOfBoundsException if a position is negative or not smaller than the tuple arity.
+ * @throws IllegalArgumentException if the field at a given position is not a key type.
+ */
+ public SelectByMinFunction(TupleTypeInfo<T> type, int... fields) {
+ this.fields = fields;
+
+ // Validate every position here, so a bad configuration is rejected when the function
+ // is created rather than when reduce() is called
+ for (int field : fields) {
+ if (field < 0 || field >= type.getArity()) {
+ throw new IndexOutOfBoundsException(
+ "SelectByMinFunction field position " + field + " is out of range.");
+ }
+
+ // reduce() compares the fields via Comparable, so only key types are accepted
+ if (!type.getTypeAt(field).isKeyType()) {
+ throw new IllegalArgumentException(
+ "SelectByMinFunction supports only key(Comparable) types.");
+ }
+ }
+ }
+
+ /**
+ * Returns the smaller of the two tuples, or value1 if both tuples are equal in all key fields.
+ * The key fields are checked in the order given at construction time. If both tuples are equal
+ * at one position, the next position is compared. The tuple that has the smaller value at the
+ * first differing position is returned.
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Override
+ public T reduce(T value1, T value2) throws Exception {
+
+ for (int position : this.fields) {
+ // Key types are assumed to implement Comparable (checked via isKeyType() in the constructor)
+ Comparable comparable1 = value1.getField(position);
+ Comparable comparable2 = value2.getField(position);
+
+ int comp = comparable1.compareTo(comparable2);
+ // A negative result means comparable1 is smaller: return the smaller value.
+ if (comp < 0) {
+ return value1;
+ } else if (comp > 0) {
+ return value2;
+ }
+ // Equal at this position: continue with the next key field
+ }
+ // Equal in all key fields: keep the first value
+ return value1;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d60a3169/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
index 87b1454..702f149 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
@@ -18,14 +18,17 @@
package org.apache.flink.api.java.operators;
+import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.aggregation.Aggregations;
-
import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.SelectByMaxFunction;
+import org.apache.flink.api.java.functions.SelectByMinFunction;
import org.apache.flink.api.java.functions.UnsupportedLambdaExpressionException;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
public class UnsortedGrouping<T> extends Grouping<T> {
@@ -133,7 +136,47 @@ public class UnsortedGrouping<T> extends Grouping<T> {
return new GroupReduceOperator<T, R>(this, reducer);
}
-
+ /**
+ * Applies a special case of a reduce transformation (minBy) on a grouped {@link DataSet}.<br/>
+ * The transformation consecutively calls a {@link ReduceFunction}
+ * until only a single element remains which is the result of the transformation.
+ * A ReduceFunction combines two elements into one new element of the same type.
+ *
+ * @param fields Keys taken into account for finding the minimum.
+ * @return A {@link ReduceOperator} representing the minimum.
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public ReduceOperator<T> minBy(int... fields) {
+
+ // Check for using a tuple
+ if(!this.dataSet.getType().isTupleType()) {
+ throw new InvalidProgramException("Method minBy(int) only works on tuples.");
+ }
+
+ return new ReduceOperator<T>(this, new SelectByMinFunction(
+ (TupleTypeInfo) this.dataSet.getType(), fields));
+ }
+
+ /**
+ * Applies a special case of a reduce transformation (maxBy) on a grouped {@link DataSet}.<br/>
+ * For every group, a {@link ReduceFunction} is called consecutively until only a single element
+ * remains, which is the tuple with the maximum values in the given fields. The fields are compared
+ * in the order given; the first field has the highest priority.
+ *
+ * @param fields Positions of the tuple fields taken into account for finding the maximum.
+ * @return A {@link ReduceOperator} representing the maximum of each group.
+ * @throws InvalidProgramException if the grouped DataSet does not contain tuples.
+ * @throws IndexOutOfBoundsException if a field position is outside the tuple's arity.
+ * @throws IllegalArgumentException if a selected field is not a key (comparable) type.
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public ReduceOperator<T> maxBy(int... fields) {
+
+ // Fields are addressed by position, which is only defined for tuple types
+ if (!this.dataSet.getType().isTupleType()) {
+ throw new InvalidProgramException("Method maxBy(int) only works on tuples.");
+ }
+
+ return new ReduceOperator<T>(this, new SelectByMaxFunction(
+ (TupleTypeInfo) this.dataSet.getType(), fields));
+ }
+
// --------------------------------------------------------------------------------------------
// Group Operations
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d60a3169/flink-java/src/test/java/org/apache/flink/api/java/functions/SelectByFunctionsTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/functions/SelectByFunctionsTest.java b/flink-java/src/test/java/org/apache/flink/api/java/functions/SelectByFunctionsTest.java
new file mode 100644
index 0000000..8bd9b04
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/functions/SelectByFunctionsTest.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.functions;
+
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.BasicTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link SelectByMaxFunction} and {@link SelectByMinFunction}.
+ */
+public class SelectByFunctionsTest {
+
+ private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>(
+ BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO);
+
+ private final Tuple5<Integer, Long, String, Long, Integer> bigger = new Tuple5<Integer, Long, String, Long, Integer>(10, 100L, "HelloWorld", 200L, 20);
+ private final Tuple5<Integer, Long, String, Long, Integer> smaller = new Tuple5<Integer, Long, String, Long, Integer>(5, 50L, "Hello", 50L, 15);
+
+ // Special cases: equal to 'bigger' / 'smaller' in every field except the last one,
+ // so only the last field decides which tuple is bigger or smaller
+ private final Tuple5<Integer, Long, String, Long, Integer> specialCaseBigger = new Tuple5<Integer, Long, String, Long, Integer>(10, 100L, "HelloWorld", 200L, 17);
+ private final Tuple5<Integer, Long, String, Long, Integer> specialCaseSmaller = new Tuple5<Integer, Long, String, Long, Integer>(5, 50L, "Hello", 50L, 17);
+
+ // ----------------------- MAXIMUM FUNCTION TEST BELOW --------------------------
+
+ /**
+ * This test validates that the order of the tuples has no impact on the outcome and that the bigger tuple is returned.
+ */
+ @Test
+ public void testMaxByComparison() throws Exception {
+ SelectByMaxFunction<Tuple5<Integer, Long, String, Long, Integer>> maxByTuple = new SelectByMaxFunction<Tuple5<Integer, Long, String, Long, Integer>>(tupleTypeInfo, new int[] {0});
+
+ Assert.assertSame("SelectByMax must return bigger tuple", bigger, maxByTuple.reduce(smaller, bigger));
+ Assert.assertSame("SelectByMax must return bigger tuple", bigger, maxByTuple.reduce(bigger, smaller));
+ }
+
+ /**
+ * This test checks two tuples that only differ in one value which is not in the fields list.
+ * The tuples must then be seen as equal, and the first given tuple (value1) must be returned by reduce().
+ */
+ @Test
+ public void testMaxByComparisonSpecialCase1() throws Exception {
+ SelectByMaxFunction<Tuple5<Integer, Long, String, Long, Integer>> maxByTuple = new SelectByMaxFunction<Tuple5<Integer, Long, String, Long, Integer>>(tupleTypeInfo, new int[] {0,3});
+
+ Assert.assertSame("SelectByMax must return the first given tuple", specialCaseBigger, maxByTuple.reduce(specialCaseBigger, bigger));
+ Assert.assertSame("SelectByMax must return the first given tuple", bigger, maxByTuple.reduce(bigger, specialCaseBigger));
+ }
+
+ /**
+ * This test checks two tuples that only differ in one value whose index is in the fields list.
+ * The bigger tuple must then be returned.
+ */
+ @Test
+ public void testMaxByComparisonSpecialCase2() throws Exception {
+ SelectByMaxFunction<Tuple5<Integer, Long, String, Long, Integer>> maxByTuple = new SelectByMaxFunction<Tuple5<Integer, Long, String, Long, Integer>>(tupleTypeInfo, new int[] {0,2,1,4,3});
+
+ Assert.assertSame("SelectByMax must return bigger tuple", bigger, maxByTuple.reduce(specialCaseBigger, bigger));
+ Assert.assertSame("SelectByMax must return bigger tuple", bigger, maxByTuple.reduce(bigger, specialCaseBigger));
+ }
+
+ /**
+ * This test validates that the result is independent of the number of used indices.
+ */
+ @Test
+ public void testMaxByComparisonMultiple() throws Exception {
+ SelectByMaxFunction<Tuple5<Integer, Long, String, Long, Integer>> maxByTuple = new SelectByMaxFunction<Tuple5<Integer, Long, String, Long, Integer>>(tupleTypeInfo, new int[] {0,1,2,3,4});
+
+ Assert.assertSame("SelectByMax must return bigger tuple", bigger, maxByTuple.reduce(smaller, bigger));
+ Assert.assertSame("SelectByMax must return bigger tuple", bigger, maxByTuple.reduce(bigger, smaller));
+ }
+
+ /**
+ * Checks whether reduce behaves as expected if both values are the same object.
+ */
+ @Test
+ public void testMaxByComparisonMustReturnATuple() throws Exception {
+ SelectByMaxFunction<Tuple5<Integer, Long, String, Long, Integer>> maxByTuple = new SelectByMaxFunction<Tuple5<Integer, Long, String, Long, Integer>>(tupleTypeInfo, new int[] {0});
+
+ Assert.assertSame("SelectByMax must return bigger tuple", bigger, maxByTuple.reduce(bigger, bigger));
+ Assert.assertSame("SelectByMax must return smaller tuple", smaller, maxByTuple.reduce(smaller, smaller));
+ }
+
+ // ----------------------- MINIMUM FUNCTION TEST BELOW --------------------------
+
+ /**
+ * This test validates that the order of the tuples has no impact on the outcome and that the smaller tuple is returned.
+ */
+ @Test
+ public void testMinByComparison() throws Exception {
+ SelectByMinFunction<Tuple5<Integer, Long, String, Long, Integer>> minByTuple = new SelectByMinFunction<Tuple5<Integer, Long, String, Long, Integer>>(tupleTypeInfo, new int[] {0});
+
+ Assert.assertSame("SelectByMin must return smaller tuple", smaller, minByTuple.reduce(smaller, bigger));
+ Assert.assertSame("SelectByMin must return smaller tuple", smaller, minByTuple.reduce(bigger, smaller));
+ }
+
+ /**
+ * This test checks two tuples that only differ in one value which is not in the fields list.
+ * The tuples must then be seen as equal, and the first given tuple (value1) must be returned by reduce().
+ */
+ @Test
+ public void testMinByComparisonSpecialCase1() throws Exception {
+ SelectByMinFunction<Tuple5<Integer, Long, String, Long, Integer>> minByTuple = new SelectByMinFunction<Tuple5<Integer, Long, String, Long, Integer>>(tupleTypeInfo, new int[] {0,3});
+
+ Assert.assertSame("SelectByMin must return the first given tuple", specialCaseBigger, minByTuple.reduce(specialCaseBigger, bigger));
+ Assert.assertSame("SelectByMin must return the first given tuple", bigger, minByTuple.reduce(bigger, specialCaseBigger));
+ }
+
+ /**
+ * This test checks two tuples that only differ in one value whose index is in the fields list.
+ * The smaller tuple must then be returned.
+ */
+ @Test
+ public void testMinByComparisonSpecialCase2() throws Exception {
+ SelectByMinFunction<Tuple5<Integer, Long, String, Long, Integer>> minByTuple = new SelectByMinFunction<Tuple5<Integer, Long, String, Long, Integer>>(tupleTypeInfo, new int[] {0,2,1,4,3});
+
+ Assert.assertSame("SelectByMin must return smaller tuple", smaller, minByTuple.reduce(specialCaseSmaller, smaller));
+ Assert.assertSame("SelectByMin must return smaller tuple", smaller, minByTuple.reduce(smaller, specialCaseSmaller));
+ }
+
+ /**
+ * This test validates that the result is independent of the number of used indices.
+ */
+ @Test
+ public void testMinByComparisonMultiple() throws Exception {
+ SelectByMinFunction<Tuple5<Integer, Long, String, Long, Integer>> minByTuple = new SelectByMinFunction<Tuple5<Integer, Long, String, Long, Integer>>(tupleTypeInfo, new int[] {0,1,2,3,4});
+
+ Assert.assertSame("SelectByMin must return smaller tuple", smaller, minByTuple.reduce(smaller, bigger));
+ Assert.assertSame("SelectByMin must return smaller tuple", smaller, minByTuple.reduce(bigger, smaller));
+ }
+
+ /**
+ * Checks whether reduce behaves as expected if both values are the same object.
+ */
+ @Test
+ public void testMinByComparisonMustReturnATuple() throws Exception {
+ SelectByMinFunction<Tuple5<Integer, Long, String, Long, Integer>> minByTuple = new SelectByMinFunction<Tuple5<Integer, Long, String, Long, Integer>>(tupleTypeInfo, new int[] {0});
+
+ Assert.assertSame("SelectByMin must return bigger tuple", bigger, minByTuple.reduce(bigger, bigger));
+ Assert.assertSame("SelectByMin must return smaller tuple", smaller, minByTuple.reduce(smaller, smaller));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d60a3169/flink-java/src/test/java/org/apache/flink/api/java/operator/MaxByOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/MaxByOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/MaxByOperatorTest.java
new file mode 100644
index 0000000..85bc2e5
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/MaxByOperatorTest.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.operator;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.UnsortedGrouping;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.BasicTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.junit.Test;
+
+/**
+ * Tests the argument validation of {@code maxBy()} on non-grouped DataSets and on groupings.
+ */
+public class MaxByOperatorTest {
+
+ // TUPLE DATA
+ private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData = new ArrayList<Tuple5<Integer, Long, String, Long, Integer>>();
+
+ private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>(
+ BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO);
+
+ // CUSTOM (NON-TUPLE) DATA
+ private final List<CustomType> customTypeData = new ArrayList<CustomType>();
+
+ /**
+ * This test validates that no exception is thrown when maxBy() is called on an empty
+ * dataset with valid key fields. Any unexpected exception propagates and fails the test
+ * with its original stack trace.
+ */
+ @Test
+ public void testMaxByKeyFieldsDataset() {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ // should work
+ Assert.assertNotNull(tupleDs.maxBy(4, 0, 1, 2, 3));
+ }
+
+ /**
+ * This test validates that an InvalidProgramException is thrown when maxBy
+ * is used on a custom data type.
+ */
+ @Test(expected = InvalidProgramException.class)
+ public void testCustomKeyFieldsDataset() {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ this.customTypeData.add(new CustomType());
+
+ DataSet<CustomType> customDs = env.fromCollection(customTypeData);
+ // should not work: maxBy on custom type
+ customDs.maxBy(0);
+ }
+
+ /**
+ * This test validates that an index which is out of bounds throws an
+ * IndexOutOfBoundsException.
+ */
+ @Test(expected = IndexOutOfBoundsException.class)
+ public void testOutOfTupleBoundsDataset1() {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ // should not work, key out of tuple bounds
+ tupleDs.maxBy(5);
+ }
+
+ /**
+ * This test validates that a negative index throws an
+ * IndexOutOfBoundsException.
+ */
+ @Test(expected = IndexOutOfBoundsException.class)
+ public void testOutOfTupleBoundsDataset2() {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ // should not work, key out of tuple bounds
+ tupleDs.maxBy(-1);
+ }
+
+ /**
+ * This test validates that an out-of-bounds index after valid indices still throws an
+ * IndexOutOfBoundsException.
+ */
+ @Test(expected = IndexOutOfBoundsException.class)
+ public void testOutOfTupleBoundsDataset3() {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ // should not work, key out of tuple bounds
+ tupleDs.maxBy(1,2,3,4,-1);
+ }
+
+ //---------------------------- GROUPING TESTS BELOW --------------------------------------
+
+ /**
+ * This test validates that no exception is thrown when maxBy() is called on an empty
+ * grouping with valid key fields.
+ */
+ @Test
+ public void testMaxByKeyFieldsGrouping() {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ UnsortedGrouping<Tuple5<Integer, Long, String, Long, Integer>> groupDs = env.fromCollection(emptyTupleData, tupleTypeInfo).groupBy(0);
+
+ // should work
+ Assert.assertNotNull(groupDs.maxBy(4, 0, 1, 2, 3));
+ }
+
+ /**
+ * This test validates that an InvalidProgramException is thrown when maxBy
+ * is used on a grouping of a custom data type. The grouping uses a key selector,
+ * so groupBy() itself succeeds and the exception must come from maxBy().
+ */
+ @Test(expected = InvalidProgramException.class)
+ public void testCustomKeyFieldsGrouping() {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ this.customTypeData.add(new CustomType());
+
+ // groupBy(int) would already reject the non-tuple type; a key selector is valid for it
+ UnsortedGrouping<CustomType> groupDs = env.fromCollection(customTypeData).groupBy(
+ new org.apache.flink.api.java.functions.KeySelector<CustomType, Integer>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Integer getKey(CustomType value) {
+ return value.myInt;
+ }
+ });
+ // should not work: maxBy on custom type
+ groupDs.maxBy(0);
+ }
+
+ /**
+ * This test validates that an index which is out of bounds throws an
+ * IndexOutOfBoundsException.
+ */
+ @Test(expected = IndexOutOfBoundsException.class)
+ public void testOutOfTupleBoundsGrouping1() {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ UnsortedGrouping<Tuple5<Integer, Long, String, Long, Integer>> groupDs = env.fromCollection(emptyTupleData, tupleTypeInfo).groupBy(0);
+
+ // should not work, key out of tuple bounds
+ groupDs.maxBy(5);
+ }
+
+ /**
+ * This test validates that a negative index throws an
+ * IndexOutOfBoundsException.
+ */
+ @Test(expected = IndexOutOfBoundsException.class)
+ public void testOutOfTupleBoundsGrouping2() {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ UnsortedGrouping<Tuple5<Integer, Long, String, Long, Integer>> groupDs = env.fromCollection(emptyTupleData, tupleTypeInfo).groupBy(0);
+
+ // should not work, key out of tuple bounds
+ groupDs.maxBy(-1);
+ }
+
+ /**
+ * This test validates that an out-of-bounds index after valid indices still throws an
+ * IndexOutOfBoundsException.
+ */
+ @Test(expected = IndexOutOfBoundsException.class)
+ public void testOutOfTupleBoundsGrouping3() {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ UnsortedGrouping<Tuple5<Integer, Long, String, Long, Integer>> groupDs = env.fromCollection(emptyTupleData, tupleTypeInfo).groupBy(0);
+
+ // should not work, key out of tuple bounds
+ groupDs.maxBy(1,2,3,4,-1);
+ }
+
+ /**
+ * Custom data type, for testing purposes.
+ */
+ public static class CustomType implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ public int myInt;
+ public long myLong;
+ public String myString;
+
+ public CustomType() {}
+
+ public CustomType(int i, long l, String s) {
+ myInt = i;
+ myLong = l;
+ myString = s;
+ }
+
+ @Override
+ public String toString() {
+ return myInt + "," + myLong + "," + myString;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d60a3169/flink-java/src/test/java/org/apache/flink/api/java/operator/MinByOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/MinByOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/MinByOperatorTest.java
new file mode 100644
index 0000000..ff6a851
--- /dev/null
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/MinByOperatorTest.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.java.operator;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.UnsortedGrouping;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.BasicTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.junit.Test;
+
+/**
+ * Tests the API-level validation of minBy() on DataSets and on unsorted groupings:
+ * it must accept valid tuple field positions, reject non-tuple types, and reject
+ * field positions outside the tuple's bounds.
+ */
+public class MinByOperatorTest {
+
+	// TUPLE DATA
+	private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData = new ArrayList<Tuple5<Integer, Long, String, Long, Integer>>();
+
+	private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>(
+			BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO,
+			BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO,
+			BasicTypeInfo.INT_TYPE_INFO);
+
+	// CUSTOM TYPE DATA
+	private final List<CustomType> customTypeData = new ArrayList<CustomType>();
+
+	/**
+	 * This test validates that no exception is thrown when minBy() is called
+	 * on an empty DataSet.
+	 */
+	@Test
+	public void testMinByKeyFieldsDataset() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work
+		try {
+			tupleDs.minBy(4, 0, 1, 2, 3);
+		} catch (Exception e) {
+			// include the cause so a failure is diagnosable
+			Assert.fail("minBy() on a DataSet threw an unexpected exception: " + e);
+		}
+	}
+
+	/**
+	 * This test validates that an InvalidProgramException is thrown when minBy()
+	 * is used on a custom (non-tuple) data type.
+	 */
+	@Test(expected = InvalidProgramException.class)
+	public void testCustomKeyFieldsDataset() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		this.customTypeData.add(new CustomType());
+
+		DataSet<CustomType> customDs = env.fromCollection(customTypeData);
+		// should not work: minBy on custom type
+		customDs.minBy(0);
+	}
+
+	/**
+	 * This test validates that a field position beyond the tuple's arity throws
+	 * an IndexOutOfBoundsException.
+	 */
+	@Test(expected = IndexOutOfBoundsException.class)
+	public void testOutOfTupleBoundsDataset1() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should not work, key out of tuple bounds
+		tupleDs.minBy(5);
+	}
+
+	/**
+	 * This test validates that a negative field position throws an
+	 * IndexOutOfBoundsException.
+	 */
+	@Test(expected = IndexOutOfBoundsException.class)
+	public void testOutOfTupleBoundsDataset2() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should not work, key out of tuple bounds
+		tupleDs.minBy(-1);
+	}
+
+	/**
+	 * This test validates that a single out-of-bounds position among several
+	 * valid ones throws an IndexOutOfBoundsException.
+	 */
+	@Test(expected = IndexOutOfBoundsException.class)
+	public void testOutOfTupleBoundsDataset3() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should not work, key out of tuple bounds
+		tupleDs.minBy(1, 2, 3, 4, -1);
+	}
+
+	//---------------------------- GROUPING TESTS BELOW --------------------------------------
+
+	/**
+	 * This test validates that no exception is thrown when minBy() is called
+	 * on a grouping of an empty DataSet.
+	 */
+	@Test
+	public void testMinByKeyFieldsGrouping() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		UnsortedGrouping<Tuple5<Integer, Long, String, Long, Integer>> groupDs = env.fromCollection(emptyTupleData, tupleTypeInfo).groupBy(0);
+
+		// should work
+		try {
+			groupDs.minBy(4, 0, 1, 2, 3);
+		} catch (Exception e) {
+			// include the cause so a failure is diagnosable
+			Assert.fail("minBy() on a grouping threw an unexpected exception: " + e);
+		}
+	}
+
+	/**
+	 * This test validates that an InvalidProgramException is thrown when minBy()
+	 * is used on a grouping of a custom (non-tuple) data type.
+	 */
+	@Test(expected = InvalidProgramException.class)
+	public void testCustomKeyFieldsGrouping() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		this.customTypeData.add(new CustomType());
+
+		// Group via a KeySelector rather than groupBy(0): position-based grouping
+		// appears to reject non-tuple types by itself, which would satisfy the
+		// expected exception without ever reaching minBy().
+		UnsortedGrouping<CustomType> groupDs = env.fromCollection(customTypeData).groupBy(
+				new KeySelector<CustomType, Integer>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public Integer getKey(CustomType value) {
+						return value.myInt;
+					}
+				});
+
+		// should not work: minBy on custom type
+		groupDs.minBy(0);
+	}
+
+	/**
+	 * This test validates that a field position beyond the tuple's arity throws
+	 * an IndexOutOfBoundsException.
+	 */
+	@Test(expected = IndexOutOfBoundsException.class)
+	public void testOutOfTupleBoundsGrouping1() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		UnsortedGrouping<Tuple5<Integer, Long, String, Long, Integer>> groupDs = env.fromCollection(emptyTupleData, tupleTypeInfo).groupBy(0);
+
+		// should not work, key out of tuple bounds
+		groupDs.minBy(5);
+	}
+
+	/**
+	 * This test validates that a negative field position throws an
+	 * IndexOutOfBoundsException.
+	 */
+	@Test(expected = IndexOutOfBoundsException.class)
+	public void testOutOfTupleBoundsGrouping2() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		UnsortedGrouping<Tuple5<Integer, Long, String, Long, Integer>> groupDs = env.fromCollection(emptyTupleData, tupleTypeInfo).groupBy(0);
+
+		// should not work, key out of tuple bounds
+		groupDs.minBy(-1);
+	}
+
+	/**
+	 * This test validates that a single out-of-bounds position among several
+	 * valid ones throws an IndexOutOfBoundsException.
+	 */
+	@Test(expected = IndexOutOfBoundsException.class)
+	public void testOutOfTupleBoundsGrouping3() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		UnsortedGrouping<Tuple5<Integer, Long, String, Long, Integer>> groupDs = env.fromCollection(emptyTupleData, tupleTypeInfo).groupBy(0);
+
+		// should not work, key out of tuple bounds
+		groupDs.minBy(1, 2, 3, 4, -1);
+	}
+
+	/**
+	 * Custom (non-tuple) data type, for testing purposes.
+	 */
+	public static class CustomType implements Serializable {
+
+		private static final long serialVersionUID = 1L;
+
+		public int myInt;
+		public long myLong;
+		public String myString;
+
+		/** Creates an instance with all fields left at their Java default values. */
+		public CustomType() {}
+
+		public CustomType(int i, long l, String s) {
+			myInt = i;
+			myLong = l;
+			myString = s;
+		}
+
+		/** Returns the three fields joined by commas, e.g. {@code 1,2,foo}. */
+		@Override
+		public String toString() {
+			return myInt + "," + myLong + "," + myString;
+		}
+	}
+
+}