You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2014/06/25 15:47:41 UTC

git commit: [FLINK-760] Add distinct operator

Repository: incubator-flink
Updated Branches:
  refs/heads/master 6c827fb93 -> e2aabd906


[FLINK-760] Add distinct operator


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e2aabd90
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e2aabd90
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e2aabd90

Branch: refs/heads/master
Commit: e2aabd90604d5880ef8e7b82ce58de218ddd48a7
Parents: 6c827fb
Author: Markus Holzemer <ma...@gmx.de>
Authored: Tue May 27 11:24:49 2014 +0200
Committer: uce <u....@fu-berlin.de>
Committed: Wed Jun 25 15:45:29 2014 +0200

----------------------------------------------------------------------
 .../java/eu/stratosphere/api/java/DataSet.java  |  45 +++-
 .../api/java/operators/DistinctOperator.java    | 109 +++++++++-
 .../api/java/operator/DistinctOperatorTest.java | 170 +++++++++++++++
 .../test/javaApiOperators/DistinctITCase.java   | 208 +++++++++++++++++++
 4 files changed, 522 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e2aabd90/stratosphere-java/src/main/java/eu/stratosphere/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/DataSet.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/DataSet.java
index 0770f24..a077e9a 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/DataSet.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/DataSet.java
@@ -36,6 +36,7 @@ import eu.stratosphere.api.java.operators.CrossOperator;
 import eu.stratosphere.api.java.operators.CrossOperator.DefaultCross;
 import eu.stratosphere.api.java.operators.CustomUnaryOperation;
 import eu.stratosphere.api.java.operators.DataSink;
+import eu.stratosphere.api.java.operators.DistinctOperator;
 import eu.stratosphere.api.java.operators.FilterOperator;
 import eu.stratosphere.api.java.operators.FlatMapOperator;
 import eu.stratosphere.api.java.operators.Grouping;
@@ -301,13 +302,45 @@ public abstract class DataSet<T> {
 	//  distinct
 	// --------------------------------------------------------------------------------------------
 	
-//	public <K extends Comparable<K>> DistinctOperator<T> distinct(KeySelector<T, K> keyExtractor) {
-//		return new DistinctOperator<T>(this, new Keys.SelectorFunctionKeys<T, K>(keyExtractor, getType()));
-//	}
+	/**
+	 * Returns a distinct set of a {@link DataSet} using a {@link KeySelector} function.
+	 * <p>
+	 * The KeySelector function is called for each element of the DataSet and extracts a single key value; that key
+	 * value decides whether two elements are distinct or not.
+	 *
+	 * @param keyExtractor The KeySelector function which extracts the key values from the DataSet on which the
+	 *                     distinction of the DataSet is decided.
+	 * @return A DistinctOperator that represents the distinct DataSet.
+	 */
+	public <K extends Comparable<K>> DistinctOperator<T> distinct(KeySelector<T, K> keyExtractor) {
+		return new DistinctOperator<T>(this, new Keys.SelectorFunctionKeys<T, K>(keyExtractor, getType()));
+	}
 	
-//	public DistinctOperator<T> distinct(int... fields) {
-//		return new DistinctOperator<T>(this, new Keys.FieldPositionKeys<T>(fields, getType(), true));
-//	}
+	/**
+	 * Returns a distinct set of a {@link Tuple} {@link DataSet} using field position keys.
+	 * <p>
+	 * The field position keys specify the fields of Tuples on which the decision is made if two Tuples are distinct or
+	 * not.
+	 * <p>
+	 * Note: Field position keys can only be specified for Tuple DataSets.
+	 *
+	 * @param fields One or more field positions on which the distinction of the DataSet is decided.
+	 * @return A DistinctOperator that represents the distinct DataSet.
+	 */
+	public DistinctOperator<T> distinct(int... fields) {
+		return new DistinctOperator<T>(this, new Keys.FieldPositionKeys<T>(fields, getType(), true));
+	}
+	
+	/**
+	 * Returns a distinct set of a {@link Tuple} {@link DataSet} using all fields of the tuple.
+	 * <p>
+	 * Note: This operator can only be applied to Tuple DataSets; other types cause an InvalidProgramException.
+	 *
+	 * @return A DistinctOperator that represents the distinct DataSet.
+	 */
+	public DistinctOperator<T> distinct() {
+		return new DistinctOperator<T>(this, null); // null keys: the operator falls back to all tuple fields
+	}
 	
 	// --------------------------------------------------------------------------------------------
 	//  Grouping

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e2aabd90/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/DistinctOperator.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/DistinctOperator.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/DistinctOperator.java
index 87c99de..ed1d1a5 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/DistinctOperator.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/DistinctOperator.java
@@ -14,8 +14,23 @@
  **********************************************************************************************************************/
 package eu.stratosphere.api.java.operators;
 
+import java.util.Iterator;
+
+import eu.stratosphere.api.common.InvalidProgramException;
+import eu.stratosphere.api.common.functions.GenericGroupReduce;
+import eu.stratosphere.api.common.functions.GenericMap;
 import eu.stratosphere.api.common.operators.Operator;
+import eu.stratosphere.api.common.operators.UnaryOperatorInformation;
+import eu.stratosphere.api.common.operators.base.GroupReduceOperatorBase;
+import eu.stratosphere.api.common.operators.base.MapOperatorBase;
 import eu.stratosphere.api.java.DataSet;
+import eu.stratosphere.api.java.functions.GroupReduceFunction;
+import eu.stratosphere.api.java.operators.translation.KeyExtractingMapper;
+import eu.stratosphere.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
+import eu.stratosphere.api.java.tuple.Tuple2;
+import eu.stratosphere.api.java.typeutils.TupleTypeInfo;
+import eu.stratosphere.types.TypeInformation;
+import eu.stratosphere.util.Collector;
 
 /**
  * This operator represents the application of a "distinct" function on a data set, and the
@@ -25,21 +40,107 @@ import eu.stratosphere.api.java.DataSet;
  */
 public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOperator<T>> {
 	
-	@SuppressWarnings("unused")
 	private final Keys<T> keys;
 	
 	public DistinctOperator(DataSet<T> input, Keys<T> keys) {
 		super(input, input.getType());
 		
+		// null keys mean "distinct on the whole record": expand them to all field positions of the tuple type
 		if (keys == null) {
-			throw new NullPointerException();
+			if (input.getType().isTupleType()) {
+				// key positions 0 .. arity-1, i.e. every tuple field becomes a key field
+				TupleTypeInfo<?> tupleType = (TupleTypeInfo<?>) input.getType();
+				int[] allFields = new int[tupleType.getArity()];
+				for(int i = 0; i < tupleType.getArity(); i++) {
+					allFields[i] = i;
+				}
+				keys = new Keys.FieldPositionKeys<T>(allFields, input.getType(), true);
+			}
+			else {
+				throw new InvalidProgramException("Distinction on all fields is only possible on tuple data types.");
+			}
+		}
+		
+		
+		// FieldPositionKeys can only be applied on Tuples
+		if (keys instanceof Keys.FieldPositionKeys && !input.getType().isTupleType()) {
+			throw new InvalidProgramException("Distinction on field positions is only possible on tuple data types.");
 		}
 		
 		this.keys = keys;
 	}
 
 	@Override
-	protected eu.stratosphere.api.common.operators.SingleInputOperator<T, T, ?> translateToDataFlow(Operator<T> input) {
-		throw new UnsupportedOperationException("NOT IMPLEMENTED");
+	protected eu.stratosphere.api.common.operators.base.GroupReduceOperatorBase<?, T, ?> translateToDataFlow(Operator<T> input) {
+		// distinct is translated to a group-reduce whose function emits the first record of each group
+		GroupReduceFunction<T, T> function = new DistinctFunction<T>();
+		String name = function.getClass().getName();
+		
+		if (keys instanceof Keys.FieldPositionKeys) {
+			// field position keys: the reducer is created directly on the logical key positions
+			int[] logicalKeyPositions = keys.computeLogicalKeyPositions();
+			UnaryOperatorInformation<T, T> operatorInfo = new UnaryOperatorInformation<T, T>(getInputType(), getResultType());
+			GroupReduceOperatorBase<T, T, GenericGroupReduce<T, T>> po =
+					new GroupReduceOperatorBase<T, T, GenericGroupReduce<T, T>>(function, operatorInfo, logicalKeyPositions, name);
+
+			po.setCombinable(true);
+			po.setInput(input);
+			po.setDegreeOfParallelism(this.getParallelism());
+			
+			return po;
+		}
+		else if (keys instanceof Keys.SelectorFunctionKeys) {
+			// selector keys: translateSelectorFunctionDistinct adds a key-extracting mapper before the reducer
+			@SuppressWarnings("unchecked")
+			Keys.SelectorFunctionKeys<T, ?> selectorKeys = (Keys.SelectorFunctionKeys<T, ?>) keys;
+			
+			PlanUnwrappingReduceGroupOperator<T, T, ?> po = translateSelectorFunctionDistinct(
+							selectorKeys, function, getInputType(), getResultType(), name, input, true);
+			
+			po.setDegreeOfParallelism(this.getParallelism());
+			
+			return po;
+		}
+		else {
+			throw new UnsupportedOperationException("Unrecognized key type.");
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	private static <IN, OUT, K> PlanUnwrappingReduceGroupOperator<IN, OUT, K> translateSelectorFunctionDistinct(
+			Keys.SelectorFunctionKeys<IN, ?> rawKeys, GroupReduceFunction<IN, OUT> function,
+			TypeInformation<IN> inputType, TypeInformation<OUT> outputType, String name, Operator<IN> input,
+			boolean combinable)
+	{
+		@SuppressWarnings("unchecked")
+		final Keys.SelectorFunctionKeys<IN, K> keys = (Keys.SelectorFunctionKeys<IN, K>) rawKeys;
+		// intermediate record type produced by the mapper: Tuple2<key, original record>
+		TypeInformation<Tuple2<K, IN>> typeInfoWithKey = new TupleTypeInfo<Tuple2<K, IN>>(keys.getKeyType(), inputType);
+		
+		KeyExtractingMapper<IN, K> extractor = new KeyExtractingMapper<IN, K>(keys.getKeyExtractor());
+		
+		PlanUnwrappingReduceGroupOperator<IN, OUT, K> reducer = new PlanUnwrappingReduceGroupOperator<IN, OUT, K>(function, keys, name, outputType, typeInfoWithKey, combinable);
+		
+		MapOperatorBase<IN, Tuple2<K, IN>, GenericMap<IN, Tuple2<K, IN>>> mapper = new MapOperatorBase<IN, Tuple2<K, IN>, GenericMap<IN, Tuple2<K, IN>>>(extractor, new UnaryOperatorInformation<IN, Tuple2<K, IN>>(inputType, typeInfoWithKey), "Key Extractor");
+		// plan wiring: input -> "Key Extractor" mapper -> group reducer
+		reducer.setInput(mapper);
+		mapper.setInput(input);
+		
+		// set the mapper's parallelism to the input parallelism to make sure it is chained
+		mapper.setDegreeOfParallelism(input.getDegreeOfParallelism());
+		
+		return reducer;
+	}
+	
+	public static final class DistinctFunction<T> extends GroupReduceFunction<T, T> {
+
+		private static final long serialVersionUID = 1L;
+		/** Forwards only the first record of the group; any further records of the group are not emitted. */
+		@Override
+		public void reduce(Iterator<T> values, Collector<T> out)
+				throws Exception {
+			out.collect(values.next());
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e2aabd90/stratosphere-java/src/test/java/eu/stratosphere/api/java/operator/DistinctOperatorTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/test/java/eu/stratosphere/api/java/operator/DistinctOperatorTest.java b/stratosphere-java/src/test/java/eu/stratosphere/api/java/operator/DistinctOperatorTest.java
new file mode 100644
index 0000000..a520812
--- /dev/null
+++ b/stratosphere-java/src/test/java/eu/stratosphere/api/java/operator/DistinctOperatorTest.java
@@ -0,0 +1,170 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+package eu.stratosphere.api.java.operator;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.junit.Test;
+
+import eu.stratosphere.api.common.InvalidProgramException;
+import eu.stratosphere.api.java.DataSet;
+import eu.stratosphere.api.java.ExecutionEnvironment;
+import eu.stratosphere.api.java.functions.KeySelector;
+import eu.stratosphere.api.java.tuple.Tuple5;
+import eu.stratosphere.api.java.typeutils.BasicTypeInfo;
+import eu.stratosphere.api.java.typeutils.TupleTypeInfo;
+
+public class DistinctOperatorTest {
+
+	// TUPLE DATA
+	private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData = 
+			new ArrayList<Tuple5<Integer, Long, String, Long, Integer>>();
+	
+	private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new 
+			TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>(
+					BasicTypeInfo.INT_TYPE_INFO,
+					BasicTypeInfo.LONG_TYPE_INFO,
+					BasicTypeInfo.STRING_TYPE_INFO,
+					BasicTypeInfo.LONG_TYPE_INFO,
+					BasicTypeInfo.INT_TYPE_INFO
+			);
+	
+	// LONG DATA
+	private final List<Long> emptyLongData = new ArrayList<Long>();
+	
+	private final List<CustomType> customTypeData = new ArrayList<CustomType>();
+	
+	/**
+	 * Distinct on a single field position must be accepted for a tuple data set.
+	 * Any exception thrown while building the operator fails the test.
+	 */
+	@Test
+	public void testDistinctByKeyFields1() {
+		
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work; an unexpected exception propagates so that JUnit reports it with its full stack trace
+		tupleDs.distinct(0);
+	}
+	
+	@Test(expected = InvalidProgramException.class)  
+	public void testDistinctByKeyFields2() {
+		
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Long> longDs = env.fromCollection(emptyLongData, BasicTypeInfo.LONG_TYPE_INFO);
+		// should not work: distinct on basic type
+		longDs.distinct(0);
+	}
+	
+	@Test(expected = InvalidProgramException.class)  
+	public void testDistinctByKeyFields3() {
+		
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		this.customTypeData.add(new CustomType());
+		
+		DataSet<CustomType> customDs = env.fromCollection(customTypeData);
+		// should not work: distinct on custom type
+		customDs.distinct(0);
+		
+	}
+	
+	@Test
+	public void testDistinctByKeyFields4() {
+		
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should work
+		tupleDs.distinct();
+	}
+	
+	@Test(expected = InvalidProgramException.class)
+	public void testDistinctByKeyFields5() {
+		
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		this.customTypeData.add(new CustomType());
+		
+		DataSet<CustomType> customDs = env.fromCollection(customTypeData);
+
+		// should not work, distinct without selector on custom types
+		customDs.distinct();
+	}
+	
+	@Test(expected = IllegalArgumentException.class)
+	public void testDistinctByKeyFields6() {
+		
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+		// should not work, negative field position
+		tupleDs.distinct(-1);
+	}
+	
+	/**
+	 * Distinct with a {@link KeySelector} must be accepted on a non-tuple (custom) type.
+	 * Any exception thrown while building the operator fails the test.
+	 */
+	@Test
+	@SuppressWarnings("serial")
+	public void testDistinctByKeySelector1() {
+		
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		this.customTypeData.add(new CustomType());
+		
+		DataSet<CustomType> customDs = env.fromCollection(customTypeData);
+
+		// should work; an unexpected exception propagates so that JUnit reports it with its full stack trace
+		customDs.distinct(
+				new KeySelector<DistinctOperatorTest.CustomType, Long>() {
+
+					@Override
+					public Long getKey(CustomType value) {
+						return value.myLong;
+					}
+				});
+	}
+	
+
+	public static class CustomType implements Serializable {
+		// simple non-tuple type for the tests above; the key selector test reads myLong as the key
+		private static final long serialVersionUID = 1L;
+		
+		public int myInt;
+		public long myLong;
+		public String myString;
+		
+		public CustomType() {}
+		
+		public CustomType(int i, long l, String s) {
+			myInt = i;
+			myLong = l;
+			myString = s;
+		}
+		
+		@Override
+		public String toString() {
+			return myInt+","+myLong+","+myString;
+		}
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e2aabd90/stratosphere-tests/src/test/java/eu/stratosphere/test/javaApiOperators/DistinctITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/javaApiOperators/DistinctITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/javaApiOperators/DistinctITCase.java
new file mode 100644
index 0000000..11fcf97
--- /dev/null
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/javaApiOperators/DistinctITCase.java
@@ -0,0 +1,208 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+package eu.stratosphere.test.javaApiOperators;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import eu.stratosphere.api.java.DataSet;
+import eu.stratosphere.api.java.ExecutionEnvironment;
+import eu.stratosphere.api.java.functions.KeySelector;
+import eu.stratosphere.api.java.functions.MapFunction;
+import eu.stratosphere.api.java.tuple.Tuple1;
+import eu.stratosphere.api.java.tuple.Tuple3;
+import eu.stratosphere.api.java.tuple.Tuple5;
+import eu.stratosphere.configuration.Configuration;
+import eu.stratosphere.test.javaApiOperators.util.CollectionDataSets;
+import eu.stratosphere.test.javaApiOperators.util.CollectionDataSets.CustomType;
+import eu.stratosphere.test.util.JavaProgramTestBase;
+
+@SuppressWarnings("serial")
+@RunWith(Parameterized.class)
+public class DistinctITCase extends JavaProgramTestBase {
+	
+	private static int NUM_PROGRAMS = 5;
+	
+	private int curProgId = config.getInteger("ProgramId", -1);
+	private String resultPath;
+	private String expectedResult;
+	
+	public DistinctITCase(Configuration config) {
+		super(config);
+	}
+	
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		expectedResult = DistinctProgs.runProgram(curProgId, resultPath);
+	}
+	
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(expectedResult, resultPath);
+	}
+	
+	@Parameters
+	public static Collection<Object[]> getConfigurations() throws IOException {
+		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+
+		for(int i=1; i <= NUM_PROGRAMS; i++) {
+			Configuration config = new Configuration();
+			config.setInteger("ProgramId", i);
+			tConfigs.add(config);
+		}
+		
+		return toParameterList(tConfigs);
+	}
+	
+	private static class DistinctProgs {
+		
+		public static String runProgram(int progId, String resultPath) throws Exception {
+			
+			switch(progId) {
+			case 1: {
+				
+				/*
+				 * check correctness of distinct on tuples with key field selector
+				 */
+				
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				
+				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+				DataSet<Tuple3<Integer, Long, String>> distinctDs = ds.union(ds).distinct(0, 1, 2);
+				
+				distinctDs.writeAsCsv(resultPath);
+				env.execute();
+				
+				// return expected result
+				return "1,1,Hi\n" +
+						"2,2,Hello\n" +
+						"3,2,Hello world\n";
+			}
+			case 2: {
+				
+				/*
+				 * check correctness of distinct on tuples with key field selector with not all fields selected
+				 */
+				
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				
+				DataSet<Tuple5<Integer, Long,  Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
+				DataSet<Tuple1<Integer>> distinctDs = ds.union(ds).distinct(0).project(0).types(Integer.class);
+				
+				distinctDs.writeAsCsv(resultPath);
+				env.execute();
+				
+				// return expected result
+				return "1\n" +
+						"2\n";
+			}
+			case 3: {
+				
+				/*
+				 * check correctness of distinct on tuples with key extractor
+				 */
+				
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				
+				DataSet<Tuple5<Integer, Long,  Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
+				DataSet<Tuple1<Integer>> reduceDs = ds.union(ds)
+						.distinct(new KeySelector<Tuple5<Integer, Long,  Integer, String, Long>, Integer>() {
+									private static final long serialVersionUID = 1L;
+									@Override
+									public Integer getKey(Tuple5<Integer, Long,  Integer, String, Long> in) {
+										return in.f0;
+									}
+								}).project(0).types(Integer.class);
+				
+				reduceDs.writeAsCsv(resultPath);
+				env.execute();
+				
+				// return expected result
+				return "1\n" +
+						"2\n";
+								
+			}
+			case 4: {
+				
+				/*
+				 * check correctness of distinct on custom type with key extractor
+				 */
+				
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				
+				DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
+				DataSet<Tuple1<Integer>> reduceDs = ds
+						.distinct(new KeySelector<CustomType, Integer>() {
+									private static final long serialVersionUID = 1L;
+									@Override
+									public Integer getKey(CustomType in) {
+										return in.myInt;
+									}
+								})
+						.map(new MapFunction<CollectionDataSets.CustomType, Tuple1<Integer>>() {
+							@Override
+							public Tuple1<Integer> map(CustomType value) throws Exception {
+								return new Tuple1<Integer>(value.myInt);
+							}
+						});
+				
+				reduceDs.writeAsCsv(resultPath);
+				env.execute();
+				
+				// return expected result
+				return "1\n" +
+						"2\n" +
+						"3\n" +
+						"4\n" +
+						"5\n" +
+						"6\n";
+				
+			}
+			case 5: {
+				
+				/*
+				 * check correctness of distinct on tuples
+				 */
+				
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+				
+				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+				DataSet<Tuple3<Integer, Long, String>> distinctDs = ds.union(ds).distinct();
+				
+				distinctDs.writeAsCsv(resultPath);
+				env.execute();
+				
+				// return expected result
+				return "1,1,Hi\n" +
+						"2,2,Hello\n" +
+						"3,2,Hello world\n";
+			}
+			default: 
+				throw new IllegalArgumentException("Invalid program id");
+			}
+		}
+	}
+}