You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this conversion.
Posted to commits@flink.apache.org by fh...@apache.org on 2014/09/08 16:18:53 UTC
[2/2] git commit: [FLINK-925] Extended for distinct operator and added test cases
[FLINK-925] Extended for distinct operator and added test cases
This closes #59
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/122c9b02
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/122c9b02
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/122c9b02
Branch: refs/heads/master
Commit: 122c9b023cc5f9fcd5cb4914bd90afde0b3c6fc0
Parents: fb3bdea
Author: Fabian Hueske <fh...@apache.org>
Authored: Mon Sep 8 13:34:15 2014 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Mon Sep 8 16:17:47 2014 +0200
----------------------------------------------------------------------
.../java/org/apache/flink/api/java/DataSet.java | 2 +-
.../apache/flink/api/java/operators/Keys.java | 5 ++
.../flink/api/java/typeutils/TupleTypeInfo.java | 18 +++-
.../flink/api/java/operator/GroupingTest.java | 66 ++++++++++++--
.../test/javaApiOperators/CoGroupITCase.java | 92 +++++++++++++++++++-
.../test/javaApiOperators/DistinctITCase.java | 41 ++++++++-
.../javaApiOperators/GroupReduceITCase.java | 36 +++++++-
.../flink/test/javaApiOperators/JoinITCase.java | 42 ++++++++-
.../test/javaApiOperators/ReduceITCase.java | 40 ++++++++-
9 files changed, 324 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/122c9b02/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 4688349..de86eee 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -399,7 +399,7 @@ public abstract class DataSet<T> {
* distinction of the DataSet is decided.
* @return A DistinctOperator that represents the distinct DataSet.
*/
- public <K extends Comparable<K>> DistinctOperator<T> distinct(KeySelector<T, K> keyExtractor) {
+ public <K> DistinctOperator<T> distinct(KeySelector<T, K> keyExtractor) {
return new DistinctOperator<T>(this, new Keys.SelectorFunctionKeys<T, K>(keyExtractor, getType()));
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/122c9b02/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
index 2fdb520..8019cc8 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
@@ -134,6 +134,11 @@ public abstract class Keys<T> {
this.keyExtractor = keyExtractor;
this.keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, type);
+
+ if (!this.keyType.isKeyType()) {
+ throw new IllegalArgumentException("Invalid type of KeySelector keys");
+ }
+
}
public TypeInformation<K> getKeyType() {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/122c9b02/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
index 94d3252..b9dce11 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
@@ -95,7 +95,7 @@ public class TupleTypeInfo<T extends Tuple> extends TypeInformation<T> implement
@Override
public boolean isKeyType() {
- return false;
+ return this.isValidKeyType(this);
}
@Override
@@ -228,6 +228,22 @@ public class TupleTypeInfo<T extends Tuple> extends TypeInformation<T> implement
return tupleInfo;
}
+ private boolean isValidKeyType(TypeInformation<?> typeInfo) {
+ if(typeInfo instanceof TupleTypeInfo) {
+ TupleTypeInfo<?> tupleType = ((TupleTypeInfo<?>)typeInfo);
+ for(int i=0;i<tupleType.getArity();i++) {
+ if (!isValidKeyType(tupleType.getTypeAt(i))) {
+ return false;
+ }
+ }
+ return true;
+ } else if(typeInfo.isKeyType()) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
// --------------------------------------------------------------------------------------------
// The following lines are generated.
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/122c9b02/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
index f5cd7b9..34b2ac8 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
@@ -22,20 +22,19 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
-import org.junit.Assert;
-
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.typeutils.BasicTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
public class GroupingTest {
// TUPLE DATA
@@ -189,7 +188,64 @@ public class GroupingTest {
} catch(Exception e) {
Assert.fail();
}
+ }
+
+ @Test
+ @SuppressWarnings("serial")
+ public void testGroupByKeySelector2() {
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ this.customTypeData.add(new CustomType());
+
+ try {
+ DataSet<CustomType> customDs = env.fromCollection(customTypeData);
+ // should work
+ customDs.groupBy(
+ new KeySelector<GroupingTest.CustomType, Tuple2<Integer, Long>>() {
+ @Override
+ public Tuple2<Integer,Long> getKey(CustomType value) {
+ return new Tuple2<Integer, Long>(value.myInt, value.myLong);
+ }
+ });
+ } catch(Exception e) {
+ Assert.fail();
+ }
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ @SuppressWarnings("serial")
+ public void testGroupByKeySelector3() {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ this.customTypeData.add(new CustomType());
+
+ DataSet<CustomType> customDs = env.fromCollection(customTypeData);
+ // should not work
+ customDs.groupBy(
+ new KeySelector<GroupingTest.CustomType, CustomType>() {
+ @Override
+ public CustomType getKey(CustomType value) {
+ return value;
+ }
+ });
+ }
+
+ @Test(expected=IllegalArgumentException.class)
+ @SuppressWarnings("serial")
+ public void testGroupByKeySelector4() {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ this.customTypeData.add(new CustomType());
+
+ DataSet<CustomType> customDs = env.fromCollection(customTypeData);
+ // should not work
+ customDs.groupBy(
+ new KeySelector<GroupingTest.CustomType, Tuple2<Integer, GroupingTest.CustomType>>() {
+ @Override
+ public Tuple2<Integer, CustomType> getKey(CustomType value) {
+ return new Tuple2<Integer, CustomType>(value.myInt, value);
+ }
+ });
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/122c9b02/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
index d59d721..f0229cb 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
@@ -20,10 +20,14 @@ package org.apache.flink.test.javaApiOperators;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
+import java.util.List;
import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.functions.RichCoGroupFunction;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -37,13 +41,11 @@ import org.apache.flink.util.Collector;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
@RunWith(Parameterized.class)
public class CoGroupITCase extends JavaProgramTestBase {
- private static int NUM_PROGRAMS = 7;
+ private static int NUM_PROGRAMS = 9;
private int curProgId = config.getInteger("ProgramId", -1);
private String resultPath;
@@ -293,6 +295,67 @@ public class CoGroupITCase extends JavaProgramTestBase {
"14,5,test\n";
}
+ case 8: {
+ /*
+ * CoGroup with multiple key fields
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
+
+ DataSet<Tuple3<Integer, Long, String>> coGrouped = ds1.coGroup(ds2).
+ where(0,4).equalTo(0,1).with(new Tuple5Tuple3CoGroup());
+
+ coGrouped.writeAsCsv(resultPath);
+ env.execute();
+
+ return "1,1,Hallo\n" +
+ "2,2,Hallo Welt\n" +
+ "3,2,Hallo Welt wie gehts?\n" +
+ "3,2,ABC\n" +
+ "5,3,HIJ\n" +
+ "5,3,IJK\n";
+ }
+ case 9: {
+ /*
+ * CoGroup with multiple key fields
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
+
+ DataSet<Tuple3<Integer, Long, String>> coGrouped = ds1.coGroup(ds2).
+ where(new KeySelector<Tuple5<Integer,Long,Integer,String,Long>, Tuple2<Integer, Long>>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<Integer, Long> getKey(Tuple5<Integer,Long,Integer,String,Long> t) {
+ return new Tuple2<Integer, Long>(t.f0, t.f4);
+ }
+ }).
+ equalTo(new KeySelector<Tuple3<Integer,Long,String>, Tuple2<Integer, Long>>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<Integer, Long> getKey(Tuple3<Integer,Long,String> t) {
+ return new Tuple2<Integer, Long>(t.f0, t.f1);
+ }
+ }).with(new Tuple5Tuple3CoGroup());
+
+ coGrouped.writeAsCsv(resultPath);
+ env.execute();
+
+ return "1,1,Hallo\n" +
+ "2,2,Hallo Welt\n" +
+ "3,2,Hallo Welt wie gehts?\n" +
+ "3,2,ABC\n" +
+ "5,3,HIJ\n" +
+ "5,3,IJK\n";
+ }
default:
throw new IllegalArgumentException("Invalid program id");
}
@@ -481,4 +544,27 @@ public class CoGroupITCase extends JavaProgramTestBase {
out.collect(new Tuple3<Integer, Integer, Integer>(id, sum, broadcast));
}
}
+
+ public static class Tuple5Tuple3CoGroup implements CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void coGroup(Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
+ Iterable<Tuple3<Integer, Long, String>> second,
+ Collector<Tuple3<Integer, Long, String>> out)
+ {
+ List<String> strs = new ArrayList<String>();
+
+ for (Tuple5<Integer, Long, Integer, String, Long> t : first) {
+ strs.add(t.f3);
+ }
+
+ for(Tuple3<Integer, Long, String> t : second) {
+ for(String s : strs) {
+ out.collect(new Tuple3<Integer, Long, String>(t.f0, t.f1, s));
+ }
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/122c9b02/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
index 0c6f3cc..6e5cd9b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
@@ -22,9 +22,12 @@ import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.Configuration;
@@ -34,14 +37,12 @@ import org.apache.flink.test.util.JavaProgramTestBase;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
@SuppressWarnings("serial")
@RunWith(Parameterized.class)
public class DistinctITCase extends JavaProgramTestBase {
- private static int NUM_PROGRAMS = 5;
+ private static int NUM_PROGRAMS = 6;
private int curProgId = config.getInteger("ProgramId", -1);
private String resultPath;
@@ -202,6 +203,40 @@ public class DistinctITCase extends JavaProgramTestBase {
"2,2,Hello\n" +
"3,2,Hello world\n";
}
+ case 6: {
+
+ /*
+ * check correctness of distinct on custom type with tuple-returning type extractor
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<Tuple2<Integer, Long>> reduceDs = ds
+ .distinct(new KeySelector<Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Long>>() {
+ private static final long serialVersionUID = 1L;
+ @Override
+ public Tuple2<Integer,Long> getKey(Tuple5<Integer, Long, Integer, String, Long> t) {
+ return new Tuple2<Integer, Long>(t.f0, t.f4);
+ }
+ })
+ .project(0,4).types(Integer.class, Long.class);
+
+ reduceDs.writeAsCsv(resultPath);
+ env.execute();
+
+ // return expected result
+ return "1,1\n" +
+ "2,1\n" +
+ "2,2\n" +
+ "3,2\n" +
+ "3,3\n" +
+ "4,1\n" +
+ "4,2\n" +
+ "5,1\n" +
+ "5,2\n" +
+ "5,3\n";
+ }
default:
throw new IllegalArgumentException("Invalid program id");
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/122c9b02/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
index bd10c5e..2e00d32 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
@@ -48,7 +48,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
@RunWith(Parameterized.class)
public class GroupReduceITCase extends JavaProgramTestBase {
- private static int NUM_PROGRAMS = 13;
+ private static int NUM_PROGRAMS = 14;
private int curProgId = config.getInteger("ProgramId", -1);
private String resultPath;
@@ -411,6 +411,40 @@ public class GroupReduceITCase extends JavaProgramTestBase {
"111,6,Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10\n";
}
+ case 14: {
+ /*
+ * check correctness of groupReduce on tuples with tuple-returning key selector
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds.
+ groupBy(
+ new KeySelector<Tuple5<Integer,Long,Integer,String,Long>, Tuple2<Integer, Long>>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<Integer, Long> getKey(Tuple5<Integer,Long,Integer,String,Long> t) {
+ return new Tuple2<Integer, Long>(t.f0, t.f4);
+ }
+ }).reduceGroup(new Tuple5GroupReduce());
+
+ reduceDs.writeAsCsv(resultPath);
+ env.execute();
+
+ // return expected result
+ return "1,1,0,P-),1\n" +
+ "2,3,0,P-),1\n" +
+ "2,2,0,P-),2\n" +
+ "3,9,0,P-),2\n" +
+ "3,6,0,P-),3\n" +
+ "4,17,0,P-),1\n" +
+ "4,17,0,P-),2\n" +
+ "5,11,0,P-),1\n" +
+ "5,29,0,P-),2\n" +
+ "5,25,0,P-),3\n";
+ }
default: {
throw new IllegalArgumentException("Invalid program id");
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/122c9b02/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
index a293cbf..45a5458 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
@@ -46,7 +46,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
@RunWith(Parameterized.class)
public class JoinITCase extends JavaProgramTestBase {
- private static int NUM_PROGRAMS = 13;
+ private static int NUM_PROGRAMS = 14;
private int curProgId = config.getInteger("ProgramId", -1);
private String resultPath;
@@ -453,6 +453,46 @@ public class JoinITCase extends JavaProgramTestBase {
"2,2,Hello world,2,2,Hello world\n";
}
+ case 14: {
+ /*
+ * UDF Join on tuples with tuple-returning key selectors
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.get3TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<Tuple2<String, String>> joinDs =
+ ds1.join(ds2)
+ .where(new KeySelector<Tuple3<Integer,Long,String>, Tuple2<Integer, Long>>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<Integer, Long> getKey(Tuple3<Integer,Long,String> t) {
+ return new Tuple2<Integer, Long>(t.f0, t.f1);
+ }
+ })
+ .equalTo(new KeySelector<Tuple5<Integer,Long,Integer,String,Long>, Tuple2<Integer, Long>>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<Integer, Long> getKey(Tuple5<Integer,Long,Integer,String,Long> t) {
+ return new Tuple2<Integer, Long>(t.f0, t.f4);
+ }
+ })
+ .with(new T3T5FlatJoin());
+
+ joinDs.writeAsCsv(resultPath);
+ env.execute();
+
+ // return expected result
+ return "Hi,Hallo\n" +
+ "Hello,Hallo Welt\n" +
+ "Hello world,Hallo Welt wie gehts?\n" +
+ "Hello world,ABC\n" +
+ "I am fine.,HIJ\n" +
+ "I am fine.,IJK\n";
+ }
default:
throw new IllegalArgumentException("Invalid program id");
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/122c9b02/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
index a296a09..fd7fc9f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
@@ -24,8 +24,11 @@ import java.util.Collection;
import java.util.LinkedList;
import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.functions.RichReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.Configuration;
@@ -35,13 +38,11 @@ import org.apache.flink.test.util.JavaProgramTestBase;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
@RunWith(Parameterized.class)
public class ReduceITCase extends JavaProgramTestBase {
- private static int NUM_PROGRAMS = 8;
+ private static int NUM_PROGRAMS = 9;
private int curProgId = config.getInteger("ProgramId", -1);
private String resultPath;
@@ -271,6 +272,39 @@ public class ReduceITCase extends JavaProgramTestBase {
"65,5,Hi again!\n" +
"111,6,Hi again!\n";
}
+ case 9: {
+ /*
+ * Reduce with a Tuple-returning KeySelector
+ */
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds .
+ groupBy(
+ new KeySelector<Tuple5<Integer,Long,Integer,String,Long>, Tuple2<Integer, Long>>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<Integer, Long> getKey(Tuple5<Integer,Long,Integer,String,Long> t) {
+ return new Tuple2<Integer, Long>(t.f0, t.f4);
+ }
+ }).reduce(new Tuple5Reduce());
+
+ reduceDs.writeAsCsv(resultPath);
+ env.execute();
+
+ return "1,1,0,Hallo,1\n" +
+ "2,3,2,Hallo Welt wie,1\n" +
+ "2,2,1,Hallo Welt,2\n" +
+ "3,9,0,P-),2\n" +
+ "3,6,5,BCD,3\n" +
+ "4,17,0,P-),1\n" +
+ "4,17,0,P-),2\n" +
+ "5,11,10,GHI,1\n" +
+ "5,29,0,P-),2\n" +
+ "5,25,0,P-),3\n";
+ }
default:
throw new IllegalArgumentException("Invalid program id");
}