You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/12/01 10:13:23 UTC

incubator-flink git commit: [FLINK-984] Compiler tests for distinct(). Adopted the old code from @markus-h

Repository: incubator-flink
Updated Branches:
  refs/heads/master e46d14b4c -> e3eaac279


[FLINK-984] Compiler tests for distinct(). Adopted the old code from @markus-h

The code originates from https://github.com/apache/incubator-flink/pull/61.
The pull request also contained code to add POJO support to the distinct() operator. This has already been implemented in earlier work.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/e3eaac27
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/e3eaac27
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/e3eaac27

Branch: refs/heads/master
Commit: e3eaac2797e12d1d52395d51c55d14d5f57a9158
Parents: e46d14b
Author: Robert Metzger <rm...@apache.org>
Authored: Tue Nov 25 16:26:12 2014 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Mon Dec 1 10:12:45 2014 +0100

----------------------------------------------------------------------
 .../apache/flink/compiler/DOPChangeTest.java    |   2 -
 .../flink/compiler/DistinctCompilationTest.java | 206 ++++++++++++++++
 .../api/java/operators/DistinctOperator.java    |  20 +-
 .../api/java/operator/DistinctOperatorTest.java |   6 +-
 .../translation/DistrinctTranslationTest.java   | 244 ++++++++++++++++++-
 .../test/javaApiOperators/DistinctITCase.java   |  24 +-
 6 files changed, 480 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e3eaac27/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java
index 6eb4c49..8b8828e 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/DOPChangeTest.java
@@ -15,8 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-
 package org.apache.flink.compiler;
 
 import org.junit.Assert;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e3eaac27/flink-compiler/src/test/java/org/apache/flink/compiler/DistinctCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/DistinctCompilationTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/DistinctCompilationTest.java
new file mode 100644
index 0000000..bde22bf
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/DistinctCompilationTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.compiler;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.DistinctOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.compiler.plan.SinkPlanNode;
+import org.apache.flink.compiler.plan.SourcePlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+
+@SuppressWarnings("serial")
+public class DistinctCompilationTest extends CompilerTestBase implements java.io.Serializable {
+
+	@Test
+	public void testDistinctPlain() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(8);
+
+			DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+					.name("source").setParallelism(6);
+
+			data
+					.distinct().name("reducer")
+					.print().name("sink");
+
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+
+			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+
+			// get the original nodes
+			SourcePlanNode sourceNode = resolver.getNode("source");
+			SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+			SinkPlanNode sinkNode = resolver.getNode("sink");
+
+			// get the combiner
+			SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
+
+			// check wiring
+			assertEquals(sourceNode, combineNode.getInput().getSource());
+			assertEquals(reduceNode, sinkNode.getInput().getSource());
+
+			// check that reduce and combiner use the matching sorted-group strategies (reduce vs. combine)
+			assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
+			assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
+
+			// check the keys
+			assertEquals(new FieldList(0, 1), reduceNode.getKeys(0));
+			assertEquals(new FieldList(0, 1), combineNode.getKeys(0));
+			assertEquals(new FieldList(0, 1), reduceNode.getInput().getLocalStrategyKeys());
+
+			// check DOP
+			assertEquals(6, sourceNode.getDegreeOfParallelism());
+			assertEquals(6, combineNode.getDegreeOfParallelism());
+			assertEquals(8, reduceNode.getDegreeOfParallelism());
+			assertEquals(8, sinkNode.getDegreeOfParallelism());
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+		}
+	}
+
+	@Test
+	public void testDistinctWithSelectorFunctionKey() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(8);
+
+			DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+					.name("source").setParallelism(6);
+
+			data
+					.distinct(new KeySelector<Tuple2<String,Double>, String>() {
+						public String getKey(Tuple2<String, Double> value) { return value.f0; }
+					}).name("reducer")
+					.print().name("sink");
+
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+
+			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+
+			// get the original nodes
+			SourcePlanNode sourceNode = resolver.getNode("source");
+			SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+			SinkPlanNode sinkNode = resolver.getNode("sink");
+
+			// get the combiner
+			SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
+
+			// get the key extractors and projectors
+			SingleInputPlanNode keyExtractor = (SingleInputPlanNode) combineNode.getInput().getSource();
+			SingleInputPlanNode keyProjector = (SingleInputPlanNode) sinkNode.getInput().getSource();
+
+			// check wiring
+			assertEquals(sourceNode, keyExtractor.getInput().getSource());
+			assertEquals(keyProjector, sinkNode.getInput().getSource());
+
+			// check that reduce and combiner use the matching sorted-group strategies (reduce vs. combine)
+			assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
+			assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
+
+			// check the keys
+			assertEquals(new FieldList(0), reduceNode.getKeys(0));
+			assertEquals(new FieldList(0), combineNode.getKeys(0));
+			assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
+
+			// check DOP
+			assertEquals(6, sourceNode.getDegreeOfParallelism());
+			assertEquals(6, keyExtractor.getDegreeOfParallelism());
+			assertEquals(6, combineNode.getDegreeOfParallelism());
+
+			assertEquals(8, reduceNode.getDegreeOfParallelism());
+			assertEquals(8, keyProjector.getDegreeOfParallelism());
+			assertEquals(8, sinkNode.getDegreeOfParallelism());
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+		}
+	}
+
+	@Test
+	public void testDistinctWithFieldPositionKeyCombinable() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setDegreeOfParallelism(8);
+
+			DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+					.name("source").setParallelism(6);
+
+			DistinctOperator<Tuple2<String, Double>> reduced = data
+					.distinct(1).name("reducer");
+
+			reduced.print().name("sink");
+
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+
+			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+
+			// get the original nodes
+			SourcePlanNode sourceNode = resolver.getNode("source");
+			SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+			SinkPlanNode sinkNode = resolver.getNode("sink");
+
+			// get the combiner
+			SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
+
+			// check wiring
+			assertEquals(sourceNode, combineNode.getInput().getSource());
+			assertEquals(reduceNode, sinkNode.getInput().getSource());
+
+			// check that reduce and combiner use the matching sorted-group strategies (reduce vs. combine)
+			assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
+			assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
+
+			// check the keys
+			assertEquals(new FieldList(1), reduceNode.getKeys(0));
+			assertEquals(new FieldList(1), combineNode.getKeys(0));
+			assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
+
+			// check DOP
+			assertEquals(6, sourceNode.getDegreeOfParallelism());
+			assertEquals(6, combineNode.getDegreeOfParallelism());
+			assertEquals(8, reduceNode.getDegreeOfParallelism());
+			assertEquals(8, sinkNode.getDegreeOfParallelism());
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e3eaac27/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
index e60c7de..fa2d4d6 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
@@ -30,12 +30,10 @@ import org.apache.flink.api.common.operators.util.FieldSet;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
 import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
 import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
 import org.apache.flink.util.Collector;
 import org.apache.flink.api.java.DataSet;
 
@@ -53,22 +51,18 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
 	
 	public DistinctOperator(DataSet<T> input, Keys<T> keys, String distinctLocationName) {
 		super(input, input.getType());
-		
+
 		this.distinctLocationName = distinctLocationName;
 		
 		// if keys is null distinction is done on all tuple fields
 		if (keys == null) {
-			if (input.getType().isTupleType()) {
+			if (input.getType() instanceof CompositeType) {
 
-				TupleTypeInfoBase<?> tupleType = (TupleTypeInfoBase<?>) input.getType();
-				int[] allFields = new int[tupleType.getArity()];
-				for(int i = 0; i < tupleType.getArity(); i++) {
-					allFields[i] = i;
-				}
-				keys = new Keys.ExpressionKeys<T>(allFields, input.getType(), true);
+				CompositeType<?> cType = (CompositeType<?>) input.getType();
+				keys = new Keys.ExpressionKeys<T>(new String[] {Keys.ExpressionKeys.SELECT_ALL_CHAR }, input.getType());
 			}
 			else {
-				throw new InvalidProgramException("Distinction on all fields is only possible on tuple data types.");
+				throw new InvalidProgramException("Distinction on all fields is only possible on composite (pojo / tuple) data types.");
 			}
 		}
 		
@@ -86,7 +80,7 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
 		
 		final RichGroupReduceFunction<T, T> function = new DistinctFunction<T>();
 
-		String name = "Distinct at " + distinctLocationName;
+		String name = getName() != null ? getName() : "Distinct at " + distinctLocationName;
 		
 		if (keys instanceof Keys.ExpressionKeys) {
 
@@ -159,7 +153,7 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
 		return reducer;
 	}
 	
-	@Combinable
+	@RichGroupReduceFunction.Combinable
 	public static final class DistinctFunction<T> extends RichGroupReduceFunction<T, T> {
 
 		private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e3eaac27/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java
index f0fb12c..f4c87c8 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/DistinctOperatorTest.java
@@ -98,8 +98,8 @@ public class DistinctOperatorTest {
 		// should work
 		tupleDs.distinct();
 	}
-	
-	@Test(expected = InvalidProgramException.class)
+
+	@Test
 	public void testDistinctByKeyFields5() {
 		
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -108,7 +108,7 @@ public class DistinctOperatorTest {
 		
 		DataSet<CustomType> customDs = env.fromCollection(customTypeData);
 
-		// should not work, distinct without selector on custom types
+		// should work
 		customDs.distinct();
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e3eaac27/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistrinctTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistrinctTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistrinctTranslationTest.java
index 2bf8353..70e2947 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistrinctTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistrinctTranslationTest.java
@@ -19,14 +19,33 @@
 package org.apache.flink.api.java.operators.translation;
 
 import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.GenericDataSinkBase;
+import org.apache.flink.api.common.operators.GenericDataSourceBase;
 import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
+import org.apache.flink.api.common.operators.base.MapOperatorBase;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.operators.DistinctOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.StringValue;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 @SuppressWarnings("serial")
 public class DistrinctTranslationTest {
 
@@ -47,11 +66,232 @@ public class DistrinctTranslationTest {
 			Plan p = env.createProgramPlan();
 			
 			GroupReduceOperatorBase<?, ?, ?> reduceOp = (GroupReduceOperatorBase<?, ?, ?>) p.getDataSinks().iterator().next().getInput();
-			Assert.assertTrue(reduceOp.isCombinable());
+			assertTrue(reduceOp.isCombinable());
 		}
 		catch (Exception e) {
 			e.printStackTrace();
-			Assert.fail(e.getMessage());
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void translateDistinctPlain() {
+		try {
+			final int DOP = 8;
+			ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(DOP);
+
+			DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env);
+
+			initialData.distinct().print();
+
+			Plan p = env.createProgramPlan();
+
+			GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
+
+			// currently distinct is translated to a GroupReduce
+			GroupReduceOperatorBase<?, ?, ?> reducer = (GroupReduceOperatorBase<?, ?, ?>) sink.getInput();
+
+			// check types
+			assertEquals(initialData.getType(), reducer.getOperatorInfo().getInputType());
+			assertEquals(initialData.getType(), reducer.getOperatorInfo().getOutputType());
+
+			// check keys
+			assertArrayEquals(new int[] {0, 1, 2}, reducer.getKeyColumns(0));
+
+			// DOP was not configured on the operator
+			assertTrue(reducer.getDegreeOfParallelism() == 1 || reducer.getDegreeOfParallelism() == -1);
+
+			assertTrue(reducer.getInput() instanceof GenericDataSourceBase<?, ?>);
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail("Test caused an error: " + e.getMessage());
+		}
+	}
+
+	@Test
+	public void translateDistinctPlain2() {
+		try {
+			final int DOP = 8;
+			ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(DOP);
+
+			DataSet<CustomType> initialData = getSourcePojoDataSet(env);
+
+			initialData.distinct().print();
+
+			Plan p = env.createProgramPlan();
+
+			GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
+
+			// currently distinct is translated to a GroupReduce
+			GroupReduceOperatorBase<?, ?, ?> reducer = (GroupReduceOperatorBase<?, ?, ?>) sink.getInput();
+
+			// check types
+			assertEquals(initialData.getType(), reducer.getOperatorInfo().getInputType());
+			assertEquals(initialData.getType(), reducer.getOperatorInfo().getOutputType());
+
+			// check keys
+			assertArrayEquals(new int[] {0}, reducer.getKeyColumns(0));
+
+			// DOP was not configured on the operator
+			assertTrue(reducer.getDegreeOfParallelism() == 1 || reducer.getDegreeOfParallelism() == -1);
+
+			assertTrue(reducer.getInput() instanceof GenericDataSourceBase<?, ?>);
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail("Test caused an error: " + e.getMessage());
+		}
+	}
+
+	@Test
+	public void translateDistinctPosition() {
+		try {
+			final int DOP = 8;
+			ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(DOP);
+
+			DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env);
+
+			initialData.distinct(1, 2).print();
+
+			Plan p = env.createProgramPlan();
+
+			GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
+
+			// currently distinct is translated to a GroupReduce
+			GroupReduceOperatorBase<?, ?, ?> reducer = (GroupReduceOperatorBase<?, ?, ?>) sink.getInput();
+
+			// check types
+			assertEquals(initialData.getType(), reducer.getOperatorInfo().getInputType());
+			assertEquals(initialData.getType(), reducer.getOperatorInfo().getOutputType());
+
+			// check keys
+			assertArrayEquals(new int[] {1, 2}, reducer.getKeyColumns(0));
+
+			// DOP was not configured on the operator
+			assertTrue(reducer.getDegreeOfParallelism() == 1 || reducer.getDegreeOfParallelism() == -1);
+
+			assertTrue(reducer.getInput() instanceof GenericDataSourceBase<?, ?>);
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail("Test caused an error: " + e.getMessage());
+		}
+	}
+
+	@Test
+	public void translateDistinctKeySelector() {
+		try {
+			final int DOP = 8;
+			ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(DOP);
+
+			DataSet<Tuple3<Double, StringValue, LongValue>> initialData = getSourceDataSet(env);
+
+			initialData.distinct(new KeySelector<Tuple3<Double,StringValue,LongValue>, StringValue>() {
+				public StringValue getKey(Tuple3<Double, StringValue, LongValue> value) {
+					return value.f1;
+				}
+			}).setParallelism(4).print();
+
+			Plan p = env.createProgramPlan();
+
+			GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
+
+			PlanUnwrappingReduceGroupOperator<?, ?, ?> reducer = (PlanUnwrappingReduceGroupOperator<?, ?, ?>) sink.getInput();
+			MapOperatorBase<?, ?, ?> keyExtractor = (MapOperatorBase<?, ?, ?>) reducer.getInput();
+
+			// check the DOPs
+			assertEquals(1, keyExtractor.getDegreeOfParallelism());
+			assertEquals(4, reducer.getDegreeOfParallelism());
+
+			// check types
+			TypeInformation<?> keyValueInfo = new TupleTypeInfo<Tuple2<StringValue, Tuple3<Double,StringValue,LongValue>>>(
+					new ValueTypeInfo<StringValue>(StringValue.class),
+					initialData.getType());
+
+			assertEquals(initialData.getType(), keyExtractor.getOperatorInfo().getInputType());
+			assertEquals(keyValueInfo, keyExtractor.getOperatorInfo().getOutputType());
+
+			assertEquals(keyValueInfo, reducer.getOperatorInfo().getInputType());
+			assertEquals(initialData.getType(), reducer.getOperatorInfo().getOutputType());
+
+			// check that the key extractor runs the KeyExtractingMapper
+			assertEquals(KeyExtractingMapper.class, keyExtractor.getUserCodeWrapper().getUserCodeClass());
+
+			assertTrue(keyExtractor.getInput() instanceof GenericDataSourceBase<?, ?>);
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail("Test caused an error: " + e.getMessage());
+		}
+	}
+
+	@Test
+	public void translateDistinctExpressionKey() {
+		try {
+			final int DOP = 8;
+			ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(DOP);
+
+			DataSet<CustomType> initialData = getSourcePojoDataSet(env);
+
+			initialData.distinct("myInt").print();
+
+			Plan p = env.createProgramPlan();
+
+			GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
+
+			// currently distinct is translated to a GroupReduce
+			GroupReduceOperatorBase<?, ?, ?> reducer = (GroupReduceOperatorBase<?, ?, ?>) sink.getInput();
+
+			// check types
+			assertEquals(initialData.getType(), reducer.getOperatorInfo().getInputType());
+			assertEquals(initialData.getType(), reducer.getOperatorInfo().getOutputType());
+
+			// check keys
+			assertArrayEquals(new int[] {0}, reducer.getKeyColumns(0));
+
+			// DOP was not configured on the operator
+			assertTrue(reducer.getDegreeOfParallelism() == 1 || reducer.getDegreeOfParallelism() == -1);
+
+			assertTrue(reducer.getInput() instanceof GenericDataSourceBase<?, ?>);
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail("Test caused an error: " + e.getMessage());
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	private static final DataSet<Tuple3<Double, StringValue, LongValue>> getSourceDataSet(ExecutionEnvironment env) {
+		return env.fromElements(new Tuple3<Double, StringValue, LongValue>(3.141592, new StringValue("foobar"), new LongValue(77)))
+				.setParallelism(1);
+	}
+
+	private static DataSet<CustomType> getSourcePojoDataSet(ExecutionEnvironment env) {
+		List<CustomType> data = new ArrayList<CustomType>();
+		data.add(new CustomType(1));
+		return env.fromCollection(data);
+	}
+
+	public static class CustomType implements Serializable {
+
+		private static final long serialVersionUID = 1L;
+		public int myInt;
+		@SuppressWarnings("unused")
+		public CustomType() {};
+
+		public CustomType(int i) {
+			myInt = i;
+		}
+
+		@Override
+		public String toString() {
+			return ""+myInt;
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/e3eaac27/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
index 54f1b58..b183a57 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
@@ -127,7 +127,7 @@ public class DistinctITCase extends JavaProgramTestBase {
 			case 3: {
 				
 				/*
-				 * check correctness of distinct on tuples with key extractor
+				 * check correctness of distinct on tuples with key extractor function
 				 */
 				
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -280,8 +280,28 @@ public class DistinctITCase extends JavaProgramTestBase {
 				
 				// return expected result
 				return "10000\n20000\n30000\n";
-								
 			}
+			case 9: {
+
+					/*
+					 * distinct on full Pojo
+					 */
+					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+					DataSet<POJO> ds = CollectionDataSets.getDuplicatePojoDataSet(env);
+					DataSet<Integer> reduceDs = ds.distinct().map(new MapFunction<CollectionDataSets.POJO, Integer>() {
+						@Override
+						public Integer map(POJO value) throws Exception {
+							return (int) value.nestedPojo.longNumber;
+						}
+					});
+
+					reduceDs.writeAsText(resultPath);
+					env.execute();
+
+					// return expected result
+					return "10000\n20000\n30000\n";
+				}
 			default: 
 				throw new IllegalArgumentException("Invalid program id");
 			}