Posted to commits@flink.apache.org by gr...@apache.org on 2016/08/24 13:55:44 UTC

[1/2] flink git commit: [FLINK-4452] [metrics] TaskManager network buffer gauges

Repository: flink
Updated Branches:
  refs/heads/master 58165d69f -> ad8e665f0


[FLINK-4452] [metrics] TaskManager network buffer gauges

Adds gauges for the total and the available number of TaskManager network
memory segments.
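
For reference, a minimal Java sketch of the same registration pattern (the
class and method names in this sketch are illustrative; the actual change is
the Scala diff below):

    import org.apache.flink.metrics.Gauge;
    import org.apache.flink.metrics.MetricGroup;
    import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;

    // Illustrative sketch only: registers the two gauges on a "Network"
    // subgroup, mirroring instantiateNetworkMetrics in the patch below.
    final class NetworkMetricsSketch {
        static void register(MetricGroup status, final NetworkBufferPool pool) {
            MetricGroup network = status.addGroup("Network");
            network.gauge("TotalMemorySegments", new Gauge<Long>() {
                @Override
                public Long getValue() {
                    return (long) pool.getTotalNumberOfMemorySegments();
                }
            });
            network.gauge("AvailableMemorySegments", new Gauge<Long>() {
                @Override
                public Long getValue() {
                    return (long) pool.getNumberOfAvailableMemorySegments();
                }
            });
        }
    }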

This closes #2408


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/28743cfb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/28743cfb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/28743cfb

Branch: refs/heads/master
Commit: 28743cfb86545cf9eaf4ec2cf37ec460a13f3537
Parents: 58165d6
Author: Greg Hogan <co...@greghogan.com>
Authored: Tue Aug 23 10:46:48 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Wed Aug 24 09:02:15 2016 -0400

----------------------------------------------------------------------
 docs/monitoring/metrics.md                      |  9 +++++++
 .../flink/runtime/taskmanager/TaskManager.scala | 25 +++++++++++++++++---
 2 files changed, 31 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/28743cfb/docs/monitoring/metrics.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 023bef9..3a148e1 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -335,6 +335,15 @@ Flink exposes the following system metrics:
       <td></td>
     </tr>
     <tr>
+      <th rowspan="2"><strong>TaskManager.Status</strong></th>
+      <td>Network.AvailableMemorySegments</td>
+      <td>The number of unused memory segments.</td>
+    </tr>
+    <tr>
+      <td>Network.TotalMemorySegments</td>
+      <td>The number of allocated memory segments.</td>
+    </tr>
+    <tr>
       <th rowspan="19"><strong>TaskManager.Status.JVM</strong></th>
       <td>ClassLoader.ClassesLoaded</td>
       <td>The total number of classes loaded since the start of the JVM.</td>
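
Assuming the default TaskManager scope format (metrics.scope.tm, i.e.
<host>.taskmanager.<tm_id>), the new gauges should be reported under
identifiers of the form:

    <host>.taskmanager.<tm_id>.Status.Network.TotalMemorySegments
    <host>.taskmanager.<tm_id>.Status.Network.AvailableMemorySegments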

http://git-wip-us.apache.org/repos/asf/flink/blob/28743cfb/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 5a95143..72ec2ac 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -965,7 +965,7 @@ class TaskManager(
     taskManagerMetricGroup = 
       new TaskManagerMetricGroup(metricsRegistry, this.runtimeInfo.getHostname, id.toString)
     
-    TaskManager.instantiateStatusMetrics(taskManagerMetricGroup)
+    TaskManager.instantiateStatusMetrics(taskManagerMetricGroup, network)
     
     // watch job manager to detect when it dies
     context.watch(jobManager)
@@ -2357,9 +2357,16 @@ object TaskManager {
     metricRegistry
   }
 
-  private def instantiateStatusMetrics(taskManagerMetricGroup: MetricGroup) : Unit = {
-    val jvm = taskManagerMetricGroup
+  private def instantiateStatusMetrics(
+      taskManagerMetricGroup: MetricGroup,
+      network: NetworkEnvironment)
+    : Unit = {
+    val status = taskManagerMetricGroup
       .addGroup("Status")
+
+    instantiateNetworkMetrics(status.addGroup("Network"), network)
+
+    val jvm = status
       .addGroup("JVM")
 
     instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader"))
@@ -2369,6 +2376,18 @@ object TaskManager {
     instantiateCPUMetrics(jvm.addGroup("CPU"))
   }
 
+  private def instantiateNetworkMetrics(
+        metrics: MetricGroup,
+        network: NetworkEnvironment)
+    : Unit = {
+    metrics.gauge[Long, FlinkGauge[Long]]("TotalMemorySegments", new FlinkGauge[Long] {
+      override def getValue: Long = network.getNetworkBufferPool.getTotalNumberOfMemorySegments
+    })
+    metrics.gauge[Long, FlinkGauge[Long]]("AvailableMemorySegments", new FlinkGauge[Long] {
+      override def getValue: Long = network.getNetworkBufferPool.getNumberOfAvailableMemorySegments
+    })
+  }
+
   private def instantiateClassLoaderMetrics(metrics: MetricGroup) {
     val mxBean = ManagementFactory.getClassLoadingMXBean
 


[2/2] flink git commit: [FLINK-4231] [java] Switch DistinctOperator from GroupReduce to Reduce

Posted by gr...@apache.org.
[FLINK-4231] [java] Switch DistinctOperator from GroupReduce to Reduce

Rewrite the DistinctOperator using Reduce to support both the sort-based
combine and the recently added hash-based combine. This is configured by
the new method DistinctOperator.setCombineHint(CombineHint).

The tests for combinability are removed, as Reduce is inherently
combinable.
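
For reference, a minimal usage sketch of the new hint (the input data and
class name here are illustrative):

    import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class DistinctHashSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<String> words = env.fromElements("1", "2", "1", "3");

            // CombineHint.HASH selects the hash-based combine strategy;
            // the default is CombineHint.OPTIMIZER_CHOOSES.
            words.distinct()
                 .setCombineHint(CombineHint.HASH)
                 .print();
        }
    }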

This closes #2272


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ad8e665f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ad8e665f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ad8e665f

Branch: refs/heads/master
Commit: ad8e665f0607414b0ed50eab01e14c1446e86569
Parents: 28743cf
Author: Greg Hogan <co...@greghogan.com>
Authored: Mon Jul 18 11:50:44 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Wed Aug 24 09:02:22 2016 -0400

----------------------------------------------------------------------
 .../operators/base/ReduceOperatorBase.java      |  7 +-
 .../api/java/operators/DistinctOperator.java    | 82 ++++++++++++--------
 .../api/java/operators/ReduceOperator.java      |  2 +-
 .../translation/DistinctTranslationTest.java    | 52 ++++---------
 .../optimizer/DistinctCompilationTest.java      | 66 ++++++++++++++--
 .../translation/DistinctTranslationTest.scala   | 52 -------------
 6 files changed, 128 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ad8e665f/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
index 88a6fac..7828748 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.common.operators.util.TypeComparable;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.api.common.typeinfo.AtomicType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -191,7 +192,7 @@ public class ReduceOperatorBase<T, FT extends ReduceFunction<T>> extends SingleI
 
 		int[] inputColumns = getKeyColumns(0);
 
-		if (!(inputType instanceof CompositeType) && inputColumns.length > 0) {
+		if (!(inputType instanceof CompositeType) && inputColumns.length > 1) {
 			throw new InvalidProgramException("Grouping is only possible on composite types.");
 		}
 
@@ -202,7 +203,9 @@ public class ReduceOperatorBase<T, FT extends ReduceFunction<T>> extends SingleI
 
 		if (inputColumns.length > 0) {
 			boolean[] inputOrderings = new boolean[inputColumns.length];
-			TypeComparator<T> inputComparator = ((CompositeType<T>) inputType).createComparator(inputColumns, inputOrderings, 0, executionConfig);
+			TypeComparator<T> inputComparator = inputType instanceof AtomicType
+					? ((AtomicType<T>) inputType).createComparator(false, executionConfig)
+					: ((CompositeType<T>) inputType).createComparator(inputColumns, inputOrderings, 0, executionConfig);
 
 			Map<TypeComparable<T>, T> aggregateMap = new HashMap<TypeComparable<T>, T>(inputData.size() / 10);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ad8e665f/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
index ee1669d..267513d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
@@ -20,19 +20,19 @@ package org.apache.flink.api.java.operators;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
-import org.apache.flink.api.common.functions.GroupCombineFunction;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
 import org.apache.flink.api.common.operators.UnaryOperatorInformation;
-import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
-import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
 
 /**
  * This operator represents the application of a "distinct" function on a data set, and the
@@ -47,6 +47,8 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
 
 	private final String distinctLocationName;
 
+	private CombineHint hint = CombineHint.OPTIMIZER_CHOOSES;
+
 	public DistinctOperator(DataSet<T> input, Keys<T> keys, String distinctLocationName) {
 		super(input, input.getType());
 
@@ -61,9 +63,9 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
 	}
 
 	@Override
-	protected org.apache.flink.api.common.operators.base.GroupReduceOperatorBase<?, T, ?> translateToDataFlow(Operator<T> input) {
+	protected org.apache.flink.api.common.operators.SingleInputOperator<?, T, ?> translateToDataFlow(Operator<T> input) {
 
-		final GroupReduceFunction<T, T> function = new DistinctFunction<>();
+		final ReduceFunction<T> function = new DistinctFunction<>();
 
 		String name = getName() != null ? getName() : "Distinct at " + distinctLocationName;
 
@@ -71,10 +73,10 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
 
 			int[] logicalKeyPositions = keys.computeLogicalKeyPositions();
 			UnaryOperatorInformation<T, T> operatorInfo = new UnaryOperatorInformation<>(getInputType(), getResultType());
-			GroupReduceOperatorBase<T, T, GroupReduceFunction<T, T>> po =
-					new GroupReduceOperatorBase<>(function, operatorInfo, logicalKeyPositions, name);
+			ReduceOperatorBase<T, ReduceFunction<T>> po =
+					new ReduceOperatorBase<>(function, operatorInfo, logicalKeyPositions, name);
 
-			po.setCombinable(true);
+			po.setCombineHint(hint);
 			po.setInput(input);
 			po.setParallelism(getParallelism());
 
@@ -96,10 +98,8 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
 			@SuppressWarnings("unchecked")
 			SelectorFunctionKeys<T, ?> selectorKeys = (SelectorFunctionKeys<T, ?>) keys;
 
-			PlanUnwrappingReduceGroupOperator<T, T, ?> po = translateSelectorFunctionDistinct(
-							selectorKeys, function, getResultType(), name, input);
-
-			po.setParallelism(this.getParallelism());
+			org.apache.flink.api.common.operators.SingleInputOperator<?, T, ?> po =
+				translateSelectorFunctionDistinct(selectorKeys, function, getResultType(), name, input, parallelism, hint);
 
 			return po;
 		}
@@ -108,41 +108,55 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
 		}
 	}
 
+	/**
+	 * Sets the strategy to use for the combine phase of the reduce.
+	 *
+	 * If this method is not called, then the default hint will be used.
+	 * ({@link org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint#OPTIMIZER_CHOOSES})
+	 *
+	 * @param strategy The hint to use.
+	 * @return The DistinctOperator object, for function call chaining.
+	 */
+	@PublicEvolving
+	public DistinctOperator<T> setCombineHint(CombineHint strategy) {
+		this.hint = strategy;
+		return this;
+	}
+
 	// --------------------------------------------------------------------------------------------
 
-	private static <IN, OUT, K> PlanUnwrappingReduceGroupOperator<IN, OUT, K> translateSelectorFunctionDistinct(
+	private static <IN, K> org.apache.flink.api.common.operators.SingleInputOperator<?, IN, ?> translateSelectorFunctionDistinct(
 			SelectorFunctionKeys<IN, ?> rawKeys,
-			GroupReduceFunction<IN, OUT> function,
-			TypeInformation<OUT> outputType,
+			ReduceFunction<IN> function,
+			TypeInformation<IN> outputType,
 			String name,
-			Operator<IN> input)
+			Operator<IN> input,
+			int parallelism,
+			CombineHint hint)
 	{
 		@SuppressWarnings("unchecked")
 		final SelectorFunctionKeys<IN, K> keys = (SelectorFunctionKeys<IN, K>) rawKeys;
-		
+
 		TypeInformation<Tuple2<K, IN>> typeInfoWithKey = KeyFunctions.createTypeWithKey(keys);
 		Operator<Tuple2<K, IN>> keyedInput = KeyFunctions.appendKeyExtractor(input, keys);
-		
-		PlanUnwrappingReduceGroupOperator<IN, OUT, K> reducer =
-				new PlanUnwrappingReduceGroupOperator<>(function, keys, name, outputType, typeInfoWithKey, true);
+
+		PlanUnwrappingReduceOperator<IN, K> reducer =
+				new PlanUnwrappingReduceOperator<>(function, keys, name, outputType, typeInfoWithKey);
 		reducer.setInput(keyedInput);
+		reducer.setCombineHint(hint);
+		reducer.setParallelism(parallelism);
 
-		return reducer;
+		return KeyFunctions.appendKeyRemover(reducer, keys);
 	}
-	
+
 	@Internal
-	public static final class DistinctFunction<T> implements GroupReduceFunction<T, T>, GroupCombineFunction<T, T> {
+	public static final class DistinctFunction<T> implements ReduceFunction<T> {
 
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void reduce(Iterable<T> values, Collector<T> out) {
-			out.collect(values.iterator().next());
-		}
-
-		@Override
-		public void combine(Iterable<T> values, Collector<T> out) {
-			out.collect(values.iterator().next());
+		public T reduce(T value1, T value2) throws Exception {
+			return value1;
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ad8e665f/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
index e02b64f..42dcf05 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
@@ -165,7 +165,7 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
 	 * Sets the strategy to use for the combine phase of the reduce.
 	 *
 	 * If this method is not called, then the default hint will be used.
-	 * ({@link org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint.OPTIMIZER_CHOOSES})
+	 * ({@link org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint#OPTIMIZER_CHOOSES})
 	 *
 	 * @param strategy The hint to use.
 	 * @return The ReduceOperator object, for function call chaining.

http://git-wip-us.apache.org/repos/asf/flink/blob/ad8e665f/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
index 9824ee1..cbdac4a 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
@@ -21,14 +21,13 @@ package org.apache.flink.api.java.operators.translation;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.operators.GenericDataSinkBase;
 import org.apache.flink.api.common.operators.GenericDataSourceBase;
-import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
 import org.apache.flink.api.common.operators.base.MapOperatorBase;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.operators.DistinctOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -50,31 +49,6 @@ import static org.junit.Assert.fail;
 public class DistinctTranslationTest {
 
 	@Test
-	public void testCombinable() {
-		try {
-			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-			
-			DataSet<String> input = env.fromElements("1", "2", "1", "3");
-			
-			
-			DistinctOperator<String> op = input.distinct(new KeySelector<String, String>() {
-				public String getKey(String value) { return value; }
-			});
-			
-			op.output(new DiscardingOutputFormat<String>());
-			
-			Plan p = env.createProgramPlan();
-			
-			GroupReduceOperatorBase<?, ?, ?> reduceOp = (GroupReduceOperatorBase<?, ?, ?>) p.getDataSinks().iterator().next().getInput();
-			assertTrue(reduceOp.isCombinable());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
 	public void translateDistinctPlain() {
 		try {
 			final int parallelism = 8;
@@ -88,8 +62,8 @@ public class DistinctTranslationTest {
 
 			GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
 
-			// currently distinct is translated to a GroupReduce
-			GroupReduceOperatorBase<?, ?, ?> reducer = (GroupReduceOperatorBase<?, ?, ?>) sink.getInput();
+			// currently distinct is translated to a Reduce
+			ReduceOperatorBase<?, ?> reducer = (ReduceOperatorBase<?, ?>) sink.getInput();
 
 			// check types
 			assertEquals(initialData.getType(), reducer.getOperatorInfo().getInputType());
@@ -124,8 +98,8 @@ public class DistinctTranslationTest {
 
 			GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
 
-			// currently distinct is translated to a GroupReduce
-			GroupReduceOperatorBase<?, ?, ?> reducer = (GroupReduceOperatorBase<?, ?, ?>) sink.getInput();
+			// currently distinct is translated to a Reduce
+			ReduceOperatorBase<?, ?> reducer = (ReduceOperatorBase<?, ?>) sink.getInput();
 
 			// check types
 			assertEquals(initialData.getType(), reducer.getOperatorInfo().getInputType());
@@ -160,8 +134,8 @@ public class DistinctTranslationTest {
 
 			GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
 
-			// currently distinct is translated to a GroupReduce
-			GroupReduceOperatorBase<?, ?, ?> reducer = (GroupReduceOperatorBase<?, ?, ?>) sink.getInput();
+			// currently distinct is translated to a Reduce
+			ReduceOperatorBase<?, ?> reducer = (ReduceOperatorBase<?, ?>) sink.getInput();
 
 			// check types
 			assertEquals(initialData.getType(), reducer.getOperatorInfo().getInputType());
@@ -200,7 +174,8 @@ public class DistinctTranslationTest {
 
 			GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
 
-			PlanUnwrappingReduceGroupOperator<?, ?, ?> reducer = (PlanUnwrappingReduceGroupOperator<?, ?, ?>) sink.getInput();
+			MapOperatorBase<?, ?, ?> keyRemover = (MapOperatorBase<?, ?, ?>) sink.getInput();
+			PlanUnwrappingReduceOperator<?, ?> reducer = (PlanUnwrappingReduceOperator<?, ?>) keyRemover.getInput();
 			MapOperatorBase<?, ?, ?> keyExtractor = (MapOperatorBase<?, ?, ?>) reducer.getInput();
 
 			// check the parallelisms
@@ -216,7 +191,10 @@ public class DistinctTranslationTest {
 			assertEquals(keyValueInfo, keyExtractor.getOperatorInfo().getOutputType());
 
 			assertEquals(keyValueInfo, reducer.getOperatorInfo().getInputType());
-			assertEquals(initialData.getType(), reducer.getOperatorInfo().getOutputType());
+			assertEquals(keyValueInfo, reducer.getOperatorInfo().getOutputType());
+
+			assertEquals(keyValueInfo, keyRemover.getOperatorInfo().getInputType());
+			assertEquals(initialData.getType(), keyRemover.getOperatorInfo().getOutputType());
 
 			// check keys
 			assertEquals(KeyExtractingMapper.class, keyExtractor.getUserCodeWrapper().getUserCodeClass());
@@ -244,8 +222,8 @@ public class DistinctTranslationTest {
 
 			GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
 
-			// currently distinct is translated to a GroupReduce
-			GroupReduceOperatorBase<?, ?, ?> reducer = (GroupReduceOperatorBase<?, ?, ?>) sink.getInput();
+			// currently distinct is translated to a Reduce
+			ReduceOperatorBase<?, ?> reducer = (ReduceOperatorBase<?, ?>) sink.getInput();
 
 			// check types
 			assertEquals(initialData.getType(), reducer.getOperatorInfo().getInputType());

http://git-wip-us.apache.org/repos/asf/flink/blob/ad8e665f/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
index 20a4ef6..89e0f21 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.optimizer;
 
 import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
 import org.apache.flink.api.common.operators.util.FieldList;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -29,8 +30,8 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.plan.SourcePlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.optimizer.util.CompilerTestBase;
+import org.apache.flink.runtime.operators.DriverStrategy;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -71,8 +72,8 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
 			assertEquals(reduceNode, sinkNode.getInput().getSource());
 
 			// check that both reduce and combiner have the same strategy
-			assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
-			assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
+			assertEquals(DriverStrategy.SORTED_REDUCE, reduceNode.getDriverStrategy());
+			assertEquals(DriverStrategy.SORTED_PARTIAL_REDUCE, combineNode.getDriverStrategy());
 
 			// check the keys
 			assertEquals(new FieldList(0, 1), reduceNode.getKeys(0));
@@ -93,6 +94,57 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
 	}
 
 	@Test
+	public void testDistinctWithCombineHint() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setParallelism(8);
+
+			DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+					.name("source").setParallelism(6);
+
+			data
+					.distinct().setCombineHint(CombineHint.HASH).name("reducer")
+					.output(new DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink");
+
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+
+			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+
+			// get the original nodes
+			SourcePlanNode sourceNode = resolver.getNode("source");
+			SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+			SinkPlanNode sinkNode = resolver.getNode("sink");
+
+			// get the combiner
+			SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
+
+			// check wiring
+			assertEquals(sourceNode, combineNode.getInput().getSource());
+			assertEquals(reduceNode, sinkNode.getInput().getSource());
+
+			// check that both reduce and combiner have the same strategy
+			assertEquals(DriverStrategy.SORTED_REDUCE, reduceNode.getDriverStrategy());
+			assertEquals(DriverStrategy.HASHED_PARTIAL_REDUCE, combineNode.getDriverStrategy());
+
+			// check the keys
+			assertEquals(new FieldList(0, 1), reduceNode.getKeys(0));
+			assertEquals(new FieldList(0, 1), combineNode.getKeys(0));
+			assertEquals(new FieldList(0, 1), reduceNode.getInput().getLocalStrategyKeys());
+
+			// check parallelism
+			assertEquals(6, sourceNode.getParallelism());
+			assertEquals(6, combineNode.getParallelism());
+			assertEquals(8, reduceNode.getParallelism());
+			assertEquals(8, sinkNode.getParallelism());
+		} catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+		}
+	}
+
+	@Test
 	public void testDistinctWithSelectorFunctionKey() {
 		try {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -129,8 +181,8 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
 			assertEquals(keyProjector, sinkNode.getInput().getSource());
 
 			// check that both reduce and combiner have the same strategy
-			assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
-			assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
+			assertEquals(DriverStrategy.SORTED_REDUCE, reduceNode.getDriverStrategy());
+			assertEquals(DriverStrategy.SORTED_PARTIAL_REDUCE, combineNode.getDriverStrategy());
 
 			// check the keys
 			assertEquals(new FieldList(0), reduceNode.getKeys(0));
@@ -185,8 +237,8 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
 			assertEquals(reduceNode, sinkNode.getInput().getSource());
 
 			// check that both reduce and combiner have the same strategy
-			assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
-			assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
+			assertEquals(DriverStrategy.SORTED_REDUCE, reduceNode.getDriverStrategy());
+			assertEquals(DriverStrategy.SORTED_PARTIAL_REDUCE, combineNode.getDriverStrategy());
 
 			// check the keys
 			assertEquals(new FieldList(1), reduceNode.getKeys(0));

http://git-wip-us.apache.org/repos/asf/flink/blob/ad8e665f/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DistinctTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DistinctTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DistinctTranslationTest.scala
deleted file mode 100644
index c540f61..0000000
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DistinctTranslationTest.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.operators.translation
-
-import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase
-import org.apache.flink.api.java.io.DiscardingOutputFormat
-import org.junit.Assert
-import org.junit.Test
-
-import org.apache.flink.api.scala._
-
-class DistinctTranslationTest {
-  @Test
-  def testCombinable(): Unit = {
-    try {
-      val env = ExecutionEnvironment.getExecutionEnvironment
-      val input = env.fromElements("1", "2", "1", "3")
-
-      val op = input.distinct { x => x}
-      op.output(new DiscardingOutputFormat[String])
-
-      val p = env.createProgramPlan()
-
-      val reduceOp =
-        p.getDataSinks.iterator.next.getInput.asInstanceOf[GroupReduceOperatorBase[_, _, _]]
-
-      Assert.assertTrue(reduceOp.isCombinable)
-    }
-    catch {
-      case e: Exception => {
-        e.printStackTrace()
-        Assert.fail(e.getMessage)
-      }
-    }
-  }
-}
-