You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/03/07 21:24:11 UTC

[7/7] incubator-systemml git commit: [SYSTEMML-552] Performance parallel reader sparse binary block matrices

[SYSTEMML-552] Performance parallel reader sparse binary block matrices

Reading a sparse binary block matrix into CP is realized via append to
avoid repeated shifting for maintaining the sorted order. In case of
matrices with multiple column blocks, a parallel read requires locking
during append and a final sorting of sparse rows. This patch improves
performance by (1) sorting sparse rows in parallel, and (2) fine-grained
locking (synchronization) per row block instead of locking the entire
matrix. For a 100k x 100k matrix with sparsity 0.01 (and k=16 vcores),
this reduced the read time from 13.1s to 8.9s (with parallel sorting
only) and to 4.1s (with both features).

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/41ca1d16
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/41ca1d16
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/41ca1d16

Branch: refs/heads/master
Commit: 41ca1d163c335156afaf491df21435953d04c1d4
Parents: 0089c3d
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Mon Mar 7 12:22:23 2016 -0800
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Mon Mar 7 12:23:29 2016 -0800

----------------------------------------------------------------------
 .../apache/sysml/runtime/io/MatrixReader.java   | 15 ++++-
 .../sysml/runtime/io/ReaderBinaryBlock.java     |  2 +-
 .../runtime/io/ReaderBinaryBlockParallel.java   | 64 ++++++++++++++++----
 .../sysml/runtime/io/ReaderBinaryCell.java      |  2 +-
 .../apache/sysml/runtime/io/ReaderTextCSV.java  |  2 +-
 .../sysml/runtime/io/ReaderTextCSVParallel.java |  2 +-
 .../apache/sysml/runtime/io/ReaderTextCell.java |  4 +-
 .../runtime/io/ReaderTextCellParallel.java      |  2 +-
 8 files changed, 71 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/41ca1d16/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java b/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
index ccd0aee..d8a5c2d 100644
--- a/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
+++ b/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
@@ -26,10 +26,11 @@ import java.util.LinkedList;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-
 import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.SparseBlock;
+import org.apache.sysml.runtime.matrix.data.SparseBlockMCSR;
 import org.apache.sysml.runtime.util.MapReduceTool;
 
 /**
@@ -98,7 +99,7 @@ public abstract class MatrixReader
 	 * @throws DMLRuntimeException 
 	 * @throws IOException 
 	 */
-	protected static MatrixBlock createOutputMatrixBlock( long rlen, long clen, long estnnz, boolean mallocDense, boolean mallocSparse ) 
+	protected static MatrixBlock createOutputMatrixBlock( long rlen, long clen, int bclen, int brlen, long estnnz, boolean mallocDense, boolean mallocSparse ) 
 		throws IOException, DMLRuntimeException
 	{
 		//check input dimension
@@ -112,8 +113,16 @@ public abstract class MatrixReader
 		MatrixBlock ret = new MatrixBlock((int)rlen, (int)clen, sparse, estnnz);
 		if( !sparse && mallocDense )
 			ret.allocateDenseBlockUnsafe((int)rlen, (int)clen);
-		else if( sparse && mallocSparse  )
+		else if( sparse && mallocSparse  ) {
 			ret.allocateSparseRowsBlock();
+			SparseBlock sblock = ret.getSparseBlock();
+			//create synchronization points for MCSR (start row per block row)
+			if( sblock instanceof SparseBlockMCSR && clen > bclen      //multiple col blocks 
+				&& clen > 0 && bclen > 0 && rlen > 0 && brlen > 0 ) {  //all dims known
+				for( int i=0; i<rlen; i+=brlen )
+					ret.getSparseBlock().allocate(i, Math.min((int)(estnnz/rlen),1), (int)clen);
+			}
+		}
 		
 		return ret;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/41ca1d16/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
index 6493abb..03e42f4 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
@@ -53,7 +53,7 @@ public class ReaderBinaryBlock extends MatrixReader
 		throws IOException, DMLRuntimeException 
 	{
 		//allocate output matrix block
-		MatrixBlock ret = createOutputMatrixBlock(rlen, clen, estnnz, false, false);
+		MatrixBlock ret = createOutputMatrixBlock(rlen, clen, brlen, bclen, estnnz, false, false);
 		
 		//prepare file access
 		JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());	

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/41ca1d16/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
index 03f17ac..cdd7b33 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
@@ -31,12 +31,13 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.mapred.JobConf;
-
 import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysml.runtime.matrix.data.SparseBlock;
+import org.apache.sysml.runtime.matrix.data.SparseBlockMCSR;
 import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
 
 
@@ -55,7 +56,7 @@ public class ReaderBinaryBlockParallel extends ReaderBinaryBlock
 		throws IOException, DMLRuntimeException 
 	{	
 		//allocate output matrix block (incl block allocation for parallel)
-		MatrixBlock ret = createOutputMatrixBlock(rlen, clen, estnnz, true, true);
+		MatrixBlock ret = createOutputMatrixBlock(rlen, clen, brlen, bclen, estnnz, true, true);
 		
 		//prepare file access
 		JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());	
@@ -112,7 +113,6 @@ public class ReaderBinaryBlockParallel extends ReaderBinaryBlock
 
 			//wait until all tasks have been executed
 			List<Future<Object>> rt = pool.invokeAll(tasks);	
-			pool.shutdown();
 			
 			//check for exceptions and aggregate nnz
 			long lnnz = 0;
@@ -121,10 +121,16 @@ public class ReaderBinaryBlockParallel extends ReaderBinaryBlock
 			
 			//post-processing
 			dest.setNonZeros( lnnz );
-			if( dest.isInSparseFormat() && clen>bclen ){
-				//no need to sort if 1 column block since always sorted
-				dest.sortSparseRows();
-			}			
+			if( dest.isInSparseFormat() && clen>bclen ) {
+				//need to sort if multiple column block; otherwise always sorted
+				ArrayList<SortRowsTask> tasks2 = new ArrayList<SortRowsTask>();
+				int blklen = (int)(Math.ceil((double)rlen/_numThreads));
+				for( int i=0; i<_numThreads & i*blklen<rlen; i++ )
+					tasks2.add(new SortRowsTask(dest, i*blklen, Math.min((i+1)*blklen, (int)rlen)));
+				pool.invokeAll(tasks2);
+			}
+			
+			pool.shutdown();
 		} 
 		catch (Exception e) {
 			throw new IOException("Failed parallel read of binary block input.", e);
@@ -196,18 +202,29 @@ public class ReaderBinaryBlockParallel extends ReaderBinaryBlock
 					{
 						//note: append requires final sort
 						if (cols < _clen ) {
-							synchronized( _dest ){ //sparse requires lock, when matrix is wider than one block
-								_dest.appendToSparse(value, row_offset, col_offset);
+							//sparse requires lock, when matrix is wider than one block
+							//(fine-grained locking of block rows instead of the entire matrix)
+							//NOTE: fine-grained locking depends on MCSR SparseRow objects 
+							SparseBlock sblock = _dest.getSparseBlock();
+							if( sblock instanceof SparseBlockMCSR && sblock.get(row_offset) != null ) {
+								synchronized( sblock.get(row_offset) ){ 
+									_dest.appendToSparse(value, row_offset, col_offset);
+								}
+							}
+							else {
+								synchronized( _dest ){ 
+									_dest.appendToSparse(value, row_offset, col_offset);
+								}
 							}
 						}
-						else
+						else { //quickpath (no synchronization)
 							_dest.appendToSparse(value, row_offset, col_offset);
+						}
 					} 
 					else
 					{
 						_dest.copy( row_offset, row_offset+rows-1, 
-								   col_offset, col_offset+cols-1,
-								   value, false );
+								   col_offset, col_offset+cols-1, value, false );
 					}
 					
 					//aggregate nnz
@@ -222,4 +239,27 @@ public class ReaderBinaryBlockParallel extends ReaderBinaryBlock
 			return lnnz;
 		}
 	}
+	
+	/**
+	 * 
+	 */
+	private static class SortRowsTask implements Callable<Object> 
+	{
+		private MatrixBlock _dest = null;
+		private int _rl = -1;
+		private int _ru = -1;
+		
+		public SortRowsTask(MatrixBlock dest, int rl, int ru) {
+			_dest = dest;
+			_rl = rl;
+			_ru = ru;
+		}
+
+		@Override
+		public Object call() throws Exception {
+			for( int i=_rl; i<_ru; i++ )
+				_dest.getSparseBlock().sort(i);
+			return null;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/41ca1d16/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java
index 581b9ec..2ab4554 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java
@@ -44,7 +44,7 @@ public class ReaderBinaryCell extends MatrixReader
 		throws IOException, DMLRuntimeException 
 	{
 		//allocate output matrix block
-		MatrixBlock ret = createOutputMatrixBlock(rlen, clen, estnnz, true, false);
+		MatrixBlock ret = createOutputMatrixBlock(rlen, clen, (int)rlen, (int)clen, estnnz, true, false);
 		
 		//prepare file access
 		JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());	

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/41ca1d16/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java
index ebb24dd..3fefb56 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java
@@ -57,7 +57,7 @@ public class ReaderTextCSV extends MatrixReader
 		//allocate output matrix block
 		MatrixBlock ret = null;
 		if( rlen>0 && clen>0 ) //otherwise CSV reblock based on file size for matrix w/ unknown dimensions
-			ret = createOutputMatrixBlock(rlen, clen, estnnz, true, false);
+			ret = createOutputMatrixBlock(rlen, clen, (int)rlen, (int)clen, estnnz, true, false);
 		
 		//prepare file access
 		JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());	

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/41ca1d16/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
index 24a5db1..64c055c 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
@@ -248,7 +248,7 @@ public class ReaderTextCSVParallel extends MatrixReader
 
 		// allocate target matrix block based on given size; 
 		// need to allocate sparse as well since lock-free insert into target
-		return createOutputMatrixBlock(nrow, ncol, estnnz, true, true);
+		return createOutputMatrixBlock(nrow, ncol, nrow, ncol, estnnz, true, true);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/41ca1d16/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java
index 0dce052..15e7caf 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java
@@ -56,7 +56,7 @@ public class ReaderTextCell extends MatrixReader
 		throws IOException, DMLRuntimeException 
 	{
 		//allocate output matrix block
-		MatrixBlock ret = createOutputMatrixBlock(rlen, clen, estnnz, true, false);
+		MatrixBlock ret = createOutputMatrixBlock(rlen, clen, (int)rlen, (int)clen, estnnz, true, false);
 		
 		//prepare file access
 		JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());	
@@ -84,7 +84,7 @@ public class ReaderTextCell extends MatrixReader
 		throws IOException, DMLRuntimeException 
 	{
 		//allocate output matrix block
-		MatrixBlock ret = createOutputMatrixBlock(rlen, clen, estnnz, true, false);
+		MatrixBlock ret = createOutputMatrixBlock(rlen, clen, brlen, bclen, estnnz, true, false);
 	
 		//core read 
 		readRawTextCellMatrixFromInputStream(is, ret, rlen, clen, brlen, bclen, _isMMFile);

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/41ca1d16/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
index 309f5b7..d35ff7c 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
@@ -89,7 +89,7 @@ public class ReaderTextCellParallel extends MatrixReader
 		checkValidInputFile(fs, path);
 		
 		//allocate output matrix block
-		MatrixBlock ret = createOutputMatrixBlock(rlen, clen, estnnz, true, false);
+		MatrixBlock ret = createOutputMatrixBlock(rlen, clen, (int)rlen, (int)clen, estnnz, true, false);
 	
 		//core read 
 		readTextCellMatrixFromHDFS(path, job, ret, rlen, clen, brlen, bclen, _isMMFile);