You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/11/30 03:11:20 UTC

[1/2] systemml git commit: [SYSTEMML-2030] Improved transitive exec type selection spark mm

Repository: systemml
Updated Branches:
  refs/heads/master 4416b5e51 -> aefab8f8c


[SYSTEMML-2030] Improved transitive exec type selection spark mm

This patch improves the transitive execution type selection of spark
matrix multiply for cases where the input is produced by a spark
transpose operation and the transpose-mm rewrite is not applicable due
to memory constraints. On the perftest L2SVM 800GB sparse icpt 1, this
patch reduced the runtime from 2,420s to 223s.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/fbec4795
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/fbec4795
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/fbec4795

Branch: refs/heads/master
Commit: fbec47952122fb3ffd1193568035daa052618102
Parents: 4416b5e
Author: Matthias Boehm <mb...@gmail.com>
Authored: Wed Nov 29 16:24:46 2017 -0800
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Wed Nov 29 19:11:52 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/sysml/hops/AggBinaryOp.java | 54 ++++++++------------
 1 file changed, 21 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/fbec4795/src/main/java/org/apache/sysml/hops/AggBinaryOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/AggBinaryOp.java b/src/main/java/org/apache/sysml/hops/AggBinaryOp.java
index d733d6a..2ff432b 100644
--- a/src/main/java/org/apache/sysml/hops/AggBinaryOp.java
+++ b/src/main/java/org/apache/sysml/hops/AggBinaryOp.java
@@ -431,14 +431,14 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop
 	}
 	
 	@Override
-	protected ExecType optFindExecType() 
+	protected ExecType optFindExecType()
 		throws HopsException 
 	{	
 		checkAndSetForcedPlatform();
 
 		ExecType REMOTE = OptimizerUtils.isSparkExecutionMode() ? ExecType.SPARK : ExecType.MR;
 		
-		if( _etypeForced != null ) 			
+		if( _etypeForced != null )
 		{
 			_etype = _etypeForced;
 		}
@@ -472,8 +472,9 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop
 		
 		//spark-specific decision refinement (execute binary aggregate w/ left or right spark input and 
 		//single parent also in spark because it's likely cheap and reduces data transfer)
-		if( _etype == ExecType.CP && _etypeForced != ExecType.CP &&
-			(isApplicableForTransitiveSparkExecType(true) || isApplicableForTransitiveSparkExecType(false)) )    
+		if( _etype == ExecType.CP && _etypeForced != ExecType.CP 
+			&& (isApplicableForTransitiveSparkExecType(true) 
+			|| isApplicableForTransitiveSparkExecType(false)) )
 		{
 			//pull binary aggregate into spark 
 			_etype = ExecType.SPARK;
@@ -489,9 +490,10 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop
 		throws HopsException 
 	{
 		int index = left ? 0 : 1;
-		return !(getInput().get(index) instanceof DataOp && ((DataOp)getInput().get(index)).requiresCheckpoint())  
-			&& !HopRewriteUtils.isTransposeOperation(getInput().get(index))
-			&& getInput().get(index).getParent().size()==1 //bagg is only parent	
+		return !(getInput().get(index) instanceof DataOp && ((DataOp)getInput().get(index)).requiresCheckpoint())
+			&& (!HopRewriteUtils.isTransposeOperation(getInput().get(index))
+				|| (left && !isLeftTransposeRewriteApplicable(true, false)))
+			&& getInput().get(index).getParent().size()==1 //bagg is only parent
 			&& !getInput().get(index).areDimsBelowThreshold() 
 			&& getInput().get(index).optFindExecType() == ExecType.SPARK
 			&& getInput().get(index).getOutputMemEstimate()>getOutputMemEstimate();
@@ -660,35 +662,21 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop
 
 	private void constructCPLopsMM(ExecType et) 
 		throws HopsException, LopsException
-	{	
+	{
 		Lop matmultCP = null;
-
+		
 		if (et == ExecType.GPU) {
 			Hop h1 = getInput().get(0);
 			Hop h2 = getInput().get(1);
-			Lop left; Lop right;
-			boolean isLeftTransposed; boolean isRightTransposed;
-			if( HopRewriteUtils.isTransposeOperation(h1) ) {
-				isLeftTransposed = true;
-				left = h1.getInput().get(0).constructLops();
-			}
-			else {
-				isLeftTransposed = false;
-				left = h1.constructLops();
-			}
-			if( HopRewriteUtils.isTransposeOperation(h2) ) {
-				isRightTransposed = true;
-				right = h2.getInput().get(0).constructLops();
-			}
-			else {
-				isRightTransposed = false;
-				right = h2.constructLops();
-			}
-			
-			matmultCP = new Binary(left, right, 
-									 Binary.OperationTypes.MATMULT, getDataType(), getValueType(), et, isLeftTransposed, isRightTransposed);
+			boolean leftTrans = HopRewriteUtils.isTransposeOperation(h1);
+			boolean rightTrans = HopRewriteUtils.isTransposeOperation(h2); //right flag must test h2, not h1
+			Lop left = !leftTrans ? h1.constructLops() : //if transposed, bypass the transpose hop
+				h1.getInput().get(0).constructLops();
+			Lop right = !rightTrans ? h2.constructLops() : //if transposed, bypass the transpose hop
+				h2.getInput().get(0).constructLops();
+			matmultCP = new Binary(left, right, Binary.OperationTypes.MATMULT,
+				getDataType(), getValueType(), et, leftTrans, rightTrans);
 			setOutputDimensions(matmultCP);
-			setNnz(-1);
 		}
 		else {
 			if( isLeftTransposeRewriteApplicable(true, false) ) {
@@ -696,8 +684,8 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop
 			}
 			else { 
 				int k = OptimizerUtils.getConstrainedNumThreads(_maxNumThreads);
-				matmultCP = new Binary(getInput().get(0).constructLops(),getInput().get(1).constructLops(), 
-										 Binary.OperationTypes.MATMULT, getDataType(), getValueType(), et, k);
+				matmultCP = new Binary(getInput().get(0).constructLops(),getInput().get(1).constructLops(),
+					Binary.OperationTypes.MATMULT, getDataType(), getValueType(), et, k);
 			}
 			setOutputDimensions(matmultCP);
 		}


[2/2] systemml git commit: [SYSTEMML-2029] Fix sync issue of parallel binary block reader sparse

Posted by mb...@apache.org.
[SYSTEMML-2029] Fix sync issue of parallel binary block reader sparse

The parallel (i.e., multi-threaded) reader for sparse binary block
matrices pre-allocates one sparse row per row of blocks as a
synchronization object. In special scenarios with shallow/deep copy of
sparse rows (if the first block is read into an empty synchronization
point), these objects are overwritten, leading to lost synchronization
points and thus potential corruption on concurrent updates. We guard
against this unwanted overwrite by checking for allocated instead of
empty sparse rows.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/aefab8f8
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/aefab8f8
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/aefab8f8

Branch: refs/heads/master
Commit: aefab8f8c1ce5c419f02a8f39b457f127499b9a4
Parents: fbec479
Author: Matthias Boehm <mb...@gmail.com>
Authored: Wed Nov 29 18:53:40 2017 -0800
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Wed Nov 29 19:11:54 2017 -0800

----------------------------------------------------------------------
 .../sysml/runtime/matrix/data/MatrixBlock.java  |  2 +-
 .../sysml/runtime/matrix/data/SparseBlock.java  |  9 ++++
 .../runtime/matrix/data/SparseBlockCOO.java     |  5 ++
 .../runtime/matrix/data/SparseBlockCSR.java     |  5 ++
 .../runtime/matrix/data/SparseBlockMCSR.java    | 43 ++++++++--------
 .../runtime/matrix/data/SparseRowVector.java    | 52 ++++++--------------
 6 files changed, 57 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/aefab8f8/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index f176c9a..22b20e4 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -738,7 +738,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 				int aix = rowoffset+i;
 				
 				//single block append (avoid re-allocations)
-				if( sparseBlock.isEmpty(aix) && coloffset==0 ) { 
+				if( !sparseBlock.isAllocated(aix) && coloffset==0 ) { 
 					//note: the deep copy flag is only relevant for MCSR due to
 					//shallow references of b.get(i); other block formats do not
 					//require a redundant copy because b.get(i) created a new row.

http://git-wip-us.apache.org/repos/asf/systemml/blob/aefab8f8/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlock.java
index 1ece183..00b59e0 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlock.java
@@ -166,6 +166,15 @@ public abstract class SparseBlock implements Serializable
 	}
 	
 	/**
+	 * Indicates if the underlying data structure for a given row
+	 * is already allocated.
+	 * 
+	 * @param r row index
+	 * @return true if already allocated
+	 */
+	public abstract boolean isAllocated(int r);
+	
+	/**
 	 * Clears the sparse block by deleting non-zero values. After this call
 	 * all size() calls are guaranteed to return 0.
 	 */

http://git-wip-us.apache.org/repos/asf/systemml/blob/aefab8f8/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCOO.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCOO.java b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCOO.java
index 295a545..de43855 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCOO.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCOO.java
@@ -186,6 +186,11 @@ public class SparseBlockCOO extends SparseBlock
 		return true;
 	}
 	
+	@Override
+	public boolean isAllocated(int r) {
+		return true;
+	}
+	
 	@Override 
 	public void reset() {
 		_size = 0;

http://git-wip-us.apache.org/repos/asf/systemml/blob/aefab8f8/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java
index 19fbe50..228a806 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java
@@ -337,6 +337,11 @@ public class SparseBlockCSR extends SparseBlock
 		return true;
 	}
 	
+	@Override
+	public boolean isAllocated(int r) {
+		return true;
+	}
+	
 	@Override 
 	public void reset() {
 		if( _size > 0 ) {

http://git-wip-us.apache.org/repos/asf/systemml/blob/aefab8f8/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockMCSR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockMCSR.java b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockMCSR.java
index 2c04865..9aca9e5 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockMCSR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockMCSR.java
@@ -47,7 +47,7 @@ public class SparseBlockMCSR extends SparseBlock
 			SparseRow[] orows = ((SparseBlockMCSR)sblock)._rows;
 			_rows = new SparseRow[orows.length];
 			for( int i=0; i<_rows.length; i++ )
-				_rows[i] = new SparseRowVector(orows[i]);		
+				_rows[i] = new SparseRowVector(orows[i]);
 		}
 		//general case SparseBlock
 		else { 
@@ -81,7 +81,7 @@ public class SparseBlockMCSR extends SparseBlock
 			}
 		}
 		else {
-			_rows = rows;	
+			_rows = rows;
 		}
 	}
 	
@@ -120,24 +120,22 @@ public class SparseBlockMCSR extends SparseBlock
 
 	@Override
 	public void allocate(int r) {
-		if( _rows[r] == null )
+		if( !isAllocated(r) )
 			_rows[r] = new SparseRowVector();
 	}
 	
 	@Override
 	public void allocate(int r, int nnz) {
-		if( _rows[r] == null ) {
+		if( !isAllocated(r) )
 			_rows[r] = (nnz == 1) ? new SparseRowScalar() :
 				new SparseRowVector(nnz);
-		}
 	}
 	
 	@Override
 	public void allocate(int r, int ennz, int maxnnz) {
-		if( _rows[r] == null ) {
+		if( !isAllocated(r) )
 			_rows[r] = (ennz == 1) ? new SparseRowScalar() :
 				new SparseRowVector(ennz, maxnnz);
-		}
 	}
 	
 	@Override
@@ -154,7 +152,12 @@ public class SparseBlockMCSR extends SparseBlock
 	public boolean isContiguous() {
 		return false;
 	}
-
+	
+	@Override
+	public boolean isAllocated(int r) {
+		return (_rows[r] != null);
+	}
+	
 	@Override 
 	public void reset() {
 		for( SparseRow row : _rows )
@@ -171,7 +174,7 @@ public class SparseBlockMCSR extends SparseBlock
 	
 	@Override 
 	public void reset(int r, int ennz, int maxnnz) {
-		if( _rows[r] != null )
+		if( isAllocated(r) )
 			_rows[r].reset(ennz, maxnnz);
 	}
 	
@@ -188,14 +191,14 @@ public class SparseBlockMCSR extends SparseBlock
 	@Override
 	public int size(int r) {
 		//prior check with isEmpty(r) expected
-		return (_rows[r]!=null) ? _rows[r].size() : 0;
+		return isAllocated(r) ? _rows[r].size() : 0;
 	}
 	
 	@Override
 	public long size(int rl, int ru) {
 		int ret = 0;
 		for( int i=rl; i<ru; i++ )
-			ret += (_rows[i]!=null) ? _rows[i].size() : 0;	
+			ret += isAllocated(i) ? _rows[i].size() : 0;
 		return ret;
 	}
 	
@@ -213,7 +216,7 @@ public class SparseBlockMCSR extends SparseBlock
 
 	@Override
 	public boolean isEmpty(int r) {
-		return (_rows[r]==null || _rows[r].isEmpty());
+		return (!isAllocated(r) || _rows[r].isEmpty());
 	}
 	
 	@Override
@@ -236,7 +239,7 @@ public class SparseBlockMCSR extends SparseBlock
 
 	@Override
 	public boolean set(int r, int c, double v) {
-		if( _rows[r] == null )
+		if( !isAllocated(r) )
 			_rows[r] = new SparseRowScalar();
 		else if( _rows[r] instanceof SparseRowScalar && !_rows[r].isEmpty())
 			_rows[r] = new SparseRowVector(_rows[r]);
@@ -246,18 +249,18 @@ public class SparseBlockMCSR extends SparseBlock
 	@Override
 	public void set(int r, SparseRow row, boolean deep) {
 		//copy values into existing row to avoid allocation
-		if( _rows[r] != null && _rows[r] instanceof SparseRowVector
+		if( isAllocated(r) && _rows[r] instanceof SparseRowVector
 			&& ((SparseRowVector)_rows[r]).capacity() >= row.size() && deep )
 			((SparseRowVector)_rows[r]).copy(row);
 		//set new sparse row (incl allocation if required)
 		else 
-			_rows[r] = (deep && row != null) ? 
-				new SparseRowVector(row) : row;		
+			_rows[r] = (deep && row != null) ?
+				new SparseRowVector(row) : row;
 	}
 	
 	@Override
 	public void append(int r, int c, double v) {
-		if( _rows[r] == null )
+		if( !isAllocated(r) )
 			_rows[r] = new SparseRowScalar();
 		else if( _rows[r] instanceof SparseRowScalar && !_rows[r].isEmpty() )
 			_rows[r] = new SparseRowVector(_rows[r]);
@@ -266,7 +269,7 @@ public class SparseBlockMCSR extends SparseBlock
 
 	@Override
 	public void setIndexRange(int r, int cl, int cu, double[] v, int vix, int len) {
-		if( _rows[r] == null )
+		if( !isAllocated(r) )
 			_rows[r] = new SparseRowVector();
 		else if( _rows[r] instanceof SparseRowScalar )
 			_rows[r] = new SparseRowVector(_rows[r]);
@@ -298,7 +301,7 @@ public class SparseBlockMCSR extends SparseBlock
 
 	@Override
 	public double get(int r, int c) {
-		if( _rows[r] == null )
+		if( !isAllocated(r) )
 			return 0;
 		return _rows[r].get(c); 
 	}
@@ -346,7 +349,7 @@ public class SparseBlockMCSR extends SparseBlock
 			sb.append(": ");
 			sb.append(_rows[i]);
 			sb.append("\n");
-		}		
+		}
 		
 		return sb.toString();
 	}

http://git-wip-us.apache.org/repos/asf/systemml/blob/aefab8f8/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java
index 4927906..a73ed1a 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java
@@ -54,7 +54,7 @@ public final class SparseRowVector extends SparseRow implements Serializable
 			estimatedNzs = estnnz;
 		maxNzs = maxnnz;
 		int capacity = ((estnnz<initialCapacity && estnnz>0) ? 
-				         estnnz : initialCapacity);
+				estnnz : initialCapacity);
 		values = new double[capacity];
 		indexes = new int[capacity];
 	}
@@ -163,10 +163,10 @@ public final class SparseRowVector extends SparseRow implements Serializable
 				shiftLeftAndDelete(index);
 				return true; // nnz--
 			}
-			else { 	
+			else {
 				values[index] = v;
 				return false;
-			} 
+			}
 		}
 
 		//early abort on zero (if no overwrite)
@@ -201,68 +201,44 @@ public final class SparseRowVector extends SparseRow implements Serializable
 	@Override
 	public double get(int col) {
 		//search for existing col index
-		int index = Arrays.binarySearch(indexes, 0, size, col);		
-		if( index >= 0 )
-			return values[index];
-		else
-			return 0;
+		int index = Arrays.binarySearch(indexes, 0, size, col);
+		return (index >= 0) ? values[index] : 0;
 	}
 
 	public int searchIndexesFirstLTE(int col)
 	{
 		//search for existing col index
 		int index = Arrays.binarySearch(indexes, 0, size, col);
-		if( index >= 0  ) {
-			if( index < size )
-				return index;
-			else 
-				return -1;
-		}
+		if( index >= 0 )
+			return (index < size) ? index : -1;
 		
 		//search lt col index (see binary search)
 		index = Math.abs( index+1 );
-		if( index-1 < size )
-			return index-1;
-		else 
-			return -1;
+		return (index-1 < size) ? index-1 : -1;
 	}
 
 	public int searchIndexesFirstGTE(int col)
 	{
 		//search for existing col index
 		int index = Arrays.binarySearch(indexes, 0, size, col);
-		if( index >= 0  ) {
-			if( index < size )
-				return index;
-			else 
-				return -1;
-		}
+		if( index >= 0 )
+			return (index < size) ? index : -1;
 		
 		//search gt col index (see binary search)
 		index = Math.abs( index+1 );
-		if( index < size )
-			return index;
-		else 
-			return -1;
+		return (index < size) ? index : -1;
 	}
 
 	public int searchIndexesFirstGT(int col)
 	{
 		//search for existing col index
 		int index = Arrays.binarySearch(indexes, 0, size, col);
-		if( index >= 0  ) {
-			if( index+1 < size )
-				return index+1;
-			else 
-				return -1;
-		}
+		if( index >= 0 )
+			return (index+1 < size) ? index+1 : -1;
 		
 		//search gt col index (see binary search)
 		index = Math.abs( index+1 );
-		if( index < size )
-			return index;
-		else 
-			return -1;
+		return (index < size) ? index : -1;
 	}
 
 	public void deleteIndexRange(int lowerCol, int upperCol)