You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/08/15 19:49:42 UTC

[1/3] systemml git commit: [SYSTEMML-1838] Performance sparse/ultra-sparse binary block read

Repository: systemml
Updated Branches:
  refs/heads/master 667aeb2b7 -> fcfbd3d24


[SYSTEMML-1838] Performance sparse/ultra-sparse binary block read

This patch makes a number of performance improvements for CP read
(single- and multi-threaded) of sparse and ultra-sparse matrices in
binary block format:

(1) Allocate the reuse block in CSR format. This avoids unnecessary
reallocations in the presence of a mix of sparse and ultra-sparse blocks,
because ultra-sparse blocks clear any reuse block that is not in CSR.

(2) Add a special CSR initialization for sparse blocks read from input
streams (in addition to the existing initialization for ultra-sparse blocks).

(3) Exploit the estimated number of non-zeros on sparse block append (also
used during read). This changes the growth factor from 1.1x to 2x until
the estimated number of non-zeros per row is reached, and hence avoids
unnecessary reallocations.

(4) Improve load balance by creating more tasks for the final sort of
sparse rows.

Together, these changes reduced the CP read time of a 1M x 1M matrix
with sparsity 0.001 (a mix of sparse and ultra-sparse blocks; roughly
12GB in size and 1M blocks) from 41s to 32.4s.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/9a275acb
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/9a275acb
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/9a275acb

Branch: refs/heads/master
Commit: 9a275acb0dd3f991fad7950d1824a89e109a9b43
Parents: 667aeb2
Author: Matthias Boehm <mb...@gmail.com>
Authored: Mon Aug 14 19:11:52 2017 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Tue Aug 15 12:48:25 2017 -0700

----------------------------------------------------------------------
 .../controlprogram/caching/CacheDataInput.java  | 10 +++--
 .../apache/sysml/runtime/io/MatrixReader.java   |  7 ++--
 .../sysml/runtime/io/ReaderBinaryBlock.java     | 32 ++++++++++-----
 .../runtime/io/ReaderBinaryBlockParallel.java   | 15 +++----
 .../sysml/runtime/matrix/data/MatrixBlock.java  | 26 ++++++-------
 .../matrix/data/MatrixBlockDataInput.java       |  5 ++-
 .../runtime/matrix/data/SparseBlockCSR.java     | 41 ++++++++++++++++----
 .../runtime/matrix/data/SparseRowVector.java    |  7 ++--
 .../util/FastBufferedDataInputStream.java       | 29 ++++++++------
 9 files changed, 110 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/9a275acb/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheDataInput.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheDataInput.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheDataInput.java
index 24df6d4..8f16ae1 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheDataInput.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheDataInput.java
@@ -163,11 +163,11 @@ public class CacheDataInput implements DataInput, MatrixBlockDataInput
 	}
 
 	@Override
-	public long readSparseRows(int rlen, SparseBlock rows) 
+	public long readSparseRows(int rlen, long nnz, SparseBlock rows) 
 		throws IOException 
 	{
 		//counter for non-zero elements
-		long nnz = 0;
+		long gnnz = 0;
 		
 		//read all individual sparse rows from input
 		for( int i=0; i<rlen; i++ )
@@ -189,10 +189,14 @@ public class CacheDataInput implements DataInput, MatrixBlockDataInput
 					_count+=12;
 				}
 				
-				nnz += lnnz;	
+				gnnz += lnnz;	
 			}
 		}
 		
+		//sanity check valid number of read nnz
+		if( gnnz != nnz )
+			throw new IOException("Invalid number of read nnz: "+gnnz+" vs "+nnz);
+		
 		return nnz;
 	}
 

http://git-wip-us.apache.org/repos/asf/systemml/blob/9a275acb/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java b/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
index befccfe..9c59d4e 100644
--- a/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
+++ b/src/main/java/org/apache/sysml/runtime/io/MatrixReader.java
@@ -118,10 +118,11 @@ public abstract class MatrixReader
 	protected static void sortSparseRowsParallel(MatrixBlock dest, long rlen, int k, ExecutorService pool) 
 		throws InterruptedException, ExecutionException
 	{
-		//create sort tasks
+		//create sort tasks (increase number of tasks for better load balance)
 		ArrayList<SortRowsTask> tasks = new ArrayList<SortRowsTask>();
-		int blklen = (int)(Math.ceil((double)rlen/k));
-		for( int i=0; i<k & i*blklen<rlen; i++ )
+		int k2 = (int) Math.min(8*k, rlen); 
+		int blklen = (int)(Math.ceil((double)rlen/k2));
+		for( int i=0; i<k2 & i*blklen<rlen; i++ )
 			tasks.add(new SortRowsTask(dest, i*blklen, Math.min((i+1)*blklen, (int)rlen)));
 		
 		//execute parallel sort and check for errors

http://git-wip-us.apache.org/repos/asf/systemml/blob/9a275acb/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
index 0bca17d..e0c217a 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlock.java
@@ -33,6 +33,7 @@ import org.apache.sysml.conf.ConfigurationManager;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysml.runtime.matrix.data.SparseBlock;
 import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue;
 import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
 
@@ -89,19 +90,30 @@ public class ReaderBinaryBlock extends MatrixReader
 		ArrayList<IndexedMatrixValue> ret = new ArrayList<IndexedMatrixValue>();
 		
 		//prepare file access
-		JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());	
-		Path path = new Path( (_localFS ? "file:///" : "") + fname); 
+		JobConf job = new JobConf(ConfigurationManager.getCachedJobConf());
+		Path path = new Path( (_localFS ? "file:///" : "") + fname);
 		FileSystem fs = IOUtilFunctions.getFileSystem(path, job);
 		
 		//check existence and non-empty file
-		checkValidInputFile(fs, path); 
+		checkValidInputFile(fs, path);
 	
 		//core read 
 		readBinaryBlockMatrixBlocksFromHDFS(path, job, fs, ret, rlen, clen, brlen, bclen);
 		
 		return ret;
 	}
-
+	
+	protected static MatrixBlock getReuseBlock(int brlen, int bclen, boolean sparse) {
+		//note: we allocate the reuse block in CSR because this avoids unnecessary
+		//reallocations in the presence of a mix of sparse and ultra-sparse blocks,
+		//where ultra-sparse deserialization only reuses CSR blocks
+		MatrixBlock value = new MatrixBlock(brlen, bclen, sparse);
+		if( sparse ) {
+			value.allocateAndResetSparseRowsBlock(true, SparseBlock.Type.CSR);
+			value.getSparseBlock().allocate(0, 1024);
+		}
+		return value;
+	}
 
 	
 	/**
@@ -125,13 +137,12 @@ public class ReaderBinaryBlock extends MatrixReader
 	 * @throws IOException if IOException occurs
 	 * @throws DMLRuntimeException if DMLRuntimeException occurs
 	 */
-	@SuppressWarnings("deprecation")
 	private static void readBinaryBlockMatrixFromHDFS( Path path, JobConf job, FileSystem fs, MatrixBlock dest, long rlen, long clen, int brlen, int bclen )
 		throws IOException, DMLRuntimeException
 	{
 		boolean sparse = dest.isInSparseFormat();
 		MatrixIndexes key = new MatrixIndexes(); 
-		MatrixBlock value = new MatrixBlock();
+		MatrixBlock value = getReuseBlock(brlen, bclen, sparse);
 		long lnnz = 0; //aggregate block nnz
 		
 		//set up preferred custom serialization framework for binary block format
@@ -141,7 +152,8 @@ public class ReaderBinaryBlock extends MatrixReader
 		for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, path) ) //1..N files 
 		{
 			//directly read from sequence files (individual partfiles)
-			SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job);
+			SequenceFile.Reader reader = new SequenceFile
+				.Reader(job, SequenceFile.Reader.file(lpath));
 			
 			try
 			{
@@ -195,8 +207,7 @@ public class ReaderBinaryBlock extends MatrixReader
 			dest.sortSparseRows();
 		}
 	}
-
-	@SuppressWarnings("deprecation")
+	
 	private void readBinaryBlockMatrixBlocksFromHDFS( Path path, JobConf job, FileSystem fs, Collection<IndexedMatrixValue> dest, long rlen, long clen, int brlen, int bclen )
 		throws IOException
 	{
@@ -210,7 +221,8 @@ public class ReaderBinaryBlock extends MatrixReader
 		for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, path) ) //1..N files 
 		{
 			//directly read from sequence files (individual partfiles)
-			SequenceFile.Reader reader = new SequenceFile.Reader(fs,lpath,job);
+			SequenceFile.Reader reader = new SequenceFile
+				.Reader(job, SequenceFile.Reader.file(lpath));
 			
 			try
 			{

http://git-wip-us.apache.org/repos/asf/systemml/blob/9a275acb/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
index e7114d8..16260a8 100644
--- a/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
+++ b/src/main/java/org/apache/sysml/runtime/io/ReaderBinaryBlockParallel.java
@@ -90,7 +90,7 @@ public class ReaderBinaryBlockParallel extends ReaderBinaryBlock
 			ExecutorService pool = Executors.newFixedThreadPool(_numThreads);
 			ArrayList<ReadFileTask> tasks = new ArrayList<ReadFileTask>();
 			for( Path lpath : IOUtilFunctions.getSequenceFilePaths(fs, path) ){
-				ReadFileTask t = new ReadFileTask(lpath, job, fs, dest, rlen, clen, brlen, bclen);
+				ReadFileTask t = new ReadFileTask(lpath, job, dest, rlen, clen, brlen, bclen);
 				tasks.add(t);
 			}
 
@@ -118,17 +118,15 @@ public class ReaderBinaryBlockParallel extends ReaderBinaryBlock
 	{
 		private Path _path = null;
 		private JobConf _job = null;
-		private FileSystem _fs = null;
 		private MatrixBlock _dest = null;
 		private long _rlen = -1;
 		private long _clen = -1;
 		private int _brlen = -1;
 		private int _bclen = -1;
 		
-		public ReadFileTask(Path path, JobConf job, FileSystem fs, MatrixBlock dest, long rlen, long clen, int brlen, int bclen)
+		public ReadFileTask(Path path, JobConf job, MatrixBlock dest, long rlen, long clen, int brlen, int bclen)
 		{
 			_path = path;
-			_fs = fs;
 			_job = job;
 			_dest = dest;
 			_rlen = rlen;
@@ -138,16 +136,16 @@ public class ReaderBinaryBlockParallel extends ReaderBinaryBlock
 		}
 
 		@Override
-		@SuppressWarnings({ "deprecation" })
 		public Object call() throws Exception 
 		{
 			boolean sparse = _dest.isInSparseFormat();
 			MatrixIndexes key = new MatrixIndexes(); 
-			MatrixBlock value = new MatrixBlock();
+			MatrixBlock value = getReuseBlock(_brlen, _bclen, sparse);
 			long lnnz = 0; //aggregate block nnz
 			
 			//directly read from sequence files (individual partfiles)
-			SequenceFile.Reader reader = new SequenceFile.Reader(_fs,_path,_job);
+			SequenceFile.Reader reader = new SequenceFile
+				.Reader(_job, SequenceFile.Reader.file(_path));
 			
 			try
 			{
@@ -205,8 +203,7 @@ public class ReaderBinaryBlockParallel extends ReaderBinaryBlock
 					lnnz += value.getNonZeros();
 				}
 			}
-			finally
-			{
+			finally {
 				IOUtilFunctions.closeSilently(reader);
 			}
 			

http://git-wip-us.apache.org/repos/asf/systemml/blob/9a275acb/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index e9039ba..145eb97 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -736,7 +736,10 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 					int len = b.size(i);
 					int[] ix = b.indexes(i);
 					double[] val = b.values(i);
-					sparseBlock.allocate(aix, sparseBlock.size(aix)+len);
+					if( estimatedNNzsPerRow > 0 )
+						sparseBlock.allocate(aix, Math.max(estimatedNNzsPerRow, sparseBlock.size(aix)+len), clen);
+					else
+						sparseBlock.allocate(aix, sparseBlock.size(aix)+len);
 					for( int j=pos; j<pos+len; j++ )
 						sparseBlock.append(aix, coloffset+ix[j], val[j]);	
 				}
@@ -1894,39 +1897,36 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 			}
 		}
 	}
-
+	
 	private void readSparseBlock(DataInput in) 
 		throws IOException 
 	{			
-		allocateSparseRowsBlock(false); 
-		resetSparse(); //reset all sparse rows
+		allocateSparseRowsBlock(false);
+		resetSparse();
 		
-		if( in instanceof MatrixBlockDataInput ) //fast deserialize
-		{
+		if( in instanceof MatrixBlockDataInput ) { //fast deserialize
 			MatrixBlockDataInput mbin = (MatrixBlockDataInput)in;
-			nonZeros = mbin.readSparseRows(rlen, sparseBlock);
+			nonZeros = mbin.readSparseRows(rlen, nonZeros, sparseBlock);
 		}
-		else if( in instanceof DataInputBuffer  && MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION ) 
-		{
+		else if( in instanceof DataInputBuffer && MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION ) {
 			//workaround because sequencefile.reader.next(key, value) does not yet support serialization framework
 			DataInputBuffer din = (DataInputBuffer)in;
 			FastBufferedDataInputStream mbin = null;
 			try {
 				mbin = new FastBufferedDataInputStream(din);
-				nonZeros = mbin.readSparseRows(rlen, sparseBlock);	
+				nonZeros = mbin.readSparseRows(rlen, nonZeros, sparseBlock);
 			}
 			finally {
 				IOUtilFunctions.closeSilently(mbin);
 			}
 		}
-		else //default deserialize
-		{
+		else { //default deserialize
 			for(int r=0; r<rlen; r++) {
 				int rnnz = in.readInt(); //row nnz
 				if( rnnz > 0 ) {
 					sparseBlock.reset(r, rnnz, clen);
 					for(int j=0; j<rnnz; j++) //col index/value pairs
-						sparseBlock.append(r, in.readInt(), in.readDouble());		
+						sparseBlock.append(r, in.readInt(), in.readDouble());
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/systemml/blob/9a275acb/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlockDataInput.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlockDataInput.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlockDataInput.java
index c673623..6fc36fb 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlockDataInput.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlockDataInput.java
@@ -50,11 +50,12 @@ public interface MatrixBlockDataInput
 	 * Reads the sparse rows array from the data input into a sparse block
 	 * and returns the number of non-zeros.
 	 * 
-	 * @param rlen ?
+	 * @param rlen number of rows
+	 * @param nnz number of non-zeros
 	 * @param rows sparse block
 	 * @return number of non-zeros
 	 * @throws IOException if IOExcepton occurs
 	 */
-	public long readSparseRows(int rlen, SparseBlock rows) 
+	public long readSparseRows(int rlen, long nnz, SparseBlock rows) 
 		throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/9a275acb/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java
index a4e680d..bc817c7 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java
@@ -176,13 +176,9 @@ public class SparseBlockCSR extends SparseBlock
 	public void initUltraSparse(int nnz, DataInput in) 
 		throws IOException 
 	{
-		//ensure empty block before init
-		if( _size > 0 )
-			reset();
-			
 		//allocate space if necessary
 		if( _values.length < nnz )
-			resize(nnz);
+			resize(newCapacity(nnz));
 		
 		//read ijv triples, append and update pointers
 		int rlast = 0;
@@ -201,6 +197,37 @@ public class SparseBlockCSR extends SparseBlock
 	}
 	
 	/**
+	 * Initializes the CSR sparse block from an ordered input
+	 * stream of sparse rows (rownnz, jv-pairs*). 
+	 * 
+	 * @param rlen number of rows
+	 * @param nnz number of non-zeros to read
+	 * @param in data input stream of sparse rows, ordered by i
+	 * @throws IOException if deserialization error occurs
+	 */
+	public void initSparse(int rlen, int nnz, DataInput in) 
+		throws IOException
+	{
+		//allocate space if necessary
+		if( _values.length < nnz )
+			resize(newCapacity(nnz));
+		
+		//read sparse rows, append and update pointers
+		_ptr[0] = 0;
+		for( int r=0, pos=0; r<rlen; r++ ) {
+			int lnnz = in.readInt();
+			for( int j=0; j<lnnz; j++, pos++ ) {
+				_indexes[pos] = in.readInt();
+				_values[pos] = in.readDouble();
+			}
+			_ptr[r+1] = pos;
+		}
+		
+		//update meta data
+		_size = nnz;
+	}
+	
+	/**
 	 * Get the estimated in-memory size of the sparse block in CSR 
 	 * with the given dimensions w/o accounting for overallocation. 
 	 * 
@@ -402,14 +429,14 @@ public class SparseBlockCSR extends SparseBlock
 			if( _size==_values.length )
 				resize();
 			insert(_size, c, v);		
-		}		
+		}
 		else {
 			//resize, shift and insert
 			if( _size==_values.length )
 				resizeAndInsert(pos+len, c, v);
 			else
 				shiftRightAndInsert(pos+len, c, v);
-		}			
+		}
 		incrPtr(r+1);
 	}
 

http://git-wip-us.apache.org/repos/asf/systemml/blob/9a275acb/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java
index 1c160b2..4927906 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java
@@ -147,10 +147,9 @@ public final class SparseRowVector extends SparseRow implements Serializable
 	 * @return new capacity for resizing
 	 */
 	private int newCapacity() {
-		if( values.length < estimatedNzs )
-			return Math.min(estimatedNzs, values.length*2);
-		else
-			return (int) Math.min(maxNzs, Math.ceil((double)(values.length)*1.1));
+		return (int) ((values.length < estimatedNzs) ?
+			Math.min(estimatedNzs, values.length*SparseBlock.RESIZE_FACTOR1) :
+			Math.min(maxNzs, Math.ceil(values.length*SparseBlock.RESIZE_FACTOR2)));
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/systemml/blob/9a275acb/src/main/java/org/apache/sysml/runtime/util/FastBufferedDataInputStream.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/util/FastBufferedDataInputStream.java b/src/main/java/org/apache/sysml/runtime/util/FastBufferedDataInputStream.java
index 932ceb1..197467d 100644
--- a/src/main/java/org/apache/sysml/runtime/util/FastBufferedDataInputStream.java
+++ b/src/main/java/org/apache/sysml/runtime/util/FastBufferedDataInputStream.java
@@ -27,6 +27,7 @@ import java.io.InputStream;
 
 import org.apache.sysml.runtime.matrix.data.MatrixBlockDataInput;
 import org.apache.sysml.runtime.matrix.data.SparseBlock;
+import org.apache.sysml.runtime.matrix.data.SparseBlockCSR;
 
 public class FastBufferedDataInputStream extends FilterInputStream implements DataInput, MatrixBlockDataInput
 {
@@ -34,23 +35,19 @@ public class FastBufferedDataInputStream extends FilterInputStream implements Da
 	protected byte[] _buff;
 	protected int _bufflen;
 	
-	public FastBufferedDataInputStream( InputStream in )
-	{
+	public FastBufferedDataInputStream( InputStream in ) {
 		this(in, 8192);
 	}
 	
-	public FastBufferedDataInputStream( InputStream in, int size )
-	{
+	public FastBufferedDataInputStream( InputStream in, int size ) {
 		super(in);
 		
 		if (size <= 0) 
 	    	throw new IllegalArgumentException("Buffer size <= 0");
-		
 		_buff = new byte[ size ];
 		_bufflen = size;
 	}
 
-
 	/////////////////////////////
 	// DataInput Implementation
 	/////////////////////////////
@@ -169,8 +166,8 @@ public class FastBufferedDataInputStream extends FilterInputStream implements Da
 	public long readDoubleArray(int len, double[] varr) 
 		throws IOException 
 	{
-		//if( len<=0 || len != varr.length )
-		//	throw new IndexOutOfBoundsException("len="+len+", varr.length="+varr.length);
+		if( len<=0 || len > varr.length )
+			throw new IndexOutOfBoundsException("len="+len+", varr.length="+varr.length);
 		
 		//counter for non-zero elements
 		long nnz = 0;
@@ -198,11 +195,17 @@ public class FastBufferedDataInputStream extends FilterInputStream implements Da
 	}
 
 	@Override
-	public long readSparseRows(int rlen, SparseBlock rows) 
+	public long readSparseRows(int rlen, long nnz, SparseBlock rows) 
 		throws IOException 
 	{
+		//check for CSR quick-path
+		if( rows instanceof SparseBlockCSR ) {
+			((SparseBlockCSR) rows).initSparse(rlen, (int)nnz, this);
+			return nnz;
+		}
+		
 		//counter for non-zero elements
-		long nnz = 0;
+		long gnnz = 0;
 		
 		//read all individual sparse rows from input
 		for( int i=0; i<rlen; i++ )
@@ -241,10 +244,14 @@ public class FastBufferedDataInputStream extends FilterInputStream implements Da
 					}
 				}
 				
-				nnz += lnnz;	
+				gnnz += lnnz;	
 			}
 		}
 		
+		//sanity check valid number of read nnz
+		if( gnnz != nnz )
+			throw new IOException("Invalid number of read nnz: "+gnnz+" vs "+nnz);
+		
 		return nnz;
 	}
 


[2/3] systemml git commit: [SYSTEMML-1839] Fix NPE on parfor initialization w/o log4j config

Posted by mb...@apache.org.
[SYSTEMML-1839] Fix NPE on parfor initialization w/o log4j config

This patch fixes a null pointer exception during parfor static
initialization, which expected the root logger level to be set. We now
properly probe the log level and fall back to a default of INFO if none
was set up.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/89632b5e
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/89632b5e
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/89632b5e

Branch: refs/heads/master
Commit: 89632b5ea8063d8959dcab3ec3a774f8883f2b62
Parents: 9a275ac
Author: Matthias Boehm <mb...@gmail.com>
Authored: Mon Aug 14 20:15:32 2017 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Tue Aug 15 12:48:27 2017 -0700

----------------------------------------------------------------------
 .../org/apache/sysml/hops/OptimizerUtils.java   |  7 +++++++
 .../sysml/parser/ParForStatementBlock.java      | 20 ++++++++++----------
 2 files changed, 17 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/89632b5e/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
index a0a36d5..3b98a34 100644
--- a/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysml/hops/OptimizerUtils.java
@@ -24,6 +24,8 @@ import java.util.HashMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
 import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
 import org.apache.sysml.conf.CompilerConfig;
@@ -933,6 +935,11 @@ public class OptimizerUtils
 		return ret;
 	}
 	
+	public static Level getDefaultLogLevel() {
+		Level log = Logger.getRootLogger().getLevel();
+		return (log != null) ? log : Level.INFO;
+	}
+	
 	////////////////////////
 	// Sparsity Estimates //
 	////////////////////////

http://git-wip-us.apache.org/repos/asf/systemml/blob/89632b5e/src/main/java/org/apache/sysml/parser/ParForStatementBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/parser/ParForStatementBlock.java b/src/main/java/org/apache/sysml/parser/ParForStatementBlock.java
index 4dc06bd..8efbc9f 100644
--- a/src/main/java/org/apache/sysml/parser/ParForStatementBlock.java
+++ b/src/main/java/org/apache/sysml/parser/ParForStatementBlock.java
@@ -128,18 +128,18 @@ public class ParForStatementBlock extends ForStatementBlock
 		_paramDefaults.put( EXEC_MODE,         String.valueOf(PExecMode.LOCAL) );
 		_paramDefaults.put( OPT_MODE,          String.valueOf(POptMode.RULEBASED) );
 		_paramDefaults.put( PROFILE,           "0" );
-		_paramDefaults.put( OPT_LOG,           Logger.getRootLogger().getLevel().toString() );
+		_paramDefaults.put( OPT_LOG,           OptimizerUtils.getDefaultLogLevel().toString() );
 		
 		_paramDefaults2 = new HashMap<String, String>(); //OPT_MODE always specified
-		_paramDefaults2.put( CHECK,             "1" );
-		_paramDefaults2.put( PAR,               "-1" );
-		_paramDefaults2.put( TASK_PARTITIONER,  String.valueOf(PTaskPartitioner.UNSPECIFIED) );
-		_paramDefaults2.put( TASK_SIZE,         "-1" );
-		_paramDefaults2.put( DATA_PARTITIONER,  String.valueOf(PDataPartitioner.UNSPECIFIED) );
-		_paramDefaults2.put( RESULT_MERGE,      String.valueOf(PResultMerge.UNSPECIFIED) );
-		_paramDefaults2.put( EXEC_MODE,         String.valueOf(PExecMode.UNSPECIFIED) );
-		_paramDefaults2.put( PROFILE,           "0" );
-		_paramDefaults2.put( OPT_LOG,           Logger.getRootLogger().getLevel().toString() );
+		_paramDefaults2.put( CHECK,            "1" );
+		_paramDefaults2.put( PAR,              "-1" );
+		_paramDefaults2.put( TASK_PARTITIONER, String.valueOf(PTaskPartitioner.UNSPECIFIED) );
+		_paramDefaults2.put( TASK_SIZE,        "-1" );
+		_paramDefaults2.put( DATA_PARTITIONER, String.valueOf(PDataPartitioner.UNSPECIFIED) );
+		_paramDefaults2.put( RESULT_MERGE,     String.valueOf(PResultMerge.UNSPECIFIED) );
+		_paramDefaults2.put( EXEC_MODE,        String.valueOf(PExecMode.UNSPECIFIED) );
+		_paramDefaults2.put( PROFILE,          "0" );
+		_paramDefaults2.put( OPT_LOG,          OptimizerUtils.getDefaultLogLevel().toString() );
 		
 		_idSeq = new IDSequence();
 		_idSeqfn = new IDSequence();


[3/3] systemml git commit: [SYSTEMML-1840] Fix missing validation of transform* specifications

Posted by mb...@apache.org.
[SYSTEMML-1840] Fix missing validation of transform* specifications

Previously, all transform builtin functions parsed the provided transform
specification only at runtime, not during initial compilation. This
patch adds the missing validation for specifications given as string
literals, giving users immediate feedback on potential syntax errors
even before potentially large data is read.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/fcfbd3d2
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/fcfbd3d2
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/fcfbd3d2

Branch: refs/heads/master
Commit: fcfbd3d2473a555b139371ebe4d49714d5dabe48
Parents: 89632b5
Author: Matthias Boehm <mb...@gmail.com>
Authored: Mon Aug 14 23:10:38 2017 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Tue Aug 15 12:48:29 2017 -0700

----------------------------------------------------------------------
 .../ParameterizedBuiltinFunctionExpression.java | 26 +++++++++++++++-----
 1 file changed, 20 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/fcfbd3d2/src/main/java/org/apache/sysml/parser/ParameterizedBuiltinFunctionExpression.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/parser/ParameterizedBuiltinFunctionExpression.java b/src/main/java/org/apache/sysml/parser/ParameterizedBuiltinFunctionExpression.java
index f90b7ef..2768cc1 100644
--- a/src/main/java/org/apache/sysml/parser/ParameterizedBuiltinFunctionExpression.java
+++ b/src/main/java/org/apache/sysml/parser/ParameterizedBuiltinFunctionExpression.java
@@ -26,6 +26,7 @@ import java.util.HashSet;
 
 import org.apache.sysml.hops.Hop.ParamBuiltinOp;
 import org.apache.sysml.parser.LanguageException.LanguageErrorCodes;
+import org.apache.wink.json4j.JSONObject;
 
 
 public class ParameterizedBuiltinFunctionExpression extends DataIdentifier 
@@ -288,6 +289,7 @@ public class ParameterizedBuiltinFunctionExpression extends DataIdentifier
 		
 		//validate specification
 		checkDataValueType("transformapply", TF_FN_PARAM_SPEC, DataType.SCALAR, ValueType.STRING, conditional);
+		validateTransformSpec(TF_FN_PARAM_SPEC, conditional);
 		
 		//set output dimensions
 		output.setDataType(DataType.MATRIX);
@@ -304,6 +306,7 @@ public class ParameterizedBuiltinFunctionExpression extends DataIdentifier
 		
 		//validate specification
 		checkDataValueType("transformdecode", TF_FN_PARAM_SPEC, DataType.SCALAR, ValueType.STRING, conditional);
+		validateTransformSpec(TF_FN_PARAM_SPEC, conditional);
 		
 		//set output dimensions
 		output.setDataType(DataType.FRAME);
@@ -316,6 +319,7 @@ public class ParameterizedBuiltinFunctionExpression extends DataIdentifier
 	{
 		//validate specification
 		checkDataValueType("transformmeta", TF_FN_PARAM_SPEC, DataType.SCALAR, ValueType.STRING, conditional);
+		validateTransformSpec(TF_FN_PARAM_SPEC, conditional);
 		
 		//validate meta data path 
 		checkDataValueType("transformmeta", TF_FN_PARAM_MTD, DataType.SCALAR, ValueType.STRING, conditional);
@@ -334,6 +338,7 @@ public class ParameterizedBuiltinFunctionExpression extends DataIdentifier
 		
 		//validate specification
 		checkDataValueType("transformencode", TF_FN_PARAM_SPEC, DataType.SCALAR, ValueType.STRING, conditional);
+		validateTransformSpec(TF_FN_PARAM_SPEC, conditional);
 		
 		//set output dimensions 
 		output1.setDataType(DataType.MATRIX);
@@ -344,6 +349,20 @@ public class ParameterizedBuiltinFunctionExpression extends DataIdentifier
 		output2.setDimensions(-1, -1);
 	}
 	
+	private void validateTransformSpec(String pname, boolean conditional) throws LanguageException {
+		Expression data = getVarParam(pname);
+		if( data instanceof StringIdentifier ) {
+			try {
+				StringIdentifier spec = (StringIdentifier)data;
+				new JSONObject(spec.getValue());
+			}
+			catch(Exception ex) {
+				raiseValidateError("Transform specification parsing issue: ", 
+					conditional, ex.getMessage());
+			}
+		}
+	}
+	
 	private void validateReplace(DataIdentifier output, boolean conditional) throws LanguageException {
 		//check existence and correctness of arguments
 		Expression target = getVarParam("target");
@@ -721,11 +740,6 @@ public class ParameterizedBuiltinFunctionExpression extends DataIdentifier
 
 	@Override
 	public boolean multipleReturns() {
-		switch(_opcode) {
-			case TRANSFORMENCODE:
-				return true;
-			default:
-				return false;
-		}
+		return (_opcode == ParameterizedBuiltinFunctionOp.TRANSFORMENCODE);
 	}
 }