You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/02/24 20:27:38 UTC

[6/6] incubator-systemml git commit: [SYSTEMML-1336] Improved parfor optimizer (conditional partitioning)

[SYSTEMML-1336] Improved parfor optimizer (conditional partitioning)

This patch improves the parfor optimizer to consider what-if scenarios
with conditional partitioning. This avoids falling back to local parfor
plans with a small degree of parallelism (when the data barely fits in
the driver) in cases where a fused partition-execute parfor job could
have been applied instead.

For example, on the perftest 8GB univariate-stats scenario, it improved
the end-to-end runtime (incl. Spark context creation and I/O) from 781s
to 110s.

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/2f7fa8d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/2f7fa8d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/2f7fa8d7

Branch: refs/heads/master
Commit: 2f7fa8d73fa9680df283444627209a31c5ef4acd
Parents: 35da413
Author: Matthias Boehm <mb...@gmail.com>
Authored: Thu Feb 23 22:19:24 2017 -0800
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Fri Feb 24 12:27:29 2017 -0800

----------------------------------------------------------------------
 .../parfor/opt/CostEstimator.java               | 26 +++++--
 .../controlprogram/parfor/opt/OptNode.java      |  2 +
 .../parfor/opt/OptimizerConstrained.java        | 37 ++++++----
 .../parfor/opt/OptimizerRuleBased.java          | 75 +++++++++++---------
 4 files changed, 90 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2f7fa8d7/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimator.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimator.java
index bb3ca88..3fdf8bd 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimator.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/CostEstimator.java
@@ -55,6 +55,8 @@ public abstract class CostEstimator
 		SPARSE
 	}
 	
+	protected boolean _inclCondPart = false;
+	
 	/**
 	 * Main leaf node estimation method - to be overwritten by specific cost estimators
 	 * 
@@ -88,6 +90,7 @@ public abstract class CostEstimator
 	 * 
 	 * @param measure ?
 	 * @param node internal representation of a plan alternative for program blocks and instructions
+	 * @param inclCondPart including conditional partitioning
 	 * @return estimate?
 	 * @throws DMLRuntimeException if DMLRuntimeException occurs
 	 */
@@ -97,13 +100,26 @@ public abstract class CostEstimator
 		return getEstimate(measure, node, null);
 	}
 	
+	public double getEstimate( TestMeasure measure, OptNode node, boolean inclCondPart ) 
+		throws DMLRuntimeException
+	{
+		//temporarily change local flag and get estimate
+		boolean oldInclCondPart = _inclCondPart;
+		_inclCondPart = inclCondPart; 
+		double val = getEstimate(measure, node, null);
+		
+		//reset local flag and return
+		_inclCondPart = oldInclCondPart;
+		return val;
+	}
+	
 	/**
 	 * Main estimation method.
 	 * 
-	 * @param measure ?
-	 * @param node internal representation of a plan alternative for program blocks and instructions
+	 * @param measure estimate type (time or memory)
+	 * @param node plan opt tree node
 	 * @param et execution type
-	 * @return estimate?
+	 * @return estimate
 	 * @throws DMLRuntimeException if DMLRuntimeException occurs
 	 */
 	public double getEstimate( TestMeasure measure, OptNode node, ExecType et ) 
@@ -113,7 +129,9 @@ public abstract class CostEstimator
 		
 		if( node.isLeaf() )
 		{
-			if( et != null )
+			if( _inclCondPart && node.getParam(ParamType.DATA_PARTITION_COND_MEM) != null )
+				val = Double.parseDouble(node.getParam(ParamType.DATA_PARTITION_COND_MEM));
+			else if( et != null )
 				val = getLeafNodeEstimate(measure, node, et); //forced type
 			else 
 				val = getLeafNodeEstimate(measure, node); //default	

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2f7fa8d7/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptNode.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptNode.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptNode.java
index 7968c6a..26c30d4 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptNode.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptNode.java
@@ -71,6 +71,8 @@ public class OptNode
 		TASK_SIZE,
 		DATA_PARTITIONER,
 		DATA_PARTITION_FORMAT,
+		DATA_PARTITION_COND,
+		DATA_PARTITION_COND_MEM,
 		RESULT_MERGE,
 		NUM_ITERATIONS,
 		RECURSIVE_CALL

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2f7fa8d7/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
index 39e742f..6edcec3 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
@@ -80,7 +80,6 @@ public class OptimizerConstrained extends OptimizerRuleBased
 		LOG.debug("--- "+getOptMode()+" OPTIMIZER -------");
 
 		OptNode pn = plan.getRoot();
-		double M0 = -1, M1 = -1, M2 = -1; //memory consumption
 
 		//early abort for empty parfor body 
 		if( pn.isLeaf() )
@@ -100,35 +99,45 @@ public class OptimizerConstrained extends OptimizerRuleBased
 		ExecType oldET = pn.getExecType();
 		int oldK = pn.getK();
 		pn.setSerialParFor(); //for basic mem consumption 
-		M0 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn);
+		double M0a = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn);
 		pn.setExecType(oldET);
 		pn.setK(oldK);
-		LOG.debug(getOptMode()+" OPT: estimated mem (serial exec) M="+toMB(M0) );
+		LOG.debug(getOptMode()+" OPT: estimated mem (serial exec) M="+toMB(M0a) );
 
 		//OPTIMIZE PARFOR PLAN
 
 		// rewrite 1: data partitioning (incl. log. recompile RIX)
 		HashMap<String, PDataPartitionFormat> partitionedMatrices = new HashMap<String,PDataPartitionFormat>();
-		rewriteSetDataPartitioner( pn, ec.getVariables(), partitionedMatrices );
-		M1 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn); //reestimate
+		rewriteSetDataPartitioner( pn, ec.getVariables(), partitionedMatrices, OptimizerUtils.getLocalMemBudget() );
+		double M0b = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn); //reestimate
 
 		// rewrite 2: remove unnecessary compare matrix
 		rewriteRemoveUnnecessaryCompareMatrix(pn, ec);
 
 		// rewrite 3: rewrite result partitioning (incl. log/phy recompile LIX) 
-		boolean flagLIX = super.rewriteSetResultPartitioning( pn, M1, ec.getVariables() );
-		M1 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn); //reestimate 
-		M2 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn, LopProperties.ExecType.CP);
+		boolean flagLIX = super.rewriteSetResultPartitioning( pn, M0b, ec.getVariables() );
+		double M1 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn); //reestimate 
 		LOG.debug(getOptMode()+" OPT: estimated new mem (serial exec) M="+toMB(M1) );
+		
+		//determine memory consumption for what-if: all-cp or partitioned
+		double M2 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn, LopProperties.ExecType.CP);
 		LOG.debug(getOptMode()+" OPT: estimated new mem (serial exec, all CP) M="+toMB(M2) );
-
+		double M3 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn, true);
+		LOG.debug(getOptMode()+" OPT: estimated new mem (cond partitioning) M="+toMB(M3) );
+		
 		// rewrite 4: execution strategy
 		PExecMode tmpmode = getPExecMode(pn); //keep old
-		boolean flagRecompMR = rewriteSetExecutionStategy( pn, M0, M1, M2, flagLIX );
+		boolean flagRecompMR = rewriteSetExecutionStategy( pn, M0a, M1, M2, M3, flagLIX );
 
 		//exec-type-specific rewrites
 		if( pn.getExecType() == ExecType.MR || pn.getExecType() == ExecType.SPARK )
 		{
+			if( M1 > _rm && M3 <= _rm  ) {
+				// rewrite 1: data partitioning (apply conditional partitioning)
+				rewriteSetDataPartitioner( pn, ec.getVariables(), partitionedMatrices, M3 );
+				M1 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn); //reestimate 		
+			}
+			
 			if( flagRecompMR ){
 				//rewrite 5: set operations exec type
 				rewriteSetOperationsExecType( pn, flagRecompMR );
@@ -221,7 +230,7 @@ public class OptimizerConstrained extends OptimizerRuleBased
 	///
 
 	@Override
-	protected boolean rewriteSetDataPartitioner(OptNode n, LocalVariableMap vars, HashMap<String,PDataPartitionFormat> partitionedMatrices)
+	protected boolean rewriteSetDataPartitioner(OptNode n, LocalVariableMap vars, HashMap<String,PDataPartitionFormat> partitionedMatrices, double thetaM)
 		throws DMLRuntimeException
 	{
 		boolean blockwise = false;
@@ -235,7 +244,7 @@ public class OptimizerConstrained extends OptimizerRuleBased
 			LOG.debug(getOptMode()+" OPT: forced 'set data partitioner' - result="+n.getParam(ParamType.DATA_PARTITIONER) );
 		}
 		else
-			super.rewriteSetDataPartitioner(n, vars, partitionedMatrices);
+			super.rewriteSetDataPartitioner(n, vars, partitionedMatrices, thetaM);
 
 		return blockwise;
 	}
@@ -246,7 +255,7 @@ public class OptimizerConstrained extends OptimizerRuleBased
 	///
 
 	@Override
-	protected boolean rewriteSetExecutionStategy(OptNode n, double M0, double M, double M2, boolean flagLIX)
+	protected boolean rewriteSetExecutionStategy(OptNode n, double M0, double M, double M2, double M3, boolean flagLIX)
 		throws DMLRuntimeException
 	{
 		boolean ret = false;
@@ -270,7 +279,7 @@ public class OptimizerConstrained extends OptimizerRuleBased
 			LOG.debug(getOptMode()+" OPT: forced 'set execution strategy' - result="+mode );
 		}
 		else
-			ret = super.rewriteSetExecutionStategy(n, M0, M, M2, flagLIX);
+			ret = super.rewriteSetExecutionStategy(n, M0, M, M2, M3, flagLIX);
 
 		return ret;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/2f7fa8d7/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
index bbe5bf7..87cabaa 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
@@ -216,7 +216,6 @@ public class OptimizerRuleBased extends Optimizer
 		LOG.debug("--- "+getOptMode()+" OPTIMIZER -------");
 
 		OptNode pn = plan.getRoot();
-		double M0 = -1, M1 = -1, M2 = -1; //memory consumption
 		
 		//early abort for empty parfor body 
 		if( pn.isLeaf() )
@@ -234,32 +233,42 @@ public class OptimizerRuleBased extends Optimizer
 		
 		//ESTIMATE memory consumption 
 		pn.setSerialParFor(); //for basic mem consumption 
-		M0 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn);
-		LOG.debug(getOptMode()+" OPT: estimated mem (serial exec) M="+toMB(M0) );
+		double M0a = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn);
+		LOG.debug(getOptMode()+" OPT: estimated mem (serial exec) M="+toMB(M0a) );
 		
 		//OPTIMIZE PARFOR PLAN
 		
-		// rewrite 1: data partitioning (incl. log. recompile RIX)
+		// rewrite 1: data partitioning (incl. log. recompile RIX and flag opt nodes)
 		HashMap<String, PDataPartitionFormat> partitionedMatrices = new HashMap<String,PDataPartitionFormat>();
-		rewriteSetDataPartitioner( pn, ec.getVariables(), partitionedMatrices );
-		M1 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn); //reestimate
+		rewriteSetDataPartitioner( pn, ec.getVariables(), partitionedMatrices, OptimizerUtils.getLocalMemBudget() );
+		double M0b = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn); //reestimate
 		
 		// rewrite 2: remove unnecessary compare matrix (before result partitioning)
 		rewriteRemoveUnnecessaryCompareMatrix(pn, ec);
 		
 		// rewrite 3: rewrite result partitioning (incl. log/phy recompile LIX) 
-		boolean flagLIX = rewriteSetResultPartitioning( pn, M1, ec.getVariables() );
-		M1 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn); //reestimate 
-		M2 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn, LopProperties.ExecType.CP);
+		boolean flagLIX = rewriteSetResultPartitioning( pn, M0b, ec.getVariables() );
+		double M1 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn); //reestimate 
 		LOG.debug(getOptMode()+" OPT: estimated new mem (serial exec) M="+toMB(M1) );
+		
+		//determine memory consumption for what-if: all-cp or partitioned 
+		double M2 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn, LopProperties.ExecType.CP);
 		LOG.debug(getOptMode()+" OPT: estimated new mem (serial exec, all CP) M="+toMB(M2) );
+		double M3 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn, true);
+		LOG.debug(getOptMode()+" OPT: estimated new mem (cond partitioning) M="+toMB(M3) );
 		
 		// rewrite 4: execution strategy
-		boolean flagRecompMR = rewriteSetExecutionStategy( pn, M0, M1, M2, flagLIX );
+		boolean flagRecompMR = rewriteSetExecutionStategy( pn, M0a, M1, M2, M3, flagLIX );
 		
 		//exec-type-specific rewrites
 		if( pn.getExecType() == ExecType.MR || pn.getExecType()==ExecType.SPARK )
 		{
+			if( M1 > _rm && M3 <= _rm  ) {
+				// rewrite 1: data partitioning (apply conditional partitioning)
+				rewriteSetDataPartitioner( pn, ec.getVariables(), partitionedMatrices, M3 );
+				M1 = _cost.getEstimate(TestMeasure.MEMORY_USAGE, pn); //reestimate 		
+			}
+			
 			if( flagRecompMR ){
 				//rewrite 5: set operations exec type
 				rewriteSetOperationsExecType( pn, flagRecompMR );
@@ -390,7 +399,7 @@ public class OptimizerRuleBased extends Optimizer
 	//REWRITE set data partitioner
 	///
 
-	protected boolean rewriteSetDataPartitioner(OptNode n, LocalVariableMap vars, HashMap<String, PDataPartitionFormat> partitionedMatrices ) 
+	protected boolean rewriteSetDataPartitioner(OptNode n, LocalVariableMap vars, HashMap<String, PDataPartitionFormat> partitionedMatrices, double thetaM ) 
 		throws DMLRuntimeException
 	{
 		if( n.getNodeType() != NodeType.PARFOR )
@@ -414,16 +423,15 @@ public class OptimizerRuleBased extends Optimizer
 			for( String c : cand )
 			{
 				PDataPartitionFormat dpf = pfsb.determineDataPartitionFormat( c );
-				//System.out.println("Partitioning Format: "+dpf);
+				
 				if( dpf != PDataPartitionFormat.NONE 
-					&& dpf != PDataPartitionFormat.BLOCK_WISE_M_N ) //FIXME
+					&& dpf != PDataPartitionFormat.BLOCK_WISE_M_N ) 
 				{
 					cand2.put( c, dpf );
-				}
-					
+				}	
 			}
 			
-			apply = rFindDataPartitioningCandidates(n, cand2, vars);
+			apply = rFindDataPartitioningCandidates(n, cand2, vars, thetaM);
 			if( apply )
 				partitionedMatrices.putAll(cand2);
 		}
@@ -447,7 +455,7 @@ public class OptimizerRuleBased extends Optimizer
 		return blockwise;
 	}
 
-	protected boolean rFindDataPartitioningCandidates( OptNode n, HashMap<String, PDataPartitionFormat> cand, LocalVariableMap vars ) 
+	protected boolean rFindDataPartitioningCandidates( OptNode n, HashMap<String, PDataPartitionFormat> cand, LocalVariableMap vars, double thetaM ) 
 		throws DMLRuntimeException
 	{
 		boolean ret = false;
@@ -456,7 +464,7 @@ public class OptimizerRuleBased extends Optimizer
 		{
 			for( OptNode cn : n.getChilds() )
 				if( cn.getNodeType() != NodeType.FUNCCALL ) //prevent conflicts with aliases
-					ret |= rFindDataPartitioningCandidates( cn, cand, vars );
+					ret |= rFindDataPartitioningCandidates( cn, cand, vars, thetaM );
 		}
 		else if( n.getNodeType()== NodeType.HOP
 			     && n.getParam(ParamType.OPSTRING).equals(IndexingOp.OPSTRING) )
@@ -470,20 +478,22 @@ public class OptimizerRuleBased extends Optimizer
 				//NOTE: for the moment, we do not partition according to the remote mem, because we can execute 
 				//it even without partitioning in CP. However, advanced optimizers should reason about this 					   
 				//double mold = h.getMemEstimate();
-				if(	   n.getExecType() == ExecType.MR ||  n.getExecType()==ExecType.SPARK ) //Opt Condition: MR/Spark
-				   // || (mold > _rm && mnew <= _rm)   ) //Opt Condition: non-MR special cases (for remote exec)
+				if(	   n.getExecType() == ExecType.MR ||  n.getExecType()==ExecType.SPARK  //Opt Condition: MR/Spark
+					|| h.getMemEstimate() > thetaM ) //Opt Condition: mem estimate > constraint to force partitioning	
 				{
 					//NOTE: subsequent rewrites will still use the MR mem estimate
 					//(guarded by subsequent operations that have at least the memory req of one partition)
-					//if( mnew < _lm ) //apply rewrite if partitions fit into memory
-					//	n.setExecType(ExecType.CP);
-					//else
-					//	n.setExecType(ExecType.CP); //CP_FILE, but hop still in MR 
-					n.setExecType(ExecType.CP);
+					n.setExecType(ExecType.CP); //partition ref only (see below)
 					n.addParam(ParamType.DATA_PARTITION_FORMAT, dpf.toString());
 					h.setMemEstimate( mnew ); //CP vs CP_FILE in ProgramRecompiler bases on mem_estimate
 					ret = true;
 				}
+				//keep track of nodes that allow conditional data partitioning and their mem
+				else
+				{
+					n.addParam(ParamType.DATA_PARTITION_COND, String.valueOf(true));
+					n.addParam(ParamType.DATA_PARTITION_COND_MEM, String.valueOf(mnew));
+				}
 			}
 		}
 		
@@ -803,7 +813,7 @@ public class OptimizerRuleBased extends Optimizer
 	//REWRITE set execution strategy
 	///
 
-	protected boolean rewriteSetExecutionStategy(OptNode n, double M0, double M, double M2, boolean flagLIX) 
+	protected boolean rewriteSetExecutionStategy(OptNode n, double M0, double M, double M2, double M3, boolean flagLIX) 
 		throws DMLRuntimeException
 	{
 		boolean isCPOnly = n.isCPOnly();
@@ -814,26 +824,27 @@ public class OptimizerRuleBased extends Optimizer
 		PDataPartitioner REMOTE_DP = OptimizerUtils.isSparkExecutionMode() ? PDataPartitioner.REMOTE_SPARK : PDataPartitioner.REMOTE_MR;
 
 		//deciding on the execution strategy
-		if( ConfigurationManager.isParallelParFor()            //allowed remote parfor execution
-			&& ( (isCPOnly && M <= _rm )    //Required: all instruction can be be executed in CP
-			   ||(isCPOnlyPossible && M2 <= _rm)) )  //Required: cp inst fit into remote JVM mem 
+		if( ConfigurationManager.isParallelParFor()  //allowed remote parfor execution
+			&& ( (isCPOnly && M <= _rm )             //Required: all inst already in cp and fit in remote mem
+			   ||(isCPOnly && M3 <= _rm ) 	         //Required: all inst already in cp and fit partitioned in remote mem
+			   ||(isCPOnlyPossible && M2 <= _rm)) )  //Required: all inst forced to cp fit in remote mem
 		{
 			//at this point all required conditions for REMOTE_MR given, now its an opt decision
 			int cpk = (int) Math.min( _lk, Math.floor( _lm / M ) ); //estimated local exploited par  
 			
 			//MR if local par cannot be exploited due to mem constraints (this implies that we work on large data)
 			//(the factor of 2 is to account for hyper-threading and in order prevent too eager remote parfor)
-			if( 2*cpk < _lk && 2*cpk < _N && 2*cpk < _rk )
+			if( 2*cpk < _lk && 2*cpk < _N && 2*cpk < _rk ) //incl conditional partitioning
 			{
 				n.setExecType( REMOTE ); //remote parfor
 			}
 			//MR if problem is large enough and remote parallelism is larger than local   
-			else if( _lk < _N && _lk < _rk && isLargeProblem(n, M0) )
+			else if( _lk < _N && _lk < _rk && M <= _rm && isLargeProblem(n, M0) )
 			{
 				n.setExecType( REMOTE ); //remote parfor
 			}
 			//MR if MR operations in local, but CP only in remote (less overall MR jobs)
-			else if( (!isCPOnly) && isCPOnlyPossible )
+			else if( !isCPOnly && isCPOnlyPossible )
 			{
 				n.setExecType( REMOTE ); //remote parfor
 			}