You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/07/09 12:21:39 UTC
[12/12] git commit: Fix optimizer error in channel instantiation (wrong parameterization)
Fix optimizer error in channel instantiation (wrong parameterization)
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/26bc321c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/26bc321c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/26bc321c
Branch: refs/heads/master
Commit: 26bc321c31c6625f782a1adfbc97a91e1cc0d288
Parents: 49ca3ed
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Jul 9 11:23:18 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jul 9 11:23:18 2014 +0200
----------------------------------------------------------------------
.../stratosphere/compiler/costs/DefaultCostEstimator.java | 2 +-
.../java/eu/stratosphere/compiler/dag/TwoInputNode.java | 8 ++++++--
.../compiler/examples/RelationalQueryCompilerTest.java | 1 +
.../test/compiler/iterations/ConnectedComponentsTest.java | 10 ++++++----
4 files changed, 14 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/26bc321c/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/DefaultCostEstimator.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/DefaultCostEstimator.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/DefaultCostEstimator.java
index 93eb66a..ef3e51c 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/DefaultCostEstimator.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/DefaultCostEstimator.java
@@ -42,7 +42,7 @@ public class DefaultCostEstimator extends CostEstimator {
private static final float HASHING_CPU_FACTOR = 4;
- private static final float SORTING_CPU_FACTOR = 7;
+ private static final float SORTING_CPU_FACTOR = 9;
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/26bc321c/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
index 97a92d0..dd4e241 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
@@ -474,12 +474,16 @@ public abstract class TwoInputNode extends OptimizerNode {
if (dps.areCoFulfilled(lpp.getProperties1(), lpp.getProperties2(),
in1.getLocalProperties(), in2.getLocalProperties()))
{
+ // copy, because setting required properties and instantiation may
+ // change the channels and should not affect prior candidates
Channel in1Copy = in1.clone();
in1Copy.setRequiredLocalProps(lpp.getProperties1());
- in2.setRequiredLocalProps(lpp.getProperties2());
+
+ Channel in2Copy = in2.clone();
+ in2Copy.setRequiredLocalProps(lpp.getProperties2());
// all right, co compatible
- instantiate(dps, in1Copy, in2, broadcastPlanChannels, target, estimator, rgps1, rgps2, ilp1, ilp2);
+ instantiate(dps, in1Copy, in2Copy, broadcastPlanChannels, target, estimator, rgps1, rgps2, ilp1, ilp2);
break;
} else {
// meet, but not co-compatible
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/26bc321c/stratosphere-tests/src/test/java/eu/stratosphere/test/compiler/examples/RelationalQueryCompilerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/compiler/examples/RelationalQueryCompilerTest.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/compiler/examples/RelationalQueryCompilerTest.java
index 461a85f..6bac03c 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/compiler/examples/RelationalQueryCompilerTest.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/compiler/examples/RelationalQueryCompilerTest.java
@@ -60,6 +60,7 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
// compile
final OptimizedPlan plan = compileNoStats(p);
+
final OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(plan);
// get the nodes from the final plan
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/26bc321c/stratosphere-tests/src/test/java/eu/stratosphere/test/compiler/iterations/ConnectedComponentsTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/compiler/iterations/ConnectedComponentsTest.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/compiler/iterations/ConnectedComponentsTest.java
index 9c8c0c8..e9074b4 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/compiler/iterations/ConnectedComponentsTest.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/compiler/iterations/ConnectedComponentsTest.java
@@ -102,7 +102,9 @@ public class ConnectedComponentsTest extends CompilerTestBase {
Assert.assertEquals(DriverStrategy.NONE, vertexSource.getDriverStrategy());
Assert.assertEquals(DriverStrategy.NONE, edgesSource.getDriverStrategy());
- Assert.assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND, neighborsJoin.getDriverStrategy());
+ Assert.assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED, neighborsJoin.getDriverStrategy());
+ Assert.assertTrue(!neighborsJoin.getInput1().getTempMode().isCached());
+ Assert.assertTrue(!neighborsJoin.getInput2().getTempMode().isCached());
Assert.assertEquals(set0, neighborsJoin.getKeysForInput1());
Assert.assertEquals(set0, neighborsJoin.getKeysForInput2());
@@ -120,7 +122,6 @@ public class ConnectedComponentsTest extends CompilerTestBase {
Assert.assertEquals(ShipStrategyType.FORWARD, neighborsJoin.getInput1().getShipStrategy()); // workset
Assert.assertEquals(ShipStrategyType.PARTITION_HASH, neighborsJoin.getInput2().getShipStrategy()); // edges
Assert.assertEquals(set0, neighborsJoin.getInput2().getShipStrategyKeys());
- Assert.assertTrue(neighborsJoin.getInput2().getTempMode().isCached());
Assert.assertEquals(ShipStrategyType.PARTITION_HASH, minIdReducer.getInput().getShipStrategy());
Assert.assertEquals(set0, minIdReducer.getInput().getShipStrategyKeys());
@@ -182,7 +183,9 @@ public class ConnectedComponentsTest extends CompilerTestBase {
Assert.assertEquals(DriverStrategy.NONE, vertexSource.getDriverStrategy());
Assert.assertEquals(DriverStrategy.NONE, edgesSource.getDriverStrategy());
- Assert.assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND, neighborsJoin.getDriverStrategy());
+ Assert.assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED, neighborsJoin.getDriverStrategy());
+ Assert.assertTrue(!neighborsJoin.getInput1().getTempMode().isCached());
+ Assert.assertTrue(!neighborsJoin.getInput2().getTempMode().isCached());
Assert.assertEquals(set0, neighborsJoin.getKeysForInput1());
Assert.assertEquals(set0, neighborsJoin.getKeysForInput2());
@@ -200,7 +203,6 @@ public class ConnectedComponentsTest extends CompilerTestBase {
Assert.assertEquals(ShipStrategyType.FORWARD, neighborsJoin.getInput1().getShipStrategy()); // workset
Assert.assertEquals(ShipStrategyType.PARTITION_HASH, neighborsJoin.getInput2().getShipStrategy()); // edges
Assert.assertEquals(set0, neighborsJoin.getInput2().getShipStrategyKeys());
- Assert.assertTrue(neighborsJoin.getInput2().getTempMode().isCached());
Assert.assertEquals(ShipStrategyType.PARTITION_HASH, minIdReducer.getInput().getShipStrategy());
Assert.assertEquals(set0, minIdReducer.getInput().getShipStrategyKeys());