You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/03/07 21:24:11 UTC
[7/7] incubator-systemml git commit: [SYSTEMML-552] Performance
parallel reader sparse binary block matrices
[SYSTEMML-552] Performance parallel reader sparse binary block matrices
Reading a sparse binary block matrix into CP is realized via append to
avoid repeated reshifting for maintaining the sorted order. In case of
matrices with multiple column blocks, a parallel read requires locking
during append and final sorting of sparse rows. This patch improves
performance by (1) sorting sparse rows in parallel, and (2) fine-grained
locking (synchronization) per row block instead of locking the entire
matrix. In a scenario with a 100k x 100k matrix of sparsity 0.01 (with
k=16 vcores), this reduced the read time from 13.1s to 8.9s (w/ parallel
sorting only) and to 4.1s (w/ both features).
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/41ca1d16
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/41ca1d16
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/41ca1d16
Branch: refs/heads/master
Commit: 41ca1d163c335156afaf491df21435953d04c1d4
Parents: 0089c3d
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Mon Mar 7 12:22:23 2016 -0800
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Mon Mar 7 12:23:29 2016 -0800
----------------------------------------------------------------------
.../apache/sysml/runtime/io/MatrixReader.java | 15 ++++-
.../sysml/runtime/io/ReaderBinaryBlock.java | 2 +-
.../runtime/io/ReaderBinaryBlockParallel.java | 64 ++++++++++++++++----
.../sysml/runtime/io/ReaderBinaryCell.java | 2 +-
.../apache/sysml/runtime/io/ReaderTextCSV.java | 2 +-
.../sysml/runtime/io/ReaderTextCSVParallel.java | 2 +-
.../apache/sysml/runtime/io/ReaderTextCell.java | 4 +-
.../runtime/io/ReaderTextCellParallel.java | 2 +-
8 files changed, 71 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/41ca1d16/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java b/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
index ccd0aee..d8a5c2d 100644
--- a/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
+++ b/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
@@ -26,10 +26,11 @@ import java.util.LinkedList;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-
import org.apache.sysml.hops.OptimizerUtils;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.SparseBlock;
+import org.apache.sysml.runtime.matrix.data.SparseBlockMCSR;
import org.apache.sysml.runtime.util.MapReduceTool;
/**
@@ -98,7 +99,7 @@ public abstract class MatrixReader
* @throws DMLRuntimeException
* @throws IOException
*/
- protected static MatrixBlock createOutputMatrixBlock( long rlen, long clen, long estnnz, boolean mallocDense, boolean mallocSparse )
+ protected static MatrixBlock createOutputMatrixBlock( long rlen, long clen, int bclen, int brlen, long estnnz, boolean mallocDense, boolean mallocSparse )
throws IOException, DMLRuntimeException
{
//check input dimension
@@ -112,8 +113,16 @@ public abstract class MatrixReader
MatrixBlock ret = new MatrixBlock((int)rlen, (int)clen, sparse, estnnz);
if( !sparse && mallocDense )
ret.allocateDenseBlockUnsafe((int)rlen, (int)clen);
- else if( sparse && mallocSparse )
+ else if( sparse && mallocSparse ) {
ret.allocateSparseRowsBlock();
+ SparseBlock sblock = ret.getSparseBlock();
+ //create synchronization points for MCSR (start row per block row)
+ if( sblock instanceof SparseBlockMCSR && clen > bclen //multiple col blocks
+ && clen > 0 && bclen > 0 && rlen > 0 && brlen > 0 ) { //all dims known
+ for( int i=0; i<rlen; i+=brlen )
+ ret.getSparseBlock().allocate(i, Math.min((int)(estnnz/rlen),1), (int)clen);
+ }
+ }
return ret;
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/41ca1d16/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
index 6493abb..03e42f4 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
@@ -53,7 +53,7 @@ public class ReaderBinaryBlock extends MatrixReader
throws IOException, DMLRuntimeException
{
//allocate output matrix block
- MatrixBlock ret = createOutputMatrixBlock(rlen, clen, estnnz, false, false);
+ MatrixBlock ret = createOutputMatrixBlock(rlen, clen, brlen, bclen, estnnz, false, false);
//prepare file access
JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/41ca1d16/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
index 03f17ac..cdd7b33 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
@@ -31,12 +31,13 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapred.JobConf;
-
import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.hops.OptimizerUtils;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysml.runtime.matrix.data.SparseBlock;
+import org.apache.sysml.runtime.matrix.data.SparseBlockMCSR;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
@@ -55,7 +56,7 @@ public class ReaderBinaryBlockParallel extends ReaderBinaryBlock
throws IOException, DMLRuntimeException
{
//allocate output matrix block (incl block allocation for parallel)
- MatrixBlock ret = createOutputMatrixBlock(rlen, clen, estnnz, true, true);
+ MatrixBlock ret = createOutputMatrixBlock(rlen, clen, brlen, bclen, estnnz, true, true);
//prepare file access
JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
@@ -112,7 +113,6 @@ public class ReaderBinaryBlockParallel extends ReaderBinaryBlock
//wait until all tasks have been executed
List<Future<Object>> rt = pool.invokeAll(tasks);
- pool.shutdown();
//check for exceptions and aggregate nnz
long lnnz = 0;
@@ -121,10 +121,16 @@ public class ReaderBinaryBlockParallel extends ReaderBinaryBlock
//post-processing
dest.setNonZeros( lnnz );
- if( dest.isInSparseFormat() && clen>bclen ){
- //no need to sort if 1 column block since always sorted
- dest.sortSparseRows();
- }
+ if( dest.isInSparseFormat() && clen>bclen ) {
+ //need to sort if multiple column block; otherwise always sorted
+ ArrayList<SortRowsTask> tasks2 = new ArrayList<SortRowsTask>();
+ int blklen = (int)(Math.ceil((double)rlen/_numThreads));
+ for( int i=0; i<_numThreads & i*blklen<rlen; i++ )
+ tasks2.add(new SortRowsTask(dest, i*blklen, Math.min((i+1)*blklen, (int)rlen)));
+ pool.invokeAll(tasks2);
+ }
+
+ pool.shutdown();
}
catch (Exception e) {
throw new IOException("Failed parallel read of binary block input.", e);
@@ -196,18 +202,29 @@ public class ReaderBinaryBlockParallel extends ReaderBinaryBlock
{
//note: append requires final sort
if (cols < _clen ) {
- synchronized( _dest ){ //sparse requires lock, when matrix is wider than one block
- _dest.appendToSparse(value, row_offset, col_offset);
+ //sparse requires lock, when matrix is wider than one block
+ //(fine-grained locking of block rows instead of the entire matrix)
+ //NOTE: fine-grained locking depends on MCSR SparseRow objects
+ SparseBlock sblock = _dest.getSparseBlock();
+ if( sblock instanceof SparseBlockMCSR && sblock.get(row_offset) != null ) {
+ synchronized( sblock.get(row_offset) ){
+ _dest.appendToSparse(value, row_offset, col_offset);
+ }
+ }
+ else {
+ synchronized( _dest ){
+ _dest.appendToSparse(value, row_offset, col_offset);
+ }
}
}
- else
+ else { //quickpath (no synchronization)
_dest.appendToSparse(value, row_offset, col_offset);
+ }
}
else
{
_dest.copy( row_offset, row_offset+rows-1,
- col_offset, col_offset+cols-1,
- value, false );
+ col_offset, col_offset+cols-1, value, false );
}
//aggregate nnz
@@ -222,4 +239,27 @@ public class ReaderBinaryBlockParallel extends ReaderBinaryBlock
return lnnz;
}
}
+
+ /**
+ *
+ */
+ private static class SortRowsTask implements Callable<Object>
+ {
+ private MatrixBlock _dest = null;
+ private int _rl = -1;
+ private int _ru = -1;
+
+ public SortRowsTask(MatrixBlock dest, int rl, int ru) {
+ _dest = dest;
+ _rl = rl;
+ _ru = ru;
+ }
+
+ @Override
+ public Object call() throws Exception {
+ for( int i=_rl; i<_ru; i++ )
+ _dest.getSparseBlock().sort(i);
+ return null;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/41ca1d16/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java
index 581b9ec..2ab4554 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryCell.java
@@ -44,7 +44,7 @@ public class ReaderBinaryCell extends MatrixReader
throws IOException, DMLRuntimeException
{
//allocate output matrix block
- MatrixBlock ret = createOutputMatrixBlock(rlen, clen, estnnz, true, false);
+ MatrixBlock ret = createOutputMatrixBlock(rlen, clen, (int)rlen, (int)clen, estnnz, true, false);
//prepare file access
JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/41ca1d16/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java
index ebb24dd..3fefb56 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSV.java
@@ -57,7 +57,7 @@ public class ReaderTextCSV extends MatrixReader
//allocate output matrix block
MatrixBlock ret = null;
if( rlen>0 && clen>0 ) //otherwise CSV reblock based on file size for matrix w/ unknown dimensions
- ret = createOutputMatrixBlock(rlen, clen, estnnz, true, false);
+ ret = createOutputMatrixBlock(rlen, clen, (int)rlen, (int)clen, estnnz, true, false);
//prepare file access
JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/41ca1d16/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
index 24a5db1..64c055c 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCSVParallel.java
@@ -248,7 +248,7 @@ public class ReaderTextCSVParallel extends MatrixReader
// allocate target matrix block based on given size;
// need to allocate sparse as well since lock-free insert into target
- return createOutputMatrixBlock(nrow, ncol, estnnz, true, true);
+ return createOutputMatrixBlock(nrow, ncol, nrow, ncol, estnnz, true, true);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/41ca1d16/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java
index 0dce052..15e7caf 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCell.java
@@ -56,7 +56,7 @@ public class ReaderTextCell extends MatrixReader
throws IOException, DMLRuntimeException
{
//allocate output matrix block
- MatrixBlock ret = createOutputMatrixBlock(rlen, clen, estnnz, true, false);
+ MatrixBlock ret = createOutputMatrixBlock(rlen, clen, (int)rlen, (int)clen, estnnz, true, false);
//prepare file access
JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
@@ -84,7 +84,7 @@ public class ReaderTextCell extends MatrixReader
throws IOException, DMLRuntimeException
{
//allocate output matrix block
- MatrixBlock ret = createOutputMatrixBlock(rlen, clen, estnnz, true, false);
+ MatrixBlock ret = createOutputMatrixBlock(rlen, clen, brlen, bclen, estnnz, true, false);
//core read
readRawTextCellMatrixFromInputStream(is, ret, rlen, clen, brlen, bclen, _isMMFile);
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/41ca1d16/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
index 309f5b7..d35ff7c 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderTextCellParallel.java
@@ -89,7 +89,7 @@ public class ReaderTextCellParallel extends MatrixReader
checkValidInputFile(fs, path);
//allocate output matrix block
- MatrixBlock ret = createOutputMatrixBlock(rlen, clen, estnnz, true, false);
+ MatrixBlock ret = createOutputMatrixBlock(rlen, clen, (int)rlen, (int)clen, estnnz, true, false);
//core read
readTextCellMatrixFromHDFS(path, job, ret, rlen, clen, brlen, bclen, _isMMFile);