You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/26 11:47:10 UTC
[45/53] [abbrv] git commit: [FLINK-760] Add distinct operator
[FLINK-760] Add distinct operator
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e2aabd90
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e2aabd90
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e2aabd90
Branch: refs/heads/travis_test
Commit: e2aabd90604d5880ef8e7b82ce58de218ddd48a7
Parents: 6c827fb
Author: Markus Holzemer <ma...@gmx.de>
Authored: Tue May 27 11:24:49 2014 +0200
Committer: uce <u....@fu-berlin.de>
Committed: Wed Jun 25 15:45:29 2014 +0200
----------------------------------------------------------------------
.../java/eu/stratosphere/api/java/DataSet.java | 45 +++-
.../api/java/operators/DistinctOperator.java | 109 +++++++++-
.../api/java/operator/DistinctOperatorTest.java | 170 +++++++++++++++
.../test/javaApiOperators/DistinctITCase.java | 208 +++++++++++++++++++
4 files changed, 522 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e2aabd90/stratosphere-java/src/main/java/eu/stratosphere/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/DataSet.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/DataSet.java
index 0770f24..a077e9a 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/DataSet.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/DataSet.java
@@ -36,6 +36,7 @@ import eu.stratosphere.api.java.operators.CrossOperator;
import eu.stratosphere.api.java.operators.CrossOperator.DefaultCross;
import eu.stratosphere.api.java.operators.CustomUnaryOperation;
import eu.stratosphere.api.java.operators.DataSink;
+import eu.stratosphere.api.java.operators.DistinctOperator;
import eu.stratosphere.api.java.operators.FilterOperator;
import eu.stratosphere.api.java.operators.FlatMapOperator;
import eu.stratosphere.api.java.operators.Grouping;
@@ -301,13 +302,45 @@ public abstract class DataSet<T> {
// distinct
// --------------------------------------------------------------------------------------------
-// public <K extends Comparable<K>> DistinctOperator<T> distinct(KeySelector<T, K> keyExtractor) {
-// return new DistinctOperator<T>(this, new Keys.SelectorFunctionKeys<T, K>(keyExtractor, getType()));
-// }
+	/**
+	 * Removes duplicate elements from this {@link DataSet}, where two elements count as duplicates if the
+	 * given {@link KeySelector} extracts the same key from both of them.
+	 * <p/>
+	 * The KeySelector is invoked for each element; the single key value it returns is the only information
+	 * used to decide whether two elements are distinct.
+	 *
+	 * @param keyExtractor The KeySelector that extracts, from each element, the key used to detect duplicates.
+	 * @return A DistinctOperator that represents the distinct DataSet.
+	 */
+	public <K extends Comparable<K>> DistinctOperator<T> distinct(KeySelector<T, K> keyExtractor) {
+		Keys<T> selectorKeys = new Keys.SelectorFunctionKeys<T, K>(keyExtractor, getType());
+		return new DistinctOperator<T>(this, selectorKeys);
+	}
-// public DistinctOperator<T> distinct(int... fields) {
-// return new DistinctOperator<T>(this, new Keys.FieldPositionKeys<T>(fields, getType(), true));
-// }
+	/**
+	 * Removes duplicate elements from a {@link Tuple} {@link DataSet}, comparing Tuples only on the given
+	 * field positions.
+	 * <p/>
+	 * Two Tuples are treated as duplicates when they agree on every listed field; fields that are not listed
+	 * do not take part in the comparison.
+	 * <p/>
+	 * Note: Field position keys can only be specified for Tuple DataSets.
+	 *
+	 * @param fields One or more field positions on which the distinction of the DataSet is decided.
+	 * @return A DistinctOperator that represents the distinct DataSet.
+	 */
+	public DistinctOperator<T> distinct(int... fields) {
+		Keys<T> fieldKeys = new Keys.FieldPositionKeys<T>(fields, getType(), true);
+		return new DistinctOperator<T>(this, fieldKeys);
+	}
+
+	/**
+	 * Removes duplicate elements from a {@link Tuple} {@link DataSet}, comparing Tuples on all of their fields.
+	 * <p/>
+	 * Note: This operator can only be applied to Tuple DataSets.
+	 *
+	 * @return A DistinctOperator that represents the distinct DataSet.
+	 */
+	public DistinctOperator<T> distinct() {
+		// No explicit keys: DistinctOperator then builds keys that cover every tuple field.
+		Keys<T> noExplicitKeys = null;
+		return new DistinctOperator<T>(this, noExplicitKeys);
+	}
// --------------------------------------------------------------------------------------------
// Grouping
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e2aabd90/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/DistinctOperator.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/DistinctOperator.java b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/DistinctOperator.java
index 87c99de..ed1d1a5 100644
--- a/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/DistinctOperator.java
+++ b/stratosphere-java/src/main/java/eu/stratosphere/api/java/operators/DistinctOperator.java
@@ -14,8 +14,23 @@
**********************************************************************************************************************/
package eu.stratosphere.api.java.operators;
+import java.util.Iterator;
+
+import eu.stratosphere.api.common.InvalidProgramException;
+import eu.stratosphere.api.common.functions.GenericGroupReduce;
+import eu.stratosphere.api.common.functions.GenericMap;
import eu.stratosphere.api.common.operators.Operator;
+import eu.stratosphere.api.common.operators.UnaryOperatorInformation;
+import eu.stratosphere.api.common.operators.base.GroupReduceOperatorBase;
+import eu.stratosphere.api.common.operators.base.MapOperatorBase;
import eu.stratosphere.api.java.DataSet;
+import eu.stratosphere.api.java.functions.GroupReduceFunction;
+import eu.stratosphere.api.java.operators.translation.KeyExtractingMapper;
+import eu.stratosphere.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
+import eu.stratosphere.api.java.tuple.Tuple2;
+import eu.stratosphere.api.java.typeutils.TupleTypeInfo;
+import eu.stratosphere.types.TypeInformation;
+import eu.stratosphere.util.Collector;
/**
* This operator represents the application of a "distinct" function on a data set, and the
@@ -25,21 +40,107 @@ import eu.stratosphere.api.java.DataSet;
*/
public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOperator<T>> {
- @SuppressWarnings("unused")
+ /** The keys on which two elements are compared; replaced by all-field keys in the constructor if null is passed. */
private final Keys<T> keys;
+ /**
+ * Creates a distinct operator on the given input.
+ *
+ * @param input The data set from which duplicates are removed.
+ * @param keys The keys that decide whether two elements are duplicates, or null to compare tuples on all
+ * of their fields.
+ * @throws InvalidProgramException If keys is null and the input is not a tuple type, or if field position
+ * keys are given for a non-tuple input.
+ */
public DistinctOperator(DataSet<T> input, Keys<T> keys) {
super(input, input.getType());
+ // if keys is null distinction is done on all tuple fields
if (keys == null) {
- throw new NullPointerException();
+ if (input.getType().isTupleType()) {
+
+ TupleTypeInfo<?> tupleType = (TupleTypeInfo<?>) input.getType();
+ // build the key positions 0 .. arity-1 so that every field takes part in the comparison
+ int[] allFields = new int[tupleType.getArity()];
+ for(int i = 0; i < tupleType.getArity(); i++) {
+ allFields[i] = i;
+ }
+ keys = new Keys.FieldPositionKeys<T>(allFields, input.getType(), true);
+ }
+ else {
+ throw new InvalidProgramException("Distinction on all fields is only possible on tuple data types.");
+ }
+ }
+
+
+ // FieldPositionKeys can only be applied on Tuples
+ if (keys instanceof Keys.FieldPositionKeys && !input.getType().isTupleType()) {
+ throw new InvalidProgramException("Distinction on field positions is only possible on tuple data types.");
}
this.keys = keys;
}
+ /**
+ * Translates this operator into a combinable group-reduce that runs {@link DistinctFunction}, which emits
+ * one element per key group. Field position keys are mapped directly onto a GroupReduceOperatorBase;
+ * selector function keys go through {@link #translateSelectorFunctionDistinct}.
+ *
+ * @throws UnsupportedOperationException If the keys are neither field position nor selector function keys.
+ */
@Override
- protected eu.stratosphere.api.common.operators.SingleInputOperator<T, T, ?> translateToDataFlow(Operator<T> input) {
- throw new UnsupportedOperationException("NOT IMPLEMENTED");
+ protected eu.stratosphere.api.common.operators.base.GroupReduceOperatorBase<?, T, ?> translateToDataFlow(Operator<T> input) {
+
+ GroupReduceFunction<T, T> function = new DistinctFunction<T>();
+ String name = function.getClass().getName();
+
+ if (keys instanceof Keys.FieldPositionKeys) {
+
+ int[] logicalKeyPositions = keys.computeLogicalKeyPositions();
+ UnaryOperatorInformation<T, T> operatorInfo = new UnaryOperatorInformation<T, T>(getInputType(), getResultType());
+ GroupReduceOperatorBase<T, T, GenericGroupReduce<T, T>> po =
+ new GroupReduceOperatorBase<T, T, GenericGroupReduce<T, T>>(function, operatorInfo, logicalKeyPositions, name);
+
+ // keeping any one element per group is associative, so a combiner may pre-reduce locally
+ po.setCombinable(true);
+ po.setInput(input);
+ po.setDegreeOfParallelism(this.getParallelism());
+
+ return po;
+ }
+ else if (keys instanceof Keys.SelectorFunctionKeys) {
+
+ @SuppressWarnings("unchecked")
+ Keys.SelectorFunctionKeys<T, ?> selectorKeys = (Keys.SelectorFunctionKeys<T, ?>) keys;
+
+ PlanUnwrappingReduceGroupOperator<T, T, ?> po = translateSelectorFunctionDistinct(
+ selectorKeys, function, getInputType(), getResultType(), name, input, true);
+
+ po.setDegreeOfParallelism(this.getParallelism());
+
+ return po;
+ }
+ else {
+ throw new UnsupportedOperationException("Unrecognized key type.");
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Builds the plan for a distinct on selector function keys: a "Key Extractor" map that turns each input
+ * element into a Tuple2 of (key, element), feeding a PlanUnwrappingReduceGroupOperator that applies the
+ * given function.
+ * NOTE(review): presumably the reducer groups on the Tuple2 key field and unwraps the element before
+ * calling the function -- confirm against PlanUnwrappingReduceGroupOperator.
+ *
+ * @return The reducer, whose input is the key-extracting mapper, which in turn reads from {@code input}.
+ */
+ private static <IN, OUT, K> PlanUnwrappingReduceGroupOperator<IN, OUT, K> translateSelectorFunctionDistinct(
+ Keys.SelectorFunctionKeys<IN, ?> rawKeys, GroupReduceFunction<IN, OUT> function,
+ TypeInformation<IN> inputType, TypeInformation<OUT> outputType, String name, Operator<IN> input,
+ boolean combinable)
+ {
+ @SuppressWarnings("unchecked")
+ final Keys.SelectorFunctionKeys<IN, K> keys = (Keys.SelectorFunctionKeys<IN, K>) rawKeys;
+
+ TypeInformation<Tuple2<K, IN>> typeInfoWithKey = new TupleTypeInfo<Tuple2<K, IN>>(keys.getKeyType(), inputType);
+
+ KeyExtractingMapper<IN, K> extractor = new KeyExtractingMapper<IN, K>(keys.getKeyExtractor());
+
+ PlanUnwrappingReduceGroupOperator<IN, OUT, K> reducer = new PlanUnwrappingReduceGroupOperator<IN, OUT, K>(function, keys, name, outputType, typeInfoWithKey, combinable);
+
+ MapOperatorBase<IN, Tuple2<K, IN>, GenericMap<IN, Tuple2<K, IN>>> mapper = new MapOperatorBase<IN, Tuple2<K, IN>, GenericMap<IN, Tuple2<K, IN>>>(extractor, new UnaryOperatorInformation<IN, Tuple2<K, IN>>(inputType, typeInfoWithKey), "Key Extractor");
+
+ reducer.setInput(mapper);
+ mapper.setInput(input);
+
+ // set the mapper's parallelism to the input parallelism to make sure it is chained
+ mapper.setDegreeOfParallelism(input.getDegreeOfParallelism());
+
+ return reducer;
+ }
+
+ /**
+ * Group-reduce function that emits only the first element of each group. All elements of a group share
+ * the distinct key, so a single representative is sufficient.
+ */
+ public static final class DistinctFunction<T> extends GroupReduceFunction<T, T> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void reduce(Iterator<T> values, Collector<T> out)
+ throws Exception {
+ out.collect(values.next());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e2aabd90/stratosphere-java/src/test/java/eu/stratosphere/api/java/operator/DistinctOperatorTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-java/src/test/java/eu/stratosphere/api/java/operator/DistinctOperatorTest.java b/stratosphere-java/src/test/java/eu/stratosphere/api/java/operator/DistinctOperatorTest.java
new file mode 100644
index 0000000..a520812
--- /dev/null
+++ b/stratosphere-java/src/test/java/eu/stratosphere/api/java/operator/DistinctOperatorTest.java
@@ -0,0 +1,170 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+package eu.stratosphere.api.java.operator;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.junit.Test;
+
+import eu.stratosphere.api.common.InvalidProgramException;
+import eu.stratosphere.api.java.DataSet;
+import eu.stratosphere.api.java.ExecutionEnvironment;
+import eu.stratosphere.api.java.functions.KeySelector;
+import eu.stratosphere.api.java.tuple.Tuple5;
+import eu.stratosphere.api.java.typeutils.BasicTypeInfo;
+import eu.stratosphere.api.java.typeutils.TupleTypeInfo;
+
+public class DistinctOperatorTest {
+
+ // TUPLE DATA
+ private final List<Tuple5<Integer, Long, String, Long, Integer>> emptyTupleData =
+ new ArrayList<Tuple5<Integer, Long, String, Long, Integer>>();
+
+ private final TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>> tupleTypeInfo = new
+ TupleTypeInfo<Tuple5<Integer, Long, String, Long, Integer>>(
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.LONG_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.LONG_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO
+ );
+
+ // LONG DATA
+ private final List<Long> emptyLongData = new ArrayList<Long>();
+
+ private final List<CustomType> customTypeData = new ArrayList<CustomType>();
+
+ @Test
+ public void testDistinctByKeyFields1() {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ // should work
+ try {
+ tupleDs.distinct(0);
+ } catch(Exception e) {
+ Assert.fail();
+ }
+ }
+
+ @Test(expected = InvalidProgramException.class)
+ public void testDistinctByKeyFields2() {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Long> longDs = env.fromCollection(emptyLongData, BasicTypeInfo.LONG_TYPE_INFO);
+ // should not work: distinct on basic type
+ longDs.distinct(0);
+ }
+
+ @Test(expected = InvalidProgramException.class)
+ public void testDistinctByKeyFields3() {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ this.customTypeData.add(new CustomType());
+
+ DataSet<CustomType> customDs = env.fromCollection(customTypeData);
+ // should not work: distinct on custom type
+ customDs.distinct(0);
+
+ }
+
+ @Test
+ public void testDistinctByKeyFields4() {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ // should work
+ tupleDs.distinct();
+ }
+
+ @Test(expected = InvalidProgramException.class)
+ public void testDistinctByKeyFields5() {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ this.customTypeData.add(new CustomType());
+
+ DataSet<CustomType> customDs = env.fromCollection(customTypeData);
+
+ // should not work, distinct without selector on custom types
+ customDs.distinct();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testDistinctByKeyFields6() {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
+
+ // should not work, negative field position
+ tupleDs.distinct(-1);
+ }
+
+ @Test
+ @SuppressWarnings("serial")
+ public void testDistinctByKeySelector1() {
+
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ this.customTypeData.add(new CustomType());
+
+ try {
+ DataSet<CustomType> customDs = env.fromCollection(customTypeData);
+ // should work
+ customDs.distinct(
+ new KeySelector<DistinctOperatorTest.CustomType, Long>() {
+
+ @Override
+ public Long getKey(CustomType value) {
+ return value.myLong;
+ }
+ });
+ } catch(Exception e) {
+ Assert.fail();
+ }
+
+ }
+
+
+ public static class CustomType implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ public int myInt;
+ public long myLong;
+ public String myString;
+
+ public CustomType() {};
+
+ public CustomType(int i, long l, String s) {
+ myInt = i;
+ myLong = l;
+ myString = s;
+ }
+
+ @Override
+ public String toString() {
+ return myInt+","+myLong+","+myString;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e2aabd90/stratosphere-tests/src/test/java/eu/stratosphere/test/javaApiOperators/DistinctITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/javaApiOperators/DistinctITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/javaApiOperators/DistinctITCase.java
new file mode 100644
index 0000000..11fcf97
--- /dev/null
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/javaApiOperators/DistinctITCase.java
@@ -0,0 +1,208 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+package eu.stratosphere.test.javaApiOperators;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import eu.stratosphere.api.java.DataSet;
+import eu.stratosphere.api.java.ExecutionEnvironment;
+import eu.stratosphere.api.java.functions.KeySelector;
+import eu.stratosphere.api.java.functions.MapFunction;
+import eu.stratosphere.api.java.tuple.Tuple1;
+import eu.stratosphere.api.java.tuple.Tuple3;
+import eu.stratosphere.api.java.tuple.Tuple5;
+import eu.stratosphere.configuration.Configuration;
+import eu.stratosphere.test.javaApiOperators.util.CollectionDataSets;
+import eu.stratosphere.test.javaApiOperators.util.CollectionDataSets.CustomType;
+import eu.stratosphere.test.util.JavaProgramTestBase;
+
+/**
+ * Integration tests for the distinct operator. Each parameterized run executes one program selected by the
+ * "ProgramId" configuration value and compares its output file against the expected lines.
+ */
+@SuppressWarnings("serial")
+@RunWith(Parameterized.class)
+public class DistinctITCase extends JavaProgramTestBase {
+
+	/** Number of test programs; ids 1..NUM_PROGRAMS are generated as parameters. */
+	private static final int NUM_PROGRAMS = 5;
+
+	private int curProgId = config.getInteger("ProgramId", -1);
+	private String resultPath;
+	private String expectedResult;
+
+	public DistinctITCase(Configuration config) {
+		super(config);
+	}
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempDirPath("result");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		expectedResult = DistinctProgs.runProgram(curProgId, resultPath);
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		compareResultsByLinesInMemory(expectedResult, resultPath);
+	}
+
+	@Parameters
+	public static Collection<Object[]> getConfigurations() throws IOException {
+		LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+
+		for(int i=1; i <= NUM_PROGRAMS; i++) {
+			Configuration config = new Configuration();
+			config.setInteger("ProgramId", i);
+			tConfigs.add(config);
+		}
+
+		return toParameterList(tConfigs);
+	}
+
+	/** The test programs, selected by id. */
+	private static class DistinctProgs {
+
+		/**
+		 * Runs the program with the given id, writing its output to resultPath.
+		 *
+		 * @return The expected output lines of that program.
+		 * @throws IllegalArgumentException If progId does not name a program.
+		 */
+		public static String runProgram(int progId, String resultPath) throws Exception {
+
+			switch(progId) {
+			case 1: {
+
+				/*
+				 * check correctness of distinct on tuples with key field selector
+				 */
+
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+				DataSet<Tuple3<Integer, Long, String>> distinctDs = ds.union(ds).distinct(0, 1, 2);
+
+				distinctDs.writeAsCsv(resultPath);
+				env.execute();
+
+				// return expected result
+				return "1,1,Hi\n" +
+						"2,2,Hello\n" +
+						"3,2,Hello world\n";
+			}
+			case 2: {
+
+				/*
+				 * check correctness of distinct on tuples with key field selector with not all fields selected
+				 */
+
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
+				DataSet<Tuple1<Integer>> distinctDs = ds.union(ds).distinct(0).project(0).types(Integer.class);
+
+				distinctDs.writeAsCsv(resultPath);
+				env.execute();
+
+				// return expected result
+				return "1\n" +
+						"2\n";
+			}
+			case 3: {
+
+				/*
+				 * check correctness of distinct on tuples with key extractor
+				 */
+
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
+				DataSet<Tuple1<Integer>> distinctDs = ds.union(ds)
+						.distinct(new KeySelector<Tuple5<Integer, Long, Integer, String, Long>, Integer>() {
+									private static final long serialVersionUID = 1L;
+									@Override
+									public Integer getKey(Tuple5<Integer, Long, Integer, String, Long> in) {
+										return in.f0;
+									}
+								}).project(0).types(Integer.class);
+
+				distinctDs.writeAsCsv(resultPath);
+				env.execute();
+
+				// return expected result
+				return "1\n" +
+						"2\n";
+
+			}
+			case 4: {
+
+				/*
+				 * check correctness of distinct on custom type with key extractor
+				 */
+
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+				DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
+				DataSet<Tuple1<Integer>> distinctDs = ds
+						.distinct(new KeySelector<CustomType, Integer>() {
+									private static final long serialVersionUID = 1L;
+									@Override
+									public Integer getKey(CustomType in) {
+										return in.myInt;
+									}
+								})
+						.map(new MapFunction<CollectionDataSets.CustomType, Tuple1<Integer>>() {
+							@Override
+							public Tuple1<Integer> map(CustomType value) throws Exception {
+								return new Tuple1<Integer>(value.myInt);
+							}
+						});
+
+				distinctDs.writeAsCsv(resultPath);
+				env.execute();
+
+				// return expected result
+				return "1\n" +
+						"2\n" +
+						"3\n" +
+						"4\n" +
+						"5\n" +
+						"6\n";
+
+			}
+			case 5: {
+
+				/*
+				 * check correctness of distinct on tuples
+				 */
+
+				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
+				DataSet<Tuple3<Integer, Long, String>> distinctDs = ds.union(ds).distinct();
+
+				distinctDs.writeAsCsv(resultPath);
+				env.execute();
+
+				// return expected result
+				return "1,1,Hi\n" +
+						"2,2,Hello\n" +
+						"3,2,Hello world\n";
+			}
+			default:
+				throw new IllegalArgumentException("Invalid program id");
+			}
+		}
+	}
+}