Posted to commits@systemml.apache.org by mb...@apache.org on 2017/04/19 07:18:34 UTC

[1/2] incubator-systemml git commit: [SYSTEMML-1545] Fix memory estimates leftindexing w/ empty inputs

Repository: incubator-systemml
Updated Branches:
  refs/heads/master cd5499c54 -> 4c74a3434


[SYSTEMML-1545] Fix memory estimates leftindexing w/ empty inputs

For a detailed problem description see
https://issues.apache.org/jira/browse/SYSTEMML-1545

This patch fixes the incorrect memory estimates by deriving the output
sparsity from the input nnz of empty left-hand-side matrices only if
the indexing identifiers are constant, e.g., after literal replacement
during dynamic recompilation; otherwise the sparsity is left unknown.
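
For illustration, here is a minimal, self-contained sketch of the guarded nnz
propagation (simplified stand-in types, not the actual SystemML Hop API): the
output nnz of a left indexing into an empty matrix is derived from the input
nnz only when all four index bounds are literals; otherwise it stays unknown
(-1) and is resolved by later inference.

import java.util.List;

public class LeftIndexNnzSketch {
	// simplified stand-in for SystemML's Hop inputs: positions 2..5 hold the
	// row/column index bounds of the left indexing operation
	interface Op { boolean isLiteral(); long getNnz(); }

	// mirrors the hasConstantIndexingRange() guard introduced by this patch
	static boolean hasConstantIndexingRange(List<Op> in) {
		return in.get(2).isLiteral() && in.get(3).isLiteral()
			&& in.get(4).isLiteral() && in.get(5).isLiteral();
	}

	// output nnz for an empty left-hand side: use the right-hand-side nnz
	// only if the indexing range is constant; otherwise return -1 (unknown)
	static long outputNnzForEmptyLhs(List<Op> in) {
		Op lhs = in.get(0), rhs = in.get(1);
		if( lhs.getNnz() == 0 && hasConstantIndexingRange(in) )
			return rhs.getNnz();
		return -1;
	}
}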

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/30d4b1ee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/30d4b1ee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/30d4b1ee

Branch: refs/heads/master
Commit: 30d4b1eef85509c0f0cf646b15e4946d0eed3d32
Parents: cd5499c
Author: Matthias Boehm <mb...@gmail.com>
Authored: Wed Apr 19 00:10:03 2017 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Wed Apr 19 00:10:03 2017 -0700

----------------------------------------------------------------------
 .../org/apache/sysml/hops/LeftIndexingOp.java   | 33 +++++++++++---------
 .../recompile/SparsityRecompileTest.java        |  8 ++---
 2 files changed, 23 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/30d4b1ee/src/main/java/org/apache/sysml/hops/LeftIndexingOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/LeftIndexingOp.java b/src/main/java/org/apache/sysml/hops/LeftIndexingOp.java
index 89c025b..f927af7 100644
--- a/src/main/java/org/apache/sysml/hops/LeftIndexingOp.java
+++ b/src/main/java/org/apache/sysml/hops/LeftIndexingOp.java
@@ -113,15 +113,7 @@ public class LeftIndexingOp  extends Hop
 				Lop bottom=getInput().get(3).constructLops();
 				Lop left=getInput().get(4).constructLops();
 				Lop right=getInput().get(5).constructLops();
-				/*
-				//need to creat new lops for converting the index ranges
-				//original range is (a, b) --> (c, d)
-				//newa=2-a, newb=2-b
-				Lops two=new Data(null,	Data.OperationTypes.READ, null, "2", Expression.DataType.SCALAR, Expression.ValueType.INT, false);
-				Lops newTop=new Binary(two, top, HopsOpOp2LopsB.get(Hops.OpOp2.MINUS), Expression.DataType.SCALAR, Expression.ValueType.INT, et);
-				Lops newLeft=new Binary(two, left, HopsOpOp2LopsB.get(Hops.OpOp2.MINUS), Expression.DataType.SCALAR, Expression.ValueType.INT, et);
-				//newc=leftmatrix.row-a+1, newd=leftmatrix.row
-				*/
+				
 				//right hand matrix
 				Lop nrow=new UnaryCP(getInput().get(0).constructLops(), 
 								OperationTypes.NROW, DataType.SCALAR, ValueType.INT);
@@ -298,7 +290,9 @@ public class LeftIndexingOp  extends Hop
 			//(this is important for indexing sparse matrices into empty matrices).
 			MatrixCharacteristics mcM1 = memo.getAllInputStats(getInput().get(0));
 			MatrixCharacteristics mcM2 = memo.getAllInputStats(getInput().get(1));
-			if( mcM1.getNonZeros()>=0 && mcM2.getNonZeros()>=0  ) {
+			if( mcM1.getNonZeros()>=0 && mcM2.getNonZeros()>=0
+				&& hasConstantIndexingRange() ) 
+			{
 				long lnnz = mcM1.getNonZeros() + mcM2.getNonZeros();
 				_outputMemEstimate = computeOutputMemEstimate( _dim1, _dim2, lnnz );
 				_memEstimate = getInputSize(0) //original matrix (left)
@@ -316,7 +310,7 @@ public class LeftIndexingOp  extends Hop
 		{
 			Hop input1 = getInput().get(0);
 			Hop input2 = getInput().get(1);
-			if( input1.dimsKnown() ) {
+			if( input1.dimsKnown() && hasConstantIndexingRange() ) {
 				sparsity = OptimizerUtils.getLeftIndexingSparsity(
 						input1.getDim1(), input1.getDim2(), input1.getNnz(), 
 						input2.getDim1(), input2.getDim2(), input2.getNnz());
@@ -351,8 +345,8 @@ public class LeftIndexingOp  extends Hop
 			double sparsity = OptimizerUtils.getLeftIndexingSparsity(
 					mc1.getRows(), mc1.getCols(), mc1.getNonZeros(), 
 					mc2.getRows(), mc2.getCols(), mc2.getNonZeros());
-			long lnnz = (long)(sparsity * mc1.getRows() * mc1.getCols());
-			        
+			long lnnz = !hasConstantIndexingRange() ? -1 :
+					(long)(sparsity * mc1.getRows() * mc1.getCols());
 			ret = new long[]{mc1.getRows(), mc1.getCols(), lnnz};
 		}
 		
@@ -442,7 +436,11 @@ public class LeftIndexingOp  extends Hop
 		setDim2( input1.getDim2() );
 		
 		//refresh output nnz if exactly known; otherwise later inference
-		if( input1.getNnz() == 0 )  {
+		//note: leveraging the nnz for estimating the output sparsity is
+		//only valid for constant index identifiers (e.g., after literal 
+		//replacement during dynamic recompilation), otherwise this could
+		//lead to underestimation and hence OOMs in loops
+		if( input1.getNnz() == 0 && hasConstantIndexingRange() )  {
 			if( input2.getDataType()==DataType.SCALAR )
 				setNnz(1);
 			else 
@@ -451,6 +449,13 @@ public class LeftIndexingOp  extends Hop
 		else
 			setNnz(-1);
 	}
+	
+	private boolean hasConstantIndexingRange() {
+		return (getInput().get(2) instanceof LiteralOp
+			&& getInput().get(3) instanceof LiteralOp
+			&& getInput().get(4) instanceof LiteralOp
+			&& getInput().get(5) instanceof LiteralOp);
+	}
 
 	private void checkAndModifyRecompilationStatus()
 	{

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/30d4b1ee/src/test/java/org/apache/sysml/test/integration/functions/recompile/SparsityRecompileTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/recompile/SparsityRecompileTest.java b/src/test/java/org/apache/sysml/test/integration/functions/recompile/SparsityRecompileTest.java
index 29ba76d..923dc47 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/recompile/SparsityRecompileTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/recompile/SparsityRecompileTest.java
@@ -138,15 +138,15 @@ public class SparsityRecompileTest extends AutomatedTestBase
 			boolean exceptionExpected = false;
 			runTest(true, exceptionExpected, null, -1); 
 			
-			//CHECK compiled MR jobs (changed 07/2015 due to better sparsity inference, 08/2015 due to better worst-case inference)
+			//CHECK compiled MR jobs
 			int expectNumCompiled =   (testname.equals(TEST_NAME2)?3:4) //reblock,GMR,GMR,GMR (one GMR less for if) 
-	                                + ((testname.equals(TEST_NAME4) && !recompile)?2:0);//(+2 resultmerge)
+	                                + (testname.equals(TEST_NAME4)?2:0);//(+2 resultmerge)
 			Assert.assertEquals("Unexpected number of compiled MR jobs.", 
 					            expectNumCompiled, Statistics.getNoOfCompiledMRJobs());
 		
-			//CHECK executed MR jobs (changed 07/2015 due to better sparsity inference, 08/2015 due to better worst-case inference)
+			//CHECK executed MR jobs
 			int expectNumExecuted = -1;
-			if( recompile ) expectNumExecuted = 0; //+ ((testname.equals(TEST_NAME4))?2:0); //(+2 resultmerge) 
+			if( recompile ) expectNumExecuted = 0 + ((testname.equals(TEST_NAME4))?2:0); //(+2 resultmerge) 
 			else            expectNumExecuted =  (testname.equals(TEST_NAME2)?3:4) //reblock,GMR,GMR,GMR (one GMR less for if)
 					                              + ((testname.equals(TEST_NAME4))?2:0); //(+2 resultmerge) 
 			Assert.assertEquals("Unexpected number of executed MR jobs.", 


[2/2] incubator-systemml git commit: [SYSTEMML-1546] Fix parfor optimizer (result/task partitioning on spark)

Posted by mb...@apache.org.
[SYSTEMML-1546] Fix parfor optimizer (result/task partitioning on spark)
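
The core of the change, visible in the diffs below, is to replace the scattered
"ExecType.MR || ExecType.SPARK" checks with a single getRemoteExecType() helper.
A minimal, self-contained sketch of that helper follows; the enum and the
execution-mode check are stand-ins for SystemML's LopProperties.ExecType and
OptimizerUtils, not the actual API.

public class RemoteExecTypeSketch {
	enum ExecType { CP, MR, SPARK }

	// stand-in for OptimizerUtils.isSparkExecutionMode(); assumed here to be
	// driven by a system property instead of SystemML's global execution mode
	static boolean isSparkExecutionMode() {
		return "spark".equalsIgnoreCase(System.getProperty("exec.mode", "spark"));
	}

	// mirrors the new getRemoteExecType() helper: the remote exec type is
	// SPARK in spark execution mode and MR otherwise
	static ExecType getRemoteExecType() {
		return isSparkExecutionMode() ? ExecType.SPARK : ExecType.MR;
	}

	public static void main(String[] args) {
		System.out.println("remote exec type: " + getRemoteExecType());
	}
}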

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/4c74a343
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/4c74a343
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/4c74a343

Branch: refs/heads/master
Commit: 4c74a34349bd4eeb0f4e102db7bca1f09b2ced97
Parents: 30d4b1e
Author: Matthias Boehm <mb...@gmail.com>
Authored: Wed Apr 19 00:12:03 2017 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Wed Apr 19 00:12:03 2017 -0700

----------------------------------------------------------------------
 .../parfor/opt/OptimizerConstrained.java        |  2 +-
 .../parfor/opt/OptimizerRuleBased.java          | 44 +++++++++++---------
 2 files changed, 25 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/4c74a343/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
index 235b927..fb83fd6 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerConstrained.java
@@ -131,7 +131,7 @@ public class OptimizerConstrained extends OptimizerRuleBased
 		boolean flagRecompMR = rewriteSetExecutionStategy( pn, M0a, M1, M2, M3, flagLIX );
 
 		//exec-type-specific rewrites
-		if( pn.getExecType() == ExecType.MR || pn.getExecType() == ExecType.SPARK )
+		if( pn.getExecType() == getRemoteExecType() )
 		{
 			if( M1 > _rm && M3 <= _rm  ) {
 				// rewrite 1: data partitioning (apply conditional partitioning)

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/4c74a343/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
index f0eb0e7..0ff7a31 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/opt/OptimizerRuleBased.java
@@ -262,7 +262,7 @@ public class OptimizerRuleBased extends Optimizer
 		boolean flagRecompMR = rewriteSetExecutionStategy( pn, M0a, M1, M2, M3, flagLIX );
 		
 		//exec-type-specific rewrites
-		if( pn.getExecType() == ExecType.MR || pn.getExecType()==ExecType.SPARK )
+		if( pn.getExecType() == getRemoteExecType() )
 		{
 			if( M1 > _rm && M3 <= _rm  ) {
 				// rewrite 1: data partitioning (apply conditional partitioning)
@@ -400,6 +400,10 @@ public class OptimizerRuleBased extends Optimizer
 		_rkmax2  = (int) Math.ceil( PAR_K_FACTOR * _rk2 ); 
 	}
 	
+	protected ExecType getRemoteExecType() {
+		return OptimizerUtils.isSparkExecutionMode() ? ExecType.SPARK : ExecType.MR;
+	}
+	
 	///////
 	//REWRITE set data partitioner
 	///
@@ -483,7 +487,7 @@ public class OptimizerRuleBased extends Optimizer
 				//NOTE: for the moment, we do not partition according to the remote mem, because we can execute 
 				//it even without partitioning in CP. However, advanced optimizers should reason about this 					   
 				//double mold = h.getMemEstimate();
-				if(	   n.getExecType() == ExecType.MR ||  n.getExecType()==ExecType.SPARK  //Opt Condition: MR/Spark
+				if(	   n.getExecType() == getRemoteExecType()  //Opt Condition: MR/Spark
 					|| h.getMemEstimate() > thetaM ) //Opt Condition: mem estimate > constraint to force partitioning	
 				{
 					//NOTE: subsequent rewrites will still use the MR mem estimate
@@ -608,23 +612,22 @@ public class OptimizerRuleBased extends Optimizer
 		ParForProgramBlock pfpb = (ParForProgramBlock) o[1];
 		
 		//search for candidates
-		Collection<OptNode> cand = n.getNodeList(ExecType.MR);
+		Collection<OptNode> cand = n.getNodeList(getRemoteExecType());
 		
 		//determine if applicable
-		boolean apply =    M < _rm         //ops fit in remote memory budget
-			            && !cand.isEmpty() //at least one MR
-		                && isResultPartitionableAll(cand,pfpb.getResultVariables(),vars, pfpb.getIterablePredicateVars()[0]); // check candidates
+		boolean apply = M < _rm   //ops fit in remote memory budget
+			&& !cand.isEmpty()    //at least one MR
+		    && isResultPartitionableAll(cand,pfpb.getResultVariables(), 
+		    		vars, pfpb.getIterablePredicateVars()[0]); // check candidates
 			
 		//recompile LIX
 		if( apply )
 		{
-			try
-			{
+			try {
 				for(OptNode lix : cand)
 					recompileLIX( lix, vars );
 			}
-			catch(Exception ex)
-			{
+			catch(Exception ex) {
 				throw new DMLRuntimeException("Unable to recompile LIX.", ex);
 			}
 		}
@@ -827,8 +830,9 @@ public class OptimizerRuleBased extends Optimizer
 		boolean isCPOnlyPossible = isCPOnly || isCPOnlyPossible(n, _rm);
 
 		String datapartitioner = n.getParam(ParamType.DATA_PARTITIONER);
-		ExecType REMOTE = OptimizerUtils.isSparkExecutionMode() ? ExecType.SPARK : ExecType.MR;
-		PDataPartitioner REMOTE_DP = OptimizerUtils.isSparkExecutionMode() ? PDataPartitioner.REMOTE_SPARK : PDataPartitioner.REMOTE_MR;
+		ExecType REMOTE = getRemoteExecType();
+		PDataPartitioner REMOTE_DP = OptimizerUtils.isSparkExecutionMode() ? 
+			PDataPartitioner.REMOTE_SPARK : PDataPartitioner.REMOTE_MR;
 
 		//deciding on the execution strategy
 		if( ConfigurationManager.isParallelParFor()  //allowed remote parfor execution
@@ -906,7 +910,7 @@ public class OptimizerRuleBased extends Optimizer
 		ExecType et = n.getExecType();
 		boolean ret = ( et == ExecType.CP);		
 		
-		if( n.isLeaf() && (et == ExecType.MR || et == ExecType.SPARK) )
+		if( n.isLeaf() && et == getRemoteExecType() )
 		{
 			Hop h = OptTreeConverter.getAbstractPlanMapping().getMappedHop( n.getID() );
 			if(    h.getForcedExecType()!=LopProperties.ExecType.MR  //e.g., -exec=hadoop
@@ -1156,7 +1160,7 @@ public class OptimizerRuleBased extends Optimizer
         							.getAbstractPlanMapping().getMappedProg(n.getID())[1];
 		
 		//decide on the replication factor 
-		if( n.getExecType()==ExecType.MR || n.getExecType()==ExecType.SPARK )		
+		if( n.getExecType()==getRemoteExecType() )		
 		{
 			apply = true;
 			
@@ -1417,7 +1421,8 @@ public class OptimizerRuleBased extends Optimizer
 		{
 			setTaskPartitioner( pn, PTaskPartitioner.FACTORING_CMAX );
 		}
-		else if( pn.getExecType()==ExecType.MR && !jvmreuse && pn.hasOnlySimpleChilds() )
+		else if( ((pn.getExecType()==ExecType.MR && !jvmreuse) 
+			|| pn.getExecType()==ExecType.SPARK) && pn.hasOnlySimpleChilds() )
 		{
 			//for simple body programs without loops, branches, or function calls, we don't
 			//expect much load imbalance and hence use static partitioning in order to
@@ -2931,7 +2936,7 @@ public class OptimizerRuleBased extends Optimizer
 		PResultMerge ret = null;
 		
 		//investigate details of current parfor node
-		boolean flagRemoteParFOR = (n.getExecType() == ExecType.MR || n.getExecType() == ExecType.SPARK);
+		boolean flagRemoteParFOR = (n.getExecType() == getRemoteExecType());
 		boolean flagLargeResult = hasLargeTotalResults( n, pfpb.getResultVariables(), vars, true );
 		boolean flagRemoteLeftIndexing = hasResultMRLeftIndexing( n, pfpb.getResultVariables(), vars, true );
 		boolean flagCellFormatWoCompare = determineFlagCellFormatWoCompare(pfpb.getResultVariables(), vars); 
@@ -3015,8 +3020,8 @@ public class OptimizerRuleBased extends Optimizer
 		{
 			String opName = n.getParam(ParamType.OPSTRING);
 			//check opstring and exec type
-			if( opName !=null && opName.equals(LeftIndexingOp.OPSTRING) && 
-				(n.getExecType() == ExecType.MR || n.getExecType() == ExecType.SPARK) )
+			if( opName != null && opName.equals(LeftIndexingOp.OPSTRING) 
+				&& n.getExecType() == getRemoteExecType() )
 			{
 				LeftIndexingOp hop = (LeftIndexingOp) OptTreeConverter.getAbstractPlanMapping().getMappedHop(n.getID());
 				//check agains set of varname
@@ -3151,7 +3156,7 @@ public class OptimizerRuleBased extends Optimizer
 			if( n.getNodeType() == NodeType.PARFOR )
 			{
 				rewriteSetResultMerge(n, vars, inLocal);
-				if( n.getExecType()==ExecType.MR || n.getExecType()==ExecType.SPARK )
+				if( n.getExecType()==getRemoteExecType() )
 					inLocal = false;
 			}
 			else if( n.getChilds()!=null )  
@@ -3493,7 +3498,6 @@ public class OptimizerRuleBased extends Optimizer
 		return count;
 	}
 	
-	
 	////////////////////////
 	//   Helper methods   //
 	////////////////////////
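
For context on the task-partitioning part of this fix, a hedged sketch of the
extended condition from the task-partitioning hunk in OptimizerRuleBased above
(stand-in enum and simplified signature): simple-body parfor programs now
qualify for static task partitioning on SPARK as well, whereas on MR they only
qualify when JVM reuse is disabled.

public class TaskPartitionerSketch {
	enum ExecType { CP, MR, SPARK }

	// mirrors the rewritten condition: static partitioning for simple-body
	// programs on SPARK, or on MR without JVM reuse
	static boolean useStaticPartitioning(ExecType et, boolean jvmReuse, boolean onlySimpleChilds) {
		return ((et == ExecType.MR && !jvmReuse) || et == ExecType.SPARK)
			&& onlySimpleChilds;
	}
}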