You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/07/09 05:33:19 UTC

[3/4] systemml git commit: [SYSTEMML-1753] Fix memory efficiency parfor in-memory result merge

[SYSTEMML-1753] Fix memory efficiency parfor in-memory result merge

This patch fixes out-of-memory issues with parfor in-memory result merge
for moderately large outputs. In case of result merge without compare
(i.e., initially empty output matrix), we currently allocate the output
in sparse, collect all outputs into this sparse representation and
finally convert it into a dense output. This is unnecessarily
inefficient in terms of both memory requirements (a dense matrix held in
sparse format plus the dense output) and performance. The overhead is
avoidable because, for result merge without compare, the number of
non-zeros in the output is known exactly: it is the sum of the numbers of
non-zeros of all inputs. We now use this much better estimate of the
number of non-zeros, which reduces the memory requirements by more than
2x in the above scenario. Furthermore, we now also remove any remaining
intermediates (other than result variables) from the workers before
result merge. This reduces memory pressure in scenarios without buffer
pool eviction (e.g., training through JMLC).


-> remove non-result variables before result merge.
fix2

Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/73f7c6a7
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/73f7c6a7
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/73f7c6a7

Branch: refs/heads/master
Commit: 73f7c6a7776a46435042fba3784af3d2368f7703
Parents: 8b83ab5
Author: Matthias Boehm <mb...@gmail.com>
Authored: Sat Jul 8 18:31:28 2017 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Sat Jul 8 22:32:04 2017 -0700

----------------------------------------------------------------------
 .../controlprogram/LocalVariableMap.java        | 14 +++---
 .../controlprogram/ParForProgramBlock.java      | 12 ++---
 .../parfor/ResultMergeLocalMemory.java          | 50 ++++++++++++--------
 .../sysml/runtime/matrix/data/MatrixBlock.java  |  4 +-
 4 files changed, 47 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/73f7c6a7/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java b/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java
index 63757ac..d60ef24 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java
@@ -78,20 +78,22 @@ public class LocalVariableMap implements Cloneable
 	 * @param name the variable name for the data value
 	 * @param val the data value object (such as envelope)
 	 */
-	public void put(String name, Data val)
-	{
+	public void put(String name, Data val) {
 		localMap.put( name, val );
 	}
 
-	public Data remove( String name )
-	{
+	public Data remove( String name ) {
 		return localMap.remove( name );
 	}
 
-	public void removeAll()
-	{
+	public void removeAll() {
 		localMap.clear();
 	}
+	
+	public void removeAllNotIn(Set<String> blacklist) {
+		localMap.entrySet().removeIf(
+			e -> !blacklist.contains(e.getKey()));
+	}
 
 	public boolean hasReferences( Data d )
 	{

http://git-wip-us.apache.org/repos/asf/systemml/blob/73f7c6a7/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
index 32f105b..a2d361c 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
@@ -802,17 +802,17 @@ public class ParForProgramBlock extends ForProgramBlock
 				
 				
 			// Step 4) collecting results from each parallel worker
-			//obtain results
+			//obtain results and cleanup other intermediates before result merge
 			LocalVariableMap [] localVariables = new LocalVariableMap [_numThreads]; 
-			for( int i=0; i<_numThreads; i++ )
-			{
+			for( int i=0; i<_numThreads; i++ ) {
 				localVariables[i] = workers[i].getVariables();
+				localVariables[i].removeAllNotIn(new HashSet<String>(_resultVars));
 				numExecutedTasks += workers[i].getExecutedTasks();
 				numExecutedIterations += workers[i].getExecutedIterations();			
 			}
 			//consolidate results into global symbol table
-			consolidateAndCheckResults( ec, numIterations, numCreatedTasks, numExecutedIterations, numExecutedTasks, 
-					                    localVariables );
+			consolidateAndCheckResults( ec, numIterations, numCreatedTasks,
+				numExecutedIterations, numExecutedTasks, localVariables );
 			
 			// Step 5) cleanup local parworkers (e.g., remove created functions)
 			for( int i=0; i<_numThreads; i++ )
@@ -1734,7 +1734,7 @@ public class ParForProgramBlock extends ForProgramBlock
 		{
 			//execute result merge sequentially for all result vars
 			for( String var : _resultVars ) //foreach non-local write
-			{			
+			{
 				Data dat = ec.getVariable(var);
 				if( dat instanceof MatrixObject ) //robustness scalars
 				{

http://git-wip-us.apache.org/repos/asf/systemml/blob/73f7c6a7/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java
index a1962e4..fdd3d97 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java
@@ -61,14 +61,16 @@ public class ResultMergeLocalMemory extends ResultMerge
 				
 		try
 		{
-			//get matrix blocks through caching 
+			//get old output matrix from cache for compare
 			MatrixBlock outMB = _output.acquireRead();
 			
-			//get old output matrix from cache for compare
-			int estnnz = outMB.getNumRows()*outMB.getNumColumns();
-			MatrixBlock outMBNew = new MatrixBlock(outMB.getNumRows(), outMB.getNumColumns(), 
-					                               outMB.isInSparseFormat(), estnnz);
+			//create output matrices in correct format according to 
+			//the estimated number of non-zeros
+			long estnnz = getOutputNnzEstimate();
+			MatrixBlock outMBNew = new MatrixBlock(
+				outMB.getNumRows(), outMB.getNumColumns(), estnnz);
 			boolean appendOnly = outMBNew.isInSparseFormat();
+			outMBNew.allocateDenseOrSparseBlock();
 			
 			//create compare matrix if required (existing data in result)
 			_compare = createCompareMatrix(outMB);
@@ -80,7 +82,7 @@ public class ResultMergeLocalMemory extends ResultMerge
 			for( MatrixObject in : _inputs )
 			{
 				//check for empty inputs (no iterations executed)
-				if( in !=null && in != _output ) 
+				if( in != null && in != _output ) 
 				{
 					LOG.trace("ResultMerge (local, in-memory): Merge input "+in.getVarName()+" (fname="+in.getFileName()+")");
 					
@@ -97,7 +99,7 @@ public class ResultMergeLocalMemory extends ResultMerge
 					
 					//determine need for sparse2dense change during merge
 					boolean sparseToDense = appendOnly && !MatrixBlock.evalSparseFormatInMemory(
-							                                 outMBNew.getNumRows(), outMBNew.getNumColumns(), outMBNew.getNonZeros()); 
+						outMBNew.getNumRows(), outMBNew.getNumColumns(), outMBNew.getNonZeros()); 
 					if( sparseToDense ) {
 						outMBNew.sortSparseRows(); //sort sparse due to append-only
 						outMBNew.examSparsity(); //sparse-dense representation change
@@ -129,8 +131,7 @@ public class ResultMergeLocalMemory extends ResultMerge
 			//release old output, and all inputs
 			_output.release();
 		}
-		catch(Exception ex)
-		{
+		catch(Exception ex) {
 			throw new DMLRuntimeException(ex);
 		}
 
@@ -144,13 +145,9 @@ public class ResultMergeLocalMemory extends ResultMerge
 		throws DMLRuntimeException
 	{		
 		MatrixObject moNew = null; //always create new matrix object (required for nested parallelism)
-	
-		//Timing time = null;
+		
 		LOG.trace("ResultMerge (local, in-memory): Execute parallel (par="+par+") merge for output "+_output.getVarName()+" (fname="+_output.getFileName()+")");
-		//	time = new Timing();
-		//	time.start();
 		
-
 		try
 		{
 			//get matrix blocks through caching 
@@ -211,10 +208,8 @@ public class ResultMergeLocalMemory extends ResultMerge
 			
 			//release old output, and all inputs
 			_output.release();			
-			//_output.clearData(); //save, since it respects pin/unpin  
 		}
-		catch(Exception ex)
-		{
+		catch(Exception ex) {
 			throw new DMLRuntimeException(ex);
 		}
 		
@@ -228,8 +223,7 @@ public class ResultMergeLocalMemory extends ResultMerge
 		double[][] ret = null;
 		
 		//create compare matrix only if required
-		if( output.getNonZeros() > 0 )
-		{
+		if( output.getNonZeros() > 0 ) {
 			ret = DataConverter.convertToDoubleMatrix( output );
 		}
 		
@@ -289,6 +283,24 @@ public class ResultMergeLocalMemory extends ResultMerge
 			mergeWithComp(out, in, _compare);
 	}
 	
+	/**
+	 * Estimates the number of non-zeros in the final merged output.
+	 * For scenarios without compare matrix, this is the exact number 
+	 * of non-zeros due to guaranteed disjoint results per worker.
+	 * 
+	 * @return estimated number of non-zeros.
+	 */
+	private long getOutputNnzEstimate() {
+		long nnzInputs = 0;
+		for( MatrixObject input : _inputs )
+			if( input != null )
+				nnzInputs += Math.max(input.getNnz(),1);
+		long rlen = _output.getNumRows();
+		long clen = _output.getNumColumns();
+		return Math.min(rlen * clen,
+			Math.max(nnzInputs, _output.getNnz()));
+	}
+	
 	
 	/**
 	 * NOTE: only used if matrix in dense

http://git-wip-us.apache.org/repos/asf/systemml/blob/73f7c6a7/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index 43adfc6..16051dc 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -1033,7 +1033,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 	 * @return true if matrix block shold be in sparse format in memory
 	 */
 	public static boolean evalSparseFormatInMemory( final long nrows, final long ncols, final long nnz )
-	{				
+	{
 		//evaluate sparsity threshold
 		double lsparsity = (double)nnz/nrows/ncols;
 		boolean lsparse = (lsparsity < SPARSITY_TURN_POINT);
@@ -1686,7 +1686,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 			throw new DMLRuntimeException("Number of non-zeros mismatch on merge disjoint (target="+rlen+"x"+clen+", nnz target="+nonZeros+", nnz source="+that.nonZeros+")");
 		
 		//check for empty target (copy in full)
-		if( isEmptyBlock(false) ) {
+		if( isEmptyBlock(false) && !(!sparse && isAllocated()) ) {
 			copy(that);
 			return;
 		}