You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2018/02/17 01:17:40 UTC

systemml git commit: [SYSTEMML-2152] Fix spark mapmm selection (missing dims constraint)

Repository: systemml
Updated Branches:
  refs/heads/master 5837953e9 -> a640e57b0


[SYSTEMML-2152] Fix spark mapmm selection (missing dims constraint)

This patch fixes the operator selection logic for the broadcast-based
mapmm. Since the broadcast matrix is created in the driver, the matrix
needs to satisfy the CP dimensions constraint of int32, which can be
exceeded for ultra-sparse broadcast choices.

In the future, we will also support the creation of broadcast from
blocked RDDs and hdfs files, which will allow to remove the constraint.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/a640e57b
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/a640e57b
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/a640e57b

Branch: refs/heads/master
Commit: a640e57b01ab186f9204ee1a256747dbbfe6ccaa
Parents: 5837953
Author: Matthias Boehm <mb...@gmail.com>
Authored: Fri Feb 16 17:17:33 2018 -0800
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Fri Feb 16 17:17:33 2018 -0800

----------------------------------------------------------------------
 .../java/org/apache/sysml/hops/AggBinaryOp.java | 24 +++++++++++---------
 1 file changed, 13 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/a640e57b/src/main/java/org/apache/sysml/hops/AggBinaryOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/AggBinaryOp.java b/src/main/java/org/apache/sysml/hops/AggBinaryOp.java
index 59cbf95..241dd86 100644
--- a/src/main/java/org/apache/sysml/hops/AggBinaryOp.java
+++ b/src/main/java/org/apache/sysml/hops/AggBinaryOp.java
@@ -1650,13 +1650,13 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop
 	}
 	
 	private MMultMethod optFindMMultMethodSpark( long m1_rows, long m1_cols, long m1_rpb, long m1_cpb, long m1_nnz, 
-            long m2_rows, long m2_cols, long m2_rpb, long m2_cpb, long m2_nnz,
-            MMTSJType mmtsj, ChainType chainType, boolean leftPMInput, boolean tmmRewrite ) 
-	{	
+		long m2_rows, long m2_cols, long m2_rpb, long m2_cpb, long m2_nnz,
+		MMTSJType mmtsj, ChainType chainType, boolean leftPMInput, boolean tmmRewrite ) 
+	{
 		//Notes: Any broadcast needs to fit twice in local memory because we partition the input in cp,
 		//and needs to fit once in executor broadcast memory. The 2GB broadcast constraint is no longer
 		//required because the max_int byte buffer constraint has been fixed in Spark 1.4 
-		double memBudgetExec = MAPMULT_MEM_MULTIPLIER * SparkExecutionContext.getBroadcastMemoryBudget();		
+		double memBudgetExec = MAPMULT_MEM_MULTIPLIER * SparkExecutionContext.getBroadcastMemoryBudget();
 		double memBudgetLocal = OptimizerUtils.getLocalMemBudget();
 
 		//reset spark broadcast memory information (for concurrent parfor jobs, awareness of additional 
@@ -1699,11 +1699,11 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop
 					   + OptimizerUtils.estimateSize(m1_cols, m2_cols)) < memBudgetLocal )
 				{
 					_spBroadcastMemEstimate = 2*(OptimizerUtils.estimateSize(m1_rows, m2_cols) 
-							   				   + OptimizerUtils.estimateSize(m1_cols, m2_cols));
+						+ OptimizerUtils.estimateSize(m1_cols, m2_cols));
 					return MMultMethod.MAPMM_CHAIN;
 				}
 			}
-		}		
+		}
 		
 		// Step 3: check for PMM (permutation matrix needs to fit into mapper memory)
 		// (needs to be checked before mapmult for consistency with removeEmpty compilation 
@@ -1735,11 +1735,13 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop
 		{
 			//apply map mult if one side fits in remote task memory 
 			//(if so pick smaller input for distributed cache)
-			if( m1SizeP < m2SizeP && m1_rows>=0 && m1_cols>=0) {
+			//TODO relax requirement of valid CP dimensions once we support broadcast creation from files/RDDs
+			if( m1SizeP < m2SizeP && m1_rows>=0 && m1_cols>=0
+				&& OptimizerUtils.isValidCPDimensions(m1_rows, m1_cols) ) {
 				_spBroadcastMemEstimate = m1Size+m1SizeP;
 				return MMultMethod.MAPMM_L;
 			}
-			else {
+			else if( OptimizerUtils.isValidCPDimensions(m2_rows, m2_cols) ) {
 				_spBroadcastMemEstimate = m2Size+m2SizeP;
 				return MMultMethod.MAPMM_R;
 			}
@@ -1771,17 +1773,17 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop
 		// Step 7: check for ZIPMM
 		// If t(X)%*%y -> t(t(y)%*%X) rewrite and ncol(X)<blocksize
 		if( tmmRewrite && m1_rows >= 0 && m1_rows <= m1_rpb  //blocksize constraint left
-			&& m2_cols >= 0 && m2_cols <= m2_cpb )           //blocksize constraint right	
+			&& m2_cols >= 0 && m2_cols <= m2_cpb )           //blocksize constraint right
 		{
 			return MMultMethod.ZIPMM;
 		}
-			
+		
 		// Step 8: Decide CPMM vs RMM based on io costs
 		//estimate shuffle costs weighted by parallelism
 		//TODO currently we reuse the mr estimates, these need to be fine-tune for our spark operators
 		double rmm_costs = getRMMCostEstimate(m1_rows, m1_cols, m1_rpb, m1_cpb, m2_rows, m2_cols, m2_rpb, m2_cpb);
 		double cpmm_costs = getCPMMCostEstimate(m1_rows, m1_cols, m1_rpb, m1_cpb, m2_rows, m2_cols, m2_rpb, m2_cpb);
-				
+		
 		//final mmult method decision 
 		if ( cpmm_costs < rmm_costs ) 
 			return MMultMethod.CPMM;