You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/11/06 12:58:28 UTC
[1/2] git commit: [FLINK-1214] Prevent partitionings on subsets of fields from being pushed down
Repository: incubator-flink
Updated Branches:
refs/heads/master c3835cdf3 -> 6ecd0f826
[FLINK-1214] Prevent partitionings on subsets of fields from being pushed down
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/6ecd0f82
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/6ecd0f82
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/6ecd0f82
Branch: refs/heads/master
Commit: 6ecd0f8264ab5cfb6101046a415c996993b682e5
Parents: a586614
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 5 15:23:20 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Nov 5 15:25:56 2014 +0100
----------------------------------------------------------------------
.../flink/compiler/dag/SingleInputNode.java | 13 +++
.../flink/compiler/PartitionPushdownTest.java | 104 +++++++++++++++++++
.../iterations/ConnectedComponentsTest.java | 4 +-
3 files changed, 119 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6ecd0f82/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SingleInputNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SingleInputNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SingleInputNode.java
index b3d639b..730c1bb 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SingleInputNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/SingleInputNode.java
@@ -220,8 +220,21 @@ public abstract class SingleInputNode extends OptimizerNode {
// add all properties relevant to this node
for (OperatorDescriptorSingle dps : getPossibleProperties()) {
for (RequestedGlobalProperties gp : dps.getPossibleGlobalProperties()) {
+
+ if (gp.getPartitioning().isPartitionedOnKey()) {
+ // make sure that among the same partitioning types, we do not push anything down that has fewer key fields
+
+ for (RequestedGlobalProperties contained : props.getGlobalProperties()) {
+ if (contained.getPartitioning() == gp.getPartitioning() && gp.getPartitionedFields().isValidSubset(contained.getPartitionedFields())) {
+ props.getGlobalProperties().remove(contained);
+ break;
+ }
+ }
+ }
+
props.addGlobalProperties(gp);
}
+
for (RequestedLocalProperties lp : dps.getPossibleLocalProperties()) {
props.addLocalProperties(lp);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6ecd0f82/flink-compiler/src/test/java/org/apache/flink/compiler/PartitionPushdownTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/PartitionPushdownTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/PartitionPushdownTest.java
new file mode 100644
index 0000000..7d45015
--- /dev/null
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/PartitionPushdownTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler;
+
+import static org.junit.Assert.*;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.SingleInputPlanNode;
+import org.apache.flink.compiler.plan.SinkPlanNode;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class PartitionPushdownTest extends CompilerTestBase {
+ // Case 1: group on (0, 1), then on the subset (0); each aggregation is expected to get its own hash partitioning.
+ @Test
+ public void testPartitioningNotPushedDown() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ @SuppressWarnings("unchecked")
+ DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
+ // first aggregation is keyed on fields (0, 1), the second one only on field 0
+ input
+ .groupBy(0, 1).sum(2)
+ .groupBy(0).sum(1)
+ .print();
+ // create the program plan and obtain the OptimizedPlan from compileNoStats (inherited helper, not shown here)
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ SinkPlanNode sink = op.getDataSinks().iterator().next();
+ // walk upstream from the sink; per the local names: 2nd reducer <- 2nd combiner <- 1st reducer
+ SingleInputPlanNode agg2Reducer = (SingleInputPlanNode) sink.getInput().getSource();
+ SingleInputPlanNode agg2Combiner = (SingleInputPlanNode) agg2Reducer.getInput().getSource();
+ SingleInputPlanNode agg1Reducer = (SingleInputPlanNode) agg2Combiner.getInput().getSource();
+ // input of the second reducer must be hash-partitioned on field 0
+ assertEquals(ShipStrategyType.PARTITION_HASH, agg2Reducer.getInput().getShipStrategy());
+ assertEquals(new FieldList(0), agg2Reducer.getInput().getShipStrategyKeys());
+ // input of the second combiner must be forwarded
+ assertEquals(ShipStrategyType.FORWARD, agg2Combiner.getInput().getShipStrategy());
+ // input of the first reducer must be hash-partitioned on fields (0, 1)
+ assertEquals(ShipStrategyType.PARTITION_HASH, agg1Reducer.getInput().getShipStrategy());
+ assertEquals(new FieldList(0, 1), agg1Reducer.getInput().getShipStrategyKeys());
+ }
+ catch (Exception e) { // any exception is printed and fails the test with its message
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+ // Case 2: group on (0), then on the superset (0, 1); the second aggregation's input is expected to be FORWARD.
+ @Test
+ public void testPartitioningReused() {
+ try {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ @SuppressWarnings("unchecked")
+ DataSet<Tuple3<Long, Long, Long>> input = env.fromElements(new Tuple3<Long, Long, Long>(0L, 0L, 0L));
+ // first aggregation is keyed on field 0, the second one on fields (0, 1)
+ input
+ .groupBy(0).sum(1)
+ .groupBy(0, 1).sum(2)
+ .print();
+ // create the program plan and obtain the OptimizedPlan from compileNoStats (inherited helper, not shown here)
+ Plan p = env.createProgramPlan();
+ OptimizedPlan op = compileNoStats(p);
+
+ SinkPlanNode sink = op.getDataSinks().iterator().next();
+ // here the second reducer's direct source is cast as the first reducer (no combiner node in between)
+ SingleInputPlanNode agg2Reducer = (SingleInputPlanNode) sink.getInput().getSource();
+ SingleInputPlanNode agg1Reducer = (SingleInputPlanNode) agg2Reducer.getInput().getSource();
+ // input of the second reducer must be forwarded
+ assertEquals(ShipStrategyType.FORWARD, agg2Reducer.getInput().getShipStrategy());
+ // input of the first reducer must be hash-partitioned on field 0
+ assertEquals(ShipStrategyType.PARTITION_HASH, agg1Reducer.getInput().getShipStrategy());
+ assertEquals(new FieldList(0), agg1Reducer.getInput().getShipStrategyKeys());
+ }
+ catch (Exception e) { // any exception is printed and fails the test with its message
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6ecd0f82/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsTest.java
index 2a43c64..eec02f8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/ConnectedComponentsTest.java
@@ -136,7 +136,7 @@ public class ConnectedComponentsTest extends CompilerTestBase {
// test all the local strategies
Assert.assertEquals(LocalStrategy.NONE, sink.getInput().getLocalStrategy());
Assert.assertEquals(LocalStrategy.NONE, iter.getInitialSolutionSetInput().getLocalStrategy());
-// Assert.assertEquals(LocalStrategy.NONE, iter.getInitialWorksetInput().getLocalStrategy());
+ Assert.assertEquals(LocalStrategy.NONE, iter.getInitialWorksetInput().getLocalStrategy());
Assert.assertEquals(LocalStrategy.NONE, neighborsJoin.getInput1().getLocalStrategy()); // workset
Assert.assertEquals(LocalStrategy.NONE, neighborsJoin.getInput2().getLocalStrategy()); // edges
@@ -217,7 +217,7 @@ public class ConnectedComponentsTest extends CompilerTestBase {
// test all the local strategies
Assert.assertEquals(LocalStrategy.NONE, sink.getInput().getLocalStrategy());
Assert.assertEquals(LocalStrategy.NONE, iter.getInitialSolutionSetInput().getLocalStrategy());
-// Assert.assertEquals(LocalStrategy.NONE, iter.getInitialWorksetInput().getLocalStrategy());
+ Assert.assertEquals(LocalStrategy.NONE, iter.getInitialWorksetInput().getLocalStrategy());
Assert.assertEquals(LocalStrategy.NONE, neighborsJoin.getInput1().getLocalStrategy()); // workset
Assert.assertEquals(LocalStrategy.NONE, neighborsJoin.getInput2().getLocalStrategy()); // edges
[2/2] git commit: [FLINK-1005] By default, new objects are created for each element in group operations
Posted by se...@apache.org.
[FLINK-1005] By default, new objects are created for each element in group operations
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/a586614a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/a586614a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/a586614a
Branch: refs/heads/master
Commit: a586614aea9a639efcb096797dd7aa68306f8af4
Parents: c3835cd
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 5 12:19:24 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Nov 5 15:25:56 2014 +0100
----------------------------------------------------------------------
.../java/org/apache/flink/runtime/operators/util/TaskConfig.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a586614a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
index eba5319..1b44a3b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
@@ -344,7 +344,7 @@ public class TaskConfig {
}
public boolean getMutableObjectMode() {
- return this.config.getBoolean(DRIVER_MUTABLE_OBJECT_MODE, true);
+ return this.config.getBoolean(DRIVER_MUTABLE_OBJECT_MODE, false);
}
public void setDriverComparator(TypeComparatorFactory<?> factory, int inputNum) {