You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/12/01 10:13:23 UTC
incubator-flink git commit: [FLINK-984] Compiler tests for
distinct(). Adopted the old code from @markus-h
Repository: incubator-flink
Updated Branches:
refs/heads/master e46d14b4c -> e3eaac279
[FLINK-984] Compiler tests for distinct(). Adopted the old code from @markus-h
The code originates from https://github.com/apache/incubator-flink/pull/61.
The pull request also contained code to add POJO support to the distinct() operator. This has already been implemented in earlier work.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e3eaac27
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e3eaac27
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e3eaac27
Branch: refs/heads/master
Commit: e3eaac2797e12d1d52395d51c55d14d5f57a9158
Parents: e46d14b
Author: Robert Metzger <rm...@apache.org>
Authored: Tue Nov 25 16:26:12 2014 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Mon Dec 1 10:12:45 2014 +0100
----------------------------------------------------------------------
.../apache/flink/compiler/DOPChangeTest.java | 2 -
.../flink/compiler/DistinctCompilationTest.java | 206 ++++++++++++++++
.../api/java/operators/DistinctOperator.java | 20 +-
.../api/java/operator/DistinctOperatorTest.java | 6 +-
.../translation/DistrinctTranslationTest.java | 244 ++++++++++++++++++-
.../test/javaApiOperators/DistinctITCase.java | 24 +-
6 files changed, 480 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e3eaac27/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java
index 6eb4c49..8b8828e 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java
@@ -15,8 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-
package org.apache.flink.compiler;
import org.junit.Assert;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e3eaac27/flink-compiler/src/test/java/org/apache/flink/compiler/DistinctCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/DistinctCompilationTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/DistinctCompilationTest.java
new file mode 100644
index 0000000..bde22bf
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/DistinctCompilationTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.compiler;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.DistinctOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.compiler.plan.SinkPlanNode;
+import org.apache.flink.compiler.plan.SourcePlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+
+@SuppressWarnings("serial")
+public class DistinctCompilationTest extends CompilerTestBase implements java.io.Serializable {
+
+ @Test
+ public void testDistinctPlain() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(8);
+
+ DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+ .name("source").setParallelism(6);
+
+ data
+ .distinct().name("reducer")
+ .print().name("sink");
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+
+ // look up the plan nodes by the names assigned to the operators above
+ SourcePlanNode sourceNode = resolver.getNode("source");
+ SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+ SinkPlanNode sinkNode = resolver.getNode("sink");
+
+ // the node feeding the reducer is expected to be the combiner
+ SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
+
+ // check wiring: source -> combiner -> reducer -> sink
+ assertEquals(sourceNode, combineNode.getInput().getSource());
+ assertEquals(reduceNode, sinkNode.getInput().getSource());
+
+ // check that the reducer uses SORTED_GROUP_REDUCE and the combiner SORTED_GROUP_COMBINE
+ assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
+ assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
+
+ // check the keys: distinct() without arguments must key on both tuple fields (0 and 1)
+ assertEquals(new FieldList(0, 1), reduceNode.getKeys(0));
+ assertEquals(new FieldList(0, 1), combineNode.getKeys(0));
+ assertEquals(new FieldList(0, 1), reduceNode.getInput().getLocalStrategyKeys());
+
+ // check DOP: source and combiner run with 6 (set on the source), reducer and sink with 8 (set on the environment)
+ assertEquals(6, sourceNode.getDegreeOfParallelism());
+ assertEquals(6, combineNode.getDegreeOfParallelism());
+ assertEquals(8, reduceNode.getDegreeOfParallelism());
+ assertEquals(8, sinkNode.getDegreeOfParallelism());
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testDistinctWithSelectorFunctionKey() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(8);
+
+ DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+ .name("source").setParallelism(6);
+
+ data
+ .distinct(new KeySelector<Tuple2<String,Double>, String>() {
+ public String getKey(Tuple2<String, Double> value) { return value.f0; }
+ }).name("reducer")
+ .print().name("sink");
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+
+ // look up the plan nodes by the names assigned to the operators above
+ SourcePlanNode sourceNode = resolver.getNode("source");
+ SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+ SinkPlanNode sinkNode = resolver.getNode("sink");
+
+ // the node feeding the reducer is expected to be the combiner
+ SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
+
+ // the key extractor is the combiner's input; the key projector is the sink's input
+ SingleInputPlanNode keyExtractor = (SingleInputPlanNode) combineNode.getInput().getSource();
+ SingleInputPlanNode keyProjector = (SingleInputPlanNode) sinkNode.getInput().getSource();
+
+ // check wiring -- NOTE(review): keyProjector was read from sinkNode.getInput().getSource() above, so the second assertion is trivially true
+ assertEquals(sourceNode, keyExtractor.getInput().getSource());
+ assertEquals(keyProjector, sinkNode.getInput().getSource());
+
+ // check that the reducer uses SORTED_GROUP_REDUCE and the combiner SORTED_GROUP_COMBINE
+ assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
+ assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
+
+ // check the keys: with a key selector the grouping key is expected at field 0
+ assertEquals(new FieldList(0), reduceNode.getKeys(0));
+ assertEquals(new FieldList(0), combineNode.getKeys(0));
+ assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
+
+ // check DOP: source, key extractor and combiner run with 6 (set on the source)
+ assertEquals(6, sourceNode.getDegreeOfParallelism());
+ assertEquals(6, keyExtractor.getDegreeOfParallelism());
+ assertEquals(6, combineNode.getDegreeOfParallelism());
+
+ assertEquals(8, reduceNode.getDegreeOfParallelism()); // reducer, key projector and sink run with 8 (set on the environment)
+ assertEquals(8, keyProjector.getDegreeOfParallelism());
+ assertEquals(8, sinkNode.getDegreeOfParallelism());
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testDistinctWithFieldPositionKeyCombinable() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setDegreeOfParallelism(8);
+
+ DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+ .name("source").setParallelism(6);
+
+ DistinctOperator<Tuple2<String, Double>> reduced = data
+ .distinct(1).name("reducer");
+
+ reduced.print().name("sink");
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+
+ // look up the plan nodes by the names assigned to the operators above
+ SourcePlanNode sourceNode = resolver.getNode("source");
+ SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+ SinkPlanNode sinkNode = resolver.getNode("sink");
+
+ // the node feeding the reducer is expected to be the combiner
+ SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
+
+ // check wiring: source -> combiner -> reducer -> sink
+ assertEquals(sourceNode, combineNode.getInput().getSource());
+ assertEquals(reduceNode, sinkNode.getInput().getSource());
+
+ // check that the reducer uses SORTED_GROUP_REDUCE and the combiner SORTED_GROUP_COMBINE
+ assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
+ assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
+
+ // check the keys: distinct(1) must key on tuple field 1 only
+ assertEquals(new FieldList(1), reduceNode.getKeys(0));
+ assertEquals(new FieldList(1), combineNode.getKeys(0));
+ assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
+
+ // check DOP: source and combiner run with 6 (set on the source), reducer and sink with 8 (set on the environment)
+ assertEquals(6, sourceNode.getDegreeOfParallelism());
+ assertEquals(6, combineNode.getDegreeOfParallelism());
+ assertEquals(8, reduceNode.getDegreeOfParallelism());
+ assertEquals(8, sinkNode.getDegreeOfParallelism());
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e3eaac27/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
index e60c7de..fa2d4d6 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
@@ -30,12 +30,10 @@ import org.apache.flink.api.common.operators.util.FieldSet;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
import org.apache.flink.util.Collector;
import org.apache.flink.api.java.DataSet;
@@ -53,22 +51,18 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
public DistinctOperator(DataSet<T> input, Keys<T> keys, String distinctLocationName) {
super(input, input.getType());
-
+
this.distinctLocationName = distinctLocationName;
// if keys is null distinction is done on all tuple fields
if (keys == null) {
- if (input.getType().isTupleType()) {
+ if (input.getType() instanceof CompositeType) {
- TupleTypeInfoBase<?> tupleType = (TupleTypeInfoBase<?>) input.getType();
- int[] allFields = new int[tupleType.getArity()];
- for(int i = 0; i < tupleType.getArity(); i++) {
- allFields[i] = i;
- }
- keys = new Keys.ExpressionKeys<T>(allFields, input.getType(), true);
+ CompositeType<?> cType = (CompositeType<?>) input.getType();
+ keys = new Keys.ExpressionKeys<T>(new String[] {Keys.ExpressionKeys.SELECT_ALL_CHAR }, input.getType());
}
else {
- throw new InvalidProgramException("Distinction on all fields is only possible on tuple data types.");
+ throw new InvalidProgramException("Distinction on all fields is only possible on composite (pojo / tuple) data types.");
}
}
@@ -86,7 +80,7 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
final RichGroupReduceFunction<T, T> function = new DistinctFunction<T>();
- String name = "Distinct at " + distinctLocationName;
+ String name = getName() != null ? getName() : "Distinct at " + distinctLocationName;
if (keys instanceof Keys.ExpressionKeys) {
@@ -159,7 +153,7 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
return reducer;
}
- @Combinable
+ @RichGroupReduceFunction.Combinable
public static final class DistinctFunction<T> extends RichGroupReduceFunction<T, T> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e3eaac27/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java
index f0fb12c..f4c87c8 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java
@@ -98,8 +98,8 @@ public class DistinctOperatorTest {
// should work
tupleDs.distinct();
}
-
- @Test(expected = InvalidProgramException.class)
+
+ @Test
public void testDistinctByKeyFields5() {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -108,7 +108,7 @@ public class DistinctOperatorTest {
DataSet<CustomType> customDs = env.fromCollection(customTypeData);
- // should not work, distinct without selector on custom types
+ // should work
customDs.distinct();
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e3eaac27/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistrinctTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistrinctTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistrinctTranslationTest.java
index 2bf8353..70e2947 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistrinctTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistrinctTranslationTest.java
@@ -19,14 +19,33 @@
package org.apache.flink.api.java.operators.translation;
import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.GenericDataSinkBase;
+import org.apache.flink.api.common.operators.GenericDataSourceBase;
import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
+import org.apache.flink.api.common.operators.base.MapOperatorBase;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.DistinctOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.StringValue;
import org.junit.Assert;
import org.junit.Test;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
@SuppressWarnings("serial")
public class DistrinctTranslationTest {
@@ -47,11 +66,232 @@ public class DistrinctTranslationTest {
Plan p = env.createProgramPlan();
GroupReduceOperatorBase<?, ?, ?> reduceOp = (GroupReduceOperatorBase<?, ?, ?>) p.getDataSinks().iterator().next().getInput();
- Assert.assertTrue(reduceOp.isCombinable());
+ assertTrue(reduceOp.isCombinable());
}
catch (Exception e) {
e.printStackTrace();
- Assert.fail(e.getMessage());
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void translateDistinctPlain() {
+ try {
+ final int DOP = 8;
+ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(DOP);
+
+ DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env);
+
+ initialData.distinct().print();
+
+ Plan p = env.createProgramPlan();
+
+ GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
+
+ // currently distinct is translated to a GroupReduce, which must be the sink's direct input
+ GroupReduceOperatorBase<?, ?, ?> reducer = (GroupReduceOperatorBase<?, ?, ?>) sink.getInput();
+
+ // check types: distinct must not change the data type
+ assertEquals(initialData.getType(), reducer.getOperatorInfo().getInputType());
+ assertEquals(initialData.getType(), reducer.getOperatorInfo().getOutputType());
+
+ // check keys: distinct() without arguments on a Tuple3 must key on all three fields
+ assertArrayEquals(new int[] {0, 1, 2}, reducer.getKeyColumns(0));
+
+ // DOP was not configured on the operator, so either 1 or -1 is accepted
+ assertTrue(reducer.getDegreeOfParallelism() == 1 || reducer.getDegreeOfParallelism() == -1);
+
+ assertTrue(reducer.getInput() instanceof GenericDataSourceBase<?, ?>); // the reducer reads directly from the source
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail("Test caused an error: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void translateDistinctPlain2() {
+ try {
+ final int DOP = 8;
+ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(DOP);
+
+ DataSet<CustomType> initialData = getSourcePojoDataSet(env);
+
+ initialData.distinct().print();
+
+ Plan p = env.createProgramPlan();
+
+ GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
+
+ // currently distinct is translated to a GroupReduce, which must be the sink's direct input
+ GroupReduceOperatorBase<?, ?, ?> reducer = (GroupReduceOperatorBase<?, ?, ?>) sink.getInput();
+
+ // check types: distinct must not change the data type
+ assertEquals(initialData.getType(), reducer.getOperatorInfo().getInputType());
+ assertEquals(initialData.getType(), reducer.getOperatorInfo().getOutputType());
+
+ // check keys: CustomType declares a single instance field (myInt), so the only key column is 0
+ assertArrayEquals(new int[] {0}, reducer.getKeyColumns(0));
+
+ // DOP was not configured on the operator, so either 1 or -1 is accepted
+ assertTrue(reducer.getDegreeOfParallelism() == 1 || reducer.getDegreeOfParallelism() == -1);
+
+ assertTrue(reducer.getInput() instanceof GenericDataSourceBase<?, ?>); // the reducer reads directly from the source
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail("Test caused an error: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void translateDistinctPosition() {
+ try {
+ final int DOP = 8;
+ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(DOP);
+
+ DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env);
+
+ initialData.distinct(1, 2).print();
+
+ Plan p = env.createProgramPlan();
+
+ GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
+
+ // currently distinct is translated to a GroupReduce, which must be the sink's direct input
+ GroupReduceOperatorBase<?, ?, ?> reducer = (GroupReduceOperatorBase<?, ?, ?>) sink.getInput();
+
+ // check types: distinct must not change the data type
+ assertEquals(initialData.getType(), reducer.getOperatorInfo().getInputType());
+ assertEquals(initialData.getType(), reducer.getOperatorInfo().getOutputType());
+
+ // check keys: distinct(1, 2) must key on exactly the requested field positions
+ assertArrayEquals(new int[] {1, 2}, reducer.getKeyColumns(0));
+
+ // DOP was not configured on the operator, so either 1 or -1 is accepted
+ assertTrue(reducer.getDegreeOfParallelism() == 1 || reducer.getDegreeOfParallelism() == -1);
+
+ assertTrue(reducer.getInput() instanceof GenericDataSourceBase<?, ?>); // the reducer reads directly from the source
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail("Test caused an error: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void translateDistinctKeySelector() {
+ try {
+ final int DOP = 8;
+ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(DOP);
+
+ DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env);
+
+ initialData.distinct(new KeySelector<Tuple3<Double,StringValue,LongValue>, StringValue>() {
+ public StringValue getKey(Tuple3<Double, StringValue, LongValue> value) {
+ return value.f1;
+ }
+ }).setParallelism(4).print();
+
+ Plan p = env.createProgramPlan();
+
+ GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
+
+ PlanUnwrappingReduceGroupOperator<?, ?, ?> reducer = (PlanUnwrappingReduceGroupOperator<?, ?, ?>) sink.getInput(); // expected chain: source -> key extractor (map) -> reducer -> sink
+ MapOperatorBase<?, ?, ?> keyExtractor = (MapOperatorBase<?, ?, ?>) reducer.getInput();
+
+ // check the DOPs: the key extractor is expected to run with 1, the reducer with the 4 set via setParallelism(4)
+ assertEquals(1, keyExtractor.getDegreeOfParallelism());
+ assertEquals(4, reducer.getDegreeOfParallelism());
+
+ // check types: the key extractor wraps each record into a (key, record) Tuple2, the reducer unwraps it again
+ TypeInformation<?> keyValueInfo = new TupleTypeInfo<Tuple2<StringValue, Tuple3<Double,StringValue,LongValue>>>(
+ new ValueTypeInfo<StringValue>(StringValue.class),
+ initialData.getType());
+
+ assertEquals(initialData.getType(), keyExtractor.getOperatorInfo().getInputType());
+ assertEquals(keyValueInfo, keyExtractor.getOperatorInfo().getOutputType());
+
+ assertEquals(keyValueInfo, reducer.getOperatorInfo().getInputType());
+ assertEquals(initialData.getType(), reducer.getOperatorInfo().getOutputType());
+
+ // check that the key extractor's user code is the KeyExtractingMapper
+ assertEquals(KeyExtractingMapper.class, keyExtractor.getUserCodeWrapper().getUserCodeClass());
+
+ assertTrue(keyExtractor.getInput() instanceof GenericDataSourceBase<?, ?>); // the key extractor reads directly from the source
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail("Test caused an error: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void translateDistinctExpressionKey() {
+ try {
+ final int DOP = 8;
+ ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(DOP);
+
+ DataSet<CustomType> initialData = getSourcePojoDataSet(env);
+
+ initialData.distinct("myInt").print();
+
+ Plan p = env.createProgramPlan();
+
+ GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
+
+ // currently distinct is translated to a GroupReduce, which must be the sink's direct input
+ GroupReduceOperatorBase<?, ?, ?> reducer = (GroupReduceOperatorBase<?, ?, ?>) sink.getInput();
+
+ // check types: distinct must not change the data type
+ assertEquals(initialData.getType(), reducer.getOperatorInfo().getInputType());
+ assertEquals(initialData.getType(), reducer.getOperatorInfo().getOutputType());
+
+ // check keys: the "myInt" field expression must resolve to key column 0
+ assertArrayEquals(new int[] {0}, reducer.getKeyColumns(0));
+
+ // DOP was not configured on the operator, so either 1 or -1 is accepted
+ assertTrue(reducer.getDegreeOfParallelism() == 1 || reducer.getDegreeOfParallelism() == -1);
+
+ assertTrue(reducer.getInput() instanceof GenericDataSourceBase<?, ?>); // the reducer reads directly from the source
+ }
+ catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail("Test caused an error: " + e.getMessage());
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private static final DataSet<Tuple3<Double, StringValue, LongValue>> getSourceDataSet(ExecutionEnvironment env) { // test fixture: a data set holding a single Tuple3 element
+ return env.fromElements(new Tuple3<Double, StringValue, LongValue>(3.141592, new StringValue("foobar"), new LongValue(77)))
+ .setParallelism(1); // the source parallelism is fixed to 1
+ }
+
+ private static DataSet<CustomType> getSourcePojoDataSet(ExecutionEnvironment env) { // test fixture: a data set holding a single CustomType element
+ List<CustomType> data = new ArrayList<CustomType>();
+ data.add(new CustomType(1));
+ return env.fromCollection(data); // no parallelism is set explicitly on this source
+ }
+
+ public static class CustomType implements Serializable { // custom element type used by the distinct tests on non-tuple data
+
+ private static final long serialVersionUID = 1L;
+ public int myInt; // the only instance field; referenced as the "myInt" key expression in translateDistinctExpressionKey
+ @SuppressWarnings("unused")
+ public CustomType() {}; // public no-argument constructor; not called directly in this file
+
+ public CustomType(int i) {
+ myInt = i;
+ }
+
+ @Override
+ public String toString() {
+ return ""+myInt;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e3eaac27/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
index 54f1b58..b183a57 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
@@ -127,7 +127,7 @@ public class DistinctITCase extends JavaProgramTestBase {
case 3: {
/*
- * check correctness of distinct on tuples with key extractor
+ * check correctness of distinct on tuples with key extractor function
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -280,8 +280,28 @@ public class DistinctITCase extends JavaProgramTestBase {
// return expected result
return "10000\n20000\n30000\n";
-
}
+ case 9: {
+
+ /*
+ * distinct on full Pojo
+ */
+ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<POJO> ds = CollectionDataSets.getDuplicatePojoDataSet(env);
+ DataSet<Integer> reduceDs = ds.distinct().map(new MapFunction<CollectionDataSets.POJO, Integer>() {
+ @Override
+ public Integer map(POJO value) throws Exception {
+ return (int) value.nestedPojo.longNumber;
+ }
+ });
+
+ reduceDs.writeAsText(resultPath);
+ env.execute();
+
+ // return expected result
+ return "10000\n20000\n30000\n";
+ }
default:
throw new IllegalArgumentException("Invalid program id");
}