You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/07/09 12:21:37 UTC

[10/12] git commit: No explicit caching when hash table is cached Add tests for explicit cache removal when hash table is cached

No explicit caching when hash table is cached
Add tests for explicit cache removal when hash table is cached


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/52f9cf63
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/52f9cf63
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/52f9cf63

Branch: refs/heads/master
Commit: 52f9cf63e66a53592ea952c154e9e28ab4bce3b7
Parents: a9b1daa
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Jul 9 03:10:45 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jul 9 03:19:24 2014 +0200

----------------------------------------------------------------------
 .../eu/stratosphere/compiler/dag/TempMode.java  | 11 ++++++++
 .../operators/HashJoinBuildFirstProperties.java |  6 +++++
 .../HashJoinBuildSecondProperties.java          |  9 ++++++-
 .../CachedMatchStrategyCompilerTest.java        | 28 +++++++++++++-------
 4 files changed, 44 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/52f9cf63/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TempMode.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TempMode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TempMode.java
index 7907af7..a698422 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TempMode.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TempMode.java
@@ -63,4 +63,15 @@ public enum TempMode {
 			return this;
 		}
 	}
+	
+	
+	public TempMode makeNonCached() {
+		if (this == CACHED) {
+			return NONE;
+		} else if (this == CACHING_PIPELINE_BREAKER) {
+			return PIPELINE_BREAKER;
+		} else {
+			return this;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/52f9cf63/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/HashJoinBuildFirstProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/HashJoinBuildFirstProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/HashJoinBuildFirstProperties.java
index 4f694c5..f8a7c38 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/HashJoinBuildFirstProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/HashJoinBuildFirstProperties.java
@@ -17,6 +17,7 @@ import java.util.Collections;
 import java.util.List;
 
 import eu.stratosphere.api.common.operators.util.FieldList;
+import eu.stratosphere.compiler.CompilerException;
 import eu.stratosphere.compiler.dag.TwoInputNode;
 import eu.stratosphere.compiler.dataproperties.LocalProperties;
 import eu.stratosphere.compiler.dataproperties.RequestedLocalProperties;
@@ -57,6 +58,11 @@ public class HashJoinBuildFirstProperties extends AbstractJoinDescriptor {
 		DriverStrategy strategy;
 		
 		if(!in1.isOnDynamicPath() && in2.isOnDynamicPath()) {
+			// sanity check that the first input is cached and remove that cache
+			if (!in1.getTempMode().isCached()) {
+				throw new CompilerException("No cache at point where static and dynamic parts meet.");
+			}
+			in1.setTempMode(in1.getTempMode().makeNonCached());
 			strategy = DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED;
 		}
 		else {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/52f9cf63/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/HashJoinBuildSecondProperties.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/HashJoinBuildSecondProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/HashJoinBuildSecondProperties.java
index 6bea65a..6982ce6 100644
--- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/HashJoinBuildSecondProperties.java
+++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/HashJoinBuildSecondProperties.java
@@ -17,6 +17,7 @@ import java.util.Collections;
 import java.util.List;
 
 import eu.stratosphere.api.common.operators.util.FieldList;
+import eu.stratosphere.compiler.CompilerException;
 import eu.stratosphere.compiler.dag.TwoInputNode;
 import eu.stratosphere.compiler.dataproperties.LocalProperties;
 import eu.stratosphere.compiler.dataproperties.RequestedLocalProperties;
@@ -53,7 +54,13 @@ public final class HashJoinBuildSecondProperties extends AbstractJoinDescriptor
 	public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
 		DriverStrategy strategy;
 		
-		if(!in2.isOnDynamicPath() && in1.isOnDynamicPath()) {
+		if (!in2.isOnDynamicPath() && in1.isOnDynamicPath()) {
+			// sanity check that the second input is cached and remove that cache
+			if (!in2.getTempMode().isCached()) {
+				throw new CompilerException("No cache at point where static and dynamic parts meet.");
+			}
+			
+			in2.setTempMode(in2.getTempMode().makeNonCached());
 			strategy = DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED;
 		}
 		else {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/52f9cf63/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CachedMatchStrategyCompilerTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CachedMatchStrategyCompilerTest.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CachedMatchStrategyCompilerTest.java
index b82c0f9..33fc6e4 100644
--- a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CachedMatchStrategyCompilerTest.java
+++ b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CachedMatchStrategyCompilerTest.java
@@ -28,6 +28,7 @@ import eu.stratosphere.api.java.IterativeDataSet;
 import eu.stratosphere.api.java.functions.JoinFunction;
 import eu.stratosphere.api.java.tuple.Tuple3;
 import eu.stratosphere.compiler.PactCompiler;
+import eu.stratosphere.compiler.dag.TempMode;
 import eu.stratosphere.compiler.plan.DualInputPlanNode;
 import eu.stratosphere.compiler.plan.OptimizedPlan;
 import eu.stratosphere.compiler.plantranslate.NepheleJobGraphGenerator;
@@ -57,6 +58,8 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
 			
 			// verify correct join strategy
 			assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED, innerJoin.getDriverStrategy()); 
+			assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode());
+			assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode());
 		
 			new NepheleJobGraphGenerator().compileJobGraph(oPlan);
 		}
@@ -83,6 +86,8 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
 			
 			// verify correct join strategy
 			assertEquals(DriverStrategy.HYBRIDHASH_BUILD_FIRST, innerJoin.getDriverStrategy()); 
+			assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode());
+			assertEquals(TempMode.CACHED, innerJoin.getInput2().getTempMode());
 		
 			new NepheleJobGraphGenerator().compileJobGraph(oPlan);
 		}
@@ -109,7 +114,9 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
 			DualInputPlanNode innerJoin = resolver.getNode("DummyJoiner");
 			
 			// verify correct join strategy
-			assertEquals(DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED, innerJoin.getDriverStrategy()); 
+			assertEquals(DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED, innerJoin.getDriverStrategy());
+			assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode());
+			assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode());
 		
 			new NepheleJobGraphGenerator().compileJobGraph(oPlan);
 		}
@@ -135,7 +142,9 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
 			DualInputPlanNode innerJoin = resolver.getNode("DummyJoiner");
 			
 			// verify correct join strategy
-			assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND, innerJoin.getDriverStrategy()); 
+			assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND, innerJoin.getDriverStrategy());
+			assertEquals(TempMode.CACHED, innerJoin.getInput1().getTempMode());
+			assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode());
 		
 			new NepheleJobGraphGenerator().compileJobGraph(oPlan);
 		}
@@ -169,13 +178,16 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
 				}
 			}
 			
+			
 			OptimizedPlan oPlan = compileNoStats(plan);
 	
 			OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan);
 			DualInputPlanNode innerJoin = resolver.getNode("DummyJoiner");
 			
 			// verify correct join strategy
-			assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED, innerJoin.getDriverStrategy()); 
+			assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED, innerJoin.getDriverStrategy());
+			assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode());
+			assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode());
 		
 			new NepheleJobGraphGenerator().compileJobGraph(oPlan);
 		}
@@ -197,16 +209,14 @@ public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
 		
 		IterativeDataSet<Tuple3<Long, Long, Long>> iteration = bigInput.iterate(10);
 		
-		DataSet<Tuple3<Long, Long, Long>> inner;
+		Configuration joinStrategy = new Configuration();
+		joinStrategy.setString(PactCompiler.HINT_SHIP_STRATEGY, PactCompiler.HINT_SHIP_STRATEGY_REPARTITION_HASH);
 		
 		if(strategy != "") {
-			Configuration joinStrategy = new Configuration();
 			joinStrategy.setString(PactCompiler.HINT_LOCAL_STRATEGY, strategy);
-			inner = iteration.join(smallInput).where(0).equalTo(0).with(new DummyJoiner()).name("DummyJoiner").withParameters(joinStrategy);
-		}
-		else {
-			inner = iteration.join(smallInput).where(0).equalTo(0).with(new DummyJoiner()).name("DummyJoiner");
 		}
+		
+		DataSet<Tuple3<Long, Long, Long>> inner = iteration.join(smallInput).where(0).equalTo(0).with(new DummyJoiner()).name("DummyJoiner").withParameters(joinStrategy);
 
 		DataSet<Tuple3<Long, Long, Long>> output = iteration.closeWith(inner);