You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/02/14 19:19:23 UTC

incubator-systemml git commit: [SYSTEMML-1261] Improved transitive Spark exec type selection ba/ua

Repository: incubator-systemml
Updated Branches:
  refs/heads/master af5be9b2e -> b2be71738


[SYSTEMML-1261] Improved transitive Spark exec type selection ba/ua

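This patch improves the transitive execution-type selection of aggregate
binary and aggregate unary operations, i.e., the refinement that pulls an
operation that would fit in CP transitively into SPARK as well whenever its
inputs are executed in SPARK. Aggregate binary operations now qualify if
either their left or their right input meets the conditions (previously only
the left input was considered). Aggregate unary operations now also qualify
when their input has other parents, as long as no aggregation is required.
The major benefits are reduced memory pressure on the driver and reduced
data transfer between the executors and the driver.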

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/b2be7173
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/b2be7173
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/b2be7173

Branch: refs/heads/master
Commit: b2be717382ee90de2d8c0a6152faa8bb0651bd41
Parents: af5be9b
Author: Matthias Boehm <mb...@gmail.com>
Authored: Tue Feb 14 01:07:19 2017 -0800
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Tue Feb 14 11:17:30 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/sysml/hops/AggBinaryOp.java | 25 +++++++++++++-------
 .../java/org/apache/sysml/hops/AggUnaryOp.java  |  3 ++-
 2 files changed, 18 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b2be7173/src/main/java/org/apache/sysml/hops/AggBinaryOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/AggBinaryOp.java b/src/main/java/org/apache/sysml/hops/AggBinaryOp.java
index 67ea1ad..fe8a30d 100644
--- a/src/main/java/org/apache/sysml/hops/AggBinaryOp.java
+++ b/src/main/java/org/apache/sysml/hops/AggBinaryOp.java
@@ -437,17 +437,12 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop
 			checkAndSetInvalidCPDimsAndSize();
 		}
 		
-		//spark-specific decision refinement (execute binary aggregate w/ left spark input and 
+		//spark-specific decision refinement (execute binary aggregate w/ left or right spark input and 
 		//single parent also in spark because it's likely cheap and reduces data transfer)
-		if( _etype == ExecType.CP && _etypeForced != ExecType.CP	
-			&& !(getInput().get(0) instanceof ReorgOp && ((ReorgOp)getInput().get(0)).getOp()==ReOrgOp.TRANSPOSE)
-			&& !(getInput().get(0) instanceof DataOp)  //input is not checkpoint
-			&& getInput().get(0).getParent().size()==1 //bagg is only parent	
-			&& !getInput().get(0).areDimsBelowThreshold() 
-			&& getInput().get(0).optFindExecType() == ExecType.SPARK
-			&& getInput().get(0).getOutputMemEstimate()>getOutputMemEstimate() )    
+		if( _etype == ExecType.CP && _etypeForced != ExecType.CP &&
+			(isApplicableForTransitiveSparkExecType(true) || isApplicableForTransitiveSparkExecType(false)) )    
 		{
-			//pull unary aggregate into spark 
+			//pull binary aggregate into spark 
 			_etype = ExecType.SPARK;
 		}
 		
@@ -459,6 +454,18 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop
 		return _etype;
 	}
 	
+	private boolean isApplicableForTransitiveSparkExecType(boolean left) 
+		throws HopsException 
+	{
+		int index = left ? 0 : 1;
+		return !(getInput().get(index) instanceof DataOp && ((DataOp)getInput().get(index)).requiresCheckpoint())  
+			&& !(getInput().get(index) instanceof ReorgOp && ((ReorgOp)getInput().get(index)).getOp()==ReOrgOp.TRANSPOSE)
+			&& getInput().get(index).getParent().size()==1 //bagg is only parent	
+			&& !getInput().get(index).areDimsBelowThreshold() 
+			&& getInput().get(index).optFindExecType() == ExecType.SPARK
+			&& getInput().get(index).getOutputMemEstimate()>getOutputMemEstimate();
+	}
+	
 	/**
 	 * TSMM: Determine if XtX pattern applies for this aggbinary and if yes
 	 * which type. 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b2be7173/src/main/java/org/apache/sysml/hops/AggUnaryOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/AggUnaryOp.java b/src/main/java/org/apache/sysml/hops/AggUnaryOp.java
index 23e1da4..9964981 100644
--- a/src/main/java/org/apache/sysml/hops/AggUnaryOp.java
+++ b/src/main/java/org/apache/sysml/hops/AggUnaryOp.java
@@ -448,7 +448,8 @@ public class AggUnaryOp extends Hop implements MultiThreadedHop
 		//single parent also in spark because it's likely cheap and reduces data transfer)
 		if( _etype == ExecType.CP && _etypeForced != ExecType.CP
 			&& !(getInput().get(0) instanceof DataOp)  //input is not checkpoint
-			&& getInput().get(0).getParent().size()==1 //uagg is only parent
+			&& (getInput().get(0).getParent().size()==1 //uagg is only parent, or 
+			   || !requiresAggregation(getInput().get(0), _direction)) //w/o agg
 			&& getInput().get(0).optFindExecType() == ExecType.SPARK )					
 		{
 			//pull unary aggregate into spark