You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by ar...@apache.org on 2021/12/08 13:44:48 UTC
[systemds] branch main updated: [SYSTEMDS-3242] Multithreaded allocation for transformencode
This is an automated email from the ASF dual-hosted git repository.
arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 79ec601 [SYSTEMDS-3242] Multithreaded allocation for transformencode
79ec601 is described below
commit 79ec6019843a037545aee7bd6495f91cbeb88a6e
Author: arnabp <ar...@tugraz.at>
AuthorDate: Wed Dec 8 14:38:47 2021 +0100
[SYSTEMDS-3242] Multithreaded allocation for transformencode
This patch enables multi-threaded allocation of the sparse target matrix
for the apply phase of transformencode.
---
.../transform/encode/MultiColumnEncoder.java | 43 ++++++++++++++++------
1 file changed, 31 insertions(+), 12 deletions(-)
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
index f593487..7206600 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
@@ -37,12 +37,14 @@ import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types;
import org.apache.sysds.conf.ConfigurationManager;
+import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
import org.apache.sysds.runtime.data.SparseBlock;
@@ -263,7 +265,10 @@ public class MultiColumnEncoder implements Encoder {
+ "has a encoder or slice the input accordingly");
// TODO smart checks
// Block allocation for MT access
- outputMatrixPreProcessing(out, in);
+ boolean hasDC = false; // becomes true if ANY column encoder includes dummycoding
+ for(ColumnEncoderComposite columnEncoder : _columnEncoders)
+ hasDC |= columnEncoder.hasEncoder(ColumnEncoderDummycode.class); // accumulate; plain '=' kept only the last column's result
+ outputMatrixPreProcessing(out, in, hasDC);
if(k > 1) {
applyMT(in, out, outputCol, k);
}
@@ -318,7 +323,7 @@ public class MultiColumnEncoder implements Encoder {
pool.shutdown();
}
- private static void outputMatrixPreProcessing(MatrixBlock output, CacheBlock input) {
+ private static void outputMatrixPreProcessing(MatrixBlock output, CacheBlock input, boolean hasDC) {
long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
output.allocateBlock();
if(output.isInSparseFormat()) {
@@ -326,18 +331,31 @@ public class MultiColumnEncoder implements Encoder {
if(!(block instanceof SparseBlockMCSR))
throw new RuntimeException(
"Transform apply currently only supported for MCSR sparse and dense output Matrices");
- for(int r = 0; r < output.getNumRows(); r++) {
- // allocate all sparse rows so MT sync can be done.
- // should be rare that rows have only 0
- block.allocate(r, input.getNumColumns());
- // Setting the size here makes it possible to run all sparse apply tasks without any sync
- // could become problematic if the input is very sparse since we allocate the same size as the input
- // should be fine in theory ;)
- ((SparseRowVector)block.get(r)).setSize(input.getNumColumns());
+ if (hasDC && OptimizerUtils.getTransformNumThreads()>1) {
+ // DC forces a single threaded allocation after the build phase and
+ // before the apply starts. Below code parallelizes sparse allocation.
+ IntStream.range(0, output.getNumRows())
+ .parallel().forEach(r -> {
+ block.allocate(r, input.getNumColumns());
+ ((SparseRowVector)block.get(r)).setSize(input.getNumColumns());
+ });
+ }
+ else {
+ for(int r = 0; r < output.getNumRows(); r++) {
+ // allocate all sparse rows so MT sync can be done.
+ // should be rare that rows have only 0
+ block.allocate(r, input.getNumColumns());
+ // Setting the size here makes it possible to run all sparse apply tasks without any sync
+ // could become problematic if the input is very sparse since we allocate the same size as the input
+ // should be fine in theory ;)
+ ((SparseRowVector)block.get(r)).setSize(input.getNumColumns());
+ }
}
}
- if(DMLScript.STATISTICS)
+ if(DMLScript.STATISTICS) {
+ LOG.debug("Elapsed time for allocation: "+ ((double) System.nanoTime() - t0) / 1000000 + " ms");
Statistics.incTransformOutMatrixPreProcessingTime(System.nanoTime()-t0);
+ }
}
private void outputMatrixPostProcessing(MatrixBlock output){
@@ -803,10 +821,11 @@ public class MultiColumnEncoder implements Encoder {
@Override
public Object call() throws Exception {
int numCols = _input.getNumColumns() + _encoder.getNumExtraCols();
+ boolean hasDC = _encoder.getColumnEncoders(ColumnEncoderDummycode.class).size() > 0;
long estNNz = (long) _input.getNumColumns() * (long) _input.getNumRows();
boolean sparse = MatrixBlock.evalSparseFormatInMemory(_input.getNumRows(), numCols, estNNz);
_output.reset(_input.getNumRows(), numCols, sparse, estNNz);
- outputMatrixPreProcessing(_output, _input);
+ outputMatrixPreProcessing(_output, _input, hasDC);
return null;
}