You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/11/30 03:11:20 UTC
[1/2] systemml git commit: [SYSTEMML-2030] Improved transitive exec
type selection spark mm
Repository: systemml
Updated Branches:
refs/heads/master 4416b5e51 -> aefab8f8c
[SYSTEMML-2030] Improved transitive exec type selection spark mm
This patch improves the transitive execution type selection of Spark
matrix multiplication for cases where the left input is produced by a
Spark transpose operation and the transpose-mm rewrite is not applicable
due to memory constraints. On the L2SVM perftest (800GB, sparse,
icpt 1), this patch improved performance from 2,420s to 223s.
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/fbec4795
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/fbec4795
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/fbec4795
Branch: refs/heads/master
Commit: fbec47952122fb3ffd1193568035daa052618102
Parents: 4416b5e
Author: Matthias Boehm <mb...@gmail.com>
Authored: Wed Nov 29 16:24:46 2017 -0800
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Wed Nov 29 19:11:52 2017 -0800
----------------------------------------------------------------------
.../java/org/apache/sysml/hops/AggBinaryOp.java | 54 ++++++++------------
1 file changed, 21 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/systemml/blob/fbec4795/src/main/java/org/apache/sysml/hops/AggBinaryOp.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/hops/AggBinaryOp.java b/src/main/java/org/apache/sysml/hops/AggBinaryOp.java
index d733d6a..2ff432b 100644
--- a/src/main/java/org/apache/sysml/hops/AggBinaryOp.java
+++ b/src/main/java/org/apache/sysml/hops/AggBinaryOp.java
@@ -431,14 +431,14 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop
}
@Override
- protected ExecType optFindExecType()
+ protected ExecType optFindExecType()
throws HopsException
{
checkAndSetForcedPlatform();
ExecType REMOTE = OptimizerUtils.isSparkExecutionMode() ? ExecType.SPARK : ExecType.MR;
- if( _etypeForced != null )
+ if( _etypeForced != null )
{
_etype = _etypeForced;
}
@@ -472,8 +472,9 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop
//spark-specific decision refinement (execute binary aggregate w/ left or right spark input and
//single parent also in spark because it's likely cheap and reduces data transfer)
- if( _etype == ExecType.CP && _etypeForced != ExecType.CP &&
- (isApplicableForTransitiveSparkExecType(true) || isApplicableForTransitiveSparkExecType(false)) )
+ if( _etype == ExecType.CP && _etypeForced != ExecType.CP
+ && (isApplicableForTransitiveSparkExecType(true)
+ || isApplicableForTransitiveSparkExecType(false)) )
{
//pull binary aggregate into spark
_etype = ExecType.SPARK;
@@ -489,9 +490,10 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop
throws HopsException
{
int index = left ? 0 : 1;
- return !(getInput().get(index) instanceof DataOp && ((DataOp)getInput().get(index)).requiresCheckpoint())
- && !HopRewriteUtils.isTransposeOperation(getInput().get(index))
- && getInput().get(index).getParent().size()==1 //bagg is only parent
+ return !(getInput().get(index) instanceof DataOp && ((DataOp)getInput().get(index)).requiresCheckpoint())
+ && (!HopRewriteUtils.isTransposeOperation(getInput().get(index))
+ || (left && !isLeftTransposeRewriteApplicable(true, false)))
+ && getInput().get(index).getParent().size()==1 //bagg is only parent
&& !getInput().get(index).areDimsBelowThreshold()
&& getInput().get(index).optFindExecType() == ExecType.SPARK
&& getInput().get(index).getOutputMemEstimate()>getOutputMemEstimate();
@@ -660,35 +662,21 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop
private void constructCPLopsMM(ExecType et)
throws HopsException, LopsException
- {
+ {
Lop matmultCP = null;
-
+
if (et == ExecType.GPU) {
Hop h1 = getInput().get(0);
Hop h2 = getInput().get(1);
- Lop left; Lop right;
- boolean isLeftTransposed; boolean isRightTransposed;
- if( HopRewriteUtils.isTransposeOperation(h1) ) {
- isLeftTransposed = true;
- left = h1.getInput().get(0).constructLops();
- }
- else {
- isLeftTransposed = false;
- left = h1.constructLops();
- }
- if( HopRewriteUtils.isTransposeOperation(h2) ) {
- isRightTransposed = true;
- right = h2.getInput().get(0).constructLops();
- }
- else {
- isRightTransposed = false;
- right = h2.constructLops();
- }
-
- matmultCP = new Binary(left, right,
- Binary.OperationTypes.MATMULT, getDataType(), getValueType(), et, isLeftTransposed, isRightTransposed);
+ boolean leftTrans = HopRewriteUtils.isTransposeOperation(h1); //left input h1 is t(X)?
+ boolean rightTrans = HopRewriteUtils.isTransposeOperation(h2); //right input h2 is t(Y)? (must test h2, not h1)
+ Lop left = !leftTrans ? h1.constructLops() :
+ h1.getInput().get(0).constructLops();
+ Lop right = !rightTrans ? h2.constructLops() :
+ h2.getInput().get(0).constructLops();
+ matmultCP = new Binary(left, right, Binary.OperationTypes.MATMULT,
+ getDataType(), getValueType(), et, leftTrans, rightTrans);
setOutputDimensions(matmultCP);
- setNnz(-1);
}
else {
if( isLeftTransposeRewriteApplicable(true, false) ) {
@@ -696,8 +684,8 @@ public class AggBinaryOp extends Hop implements MultiThreadedHop
}
else {
int k = OptimizerUtils.getConstrainedNumThreads(_maxNumThreads);
- matmultCP = new Binary(getInput().get(0).constructLops(),getInput().get(1).constructLops(),
- Binary.OperationTypes.MATMULT, getDataType(), getValueType(), et, k);
+ matmultCP = new Binary(getInput().get(0).constructLops(),getInput().get(1).constructLops(),
+ Binary.OperationTypes.MATMULT, getDataType(), getValueType(), et, k);
}
setOutputDimensions(matmultCP);
}
[2/2] systemml git commit: [SYSTEMML-2029] Fix sync issue of parallel
binary block reader sparse
Posted by mb...@apache.org.
[SYSTEMML-2029] Fix sync issue of parallel binary block reader sparse
The parallel (i.e., multi-threaded) reader for sparse binary block
matrices pre-allocates one sparse row per row of blocks to serve as a
synchronization object. In special scenarios involving shallow/deep
copies of sparse rows (when the first block is read into an empty
synchronization point), these objects are overwritten, which loses the
synchronization points and can thus lead to corruption under concurrent
updates. We guard against this unwanted overwrite by checking whether a
sparse row is allocated instead of whether it is empty.
Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/aefab8f8
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/aefab8f8
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/aefab8f8
Branch: refs/heads/master
Commit: aefab8f8c1ce5c419f02a8f39b457f127499b9a4
Parents: fbec479
Author: Matthias Boehm <mb...@gmail.com>
Authored: Wed Nov 29 18:53:40 2017 -0800
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Wed Nov 29 19:11:54 2017 -0800
----------------------------------------------------------------------
.../sysml/runtime/matrix/data/MatrixBlock.java | 2 +-
.../sysml/runtime/matrix/data/SparseBlock.java | 9 ++++
.../runtime/matrix/data/SparseBlockCOO.java | 5 ++
.../runtime/matrix/data/SparseBlockCSR.java | 5 ++
.../runtime/matrix/data/SparseBlockMCSR.java | 43 ++++++++--------
.../runtime/matrix/data/SparseRowVector.java | 52 ++++++--------------
6 files changed, 57 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/systemml/blob/aefab8f8/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index f176c9a..22b20e4 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -738,7 +738,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
int aix = rowoffset+i;
//single block append (avoid re-allocations)
- if( sparseBlock.isEmpty(aix) && coloffset==0 ) {
+ if( !sparseBlock.isAllocated(aix) && coloffset==0 ) {
//note: the deep copy flag is only relevant for MCSR due to
//shallow references of b.get(i); other block formats do not
//require a redundant copy because b.get(i) created a new row.
http://git-wip-us.apache.org/repos/asf/systemml/blob/aefab8f8/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlock.java
index 1ece183..00b59e0 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlock.java
@@ -166,6 +166,15 @@ public abstract class SparseBlock implements Serializable
}
/**
+ * Indicates if the underlying data structure for a given row
+ * is already allocated.
+ *
+ * @param r row index
+ * @return true if already allocated
+ */
+ public abstract boolean isAllocated(int r);
+
+ /**
* Clears the sparse block by deleting non-zero values. After this call
* all size() calls are guaranteed to return 0.
*/
http://git-wip-us.apache.org/repos/asf/systemml/blob/aefab8f8/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCOO.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCOO.java b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCOO.java
index 295a545..de43855 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCOO.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCOO.java
@@ -186,6 +186,11 @@ public class SparseBlockCOO extends SparseBlock
return true;
}
+ @Override
+ public boolean isAllocated(int r) {
+ return true;
+ }
+
@Override
public void reset() {
_size = 0;
http://git-wip-us.apache.org/repos/asf/systemml/blob/aefab8f8/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java
index 19fbe50..228a806 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockCSR.java
@@ -337,6 +337,11 @@ public class SparseBlockCSR extends SparseBlock
return true;
}
+ @Override
+ public boolean isAllocated(int r) {
+ return true;
+ }
+
@Override
public void reset() {
if( _size > 0 ) {
http://git-wip-us.apache.org/repos/asf/systemml/blob/aefab8f8/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockMCSR.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockMCSR.java b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockMCSR.java
index 2c04865..9aca9e5 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockMCSR.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseBlockMCSR.java
@@ -47,7 +47,7 @@ public class SparseBlockMCSR extends SparseBlock
SparseRow[] orows = ((SparseBlockMCSR)sblock)._rows;
_rows = new SparseRow[orows.length];
for( int i=0; i<_rows.length; i++ )
- _rows[i] = new SparseRowVector(orows[i]);
+ _rows[i] = new SparseRowVector(orows[i]);
}
//general case SparseBlock
else {
@@ -81,7 +81,7 @@ public class SparseBlockMCSR extends SparseBlock
}
}
else {
- _rows = rows;
+ _rows = rows;
}
}
@@ -120,24 +120,22 @@ public class SparseBlockMCSR extends SparseBlock
@Override
public void allocate(int r) {
- if( _rows[r] == null )
+ if( !isAllocated(r) )
_rows[r] = new SparseRowVector();
}
@Override
public void allocate(int r, int nnz) {
- if( _rows[r] == null ) {
+ if( !isAllocated(r) )
_rows[r] = (nnz == 1) ? new SparseRowScalar() :
new SparseRowVector(nnz);
- }
}
@Override
public void allocate(int r, int ennz, int maxnnz) {
- if( _rows[r] == null ) {
+ if( !isAllocated(r) )
_rows[r] = (ennz == 1) ? new SparseRowScalar() :
new SparseRowVector(ennz, maxnnz);
- }
}
@Override
@@ -154,7 +152,12 @@ public class SparseBlockMCSR extends SparseBlock
public boolean isContiguous() {
return false;
}
-
+
+ @Override
+ public boolean isAllocated(int r) {
+ return (_rows[r] != null);
+ }
+
@Override
public void reset() {
for( SparseRow row : _rows )
@@ -171,7 +174,7 @@ public class SparseBlockMCSR extends SparseBlock
@Override
public void reset(int r, int ennz, int maxnnz) {
- if( _rows[r] != null )
+ if( isAllocated(r) )
_rows[r].reset(ennz, maxnnz);
}
@@ -188,14 +191,14 @@ public class SparseBlockMCSR extends SparseBlock
@Override
public int size(int r) {
//prior check with isEmpty(r) expected
- return (_rows[r]!=null) ? _rows[r].size() : 0;
+ return isAllocated(r) ? _rows[r].size() : 0;
}
@Override
public long size(int rl, int ru) {
int ret = 0;
for( int i=rl; i<ru; i++ )
- ret += (_rows[i]!=null) ? _rows[i].size() : 0;
+ ret += isAllocated(i) ? _rows[i].size() : 0;
return ret;
}
@@ -213,7 +216,7 @@ public class SparseBlockMCSR extends SparseBlock
@Override
public boolean isEmpty(int r) {
- return (_rows[r]==null || _rows[r].isEmpty());
+ return (!isAllocated(r) || _rows[r].isEmpty());
}
@Override
@@ -236,7 +239,7 @@ public class SparseBlockMCSR extends SparseBlock
@Override
public boolean set(int r, int c, double v) {
- if( _rows[r] == null )
+ if( !isAllocated(r) )
_rows[r] = new SparseRowScalar();
else if( _rows[r] instanceof SparseRowScalar && !_rows[r].isEmpty())
_rows[r] = new SparseRowVector(_rows[r]);
@@ -246,18 +249,18 @@ public class SparseBlockMCSR extends SparseBlock
@Override
public void set(int r, SparseRow row, boolean deep) {
//copy values into existing row to avoid allocation
- if( _rows[r] != null && _rows[r] instanceof SparseRowVector
+ if( isAllocated(r) && _rows[r] instanceof SparseRowVector
&& ((SparseRowVector)_rows[r]).capacity() >= row.size() && deep )
((SparseRowVector)_rows[r]).copy(row);
//set new sparse row (incl allocation if required)
else
- _rows[r] = (deep && row != null) ?
- new SparseRowVector(row) : row;
+ _rows[r] = (deep && row != null) ?
+ new SparseRowVector(row) : row;
}
@Override
public void append(int r, int c, double v) {
- if( _rows[r] == null )
+ if( !isAllocated(r) )
_rows[r] = new SparseRowScalar();
else if( _rows[r] instanceof SparseRowScalar && !_rows[r].isEmpty() )
_rows[r] = new SparseRowVector(_rows[r]);
@@ -266,7 +269,7 @@ public class SparseBlockMCSR extends SparseBlock
@Override
public void setIndexRange(int r, int cl, int cu, double[] v, int vix, int len) {
- if( _rows[r] == null )
+ if( !isAllocated(r) )
_rows[r] = new SparseRowVector();
else if( _rows[r] instanceof SparseRowScalar )
_rows[r] = new SparseRowVector(_rows[r]);
@@ -298,7 +301,7 @@ public class SparseBlockMCSR extends SparseBlock
@Override
public double get(int r, int c) {
- if( _rows[r] == null )
+ if( !isAllocated(r) )
return 0;
return _rows[r].get(c);
}
@@ -346,7 +349,7 @@ public class SparseBlockMCSR extends SparseBlock
sb.append(": ");
sb.append(_rows[i]);
sb.append("\n");
- }
+ }
return sb.toString();
}
http://git-wip-us.apache.org/repos/asf/systemml/blob/aefab8f8/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java
index 4927906..a73ed1a 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/SparseRowVector.java
@@ -54,7 +54,7 @@ public final class SparseRowVector extends SparseRow implements Serializable
estimatedNzs = estnnz;
maxNzs = maxnnz;
int capacity = ((estnnz<initialCapacity && estnnz>0) ?
- estnnz : initialCapacity);
+ estnnz : initialCapacity);
values = new double[capacity];
indexes = new int[capacity];
}
@@ -163,10 +163,10 @@ public final class SparseRowVector extends SparseRow implements Serializable
shiftLeftAndDelete(index);
return true; // nnz--
}
- else {
+ else {
values[index] = v;
return false;
- }
+ }
}
//early abort on zero (if no overwrite)
@@ -201,68 +201,44 @@ public final class SparseRowVector extends SparseRow implements Serializable
@Override
public double get(int col) {
//search for existing col index
- int index = Arrays.binarySearch(indexes, 0, size, col);
- if( index >= 0 )
- return values[index];
- else
- return 0;
+ int index = Arrays.binarySearch(indexes, 0, size, col);
+ return (index >= 0) ? values[index] : 0;
}
public int searchIndexesFirstLTE(int col)
{
//search for existing col index
int index = Arrays.binarySearch(indexes, 0, size, col);
- if( index >= 0 ) {
- if( index < size )
- return index;
- else
- return -1;
- }
+ if( index >= 0 )
+ return (index < size) ? index : -1;
//search lt col index (see binary search)
index = Math.abs( index+1 );
- if( index-1 < size )
- return index-1;
- else
- return -1;
+ return (index-1 < size) ? index-1 : -1;
}
public int searchIndexesFirstGTE(int col)
{
//search for existing col index
int index = Arrays.binarySearch(indexes, 0, size, col);
- if( index >= 0 ) {
- if( index < size )
- return index;
- else
- return -1;
- }
+ if( index >= 0 )
+ return (index < size) ? index : -1;
//search gt col index (see binary search)
index = Math.abs( index+1 );
- if( index < size )
- return index;
- else
- return -1;
+ return (index < size) ? index : -1;
}
public int searchIndexesFirstGT(int col)
{
//search for existing col index
int index = Arrays.binarySearch(indexes, 0, size, col);
- if( index >= 0 ) {
- if( index+1 < size )
- return index+1;
- else
- return -1;
- }
+ if( index >= 0 )
+ return (index+1 < size) ? index+1 : -1;
//search gt col index (see binary search)
index = Math.abs( index+1 );
- if( index < size )
- return index;
- else
- return -1;
+ return (index < size) ? index : -1;
}
public void deleteIndexRange(int lowerCol, int upperCol)