You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by ar...@apache.org on 2022/02/13 19:30:20 UTC

[systemds] branch main updated: [SYSTEMDS-3293] Find optimum #partitions for transformencode

This is an automated email from the ASF dual-hosted git repository.

arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new cce3cd5  [SYSTEMDS-3293] Find optimum #partitions for transformencode
cce3cd5 is described below

commit cce3cd516ed7182845feac9dd0202086029870f4
Author: arnabp <ar...@tugraz.at>
AuthorDate: Sun Feb 13 20:25:23 2022 +0100

    [SYSTEMDS-3293] Find optimum #partitions for transformencode
    
    This patch introduces logic to automatically find the right
    number of row partitions for build and apply.
    No. of build blocks = (2 * #physical cores)/#build encoders
    No. of apply blocks = (4 * #physical cores)/#apply encoders
---
 .../runtime/transform/encode/ColumnEncoder.java    | 10 +--
 .../transform/encode/ColumnEncoderComposite.java   | 19 +++--
 .../transform/encode/ColumnEncoderDummycode.java   |  2 +-
 .../transform/encode/ColumnEncoderFeatureHash.java |  2 +-
 .../transform/encode/ColumnEncoderPassThrough.java |  2 +-
 .../transform/encode/MultiColumnEncoder.java       | 95 ++++++++++++++++++----
 6 files changed, 103 insertions(+), 27 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java
index a9f0c70..df673e9 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java
@@ -55,7 +55,7 @@ import org.apache.sysds.utils.stats.TransformStatistics;
  */
 public abstract class ColumnEncoder implements Encoder, Comparable<ColumnEncoder> {
 	protected static final Log LOG = LogFactory.getLog(ColumnEncoder.class.getName());
-	protected static final int APPLY_ROW_BLOCKS_PER_COLUMN = 1;
+	public static int APPLY_ROW_BLOCKS_PER_COLUMN = -1;
 	public static int BUILD_ROW_BLOCKS_PER_COLUMN = -1;
 	private static final long serialVersionUID = 2299156350718979064L;
 	protected int _colID;
@@ -290,11 +290,11 @@ public abstract class ColumnEncoder implements Encoder, Comparable<ColumnEncoder
 	 * complete if all previous tasks are done. This is so that we can use the last task as a dependency for the whole
 	 * build, reducing unnecessary dependencies.
 	 */
-	public List<DependencyTask<?>> getBuildTasks(CacheBlock in) {
+	public List<DependencyTask<?>> getBuildTasks(CacheBlock in, int nBuildPartition) {
 		List<Callable<Object>> tasks = new ArrayList<>();
 		List<List<? extends Callable<?>>> dep = null;
 		int nRows = in.getNumRows();
-		int[] blockSizes = getBlockSizes(nRows, getNumBuildRowPartitions());
+		int[] blockSizes = getBlockSizes(nRows, nBuildPartition);
 		if(blockSizes.length == 1) {
 			tasks.add(getBuildTask(in));
 		}
@@ -325,10 +325,10 @@ public abstract class ColumnEncoder implements Encoder, Comparable<ColumnEncoder
 	}
 
 
-	public List<DependencyTask<?>> getApplyTasks(CacheBlock in, MatrixBlock out, int outputCol){
+	public List<DependencyTask<?>> getApplyTasks(CacheBlock in, MatrixBlock out, int nApplyPartitions, int outputCol) {
 		List<Callable<Object>> tasks = new ArrayList<>();
 		List<List<? extends Callable<?>>> dep = null;
-		int[] blockSizes = getBlockSizes(in.getNumRows(), getNumApplyRowPartitions());
+		int[] blockSizes = getBlockSizes(in.getNumRows(), nApplyPartitions);
 		for(int startRow = 0, i = 0; i < blockSizes.length; startRow+=blockSizes[i], i++){
 			if(out.isInSparseFormat())
 				tasks.add(getSparseTask(in, out, outputCol, startRow, blockSizes[i]));
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
index 4eb57ba..ac2156e 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
@@ -106,17 +106,17 @@ public class ColumnEncoderComposite extends ColumnEncoder {
 	}
 
 	@Override
-	public List<DependencyTask<?>> getApplyTasks(CacheBlock in, MatrixBlock out, int outputCol) {
+	public List<DependencyTask<?>> getApplyTasks(CacheBlock in, MatrixBlock out, int nParition, int outputCol) {
 		List<DependencyTask<?>> tasks = new ArrayList<>();
 		List<Integer> sizes = new ArrayList<>();
 		for(int i = 0; i < _columnEncoders.size(); i++) {
 			List<DependencyTask<?>> t;
 			if(i == 0) {
 				// 1. encoder writes data into MatrixBlock Column all others use this column for further encoding
-				t = _columnEncoders.get(i).getApplyTasks(in, out, outputCol);
+				t = _columnEncoders.get(i).getApplyTasks(in, out, nParition, outputCol);
 			}
 			else {
-				t = _columnEncoders.get(i).getApplyTasks(out, out, outputCol);
+				t = _columnEncoders.get(i).getApplyTasks(out, out, nParition, outputCol);
 			}
 			if(t == null)
 				continue;
@@ -143,11 +143,11 @@ public class ColumnEncoderComposite extends ColumnEncoder {
 	}
 
 	@Override
-	public List<DependencyTask<?>> getBuildTasks(CacheBlock in) {
+	public List<DependencyTask<?>> getBuildTasks(CacheBlock in, int nPartition) {
 		List<DependencyTask<?>> tasks = new ArrayList<>();
 		Map<Integer[], Integer[]> depMap = null;
 		for(ColumnEncoder columnEncoder : _columnEncoders) {
-			List<DependencyTask<?>> t = columnEncoder.getBuildTasks(in);
+			List<DependencyTask<?>> t = columnEncoder.getBuildTasks(in, nPartition);
 			if(t == null)
 				continue;
 			// Linear execution between encoders so they can't be built in parallel
@@ -351,6 +351,15 @@ public class ColumnEncoderComposite extends ColumnEncoder {
 		return _columnEncoders.stream().anyMatch(encoder -> encoder.getClass().equals(type));
 	}
 
+	/** Returns true if any wrapped encoder is exactly a recode, dummycode or bin encoder. */
+	public boolean hasBuild() {
+		for (ColumnEncoder e : _columnEncoders)
+			if (e.getClass().equals(ColumnEncoderRecode.class) || e.getClass().equals(ColumnEncoderDummycode.class)
+				|| e.getClass().equals(ColumnEncoderBin.class))
+				return true;
+		return false;
+	}
+
 	@Override
 	public void shiftCol(int columnOffset) {
 		super.shiftCol(columnOffset);
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
index e0efe53..3366329 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
@@ -65,7 +65,7 @@ public class ColumnEncoderDummycode extends ColumnEncoder {
 	}
 
 	@Override
-	public List<DependencyTask<?>> getBuildTasks(CacheBlock in) {
+	public List<DependencyTask<?>> getBuildTasks(CacheBlock in, int nParition) {
 		return null;
 	}
 
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java
index 9445474..aa362fe 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java
@@ -93,7 +93,7 @@ public class ColumnEncoderFeatureHash extends ColumnEncoder {
 	}
 
 	@Override
-	public List<DependencyTask<?>> getBuildTasks(CacheBlock in) {
+	public List<DependencyTask<?>> getBuildTasks(CacheBlock in, int nParition) {
 		return null;
 	}
 
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
index 2f5739f..a134dfd 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
@@ -53,7 +53,7 @@ public class ColumnEncoderPassThrough extends ColumnEncoder {
 	}
 
 	@Override
-	public List<DependencyTask<?>> getBuildTasks(CacheBlock in) {
+	public List<DependencyTask<?>> getBuildTasks(CacheBlock in, int nParition) {
 		return null;
 	}
 
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
index aa7f408..00c7962 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
@@ -74,6 +74,7 @@ public class MultiColumnEncoder implements Encoder {
 	private EncoderOmit _legacyOmit = null;
 	private int _colOffset = 0; // offset for federated Workers who are using subrange encoders
 	private FrameBlock _meta = null;
+	private int[] _nPartitions = null;
 
 	public MultiColumnEncoder(List<ColumnEncoderComposite> columnEncoders) {
 		_columnEncoders = columnEncoders;
@@ -89,6 +90,7 @@ public class MultiColumnEncoder implements Encoder {
 
 	public MatrixBlock encode(CacheBlock in, int k) {
 		MatrixBlock out;
+		_nPartitions = getNumRowPartitions(in, k);
 		try {
 			if(k > 1 && !MULTI_THREADED_STAGES && !hasLegacyEncoder()) {
 				out = new MatrixBlock();
@@ -155,7 +157,7 @@ public class MultiColumnEncoder implements Encoder {
 
 		for(ColumnEncoderComposite e : _columnEncoders) {
 			// Create the build tasks
-			List<DependencyTask<?>> buildTasks = e.getBuildTasks(in);
+			List<DependencyTask<?>> buildTasks = e.getBuildTasks(in, _nPartitions[0]);
 			tasks.addAll(buildTasks);
 			if(buildTasks.size() > 0) {
 				// Check if any Build independent UpdateDC task (Bin+DC, FH+DC)
@@ -197,7 +199,7 @@ public class MultiColumnEncoder implements Encoder {
 			// Apply Task depends on InitOutputMatrixTask (output allocation)
 			depMap.put(new Integer[] {tasks.size(), tasks.size() + 1},         //ApplyTask
 					new Integer[] {0, 1});                                     //Allocation task (1st task)
-			ApplyTasksWrapperTask applyTaskWrapper = new ApplyTasksWrapperTask(e, in, out, pool);
+			ApplyTasksWrapperTask applyTaskWrapper = new ApplyTasksWrapperTask(e, in, out, _nPartitions[1], pool);
 
 			if(e.hasEncoder(ColumnEncoderDummycode.class)) {
 				// Allocation depends on build if DC is in the list.
@@ -244,6 +246,8 @@ public class MultiColumnEncoder implements Encoder {
 	public void build(CacheBlock in, int k) {
 		if(hasLegacyEncoder() && !(in instanceof FrameBlock))
 			throw new DMLRuntimeException("LegacyEncoders do not support non FrameBlock Inputs");
+		if(_nPartitions == null) //happens if this method is directly called from the tests
+			_nPartitions = getNumRowPartitions(in, k);
 		if(k > 1) {
 			buildMT(in, k);
 		}
@@ -260,7 +264,7 @@ public class MultiColumnEncoder implements Encoder {
 	private List<DependencyTask<?>> getBuildTasks(CacheBlock in) {
 		List<DependencyTask<?>> tasks = new ArrayList<>();
 		for(ColumnEncoderComposite columnEncoder : _columnEncoders) {
-			tasks.addAll(columnEncoder.getBuildTasks(in));
+			tasks.addAll(columnEncoder.getBuildTasks(in, _nPartitions[0]));
 		}
 		return tasks;
 	}
@@ -337,11 +341,11 @@ public class MultiColumnEncoder implements Encoder {
 		return out;
 	}
 
-	private List<DependencyTask<?>> getApplyTasks(CacheBlock in, MatrixBlock out, int outputCol) {
+	private List<DependencyTask<?>> getApplyTasks(CacheBlock in, MatrixBlock out, int nPartition, int outputCol) {
 		List<DependencyTask<?>> tasks = new ArrayList<>();
 		int offset = outputCol;
 		for(ColumnEncoderComposite e : _columnEncoders) {
-			tasks.addAll(e.getApplyTasks(in, out, e._colID - 1 + offset));
+			tasks.addAll(e.getApplyTasks(in, out, nPartition, e._colID - 1 + offset));
 			if(e.hasEncoder(ColumnEncoderDummycode.class))
 				offset += e.getEncoder(ColumnEncoderDummycode.class)._domainSize - 1;
 		}
@@ -354,12 +358,12 @@ public class MultiColumnEncoder implements Encoder {
 			if(APPLY_ENCODER_SEPARATE_STAGES){
 				int offset = outputCol;
 				for (ColumnEncoderComposite e : _columnEncoders) {
-					pool.submitAllAndWait(e.getApplyTasks(in, out, e._colID - 1 + offset));
+					pool.submitAllAndWait(e.getApplyTasks(in, out, _nPartitions[1], e._colID - 1 + offset));
 					if (e.hasEncoder(ColumnEncoderDummycode.class))
 						offset += e.getEncoder(ColumnEncoderDummycode.class)._domainSize - 1;
 				}
 			}else{
-				pool.submitAllAndWait(getApplyTasks(in, out, outputCol));
+				pool.submitAllAndWait(getApplyTasks(in, out, _nPartitions[1], outputCol));
 			}
 		}
 		catch(ExecutionException | InterruptedException e) {
@@ -369,6 +373,57 @@ public class MultiColumnEncoder implements Encoder {
 		pool.shutdown();
 	}
 
+	/**
+	 * Determines the number of row partitions for build (index 0) and apply (index 1).
+	 * Precedence: static ColumnEncoder flags, configuration values, then a heuristic on the
+	 * thread count and #encoders; counts are then lowered while a partition has < 16000 rows.
+	 */
+	private int[] getNumRowPartitions(CacheBlock in, int k) {
+		if (k == 1) //single-threaded
+			return new int[] {1, 1};
+		int[] numBlocks = new int[2];
+		// Read from global flags. These are set by the unit tests
+		if (ColumnEncoder.BUILD_ROW_BLOCKS_PER_COLUMN > 0)
+			numBlocks[0] = ColumnEncoder.BUILD_ROW_BLOCKS_PER_COLUMN;
+		if (ColumnEncoder.APPLY_ROW_BLOCKS_PER_COLUMN > 0)
+			numBlocks[1] = ColumnEncoder.APPLY_ROW_BLOCKS_PER_COLUMN;
+
+		// Read from the config file if set. These take precedence over the derived values.
+		if (numBlocks[0] == 0 && ConfigurationManager.getParallelBuildBlocks() > 0)
+			numBlocks[0] = ConfigurationManager.getParallelBuildBlocks();
+		if (numBlocks[1] == 0 && ConfigurationManager.getParallelApplyBlocks() > 0)
+			numBlocks[1] = ConfigurationManager.getParallelApplyBlocks();
+
+		// Else, derive the optimum number of partitions
+		int nRow = in.getNumRows();
+		int nThread = OptimizerUtils.getTransformNumThreads(); //VCores
+		int minNumRows = 16000; //min rows per partition
+		// Count #Builds and #Applies (= #Col)
+		int nBuild = 0;
+		for (ColumnEncoderComposite e : _columnEncoders)
+			if (e.hasBuild())
+				nBuild++;
+		int nApply = in.getNumColumns();
+		// #BuildBlocks = (2 * #PhysicalCores)/#build; skipped if nothing to build (avoids division by zero)
+		if (numBlocks[0] == 0 && nBuild > 0 && nBuild < nThread)
+			numBlocks[0] = Math.round(((float)nThread)/nBuild);
+		// #ApplyBlocks = (4 * #PhysicalCores)/#apply
+		if (numBlocks[1] == 0 && nApply > 0 && nApply < nThread*2)
+			numBlocks[1] = Math.round(((float)nThread*2)/nApply);
+
+		// Reduce #blocks if #rows per partition is too small
+		while (numBlocks[0] > 1 && nRow/numBlocks[0] < minNumRows)
+			numBlocks[0]--;
+		while (numBlocks[1] > 1 && nRow/numBlocks[1] < minNumRows)
+			numBlocks[1]--;
+
+		// Default to a single partition if not set by the above logic
+		numBlocks[0] = Math.max(numBlocks[0], 1);
+		numBlocks[1] = Math.max(numBlocks[1], 1);
+		return numBlocks;
+	}
+
+
 	private static void outputMatrixPreProcessing(MatrixBlock output, CacheBlock input, boolean hasDC) {
 		long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
 		if(output.isInSparseFormat()) {
@@ -426,7 +481,6 @@ public class MultiColumnEncoder implements Encoder {
 	private void outputMatrixPostProcessing(MatrixBlock output){
 		long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
 		int k = OptimizerUtils.getTransformNumThreads();
-		ForkJoinPool myPool = new ForkJoinPool(k);
 		if (k == 1) {
 			Set<Integer> indexSet = _columnEncoders.stream()
 					.map(ColumnEncoderComposite::getSparseRowsWZeros).flatMap(l -> {
@@ -441,14 +495,25 @@ public class MultiColumnEncoder implements Encoder {
 			}
 		}
 		else {
+			ExecutorService myPool = CommonThreadPool.get(k);
 			try {
-				Set<Integer> indexSet = _columnEncoders.stream().parallel()
+				// Collect the row indices that need compaction
+				Set<Integer> indexSet = myPool.submit(() ->
+					_columnEncoders.stream().parallel()
 					.map(ColumnEncoderComposite::getSparseRowsWZeros).flatMap(l -> {
 						if(l == null)
 							return null;
 						return l.stream();
-					}).collect(Collectors.toSet());
-				if(!indexSet.stream().parallel().allMatch(Objects::isNull)) {
+					}).collect(Collectors.toSet())
+				).get();
+
+				// Check if the set is empty (no row indices to compact)
+				boolean emptySet = myPool.submit(() ->
+					indexSet.stream().parallel().allMatch(Objects::isNull)
+				).get();
+
+				// Concurrently compact the rows, only if there are rows to compact
+				if (!emptySet) {
 					myPool.submit(() -> {
 						indexSet.stream().parallel().forEach(row -> {
 							output.getSparseBlock().get(row).compact();
@@ -459,8 +524,8 @@ public class MultiColumnEncoder implements Encoder {
 			catch(Exception ex) {
 				throw new DMLRuntimeException(ex);
 			}
+			myPool.shutdown();
 		}
-		myPool.shutdown();
 		output.recomputeNonZeros();
 		if(DMLScript.STATISTICS)
 			TransformStatistics.incOutMatrixPostProcessingTime(System.nanoTime()-t0);
@@ -929,20 +994,22 @@ public class MultiColumnEncoder implements Encoder {
 		private final ColumnEncoder _encoder;
 		private final MatrixBlock _out;
 		private final CacheBlock _in;
+		private final int _nApplyPartition;
 		private int _offset = -1; // offset dude to dummycoding in
 									// previous columns needs to be updated by external task!
 
 		private ApplyTasksWrapperTask(ColumnEncoder encoder, CacheBlock in, 
-				MatrixBlock out, DependencyThreadPool pool) {
+				MatrixBlock out, int nPart, DependencyThreadPool pool) {
 			super(pool);
 			_encoder = encoder;
 			_out = out;
 			_in = in;
+			_nApplyPartition = nPart;
 		}
 
 		@Override
 		public List<DependencyTask<?>> getWrappedTasks() {
-			return _encoder.getApplyTasks(_in, _out, _encoder._colID - 1 + _offset);
+			return _encoder.getApplyTasks(_in, _out, _nApplyPartition, _encoder._colID - 1 + _offset);
 		}
 
 		@Override