You are viewing a plain-text version of this content. The canonical link to it (shown as "here" in the original) was not preserved in this plain-text rendering.
Posted to commits@systemds.apache.org by ar...@apache.org on 2022/04/29 08:01:07 UTC

[systemds] branch main updated: [SYSTEMDS-3293] Materialize partition counts in the encoder objects

This is an automated email from the ASF dual-hosted git repository.

arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 12f283fda7 [SYSTEMDS-3293] Materialize partition counts in the encoder objects
12f283fda7 is described below

commit 12f283fda7e53f961322c87ea7e5f0ab279a7b13
Author: arnabp <ar...@tugraz.at>
AuthorDate: Fri Apr 29 10:00:31 2022 +0200

    [SYSTEMDS-3293] Materialize partition counts in the encoder objects
    
    This patch refactors the current code to derive the optimal number of
    build and apply blocks and push them into the encoder objects.
    This change allows the partition counts to vary per column.
---
 src/main/java/org/apache/sysds/conf/DMLConfig.java |  4 +-
 .../runtime/transform/encode/ColumnEncoder.java    | 23 ++++++-----
 .../transform/encode/ColumnEncoderComposite.java   | 17 ++++++---
 .../transform/encode/ColumnEncoderDummycode.java   |  2 +-
 .../transform/encode/ColumnEncoderFeatureHash.java |  2 +-
 .../transform/encode/ColumnEncoderPassThrough.java |  2 +-
 .../transform/encode/MultiColumnEncoder.java       | 44 +++++++++++-----------
 .../TransformFrameBuildMultithreadedTest.java      |  2 +-
 8 files changed, 52 insertions(+), 44 deletions(-)

diff --git a/src/main/java/org/apache/sysds/conf/DMLConfig.java b/src/main/java/org/apache/sysds/conf/DMLConfig.java
index 8863faf9bd..1b730f6a3c 100644
--- a/src/main/java/org/apache/sysds/conf/DMLConfig.java
+++ b/src/main/java/org/apache/sysds/conf/DMLConfig.java
@@ -143,8 +143,8 @@ public class DMLConfig
 		_defaultVals.put(CP_PARALLEL_IO,         "true" );
 		_defaultVals.put(PARALLEL_ENCODE,        "false" );
 		_defaultVals.put(PARALLEL_ENCODE_STAGED, "false" );
-		_defaultVals.put(PARALLEL_ENCODE_APPLY_BLOCKS, "1");
-		_defaultVals.put(PARALLEL_ENCODE_BUILD_BLOCKS, "1");
+		_defaultVals.put(PARALLEL_ENCODE_APPLY_BLOCKS, "-1");
+		_defaultVals.put(PARALLEL_ENCODE_BUILD_BLOCKS, "-1");
 		_defaultVals.put(PARALLEL_ENCODE_NUM_THREADS, "-1");
 		_defaultVals.put(COMPRESSED_LINALG,      Compression.CompressConfig.FALSE.name() );
 		_defaultVals.put(COMPRESSED_LOSSY,       "false" );
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java
index 9c7832c2bf..4e969896c3 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java
@@ -61,6 +61,8 @@ public abstract class ColumnEncoder implements Encoder, Comparable<ColumnEncoder
 	protected int _colID;
 	protected ArrayList<Integer> _sparseRowsWZeros = null;
 	protected long _estMetaSize = 0;
+	protected int _nBuildPartitions = 0;
+	protected int _nApplyPartitions = 0;
 
 	protected enum TransformType{
 		BIN, RECODE, DUMMYCODE, FEATURE_HASH, PASS_THROUGH, N_A
@@ -300,11 +302,11 @@ public abstract class ColumnEncoder implements Encoder, Comparable<ColumnEncoder
 	 * complete if all previous tasks are done. This is so that we can use the last task as a dependency for the whole
 	 * build, reducing unnecessary dependencies.
 	 */
-	public List<DependencyTask<?>> getBuildTasks(CacheBlock in, int nBuildPartition) {
+	public List<DependencyTask<?>> getBuildTasks(CacheBlock in) {
 		List<Callable<Object>> tasks = new ArrayList<>();
 		List<List<? extends Callable<?>>> dep = null;
 		int nRows = in.getNumRows();
-		int[] blockSizes = getBlockSizes(nRows, nBuildPartition);
+		int[] blockSizes = getBlockSizes(nRows, _nBuildPartitions);
 		if(blockSizes.length == 1) {
 			tasks.add(getBuildTask(in));
 		}
@@ -335,17 +337,17 @@ public abstract class ColumnEncoder implements Encoder, Comparable<ColumnEncoder
 	}
 
 
-	public List<DependencyTask<?>> getApplyTasks(CacheBlock in, MatrixBlock out, int nApplyPartitions, int outputCol) {
+	public List<DependencyTask<?>> getApplyTasks(CacheBlock in, MatrixBlock out, int outputCol) {
 		List<Callable<Object>> tasks = new ArrayList<>();
 		List<List<? extends Callable<?>>> dep = null;
-		int[] blockSizes = getBlockSizes(in.getNumRows(), nApplyPartitions);
+		int[] blockSizes = getBlockSizes(in.getNumRows(), _nApplyPartitions);
 		for(int startRow = 0, i = 0; i < blockSizes.length; startRow+=blockSizes[i], i++){
 			if(out.isInSparseFormat())
 				tasks.add(getSparseTask(in, out, outputCol, startRow, blockSizes[i]));
 			else
 				tasks.add(getDenseTask(in, out, outputCol, startRow, blockSizes[i]));
 		}
-		if(tasks.size() > 1){
+		if(tasks.size() > 1) {
 			dep = new ArrayList<>(Collections.nCopies(tasks.size(), null));
 			tasks.add(() -> null);  // Empty task as barrier
 			dep.add(tasks.subList(0, tasks.size()-1));
@@ -380,15 +382,12 @@ public abstract class ColumnEncoder implements Encoder, Comparable<ColumnEncoder
 		}
 	}
 
-	protected int getNumApplyRowPartitions(){
-		return ConfigurationManager.getParallelApplyBlocks();
+	protected void setBuildRowBlocksPerColumn(int nPart) {
+		_nBuildPartitions = nPart;
 	}
 
-	protected int getNumBuildRowPartitions(){
-		if (BUILD_ROW_BLOCKS_PER_COLUMN == -1)
-			return ConfigurationManager.getParallelBuildBlocks();
-		else
-			return BUILD_ROW_BLOCKS_PER_COLUMN;
+	protected void setApplyRowBlocksPerColumn(int nPart) {
+		_nApplyPartitions = nPart;
 	}
 
 	public enum EncoderType {
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
index 59dd3157c0..a22cab19ab 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
@@ -106,17 +106,17 @@ public class ColumnEncoderComposite extends ColumnEncoder {
 	}
 
 	@Override
-	public List<DependencyTask<?>> getApplyTasks(CacheBlock in, MatrixBlock out, int nParition, int outputCol) {
+	public List<DependencyTask<?>> getApplyTasks(CacheBlock in, MatrixBlock out, int outputCol) {
 		List<DependencyTask<?>> tasks = new ArrayList<>();
 		List<Integer> sizes = new ArrayList<>();
 		for(int i = 0; i < _columnEncoders.size(); i++) {
 			List<DependencyTask<?>> t;
 			if(i == 0) {
 				// 1. encoder writes data into MatrixBlock Column all others use this column for further encoding
-				t = _columnEncoders.get(i).getApplyTasks(in, out, nParition, outputCol);
+				t = _columnEncoders.get(i).getApplyTasks(in, out, outputCol);
 			}
 			else {
-				t = _columnEncoders.get(i).getApplyTasks(out, out, nParition, outputCol);
+				t = _columnEncoders.get(i).getApplyTasks(out, out, outputCol);
 			}
 			if(t == null)
 				continue;
@@ -143,11 +143,11 @@ public class ColumnEncoderComposite extends ColumnEncoder {
 	}
 
 	@Override
-	public List<DependencyTask<?>> getBuildTasks(CacheBlock in, int nPartition) {
+	public List<DependencyTask<?>> getBuildTasks(CacheBlock in) {
 		List<DependencyTask<?>> tasks = new ArrayList<>();
 		Map<Integer[], Integer[]> depMap = null;
 		for(ColumnEncoder columnEncoder : _columnEncoders) {
-			List<DependencyTask<?>> t = columnEncoder.getBuildTasks(in, nPartition);
+			List<DependencyTask<?>> t = columnEncoder.getBuildTasks(in);
 			if(t == null)
 				continue;
 			// Linear execution between encoders so they can't be built in parallel
@@ -368,6 +368,13 @@ public class ColumnEncoderComposite extends ColumnEncoder {
 		setEstMetaSize(totEstSize);
 	}
 
+	public void setNumPartitions(int nBuild, int nApply) {
+			_columnEncoders.forEach(e -> {
+				e.setBuildRowBlocksPerColumn(nBuild);
+				e.setApplyRowBlocksPerColumn(nApply);
+			});
+	}
+
 	@Override
 	public void shiftCol(int columnOffset) {
 		super.shiftCol(columnOffset);
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
index 3366329a40..e0efe53e92 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
@@ -65,7 +65,7 @@ public class ColumnEncoderDummycode extends ColumnEncoder {
 	}
 
 	@Override
-	public List<DependencyTask<?>> getBuildTasks(CacheBlock in, int nParition) {
+	public List<DependencyTask<?>> getBuildTasks(CacheBlock in) {
 		return null;
 	}
 
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java
index aa362fef74..9445474446 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java
@@ -93,7 +93,7 @@ public class ColumnEncoderFeatureHash extends ColumnEncoder {
 	}
 
 	@Override
-	public List<DependencyTask<?>> getBuildTasks(CacheBlock in, int nParition) {
+	public List<DependencyTask<?>> getBuildTasks(CacheBlock in) {
 		return null;
 	}
 
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
index 04df3d5b9f..0b95734e2f 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
@@ -51,7 +51,7 @@ public class ColumnEncoderPassThrough extends ColumnEncoder {
 	}
 
 	@Override
-	public List<DependencyTask<?>> getBuildTasks(CacheBlock in, int nParition) {
+	public List<DependencyTask<?>> getBuildTasks(CacheBlock in) {
 		return null;
 	}
 
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
index fe49097296..b34a152fc7 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
@@ -76,7 +76,7 @@ public class MultiColumnEncoder implements Encoder {
 	private EncoderOmit _legacyOmit = null;
 	private int _colOffset = 0; // offset for federated Workers who are using subrange encoders
 	private FrameBlock _meta = null;
-	private int[] _nPartitions = null;
+	private boolean _partitionDone = false;
 
 	public MultiColumnEncoder(List<ColumnEncoderComposite> columnEncoders) {
 		_columnEncoders = columnEncoders;
@@ -92,7 +92,7 @@ public class MultiColumnEncoder implements Encoder {
 
 	public MatrixBlock encode(CacheBlock in, int k) {
 		MatrixBlock out;
-		_nPartitions = getNumRowPartitions(in, k);
+		deriveNumRowPartitions(in, k);
 		try {
 			if(k > 1 && !MULTI_THREADED_STAGES && !hasLegacyEncoder()) {
 				out = new MatrixBlock();
@@ -159,7 +159,7 @@ public class MultiColumnEncoder implements Encoder {
 
 		for(ColumnEncoderComposite e : _columnEncoders) {
 			// Create the build tasks
-			List<DependencyTask<?>> buildTasks = e.getBuildTasks(in, _nPartitions[0]);
+			List<DependencyTask<?>> buildTasks = e.getBuildTasks(in);
 			tasks.addAll(buildTasks);
 			if(buildTasks.size() > 0) {
 				// Check if any Build independent UpdateDC task (Bin+DC, FH+DC)
@@ -201,7 +201,7 @@ public class MultiColumnEncoder implements Encoder {
 			// Apply Task depends on InitOutputMatrixTask (output allocation)
 			depMap.put(new Integer[] {tasks.size(), tasks.size() + 1},         //ApplyTask
 					new Integer[] {0, 1});                                     //Allocation task (1st task)
-			ApplyTasksWrapperTask applyTaskWrapper = new ApplyTasksWrapperTask(e, in, out, _nPartitions[1], pool);
+			ApplyTasksWrapperTask applyTaskWrapper = new ApplyTasksWrapperTask(e, in, out, pool);
 
 			if(e.hasEncoder(ColumnEncoderDummycode.class)) {
 				// Allocation depends on build if DC is in the list.
@@ -248,8 +248,8 @@ public class MultiColumnEncoder implements Encoder {
 	public void build(CacheBlock in, int k) {
 		if(hasLegacyEncoder() && !(in instanceof FrameBlock))
 			throw new DMLRuntimeException("LegacyEncoders do not support non FrameBlock Inputs");
-		if(_nPartitions == null) //happens if this method is directly called
-			_nPartitions = getNumRowPartitions(in, k);
+		if(!_partitionDone) //happens if this method is directly called
+			deriveNumRowPartitions(in, k);
 		if(k > 1) {
 			buildMT(in, k);
 		}
@@ -266,7 +266,7 @@ public class MultiColumnEncoder implements Encoder {
 	private List<DependencyTask<?>> getBuildTasks(CacheBlock in) {
 		List<DependencyTask<?>> tasks = new ArrayList<>();
 		for(ColumnEncoderComposite columnEncoder : _columnEncoders) {
-			tasks.addAll(columnEncoder.getBuildTasks(in, _nPartitions[0]));
+			tasks.addAll(columnEncoder.getBuildTasks(in));
 		}
 		return tasks;
 	}
@@ -325,8 +325,8 @@ public class MultiColumnEncoder implements Encoder {
 			hasDC = columnEncoder.hasEncoder(ColumnEncoderDummycode.class);
 		outputMatrixPreProcessing(out, in, hasDC);
 		if(k > 1) {
-			if(_nPartitions == null) //happens if this method is directly called
-				_nPartitions = getNumRowPartitions(in, k);
+			if(!_partitionDone) //happens if this method is directly called
+				deriveNumRowPartitions(in, k);
 			applyMT(in, out, outputCol, k);
 		}
 		else {
@@ -348,11 +348,11 @@ public class MultiColumnEncoder implements Encoder {
 		return out;
 	}
 
-	private List<DependencyTask<?>> getApplyTasks(CacheBlock in, MatrixBlock out, int nPartition, int outputCol) {
+	private List<DependencyTask<?>> getApplyTasks(CacheBlock in, MatrixBlock out, int outputCol) {
 		List<DependencyTask<?>> tasks = new ArrayList<>();
 		int offset = outputCol;
 		for(ColumnEncoderComposite e : _columnEncoders) {
-			tasks.addAll(e.getApplyTasks(in, out, nPartition, e._colID - 1 + offset));
+			tasks.addAll(e.getApplyTasks(in, out, e._colID - 1 + offset));
 			if(e.hasEncoder(ColumnEncoderDummycode.class))
 				offset += e.getEncoder(ColumnEncoderDummycode.class)._domainSize - 1;
 		}
@@ -365,12 +365,12 @@ public class MultiColumnEncoder implements Encoder {
 			if(APPLY_ENCODER_SEPARATE_STAGES){
 				int offset = outputCol;
 				for (ColumnEncoderComposite e : _columnEncoders) {
-					pool.submitAllAndWait(e.getApplyTasks(in, out, _nPartitions[1], e._colID - 1 + offset));
+					pool.submitAllAndWait(e.getApplyTasks(in, out, e._colID - 1 + offset));
 					if (e.hasEncoder(ColumnEncoderDummycode.class))
 						offset += e.getEncoder(ColumnEncoderDummycode.class)._domainSize - 1;
 				}
 			}else{
-				pool.submitAllAndWait(getApplyTasks(in, out, _nPartitions[1], outputCol));
+				pool.submitAllAndWait(getApplyTasks(in, out, outputCol));
 			}
 		}
 		catch(ExecutionException | InterruptedException e) {
@@ -380,12 +380,14 @@ public class MultiColumnEncoder implements Encoder {
 		pool.shutdown();
 	}
 
-	private int[] getNumRowPartitions(CacheBlock in, int k) {
+	private void deriveNumRowPartitions(CacheBlock in, int k) {
 		int[] numBlocks = new int[2];
 		if (k == 1) { //single-threaded
 			numBlocks[0] = 1;
 			numBlocks[1] = 1;
-			return numBlocks;
+			_columnEncoders.forEach(e -> e.setNumPartitions(1, 1));
+			_partitionDone = true;
+			return;
 		}
 		// Read from global flags. These are set by the unit tests
 		if (ColumnEncoder.BUILD_ROW_BLOCKS_PER_COLUMN > 0)
@@ -443,7 +445,9 @@ public class MultiColumnEncoder implements Encoder {
 			if (numBlocks[i] == 0)
 				numBlocks[i] = 1; //default 1
 
-		return numBlocks;
+		_partitionDone = true;
+		// Materialize the partition counts in the encoders
+		_columnEncoders.forEach(e -> e.setNumPartitions(numBlocks[0], numBlocks[1]));
 	}
 
 	private void estimateRCMapSize(CacheBlock in, List<ColumnEncoderComposite> rcList) {
@@ -454,7 +458,7 @@ public class MultiColumnEncoder implements Encoder {
 		int seed = (int) System.nanoTime();
 		int[] sampleInds = CompressedSizeEstimatorSample.getSortedSample(in.getNumRows(), sampleSize, seed, 1);
 
-		// Concurrent (col-wise) recode map size estimation
+		// Concurrent (column-wise) recode map size estimation
 		ExecutorService myPool = CommonThreadPool.get(k);
 		try {
 			myPool.submit(() -> {
@@ -1046,22 +1050,20 @@ public class MultiColumnEncoder implements Encoder {
 		private final ColumnEncoder _encoder;
 		private final MatrixBlock _out;
 		private final CacheBlock _in;
-		private final int _nApplyPartition;
 		private int _offset = -1; // offset dude to dummycoding in
 									// previous columns needs to be updated by external task!
 
 		private ApplyTasksWrapperTask(ColumnEncoder encoder, CacheBlock in, 
-				MatrixBlock out, int nPart, DependencyThreadPool pool) {
+				MatrixBlock out, DependencyThreadPool pool) {
 			super(pool);
 			_encoder = encoder;
 			_out = out;
 			_in = in;
-			_nApplyPartition = nPart;
 		}
 
 		@Override
 		public List<DependencyTask<?>> getWrappedTasks() {
-			return _encoder.getApplyTasks(_in, _out, _nApplyPartition, _encoder._colID - 1 + _offset);
+			return _encoder.getApplyTasks(_in, _out, _encoder._colID - 1 + _offset);
 		}
 
 		@Override
diff --git a/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameBuildMultithreadedTest.java b/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameBuildMultithreadedTest.java
index 27d7c785df..c18f184f2b 100644
--- a/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameBuildMultithreadedTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/transform/TransformFrameBuildMultithreadedTest.java
@@ -188,7 +188,7 @@ public class TransformFrameBuildMultithreadedTest extends AutomatedTestBase {
 				.readFrameFromHDFS(DATASET, -1L, -1L);
 			StringBuilder specSb = new StringBuilder();
 			Files.readAllLines(Paths.get(SPEC)).forEach(s -> specSb.append(s).append("\n"));
-			ColumnEncoder.BUILD_ROW_BLOCKS_PER_COLUMN = Math.max(blockSize, 1);
+			ColumnEncoder.BUILD_ROW_BLOCKS_PER_COLUMN = Math.max(blockSize, -1);
 			MultiColumnEncoder encoderS = EncoderFactory.createEncoder(specSb.toString(), input.getColumnNames(),
 				input.getNumColumns(), null);
 			MultiColumnEncoder encoderM = EncoderFactory.createEncoder(specSb.toString(), input.getColumnNames(),