Posted to commits@flink.apache.org by gr...@apache.org on 2017/07/19 19:01:42 UTC

flink git commit: [FLINK-7204] [core] CombineHint.NONE

Repository: flink
Updated Branches:
  refs/heads/master 9beccec45 -> ce10e57bc


[FLINK-7204] [core] CombineHint.NONE

Add a new CombineHint option, NONE, which disables the creation of a
combiner for a reduce function.

Gelly now skips the combiner when simplifying graphs, an operation
used by most algorithm unit and integration tests.

This closes #4350
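
For reference, a minimal sketch of how a job can opt out of the combiner
with the new hint. This example is illustrative, not part of the commit;
the class name and sample data are invented:

    import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class CombineHintNoneExample {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Edge list with few duplicates: a combiner would add overhead
            // while barely reducing the data shuffled to the reducers.
            DataSet<Tuple2<Long, Long>> edges = env.fromElements(
                Tuple2.of(1L, 2L), Tuple2.of(2L, 3L), Tuple2.of(1L, 2L));

            edges
                .distinct(0, 1)
                    // exclude the combiner, as Gelly's Simplify now does
                    .setCombineHint(CombineHint.NONE)
                .print();
        }
    }

With CombineHint.NONE the optimizer compiles the reduce to SORTED_REDUCE
without injecting a combiner node, as verified by the new test in
ReduceCompilationTest below.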


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ce10e57b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ce10e57b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ce10e57b

Branch: refs/heads/master
Commit: ce10e57bc163babd59005fa250c26e7604f23cf5
Parents: 9beccec
Author: Greg Hogan <co...@greghogan.com>
Authored: Sun Jul 16 07:24:59 2017 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Wed Jul 19 14:44:49 2017 -0400

----------------------------------------------------------------------
 .../operators/base/ReduceOperatorBase.java      |  8 ++-
 .../graph/asm/simple/directed/Simplify.java     |  2 +
 .../graph/asm/simple/undirected/Simplify.java   |  2 +
 .../apache/flink/optimizer/dag/ReduceNode.java  | 10 ++--
 .../GroupReduceWithCombineProperties.java       |  2 +-
 .../optimizer/operators/ReduceProperties.java   | 25 ++++----
 .../optimizer/java/ReduceCompilationTest.java   | 62 ++++++++++++++++++--
 7 files changed, 86 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ce10e57b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
index 7828748..f97e4d6 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
@@ -76,7 +76,13 @@ public class ReduceOperatorBase<T, FT extends ReduceFunction<T>> extends SingleI
 		 * Use a hash-based strategy. This should be faster in most cases, especially if the number
 		 * of different keys is small compared to the number of input elements (eg. 1/10).
 		 */
-		HASH
+		HASH,
+
+		/**
+		 * Disable the use of a combiner. This can be faster in cases when the number of different keys
+		 * is close to the number of input elements (eg. 9/10), since the combiner then yields little reduction.
+		 */
+		NONE
 	}
 
 	private CombineHint hint;

http://git-wip-us.apache.org/repos/asf/flink/blob/ce10e57b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
index 1bab9c6..511840a 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
@@ -19,6 +19,7 @@
 package org.apache.flink.graph.asm.simple.directed;
 
 import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
@@ -44,6 +45,7 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
 				.setParallelism(parallelism)
 				.name("Remove self-loops")
 			.distinct(0, 1)
+				.setCombineHint(CombineHint.NONE)
 				.setParallelism(parallelism)
 				.name("Remove duplicate edges");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ce10e57b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
index 6f1e282..21db233 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
@@ -19,6 +19,7 @@
 package org.apache.flink.graph.asm.simple.undirected;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
@@ -74,6 +75,7 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
 				.setParallelism(parallelism)
 				.name("Remove self-loops")
 			.distinct(0, 1)
+				.setCombineHint(CombineHint.NONE)
 				.setParallelism(parallelism)
 				.name("Remove duplicate edges");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ce10e57b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
index e83352e..1a1f3eb 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
@@ -16,12 +16,8 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.optimizer.dag;
 
-import java.util.Collections;
-import java.util.List;
-
 import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.operators.AllReduceProperties;
@@ -29,6 +25,9 @@ import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
 import org.apache.flink.optimizer.operators.ReduceProperties;
 import org.apache.flink.runtime.operators.DriverStrategy;
 
+import java.util.Collections;
+import java.util.List;
+
 /**
  * The Optimizer representation of a <i>Reduce</i> operator.
  */
@@ -63,6 +62,9 @@ public class ReduceNode extends SingleInputNode {
 				case HASH:
 					combinerStrategy = DriverStrategy.HASHED_PARTIAL_REDUCE;
 					break;
+				case NONE:
+					combinerStrategy = DriverStrategy.NONE;
+					break;
 				default:
 					throw new RuntimeException("Unknown CombineHint");
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/ce10e57b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
index 888b670..accd11b 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
@@ -105,7 +105,7 @@ public final class GroupReduceWithCombineProperties extends OperatorDescriptorSi
 				in.setLocalStrategy(LocalStrategy.COMBININGSORT, in.getLocalStrategyKeys(),
 									in.getLocalStrategySortOrder());
 			}
-			return new SingleInputPlanNode(node, "Reduce("+node.getOperator().getName()+")", in,
+			return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")", in,
 											DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
 		} else {
 			// non forward case. all local properties are killed anyways, so we can safely plug in a combiner

http://git-wip-us.apache.org/repos/asf/flink/blob/ce10e57b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
index d8e5a6c..eab31d3 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
@@ -65,20 +65,18 @@ public final class ReduceProperties extends OperatorDescriptorSingle {
 
 	@Override
 	public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
+		Channel toReducer = in;
+
 		if (in.getShipStrategy() == ShipStrategyType.FORWARD ||
-				(node.getBroadcastConnections() != null && !node.getBroadcastConnections().isEmpty()))
-		{
-			if(in.getSource().getOptimizerNode() instanceof PartitionNode) {
+				(node.getBroadcastConnections() != null && !node.getBroadcastConnections().isEmpty())) {
+			if (in.getSource().getOptimizerNode() instanceof PartitionNode) {
 				LOG.warn("Cannot automatically inject combiner for ReduceFunction. Please add an explicit combiner with combineGroup() in front of the partition operator.");
 			}
-			return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")", in,
-											DriverStrategy.SORTED_REDUCE, this.keyList);
-		}
-		else {
+		} else if (combinerStrategy != DriverStrategy.NONE) {
 			// non forward case. all local properties are killed anyways, so we can safely plug in a combiner
 			Channel toCombiner = new Channel(in.getSource());
 			toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-			
+
 			// create an input node for combine with same parallelism as input node
 			ReduceNode combinerNode = ((ReduceNode) node).getCombinerUtilityNode();
 			combinerNode.setParallelism(in.getSource().getParallelism());
@@ -89,15 +87,16 @@ public final class ReduceProperties extends OperatorDescriptorSingle {
 
 			combiner.setCosts(new Costs(0, 0));
 			combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties());
-			
-			Channel toReducer = new Channel(combiner);
+
+			toReducer = new Channel(combiner);
 			toReducer.setShipStrategy(in.getShipStrategy(), in.getShipStrategyKeys(),
 										in.getShipStrategySortOrder(), in.getDataExchangeMode());
 			toReducer.setLocalStrategy(LocalStrategy.SORT, in.getLocalStrategyKeys(), in.getLocalStrategySortOrder());
-
-			return new SingleInputPlanNode(node, "Reduce("+node.getOperator().getName()+")", toReducer,
-											DriverStrategy.SORTED_REDUCE, this.keyList);
 		}
+
+		return new SingleInputPlanNode(node, "Reduce (" + node.getOperator().getName() + ")", toReducer,
+			DriverStrategy.SORTED_REDUCE, this.keyList);
+
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ce10e57b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
index f513155..d2c640f 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
@@ -19,23 +19,25 @@
 package org.apache.flink.optimizer.java;
 
 import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
 import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.junit.Test;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.plan.SourcePlanNode;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.DriverStrategy;
 
-import static org.junit.Assert.*;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 @SuppressWarnings("serial")
 public class ReduceCompilationTest extends CompilerTestBase implements java.io.Serializable {
@@ -327,4 +329,52 @@ public class ReduceCompilationTest extends CompilerTestBase implements java.io.S
 			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
 		}
 	}
+
+	/**
+	 * Test program compilation when the Reduce's combiner has been excluded
+	 * by setting {@code CombineHint.NONE}.
+	 */
+	@Test
+	public void testGroupedReduceWithoutCombiner() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(8);
+
+		DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+			.name("source").setParallelism(6);
+
+		data
+			.groupBy(0)
+			.reduce(new RichReduceFunction<Tuple2<String, Double>>() {
+				@Override
+				public Tuple2<String, Double> reduce(Tuple2<String, Double> value1, Tuple2<String, Double> value2) {
+					return null;
+				}
+			}).setCombineHint(CombineHint.NONE).name("reducer")
+			.output(new DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink");
+
+		Plan p = env.createProgramPlan();
+		OptimizedPlan op = compileNoStats(p);
+
+		OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+
+		// get the original nodes
+		SourcePlanNode sourceNode = resolver.getNode("source");
+		SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+		SinkPlanNode sinkNode = resolver.getNode("sink");
+
+		// check wiring
+		assertEquals(sourceNode, reduceNode.getInput().getSource());
+
+		// check the strategies
+		assertEquals(DriverStrategy.SORTED_REDUCE, reduceNode.getDriverStrategy());
+
+		// check the keys
+		assertEquals(new FieldList(0), reduceNode.getKeys(0));
+		assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
+
+		// check parallelism
+		assertEquals(6, sourceNode.getParallelism());
+		assertEquals(8, reduceNode.getParallelism());
+		assertEquals(8, sinkNode.getParallelism());
+	}
 }