You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/07/09 05:33:17 UTC

[1/4] systemml git commit: [SYSTEMML-1752] Cache-conscious mmchain block matrix multiplication

Repository: systemml
Updated Branches:
  refs/heads/master f485ab292 -> b84a4933c


[SYSTEMML-1752] Cache-conscious mmchain block matrix multiplication

The fused mmchain matrix multiply for patterns such as t(X) %*% (w * (X
%*% v)) uses row-wise dotProduct and vectMultAdd operations, which works
very well for the common case of tall&skinny matrices where individual
rows fit into L1 cache. However, for graph and text scenarios with wide
matrices this leads to cache thrashing on the input and output vectors.

This patch introduces cache-conscious dense mmchain operations by
working over fragments of input and output vectors. On a scenario with
dense matrices of 2M features, this improved the multi-threaded
performance (in a parfor context) by almost 2x due to less cache
thrashing. At the same time, this patch does not negatively affect the
performance of the common case with tall and skinny matrices.

Furthermore, for sparse, we no longer use row blocks but single rows
(which reduces cache memory requirements) because these sparse row
blocks cannot be used for vectMultAdd4 operations anyway. Full support
for cache-conscious sparse mmchain operations is still pending: initial
experiments with sparsity-dependent block sizes were inconclusive from
a performance perspective and showed numerical stability issues.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/8b83ab5f
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/8b83ab5f
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/8b83ab5f

Branch: refs/heads/master
Commit: 8b83ab5f4611c27b605af45ee40e6e0b5cd801f9
Parents: 4a6165b
Author: Matthias Boehm <mb...@gmail.com>
Authored: Sat Jul 8 02:23:03 2017 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Sat Jul 8 22:32:03 2017 -0700

----------------------------------------------------------------------
 .../runtime/matrix/data/LibMatrixMult.java      | 86 +++++++++-----------
 1 file changed, 39 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/8b83ab5f/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java
index b5429c3..5996a51 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixMult.java
@@ -1565,26 +1565,35 @@ public class LibMatrixMult
 		
 		//temporary array for cache blocking
 		//(blocksize chosen to fit b+v in L2 (256KB) for default 1k blocks)
-		final int blocksize = 24; // constraint: factor of 4
-		double[] tmp = new double[blocksize];
-			
+		final int blocksizeI = 24; // constraint: factor of 4
+		final int blocksizeJ = 1024;
+		double[] tmp = new double[blocksizeI];
+		
 		//blockwise mmchain computation
-		final int bn = ru - ru % blocksize; //rl blocksize aligned
-		for( int bi=rl; bi < bn; bi+=blocksize ) 
+		final int bn = ru - ru % blocksizeI; //rl blocksize aligned
+		for( int bi=rl; bi < bn; bi+=blocksizeI ) 
 		{
 			//compute 1st matrix-vector for row block
-			for( int j=0, aix=bi*cd; j < blocksize; j++, aix+=cd)
-				tmp[j] = dotProduct(a, b, aix, 0, cd);
+			Arrays.fill(tmp, 0);
+			for( int bj=0; bj<cd; bj+=blocksizeJ ) {
+				int bjmin = Math.min(cd-bj, blocksizeJ);
+				for( int i=0, aix=bi*cd+bj; i < blocksizeI; i++, aix+=cd)
+					tmp[i] += dotProduct(a, b, aix, bj, bjmin);
+			}
 			
 			//multiply/subtract weights (in-place), if required
 			if( weights ) 
-				vectMultiply(w, tmp, bi, 0, blocksize);	
+				vectMultiply(w, tmp, bi, 0, blocksizeI);	
 			else if( weights2 )
-				vectSubtract(w, tmp, bi, 0, blocksize);
+				vectSubtract(w, tmp, bi, 0, blocksizeI);
 				
 			//compute 2nd matrix vector for row block and aggregate
-			for (int j=0, aix=bi*cd; j < blocksize; j+=4, aix+=4*cd)
-				vectMultiplyAdd4(tmp[j], tmp[j+1], tmp[j+2], tmp[j+3], a, c, aix, aix+cd, aix+2*cd, aix+3*cd, 0, cd);
+			for( int bj = 0; bj<cd; bj+=blocksizeJ ) {
+				int bjmin = Math.min(cd-bj, blocksizeJ);
+				for (int i=0, aix=bi*cd+bj; i<blocksizeI; i+=4, aix+=4*cd)
+					vectMultiplyAdd4(tmp[i], tmp[i+1], tmp[i+2], tmp[i+3],
+						a, c, aix, aix+cd, aix+2*cd, aix+3*cd, bj, bjmin);
+			}
 		}
 		
 		//compute rest (not aligned to blocksize)
@@ -1592,7 +1601,7 @@ public class LibMatrixMult
 			double val = dotProduct(a, b, aix, 0, cd);
 			val *= (weights) ? w[i] : 1;
 			val -= (weights2) ? w[i] : 0;
-			vectMultiplyAdd(val, a, c, aix, 0, cd);				
+			vectMultiplyAdd(val, a, c, aix, 0, cd);
 		}
 	}
 
@@ -1605,42 +1614,25 @@ public class LibMatrixMult
 		boolean weights = (ct == ChainType.XtwXv);
 		boolean weights2 = (ct == ChainType.XtXvy);
 		
-		//temporary array for cache blocking
-		//(blocksize chosen to fit b+v in L2 (256KB) for default 1k blocks)
-		final int blocksize = 24;
-		double[] tmp = new double[blocksize];
-		
-		//blockwise mmchain computation
-		for( int bi=rl; bi < ru; bi+=blocksize ) 
-		{
-			//reset row block intermediate
-			int tmplen = Math.min(blocksize, ru-bi);
-
-			//compute 1st matrix-vector for row block
-			for( int j=0; j < tmplen; j++) {
-				if( !a.isEmpty(bi+j) ) {
-					int apos = a.pos(bi+j);
-					int alen = a.size(bi+j);				
-					tmp[j] = dotProduct(a.values(bi+j), b, a.indexes(bi+j), apos, 0, alen);							
-				}
-			}
+		//row-wise mmchain computation
+		for( int i=rl; i < ru; i++ ) {
+			if( a.isEmpty(i) || (weights && w[i]==0) )
+				continue;
+			int apos = a.pos(i);
+			int alen = a.size(i);
+			int[] aix = a.indexes(i);
+			double[] avals = a.values(i);
 			
-			//multiply weights (in-place), if required
-			if( weights ) 
-				vectMultiply(w, tmp, bi, 0, tmplen);	
-			else if( weights2 )
-				vectSubtract(w, tmp, bi, 0, tmplen);
-		
-			//compute 2nd matrix vector for row block and aggregate
-			for( int j=0; j < tmplen; j++) {
-				if( !a.isEmpty(bi+j) && tmp[j] != 0 ) {
-					int apos = a.pos(bi+j);
-					int alen = a.size(bi+j);
-					int[] aix = a.indexes(bi+j);
-					double[] avals = a.values(bi+j);		
-					vectMultiplyAdd(tmp[j], avals, c, aix, apos, 0, alen);							
-				}
-			}
+			//compute 1st matrix-vector dot product
+			double val = dotProduct(avals, b, aix, apos, 0, alen);
+			
+			//multiply/subtract weights, if required
+			val *= (weights) ? w[i] : 1;
+			val -= (weights2) ? w[i] : 0;
+			
+			//compute 2nd matrix vector and aggregate
+			if( val != 0 )
+				vectMultiplyAdd(val, avals, c, aix, apos, 0, alen);
 		}
 	}
 


[4/4] systemml git commit: [SYSTEMML-1754] Performance removeEmpty (shallow copy if unmodified)

Posted by mb...@apache.org.
[SYSTEMML-1754] Performance removeEmpty (shallow copy if unmodified)

This patch improves the performance and memory efficiency of removeEmpty
rows/columns for both dense and sparse inputs by using a shallow output
copy if the determined output dimensions match the input dimensions. 

Basic experiments (cumulative runtime for 100 iterations):
1) removeEmpty rows 10Kx10K, sp=1.0 (dense): 33.9 -> 0.027s
2) removeEmpty rows 100Kx1K, sp=1.0 (dense): 33.2 -> 0.137s
3) removeEmpty cols 10Kx10K, sp=1.0 (dense): 57.8 -> 14.8s
4) removeEmpty cols 1Kx100K, sp=1.0 (dense): 61.5 -> 14.3s
5) removeEmpty rows 10Kx10K, sp=0.1 (sparse): 0.055 -> 0.036s
6) removeEmpty rows 100Kx1K, sp=0.1 (sparse): 0.470 -> 0.127s
7) removeEmpty cols 10Kx10K, sp=0.1 (sparse): 55.3 -> 1.3s
8) removeEmpty cols 1Kx100K, sp=0.1 (sparse): 39.8 -> 1.1s

Note that this patch improves performance by orders of magnitude, except
for sparse removeEmpty rows (which already used shallow row copy).


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/b84a4933
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/b84a4933
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/b84a4933

Branch: refs/heads/master
Commit: b84a4933c11a25d064216bc0606938bcf0d792e6
Parents: 73f7c6a
Author: Matthias Boehm <mb...@gmail.com>
Authored: Sat Jul 8 21:44:51 2017 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Sat Jul 8 22:32:05 2017 -0700

----------------------------------------------------------------------
 .../runtime/matrix/data/LibMatrixReorg.java     | 103 +++++++++++--------
 1 file changed, 61 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/b84a4933/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
index 73d50eb..205f787 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
@@ -1710,7 +1710,14 @@ public class LibMatrixReorg
 		if( in.isEmptyBlock(false) )
 			return ret;
 		
-		if( in.sparse ) //* <- SPARSE
+		if( SHALLOW_COPY_REORG && m == rlen2 ) {
+			ret.sparse = in.sparse;
+			if( ret.sparse )
+				ret.sparseBlock = in.sparseBlock;
+			else
+				ret.denseBlock = in.denseBlock;
+		}
+		else if( in.sparse ) //* <- SPARSE
 		{
 			//note: output dense or sparse
 			for( int i=0, cix=0; i<m; i++ )
@@ -1798,14 +1805,7 @@ public class LibMatrixReorg
 			clen2 += flags[j] ? 1 : 0;
 		}
 		
-		//Step 3: create mapping of flags to target indexes
-		int[] cix = new int[n];
-		for( int j=0, pos=0; j<n; j++ ) {
-			if( flags[j] )
-				cix[j] = pos++;	
-		}
-		
-		//Step 3: reset result and copy cols
+		//Step 3: reset result and copy columns
 		//dense stays dense if correct input representation (but robust for any input), 
 		// sparse might be dense/sparse
 		clen2 = Math.max(clen2, 1); //ensure valid output
@@ -1814,42 +1814,61 @@ public class LibMatrixReorg
 		if( in.isEmptyBlock(false) )
 			return ret;
 		
-		if( in.sparse ) //* <- SPARSE 
-		{
-			//note: output dense or sparse
-			SparseBlock a = in.sparseBlock;
-			
-			for( int i=0; i<m; i++ ) 
-				if ( !a.isEmpty(i) ) {
-					int apos = a.pos(i);
-					int alen = a.size(i);
-					int[] aix = a.indexes(i);
-					double[] avals = a.values(i);
-					for( int j=apos; j<apos+alen; j++ )
-						if( flags[aix[j]] )
-							ret.appendValue(i, cix[aix[j]], avals[j]);
-				}
-		}
-		else if( !in.sparse && !ret.sparse )  //DENSE <- DENSE
-		{
-			ret.allocateDenseBlock();
-			double[] a = in.denseBlock;
-			double[] c = ret.denseBlock;
-			
-			for(int i=0, aix=0, lcix=0; i<m; i++, lcix+=clen2)
-				for(int j=0; j<n; j++, aix++)
-					if( flags[j] )
-						 c[ lcix+cix[j] ] = a[aix];	
+		if( SHALLOW_COPY_REORG && n == clen2 ) {
+			//quick path: shallow copy if unmodified
+			ret.sparse = in.sparse;
+			if( ret.sparse )
+				ret.sparseBlock = in.sparseBlock;
+			else
+				ret.denseBlock = in.denseBlock;
 		}
-		else //SPARSE <- DENSE
+		else
 		{
-			ret.allocateSparseRowsBlock();
-			double[] a = in.denseBlock;
+			//create mapping of flags to target indexes
+			int[] cix = new int[n];
+			for( int j=0, pos=0; j<n; j++ ) {
+				if( flags[j] )
+					cix[j] = pos++;
+			}
 			
-			for(int i=0, aix=0; i<m; i++)
-				for(int j=0; j<n; j++, aix++)
-					if( flags[j] && a[aix]!=0 )
-						 ret.appendValue(i, cix[j], a[aix]);	
+			//deep copy of modified outputs
+			if( in.sparse ) //* <- SPARSE
+			{
+				//note: output dense or sparse
+				SparseBlock a = in.sparseBlock;
+				
+				for( int i=0; i<m; i++ )
+					if ( !a.isEmpty(i) ) {
+						int apos = a.pos(i);
+						int alen = a.size(i);
+						int[] aix = a.indexes(i);
+						double[] avals = a.values(i);
+						for( int j=apos; j<apos+alen; j++ )
+							if( flags[aix[j]] )
+								ret.appendValue(i, cix[aix[j]], avals[j]);
+					}
+			}
+			else if( !in.sparse && !ret.sparse )  //DENSE <- DENSE
+			{
+				ret.allocateDenseBlock();
+				double[] a = in.denseBlock;
+				double[] c = ret.denseBlock;
+				
+				for(int i=0, aix=0, lcix=0; i<m; i++, lcix+=clen2)
+					for(int j=0; j<n; j++, aix++)
+						if( flags[j] )
+							 c[ lcix+cix[j] ] = a[aix];	
+			}
+			else //SPARSE <- DENSE
+			{
+				ret.allocateSparseRowsBlock();
+				double[] a = in.denseBlock;
+				
+				for(int i=0, aix=0; i<m; i++)
+					for(int j=0; j<n; j++, aix++)
+						if( flags[j] && a[aix]!=0 )
+							 ret.appendValue(i, cix[j], a[aix]);
+			}
 		}
 		
 		//check sparsity


[3/4] systemml git commit: [SYSTEMML-1753] Fix memory efficiency parfor in-memory result merge

Posted by mb...@apache.org.
[SYSTEMML-1753] Fix memory efficiency parfor in-memory result merge

This patch fixes out-of-memory issues with parfor in-memory result merge
for moderately large outputs. In case of result merge without compare
(i.e., initially empty output matrix), we currently allocate the output
in sparse, collect all outputs into this sparse representation and
finally convert it into a dense output. This is unnecessarily
inefficient in terms of memory requirements (dense matrix in sparse
format and dense output) and performance because for result merge
without compare, the output number of non-zeros is exactly known as the
sum of the number of non-zeros of all inputs. We now use a much better
estimate of the number of non-zeros, which improves the memory
requirements by more than 2x in the above scenario. Furthermore, we now also
remove any remaining intermediates (other than result variables) from
the workers before result merge to reduce memory pressure in scenarios
without buffer pool eviction (e.g., training through JMLC). 


-> remove non-result variables before result merge.
fix2

Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/73f7c6a7
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/73f7c6a7
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/73f7c6a7

Branch: refs/heads/master
Commit: 73f7c6a7776a46435042fba3784af3d2368f7703
Parents: 8b83ab5
Author: Matthias Boehm <mb...@gmail.com>
Authored: Sat Jul 8 18:31:28 2017 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Sat Jul 8 22:32:04 2017 -0700

----------------------------------------------------------------------
 .../controlprogram/LocalVariableMap.java        | 14 +++---
 .../controlprogram/ParForProgramBlock.java      | 12 ++---
 .../parfor/ResultMergeLocalMemory.java          | 50 ++++++++++++--------
 .../sysml/runtime/matrix/data/MatrixBlock.java  |  4 +-
 4 files changed, 47 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/73f7c6a7/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java b/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java
index 63757ac..d60ef24 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/LocalVariableMap.java
@@ -78,20 +78,22 @@ public class LocalVariableMap implements Cloneable
 	 * @param name the variable name for the data value
 	 * @param val the data value object (such as envelope)
 	 */
-	public void put(String name, Data val)
-	{
+	public void put(String name, Data val) {
 		localMap.put( name, val );
 	}
 
-	public Data remove( String name )
-	{
+	public Data remove( String name ) {
 		return localMap.remove( name );
 	}
 
-	public void removeAll()
-	{
+	public void removeAll() {
 		localMap.clear();
 	}
+	
+	public void removeAllNotIn(Set<String> blacklist) {
+		localMap.entrySet().removeIf(
+			e -> !blacklist.contains(e.getKey()));
+	}
 
 	public boolean hasReferences( Data d )
 	{

http://git-wip-us.apache.org/repos/asf/systemml/blob/73f7c6a7/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
index 32f105b..a2d361c 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/ParForProgramBlock.java
@@ -802,17 +802,17 @@ public class ParForProgramBlock extends ForProgramBlock
 				
 				
 			// Step 4) collecting results from each parallel worker
-			//obtain results
+			//obtain results and cleanup other intermediates before result merge
 			LocalVariableMap [] localVariables = new LocalVariableMap [_numThreads]; 
-			for( int i=0; i<_numThreads; i++ )
-			{
+			for( int i=0; i<_numThreads; i++ ) {
 				localVariables[i] = workers[i].getVariables();
+				localVariables[i].removeAllNotIn(new HashSet<String>(_resultVars));
 				numExecutedTasks += workers[i].getExecutedTasks();
 				numExecutedIterations += workers[i].getExecutedIterations();			
 			}
 			//consolidate results into global symbol table
-			consolidateAndCheckResults( ec, numIterations, numCreatedTasks, numExecutedIterations, numExecutedTasks, 
-					                    localVariables );
+			consolidateAndCheckResults( ec, numIterations, numCreatedTasks,
+				numExecutedIterations, numExecutedTasks, localVariables );
 			
 			// Step 5) cleanup local parworkers (e.g., remove created functions)
 			for( int i=0; i<_numThreads; i++ )
@@ -1734,7 +1734,7 @@ public class ParForProgramBlock extends ForProgramBlock
 		{
 			//execute result merge sequentially for all result vars
 			for( String var : _resultVars ) //foreach non-local write
-			{			
+			{
 				Data dat = ec.getVariable(var);
 				if( dat instanceof MatrixObject ) //robustness scalars
 				{

http://git-wip-us.apache.org/repos/asf/systemml/blob/73f7c6a7/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java
index a1962e4..fdd3d97 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/parfor/ResultMergeLocalMemory.java
@@ -61,14 +61,16 @@ public class ResultMergeLocalMemory extends ResultMerge
 				
 		try
 		{
-			//get matrix blocks through caching 
+			//get old output matrix from cache for compare
 			MatrixBlock outMB = _output.acquireRead();
 			
-			//get old output matrix from cache for compare
-			int estnnz = outMB.getNumRows()*outMB.getNumColumns();
-			MatrixBlock outMBNew = new MatrixBlock(outMB.getNumRows(), outMB.getNumColumns(), 
-					                               outMB.isInSparseFormat(), estnnz);
+			//create output matrices in correct format according to 
+			//the estimated number of non-zeros
+			long estnnz = getOutputNnzEstimate();
+			MatrixBlock outMBNew = new MatrixBlock(
+				outMB.getNumRows(), outMB.getNumColumns(), estnnz);
 			boolean appendOnly = outMBNew.isInSparseFormat();
+			outMBNew.allocateDenseOrSparseBlock();
 			
 			//create compare matrix if required (existing data in result)
 			_compare = createCompareMatrix(outMB);
@@ -80,7 +82,7 @@ public class ResultMergeLocalMemory extends ResultMerge
 			for( MatrixObject in : _inputs )
 			{
 				//check for empty inputs (no iterations executed)
-				if( in !=null && in != _output ) 
+				if( in != null && in != _output ) 
 				{
 					LOG.trace("ResultMerge (local, in-memory): Merge input "+in.getVarName()+" (fname="+in.getFileName()+")");
 					
@@ -97,7 +99,7 @@ public class ResultMergeLocalMemory extends ResultMerge
 					
 					//determine need for sparse2dense change during merge
 					boolean sparseToDense = appendOnly && !MatrixBlock.evalSparseFormatInMemory(
-							                                 outMBNew.getNumRows(), outMBNew.getNumColumns(), outMBNew.getNonZeros()); 
+						outMBNew.getNumRows(), outMBNew.getNumColumns(), outMBNew.getNonZeros()); 
 					if( sparseToDense ) {
 						outMBNew.sortSparseRows(); //sort sparse due to append-only
 						outMBNew.examSparsity(); //sparse-dense representation change
@@ -129,8 +131,7 @@ public class ResultMergeLocalMemory extends ResultMerge
 			//release old output, and all inputs
 			_output.release();
 		}
-		catch(Exception ex)
-		{
+		catch(Exception ex) {
 			throw new DMLRuntimeException(ex);
 		}
 
@@ -144,13 +145,9 @@ public class ResultMergeLocalMemory extends ResultMerge
 		throws DMLRuntimeException
 	{		
 		MatrixObject moNew = null; //always create new matrix object (required for nested parallelism)
-	
-		//Timing time = null;
+		
 		LOG.trace("ResultMerge (local, in-memory): Execute parallel (par="+par+") merge for output "+_output.getVarName()+" (fname="+_output.getFileName()+")");
-		//	time = new Timing();
-		//	time.start();
 		
-
 		try
 		{
 			//get matrix blocks through caching 
@@ -211,10 +208,8 @@ public class ResultMergeLocalMemory extends ResultMerge
 			
 			//release old output, and all inputs
 			_output.release();			
-			//_output.clearData(); //save, since it respects pin/unpin  
 		}
-		catch(Exception ex)
-		{
+		catch(Exception ex) {
 			throw new DMLRuntimeException(ex);
 		}
 		
@@ -228,8 +223,7 @@ public class ResultMergeLocalMemory extends ResultMerge
 		double[][] ret = null;
 		
 		//create compare matrix only if required
-		if( output.getNonZeros() > 0 )
-		{
+		if( output.getNonZeros() > 0 ) {
 			ret = DataConverter.convertToDoubleMatrix( output );
 		}
 		
@@ -289,6 +283,24 @@ public class ResultMergeLocalMemory extends ResultMerge
 			mergeWithComp(out, in, _compare);
 	}
 	
+	/**
+	 * Estimates the number of non-zeros in the final merged output.
+	 * For scenarios without compare matrix, this is the exact number 
+	 * of non-zeros due to guaranteed disjoint results per worker.
+	 * 
+	 * @return estimated number of non-zeros.
+	 */
+	private long getOutputNnzEstimate() {
+		long nnzInputs = 0;
+		for( MatrixObject input : _inputs )
+			if( input != null )
+				nnzInputs += Math.max(input.getNnz(),1);
+		long rlen = _output.getNumRows();
+		long clen = _output.getNumColumns();
+		return Math.min(rlen * clen,
+			Math.max(nnzInputs, _output.getNnz()));
+	}
+	
 	
 	/**
 	 * NOTE: only used if matrix in dense

http://git-wip-us.apache.org/repos/asf/systemml/blob/73f7c6a7/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index 43adfc6..16051dc 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -1033,7 +1033,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 	 * @return true if matrix block shold be in sparse format in memory
 	 */
 	public static boolean evalSparseFormatInMemory( final long nrows, final long ncols, final long nnz )
-	{				
+	{
 		//evaluate sparsity threshold
 		double lsparsity = (double)nnz/nrows/ncols;
 		boolean lsparse = (lsparsity < SPARSITY_TURN_POINT);
@@ -1686,7 +1686,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 			throw new DMLRuntimeException("Number of non-zeros mismatch on merge disjoint (target="+rlen+"x"+clen+", nnz target="+nonZeros+", nnz source="+that.nonZeros+")");
 		
 		//check for empty target (copy in full)
-		if( isEmptyBlock(false) ) {
+		if( isEmptyBlock(false) && !(!sparse && isAllocated()) ) {
 			copy(that);
 			return;
 		}


[2/4] systemml git commit: [MINOR] Performance frame transformencode (selective row iterators)

Posted by mb...@apache.org.
[MINOR] Performance frame transformencode (selective row iterators)

This patch adds selective row iterators to frame blocks, which allow
the transform recode encoder to iterate over rows of selected columns
and thereby avoid unnecessary string conversions for unused columns.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/4a6165b7
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/4a6165b7
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/4a6165b7

Branch: refs/heads/master
Commit: 4a6165b796590a6388a9c182612761219731d77f
Parents: f485ab2
Author: Matthias Boehm <mb...@gmail.com>
Authored: Fri Jul 7 17:14:39 2017 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Sat Jul 8 22:32:03 2017 -0700

----------------------------------------------------------------------
 .../sysml/runtime/matrix/data/FrameBlock.java   | 82 +++++++++++++++++---
 .../transform/decode/DecoderFactory.java        |  2 +-
 .../transform/encode/EncoderFactory.java        |  2 +-
 .../runtime/transform/encode/EncoderRecode.java |  4 +-
 .../sysml/runtime/util/UtilFunctions.java       | 46 +++++++----
 5 files changed, 107 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/4a6165b7/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
index 99a6f3f..512b85c 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
@@ -505,6 +505,17 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
 	}
 	
 	/**
+	 * Get a row iterator over the frame where all selected fields are 
+	 * encoded as strings independent of their value types.  
+	 * 
+	 * @param cols column selection, 1-based
+	 * @return string array iterator
+	 */
+	public Iterator<String[]> getStringRowIterator(int[] cols) {
+		return new StringRowIterator(0, _numRows, cols);
+	}
+	
+	/**
 	 * Get a row iterator over the frame where all fields are encoded
 	 * as strings independent of their value types.  
 	 * 
@@ -517,6 +528,19 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
 	}
 	
 	/**
+	 * Get a row iterator over the frame where all selected fields are 
+	 * encoded as strings independent of their value types.  
+	 * 
+	 * @param rl lower row index
+	 * @param ru upper row index
+	 * @param cols column selection, 1-based
+	 * @return string array iterator
+	 */
+	public Iterator<String[]> getStringRowIterator(int rl, int ru, int[] cols) {
+		return new StringRowIterator(rl, ru, cols);
+	}
+	
+	/**
 	 * Get a row iterator over the frame where all fields are encoded
 	 * as boxed objects according to their value types.  
 	 * 
@@ -527,6 +551,17 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
 	}
 	
 	/**
+	 * Get a row iterator over the frame where all selected fields are 
+	 * encoded as boxed objects according to their value types.  
+	 * 
+	 * @param cols column selection, 1-based
+	 * @return object array iterator
+	 */
+	public Iterator<Object[]> getObjectRowIterator(int[] cols) {
+		return new ObjectRowIterator(0, _numRows, cols);
+	}
+	
+	/**
 	 * Get a row iterator over the frame where all fields are encoded
 	 * as boxed objects according to their value types.  
 	 * 
@@ -537,6 +572,19 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
 	public Iterator<Object[]> getObjectRowIterator(int rl, int ru) {
 		return new ObjectRowIterator(rl, ru);
 	}
+	
+	/**
+	 * Get a row iterator over the frame where all selected fields are 
+	 * encoded as boxed objects according to their value types.  
+	 * 
+	 * @param rl lower row index
+	 * @param ru upper row index
+	 * @param cols column selection, 1-based
+	 * @return object array iterator
+	 */
+	public Iterator<Object[]> getObjectRowIterator(int rl, int ru, int[] cols) {
+		return new ObjectRowIterator(rl, ru, cols);
+	}
 
 	///////
 	// serialization / deserialization (implementation of writable and externalizable)
@@ -1111,14 +1159,20 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
 	// row iterators (over strings and boxed objects)
 
 	private abstract class RowIterator<T> implements Iterator<T[]> {
-		protected T[] _curRow = null;
+		protected final int[] _cols;
+		protected final T[] _curRow;
+		protected final int _maxPos;
 		protected int _curPos = -1;
-		protected int _maxPos = -1;
 		
 		protected RowIterator(int rl, int ru) {
-			_curPos = rl;
+			this(rl, ru, UtilFunctions.getSeqArray(1, getNumColumns(), 1));
+		}
+		
+		protected RowIterator(int rl, int ru, int[] cols) {
+			_curRow = createRow(cols.length);
+			_cols = cols;
 			_maxPos = ru;
-			_curRow = createRow(getNumColumns());
+			_curPos = rl;
 		}
 		
 		@Override
@@ -1139,6 +1193,10 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
 			super(rl, ru);
 		}
 		
+		public StringRowIterator(int rl, int ru, int[] cols) {
+			super(rl, ru, cols);
+		}
+		
 		@Override
 		protected String[] createRow(int size) {
 			return new String[size];
@@ -1146,11 +1204,11 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
 		
 		@Override
 		public String[] next( ) {
-			for( int j=0; j<getNumColumns(); j++ ) {
-				Object tmp = get(_curPos, j);
+			for( int j=0; j<_cols.length; j++ ) {
+				Object tmp = get(_curPos, _cols[j]-1);
 				_curRow[j] = (tmp!=null) ? tmp.toString() : null;
 			}
-			_curPos++;			
+			_curPos++;
 			return _curRow;
 		}
 	}
@@ -1160,6 +1218,10 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
 			super(rl, ru);
 		}
 		
+		public ObjectRowIterator(int rl, int ru, int[] cols) {
+			super(rl, ru, cols);
+		}
+		
 		@Override
 		protected Object[] createRow(int size) {
 			return new Object[size];
@@ -1167,9 +1229,9 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
 		
 		@Override
 		public Object[] next( ) {
-			for( int j=0; j<getNumColumns(); j++ )
-				_curRow[j] = get(_curPos, j);
-			_curPos++;			
+			for( int j=0; j<_cols.length; j++ )
+				_curRow[j] = get(_curPos, _cols[j]-1);
+			_curPos++;
 			return _curRow;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/systemml/blob/4a6165b7/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderFactory.java b/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderFactory.java
index 425466a..c02609a 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderFactory.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/decode/DecoderFactory.java
@@ -56,7 +56,7 @@ public class DecoderFactory
 					TfMetaUtils.parseJsonIDList(jSpec, colnames, TfUtils.TXMETHOD_DUMMYCODE))); 
 			rcIDs = new ArrayList<Integer>(CollectionUtils.union(rcIDs, dcIDs));
 			List<Integer> ptIDs = new ArrayList<Integer>(CollectionUtils
-					.subtract(UtilFunctions.getSequenceList(1, meta.getNumColumns(), 1), rcIDs)); 
+					.subtract(UtilFunctions.getSeqList(1, meta.getNumColumns(), 1), rcIDs)); 
 
 			//create default schema if unspecified (with double columns for pass-through)
 			if( schema == null ) {

http://git-wip-us.apache.org/repos/asf/systemml/blob/4a6165b7/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java
index 13b2810..5e0a178 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderFactory.java
@@ -65,7 +65,7 @@ public class EncoderFactory
 			rcIDs = new ArrayList<Integer>(CollectionUtils.union(rcIDs, dcIDs));
 			List<Integer> binIDs = TfMetaUtils.parseBinningColIDs(jSpec, colnames); 
 			List<Integer> ptIDs = new ArrayList<Integer>(CollectionUtils.subtract(
-					CollectionUtils.subtract(UtilFunctions.getSequenceList(1, clen, 1), rcIDs), binIDs)); 
+					CollectionUtils.subtract(UtilFunctions.getSeqList(1, clen, 1), rcIDs), binIDs)); 
 			List<Integer> oIDs = Arrays.asList(ArrayUtils.toObject(
 					TfMetaUtils.parseJsonIDList(jSpec, colnames, TfUtils.TXMETHOD_OMIT))); 
 			List<Integer> mvIDs = Arrays.asList(ArrayUtils.toObject(

http://git-wip-us.apache.org/repos/asf/systemml/blob/4a6165b7/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderRecode.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderRecode.java b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderRecode.java
index bb8592c..dc75a74 100644
--- a/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderRecode.java
+++ b/src/main/java/org/apache/sysml/runtime/transform/encode/EncoderRecode.java
@@ -112,7 +112,7 @@ public class EncoderRecode extends Encoder
 		if( !isApplicable() )
 			return;		
 
-		Iterator<String[]> iter = in.getStringRowIterator();
+		Iterator<String[]> iter = in.getStringRowIterator(_colList);
 		while( iter.hasNext() ) {
 			String[] row = iter.next(); 
 			for( int j=0; j<_colList.length; j++ ) {
@@ -122,7 +122,7 @@ public class EncoderRecode extends Encoder
 					_rcdMaps.put(colID, new HashMap<String,Long>());
 				//probe and build column map
 				HashMap<String,Long> map = _rcdMaps.get(colID);
-				String key = row[colID-1];
+				String key = row[j];
 				if( key!=null && !key.isEmpty() && !map.containsKey(key) )
 					map.put(key, Long.valueOf(map.size()+1));
 			}

http://git-wip-us.apache.org/repos/asf/systemml/blob/4a6165b7/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
index f76d37b..8c4cacd 100644
--- a/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
+++ b/src/main/java/org/apache/sysml/runtime/util/UtilFunctions.java
@@ -314,6 +314,37 @@ public class UtilFunctions
 		return 1L + (long) Math.floor(to/incr - from/incr);
 	}
 	
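+	/**
+	 * Obtain the list of integers low, low+incr, low+2*incr, ... that do not exceed up.
+	 * 
+	 * @param low   lower bound (inclusive)
+	 * @param up    upper bound (inclusive); if up is less than low, the list is empty
+	 * @param incr  increment (expected to be positive; zero never terminates if low <= up)
+	 * @return list of integers
+	 */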
+	public static List<Integer> getSeqList(int low, int up, int incr) {
+		ArrayList<Integer> ret = new ArrayList<Integer>();
+		for( int i=low; i<=up; i+=incr )
+			ret.add(i);
+		return ret;
+	}
+	
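+	/**
+	 * Obtain the integer sequence low, low+incr, low+2*incr, ... as an array.
+	 * 
+	 * @param low   lower bound (inclusive), used as the first array element (if any)
+	 * @param up    upper bound (inclusive)
+	 * @param incr  increment between consecutive array elements
+	 * @return array of integers with getSeqLength(low, up, incr) elements
+	 */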
+	public static int[] getSeqArray(int low, int up, int incr) {
+		int len = (int) getSeqLength(low, up, incr);
+		int[] ret = new int[len];
+		for( int i=0, val=low; i<len; i++, val+=incr )
+			ret[i] = val;
+		return ret;
+	}
+	
  	public static int roundToNext(int val, int factor) {
 		//round up to next non-zero multiple of factor
 		int pval = Math.max(val, factor);
@@ -506,21 +537,6 @@ public class UtilFunctions
 		else
 			return String.format("%d", arg);
 	}
-	
-	/**
-	 * Obtain sequence list
-	 * 
-	 * @param low   lower bound (inclusive)
-	 * @param up    upper bound (inclusive)
-	 * @param incr  increment 
-	 * @return list of integers
-	 */
-	public static List<Integer> getSequenceList(int low, int up, int incr) {
-		ArrayList<Integer> ret = new ArrayList<Integer>();
-		for( int i=low; i<=up; i+=incr )
-			ret.add(i);
-		return ret;
-	}
 
 	public static double getDouble(Object obj) {
 		return (obj instanceof Double) ? (Double)obj :