You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2016/08/24 13:55:44 UTC
[1/2] flink git commit: [FLINK-4452] [metrics] TaskManager network
buffer gauges
Repository: flink
Updated Branches:
refs/heads/master 58165d69f -> ad8e665f0
[FLINK-4452] [metrics] TaskManager network buffer gauges
Adds gauges for the number of total and available TaskManager network
memory segments.
This closes #2408
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/28743cfb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/28743cfb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/28743cfb
Branch: refs/heads/master
Commit: 28743cfb86545cf9eaf4ec2cf37ec460a13f3537
Parents: 58165d6
Author: Greg Hogan <co...@greghogan.com>
Authored: Tue Aug 23 10:46:48 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Wed Aug 24 09:02:15 2016 -0400
----------------------------------------------------------------------
docs/monitoring/metrics.md | 9 +++++++
.../flink/runtime/taskmanager/TaskManager.scala | 25 +++++++++++++++++---
2 files changed, 31 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/28743cfb/docs/monitoring/metrics.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 023bef9..3a148e1 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -335,6 +335,15 @@ Flink exposes the following system metrics:
<td></td>
</tr>
<tr>
+ <th rowspan="2"><strong>TaskManager.Status</strong></th>
+ <td>Network.AvailableMemorySegments</td>
+ <td>The number of unused memory segments.</td>
+ </tr>
+ <tr>
+ <td>Network.TotalMemorySegments</td>
+ <td>The number of allocated memory segments.</td>
+ </tr>
+ <tr>
<th rowspan="19"><strong>TaskManager.Status.JVM</strong></th>
<td>ClassLoader.ClassesLoaded</td>
<td>The total number of classes loaded since the start of the JVM.</td>
http://git-wip-us.apache.org/repos/asf/flink/blob/28743cfb/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 5a95143..72ec2ac 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -965,7 +965,7 @@ class TaskManager(
taskManagerMetricGroup =
new TaskManagerMetricGroup(metricsRegistry, this.runtimeInfo.getHostname, id.toString)
- TaskManager.instantiateStatusMetrics(taskManagerMetricGroup)
+ TaskManager.instantiateStatusMetrics(taskManagerMetricGroup, network)
// watch job manager to detect when it dies
context.watch(jobManager)
@@ -2357,9 +2357,16 @@ object TaskManager {
metricRegistry
}
- private def instantiateStatusMetrics(taskManagerMetricGroup: MetricGroup) : Unit = {
- val jvm = taskManagerMetricGroup
+ private def instantiateStatusMetrics(
+ taskManagerMetricGroup: MetricGroup,
+ network: NetworkEnvironment)
+ : Unit = {
+ val status = taskManagerMetricGroup
.addGroup("Status")
+
+ instantiateNetworkMetrics(status.addGroup("Network"), network)
+
+ val jvm = status
.addGroup("JVM")
instantiateClassLoaderMetrics(jvm.addGroup("ClassLoader"))
@@ -2369,6 +2376,18 @@ object TaskManager {
instantiateCPUMetrics(jvm.addGroup("CPU"))
}
+ private def instantiateNetworkMetrics(
+ metrics: MetricGroup,
+ network: NetworkEnvironment)
+ : Unit = {
+ metrics.gauge[Long, FlinkGauge[Long]]("TotalMemorySegments", new FlinkGauge[Long] {
+ override def getValue: Long = network.getNetworkBufferPool.getTotalNumberOfMemorySegments
+ })
+ metrics.gauge[Long, FlinkGauge[Long]]("AvailableMemorySegments", new FlinkGauge[Long] {
+ override def getValue: Long = network.getNetworkBufferPool.getNumberOfAvailableMemorySegments
+ })
+ }
+
private def instantiateClassLoaderMetrics(metrics: MetricGroup) {
val mxBean = ManagementFactory.getClassLoadingMXBean
[2/2] flink git commit: [FLINK-4231] [java] Switch DistinctOperator
from GroupReduce to Reduce
Posted by gr...@apache.org.
[FLINK-4231] [java] Switch DistinctOperator from GroupReduce to Reduce
Rewrite the DistinctOperator using Reduce to support both the sort-based
combine and the recently added hash-based combine. This is configured by
the new method DistinctOperator.setCombineHint(CombineHint).
The tests for combinability are removed as Reduce is inherently
combinable.
This closes #2272
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ad8e665f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ad8e665f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ad8e665f
Branch: refs/heads/master
Commit: ad8e665f0607414b0ed50eab01e14c1446e86569
Parents: 28743cf
Author: Greg Hogan <co...@greghogan.com>
Authored: Mon Jul 18 11:50:44 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Wed Aug 24 09:02:22 2016 -0400
----------------------------------------------------------------------
.../operators/base/ReduceOperatorBase.java | 7 +-
.../api/java/operators/DistinctOperator.java | 82 ++++++++++++--------
.../api/java/operators/ReduceOperator.java | 2 +-
.../translation/DistinctTranslationTest.java | 52 ++++---------
.../optimizer/DistinctCompilationTest.java | 66 ++++++++++++++--
.../translation/DistinctTranslationTest.scala | 52 -------------
6 files changed, 128 insertions(+), 133 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ad8e665f/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
index 88a6fac..7828748 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.common.operators.util.TypeComparable;
import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
+import org.apache.flink.api.common.typeinfo.AtomicType;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.common.typeutils.TypeComparator;
@@ -191,7 +192,7 @@ public class ReduceOperatorBase<T, FT extends ReduceFunction<T>> extends SingleI
int[] inputColumns = getKeyColumns(0);
- if (!(inputType instanceof CompositeType) && inputColumns.length > 0) {
+ if (!(inputType instanceof CompositeType) && inputColumns.length > 1) {
throw new InvalidProgramException("Grouping is only possible on composite types.");
}
@@ -202,7 +203,9 @@ public class ReduceOperatorBase<T, FT extends ReduceFunction<T>> extends SingleI
if (inputColumns.length > 0) {
boolean[] inputOrderings = new boolean[inputColumns.length];
- TypeComparator<T> inputComparator = ((CompositeType<T>) inputType).createComparator(inputColumns, inputOrderings, 0, executionConfig);
+ TypeComparator<T> inputComparator = inputType instanceof AtomicType
+ ? ((AtomicType<T>) inputType).createComparator(false, executionConfig)
+ : ((CompositeType<T>) inputType).createComparator(inputColumns, inputOrderings, 0, executionConfig);
Map<TypeComparable<T>, T> aggregateMap = new HashMap<TypeComparable<T>, T>(inputData.size() / 10);
http://git-wip-us.apache.org/repos/asf/flink/blob/ad8e665f/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
index ee1669d..267513d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DistinctOperator.java
@@ -20,19 +20,19 @@ package org.apache.flink.api.java.operators;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
-import org.apache.flink.api.common.functions.GroupCombineFunction;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.operators.Keys;
+import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
-import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.operators.Keys.SelectorFunctionKeys;
-import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
/**
* This operator represents the application of a "distinct" function on a data set, and the
@@ -47,6 +47,8 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
private final String distinctLocationName;
+ private CombineHint hint = CombineHint.OPTIMIZER_CHOOSES;
+
public DistinctOperator(DataSet<T> input, Keys<T> keys, String distinctLocationName) {
super(input, input.getType());
@@ -61,9 +63,9 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
}
@Override
- protected org.apache.flink.api.common.operators.base.GroupReduceOperatorBase<?, T, ?> translateToDataFlow(Operator<T> input) {
+ protected org.apache.flink.api.common.operators.SingleInputOperator<?, T, ?> translateToDataFlow(Operator<T> input) {
- final GroupReduceFunction<T, T> function = new DistinctFunction<>();
+ final ReduceFunction<T> function = new DistinctFunction<>();
String name = getName() != null ? getName() : "Distinct at " + distinctLocationName;
@@ -71,10 +73,10 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
int[] logicalKeyPositions = keys.computeLogicalKeyPositions();
UnaryOperatorInformation<T, T> operatorInfo = new UnaryOperatorInformation<>(getInputType(), getResultType());
- GroupReduceOperatorBase<T, T, GroupReduceFunction<T, T>> po =
- new GroupReduceOperatorBase<>(function, operatorInfo, logicalKeyPositions, name);
+ ReduceOperatorBase<T, ReduceFunction<T>> po =
+ new ReduceOperatorBase<>(function, operatorInfo, logicalKeyPositions, name);
- po.setCombinable(true);
+ po.setCombineHint(hint);
po.setInput(input);
po.setParallelism(getParallelism());
@@ -96,10 +98,8 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
@SuppressWarnings("unchecked")
SelectorFunctionKeys<T, ?> selectorKeys = (SelectorFunctionKeys<T, ?>) keys;
- PlanUnwrappingReduceGroupOperator<T, T, ?> po = translateSelectorFunctionDistinct(
- selectorKeys, function, getResultType(), name, input);
-
- po.setParallelism(this.getParallelism());
+ org.apache.flink.api.common.operators.SingleInputOperator<?, T, ?> po =
+ translateSelectorFunctionDistinct(selectorKeys, function, getResultType(), name, input, parallelism, hint);
return po;
}
@@ -108,41 +108,55 @@ public class DistinctOperator<T> extends SingleInputOperator<T, T, DistinctOpera
}
}
+ /**
+ * Sets the strategy to use for the combine phase of the reduce.
+ *
+ * If this method is not called, then the default hint will be used.
+ * ({@link org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint#OPTIMIZER_CHOOSES})
+ *
+ * @param strategy The hint to use.
+ * @return The DistinctOperator object, for function call chaining.
+ */
+ @PublicEvolving
+ public DistinctOperator<T> setCombineHint(CombineHint strategy) {
+ this.hint = strategy;
+ return this;
+ }
+
// --------------------------------------------------------------------------------------------
- private static <IN, OUT, K> PlanUnwrappingReduceGroupOperator<IN, OUT, K> translateSelectorFunctionDistinct(
+ private static <IN, K> org.apache.flink.api.common.operators.SingleInputOperator<?, IN, ?> translateSelectorFunctionDistinct(
SelectorFunctionKeys<IN, ?> rawKeys,
- GroupReduceFunction<IN, OUT> function,
- TypeInformation<OUT> outputType,
+ ReduceFunction<IN> function,
+ TypeInformation<IN> outputType,
String name,
- Operator<IN> input)
+ Operator<IN> input,
+ int parallelism,
+ CombineHint hint)
{
@SuppressWarnings("unchecked")
final SelectorFunctionKeys<IN, K> keys = (SelectorFunctionKeys<IN, K>) rawKeys;
-
+
TypeInformation<Tuple2<K, IN>> typeInfoWithKey = KeyFunctions.createTypeWithKey(keys);
Operator<Tuple2<K, IN>> keyedInput = KeyFunctions.appendKeyExtractor(input, keys);
-
- PlanUnwrappingReduceGroupOperator<IN, OUT, K> reducer =
- new PlanUnwrappingReduceGroupOperator<>(function, keys, name, outputType, typeInfoWithKey, true);
+
+ PlanUnwrappingReduceOperator<IN, K> reducer =
+ new PlanUnwrappingReduceOperator<>(function, keys, name, outputType, typeInfoWithKey);
reducer.setInput(keyedInput);
+ reducer.setCombineHint(hint);
+ reducer.setParallelism(parallelism);
- return reducer;
+ return KeyFunctions.appendKeyRemover(reducer, keys);
}
-
+
@Internal
- public static final class DistinctFunction<T> implements GroupReduceFunction<T, T>, GroupCombineFunction<T, T> {
+ public static final class DistinctFunction<T> implements ReduceFunction<T> {
private static final long serialVersionUID = 1L;
@Override
- public void reduce(Iterable<T> values, Collector<T> out) {
- out.collect(values.iterator().next());
- }
-
- @Override
- public void combine(Iterable<T> values, Collector<T> out) {
- out.collect(values.iterator().next());
+ public T reduce(T value1, T value2) throws Exception {
+ return value1;
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ad8e665f/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
index e02b64f..42dcf05 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ReduceOperator.java
@@ -165,7 +165,7 @@ public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOpe
* Sets the strategy to use for the combine phase of the reduce.
*
* If this method is not called, then the default hint will be used.
- * ({@link org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint.OPTIMIZER_CHOOSES})
+ * ({@link org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint#OPTIMIZER_CHOOSES})
*
* @param strategy The hint to use.
* @return The ReduceOperator object, for function call chaining.
http://git-wip-us.apache.org/repos/asf/flink/blob/ad8e665f/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
index 9824ee1..cbdac4a 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DistinctTranslationTest.java
@@ -21,14 +21,13 @@ package org.apache.flink.api.java.operators.translation;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.operators.GenericDataSinkBase;
import org.apache.flink.api.common.operators.GenericDataSourceBase;
-import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.operators.DistinctOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -50,31 +49,6 @@ import static org.junit.Assert.fail;
public class DistinctTranslationTest {
@Test
- public void testCombinable() {
- try {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<String> input = env.fromElements("1", "2", "1", "3");
-
-
- DistinctOperator<String> op = input.distinct(new KeySelector<String, String>() {
- public String getKey(String value) { return value; }
- });
-
- op.output(new DiscardingOutputFormat<String>());
-
- Plan p = env.createProgramPlan();
-
- GroupReduceOperatorBase<?, ?, ?> reduceOp = (GroupReduceOperatorBase<?, ?, ?>) p.getDataSinks().iterator().next().getInput();
- assertTrue(reduceOp.isCombinable());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
public void translateDistinctPlain() {
try {
final int parallelism = 8;
@@ -88,8 +62,8 @@ public class DistinctTranslationTest {
GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
- // currently distinct is translated to a GroupReduce
- GroupReduceOperatorBase<?, ?, ?> reducer = (GroupReduceOperatorBase<?, ?, ?>) sink.getInput();
+ // currently distinct is translated to a Reduce
+ ReduceOperatorBase<?, ?> reducer = (ReduceOperatorBase<?, ?>) sink.getInput();
// check types
assertEquals(initialData.getType(), reducer.getOperatorInfo().getInputType());
@@ -124,8 +98,8 @@ public class DistinctTranslationTest {
GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
- // currently distinct is translated to a GroupReduce
- GroupReduceOperatorBase<?, ?, ?> reducer = (GroupReduceOperatorBase<?, ?, ?>) sink.getInput();
+ // currently distinct is translated to a Reduce
+ ReduceOperatorBase<?, ?> reducer = (ReduceOperatorBase<?, ?>) sink.getInput();
// check types
assertEquals(initialData.getType(), reducer.getOperatorInfo().getInputType());
@@ -160,8 +134,8 @@ public class DistinctTranslationTest {
GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
- // currently distinct is translated to a GroupReduce
- GroupReduceOperatorBase<?, ?, ?> reducer = (GroupReduceOperatorBase<?, ?, ?>) sink.getInput();
+ // currently distinct is translated to a Reduce
+ ReduceOperatorBase<?, ?> reducer = (ReduceOperatorBase<?, ?>) sink.getInput();
// check types
assertEquals(initialData.getType(), reducer.getOperatorInfo().getInputType());
@@ -200,7 +174,8 @@ public class DistinctTranslationTest {
GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
- PlanUnwrappingReduceGroupOperator<?, ?, ?> reducer = (PlanUnwrappingReduceGroupOperator<?, ?, ?>) sink.getInput();
+ MapOperatorBase<?, ?, ?> keyRemover = (MapOperatorBase<?, ?, ?>) sink.getInput();
+ PlanUnwrappingReduceOperator<?, ?> reducer = (PlanUnwrappingReduceOperator<?, ?>) keyRemover.getInput();
MapOperatorBase<?, ?, ?> keyExtractor = (MapOperatorBase<?, ?, ?>) reducer.getInput();
// check the parallelisms
@@ -216,7 +191,10 @@ public class DistinctTranslationTest {
assertEquals(keyValueInfo, keyExtractor.getOperatorInfo().getOutputType());
assertEquals(keyValueInfo, reducer.getOperatorInfo().getInputType());
- assertEquals(initialData.getType(), reducer.getOperatorInfo().getOutputType());
+ assertEquals(keyValueInfo, reducer.getOperatorInfo().getOutputType());
+
+ assertEquals(keyValueInfo, keyRemover.getOperatorInfo().getInputType());
+ assertEquals(initialData.getType(), keyRemover.getOperatorInfo().getOutputType());
// check keys
assertEquals(KeyExtractingMapper.class, keyExtractor.getUserCodeWrapper().getUserCodeClass());
@@ -244,8 +222,8 @@ public class DistinctTranslationTest {
GenericDataSinkBase<?> sink = p.getDataSinks().iterator().next();
- // currently distinct is translated to a GroupReduce
- GroupReduceOperatorBase<?, ?, ?> reducer = (GroupReduceOperatorBase<?, ?, ?>) sink.getInput();
+ // currently distinct is translated to a Reduce
+ ReduceOperatorBase<?, ?> reducer = (ReduceOperatorBase<?, ?>) sink.getInput();
// check types
assertEquals(initialData.getType(), reducer.getOperatorInfo().getInputType());
http://git-wip-us.apache.org/repos/asf/flink/blob/ad8e665f/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
index 20a4ef6..89e0f21 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/DistinctCompilationTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.optimizer;
import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -29,8 +30,8 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.plan.SourcePlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.optimizer.util.CompilerTestBase;
+import org.apache.flink.runtime.operators.DriverStrategy;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -71,8 +72,8 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
assertEquals(reduceNode, sinkNode.getInput().getSource());
// check that both reduce and combiner have the same strategy
- assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
- assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
+ assertEquals(DriverStrategy.SORTED_REDUCE, reduceNode.getDriverStrategy());
+ assertEquals(DriverStrategy.SORTED_PARTIAL_REDUCE, combineNode.getDriverStrategy());
// check the keys
assertEquals(new FieldList(0, 1), reduceNode.getKeys(0));
@@ -93,6 +94,57 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
}
@Test
+ public void testDistinctWithCombineHint() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(8);
+
+ DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+ .name("source").setParallelism(6);
+
+ data
+ .distinct().setCombineHint(CombineHint.HASH).name("reducer")
+ .output(new DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink");
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+
+ // get the original nodes
+ SourcePlanNode sourceNode = resolver.getNode("source");
+ SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+ SinkPlanNode sinkNode = resolver.getNode("sink");
+
+ // get the combiner
+ SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
+
+ // check wiring
+ assertEquals(sourceNode, combineNode.getInput().getSource());
+ assertEquals(reduceNode, sinkNode.getInput().getSource());
+
+ // check that both reduce and combiner have the same strategy
+ assertEquals(DriverStrategy.SORTED_REDUCE, reduceNode.getDriverStrategy());
+ assertEquals(DriverStrategy.HASHED_PARTIAL_REDUCE, combineNode.getDriverStrategy());
+
+ // check the keys
+ assertEquals(new FieldList(0, 1), reduceNode.getKeys(0));
+ assertEquals(new FieldList(0, 1), combineNode.getKeys(0));
+ assertEquals(new FieldList(0, 1), reduceNode.getInput().getLocalStrategyKeys());
+
+ // check parallelism
+ assertEquals(6, sourceNode.getParallelism());
+ assertEquals(6, combineNode.getParallelism());
+ assertEquals(8, reduceNode.getParallelism());
+ assertEquals(8, sinkNode.getParallelism());
+ } catch (Exception e) {
+ System.err.println(e.getMessage());
+ e.printStackTrace();
+ fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
+ }
+ }
+
+ @Test
public void testDistinctWithSelectorFunctionKey() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -129,8 +181,8 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
assertEquals(keyProjector, sinkNode.getInput().getSource());
// check that both reduce and combiner have the same strategy
- assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
- assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
+ assertEquals(DriverStrategy.SORTED_REDUCE, reduceNode.getDriverStrategy());
+ assertEquals(DriverStrategy.SORTED_PARTIAL_REDUCE, combineNode.getDriverStrategy());
// check the keys
assertEquals(new FieldList(0), reduceNode.getKeys(0));
@@ -185,8 +237,8 @@ public class DistinctCompilationTest extends CompilerTestBase implements java.io
assertEquals(reduceNode, sinkNode.getInput().getSource());
// check that both reduce and combiner have the same strategy
- assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
- assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
+ assertEquals(DriverStrategy.SORTED_REDUCE, reduceNode.getDriverStrategy());
+ assertEquals(DriverStrategy.SORTED_PARTIAL_REDUCE, combineNode.getDriverStrategy());
// check the keys
assertEquals(new FieldList(1), reduceNode.getKeys(0));
http://git-wip-us.apache.org/repos/asf/flink/blob/ad8e665f/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DistinctTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DistinctTranslationTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DistinctTranslationTest.scala
deleted file mode 100644
index c540f61..0000000
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/DistinctTranslationTest.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.operators.translation
-
-import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase
-import org.apache.flink.api.java.io.DiscardingOutputFormat
-import org.junit.Assert
-import org.junit.Test
-
-import org.apache.flink.api.scala._
-
-class DistinctTranslationTest {
- @Test
- def testCombinable(): Unit = {
- try {
- val env = ExecutionEnvironment.getExecutionEnvironment
- val input = env.fromElements("1", "2", "1", "3")
-
- val op = input.distinct { x => x}
- op.output(new DiscardingOutputFormat[String])
-
- val p = env.createProgramPlan()
-
- val reduceOp =
- p.getDataSinks.iterator.next.getInput.asInstanceOf[GroupReduceOperatorBase[_, _, _]]
-
- Assert.assertTrue(reduceOp.isCombinable)
- }
- catch {
- case e: Exception => {
- e.printStackTrace()
- Assert.fail(e.getMessage)
- }
- }
- }
-}
-