You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by mb...@apache.org on 2022/10/24 20:06:49 UTC
[systemds] branch main updated: [SYSTEMDS-3455] Improved multi-threaded unary cell operations
This is an automated email from the ASF dual-hosted git repository.
mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 8820f33d70 [SYSTEMDS-3455] Improved multi-threaded unary cell operations
8820f33d70 is described below
commit 8820f33d70fb408c7b5ad145ad149295d4e1c9be
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Mon Oct 24 16:03:41 2022 -0400
[SYSTEMDS-3455] Improved multi-threaded unary cell operations
So far, unary operations used only best-effort multi-threading,
because allocation and read/write from/to memory dominate performance.
However, with a proper implementation we get robust improvements for
all sizes and can avoid multi-threading for small sizes where it does
not pay off. The following results have been obtained on a laptop with
8 vcores (with an expectation of larger improvements on scale-up nodes):
* 10 times 100K x 1K (800MB): 13.8s -> 11.3s
* 1K times 1K x 1K (8MB): 4.9s -> 2.7s
* 100K times 10 x 1K (80KB): 9.1s -> 3.9s
---
src/main/java/org/apache/sysds/hops/UnaryOp.java | 2 +-
.../java/org/apache/sysds/lops/compile/Dag.java | 1 -
.../runtime/matrix/data/LibMatrixBincell.java | 191 +++++++++++++++++++++
.../sysds/runtime/matrix/data/MatrixBlock.java | 130 +-------------
.../apache/sysds/utils/stats/SparkStatistics.java | 1 -
5 files changed, 194 insertions(+), 131 deletions(-)
diff --git a/src/main/java/org/apache/sysds/hops/UnaryOp.java b/src/main/java/org/apache/sysds/hops/UnaryOp.java
index 38db8a50b6..b250ce2c1b 100644
--- a/src/main/java/org/apache/sysds/hops/UnaryOp.java
+++ b/src/main/java/org/apache/sysds/hops/UnaryOp.java
@@ -452,7 +452,7 @@ public class UnaryOp extends MultiThreadedHop
}
public boolean isExpensiveUnaryOperation() {
- return (_op == OpOp1.EXP || _op == OpOp1.LOG
+ return (_op == OpOp1.EXP || _op == OpOp1.LOG || _op == OpOp1.LOG_NZ
|| _op == OpOp1.ROUND || _op == OpOp1.FLOOR || _op == OpOp1.CEIL
|| _op == OpOp1.SIGMOID || _op == OpOp1.SPROP || _op == OpOp1.SOFTMAX
|| _op == OpOp1.TAN || _op == OpOp1.TANH || _op == OpOp1.ATAN
diff --git a/src/main/java/org/apache/sysds/lops/compile/Dag.java b/src/main/java/org/apache/sysds/lops/compile/Dag.java
index 36cacde4d0..f87163eee3 100644
--- a/src/main/java/org/apache/sysds/lops/compile/Dag.java
+++ b/src/main/java/org/apache/sysds/lops/compile/Dag.java
@@ -38,7 +38,6 @@ import org.apache.sysds.common.Types.OpOpData;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.conf.DMLConfig;
import org.apache.sysds.hops.AggBinaryOp.SparkAggType;
-import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.lops.CSVReBlock;
import org.apache.sysds.lops.CentralMoment;
import org.apache.sysds.lops.Checkpoint;
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixBincell.java b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixBincell.java
index 43ba3791a4..14f83e7de2 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixBincell.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixBincell.java
@@ -57,6 +57,7 @@ import org.apache.sysds.runtime.functionobjects.Power2;
import org.apache.sysds.runtime.functionobjects.ValueFunction;
import org.apache.sysds.runtime.matrix.operators.BinaryOperator;
import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
+import org.apache.sysds.runtime.matrix.operators.UnaryOperator;
import org.apache.sysds.runtime.util.CommonThreadPool;
import org.apache.sysds.runtime.util.DataConverter;
import org.apache.sysds.runtime.util.SortUtils;
@@ -98,6 +99,51 @@ public class LibMatrixBincell {
// public matrix bincell interface
///////////////////////////////////
+	/**
+	 * Cell-wise unary operation on a matrix block.
+	 * <p>
+	 * Inputs that are dense and non-empty, with more than one configured thread and a
+	 * length ({@code getLength()}) above {@code PAR_NUMCELL_THRESHOLD2}, are split into
+	 * row ranges that are processed by {@code UncellTask}s on a thread pool. All other
+	 * inputs are delegated to the single-threaded sparse-safe or sparse-unsafe
+	 * implementations, depending on {@code op.sparseSafe}.
+	 *
+	 * @param m1  input matrix block
+	 * @param ret output matrix block; for in-place operators the input block is used as
+	 *            the output instead (multi-threaded path: if m1 is not empty,
+	 *            single-threaded path: if m1 is not in sparse format)
+	 * @param op  unary operator providing the cell function, the number of threads,
+	 *            and the in-place flag
+	 * @return the block holding the result, either {@code ret} or {@code m1}
+	 * @throws DMLRuntimeException if a parallel task fails or the calling thread is
+	 *                             interrupted while waiting for the tasks
+	 */
+	public static MatrixBlock uncellOp(MatrixBlock m1, MatrixBlock ret, UnaryOperator op) {
+		if(!m1.sparse && !m1.isEmptyBlock(false)
+			&& op.getNumThreads() > 1 && m1.getLength() > PAR_NUMCELL_THRESHOLD2 ) {
+			//multi-threaded dense-dense execution over row partitions;
+			//inputs at or below the threshold use the single-threaded path below
+			if (!op.isInplace() || m1.isEmpty())
+				ret.allocateDenseBlock(false);
+			else
+				ret = m1;
+
+			int k = op.getNumThreads();
+			DenseBlock a = m1.getDenseBlock();
+			DenseBlock c = ret.getDenseBlock();
+			//acquire the pool before the try block such that the
+			//finally block always sees a pool that needs to be shut down
+			ExecutorService pool = CommonThreadPool.get(k);
+			try {
+				List<UncellTask> tasks = new ArrayList<>();
+				ArrayList<Integer> blklens = UtilFunctions.getBalancedBlockSizesDefault(ret.rlen, k, false);
+				for( int i=0, lb=0; i<blklens.size(); lb+=blklens.get(i), i++ )
+					tasks.add(new UncellTask(a, c, op, lb, lb+blklens.get(i)));
+				List<Future<Long>> taskret = pool.invokeAll(tasks);
+
+				//aggregate non-zeros
+				ret.nonZeros = 0; //reset after execute
+				for( Future<Long> task : taskret )
+					ret.nonZeros += task.get();
+			}
+			catch(InterruptedException | ExecutionException ex) {
+				throw new DMLRuntimeException(ex);
+			}
+			finally {
+				//release the pool on success and on failure (previously the
+				//pool was only shut down if all tasks completed normally)
+				pool.shutdown();
+			}
+		}
+		else {
+			if (op.isInplace() && !m1.isInSparseFormat() )
+				ret = m1;
+
+			//default execute unary operations
+			if(op.sparseSafe)
+				sparseUnaryOperations(m1, ret, op);
+			else
+				denseUnaryOperations(m1, ret, op);
+		}
+		return ret;
+	}
+
/**
* matrix-scalar, scalar-matrix binary operations.
*
@@ -445,6 +491,106 @@ public class LibMatrixBincell {
// private sparse-safe/sparse-unsafe implementations
///////////////////////////////////
+	/**
+	 * Sparse-unsafe unary operation: handles operators for which the result of a
+	 * zero input cell is not necessarily zero.
+	 *
+	 * @param m1  input matrix block
+	 * @param ret output matrix block
+	 * @param op  unary operator providing the cell function
+	 */
+	private static void denseUnaryOperations(MatrixBlock m1, MatrixBlock ret, UnaryOperator op) {
+		//result of the operator on a zero cell; if it is zero, the
+		//operator was unnecessarily marked as sparse-unsafe
+		final double zeroResult = op.fn.execute(0d);
+		final boolean fillRequired = (zeroResult != 0);
+
+		final int rows = m1.rlen;
+		final int cols = m1.clen;
+
+		if( m1.isEmptyBlock(false) ) {
+			//empty input: the output is either left untouched or
+			//fully initialized with the zero result, no computation needed
+			if( fillRequired )
+				ret.reset(rows, cols, zeroResult);
+		}
+		else {
+			//sparse input: pre-initialize all output cells with the zero result,
+			//the sparse-safe operation then overwrites the cells with input entries
+			if( fillRequired && m1.sparse ) {
+				ret.reset(rows, cols, zeroResult);
+				ret.nonZeros = (long) rows * cols;
+			}
+			sparseUnaryOperations(m1, ret, op);
+		}
+	}
+
+	/**
+	 * Sparse-safe unary operation: applies the cell function to the entries of the
+	 * input and maintains the number of non-zeros of the output. The implementation
+	 * is selected by the sparse/dense format of the input and output block.
+	 *
+	 * @param m1  input matrix block
+	 * @param ret output matrix block (may be the same object as m1 for in-place dense operations)
+	 * @param op  unary operator providing the cell function
+	 */
+	private static void sparseUnaryOperations(MatrixBlock m1, MatrixBlock ret, UnaryOperator op) {
+		//nothing to do for empty inputs since sparse-safe
+		if( m1.isEmptyBlock(false) )
+			return;
+
+		final int rows = m1.rlen;
+		final int cols = m1.clen;
+		long count;
+
+		if( !m1.sparse ) { //DENSE <- DENSE
+			//allocate output only if not in-place
+			if( ret != m1 )
+				ret.allocateDenseBlock(false);
+			DenseBlock in = m1.getDenseBlock();
+			DenseBlock out = ret.getDenseBlock();
+
+			//apply function per physical block, incl nnz maintenance
+			count = 0;
+			for( int b=0; b<in.numBlocks(); b++ ) {
+				double[] src = in.valuesAt(b);
+				double[] dst = out.valuesAt(b);
+				int size = in.size(b);
+				for( int p=0; p<size; p++ ) {
+					double v = op.fn.execute(src[p]);
+					dst[p] = v;
+					if( v != 0 )
+						count++;
+				}
+			}
+		}
+		else if( ret.sparse ) { //SPARSE <- SPARSE
+			ret.allocateSparseRowsBlock();
+			SparseBlock in = m1.sparseBlock;
+			SparseBlock out = ret.sparseBlock;
+
+			count = 0;
+			for( int r=0; r<rows; r++ ) {
+				if( in.isEmpty(r) )
+					continue;
+
+				int begin = in.pos(r);
+				int len = in.size(r);
+				int[] colIx = in.indexes(r);
+				double[] src = in.values(r);
+
+				out.allocate(r, len); //avoid repeated alloc
+				for( int p=begin; p<begin+len; p++ ) {
+					double v = op.fn.execute(src[p]);
+					out.append(r, colIx[p], v);
+					if( v != 0 )
+						count++;
+				}
+			}
+		}
+		else { //DENSE <- SPARSE
+			ret.allocateDenseBlock(false);
+			SparseBlock in = m1.sparseBlock;
+			DenseBlock out = ret.denseBlock;
+
+			//if the output already reports non-zeros, start the count with
+			//all cells that are not covered by an input entry
+			if( ret.nonZeros > 0 )
+				count = (long) rows*cols - in.size();
+			else
+				count = 0;
+
+			for( int r=0; r<rows; r++ ) {
+				if( in.isEmpty(r) )
+					continue;
+				int begin = in.pos(r);
+				int len = in.size(r);
+				int[] colIx = in.indexes(r);
+				double[] src = in.values(r);
+				double[] dst = out.values(r);
+				int rowOff = out.pos(r);
+				for( int p=begin; p<begin+len; p++ ) {
+					double v = op.fn.execute(src[p]);
+					dst[rowOff + colIx[p]] = v;
+					if( v != 0 )
+						count++;
+				}
+			}
+		}
+
+		ret.nonZeros = count;
+	}
+
private static long safeBinary(MatrixBlock m1, MatrixBlock m2, MatrixBlock ret, BinaryOperator op,
BinaryAccessType atype, int rl, int ru)
{
@@ -1764,4 +1910,49 @@ public class LibMatrixBincell {
return safeBinaryScalar(_m1, _ret, _sop, _rl, _ru);
}
}
+
+	/**
+	 * Task that applies a unary cell function to the rows [rl, ru) of a dense
+	 * input block, writes the results to the same positions of a dense output
+	 * block, and returns the number of non-zero results.
+	 */
+	private static class UncellTask implements Callable<Long> {
+		private final DenseBlock _in;
+		private final DenseBlock _out;
+		private final UnaryOperator _uop;
+		private final int _rowLower;
+		private final int _rowUpper;
+
+		protected UncellTask(DenseBlock a, DenseBlock c, UnaryOperator op, int rl, int ru ) {
+			_in = a;
+			_out = c;
+			_uop = op;
+			_rowLower = rl;
+			_rowUpper = ru;
+		}
+
+		@Override
+		public Long call() {
+			return _in.isContiguous(_rowLower, _rowUpper) ?
+				applyContiguous() : applyRowWise();
+		}
+
+		/** Fast dense-dense path: a single pass over the contiguous value range of the rows. */
+		private long applyContiguous() {
+			double[] src = _in.values(_rowLower);
+			double[] dst = _out.values(_rowLower);
+			int from = _in.pos(_rowLower);
+			int to = _in.pos(_rowUpper);
+			long count = 0;
+			for( int p=from; p<to; p++ ) {
+				double v = _uop.fn.execute(src[p]);
+				dst[p] = v;
+				if( v != 0 )
+					count++;
+			}
+			return count;
+		}
+
+		/** Generic dense-dense path, including large blocks: row-by-row lookup of values and offsets. */
+		private long applyRowWise() {
+			int ncol = _in.getDim(1);
+			long count = 0;
+			for( int r=_rowLower; r<_rowUpper; r++ ) {
+				double[] src = _in.values(r);
+				double[] dst = _out.values(r);
+				//the input row offset is used for both input and output
+				int off = _in.pos(r);
+				for( int j=0; j<ncol; j++ ) {
+					double v = _uop.fn.execute(src[off+j]);
+					dst[off+j] = v;
+					if( v != 0 )
+						count++;
+				}
+			}
+			return count;
+		}
+	}
}
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
index ebd0ff88b0..6c8c6c9953 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
@@ -2859,34 +2859,8 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
else
ret = LibMatrixAgg.cumaggregateUnaryMatrix(this, ret, op);
}
- else if(!sparse && !isEmptyBlock(false)
- && OptimizerUtils.isMaxLocalParallelism(op.getNumThreads())) {
- //note: we apply multi-threading in a best-effort manner here
- //only for expensive operators such as exp, log, sigmoid, because
- //otherwise allocation, read and write anyway dominates
- if (!op.isInplace() || isEmpty())
- ret.allocateDenseBlock(false);
- else
- ret = this;
-
- DenseBlock a = getDenseBlock();
- DenseBlock c = ret.getDenseBlock();
- for(int bi=0; bi<a.numBlocks(); bi++) {
- double[] avals = a.valuesAt(bi), cvals = c.valuesAt(bi);
- Arrays.parallelSetAll(cvals, i -> op.fn.execute(avals[i]));
- }
- ret.recomputeNonZeros();
- }
- else
- {
- if (op.isInplace() && !isInSparseFormat() )
- ret = this;
-
- //default execute unary operations
- if(op.sparseSafe)
- sparseUnaryOperations(op, ret);
- else
- denseUnaryOperations(op, ret);
+ else {
+ ret = LibMatrixBincell.uncellOp(this, ret, op);
}
//ensure empty results sparse representation
@@ -2897,106 +2871,6 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
return ret;
}
- private void sparseUnaryOperations(UnaryOperator op, MatrixBlock ret) {
- //early abort possible since sparse-safe
- if( isEmptyBlock(false) )
- return;
-
- final int m = rlen;
- final int n = clen;
-
- if( sparse && ret.sparse ) //SPARSE <- SPARSE
- {
- ret.allocateSparseRowsBlock();
- SparseBlock a = sparseBlock;
- SparseBlock c = ret.sparseBlock;
-
- long nnz = 0;
- for(int i=0; i<m; i++) {
- if( a.isEmpty(i) ) continue;
-
- int apos = a.pos(i);
- int alen = a.size(i);
- int[] aix = a.indexes(i);
- double[] avals = a.values(i);
-
- c.allocate(i, alen); //avoid repeated alloc
- for( int j=apos; j<apos+alen; j++ ) {
- double val = op.fn.execute(avals[j]);
- c.append(i, aix[j], val);
- nnz += (val != 0) ? 1 : 0;
- }
- }
- ret.nonZeros = nnz;
- }
- else if( sparse ) //DENSE <- SPARSE
- {
- ret.allocateDenseBlock(false);
- SparseBlock a = sparseBlock;
- DenseBlock c = ret.denseBlock;
- long nnz = (ret.nonZeros > 0) ?
- (long) m*n-a.size() : 0;
- for(int i=0; i<m; i++) {
- if( a.isEmpty(i) ) continue;
- int apos = a.pos(i);
- int alen = a.size(i);
- int[] aix = a.indexes(i);
- double[] avals = a.values(i);
- double[] cvals = c.values(i);
- int cix = c.pos(i);
- for( int j=apos; j<apos+alen; j++ ) {
- double val = op.fn.execute(avals[j]);
- cvals[cix + aix[j]] = val;
- nnz += (val != 0) ? 1 : 0;
- }
- }
- ret.nonZeros = nnz;
- }
- else //DENSE <- DENSE
- {
- if( this != ret ) //!in-place
- ret.allocateDenseBlock(false);
- DenseBlock da = getDenseBlock();
- DenseBlock dc = ret.getDenseBlock();
-
- //unary op, incl nnz maintenance
- long nnz = 0;
- for( int bi=0; bi<da.numBlocks(); bi++ ) {
- double[] a = da.valuesAt(bi);
- double[] c = dc.valuesAt(bi);
- int len = da.size(bi);
- for( int i=0; i<len; i++ ) {
- c[i] = op.fn.execute(a[i]);
- nnz += (c[i] != 0) ? 1 : 0;
- }
- }
- ret.nonZeros = nnz;
- }
- }
-
- private void denseUnaryOperations(UnaryOperator op, MatrixBlock ret) {
- //prepare 0-value init (determine if unnecessarily sparse-unsafe)
- double val0 = op.fn.execute(0d);
-
- final int m = rlen;
- final int n = clen;
-
- //early abort possible if unnecessarily sparse unsafe
- //(otherwise full init with val0, no need for computation)
- if( isEmptyBlock(false) ) {
- if( val0 != 0 )
- ret.reset(m, n, val0);
- return;
- }
-
- //redirection to sparse safe operation w/ init by val0
- if( sparse && val0 != 0 ) {
- ret.reset(m, n, val0);
- ret.nonZeros = (long)m * n;
- }
- sparseUnaryOperations(op, ret);
- }
-
public final MatrixBlock binaryOperations(BinaryOperator op, MatrixValue thatValue){
return binaryOperations(op, thatValue, null);
}
diff --git a/src/main/java/org/apache/sysds/utils/stats/SparkStatistics.java b/src/main/java/org/apache/sysds/utils/stats/SparkStatistics.java
index e374512267..331634d1bd 100644
--- a/src/main/java/org/apache/sysds/utils/stats/SparkStatistics.java
+++ b/src/main/java/org/apache/sysds/utils/stats/SparkStatistics.java
@@ -21,7 +21,6 @@ package org.apache.sysds.utils.stats;
import java.util.concurrent.atomic.LongAdder;
-import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
public class SparkStatistics {