You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by mb...@apache.org on 2022/10/24 20:06:49 UTC

[systemds] branch main updated: [SYSTEMDS-3455] Improved multi-threaded unary cell operations

This is an automated email from the ASF dual-hosted git repository.

mboehm7 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 8820f33d70 [SYSTEMDS-3455] Improved multi-threaded unary cell operations
8820f33d70 is described below

commit 8820f33d70fb408c7b5ad145ad149295d4e1c9be
Author: Matthias Boehm <mb...@gmail.com>
AuthorDate: Mon Oct 24 16:03:41 2022 -0400

    [SYSTEMDS-3455] Improved multi-threaded unary cell operations
    
    So far, unary operations used only best-effort multi-threading,
    because allocation and read/write from/to memory dominate performance.
    However, with a proper implementation we get robust improvements for
    all sizes and can avoid multi-threading for small sizes where it does
    not pay off. The following results have been obtained on a laptop with
    8 vcores (with an expectation of larger improvements on scale-up nodes):
    
    * 10 times 100K x 1K (800MB): 13.8s -> 11.3s
    * 1K times 1K x 1K (8MB): 4.9s -> 2.7s
    * 100K times 10 x 1K (80KB): 9.1s -> 3.9s
---
 src/main/java/org/apache/sysds/hops/UnaryOp.java   |   2 +-
 .../java/org/apache/sysds/lops/compile/Dag.java    |   1 -
 .../runtime/matrix/data/LibMatrixBincell.java      | 191 +++++++++++++++++++++
 .../sysds/runtime/matrix/data/MatrixBlock.java     | 130 +-------------
 .../apache/sysds/utils/stats/SparkStatistics.java  |   1 -
 5 files changed, 194 insertions(+), 131 deletions(-)

diff --git a/src/main/java/org/apache/sysds/hops/UnaryOp.java b/src/main/java/org/apache/sysds/hops/UnaryOp.java
index 38db8a50b6..b250ce2c1b 100644
--- a/src/main/java/org/apache/sysds/hops/UnaryOp.java
+++ b/src/main/java/org/apache/sysds/hops/UnaryOp.java
@@ -452,7 +452,7 @@ public class UnaryOp extends MultiThreadedHop
 	}
 	
 	public boolean isExpensiveUnaryOperation() {
-		return (_op == OpOp1.EXP || _op == OpOp1.LOG
+		return (_op == OpOp1.EXP || _op == OpOp1.LOG || _op == OpOp1.LOG_NZ
 			|| _op == OpOp1.ROUND || _op == OpOp1.FLOOR || _op == OpOp1.CEIL
 			|| _op == OpOp1.SIGMOID || _op == OpOp1.SPROP || _op == OpOp1.SOFTMAX
 			|| _op == OpOp1.TAN || _op == OpOp1.TANH || _op == OpOp1.ATAN
diff --git a/src/main/java/org/apache/sysds/lops/compile/Dag.java b/src/main/java/org/apache/sysds/lops/compile/Dag.java
index 36cacde4d0..f87163eee3 100644
--- a/src/main/java/org/apache/sysds/lops/compile/Dag.java
+++ b/src/main/java/org/apache/sysds/lops/compile/Dag.java
@@ -38,7 +38,6 @@ import org.apache.sysds.common.Types.OpOpData;
 import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.conf.DMLConfig;
 import org.apache.sysds.hops.AggBinaryOp.SparkAggType;
-import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.lops.CSVReBlock;
 import org.apache.sysds.lops.CentralMoment;
 import org.apache.sysds.lops.Checkpoint;
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixBincell.java b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixBincell.java
index 43ba3791a4..14f83e7de2 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixBincell.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/LibMatrixBincell.java
@@ -57,6 +57,7 @@ import org.apache.sysds.runtime.functionobjects.Power2;
 import org.apache.sysds.runtime.functionobjects.ValueFunction;
 import org.apache.sysds.runtime.matrix.operators.BinaryOperator;
 import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
+import org.apache.sysds.runtime.matrix.operators.UnaryOperator;
 import org.apache.sysds.runtime.util.CommonThreadPool;
 import org.apache.sysds.runtime.util.DataConverter;
 import org.apache.sysds.runtime.util.SortUtils;
@@ -98,6 +99,51 @@ public class LibMatrixBincell {
 	// public matrix bincell interface
 	///////////////////////////////////
 	
+	/**
+	 * Cell-wise unary operation, multi-threaded over row ranges for large dense inputs.
+	 * @param m1 input matrix block
+	 * @param ret output block to fill (replaced by m1 for in-place execution on dense inputs)
+	 * @param op unary operator (function object, in-place flag, number of threads)
+	 * @return result block; may be m1 itself for in-place operators on dense inputs
+	 */
+	public static MatrixBlock uncellOp(MatrixBlock m1, MatrixBlock ret, UnaryOperator op) {
+		if(!m1.sparse && !m1.isEmptyBlock(false) 
+			&& op.getNumThreads() > 1 && m1.getLength() > PAR_NUMCELL_THRESHOLD2  ) { //parallel path
+			if (!op.isInplace() || m1.isEmpty())
+				ret.allocateDenseBlock(false);
+			else
+				ret = m1; //in-place: results overwrite the input block
+			int k = op.getNumThreads();
+			DenseBlock a = m1.getDenseBlock(), c = ret.getDenseBlock();
+			ExecutorService pool = CommonThreadPool.get(k);
+			try {
+				ArrayList<UncellTask> tasks = new ArrayList<>();
+				ArrayList<Integer> blklens = UtilFunctions.getBalancedBlockSizesDefault(ret.rlen, k, false);
+				for( int i=0, lb=0; i<blklens.size(); lb+=blklens.get(i), i++ )
+					tasks.add(new UncellTask(a, c, op, lb, lb+blklens.get(i)));
+				List<Future<Long>> taskret = pool.invokeAll(tasks);
+				ret.nonZeros = 0; //reset after execute, then add per-task counts
+				for( Future<Long> task : taskret )
+					ret.nonZeros += task.get();
+			}
+			catch(InterruptedException | ExecutionException ex) {
+				throw new DMLRuntimeException(ex);
+			}
+			finally {
+				pool.shutdown(); //release the pool even if a task failed or was interrupted
+			}
+		}
+		else {
+			if (op.isInplace() && !m1.isInSparseFormat() )
+				ret = m1; //in-place only for inputs not in sparse format
+			if(op.sparseSafe) //single-threaded kernels, chosen by sparse-safeness
+				sparseUnaryOperations(m1, ret, op);
+			else
+				denseUnaryOperations(m1, ret, op);
+		}
+		return ret;
+	}
+	
 	/**
 	 * matrix-scalar, scalar-matrix binary operations.
 	 * 
@@ -445,6 +491,106 @@ public class LibMatrixBincell {
 	// private sparse-safe/sparse-unsafe implementations
 	///////////////////////////////////
 
+	/**
+	 * Sparse-unsafe unary operation: initializes the output with f(0) for empty or sparse
+	 * inputs (if f(0) != 0); non-empty inputs are then processed by sparseUnaryOperations.
+	 */
+	private static void denseUnaryOperations(MatrixBlock m1, MatrixBlock ret, UnaryOperator op) {
+		final int nrow = m1.rlen, ncol = m1.clen;
+		//operator result for zero input (0 means effectively sparse-safe)
+		final double zeroOut = op.fn.execute(0d);
+		if( m1.isEmptyBlock(false) ) {
+			//all cells are zero: output is constant zeroOut, nothing to compute
+			if( zeroOut != 0 )
+				ret.reset(nrow, ncol, zeroOut);
+		}
+		else {
+			//sparse input: pre-fill output with zeroOut, all cells count as non-zero
+			if( zeroOut != 0 && m1.sparse ) {
+				ret.reset(nrow, ncol, zeroOut);
+				ret.nonZeros = (long)nrow * ncol;
+			}
+			sparseUnaryOperations(m1, ret, op);
+		}
+	}
+	
+	/**
+	 * Sparse-safe unary operation: applies the operator to the stored entries of a
+	 * sparse input or to all cells of a dense input, and maintains ret.nonZeros.
+	 */
+	private static void sparseUnaryOperations(MatrixBlock m1, MatrixBlock ret, UnaryOperator op) {
+		//empty input: nothing to compute, output is left as is
+		if( m1.isEmptyBlock(false) )
+			return;
+		
+		final int nrow = m1.rlen;
+		final int ncol = m1.clen;
+		
+		if( !m1.sparse ) { //DENSE <- DENSE
+			if( ret != m1 ) //separate output only if not in-place
+				ret.allocateDenseBlock(false);
+			DenseBlock in = m1.getDenseBlock();
+			DenseBlock out = ret.getDenseBlock();
+			//apply operator per underlying array and count non-zero results
+			long count = 0;
+			for( int b=0; b<in.numBlocks(); b++ ) {
+				double[] src = in.valuesAt(b);
+				double[] dst = out.valuesAt(b);
+				int len = in.size(b);
+				for( int p=0; p<len; p++ ) {
+					double v = op.fn.execute(src[p]);
+					dst[p] = v;
+					count += (v != 0) ? 1 : 0;
+				}
+			}
+			ret.nonZeros = count;
+		}
+		else if( ret.sparse ) { //SPARSE <- SPARSE
+			ret.allocateSparseRowsBlock();
+			SparseBlock in = m1.sparseBlock;
+			SparseBlock out = ret.sparseBlock;
+			long count = 0;
+			for(int r=0; r<nrow; r++) {
+				if( in.isEmpty(r) ) continue;
+				int beg = in.pos(r);
+				int len = in.size(r);
+				int[] cols = in.indexes(r);
+				double[] vals = in.values(r);
+				out.allocate(r, len); //size the output row once up front
+				for( int p=beg; p<beg+len; p++ ) {
+					double v = op.fn.execute(vals[p]);
+					out.append(r, cols[p], v);
+					count += (v != 0) ? 1 : 0;
+				}
+			}
+			ret.nonZeros = count;
+		}
+		else { //DENSE <- SPARSE
+			ret.allocateDenseBlock(false);
+			SparseBlock in = m1.sparseBlock;
+			DenseBlock out = ret.denseBlock;
+			//output with nnz>0 on entry: cells without an input entry count as non-zero
+			long count = 0;
+			if( ret.nonZeros > 0 )
+				count = (long) nrow*ncol-in.size();
+			for(int r=0; r<nrow; r++) {
+				if( in.isEmpty(r) ) continue;
+				int beg = in.pos(r);
+				int len = in.size(r);
+				int[] cols = in.indexes(r);
+				double[] vals = in.values(r);
+				double[] dst = out.values(r);
+				int off = out.pos(r);
+				for( int p=beg; p<beg+len; p++ ) {
+					double v = op.fn.execute(vals[p]);
+					dst[off + cols[p]] = v;
+					count += (v != 0) ? 1 : 0;
+				}
+			}
+			ret.nonZeros = count;
+		}
+	}
+	
 	private static long safeBinary(MatrixBlock m1, MatrixBlock m2, MatrixBlock ret, BinaryOperator op,
 		BinaryAccessType atype, int rl, int ru)
 	{
@@ -1764,4 +1910,49 @@ public class LibMatrixBincell {
 			return safeBinaryScalar(_m1, _ret, _sop, _rl, _ru);
 		}
 	}
+	
+	/** Task applying the unary operator to rows [rl, ru) of a dense block; returns the number of non-zero results. */
+	private static class UncellTask implements Callable<Long> {
+		private final DenseBlock _a;
+		private final DenseBlock _c;
+		private final UnaryOperator _op;
+		private final int _rl;
+		private final int _ru;
+
+		protected UncellTask(DenseBlock a, DenseBlock c, UnaryOperator op, int rl, int ru ) {
+			_a = a;
+			_c = c;
+			_op = op;
+			_rl = rl;
+			_ru = ru;
+		}
+		
+		@Override
+		public Long call() {
+			//contiguous row range: single pass from the first to the last position
+			if(_a.isContiguous(_rl, _ru))
+				return apply(_a.values(_rl), _c.values(_rl), _a.pos(_rl), _a.pos(_ru));
+			//otherwise row-wise, looking up the arrays and offset of each row
+			final int ncol = _a.getDim(1);
+			long count = 0;
+			for(int r=_rl; r<_ru; r++) {
+				double[] src = _a.values(r);
+				double[] dst = _c.values(r);
+				int off = _a.pos(r);
+				count += apply(src, dst, off, off+ncol);
+			}
+			return count;
+		}
+		
+		//applies the operator to positions [from, to) and counts non-zero results
+		private long apply(double[] src, double[] dst, int from, int to) {
+			long count = 0;
+			for( int i=from; i<to; i++ ) {
+				dst[i] = _op.fn.execute(src[i]);
+				if( dst[i] != 0 )
+					count++;
+			}
+			return count;
+		}
+	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
index ebd0ff88b0..6c8c6c9953 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
@@ -2859,34 +2859,8 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 			else
 				ret = LibMatrixAgg.cumaggregateUnaryMatrix(this, ret, op);
 		}
-		else if(!sparse && !isEmptyBlock(false)
-			&& OptimizerUtils.isMaxLocalParallelism(op.getNumThreads())) {
-			//note: we apply multi-threading in a best-effort manner here
-			//only for expensive operators such as exp, log, sigmoid, because
-			//otherwise allocation, read and write anyway dominates
-			if (!op.isInplace() || isEmpty())
-				ret.allocateDenseBlock(false);
-			else
-				ret = this;
-
-			DenseBlock a = getDenseBlock();
-			DenseBlock c = ret.getDenseBlock();
-			for(int bi=0; bi<a.numBlocks(); bi++) {
-				double[] avals = a.valuesAt(bi), cvals = c.valuesAt(bi);
-				Arrays.parallelSetAll(cvals, i -> op.fn.execute(avals[i]));
-			}
-			ret.recomputeNonZeros();
-		}
-		else
-		{
-			if (op.isInplace() && !isInSparseFormat() )
-				ret = this;
-			
-			//default execute unary operations
-			if(op.sparseSafe)
-				sparseUnaryOperations(op, ret);
-			else
-				denseUnaryOperations(op, ret);
+		else {
+			ret = LibMatrixBincell.uncellOp(this, ret, op);
 		}
 		
 		//ensure empty results sparse representation 
@@ -2897,106 +2871,6 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 		return ret;
 	}
 
-	private void sparseUnaryOperations(UnaryOperator op, MatrixBlock ret) {
-		//early abort possible since sparse-safe
-		if( isEmptyBlock(false) )
-			return;
-		
-		final int m = rlen;
-		final int n = clen;
-		
-		if( sparse && ret.sparse ) //SPARSE <- SPARSE
-		{
-			ret.allocateSparseRowsBlock();
-			SparseBlock a = sparseBlock;
-			SparseBlock c = ret.sparseBlock;
-		
-			long nnz = 0;
-			for(int i=0; i<m; i++) {
-				if( a.isEmpty(i) ) continue;
-				
-				int apos = a.pos(i);
-				int alen = a.size(i);
-				int[] aix = a.indexes(i);
-				double[] avals = a.values(i);
-				
-				c.allocate(i, alen); //avoid repeated alloc
-				for( int j=apos; j<apos+alen; j++ ) {
-					double val = op.fn.execute(avals[j]);
-					c.append(i, aix[j], val);
-					nnz += (val != 0) ? 1 : 0;
-				}
-			}
-			ret.nonZeros = nnz;
-		}
-		else if( sparse ) //DENSE <- SPARSE
-		{
-			ret.allocateDenseBlock(false);
-			SparseBlock a = sparseBlock;
-			DenseBlock c = ret.denseBlock;
-			long nnz = (ret.nonZeros > 0) ?
-				(long) m*n-a.size() : 0;
-			for(int i=0; i<m; i++) {
-				if( a.isEmpty(i) ) continue;
-				int apos = a.pos(i);
-				int alen = a.size(i);
-				int[] aix = a.indexes(i);
-				double[] avals = a.values(i);
-				double[] cvals = c.values(i);
-				int cix = c.pos(i);
-				for( int j=apos; j<apos+alen; j++ ) {
-					double val = op.fn.execute(avals[j]);
-					cvals[cix + aix[j]] = val; 
-					nnz += (val != 0) ? 1 : 0;
-				}
-			}
-			ret.nonZeros = nnz;
-		}
-		else //DENSE <- DENSE
-		{
-			if( this != ret ) //!in-place
-				ret.allocateDenseBlock(false);
-			DenseBlock da = getDenseBlock();
-			DenseBlock dc = ret.getDenseBlock();
-			
-			//unary op, incl nnz maintenance
-			long nnz = 0;
-			for( int bi=0; bi<da.numBlocks(); bi++ ) {
-				double[] a = da.valuesAt(bi);
-				double[] c = dc.valuesAt(bi);
-				int len = da.size(bi);
-				for( int i=0; i<len; i++ ) {
-					c[i] = op.fn.execute(a[i]);
-					nnz += (c[i] != 0) ? 1 : 0;
-				}
-			}
-			ret.nonZeros = nnz;
-		}
-	}
-
-	private void denseUnaryOperations(UnaryOperator op, MatrixBlock ret) {
-		//prepare 0-value init (determine if unnecessarily sparse-unsafe)
-		double val0 = op.fn.execute(0d);
-		
-		final int m = rlen;
-		final int n = clen;
-		
-		//early abort possible if unnecessarily sparse unsafe
-		//(otherwise full init with val0, no need for computation)
-		if( isEmptyBlock(false) ) {
-			if( val0 != 0 )
-				ret.reset(m, n, val0);
-			return;
-		}
-		
-		//redirection to sparse safe operation w/ init by val0
-		if( sparse && val0 != 0 ) {
-			ret.reset(m, n, val0);
-			ret.nonZeros = (long)m * n;
-		}
-		sparseUnaryOperations(op, ret);
-	}
-
 	public final MatrixBlock binaryOperations(BinaryOperator op, MatrixValue thatValue){
 		return binaryOperations(op, thatValue, null);
 	}
diff --git a/src/main/java/org/apache/sysds/utils/stats/SparkStatistics.java b/src/main/java/org/apache/sysds/utils/stats/SparkStatistics.java
index e374512267..331634d1bd 100644
--- a/src/main/java/org/apache/sysds/utils/stats/SparkStatistics.java
+++ b/src/main/java/org/apache/sysds/utils/stats/SparkStatistics.java
@@ -21,7 +21,6 @@ package org.apache.sysds.utils.stats;
 
 import java.util.concurrent.atomic.LongAdder;
 
-import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
 
 public class SparkStatistics {