You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2018/04/27 07:02:36 UTC

[2/6] systemml git commit: [SYSTEMML-2280] Performance ultra-sparse/empty block serialization

[SYSTEMML-2280] Performance ultra-sparse/empty block serialization

This patch improves the performance and memory efficiency of ultra-sparse
and empty block serialization, especially for distributed shuffle
operations. In detail, we now refrain from creating custom serialization
buffers (8KB memory buffers) for these tiny blocks, which helps reduce
GC overhead. Furthermore, we added a new serialization code path for
sparse blocks in COO format, which changes the serialization complexity
from O(rlen+nnz) to O(nnz).


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/0bd08f29
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/0bd08f29
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/0bd08f29

Branch: refs/heads/master
Commit: 0bd08f29da6973d48d9d31fa6cf9eb6309b8af73
Parents: 18cc576
Author: Matthias Boehm <mb...@gmail.com>
Authored: Thu Apr 26 20:45:28 2018 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Fri Apr 27 00:03:14 2018 -0700

----------------------------------------------------------------------
 .../spark/data/CorrMatrixBlock.java             |  8 ++--
 .../sysml/runtime/matrix/data/MatrixBlock.java  | 41 ++++++++++++++------
 2 files changed, 35 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/0bd08f29/src/main/java/org/apache/sysml/runtime/instructions/spark/data/CorrMatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/CorrMatrixBlock.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/CorrMatrixBlock.java
index 60ad927..574839b 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/CorrMatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/CorrMatrixBlock.java
@@ -101,7 +101,7 @@ public class CorrMatrixBlock implements Externalizable
 	public void writeExternal(ObjectOutput os) 
 		throws IOException
 	{
-		if( os instanceof ObjectOutputStream ) {
+		if( os instanceof ObjectOutputStream && !_value.isEmptyBlock(false) ) {
 			//fast serialize of dense/sparse blocks
 			ObjectOutputStream oos = (ObjectOutputStream)os;
 			FastBufferedDataOutputStream fos = new FastBufferedDataOutputStream(oos);
@@ -117,9 +117,11 @@ public class CorrMatrixBlock implements Externalizable
 	private void writeHeaderAndPayload(DataOutput dos) 
 		throws IOException 
 	{
-		dos.writeByte((_corr!=null)?1:0);
+		boolean writeCorr = (_corr != null
+			&& !_corr.isEmptyBlock(false));
+		dos.writeByte(writeCorr ? 1 : 0);
 		_value.write(dos);
-		if( _corr!=null )
+		if( writeCorr )
 			_corr.write(dos);
 	}
 

http://git-wip-us.apache.org/repos/asf/systemml/blob/0bd08f29/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index 9e032b6..51084ef 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -373,7 +373,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 		return reset;
 	}
 	
-	public void allocateAndResetSparseRowsBlock(boolean clearNNZ, SparseBlock.Type stype)
+	public void allocateAndResetSparseBlock(boolean clearNNZ, SparseBlock.Type stype)
 	{
 		//allocate block if non-existing or too small (guaranteed to be 0-initialized)
 		if( sparseBlock == null || sparseBlock.numRows()<rlen
@@ -1874,7 +1874,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 		//and to allow efficient reset without repeated sparse row allocation
 		
 		//adjust size and ensure reuse block is in CSR format
-		allocateAndResetSparseRowsBlock(false, SparseBlock.Type.CSR);
+		allocateAndResetSparseBlock(false, SparseBlock.Type.CSR);
 		
 		if( clen > 1 ) { //ULTRA-SPARSE BLOCK
 			//block: read ijv-triples (ordered by row and column) via custom 
@@ -2026,14 +2026,26 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 		if( clen > 1 ) //ULTRA-SPARSE BLOCK
 		{
 			//block: write ijv-triples
-			for(int r=0;r<Math.min(rlen, sparseBlock.numRows()); r++)
-				if( !sparseBlock.isEmpty(r) )
-				{
+			if( sparseBlock instanceof SparseBlockCOO ) {
+				SparseBlockCOO sblock = (SparseBlockCOO)sparseBlock;
+				int[] rix = sblock.rowIndexes();
+				int[] cix = sblock.indexes();
+				double[] vals = sblock.values();
+				for(int i=0; i<sblock.size(); i++) {
+					//ultra-sparse block: write ijv-triples
+					out.writeInt(rix[i]);
+					out.writeInt(cix[i]);
+					out.writeDouble(vals[i]);
+					wnnz++;
+				}
+			}
+			else {
+				for(int r=0;r<Math.min(rlen, sparseBlock.numRows()); r++) {
+					if( sparseBlock.isEmpty(r) ) continue;
 					int apos = sparseBlock.pos(r);
 					int alen = sparseBlock.size(r);
 					int[] aix = sparseBlock.indexes(r);
 					double[] avals = sparseBlock.values(r);
-					
 					for(int j=apos; j<apos+alen; j++) {
 						//ultra-sparse block: write ijv-triples
 						out.writeInt(r);
@@ -2041,7 +2053,8 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 						out.writeDouble(avals[j]);
 						wnnz++;
 					}
-				}	
+				}
+			}
 		}
 		else //ULTRA-SPARSE COL
 		{
@@ -2209,7 +2222,8 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 	public void readExternal(ObjectInput is) 
 		throws IOException
 	{
-		if( is instanceof ObjectInputStream )
+		if( is instanceof ObjectInputStream
+			&& !(is instanceof MatrixBlockDataInput) )
 		{
 			//fast deserialize of dense/sparse blocks
 			ObjectInputStream ois = (ObjectInputStream)is;
@@ -2232,7 +2246,12 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 	public void writeExternal(ObjectOutput os) 
 		throws IOException
 	{
-		if( os instanceof ObjectOutputStream ) {
+		//note: in case of a CorrMatrixBlock being wrapped around a matrix
+		//block, the object output is already a FastBufferedDataOutputStream;
+		//so in general we try to avoid unnecessary buffer allocations here.
+		
+		if( os instanceof ObjectOutputStream && !isEmptyBlock(false)
+			&& !(os instanceof MatrixBlockDataOutput) ) {
 			//fast serialize of dense/sparse blocks
 			ObjectOutputStream oos = (ObjectOutputStream)os;
 			FastBufferedDataOutputStream fos = new FastBufferedDataOutputStream(oos);
@@ -2828,7 +2847,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 	}
 	
 	@Override
-	public void incrementalAggregate(AggregateOperator aggOp, MatrixValue correction, MatrixValue newWithCorrection) {
+	public void incrementalAggregate(AggregateOperator aggOp, MatrixValue correction, MatrixValue newWithCorrection, boolean deep) {
 		//assert(aggOp.correctionExists); 
 		MatrixBlock cor=checkType(correction);
 		MatrixBlock newWithCor=checkType(newWithCorrection);
@@ -2900,7 +2919,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 		{
 			//e.g., ak+ kahan plus as used in sum, mapmult, mmcj and tsmm
 			if(aggOp.increOp.fn instanceof KahanPlus) {
-				LibMatrixAgg.aggregateBinaryMatrix(newWithCor, this, cor);
+				LibMatrixAgg.aggregateBinaryMatrix(newWithCor, this, cor, deep);
 			}
 			else
 			{