Posted to commits@flink.apache.org by gr...@apache.org on 2017/07/19 19:01:42 UTC
flink git commit: [FLINK-7204] [core] CombineHint.NONE
Repository: flink
Updated Branches:
refs/heads/master 9beccec45 -> ce10e57bc
[FLINK-7204] [core] CombineHint.NONE
Add a new option to CombineHint which disables the creation of a
combiner for a reduce function.
Gelly now excludes the combiner when simplifying graphs, a step used
in most algorithm unit and integration tests.
This closes #4350
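For readers of the archive: the new hint is applied from the DataSet API the same way as the existing SORT and HASH hints, via setCombineHint() on a grouped reduce (or on distinct, as Gelly's Simplify does below). The following minimal sketch is illustrative only; the input data, the reduce function, and the class name are made up for the example and are not part of this commit:

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class CombineHintNoneExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Mostly-unique keys: a combiner would add sort/hash overhead
            // while providing almost no reduction before the shuffle.
            DataSet<Tuple2<String, Long>> counts = env.fromElements(
                    Tuple2.of("a", 1L), Tuple2.of("b", 1L), Tuple2.of("c", 1L));

            counts
                .groupBy(0)
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> a, Tuple2<String, Long> b) {
                        return Tuple2.of(a.f0, a.f1 + b.f1);
                    }
                })
                // skip the pre-shuffle combiner entirely
                .setCombineHint(CombineHint.NONE)
                .print();
        }
    }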
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ce10e57b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ce10e57b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ce10e57b
Branch: refs/heads/master
Commit: ce10e57bc163babd59005fa250c26e7604f23cf5
Parents: 9beccec
Author: Greg Hogan <co...@greghogan.com>
Authored: Sun Jul 16 07:24:59 2017 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Wed Jul 19 14:44:49 2017 -0400
----------------------------------------------------------------------
.../operators/base/ReduceOperatorBase.java | 8 ++-
.../graph/asm/simple/directed/Simplify.java | 2 +
.../graph/asm/simple/undirected/Simplify.java | 2 +
.../apache/flink/optimizer/dag/ReduceNode.java | 10 ++--
.../GroupReduceWithCombineProperties.java | 2 +-
.../optimizer/operators/ReduceProperties.java | 25 ++++----
.../optimizer/java/ReduceCompilationTest.java | 62 ++++++++++++++++++--
7 files changed, 86 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ce10e57b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
index 7828748..f97e4d6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
@@ -76,7 +76,13 @@ public class ReduceOperatorBase<T, FT extends ReduceFunction<T>> extends SingleI
* Use a hash-based strategy. This should be faster in most cases, especially if the number
* of different keys is small compared to the number of input elements (eg. 1/10).
*/
- HASH
+ HASH,
+
+ /**
+ * Disable the use of a combiner. This can be faster in cases when the number of different keys
+ * is close to the number of input elements, since the combiner then provides little reduction.
+ */
+ NONE
}
private CombineHint hint;
http://git-wip-us.apache.org/repos/asf/flink/blob/ce10e57b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
index 1bab9c6..511840a 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
@@ -19,6 +19,7 @@
package org.apache.flink.graph.asm.simple.directed;
import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
@@ -44,6 +45,7 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
.setParallelism(parallelism)
.name("Remove self-loops")
.distinct(0, 1)
+ .setCombineHint(CombineHint.NONE)
.setParallelism(parallelism)
.name("Remove duplicate edges");
http://git-wip-us.apache.org/repos/asf/flink/blob/ce10e57b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
index 6f1e282..21db233 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
@@ -19,6 +19,7 @@
package org.apache.flink.graph.asm.simple.undirected;
import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
@@ -74,6 +75,7 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
.setParallelism(parallelism)
.name("Remove self-loops")
.distinct(0, 1)
+ .setCombineHint(CombineHint.NONE)
.setParallelism(parallelism)
.name("Remove duplicate edges");
http://git-wip-us.apache.org/repos/asf/flink/blob/ce10e57b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
index e83352e..1a1f3eb 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
@@ -16,12 +16,8 @@
* limitations under the License.
*/
-
package org.apache.flink.optimizer.dag;
-import java.util.Collections;
-import java.util.List;
-
import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.operators.AllReduceProperties;
@@ -29,6 +25,9 @@ import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
import org.apache.flink.optimizer.operators.ReduceProperties;
import org.apache.flink.runtime.operators.DriverStrategy;
+import java.util.Collections;
+import java.util.List;
+
/**
* The Optimizer representation of a <i>Reduce</i> operator.
*/
@@ -63,6 +62,9 @@ public class ReduceNode extends SingleInputNode {
case HASH:
combinerStrategy = DriverStrategy.HASHED_PARTIAL_REDUCE;
break;
+ case NONE:
+ combinerStrategy = DriverStrategy.NONE;
+ break;
default:
throw new RuntimeException("Unknown CombineHint");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ce10e57b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
index 888b670..accd11b 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
@@ -105,7 +105,7 @@ public final class GroupReduceWithCombineProperties extends OperatorDescriptorSi
in.setLocalStrategy(LocalStrategy.COMBININGSORT, in.getLocalStrategyKeys(),
in.getLocalStrategySortOrder());
}
- return new SingleInputPlanNode(node, "Reduce("+node.getOperator().getName()+")", in,
+ return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")", in,
DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
} else {
// non forward case. all local properties are killed anyways, so we can safely plug in a combiner
http://git-wip-us.apache.org/repos/asf/flink/blob/ce10e57b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
index d8e5a6c..eab31d3 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
@@ -65,20 +65,18 @@ public final class ReduceProperties extends OperatorDescriptorSingle {
@Override
public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
+ Channel toReducer = in;
+
if (in.getShipStrategy() == ShipStrategyType.FORWARD ||
- (node.getBroadcastConnections() != null && !node.getBroadcastConnections().isEmpty()))
- {
- if(in.getSource().getOptimizerNode() instanceof PartitionNode) {
+ (node.getBroadcastConnections() != null && !node.getBroadcastConnections().isEmpty())) {
+ if (in.getSource().getOptimizerNode() instanceof PartitionNode) {
LOG.warn("Cannot automatically inject combiner for ReduceFunction. Please add an explicit combiner with combineGroup() in front of the partition operator.");
}
- return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")", in,
- DriverStrategy.SORTED_REDUCE, this.keyList);
- }
- else {
+ } else if (combinerStrategy != DriverStrategy.NONE) {
// non forward case. all local properties are killed anyways, so we can safely plug in a combiner
Channel toCombiner = new Channel(in.getSource());
toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-
+
// create an input node for combine with same parallelism as input node
ReduceNode combinerNode = ((ReduceNode) node).getCombinerUtilityNode();
combinerNode.setParallelism(in.getSource().getParallelism());
@@ -89,15 +87,16 @@ public final class ReduceProperties extends OperatorDescriptorSingle {
combiner.setCosts(new Costs(0, 0));
combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties());
-
- Channel toReducer = new Channel(combiner);
+
+ toReducer = new Channel(combiner);
toReducer.setShipStrategy(in.getShipStrategy(), in.getShipStrategyKeys(),
in.getShipStrategySortOrder(), in.getDataExchangeMode());
toReducer.setLocalStrategy(LocalStrategy.SORT, in.getLocalStrategyKeys(), in.getLocalStrategySortOrder());
-
- return new SingleInputPlanNode(node, "Reduce("+node.getOperator().getName()+")", toReducer,
- DriverStrategy.SORTED_REDUCE, this.keyList);
}
+
+ return new SingleInputPlanNode(node, "Reduce (" + node.getOperator().getName() + ")", toReducer,
+ DriverStrategy.SORTED_REDUCE, this.keyList);
+
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/ce10e57b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
index f513155..d2c640f 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
@@ -19,23 +19,25 @@
package org.apache.flink.optimizer.java;
import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.junit.Test;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.plan.SourcePlanNode;
+import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.DriverStrategy;
-import static org.junit.Assert.*;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
@SuppressWarnings("serial")
public class ReduceCompilationTest extends CompilerTestBase implements java.io.Serializable {
@@ -327,4 +329,52 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
}
}
+
+ /**
+ * Test program compilation when the Reduce's combiner has been excluded
+ * by setting {@code CombineHint.NONE}.
+ */
+ @Test
+ public void testGroupedReduceWithoutCombiner() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(8);
+
+ DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+ .name("source").setParallelism(6);
+
+ data
+ .groupBy(0)
+ .reduce(new RichReduceFunction<Tuple2<String, Double>>() {
+ @Override
+ public Tuple2<String, Double> reduce(Tuple2<String, Double> value1, Tuple2<String, Double> value2) {
+ return null;
+ }
+ }).setCombineHint(CombineHint.NONE).name("reducer")
+ .output(new DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink");
+
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+
+ // get the original nodes
+ SourcePlanNode sourceNode = resolver.getNode("source");
+ SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+ SinkPlanNode sinkNode = resolver.getNode("sink");
+
+ // check wiring
+ assertEquals(sourceNode, reduceNode.getInput().getSource());
+
+ // check the strategies
+ assertEquals(DriverStrategy.SORTED_REDUCE, reduceNode.getDriverStrategy());
+
+ // check the keys
+ assertEquals(new FieldList(0), reduceNode.getKeys(0));
+ assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
+
+ // check parallelism
+ assertEquals(6, sourceNode.getParallelism());
+ assertEquals(8, reduceNode.getParallelism());
+ assertEquals(8, sinkNode.getParallelism());
+ }
}