You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/11/06 12:58:28 UTC

[1/2] git commit: [FLINK-1214] Prevent partitionings on subsets of fields from being pushed down

Repository: incubator-flink
Updated Branches:
  refs/heads/master c3835cdf3 -> 6ecd0f826


[FLINK-1214] Prevent partitionings on subsets of fields from being pushed down


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/6ecd0f82
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/6ecd0f82
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/6ecd0f82

Branch: refs/heads/master
Commit: 6ecd0f8264ab5cfb6101046a415c996993b682e5
Parents: a586614
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 5 15:23:20 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Nov 5 15:25:56 2014 +0100

----------------------------------------------------------------------
 .../flink/compiler/dag/SingleInputNode.java     |  13 +++
 .../flink/compiler/PartitionPushdownTest.java   | 104 +++++++++++++++++++
 .../iterations/ConnectedComponentsTest.java     |   4 +-
 3 files changed, 119 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6ecd0f82/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SingleInputNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SingleInputNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SingleInputNode.java
index b3d639b..730c1bb 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SingleInputNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SingleInputNode.java
@@ -220,8 +220,21 @@ public abstract class SingleInputNode extends OptimizerNode {
 		// add all properties relevant to this node
 		for (OperatorDescriptorSingle dps : getPossibleProperties()) {
 			for (RequestedGlobalProperties gp : dps.getPossibleGlobalProperties()) {
+				
+				if (gp.getPartitioning().isPartitionedOnKey()) {
+					// make sure that among the same partitioning types, we do not push anything down that has fewer key fields
+					
+					for (RequestedGlobalProperties contained : props.getGlobalProperties()) {
+						if (contained.getPartitioning() == gp.getPartitioning() && gp.getPartitionedFields().isValidSubset(contained.getPartitionedFields())) {
+							props.getGlobalProperties().remove(contained);
+							break;
+						}
+					}
+				}
+				
 				props.addGlobalProperties(gp);
 			}
+			
 			for (RequestedLocalProperties lp : dps.getPossibleLocalProperties()) {
 				props.addLocalProperties(lp);
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6ecd0f82/flink-compiler/src/test/java/org/apache/flink/compiler/PartitionPushdownTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/PartitionPushdownTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/PartitionPushdownTest.java
new file mode 100644
index 0000000..7d45015
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/PartitionPushdownTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.compiler.plan.SinkPlanNode;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class PartitionPushdownTest extends CompilerTestBase {
+	// Keys (0, 1) first, then key (0): expects separate hash partitionings on (0, 1) and on (0).
+	@Test
+	public void testPartitioningNotPushedDown() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			@SuppressWarnings("unchecked")
+			DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
+			// first aggregation groups on fields (0, 1), the second one on the subset (0)
+			input
+				.groupBy(0, 1).sum(2)
+				.groupBy(0).sum(1)
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			// walk backwards from the sink through three consecutive single-input plan nodes
+			SingleInputPlanNode agg2Reducer = (SingleInputPlanNode) sink.getInput().getSource();
+			SingleInputPlanNode agg2Combiner = (SingleInputPlanNode) agg2Reducer.getInput().getSource();
+			SingleInputPlanNode agg1Reducer = (SingleInputPlanNode) agg2Combiner.getInput().getSource();
+			// the node feeding the sink must receive its input hash-partitioned on field 0 only
+			assertEquals(ShipStrategyType.PARTITION_HASH, agg2Reducer.getInput().getShipStrategy());
+			assertEquals(new FieldList(0), agg2Reducer.getInput().getShipStrategyKeys());
+			// its predecessor is expected to use the FORWARD ship strategy on its input
+			assertEquals(ShipStrategyType.FORWARD, agg2Combiner.getInput().getShipStrategy());
+			// the earliest of the three nodes must still be hash-partitioned on both fields (0, 1)
+			assertEquals(ShipStrategyType.PARTITION_HASH, agg1Reducer.getInput().getShipStrategy());
+			assertEquals(new FieldList(0, 1), agg1Reducer.getInput().getShipStrategyKeys());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	// Key (0) first, then keys (0, 1): expects one hash partitioning on (0) and a FORWARD input afterwards.
+	@Test
+	public void testPartitioningReused() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			@SuppressWarnings("unchecked")
+			DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
+			// first aggregation groups on field (0), the second one on the superset (0, 1)
+			input
+				.groupBy(0).sum(1)
+				.groupBy(0, 1).sum(2)
+				.print();
+			
+			Plan p = env.createProgramPlan();
+			OptimizedPlan op = compileNoStats(p);
+			
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			// walk backwards from the sink; here the two nodes are directly connected
+			SingleInputPlanNode agg2Reducer = (SingleInputPlanNode) sink.getInput().getSource();
+			SingleInputPlanNode agg1Reducer = (SingleInputPlanNode) agg2Reducer.getInput().getSource();
+			// the node feeding the sink is expected to use the FORWARD ship strategy on its input
+			assertEquals(ShipStrategyType.FORWARD, agg2Reducer.getInput().getShipStrategy());
+			// the node before it must receive its input hash-partitioned on field 0
+			assertEquals(ShipStrategyType.PARTITION_HASH, agg1Reducer.getInput().getShipStrategy());
+			assertEquals(new FieldList(0), agg1Reducer.getInput().getShipStrategyKeys());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6ecd0f82/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsTest.java
index 2a43c64..eec02f8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsTest.java
@@ -136,7 +136,7 @@ public class ConnectedComponentsTest extends CompilerTestBase {
 		// test all the local strategies
 		Assert.assertEquals(LocalStrategy.NONE, sink.getInput().getLocalStrategy());
 		Assert.assertEquals(LocalStrategy.NONE, iter.getInitialSolutionSetInput().getLocalStrategy());
-//		Assert.assertEquals(LocalStrategy.NONE, iter.getInitialWorksetInput().getLocalStrategy());
+		Assert.assertEquals(LocalStrategy.NONE, iter.getInitialWorksetInput().getLocalStrategy());
 		
 		Assert.assertEquals(LocalStrategy.NONE, neighborsJoin.getInput1().getLocalStrategy()); // workset
 		Assert.assertEquals(LocalStrategy.NONE, neighborsJoin.getInput2().getLocalStrategy()); // edges
@@ -217,7 +217,7 @@ public class ConnectedComponentsTest extends CompilerTestBase {
 		// test all the local strategies
 		Assert.assertEquals(LocalStrategy.NONE, sink.getInput().getLocalStrategy());
 		Assert.assertEquals(LocalStrategy.NONE, iter.getInitialSolutionSetInput().getLocalStrategy());
-//		Assert.assertEquals(LocalStrategy.NONE, iter.getInitialWorksetInput().getLocalStrategy());
+		Assert.assertEquals(LocalStrategy.NONE, iter.getInitialWorksetInput().getLocalStrategy());
 		
 		Assert.assertEquals(LocalStrategy.NONE, neighborsJoin.getInput1().getLocalStrategy()); // workset
 		Assert.assertEquals(LocalStrategy.NONE, neighborsJoin.getInput2().getLocalStrategy()); // edges


[2/2] git commit: [FLINK-1005] By default, new objects are created for each element in group operations

Posted by se...@apache.org.
[FLINK-1005] By default, new objects are created for each element in group operations


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/a586614a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/a586614a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/a586614a

Branch: refs/heads/master
Commit: a586614aea9a639efcb096797dd7aa68306f8af4
Parents: c3835cd
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 5 12:19:24 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Nov 5 15:25:56 2014 +0100

----------------------------------------------------------------------
 .../java/org/apache/flink/runtime/operators/util/TaskConfig.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a586614a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
index eba5319..1b44a3b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
@@ -344,7 +344,7 @@ public class TaskConfig {
 	}
 	
 	public boolean getMutableObjectMode() {
-		return this.config.getBoolean(DRIVER_MUTABLE_OBJECT_MODE, true);
+		return this.config.getBoolean(DRIVER_MUTABLE_OBJECT_MODE, false);
 	}
 	
 	public void setDriverComparator(TypeComparatorFactory<?> factory, int inputNum) {