You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by ar...@apache.org on 2021/12/08 13:44:48 UTC

[systemds] branch main updated: [SYSTEMDS-3242] Multithreaded allocation for transformencode

This is an automated email from the ASF dual-hosted git repository.

arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 79ec601  [SYSTEMDS-3242] Multithreaded allocation for transformencode
79ec601 is described below

commit 79ec6019843a037545aee7bd6495f91cbeb88a6e
Author: arnabp <ar...@tugraz.at>
AuthorDate: Wed Dec 8 14:38:47 2021 +0100

    [SYSTEMDS-3242] Multithreaded allocation for transformencode
    
    This patch enables multi-threaded allocation of the sparse target
    matrix for the apply phase of transformencode.
---
 .../transform/encode/MultiColumnEncoder.java       | 43 ++++++++++++++++------
 1 file changed, 31 insertions(+), 12 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
index f593487..7206600 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
@@ -37,12 +37,14 @@ import java.util.concurrent.Future;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.common.Types;
 import org.apache.sysds.conf.ConfigurationManager;
+import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
 import org.apache.sysds.runtime.data.SparseBlock;
@@ -263,7 +265,10 @@ public class MultiColumnEncoder implements Encoder {
 				+ "has a encoder or slice the input accordingly");
 		// TODO smart checks
 		// Block allocation for MT access
-		outputMatrixPreProcessing(out, in);
+		boolean hasDC = false;
+		for(ColumnEncoderComposite columnEncoder : _columnEncoders)
+			hasDC = columnEncoder.hasEncoder(ColumnEncoderDummycode.class);
+		outputMatrixPreProcessing(out, in, hasDC);
 		if(k > 1) {
 			applyMT(in, out, outputCol, k);
 		}
@@ -318,7 +323,7 @@ public class MultiColumnEncoder implements Encoder {
 		pool.shutdown();
 	}
 
-	private static void outputMatrixPreProcessing(MatrixBlock output, CacheBlock input) {
+	private static void outputMatrixPreProcessing(MatrixBlock output, CacheBlock input, boolean hasDC) {
 		long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
 		output.allocateBlock();
 		if(output.isInSparseFormat()) {
@@ -326,18 +331,31 @@ public class MultiColumnEncoder implements Encoder {
 			if(!(block instanceof SparseBlockMCSR))
 				throw new RuntimeException(
 					"Transform apply currently only supported for MCSR sparse and dense output Matrices");
-			for(int r = 0; r < output.getNumRows(); r++) {
-				// allocate all sparse rows so MT sync can be done.
-				// should be rare that rows have only 0
-				block.allocate(r, input.getNumColumns());
-				// Setting the size here makes it possible to run all sparse apply tasks without any sync
-				// could become problematic if the input is very sparse since we allocate the same size as the input
-				// should be fine in theory ;)
-				((SparseRowVector)block.get(r)).setSize(input.getNumColumns());
+			if (hasDC && OptimizerUtils.getTransformNumThreads()>1) {
+				// DC forces a single threaded allocation after the build phase and
+				// before the apply starts. Below code parallelizes sparse allocation.
+				IntStream.range(0, output.getNumRows())
+				.parallel().forEach(r -> {
+					block.allocate(r, input.getNumColumns());
+					((SparseRowVector)block.get(r)).setSize(input.getNumColumns());
+				});
+			}
+			else {
+				for(int r = 0; r < output.getNumRows(); r++) {
+					// allocate all sparse rows so MT sync can be done.
+					// should be rare that rows have only 0
+					block.allocate(r, input.getNumColumns());
+					// Setting the size here makes it possible to run all sparse apply tasks without any sync
+					// could become problematic if the input is very sparse since we allocate the same size as the input
+					// should be fine in theory ;)
+					((SparseRowVector)block.get(r)).setSize(input.getNumColumns());
+				}
 			}
 		}
-		if(DMLScript.STATISTICS)
+		if(DMLScript.STATISTICS) {
+			LOG.debug("Elapsed time for allocation: "+ ((double) System.nanoTime() - t0) / 1000000 + " ms");
 			Statistics.incTransformOutMatrixPreProcessingTime(System.nanoTime()-t0);
+		}
 	}
 
 	private void outputMatrixPostProcessing(MatrixBlock output){
@@ -803,10 +821,11 @@ public class MultiColumnEncoder implements Encoder {
 		@Override
 		public Object call() throws Exception {
 			int numCols = _input.getNumColumns() + _encoder.getNumExtraCols();
+			boolean hasDC = _encoder.getColumnEncoders(ColumnEncoderDummycode.class).size() > 0;
 			long estNNz = (long) _input.getNumColumns() * (long) _input.getNumRows();
 			boolean sparse = MatrixBlock.evalSparseFormatInMemory(_input.getNumRows(), numCols, estNNz);
 			_output.reset(_input.getNumRows(), numCols, sparse, estNNz);
-			outputMatrixPreProcessing(_output, _input);
+			outputMatrixPreProcessing(_output, _input, hasDC);
 			return null;
 		}