You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this rendering.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/04/19 07:18:35 UTC

[2/2] incubator-systemml git commit: [SYSTEMML-1546] Fix parfor optimizer (result/task partitioning on spark)

[SYSTEMML-1546] Fix parfor optimizer (result/task partitioning on spark)

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/4c74a343
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/4c74a343
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/4c74a343

Branch: refs/heads/master
Commit: 4c74a34349bd4eeb0f4e102db7bca1f09b2ced97
Parents: 30d4b1e
Author: Matthias Boehm <mb...@gmail.com>
Authored: Wed Apr 19 00:12:03 2017 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Wed Apr 19 00:12:03 2017 -0700

----------------------------------------------------------------------
 .../parfor/opt/OptimizerConstrained.java        |  2 +-
 .../parfor/opt/OptimizerRuleBased.java          | 44 +++++++++++---------
 2 files changed, 25 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/4c74a343/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
index 235b927..fb83fd6 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
@@ -131,7 +131,7 @@ public class OptimizerConstrained extends OptimizerRuleBased
 		boolean flagRecompMR = rewriteSetExecutionStategy( pn, M0a, M1, M2, M3, flagLIX );
 
 		//exec-type-specific rewrites
-		if( pn.getExecType() == ExecType.MR || pn.getExecType() == ExecType.SPARK )
+		if( pn.getExecType() == getRemoteExecType() )
 		{
 			if( M1 > _rm && M3 <= _rm  ) {
 				// rewrite 1: data partitioning (apply conditional partitioning)

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/4c74a343/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
index f0eb0e7..0ff7a31 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
@@ -262,7 +262,7 @@ public class OptimizerRuleBased extends Optimizer
 		boolean flagRecompMR = rewriteSetExecutionStategy( pn, M0a, M1, M2, M3, flagLIX );
 		
 		//exec-type-specific rewrites
-		if( pn.getExecType() == ExecType.MR || pn.getExecType()==ExecType.SPARK )
+		if( pn.getExecType() == getRemoteExecType() )
 		{
 			if( M1 > _rm && M3 <= _rm  ) {
 				// rewrite 1: data partitioning (apply conditional partitioning)
@@ -400,6 +400,10 @@ public class OptimizerRuleBased extends Optimizer
 		_rkmax2  = (int) Math.ceil( PAR_K_FACTOR * _rk2 ); 
 	}
 	
+	protected ExecType getRemoteExecType() {
+		return OptimizerUtils.isSparkExecutionMode() ? ExecType.SPARK : ExecType.MR;
+	}
+	
 	///////
 	//REWRITE set data partitioner
 	///
@@ -483,7 +487,7 @@ public class OptimizerRuleBased extends Optimizer
 				//NOTE: for the moment, we do not partition according to the remote mem, because we can execute 
 				//it even without partitioning in CP. However, advanced optimizers should reason about this 					   
 				//double mold = h.getMemEstimate();
-				if(	   n.getExecType() == ExecType.MR ||  n.getExecType()==ExecType.SPARK  //Opt Condition: MR/Spark
+				if(	   n.getExecType() == getRemoteExecType()  //Opt Condition: MR/Spark
 					|| h.getMemEstimate() > thetaM ) //Opt Condition: mem estimate > constraint to force partitioning	
 				{
 					//NOTE: subsequent rewrites will still use the MR mem estimate
@@ -608,23 +612,22 @@ public class OptimizerRuleBased extends Optimizer
 		ParForProgramBlock pfpb = (ParForProgramBlock) o[1];
 		
 		//search for candidates
-		Collection<OptNode> cand = n.getNodeList(ExecType.MR);
+		Collection<OptNode> cand = n.getNodeList(getRemoteExecType());
 		
 		//determine if applicable
-		boolean apply =    M < _rm         //ops fit in remote memory budget
-			            && !cand.isEmpty() //at least one MR
-		                && isResultPartitionableAll(cand,pfpb.getResultVariables(),vars, pfpb.getIterablePredicateVars()[0]); // check candidates
+		boolean apply = M < _rm   //ops fit in remote memory budget
+			&& !cand.isEmpty()    //at least one MR
+		    && isResultPartitionableAll(cand,pfpb.getResultVariables(), 
+		    		vars, pfpb.getIterablePredicateVars()[0]); // check candidates
 			
 		//recompile LIX
 		if( apply )
 		{
-			try
-			{
+			try {
 				for(OptNode lix : cand)
 					recompileLIX( lix, vars );
 			}
-			catch(Exception ex)
-			{
+			catch(Exception ex) {
 				throw new DMLRuntimeException("Unable to recompile LIX.", ex);
 			}
 		}
@@ -827,8 +830,9 @@ public class OptimizerRuleBased extends Optimizer
 		boolean isCPOnlyPossible = isCPOnly || isCPOnlyPossible(n, _rm);
 
 		String datapartitioner = n.getParam(ParamType.DATA_PARTITIONER);
-		ExecType REMOTE = OptimizerUtils.isSparkExecutionMode() ? ExecType.SPARK : ExecType.MR;
-		PDataPartitioner REMOTE_DP = OptimizerUtils.isSparkExecutionMode() ? PDataPartitioner.REMOTE_SPARK : PDataPartitioner.REMOTE_MR;
+		ExecType REMOTE = getRemoteExecType();
+		PDataPartitioner REMOTE_DP = OptimizerUtils.isSparkExecutionMode() ? 
+			PDataPartitioner.REMOTE_SPARK : PDataPartitioner.REMOTE_MR;
 
 		//deciding on the execution strategy
 		if( ConfigurationManager.isParallelParFor()  //allowed remote parfor execution
@@ -906,7 +910,7 @@ public class OptimizerRuleBased extends Optimizer
 		ExecType et = n.getExecType();
 		boolean ret = ( et == ExecType.CP);		
 		
-		if( n.isLeaf() && (et == ExecType.MR || et == ExecType.SPARK) )
+		if( n.isLeaf() && et == getRemoteExecType() )
 		{
 			Hop h = OptTreeConverter.getAbstractPlanMapping().getMappedHop( n.getID() );
 			if(    h.getForcedExecType()!=LopProperties.ExecType.MR  //e.g., -exec=hadoop
@@ -1156,7 +1160,7 @@ public class OptimizerRuleBased extends Optimizer
         							.getAbstractPlanMapping().getMappedProg(n.getID())[1];
 		
 		//decide on the replication factor 
-		if( n.getExecType()==ExecType.MR || n.getExecType()==ExecType.SPARK )		
+		if( n.getExecType()==getRemoteExecType() )		
 		{
 			apply = true;
 			
@@ -1417,7 +1421,8 @@ public class OptimizerRuleBased extends Optimizer
 		{
 			setTaskPartitioner( pn, PTaskPartitioner.FACTORING_CMAX );
 		}
-		else if( pn.getExecType()==ExecType.MR && !jvmreuse && pn.hasOnlySimpleChilds() )
+		else if( ((pn.getExecType()==ExecType.MR && !jvmreuse) 
+			|| pn.getExecType()==ExecType.SPARK) && pn.hasOnlySimpleChilds() )
 		{
 			//for simple body programs without loops, branches, or function calls, we don't
 			//expect much load imbalance and hence use static partitioning in order to
@@ -2931,7 +2936,7 @@ public class OptimizerRuleBased extends Optimizer
 		PResultMerge ret = null;
 		
 		//investigate details of current parfor node
-		boolean flagRemoteParFOR = (n.getExecType() == ExecType.MR || n.getExecType() == ExecType.SPARK);
+		boolean flagRemoteParFOR = (n.getExecType() == getRemoteExecType());
 		boolean flagLargeResult = hasLargeTotalResults( n, pfpb.getResultVariables(), vars, true );
 		boolean flagRemoteLeftIndexing = hasResultMRLeftIndexing( n, pfpb.getResultVariables(), vars, true );
 		boolean flagCellFormatWoCompare = determineFlagCellFormatWoCompare(pfpb.getResultVariables(), vars); 
@@ -3015,8 +3020,8 @@ public class OptimizerRuleBased extends Optimizer
 		{
 			String opName = n.getParam(ParamType.OPSTRING);
 			//check opstring and exec type
-			if( opName !=null && opName.equals(LeftIndexingOp.OPSTRING) && 
-				(n.getExecType() == ExecType.MR || n.getExecType() == ExecType.SPARK) )
+			if( opName != null && opName.equals(LeftIndexingOp.OPSTRING) 
+				&& n.getExecType() == getRemoteExecType() )
 			{
 				LeftIndexingOp hop = (LeftIndexingOp) OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID());
 				//check agains set of varname
@@ -3151,7 +3156,7 @@ public class OptimizerRuleBased extends Optimizer
 			if( n.getNodeType() == NodeType.PARFOR )
 			{
 				rewriteSetResultMerge(n, vars, inLocal);
-				if( n.getExecType()==ExecType.MR || n.getExecType()==ExecType.SPARK )
+				if( n.getExecType()==getRemoteExecType() )
 					inLocal = false;
 			}
 			else if( n.getChilds()!=null )  
@@ -3493,7 +3498,6 @@ public class OptimizerRuleBased extends Optimizer
 		return count;
 	}
 	
-	
 	////////////////////////
 	//   Helper methods   //
 	////////////////////////