You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2014/09/08 16:18:53 UTC

[2/2] git commit: [FLINK-925] Extended for distinct operator and added test cases

[FLINK-925] Extended for distinct operator and added test cases

This closes #59


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/122c9b02
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/122c9b02
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/122c9b02

Branch: refs/heads/master
Commit: 122c9b023cc5f9fcd5cb4914bd90afde0b3c6fc0
Parents: fb3bdea
Author: Fabian Hueske <fh...@apache.org>
Authored: Mon Sep 8 13:34:15 2014 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Mon Sep 8 16:17:47 2014 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/api/java/DataSet.java |  2 +-
 .../apache/flink/api/java/operators/Keys.java   |  5 ++
 .../flink/api/java/typeutils/TupleTypeInfo.java | 18 +++-
 .../flink/api/java/operator/GroupingTest.java   | 66 ++++++++++++--
 .../test/javaApiOperators/CoGroupITCase.java    | 92 +++++++++++++++++++-
 .../test/javaApiOperators/DistinctITCase.java   | 41 ++++++++-
 .../javaApiOperators/GroupReduceITCase.java     | 36 +++++++-
 .../flink/test/javaApiOperators/JoinITCase.java | 42 ++++++++-
 .../test/javaApiOperators/ReduceITCase.java     | 40 ++++++++-
 9 files changed, 324 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/122c9b02/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 4688349..de86eee 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -399,7 +399,7 @@ public abstract class DataSet<T> {
 	 *                     distinction of the DataSet is decided.
 	 * @return A DistinctOperator that represents the distinct DataSet.
 	 */
-	public <K extends Comparable<K>> DistinctOperator<T> distinct(KeySelector<T, K> keyExtractor) {
+	public <K> DistinctOperator<T> distinct(KeySelector<T, K> keyExtractor) {
 		return new DistinctOperator<T>(this, new Keys.SelectorFunctionKeys<T, K>(keyExtractor, getType()));
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/122c9b02/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
index 2fdb520..8019cc8 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
@@ -134,6 +134,11 @@ public abstract class Keys<T> {
 			
 			this.keyExtractor = keyExtractor;
 			this.keyType = TypeExtractor.getKeySelectorTypes(keyExtractor, type);
+			
+			if (!this.keyType.isKeyType()) {
+				throw new IllegalArgumentException("Invalid type of KeySelector keys");
+			}
+			
 		}
 
 		public TypeInformation<K> getKeyType() {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/122c9b02/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
index 94d3252..b9dce11 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
@@ -95,7 +95,7 @@ public class TupleTypeInfo<T extends Tuple> extends TypeInformation<T> implement
 	
 	@Override
 	public boolean isKeyType() {
-		return false;
+		return this.isValidKeyType(this);
 	}
 	
 	@Override
@@ -228,6 +228,22 @@ public class TupleTypeInfo<T extends Tuple> extends TypeInformation<T> implement
 		return tupleInfo;
 	}
 	
+	private boolean isValidKeyType(TypeInformation<?> typeInfo) {
+		if(typeInfo instanceof TupleTypeInfo) {
+			TupleTypeInfo<?> tupleType = ((TupleTypeInfo<?>)typeInfo);
+			for(int i=0;i<tupleType.getArity();i++) {
+				if (!isValidKeyType(tupleType.getTypeAt(i))) {
+					return false;
+				}
+			}
+			return true;
+		} else if(typeInfo.isKeyType()) {
+				return true;
+		} else {
+			return false;
+		}
+	}
+	
 	// --------------------------------------------------------------------------------------------	
 	// The following lines are generated.
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/122c9b02/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
index f5cd7b9..34b2ac8 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/GroupingTest.java
@@ -22,20 +22,19 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.junit.Assert;
-
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.typeutils.BasicTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-
 public class GroupingTest {
 
 	// TUPLE DATA
@@ -189,7 +188,64 @@ public class GroupingTest {
 		} catch(Exception e) {
 			Assert.fail();
 		}
+	}
+	
+	@Test
+	@SuppressWarnings("serial")
+	public void testGroupByKeySelector2() {
 		
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		this.customTypeData.add(new CustomType());
+		
+		try {
+			DataSet<CustomType> customDs = env.fromCollection(customTypeData);
+			// should work
+			customDs.groupBy(
+					new KeySelector<GroupingTest.CustomType, Tuple2<Integer, Long>>() {
+						@Override
+						public Tuple2<Integer,Long> getKey(CustomType value) {
+							return new Tuple2<Integer, Long>(value.myInt, value.myLong);
+					}
+			});
+		} catch(Exception e) {
+			Assert.fail();
+		}
+	}
+	
+	@Test(expected=IllegalArgumentException.class)
+	@SuppressWarnings("serial")
+	public void testGroupByKeySelector3() {
+		
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		this.customTypeData.add(new CustomType());
+		
+		DataSet<CustomType> customDs = env.fromCollection(customTypeData);
+		// should not work
+		customDs.groupBy(
+				new KeySelector<GroupingTest.CustomType, CustomType>() {
+					@Override
+					public CustomType getKey(CustomType value) {
+						return value;
+				}
+		});
+	}
+	
+	@Test(expected=IllegalArgumentException.class)
+	@SuppressWarnings("serial")
+	public void testGroupByKeySelector4() {
+		
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		this.customTypeData.add(new CustomType());
+		
+		DataSet<CustomType> customDs = env.fromCollection(customTypeData);
+		// should not work
+		customDs.groupBy(
+				new KeySelector<GroupingTest.CustomType, Tuple2<Integer, GroupingTest.CustomType>>() {
+					@Override
+					public Tuple2<Integer, CustomType> getKey(CustomType value) {
+						return new Tuple2<Integer, CustomType>(value.myInt, value);
+				}
+		});
 	}
 	
 	@Test

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/122c9b02/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
index d59d721..f0229cb 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CoGroupITCase.java
@@ -20,10 +20,14 @@ package org.apache.flink.test.javaApiOperators;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedList;
+import java.util.List;
 
 import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.functions.RichCoGroupFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -37,13 +41,11 @@ import org.apache.flink.util.Collector;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
 
 @RunWith(Parameterized.class)
 public class CoGroupITCase extends JavaProgramTestBase {
 	
-	private static int NUM_PROGRAMS = 7;
+	private static int NUM_PROGRAMS = 9;
 	
 	private int curProgId = config.getInteger("ProgramId", -1);
 	private String resultPath;
@@ -293,6 +295,67 @@ public class CoGroupITCase extends JavaProgramTestBase {
 						"14,5,test\n"; 
 				
 			}
+			case 8: {
+				/*
+				 * CoGroup with multiple key fields
+				 */
+				
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				
+				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
+				DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
+				
+				DataSet<Tuple3<Integer, Long, String>> coGrouped = ds1.coGroup(ds2).
+						where(0,4).equalTo(0,1).with(new Tuple5Tuple3CoGroup());
+				
+				coGrouped.writeAsCsv(resultPath);
+				env.execute();
+				
+				return "1,1,Hallo\n" +
+						"2,2,Hallo Welt\n" +
+						"3,2,Hallo Welt wie gehts?\n" +
+						"3,2,ABC\n" +
+						"5,3,HIJ\n" +
+						"5,3,IJK\n";
+			}
+			case 9: {
+				/*
+				 * CoGroup with multiple key fields
+				 */
+				
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				
+				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds1 = CollectionDataSets.get5TupleDataSet(env);
+				DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
+				
+				DataSet<Tuple3<Integer, Long, String>> coGrouped = ds1.coGroup(ds2).
+						where(new KeySelector<Tuple5<Integer,Long,Integer,String,Long>, Tuple2<Integer, Long>>() {
+							private static final long serialVersionUID = 1L;
+							
+							@Override
+							public Tuple2<Integer, Long> getKey(Tuple5<Integer,Long,Integer,String,Long> t) {
+								return new Tuple2<Integer, Long>(t.f0, t.f4);
+							}
+						}).
+						equalTo(new KeySelector<Tuple3<Integer,Long,String>, Tuple2<Integer, Long>>() {
+							private static final long serialVersionUID = 1L;
+							
+							@Override
+							public Tuple2<Integer, Long> getKey(Tuple3<Integer,Long,String> t) {
+								return new Tuple2<Integer, Long>(t.f0, t.f1);
+							}
+						}).with(new Tuple5Tuple3CoGroup());
+				
+				coGrouped.writeAsCsv(resultPath);
+				env.execute();
+				
+				return "1,1,Hallo\n" +
+						"2,2,Hallo Welt\n" +
+						"3,2,Hallo Welt wie gehts?\n" +
+						"3,2,ABC\n" +
+						"5,3,HIJ\n" +
+						"5,3,IJK\n";
+			}
 			default: 
 				throw new IllegalArgumentException("Invalid program id");
 			}
@@ -481,4 +544,27 @@ public class CoGroupITCase extends JavaProgramTestBase {
 			out.collect(new Tuple3<Integer, Integer, Integer>(id, sum, broadcast));
 		}
 	}
+	
+	public static class Tuple5Tuple3CoGroup implements CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
+		
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void coGroup(Iterable<Tuple5<Integer, Long, Integer, String, Long>> first,
+				Iterable<Tuple3<Integer, Long, String>> second,
+				Collector<Tuple3<Integer, Long, String>> out)
+		{
+			List<String> strs = new ArrayList<String>();
+			
+			for (Tuple5<Integer, Long, Integer, String, Long> t : first) {
+				strs.add(t.f3);
+			}
+			
+			for(Tuple3<Integer, Long, String> t : second) {
+				for(String s : strs) {
+					out.collect(new Tuple3<Integer, Long, String>(t.f0, t.f1, s));
+				}
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/122c9b02/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
index 0c6f3cc..6e5cd9b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
@@ -22,9 +22,12 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.LinkedList;
 
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.configuration.Configuration;
@@ -34,14 +37,12 @@ import org.apache.flink.test.util.JavaProgramTestBase;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
 
 @SuppressWarnings("serial")
 @RunWith(Parameterized.class)
 public class DistinctITCase extends JavaProgramTestBase {
 	
-	private static int NUM_PROGRAMS = 5;
+	private static int NUM_PROGRAMS = 6;
 	
 	private int curProgId = config.getInteger("ProgramId", -1);
 	private String resultPath;
@@ -202,6 +203,40 @@ public class DistinctITCase extends JavaProgramTestBase {
 						"2,2,Hello\n" +
 						"3,2,Hello world\n";
 			}
+			case 6: {
+				
+				/*
+				 * check correctness of distinct on custom type with tuple-returning type extractor
+				 */
+				
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				
+				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+				DataSet<Tuple2<Integer, Long>> reduceDs = ds
+						.distinct(new KeySelector<Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Long>>() {
+									private static final long serialVersionUID = 1L;
+									@Override
+									public Tuple2<Integer,Long> getKey(Tuple5<Integer, Long, Integer, String, Long> t) {
+										return new Tuple2<Integer, Long>(t.f0, t.f4);
+									}
+								})
+						.project(0,4).types(Integer.class, Long.class);
+				
+				reduceDs.writeAsCsv(resultPath);
+				env.execute();
+				
+				// return expected result
+				return "1,1\n" +
+						"2,1\n" +
+						"2,2\n" +
+						"3,2\n" +
+						"3,3\n" +
+						"4,1\n" +
+						"4,2\n" +
+						"5,1\n" +
+						"5,2\n" +
+						"5,3\n";
+			}
 			default: 
 				throw new IllegalArgumentException("Invalid program id");
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/122c9b02/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
index bd10c5e..2e00d32 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/GroupReduceITCase.java
@@ -48,7 +48,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 @RunWith(Parameterized.class)
 public class GroupReduceITCase extends JavaProgramTestBase {
 	
-	private static int NUM_PROGRAMS = 13;
+	private static int NUM_PROGRAMS = 14;
 	
 	private int curProgId = config.getInteger("ProgramId", -1);
 	private String resultPath;
@@ -411,6 +411,40 @@ public class GroupReduceITCase extends JavaProgramTestBase {
 							"111,6,Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10\n";
 
 				}
+				case 14: {
+					/*
+					 * check correctness of groupReduce on tuples with tuple-returning key selector
+					 */
+
+						final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+						DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+						DataSet<Tuple5<Integer, Long, Integer, String, Long>> reduceDs = ds.
+								groupBy(
+										new KeySelector<Tuple5<Integer,Long,Integer,String,Long>, Tuple2<Integer, Long>>() {
+											private static final long serialVersionUID = 1L;
+				
+											@Override
+											public Tuple2<Integer, Long> getKey(Tuple5<Integer,Long,Integer,String,Long> t) {
+												return new Tuple2<Integer, Long>(t.f0, t.f4);
+											}
+										}).reduceGroup(new Tuple5GroupReduce());
+
+						reduceDs.writeAsCsv(resultPath);
+						env.execute();
+
+						// return expected result
+						return "1,1,0,P-),1\n" +
+								"2,3,0,P-),1\n" +
+								"2,2,0,P-),2\n" +
+								"3,9,0,P-),2\n" +
+								"3,6,0,P-),3\n" +
+								"4,17,0,P-),1\n" +
+								"4,17,0,P-),2\n" +
+								"5,11,0,P-),1\n" +
+								"5,29,0,P-),2\n" +
+								"5,25,0,P-),3\n";
+					}
 				default: {
 					throw new IllegalArgumentException("Invalid program id");
 				}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/122c9b02/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
index a293cbf..45a5458 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
@@ -46,7 +46,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 @RunWith(Parameterized.class)
 public class JoinITCase extends JavaProgramTestBase {
 	
-	private static int NUM_PROGRAMS = 13;
+	private static int NUM_PROGRAMS = 14;
 	
 	private int curProgId = config.getInteger("ProgramId", -1);
 	private String resultPath;
@@ -453,6 +453,46 @@ public class JoinITCase extends JavaProgramTestBase {
 						"2,2,Hello world,2,2,Hello world\n";
 	
 			}
+			case 14: {
+				/*
+				 * UDF Join on tuples with tuple-returning key selectors
+				 */
+				
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				
+				DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.get3TupleDataSet(env);
+				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+				DataSet<Tuple2<String, String>> joinDs = 
+						ds1.join(ds2)
+						   .where(new KeySelector<Tuple3<Integer,Long,String>, Tuple2<Integer, Long>>() {
+								private static final long serialVersionUID = 1L;
+								
+								@Override
+								public Tuple2<Integer, Long> getKey(Tuple3<Integer,Long,String> t) {
+									return new Tuple2<Integer, Long>(t.f0, t.f1);
+								}
+							})
+						   .equalTo(new KeySelector<Tuple5<Integer,Long,Integer,String,Long>, Tuple2<Integer, Long>>() {
+								private static final long serialVersionUID = 1L;
+								
+								@Override
+								public Tuple2<Integer, Long> getKey(Tuple5<Integer,Long,Integer,String,Long> t) {
+									return new Tuple2<Integer, Long>(t.f0, t.f4);
+								}
+							})
+						   .with(new T3T5FlatJoin());
+				
+				joinDs.writeAsCsv(resultPath);
+				env.execute();
+				
+				// return expected result
+				return "Hi,Hallo\n" +
+						"Hello,Hallo Welt\n" +
+						"Hello world,Hallo Welt wie gehts?\n" +
+						"Hello world,ABC\n" +
+						"I am fine.,HIJ\n" +
+						"I am fine.,IJK\n";
+			}
 			default: 
 				throw new IllegalArgumentException("Invalid program id");
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/122c9b02/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
index a296a09..fd7fc9f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ReduceITCase.java
@@ -24,8 +24,11 @@ import java.util.Collection;
 import java.util.LinkedList;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.functions.RichReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.configuration.Configuration;
@@ -35,13 +38,11 @@ import org.apache.flink.test.util.JavaProgramTestBase;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
 
 @RunWith(Parameterized.class)
 public class ReduceITCase extends JavaProgramTestBase {
 	
-	private static int NUM_PROGRAMS = 8;
+	private static int NUM_PROGRAMS = 9;
 	
 	private int curProgId = config.getInteger("ProgramId", -1);
 	private String resultPath;
@@ -271,6 +272,39 @@ public class ReduceITCase extends JavaProgramTestBase {
 						"65,5,Hi again!\n" +
 						"111,6,Hi again!\n";
 			}
+			case 9: {
+				/*
+				 * Reduce with a Tuple-returning KeySelector 
+				 */
+				
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				
+				DataSet<Tuple5<Integer, Long,  Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
+				DataSet<Tuple5<Integer, Long,  Integer, String, Long>> reduceDs = ds .
+						groupBy(
+								new KeySelector<Tuple5<Integer,Long,Integer,String,Long>, Tuple2<Integer, Long>>() {
+									private static final long serialVersionUID = 1L;
+		
+									@Override
+									public Tuple2<Integer, Long> getKey(Tuple5<Integer,Long,Integer,String,Long> t) {
+										return new Tuple2<Integer, Long>(t.f0, t.f4);
+									}
+								}).reduce(new Tuple5Reduce());
+				
+				reduceDs.writeAsCsv(resultPath);
+				env.execute();
+				
+				return "1,1,0,Hallo,1\n" +
+						"2,3,2,Hallo Welt wie,1\n" +
+						"2,2,1,Hallo Welt,2\n" +
+						"3,9,0,P-),2\n" +
+						"3,6,5,BCD,3\n" +
+						"4,17,0,P-),1\n" +
+						"4,17,0,P-),2\n" +
+						"5,11,10,GHI,1\n" +
+						"5,29,0,P-),2\n" +
+						"5,25,0,P-),3\n";
+			}
 			default:
 				throw new IllegalArgumentException("Invalid program id");
 			}