You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/07/09 12:21:39 UTC

[12/12] git commit: Fix optimizer error in channel instantiation (wrong parameterization)

Fix optimizer error in channel instantiation (wrong parameterization)


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/26bc321c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/26bc321c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/26bc321c

Branch: refs/heads/master
Commit: 26bc321c31c6625f782a1adfbc97a91e1cc0d288
Parents: 49ca3ed
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Jul 9 11:23:18 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jul 9 11:23:18 2014 +0200

----------------------------------------------------------------------
 .../stratosphere/compiler/costs/DefaultCostEstimator.java |  2 +-
 .../java/eu/stratosphere/compiler/dag/TwoInputNode.java   |  8 ++++++--
 .../compiler/examples/RelationalQueryCompilerTest.java    |  1 +
 .../test/compiler/iterations/ConnectedComponentsTest.java | 10 ++++++----
 4 files changed, 14 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/26bc321c/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/DefaultCostEstimator.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/DefaultCostEstimator.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/DefaultCostEstimator.java
index 93eb66a..ef3e51c 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/DefaultCostEstimator.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/DefaultCostEstimator.java
@@ -42,7 +42,7 @@ public class DefaultCostEstimator extends CostEstimator {
 	
 	private static final float HASHING_CPU_FACTOR = 4;
 	
-	private static final float SORTING_CPU_FACTOR = 7;
+	private static final float SORTING_CPU_FACTOR = 9;
 	
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/26bc321c/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
index 97a92d0..dd4e241 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java
@@ -474,12 +474,16 @@ public abstract class TwoInputNode extends OptimizerNode {
 							if (dps.areCoFulfilled(lpp.getProperties1(), lpp.getProperties2(), 
 								in1.getLocalProperties(), in2.getLocalProperties()))
 							{
+								// copy, because setting required properties and instantiation may
+								// change the channels and should not affect prior candidates
 								Channel in1Copy = in1.clone();
 								in1Copy.setRequiredLocalProps(lpp.getProperties1());
-								in2.setRequiredLocalProps(lpp.getProperties2());
+								
+								Channel in2Copy = in2.clone();
+								in2Copy.setRequiredLocalProps(lpp.getProperties2());
 								
 								// all right, co compatible
-								instantiate(dps, in1Copy, in2, broadcastPlanChannels, target, estimator, rgps1, rgps2, ilp1, ilp2);
+								instantiate(dps, in1Copy, in2Copy, broadcastPlanChannels, target, estimator, rgps1, rgps2, ilp1, ilp2);
 								break;
 							} else {
 								// meet, but not co-compatible

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/26bc321c/stratosphere-tests/src/test/java/eu/stratosphere/test/compiler/examples/RelationalQueryCompilerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/compiler/examples/RelationalQueryCompilerTest.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/compiler/examples/RelationalQueryCompilerTest.java
index 461a85f..6bac03c 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/compiler/examples/RelationalQueryCompilerTest.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/compiler/examples/RelationalQueryCompilerTest.java
@@ -60,6 +60,7 @@ public class RelationalQueryCompilerTest extends CompilerTestBase {
 			
 			// compile
 			final OptimizedPlan plan = compileNoStats(p);
+			
 			final OptimizerPlanNodeResolver or = getOptimizerPlanNodeResolver(plan);
 			
 			// get the nodes from the final plan

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/26bc321c/stratosphere-tests/src/test/java/eu/stratosphere/test/compiler/iterations/ConnectedComponentsTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/compiler/iterations/ConnectedComponentsTest.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/compiler/iterations/ConnectedComponentsTest.java
index 9c8c0c8..e9074b4 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/compiler/iterations/ConnectedComponentsTest.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/compiler/iterations/ConnectedComponentsTest.java
@@ -102,7 +102,9 @@ public class ConnectedComponentsTest extends CompilerTestBase {
 		Assert.assertEquals(DriverStrategy.NONE, vertexSource.getDriverStrategy());
 		Assert.assertEquals(DriverStrategy.NONE, edgesSource.getDriverStrategy());
 		
-		Assert.assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND, neighborsJoin.getDriverStrategy());
+		Assert.assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED, neighborsJoin.getDriverStrategy());
+		Assert.assertTrue(!neighborsJoin.getInput1().getTempMode().isCached());
+		Assert.assertTrue(!neighborsJoin.getInput2().getTempMode().isCached());
 		Assert.assertEquals(set0, neighborsJoin.getKeysForInput1());
 		Assert.assertEquals(set0, neighborsJoin.getKeysForInput2());
 		
@@ -120,7 +122,6 @@ public class ConnectedComponentsTest extends CompilerTestBase {
 		Assert.assertEquals(ShipStrategyType.FORWARD, neighborsJoin.getInput1().getShipStrategy()); // workset
 		Assert.assertEquals(ShipStrategyType.PARTITION_HASH, neighborsJoin.getInput2().getShipStrategy()); // edges
 		Assert.assertEquals(set0, neighborsJoin.getInput2().getShipStrategyKeys());
-		Assert.assertTrue(neighborsJoin.getInput2().getTempMode().isCached());
 		
 		Assert.assertEquals(ShipStrategyType.PARTITION_HASH, minIdReducer.getInput().getShipStrategy());
 		Assert.assertEquals(set0, minIdReducer.getInput().getShipStrategyKeys());
@@ -182,7 +183,9 @@ public class ConnectedComponentsTest extends CompilerTestBase {
 		Assert.assertEquals(DriverStrategy.NONE, vertexSource.getDriverStrategy());
 		Assert.assertEquals(DriverStrategy.NONE, edgesSource.getDriverStrategy());
 		
-		Assert.assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND, neighborsJoin.getDriverStrategy());
+		Assert.assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED, neighborsJoin.getDriverStrategy());
+		Assert.assertTrue(!neighborsJoin.getInput1().getTempMode().isCached());
+		Assert.assertTrue(!neighborsJoin.getInput2().getTempMode().isCached());
 		Assert.assertEquals(set0, neighborsJoin.getKeysForInput1());
 		Assert.assertEquals(set0, neighborsJoin.getKeysForInput2());
 		
@@ -200,7 +203,6 @@ public class ConnectedComponentsTest extends CompilerTestBase {
 		Assert.assertEquals(ShipStrategyType.FORWARD, neighborsJoin.getInput1().getShipStrategy()); // workset
 		Assert.assertEquals(ShipStrategyType.PARTITION_HASH, neighborsJoin.getInput2().getShipStrategy()); // edges
 		Assert.assertEquals(set0, neighborsJoin.getInput2().getShipStrategyKeys());
-		Assert.assertTrue(neighborsJoin.getInput2().getTempMode().isCached());
 		
 		Assert.assertEquals(ShipStrategyType.PARTITION_HASH, minIdReducer.getInput().getShipStrategy());
 		Assert.assertEquals(set0, minIdReducer.getInput().getShipStrategyKeys());