You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by ar...@apache.org on 2021/09/08 17:59:15 UTC

[systemds] branch master updated: [SYSTEMDS-2972] Transformencode sparse improvements

This is an automated email from the ASF dual-hosted git repository.

arnabp20 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new cdff113  [SYSTEMDS-2972] Transformencode sparse improvements
cdff113 is described below

commit cdff113648371b2dace5905dc0eb81ebf486f094
Author: Lukas Erlbacher <lu...@gmail.com>
AuthorDate: Wed Sep 8 19:58:50 2021 +0200

    [SYSTEMDS-2972] Transformencode sparse improvements
    
    This PR introduces the sparse implementations for the transform encoders.
    Furthermore, this adds support for row partitioning, statistics, and
    debug logging. Now we can enable multithreaded transform via a flag
    in the config file, "parallel.encode".
    
    Closes #1388, closes #1383
---
 .../apache/sysds/conf/ConfigurationManager.java    |   4 +
 src/main/java/org/apache/sysds/conf/DMLConfig.java |   4 +-
 .../java/org/apache/sysds/hops/OptimizerUtils.java |  18 +++
 .../runtime/compress/CompressedMatrixBlock.java    |   5 -
 ...ltiReturnParameterizedBuiltinCPInstruction.java |   4 +-
 .../sysds/runtime/matrix/data/MatrixBlock.java     |  35 +----
 .../runtime/transform/encode/ColumnEncoder.java    | 141 +++++++++++++-----
 .../runtime/transform/encode/ColumnEncoderBin.java |  90 ++++++++++--
 .../transform/encode/ColumnEncoderComposite.java   |  26 ++--
 .../transform/encode/ColumnEncoderDummycode.java   | 115 +++++++--------
 .../transform/encode/ColumnEncoderFeatureHash.java |  64 +++++++--
 .../transform/encode/ColumnEncoderPassThrough.java |  90 ++++++++++--
 .../transform/encode/ColumnEncoderRecode.java      |  81 +++++++++--
 .../runtime/transform/encode/EncoderFactory.java   |  26 ++--
 .../runtime/transform/encode/EncoderMVImpute.java  |  36 +++--
 .../runtime/transform/encode/EncoderOmit.java      |  22 +--
 .../transform/encode/MultiColumnEncoder.java       |  91 ++++++++----
 .../apache/sysds/runtime/util/DependencyTask.java  |  24 +++-
 .../sysds/runtime/util/DependencyThreadPool.java   |  12 +-
 .../apache/sysds/runtime/util/UtilFunctions.java   |   9 ++
 .../java/org/apache/sysds/utils/Statistics.java    | 159 +++++++++++++++++++--
 .../mt/TransformFrameBuildMultithreadedTest.java   |   2 +
 .../mt/TransformFrameEncodeMultithreadedTest.java  |   4 +-
 .../datasets/homes3/homes.tfspec_dummy_all.json    |   1 -
 .../datasets/homes3/homes.tfspec_dummy_sparse.json |   1 +
 25 files changed, 805 insertions(+), 259 deletions(-)

diff --git a/src/main/java/org/apache/sysds/conf/ConfigurationManager.java b/src/main/java/org/apache/sysds/conf/ConfigurationManager.java
index 5a3667c..93654b7 100644
--- a/src/main/java/org/apache/sysds/conf/ConfigurationManager.java
+++ b/src/main/java/org/apache/sysds/conf/ConfigurationManager.java
@@ -171,6 +171,10 @@ public class ConfigurationManager
 		return getCompilerConfigFlag(ConfigType.PARALLEL_CP_MATRIX_OPERATIONS);
 	}
 	
+	public static boolean isParallelTransform() {
+		return getDMLConfig().getBooleanValue(DMLConfig.PARALLEL_ENCODE);
+	}
+	
 	public static boolean isParallelParFor() {
 		return getCompilerConfigFlag(ConfigType.PARALLEL_LOCAL_OR_REMOTE_PARFOR);
 	}
diff --git a/src/main/java/org/apache/sysds/conf/DMLConfig.java b/src/main/java/org/apache/sysds/conf/DMLConfig.java
index 0b7692b..db59505 100644
--- a/src/main/java/org/apache/sysds/conf/DMLConfig.java
+++ b/src/main/java/org/apache/sysds/conf/DMLConfig.java
@@ -67,6 +67,7 @@ public class DMLConfig
 	public static final String DEFAULT_BLOCK_SIZE   = "sysds.defaultblocksize";
 	public static final String CP_PARALLEL_OPS      = "sysds.cp.parallel.ops";
 	public static final String CP_PARALLEL_IO       = "sysds.cp.parallel.io";
+	public static final String PARALLEL_ENCODE      = "sysds.parallel.encode";  // boolean: enable multi-threaded transformencode and apply
 	public static final String COMPRESSED_LINALG    = "sysds.compressed.linalg";
 	public static final String COMPRESSED_LOSSY     = "sysds.compressed.lossy";
 	public static final String COMPRESSED_VALID_COMPRESSIONS = "sysds.compressed.valid.compressions";
@@ -125,6 +126,7 @@ public class DMLConfig
 		_defaultVals.put(DEFAULT_BLOCK_SIZE,     String.valueOf(OptimizerUtils.DEFAULT_BLOCKSIZE) );
 		_defaultVals.put(CP_PARALLEL_OPS,        "true" );
 		_defaultVals.put(CP_PARALLEL_IO,         "true" );
+		_defaultVals.put(PARALLEL_ENCODE,        "false" );
 		_defaultVals.put(COMPRESSED_LINALG,      Compression.CompressConfig.FALSE.name() );
 		_defaultVals.put(COMPRESSED_LOSSY,       "false" );
 		_defaultVals.put(COMPRESSED_VALID_COMPRESSIONS, "SDC,DDC");
@@ -398,7 +400,7 @@ public class DMLConfig
 	public String getConfigInfo()  {
 		String[] tmpConfig = new String[] { 
 			LOCAL_TMP_DIR,SCRATCH_SPACE,OPTIMIZATION_LEVEL, DEFAULT_BLOCK_SIZE,
-			CP_PARALLEL_OPS, CP_PARALLEL_IO, NATIVE_BLAS, NATIVE_BLAS_DIR,
+			CP_PARALLEL_OPS, CP_PARALLEL_IO, PARALLEL_ENCODE, NATIVE_BLAS, NATIVE_BLAS_DIR,
 			COMPRESSED_LINALG, COMPRESSED_LOSSY, COMPRESSED_VALID_COMPRESSIONS, COMPRESSED_OVERLAPPING,
 			COMPRESSED_SAMPLING_RATIO, COMPRESSED_COCODE, COMPRESSED_TRANSPOSE,
 			CODEGEN, CODEGEN_API, CODEGEN_COMPILER, CODEGEN_OPTIMIZER, CODEGEN_PLANCACHE, CODEGEN_LITERALS,
diff --git a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
index 01769c7..d63b2cb 100644
--- a/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
+++ b/src/main/java/org/apache/sysds/hops/OptimizerUtils.java
@@ -1008,6 +1008,24 @@ public class OptimizerUtils
 			
 		return ret;
 	}
+
+	public static int getTransformNumThreads(int maxNumThreads)
+	{
+		//by default max local parallelism (vcores) 
+		int ret = InfrastructureAnalyzer.getLocalParallelism();
+		
+		//apply external max constraint (e.g., set by parfor or other rewrites)
+		if( maxNumThreads > 0 ) {
+			ret = Math.min(ret, maxNumThreads);
+		}
+		
+		//check if enabled in config.xml
+		if( !ConfigurationManager.isParallelTransform() ) {
+			ret = 1;
+		}
+			
+		return ret;
+	}
 	
 	public static Level getDefaultLogLevel() {
 		Level log = Logger.getRootLogger().getLevel();
diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
index 2c205f3..d374d3c 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
@@ -1345,11 +1345,6 @@ public class CompressedMatrixBlock extends MatrixBlock {
 	}
 
 	@Override
-	public void quickSetValueThreadSafe(int r, int c, double v) {
-		throw new DMLCompressionException("Thread safe execution does not work on Compressed Matrix");
-	}
-
-	@Override
 	public double quickGetValueThreadSafe(int r, int c) {
 		throw new DMLCompressionException("Thread safe execution does not work on Compressed Matrix");
 	}
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java
index b6e0d97..800aa51 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.sysds.common.Types.DataType;
 import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.hops.OptimizerUtils;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
 import org.apache.sysds.runtime.instructions.InstructionUtils;
@@ -85,7 +86,8 @@ public class MultiReturnParameterizedBuiltinCPInstruction extends ComputationCPI
 
 		// execute block transform encode
 		MultiColumnEncoder encoder = EncoderFactory.createEncoder(spec, colnames, fin.getNumColumns(), null);
-		MatrixBlock data = encoder.encode(fin); // build and apply
+		// TODO: Assign #threads in compiler and pass via the instruction string
+		MatrixBlock data = encoder.encode(fin, OptimizerUtils.getTransformNumThreads(-1)); // build and apply
 		FrameBlock meta = encoder.getMetaData(new FrameBlock(fin.getNumColumns(), ValueType.STRING));
 		meta.setColumnNames(colnames);
 
diff --git a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
index 160f8e9..a86a878 100644
--- a/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/matrix/data/MatrixBlock.java
@@ -117,7 +117,7 @@ import org.apache.sysds.utils.NativeHelper;
 
 public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizable {
 	// private static final Log LOG = LogFactory.getLog(MatrixBlock.class.getName());
-	
+
 	private static final long serialVersionUID = 7319972089143154056L;
 	
 	//sparsity nnz threshold, based on practical experiments on space consumption and performance
@@ -654,27 +654,6 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 		}
 	}
 
-	/**
-	 * Thread save set.
-	 * Blocks need to be allocated, and in case of MCSR sparse, all rows 
-	 * that are going to be accessed need to be allocated as well.
-	 * 
-	 * @param r row 
-	 * @param c column 
-	 * @param v value
-	 */
-	public void quickSetValueThreadSafe(int r, int c, double v) {
-		if(sparse) {
-			if(!(sparseBlock instanceof SparseBlockMCSR))
-				throw new RuntimeException("Only MCSR Blocks are supported for Multithreaded sparse set.");
-			synchronized (sparseBlock.get(r)) {
-				sparseBlock.set(r,c,v);
-			}
-		}
-		else
-			denseBlock.set(r,c,v);
-	}
-
 	public double quickGetValueThreadSafe(int r, int c) {
 		if(sparse) {
 			if(!(sparseBlock instanceof SparseBlockMCSR))
@@ -976,7 +955,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 
 	/**
 	 * Wrapper method for single threaded reduceall-colSum of a matrix.
-	 * 
+	 *
 	 * @return A new MatrixBlock containing the column sums of this matrix.
 	 */
 	public MatrixBlock colSum() {
@@ -986,7 +965,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 
 	/**
 	 * Wrapper method for single threaded reduceall-rowSum of a matrix.
-	 * 
+	 *
 	 * @return A new MatrixBlock containing the row sums of this matrix.
 	 */
 	public MatrixBlock rowSum(){
@@ -1422,7 +1401,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 			throw new RuntimeException( "Copy must not overwrite itself!" );
 		if(that instanceof CompressedMatrixBlock)
 			that = CompressedMatrixBlock.getUncompressed(that, "Copy not effecient into a MatrixBlock");
-		
+
 		rlen=that.rlen;
 		clen=that.clen;
 		sparse=sp;
@@ -2935,7 +2914,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 		LibMatrixBincell.isValidDimensionsBinary(this, that);
 		if(thatValue instanceof CompressedMatrixBlock)
 			return ((CompressedMatrixBlock) thatValue).binaryOperationsLeft(op, this, result);
-		
+
 		//compute output dimensions
 		boolean outer = (LibMatrixBincell.getBinaryAccessType(this, that)
 				== BinaryAccessType.OUTER_VECTOR_VECTOR); 
@@ -2980,7 +2959,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 			m2 = ((CompressedMatrixBlock) m2).getUncompressed("Ternay Operator arg2 " + op.fn.getClass().getSimpleName());
 		if(m3 instanceof CompressedMatrixBlock)
 			m3 = ((CompressedMatrixBlock) m3).getUncompressed("Ternay Operator arg3 " + op.fn.getClass().getSimpleName());
-		
+
 		//prepare inputs
 		final boolean s1 = (rlen==1 && clen==1);
 		final boolean s2 = (m2.rlen==1 && m2.clen==1);
@@ -3674,7 +3653,7 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 						"Invalid nCol dimension for append rbind: was " + in[i].clen + " should be: " + clen);
 		}
 	}
-	
+
 	public static MatrixBlock naryOperations(Operator op, MatrixBlock[] matrices, ScalarObject[] scalars, MatrixBlock ret) {
 		//note: currently only min max, plus supported and hence specialized implementation
 		//prepare operator
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java
index 33bf452..e04eeff 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java
@@ -20,6 +20,7 @@
 package org.apache.sysds.runtime.transform.encode;
 
 import static org.apache.sysds.runtime.transform.encode.EncoderFactory.getEncoderType;
+import static org.apache.sysds.runtime.util.UtilFunctions.getBlockSizes;
 
 import java.io.Externalizable;
 import java.io.IOException;
@@ -45,6 +46,8 @@ import org.apache.sysds.runtime.util.DependencyThreadPool;
  */
 public abstract class ColumnEncoder implements Externalizable, Encoder, Comparable<ColumnEncoder> {
 	protected static final Log LOG = LogFactory.getLog(ColumnEncoder.class.getName());
+	protected static final int APPLY_ROW_BLOCKS_PER_COLUMN = 1;
+	public static int BUILD_ROW_BLOCKS_PER_COLUMN = 1;
 	private static final long serialVersionUID = 2299156350718979064L;
 	protected int _colID;
 
@@ -52,7 +55,23 @@ public abstract class ColumnEncoder implements Externalizable, Encoder, Comparab
 		_colID = colID;
 	}
 
-	public abstract MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol);
+	/**
+	 * Apply Functions are only used in Single Threaded or Multi-Threaded Dense context.
+	 * That's why there is no regard for MT sparse!
+	 *
+	 * @param in Input Block
+	 * @param out Output Matrix
+	 * @param outputCol The output column for the given column
+	 * @return same as out
+	 *
+	 */
+	public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol){
+		return apply(in, out, outputCol, 0, -1);
+	}
+
+	public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol){
+		return apply(in, out, outputCol, 0, -1);
+	}
 
 	public abstract MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol, int rowStart, int blk);
 
@@ -172,18 +191,18 @@ public abstract class ColumnEncoder implements Externalizable, Encoder, Comparab
 	 * complete if all previous tasks are done. This is so that we can use the last task as a dependency for the whole
 	 * build, reducing unnecessary dependencies.
 	 */
-	public List<DependencyTask<?>> getBuildTasks(FrameBlock in, int blockSize) {
+	public List<DependencyTask<?>> getBuildTasks(FrameBlock in) {
 		List<Callable<Object>> tasks = new ArrayList<>();
 		List<List<? extends Callable<?>>> dep = null;
-		if(blockSize <= 0 || blockSize >= in.getNumRows()) {
+		int nRows = in.getNumRows();
+		int[] blockSizes = getBlockSizes(nRows, getNumBuildRowPartitions());
+		if(blockSizes.length == 1) {
 			tasks.add(getBuildTask(in));
 		}
 		else {
 			HashMap<Integer, Object> ret = new HashMap<>();
-			for(int i = 0; i < in.getNumRows(); i = i + blockSize)
-				tasks.add(getPartialBuildTask(in, i, blockSize, ret));
-			if(in.getNumRows() % blockSize != 0)
-				tasks.add(getPartialBuildTask(in, in.getNumRows() - in.getNumRows() % blockSize, -1, ret));
+			for(int startRow = 0, i = 0; i < blockSizes.length; startRow+=blockSizes[i], i++)
+				tasks.add(getPartialBuildTask(in, startRow, blockSizes[i], ret));
 			tasks.add(getPartialMergeBuildTask(ret));
 			dep = new ArrayList<>(Collections.nCopies(tasks.size() - 1, null));
 			dep.add(tasks.subList(0, tasks.size() - 1));
@@ -198,24 +217,63 @@ public abstract class ColumnEncoder implements Externalizable, Encoder, Comparab
 	public Callable<Object> getPartialBuildTask(FrameBlock in, int startRow, int blockSize,
 		HashMap<Integer, Object> ret) {
 		throw new DMLRuntimeException(
-			"Trying to get the PartialBuild task of an Encoder which does not support  " + "partial building");
+			"Trying to get the PartialBuild task of an Encoder which does not support  partial building");
 	}
 
 	public Callable<Object> getPartialMergeBuildTask(HashMap<Integer, ?> ret) {
 		throw new DMLRuntimeException(
-			"Trying to get the BuildMergeTask task of an Encoder which does not support " + "partial building");
+			"Trying to get the BuildMergeTask task of an Encoder which does not support partial building");
 	}
 
 	public List<DependencyTask<?>> getApplyTasks(FrameBlock in, MatrixBlock out, int outputCol) {
-		List<Callable<Object>> tasks = new ArrayList<>();
-		tasks.add(new ColumnApplyTask(this, in, out, outputCol));
-		return DependencyThreadPool.createDependencyTasks(tasks, null);
+		return getApplyTasks(in, null, out, outputCol);
 	}
 
 	public List<DependencyTask<?>> getApplyTasks(MatrixBlock in, MatrixBlock out, int outputCol) {
+		return getApplyTasks(null, in, out, outputCol);
+	}
+
+	private List<DependencyTask<?>> getApplyTasks(FrameBlock inF, MatrixBlock inM, MatrixBlock out, int outputCol){
 		List<Callable<Object>> tasks = new ArrayList<>();
-		tasks.add(new ColumnApplyTask(this, in, out, outputCol));
-		return DependencyThreadPool.createDependencyTasks(tasks, null);
+		List<List<? extends Callable<?>>> dep = null;
+		if ((inF != null && inM != null) || (inF == null && inM == null))
+			throw new DMLRuntimeException("getApplyTasks needs to be called with either FrameBlock input " +
+					"or MatrixBlock input");
+		int nRows = inF == null ? inM.getNumRows() : inF.getNumRows();
+		int[] blockSizes = getBlockSizes(nRows, getNumApplyRowPartitions());
+		for(int startRow = 0, i = 0; i < blockSizes.length; startRow+=blockSizes[i], i++){
+			if(inF != null)
+				if(out.isInSparseFormat())
+					tasks.add(getSparseTask(inF, out, outputCol, startRow, blockSizes[i]));
+				else
+					tasks.add(new ColumnApplyTask<>(this, inF, out, outputCol, startRow, blockSizes[i]));
+			else
+			if(out.isInSparseFormat())
+				tasks.add(getSparseTask(inM, out, outputCol, startRow, blockSizes[i]));
+			else
+				tasks.add(new ColumnApplyTask<>(this, inM, out, outputCol, startRow, blockSizes[i]));
+		}
+		if(tasks.size() > 1){
+			dep = new ArrayList<>(Collections.nCopies(tasks.size(), null));
+			tasks.add(() -> null);  // Empty task as barrier
+			dep.add(tasks.subList(0, tasks.size()-1));
+		}
+
+		return DependencyThreadPool.createDependencyTasks(tasks, dep);
+	}
+
+	protected abstract ColumnApplyTask<? extends ColumnEncoder> 
+			getSparseTask(FrameBlock in, MatrixBlock out, int outputCol, int startRow, int blk);
+
+	protected abstract ColumnApplyTask<? extends ColumnEncoder> 
+			getSparseTask(MatrixBlock in, MatrixBlock out, int outputCol, int startRow, int blk);
+
+	protected int getNumApplyRowPartitions(){
+		return APPLY_ROW_BLOCKS_PER_COLUMN;
+	}
+
+	protected int getNumBuildRowPartitions(){
+		return BUILD_ROW_BLOCKS_PER_COLUMN;
 	}
 
 	public enum EncoderType {
@@ -226,39 +284,54 @@ public abstract class ColumnEncoder implements Externalizable, Encoder, Comparab
 	 * This is the base Task for each column apply. If no custom "getApplyTasks" is implemented in an Encoder this task
 	 * will be used.
 	 */
-	private static class ColumnApplyTask implements Callable<Object> {
+	protected static class ColumnApplyTask<T extends ColumnEncoder> implements Callable<Object> {
+
+		protected final T _encoder;
+		protected final FrameBlock _inputF;
+		protected final MatrixBlock _inputM;
+		protected final MatrixBlock _out;
+		protected final int _outputCol;
+		protected final int _startRow;
+		protected final int _blk;
+
+		protected ColumnApplyTask(T encoder, FrameBlock input, MatrixBlock out, int outputCol){
+			this(encoder, input, out, outputCol, 0, -1);
+		}
 
-		private final ColumnEncoder _encoder;
-		private final FrameBlock _inputF;
-		private final MatrixBlock _inputM;
-		private final MatrixBlock _out;
-		private final int _outputCol;
+		protected ColumnApplyTask(T encoder, MatrixBlock input, MatrixBlock out, int outputCol){
+			this(encoder, input, out, outputCol, 0, -1);
+		}
 
-		protected ColumnApplyTask(ColumnEncoder encoder, FrameBlock input, MatrixBlock out, int outputCol) {
-			_encoder = encoder;
-			_inputF = input;
-			_inputM = null;
-			_out = out;
-			_outputCol = outputCol;
+		protected ColumnApplyTask(T encoder, FrameBlock input, MatrixBlock out, int outputCol, int startRow, int blk) {
+			this(encoder, input, null, out, outputCol, startRow, blk);
 		}
 
-		protected ColumnApplyTask(ColumnEncoder encoder, MatrixBlock input, MatrixBlock out, int outputCol) {
+		protected ColumnApplyTask(T encoder, MatrixBlock input, MatrixBlock out, int outputCol, int startRow, int blk) {
+			this(encoder, null, input, out, outputCol, startRow, blk);
+		}
+		private  ColumnApplyTask(T encoder, FrameBlock inputF, MatrixBlock inputM, MatrixBlock out, int outputCol,
+								 int startRow, int blk){
 			_encoder = encoder;
-			_inputM = input;
-			_inputF = null;
+			_inputM = inputM;
+			_inputF = inputF;
 			_out = out;
 			_outputCol = outputCol;
+			_startRow = startRow;
+			_blk = blk;
 		}
 
 		@Override
-		public Void call() throws Exception {
+		public Object call() throws Exception {
 			assert _outputCol >= 0;
-			int _rowStart = 0;
-			int _blk = -1;
+			if(_out.isInSparseFormat()){
+				// this is an issue since most sparse Tasks modify the sparse structure so normal get and set calls are
+				// not possible.
+				throw new DMLRuntimeException("ColumnApplyTask called although output is in sparse format.");
+			}
 			if(_inputF == null)
-				_encoder.apply(_inputM, _out, _outputCol, _rowStart, _blk);
+				_encoder.apply(_inputM, _out, _outputCol, _startRow, _blk);
 			else
-				_encoder.apply(_inputF, _out, _outputCol, _rowStart, _blk);
+				_encoder.apply(_inputF, _out, _outputCol, _startRow, _blk);
 			return null;
 		}
 
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java
index b4d4800..5736c1e 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java
@@ -28,11 +28,15 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.concurrent.Callable;
 
+import org.apache.commons.lang3.NotImplementedException;
 import org.apache.commons.lang3.tuple.MutableTriple;
+import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.lops.Lop;
+import org.apache.sysds.runtime.data.SparseRowVector;
 import org.apache.sysds.runtime.matrix.data.FrameBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.util.UtilFunctions;
+import org.apache.sysds.utils.Statistics;
 
 public class ColumnEncoderBin extends ColumnEncoder {
 	public static final String MIN_PREFIX = "min";
@@ -84,10 +88,13 @@ public class ColumnEncoderBin extends ColumnEncoder {
 
 	@Override
 	public void build(FrameBlock in) {
+		long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
 		if(!isApplicable())
 			return;
 		double[] pairMinMax = getMinMaxOfCol(in, _colID, 0, -1);
 		computeBins(pairMinMax[0], pairMinMax[1]);
+		if(DMLScript.STATISTICS)
+			Statistics.incTransformBinningBuildTime(System.nanoTime()-t0);
 	}
 
 	private static double[] getMinMaxOfCol(FrameBlock in, int colID, int startRow, int blockSize) {
@@ -118,6 +125,8 @@ public class ColumnEncoderBin extends ColumnEncoder {
 		return new BinMergePartialBuildTask(this, ret);
 	}
 
+
+
 	public void computeBins(double min, double max) {
 		// ensure allocated internal transformation metadata
 		if(_binMins == null || _binMaxs == null) {
@@ -146,39 +155,47 @@ public class ColumnEncoderBin extends ColumnEncoder {
 	}
 
 	@Override
-	public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol) {
-		return apply(in, out, outputCol, 0, -1);
-	}
-
-	@Override
-	public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol) {
-		return apply(in, out, outputCol, 0, -1);
-	}
-
-	@Override
 	public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol, int rowStart, int blk) {
+		long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
 		for(int i = rowStart; i < getEndIndex(in.getNumRows(), rowStart, blk); i++) {
 			double inVal = UtilFunctions.objectToDouble(in.getSchema()[_colID - 1], in.get(i, _colID - 1));
 			int ix = Arrays.binarySearch(_binMaxs, inVal);
 			int binID = ((ix < 0) ? Math.abs(ix + 1) : ix) + 1;
-			out.quickSetValueThreadSafe(i, outputCol, binID);
+			out.quickSetValue(i, outputCol, binID);
 		}
+		if (DMLScript.STATISTICS)
+			Statistics.incTransformBinningApplyTime(System.nanoTime()-t0);
 		return out;
 	}
 
 	@Override
 	public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol, int rowStart, int blk) {
+		long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
 		int end = getEndIndex(in.getNumRows(), rowStart, blk);
 		for(int i = rowStart; i < end; i++) {
 			double inVal = in.quickGetValueThreadSafe(i, _colID - 1);
 			int ix = Arrays.binarySearch(_binMaxs, inVal);
 			int binID = ((ix < 0) ? Math.abs(ix + 1) : ix) + 1;
-			out.quickSetValueThreadSafe(i, outputCol, binID);
+			out.quickSetValue(i, outputCol, binID);
 		}
+		if (DMLScript.STATISTICS)
+			Statistics.incTransformBinningApplyTime(System.nanoTime()-t0);
 		return out;
 	}
 
 	@Override
+	protected ColumnApplyTask<? extends ColumnEncoder> 
+		getSparseTask(FrameBlock in, MatrixBlock out, int outputCol, int startRow, int blk) {
+		return new BinSparseApplyTask(this, in, out, outputCol);
+	}
+
+	@Override
+	protected ColumnApplyTask<? extends ColumnEncoder> 
+		getSparseTask(MatrixBlock in, MatrixBlock out, int outputCol, int startRow, int blk) {
+		throw new NotImplementedException("Sparse Binning for MatrixBlocks not jet implemented");
+	}
+
+	@Override
 	public void mergeAt(ColumnEncoder other) {
 		if(other instanceof ColumnEncoderBin) {
 			ColumnEncoderBin otherBin = (ColumnEncoderBin) other;
@@ -264,6 +281,43 @@ public class ColumnEncoderBin extends ColumnEncoder {
 		}
 	}
 
+	private static class BinSparseApplyTask extends ColumnApplyTask<ColumnEncoderBin> {
+
+		public BinSparseApplyTask(ColumnEncoderBin encoder, FrameBlock input, 
+				MatrixBlock out, int outputCol, int startRow, int blk) {
+			super(encoder, input, out, outputCol, startRow, blk);
+		}
+
+		private BinSparseApplyTask(ColumnEncoderBin encoder, FrameBlock input, MatrixBlock out, int outputCol) {
+			super(encoder, input, out, outputCol);
+		}
+
+		public Object call() throws Exception {
+			long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
+			int index = _encoder._colID - 1;
+			if(_out.getSparseBlock() == null)
+				return null;
+			assert _inputF != null;
+			for(int r = _startRow; r < getEndIndex(_inputF.getNumRows(), _startRow, _blk); r++) {
+				SparseRowVector row = (SparseRowVector) _out.getSparseBlock().get(r);
+				double inVal = UtilFunctions.objectToDouble(_inputF.getSchema()[index], _inputF.get(r, index));
+				int ix = Arrays.binarySearch(_encoder._binMaxs, inVal);
+				int binID = ((ix < 0) ? Math.abs(ix + 1) : ix) + 1;
+				row.values()[index] = binID;
+				row.indexes()[index] = _outputCol;
+			}
+			if(DMLScript.STATISTICS)
+				Statistics.incTransformBinningApplyTime(System.nanoTime()-t0);
+			return null;
+		}
+
+		@Override
+		public String toString() {
+			return getClass().getSimpleName() + "<ColId: " + _encoder._colID + ">";
+		}
+
+	}
+
 	private static class BinPartialBuildTask implements Callable<Object> {
 
 		private final FrameBlock _input;
@@ -284,7 +338,13 @@ public class ColumnEncoderBin extends ColumnEncoder {
 
 		@Override
 		public double[] call() throws Exception {
-			_partialMinMax.put(_startRow, getMinMaxOfCol(_input, _colID, _startRow, _blockSize));
+			long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
+			double[] minMax = getMinMaxOfCol(_input, _colID, _startRow, _blockSize);
+			synchronized (_partialMinMax){
+				_partialMinMax.put(_startRow, minMax);
+			}
+			if (DMLScript.STATISTICS)
+				Statistics.incTransformBinningBuildTime(System.nanoTime()-t0);
 			return null;
 		}
 
@@ -306,6 +366,7 @@ public class ColumnEncoderBin extends ColumnEncoder {
 
 		@Override
 		public Object call() throws Exception {
+			long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
 			double min = Double.POSITIVE_INFINITY;
 			double max = Double.NEGATIVE_INFINITY;
 			for(Object minMax : _partialMaps.values()) {
@@ -313,6 +374,9 @@ public class ColumnEncoderBin extends ColumnEncoder {
 				max = Math.max(max, ((double[]) minMax)[1]);
 			}
 			_encoder.computeBins(min, max);
+
+			if(DMLScript.STATISTICS)
+				Statistics.incTransformBinningBuildTime(System.nanoTime()-t0);
 			return null;
 		}
 
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
index 54b8795..f7611c3 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
@@ -139,11 +139,23 @@ public class ColumnEncoderComposite extends ColumnEncoder {
 	}
 
 	@Override
-	public List<DependencyTask<?>> getBuildTasks(FrameBlock in, int blockSize) {
+	protected ColumnApplyTask<? extends ColumnEncoder> 
+		getSparseTask(MatrixBlock in, MatrixBlock out, int outputCol, int startRow, int blk) {
+		throw new NotImplementedException();
+	}
+
+	@Override
+	protected ColumnApplyTask<? extends ColumnEncoder> 
+		getSparseTask(FrameBlock in, MatrixBlock out, int outputCol, int startRow, int blk) {
+		throw new NotImplementedException();
+	}
+
+	@Override
+	public List<DependencyTask<?>> getBuildTasks(FrameBlock in) {
 		List<DependencyTask<?>> tasks = new ArrayList<>();
 		Map<Integer[], Integer[]> depMap = null;
 		for(ColumnEncoder columnEncoder : _columnEncoders) {
-			List<DependencyTask<?>> t = columnEncoder.getBuildTasks(in, blockSize);
+			List<DependencyTask<?>> t = columnEncoder.getBuildTasks(in);
 			if(t == null)
 				continue;
 			// Linear execution between encoders so they can't be built in parallel
@@ -179,16 +191,6 @@ public class ColumnEncoderComposite extends ColumnEncoder {
 	}
 
 	@Override
-	public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol) {
-		return apply(in, out, outputCol, 0, -1);
-	}
-
-	@Override
-	public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol) {
-		return apply(in, out, outputCol, 0, -1);
-	}
-
-	@Override
 	public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol, int rowStart, int blk) {
 		try {
 			for(int i = 0; i < _columnEncoders.size(); i++) {
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
index 25b2eb9..1047f54 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
@@ -24,17 +24,15 @@ import static org.apache.sysds.runtime.util.UtilFunctions.getEndIndex;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
-import java.util.concurrent.Callable;
 
+import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.runtime.DMLRuntimeException;
-import org.apache.sysds.runtime.data.SparseRowVector;
 import org.apache.sysds.runtime.matrix.data.FrameBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.util.DependencyTask;
-import org.apache.sysds.runtime.util.DependencyThreadPool;
+import org.apache.sysds.utils.Statistics;
 
 public class ColumnEncoderDummycode extends ColumnEncoder {
 	private static final long serialVersionUID = 5832130477659116489L;
@@ -60,26 +58,18 @@ public class ColumnEncoderDummycode extends ColumnEncoder {
 	}
 
 	@Override
-	public List<DependencyTask<?>> getBuildTasks(FrameBlock in, int blockSize) {
+	public List<DependencyTask<?>> getBuildTasks(FrameBlock in) {
 		return null;
 	}
 
 	@Override
-	public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol) {
-		return apply(in, out, outputCol, 0, -1);
-	}
-
-	public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol) {
-		return apply(in, out, outputCol, 0, -1);
-	}
-
-	@Override
 	public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol, int rowStart, int blk) {
 		throw new DMLRuntimeException("Called DummyCoder with FrameBlock");
 	}
 
 	@Override
 	public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol, int rowStart, int blk) {
+		long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
 		// Out Matrix should already be correct size!
 		// append dummy coded or unchanged values to output
 		for(int i = rowStart; i < getEndIndex(in.getNumRows(), rowStart, blk); i++) {
@@ -89,20 +79,25 @@ public class ColumnEncoderDummycode extends ColumnEncoder {
 			int nCol = outputCol + (int) val - 1;
 			// Setting value to 0 first in case of sparse so the row vector does not need to be resized
 			if(nCol != outputCol)
-				out.quickSetValueThreadSafe(i, outputCol, 0);
-			out.quickSetValueThreadSafe(i, nCol, 1);
+				out.quickSetValue(i, outputCol, 0);
+			out.quickSetValue(i, nCol, 1);
 		}
+		if (DMLScript.STATISTICS)
+			Statistics.incTransformDummyCodeApplyTime(System.nanoTime()-t0);
 		return out;
 	}
 
+
+	@Override
+	protected ColumnApplyTask<? extends ColumnEncoder> 
+		getSparseTask(MatrixBlock in, MatrixBlock out, int outputCol, int startRow, int blk) {
+		return new DummycodeSparseApplyTask(this, in, out, outputCol, startRow, blk);
+	}
+
 	@Override
-	public List<DependencyTask<?>> getApplyTasks(MatrixBlock in, MatrixBlock out, int outputCol) {
-		List<Callable<Object>> tasks = new ArrayList<>();
-		if(out.isInSparseFormat())
-			tasks.add(new DummycodeSparseApplyTask(this, in, out, outputCol));
-		else
-			return super.getApplyTasks(in, out, outputCol);
-		return DependencyThreadPool.createDependencyTasks(tasks, null);
+	protected ColumnApplyTask<? extends ColumnEncoder> 
+		getSparseTask(FrameBlock in, MatrixBlock out, int outputCol, int startRow, int blk) {
+		throw new DMLRuntimeException("Called DummyCoder with FrameBlock");
 	}
 
 	@Override
@@ -139,6 +134,7 @@ public class ColumnEncoderDummycode extends ColumnEncoder {
 
 			if(distinct != -1) {
 				_domainSize = distinct;
+				LOG.debug("DummyCoder for column: " + _colID + " has domain size: " + _domainSize);
 			}
 		}
 	}
@@ -188,48 +184,47 @@ public class ColumnEncoderDummycode extends ColumnEncoder {
 		return _domainSize;
 	}
 
-	private static class DummycodeSparseApplyTask implements Callable<Object> {
-		private final ColumnEncoderDummycode _encoder;
-		private final MatrixBlock _input;
-		private final MatrixBlock _out;
-		private final int _outputCol;
+	private static class DummycodeSparseApplyTask extends ColumnApplyTask<ColumnEncoderDummycode> {
+
+		protected DummycodeSparseApplyTask(ColumnEncoderDummycode encoder, MatrixBlock input, 
+				MatrixBlock out, int outputCol) {
+			super(encoder, input, out, outputCol);
+		}
 
-		private DummycodeSparseApplyTask(ColumnEncoderDummycode encoder, MatrixBlock input, MatrixBlock out,
-			int outputCol) {
-			_encoder = encoder;
-			_input = input;
-			_out = out;
-			_outputCol = outputCol;
+		protected DummycodeSparseApplyTask(ColumnEncoderDummycode encoder, MatrixBlock input, 
+				MatrixBlock out, int outputCol, int startRow, int blk) {
+			super(encoder, input, out, outputCol, startRow, blk);
 		}
 
 		public Object call() throws Exception {
-			for(int r = 0; r < _input.getNumRows(); r++) {
-				if(_out.getSparseBlock() == null)
-					return null;
-				synchronized(_out.getSparseBlock().get(r)) {
-					// Since the recoded values are already offset in the output matrix (same as input at this point)
-					// the dummycoding only needs to offset them within their column domain. Which means that the
-					// indexes in the SparseRowVector do not need to be sorted anymore and can be updated directly.
-					//
-					// Input: Output:
-					//
-					// 1 | 0 | 2 | 0 1 | 0 | 0 | 1
-					// 2 | 0 | 1 | 0 ===> 0 | 1 | 1 | 0
-					// 1 | 0 | 2 | 0 1 | 0 | 0 | 1
-					// 1 | 0 | 1 | 0 1 | 0 | 1 | 0
-					//
-					// Example SparseRowVector Internals (1. row):
-					//
-					// indexes = [0,2] ===> indexes = [0,3]
-					// values = [1,2] values = [1,1]
-					int index = ((SparseRowVector) _out.getSparseBlock().get(r)).getIndex(_outputCol);
-					double val = _out.getSparseBlock().get(r).values()[index];
-					int nCol = _outputCol + (int) val - 1;
-
-					_out.getSparseBlock().get(r).indexes()[index] = nCol;
-					_out.getSparseBlock().get(r).values()[index] = 1;
-				}
+			long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
+			assert _inputM != null;
+			if(_out.getSparseBlock() == null)
+				return null;
+			for(int r = _startRow; r < getEndIndex(_inputM.getNumRows(), _startRow, _blk); r++) {
+				// Since the recoded values are already offset in the output matrix (same as input at this point)
+				// the dummycoding only needs to offset them within their column domain. Which means that the
+				// indexes in the SparseRowVector do not need to be sorted anymore and can be updated directly.
+				//
+				// Input: Output:
+				//
+				// 1 | 0 | 2 | 0 		1 | 0 | 0 | 1
+				// 2 | 0 | 1 | 0 ===> 	0 | 1 | 1 | 0
+				// 1 | 0 | 2 | 0 		1 | 0 | 0 | 1
+				// 1 | 0 | 1 | 0 		1 | 0 | 1 | 0
+				//
+				// Example SparseRowVector Internals (1. row):
+				//
+				// indexes = [0,2] ===> indexes = [0,3]
+				// values = [1,2] values = [1,1]
+				int index = _encoder._colID - 1;
+				double val = _out.getSparseBlock().get(r).values()[index];
+				int nCol = _outputCol + (int) val - 1;
+				_out.getSparseBlock().get(r).indexes()[index] = nCol;
+				_out.getSparseBlock().get(r).values()[index] = 1;
 			}
+			if (DMLScript.STATISTICS)
+				Statistics.incTransformDummyCodeApplyTime(System.nanoTime()-t0);
 			return null;
 		}
 
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java
index 1c74ae5..d30d8dc 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java
@@ -26,11 +26,15 @@ import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.List;
 
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.data.SparseRowVector;
 import org.apache.sysds.runtime.matrix.data.FrameBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.util.DependencyTask;
 import org.apache.sysds.runtime.util.UtilFunctions;
+import org.apache.sysds.utils.Statistics;
 
 /**
  * Class used for feature hashing transformation of frames.
@@ -56,7 +60,7 @@ public class ColumnEncoderFeatureHash extends ColumnEncoder {
 	}
 
 	private long getCode(String key) {
-		return key.hashCode() % _K;
+		return (key.hashCode() % _K) + 1;
 	}
 
 	@Override
@@ -65,22 +69,25 @@ public class ColumnEncoderFeatureHash extends ColumnEncoder {
 	}
 
 	@Override
-	public List<DependencyTask<?>> getBuildTasks(FrameBlock in, int blockSize) {
+	public List<DependencyTask<?>> getBuildTasks(FrameBlock in) {
 		return null;
 	}
 
 	@Override
-	public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol) {
-		return apply(in, out, outputCol, 0, -1);
+	protected ColumnApplyTask<? extends ColumnEncoder> 
+		getSparseTask(FrameBlock in, MatrixBlock out, int outputCol, int startRow, int blk) {
+		return new FeatureHashSparseApplyTask(this, in, out, outputCol, startRow, blk);
 	}
 
 	@Override
-	public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol) {
-		return apply(in, out, outputCol, 0, -1);
+	protected ColumnApplyTask<? extends ColumnEncoder> 
+		getSparseTask(MatrixBlock in, MatrixBlock out, int outputCol, int startRow, int blk) {
+		throw new NotImplementedException("Sparse FeatureHashing for MatrixBlocks not yet implemented");
 	}
 
 	@Override
 	public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol, int rowStart, int blk) {
+		long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
 		// apply feature hashing column wise
 		for(int i = rowStart; i < getEndIndex(in.getNumRows(), rowStart, blk); i++) {
 			Object okey = in.get(i, _colID - 1);
@@ -88,21 +95,26 @@ public class ColumnEncoderFeatureHash extends ColumnEncoder {
 			if(key == null)
 				throw new DMLRuntimeException("Missing Value encountered in input Frame for FeatureHash");
 			long code = getCode(key);
-			out.quickSetValueThreadSafe(i, outputCol, (code >= 0) ? code : Double.NaN);
+			out.quickSetValue(i, outputCol, (code >= 0) ? code : Double.NaN);
 		}
+		if(DMLScript.STATISTICS)
+			Statistics.incTransformFeatureHashingApplyTime(System.nanoTime()-t0);
 		return out;
 	}
 
 	@Override
 	public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol, int rowStart, int blk) {
+		long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
 		int end = getEndIndex(in.getNumRows(), rowStart, blk);
 		// apply feature hashing column wise
 		for(int i = rowStart; i < end; i++) {
 			Object okey = in.quickGetValueThreadSafe(i, _colID - 1);
 			String key = okey.toString();
 			long code = getCode(key);
-			out.quickSetValueThreadSafe(i, outputCol, (code >= 0) ? code : Double.NaN);
+			out.quickSetValue(i, outputCol, (code >= 0) ? code : Double.NaN);
 		}
+		if(DMLScript.STATISTICS)
+			Statistics.incTransformFeatureHashingApplyTime(System.nanoTime()-t0);
 		return out;
 	}
 
@@ -145,4 +157,40 @@ public class ColumnEncoderFeatureHash extends ColumnEncoder {
 		super.readExternal(in);
 		_K = in.readLong();
 	}
+
+	public static class FeatureHashSparseApplyTask extends ColumnApplyTask<ColumnEncoderFeatureHash>{
+
+		public FeatureHashSparseApplyTask(ColumnEncoderFeatureHash encoder, FrameBlock input, 
+				MatrixBlock out, int outputCol, int startRow, int blk) {
+			super(encoder, input, out, outputCol, startRow, blk);
+		}
+
+		public FeatureHashSparseApplyTask(ColumnEncoderFeatureHash encoder, FrameBlock input, 
+				MatrixBlock out, int outputCol) {
+			super(encoder, input, out, outputCol);
+		}
+
+		@Override
+		public Object call() throws Exception {
+			if(_out.getSparseBlock() == null)
+				return null;
+			long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
+			int index = _encoder._colID - 1;
+			assert _inputF != null;
+			for(int r = _startRow; r < getEndIndex(_inputF.getNumRows(), _startRow, _blk); r++){
+				SparseRowVector row = (SparseRowVector) _out.getSparseBlock().get(r);
+				Object okey = _inputF.get(r, index);
+				String key = (okey != null) ? okey.toString() : null;
+				if(key == null)
+					throw new DMLRuntimeException("Missing Value encountered in input Frame for FeatureHash");
+				long code = _encoder.getCode(key);
+				row.values()[index] = code;
+				row.indexes()[index] = _outputCol;
+			}
+			if(DMLScript.STATISTICS)
+				Statistics.incTransformFeatureHashingApplyTime(System.nanoTime()-t0);
+			return null;
+		}
+	}
+
 }
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
index 7a8df24..5c7392c 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
@@ -21,16 +21,22 @@ package org.apache.sysds.runtime.transform.encode;
 
 import static org.apache.sysds.runtime.util.UtilFunctions.getEndIndex;
 
+import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.common.Types.ValueType;
+import org.apache.sysds.runtime.data.SparseRowVector;
 import org.apache.sysds.runtime.matrix.data.FrameBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.util.DependencyTask;
 import org.apache.sysds.runtime.util.UtilFunctions;
+import org.apache.sysds.utils.Statistics;
 
 public class ColumnEncoderPassThrough extends ColumnEncoder {
 	private static final long serialVersionUID = -8473768154646831882L;
+	private List<Integer> sparseRowsWZeros = null;
 
 	protected ColumnEncoderPassThrough(int ptCols) {
 		super(ptCols); // 1-based
@@ -40,37 +46,46 @@ public class ColumnEncoderPassThrough extends ColumnEncoder {
 		this(-1);
 	}
 
+	public List<Integer> getSparseRowsWZeros(){
+		return sparseRowsWZeros;
+	}
+
 	@Override
 	public void build(FrameBlock in) {
 		// do nothing
 	}
 
 	@Override
-	public List<DependencyTask<?>> getBuildTasks(FrameBlock in, int blockSize) {
+	public List<DependencyTask<?>> getBuildTasks(FrameBlock in) {
 		return null;
 	}
 
 	@Override
-	public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol) {
-		return apply(in, out, outputCol, 0, -1);
+	protected ColumnApplyTask<? extends ColumnEncoder> 
+		getSparseTask(FrameBlock in, MatrixBlock out, int outputCol, int startRow, int blk) {
+		return new PassThroughSparseApplyTask(this, in, out, outputCol, startRow, blk);
 	}
 
 	@Override
-	public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol) {
-		return apply(in, out, outputCol, 0, -1);
+	protected ColumnApplyTask<? extends ColumnEncoder> 
+		getSparseTask(MatrixBlock in, MatrixBlock out, int outputCol, int startRow, int blk) {
+		throw new NotImplementedException("Sparse PassThrough for MatrixBlocks not yet implemented");
 	}
 
 	@Override
 	public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol, int rowStart, int blk) {
+		long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
 		int col = _colID - 1; // 1-based
 		ValueType vt = in.getSchema()[col];
 		for(int i = rowStart; i < getEndIndex(in.getNumRows(), rowStart, blk); i++) {
 			Object val = in.get(i, col);
 			double v = (val == null ||
-				(vt == ValueType.STRING && val.toString().isEmpty())) ? Double.NaN : UtilFunctions.objectToDouble(vt,
-					val);
-			out.quickSetValueThreadSafe(i, outputCol, v);
+				(vt == ValueType.STRING && val.toString().isEmpty())) 
+					? Double.NaN : UtilFunctions.objectToDouble(vt, val);
+			out.quickSetValue(i, outputCol, v);
 		}
+		if(DMLScript.STATISTICS)
+			Statistics.incTransformPassThroughApplyTime(System.nanoTime()-t0);
 		return out;
 	}
 
@@ -79,12 +94,15 @@ public class ColumnEncoderPassThrough extends ColumnEncoder {
 		// only transfer from in to out
 		if(in == out)
 			return out;
+		long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
 		int col = _colID - 1; // 1-based
 		int end = getEndIndex(in.getNumRows(), rowStart, blk);
 		for(int i = rowStart; i < end; i++) {
 			double val = in.quickGetValueThreadSafe(i, col);
-			out.quickSetValueThreadSafe(i, outputCol, val);
+			out.quickSetValue(i, outputCol, val);
 		}
+		if(DMLScript.STATISTICS)
+			Statistics.incTransformPassThroughApplyTime(System.nanoTime()-t0);
 		return out;
 	}
 
@@ -106,4 +124,58 @@ public class ColumnEncoderPassThrough extends ColumnEncoder {
 	public void initMetaData(FrameBlock meta) {
 		// do nothing
 	}
+
+	public static class PassThroughSparseApplyTask extends ColumnApplyTask<ColumnEncoderPassThrough>{
+
+
+		protected PassThroughSparseApplyTask(ColumnEncoderPassThrough encoder, FrameBlock input, 
+				MatrixBlock out, int outputCol) {
+			super(encoder, input, out, outputCol);
+		}
+
+		protected PassThroughSparseApplyTask(ColumnEncoderPassThrough encoder, FrameBlock input, MatrixBlock out, 
+				int outputCol, int startRow, int blk) {
+			super(encoder, input, out, outputCol, startRow, blk);
+		}
+
+		@Override
+		public Object call() throws Exception {
+			if(_out.getSparseBlock() == null)
+				return null;
+			long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
+			int index = _encoder._colID - 1;
+			assert _inputF != null;
+			List<Integer> sparseRowsWZeros = null;
+			ValueType vt = _inputF.getSchema()[index];
+			for(int r = _startRow; r < getEndIndex(_inputF.getNumRows(), _startRow, _blk); r++) {
+				Object val = _inputF.get(r, index);
+				double v = (val == null || (vt == ValueType.STRING && val.toString().isEmpty())) ?
+						Double.NaN : UtilFunctions.objectToDouble(vt, val);
+				SparseRowVector row = (SparseRowVector) _out.getSparseBlock().get(r);
+				if(v == 0) {
+					if(sparseRowsWZeros == null)
+						sparseRowsWZeros = new ArrayList<>();
+					sparseRowsWZeros.add(r);
+				}
+				row.values()[index] = v;
+				row.indexes()[index] = _outputCol;
+			}
+			if(sparseRowsWZeros != null){
+				synchronized (_encoder){
+					if(_encoder.sparseRowsWZeros == null)
+						_encoder.sparseRowsWZeros = new ArrayList<>();
+					_encoder.sparseRowsWZeros.addAll(sparseRowsWZeros);
+				}
+			}
+			if(DMLScript.STATISTICS)
+				Statistics.incTransformPassThroughApplyTime(System.nanoTime()-t0);
+			return null;
+		}
+
+		public String toString() {
+			return getClass().getSimpleName() + "<ColId: " + _encoder._colID + ">";
+		}
+
+	}
+
 }
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderRecode.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderRecode.java
index fd18d86..e190d74 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderRecode.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderRecode.java
@@ -33,10 +33,13 @@ import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.concurrent.Callable;
 
+import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.lops.Lop;
 import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.data.SparseRowVector;
 import org.apache.sysds.runtime.matrix.data.FrameBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.utils.Statistics;
 
 public class ColumnEncoderRecode extends ColumnEncoder {
 	private static final long serialVersionUID = 8213163881283341874L;
@@ -135,7 +138,11 @@ public class ColumnEncoderRecode extends ColumnEncoder {
 	public void build(FrameBlock in) {
 		if(!isApplicable())
 			return;
+		long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
 		makeRcdMap(in, _rcdMap, _colID, 0, in.getNumRows());
+		if(DMLScript.STATISTICS){
+			Statistics.incTransformRecodeBuildTime(System.nanoTime() - t0);
+		}
 	}
 
 	@Override
@@ -186,18 +193,17 @@ public class ColumnEncoderRecode extends ColumnEncoder {
 	}
 
 	@Override
-	public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol) {
-		return apply(in, out, outputCol, 0, -1);
-	}
-
-	@Override
 	public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol, int rowStart, int blk) {
+		long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
 		// FrameBlock is column Major and MatrixBlock row Major this results in cache inefficiencies :(
 		for(int i = rowStart; i < getEndIndex(in.getNumRows(), rowStart, blk); i++) {
 			Object okey = in.get(i, _colID - 1);
 			String key = (okey != null) ? okey.toString() : null;
 			long code = lookupRCDMap(key);
-			out.quickSetValueThreadSafe(i, outputCol, (code >= 0) ? code : Double.NaN);
+			out.quickSetValue(i, outputCol, (code >= 0) ? code : Double.NaN);
+		}
+		if(DMLScript.STATISTICS){
+			Statistics.incTransformRecodeApplyTime(System.nanoTime() - t0);
 		}
 		return out;
 	}
@@ -209,9 +215,16 @@ public class ColumnEncoderRecode extends ColumnEncoder {
 	}
 
 	@Override
-	public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol) {
-		throw new DMLRuntimeException(
-			"Recode called with MatrixBlock. Should not happen since Recode is the first " + "encoder in the Stack");
+	protected ColumnApplyTask<? extends ColumnEncoder> 
+		getSparseTask(FrameBlock in, MatrixBlock out, int outputCol, int startRow, int blk){
+		return new RecodeSparseApplyTask(this, in, out, outputCol, startRow, blk);
+	}
+
+	@Override
+	protected ColumnApplyTask<? extends ColumnEncoder> 
+		getSparseTask(MatrixBlock in, MatrixBlock out, int outputCol, int startRow, int blk) {
+		throw new DMLRuntimeException("Recode called with MatrixBlock. Should not happen since Recode is the first " +
+				"encoder in the Stack");
 	}
 
 	@Override
@@ -313,6 +326,48 @@ public class ColumnEncoderRecode extends ColumnEncoder {
 		return _rcdMap;
 	}
 
+	private static class RecodeSparseApplyTask extends ColumnApplyTask<ColumnEncoderRecode>{
+
+		public RecodeSparseApplyTask(ColumnEncoderRecode encoder, FrameBlock input, MatrixBlock out, int outputCol) {
+			super(encoder, input, out, outputCol);
+		}
+
+		protected RecodeSparseApplyTask(ColumnEncoderRecode encoder, FrameBlock input, MatrixBlock out, 
+				int outputCol, int startRow, int blk) {
+			super(encoder, input, out, outputCol, startRow, blk);
+		}
+
+		public Object call() throws Exception {
+			int index = _encoder._colID - 1;
+			long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
+			if(_out.getSparseBlock() == null)
+				return null;
+			assert _inputF != null;
+			for(int r = _startRow; r < getEndIndex(_inputF.getNumRows(), _startRow, _blk); r++) {
+				SparseRowVector row = (SparseRowVector) _out.getSparseBlock().get(r);
+				Object okey = _inputF.get(r, index);
+				String key = (okey != null) ? okey.toString() : null;
+				long code = _encoder.lookupRCDMap(key);
+				double val = (code < 0) ? Double.NaN : code;
+				row.values()[index] = val;
+				row.indexes()[index] = _outputCol;
+			}
+			if(DMLScript.STATISTICS){
+				Statistics.incTransformRecodeApplyTime(System.nanoTime() - t0);
+			}
+			return null;
+		}
+
+		@Override
+		public String toString() {
+			String str = getClass().getSimpleName() + "<ColId: " + _encoder._colID + ">";
+			if(_blk != -1)
+				str+= "<Sr: " + _startRow + ">";
+			return str;
+		}
+
+	}
+
 	private static class RecodePartialBuildTask implements Callable<Object> {
 
 		private final FrameBlock _input;
@@ -332,11 +387,15 @@ public class ColumnEncoderRecode extends ColumnEncoder {
 
 		@Override
 		public HashMap<String, Long> call() throws Exception {
+			long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
 			HashMap<String, Long> partialMap = new HashMap<>();
 			makeRcdMap(_input, partialMap, _colID, _startRow, _blockSize);
 			synchronized(_partialMaps) {
 				_partialMaps.put(_startRow, partialMap);
 			}
+			if(DMLScript.STATISTICS){
+				Statistics.incTransformRecodeBuildTime(System.nanoTime() - t0);
+			}
 			return null;
 		}
 
@@ -358,6 +417,7 @@ public class ColumnEncoderRecode extends ColumnEncoder {
 
 		@Override
 		public Object call() throws Exception {
+			long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
 			HashMap<String, Long> rcdMap = _encoder.getRcdMap();
 			_partialMaps.forEach((start_row, map) -> {
 				((HashMap<?, ?>) map).forEach((k, v) -> {
@@ -366,6 +426,9 @@ public class ColumnEncoderRecode extends ColumnEncoder {
 				});
 			});
 			_encoder._rcdMap = rcdMap;
+			if(DMLScript.STATISTICS){
+				Statistics.incTransformRecodeBuildTime(System.nanoTime() - t0);
+			}
 			return null;
 		}
 
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderFactory.java b/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderFactory.java
index 4b48d2a..012379a 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderFactory.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderFactory.java
@@ -19,16 +19,8 @@
 
 package org.apache.sysds.runtime.transform.encode;
 
-import static org.apache.sysds.runtime.util.CollectionUtils.except;
-import static org.apache.sysds.runtime.util.CollectionUtils.unionDistinct;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map.Entry;
-
 import org.apache.commons.lang.ArrayUtils;
+import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.common.Types.ValueType;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.matrix.data.FrameBlock;
@@ -36,9 +28,19 @@ import org.apache.sysds.runtime.transform.TfUtils.TfMethod;
 import org.apache.sysds.runtime.transform.encode.ColumnEncoder.EncoderType;
 import org.apache.sysds.runtime.transform.meta.TfMetaUtils;
 import org.apache.sysds.runtime.util.UtilFunctions;
+import org.apache.sysds.utils.Statistics;
 import org.apache.wink.json4j.JSONArray;
 import org.apache.wink.json4j.JSONObject;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map.Entry;
+
+import static org.apache.sysds.runtime.util.CollectionUtils.except;
+import static org.apache.sysds.runtime.util.CollectionUtils.unionDistinct;
+
 public class EncoderFactory {
 
 	public static MultiColumnEncoder createEncoder(String spec, String[] colnames, int clen, FrameBlock meta) {
@@ -125,16 +127,22 @@ public class EncoderFactory {
 				}
 			// create composite decoder of all created encoders
 			for(Entry<Integer, List<ColumnEncoder>> listEntry : colEncoders.entrySet()) {
+				if(DMLScript.STATISTICS)
+					Statistics.incTransformEncoderCount(listEntry.getValue().size());
 				lencoders.add(new ColumnEncoderComposite(listEntry.getValue()));
 			}
 			encoder = new MultiColumnEncoder(lencoders);
 			if(!oIDs.isEmpty()) {
 				encoder.addReplaceLegacyEncoder(new EncoderOmit(jSpec, colnames, schema.length, minCol, maxCol));
+				if(DMLScript.STATISTICS)
+					Statistics.incTransformEncoderCount(1);
 			}
 			if(!mvIDs.isEmpty()) {
 				EncoderMVImpute ma = new EncoderMVImpute(jSpec, colnames, schema.length, minCol, maxCol);
 				ma.initRecodeIDList(rcIDs);
 				encoder.addReplaceLegacyEncoder(ma);
+				if(DMLScript.STATISTICS)
+					Statistics.incTransformEncoderCount(1);
 			}
 
 			// initialize meta data w/ robustness for superset of cols
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderMVImpute.java b/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderMVImpute.java
index cda6b2a..f77e690 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderMVImpute.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderMVImpute.java
@@ -19,21 +19,8 @@
 
 package org.apache.sysds.runtime.transform.encode;
 
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.stream.Collectors;
-
 import org.apache.commons.lang.ArrayUtils;
+import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.functionobjects.KahanPlus;
 import org.apache.sysds.runtime.functionobjects.Mean;
@@ -44,10 +31,25 @@ import org.apache.sysds.runtime.transform.TfUtils.TfMethod;
 import org.apache.sysds.runtime.transform.meta.TfMetaUtils;
 import org.apache.sysds.runtime.util.IndexRange;
 import org.apache.sysds.runtime.util.UtilFunctions;
+import org.apache.sysds.utils.Statistics;
 import org.apache.wink.json4j.JSONArray;
 import org.apache.wink.json4j.JSONException;
 import org.apache.wink.json4j.JSONObject;
 
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.stream.Collectors;
+
 public class EncoderMVImpute extends LegacyEncoder {
 	private static final long serialVersionUID = 9057868620144662194L;
 	// objects required to compute mean and variance of all non-missing entries
@@ -173,6 +175,7 @@ public class EncoderMVImpute extends LegacyEncoder {
 
 	@Override
 	public void build(FrameBlock in) {
+		long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
 		try {
 			for(int j = 0; j < _colList.length; j++) {
 				int colID = _colList[j];
@@ -215,10 +218,13 @@ public class EncoderMVImpute extends LegacyEncoder {
 		catch(Exception ex) {
 			throw new RuntimeException(ex);
 		}
+		if(DMLScript.STATISTICS)
+			Statistics.incTransformImputeBuildTime(System.nanoTime()-t0);
 	}
 
 	@Override
 	public MatrixBlock apply(FrameBlock in, MatrixBlock out) {
+		long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
 		for(int i = 0; i < in.getNumRows(); i++) {
 			for(int j = 0; j < _colList.length; j++) {
 				int colID = _colList[j];
@@ -226,6 +232,8 @@ public class EncoderMVImpute extends LegacyEncoder {
 					out.quickSetValue(i, colID - 1, Double.parseDouble(_replacementList[j]));
 			}
 		}
+		if(DMLScript.STATISTICS)
+			Statistics.incTransformImputeApplyTime(System.nanoTime()-t0);
 		return out;
 	}
 
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderOmit.java b/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderOmit.java
index db61fc1..c2f3b68 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderOmit.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/EncoderOmit.java
@@ -19,16 +19,9 @@
 
 package org.apache.sysds.runtime.transform.encode;
 
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Objects;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.common.Types.ValueType;
 import org.apache.sysds.runtime.matrix.data.FrameBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
@@ -37,9 +30,18 @@ import org.apache.sysds.runtime.transform.TfUtils.TfMethod;
 import org.apache.sysds.runtime.transform.meta.TfMetaUtils;
 import org.apache.sysds.runtime.util.IndexRange;
 import org.apache.sysds.runtime.util.UtilFunctions;
+import org.apache.sysds.utils.Statistics;
 import org.apache.wink.json4j.JSONException;
 import org.apache.wink.json4j.JSONObject;
 
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
 public class EncoderOmit extends LegacyEncoder {
 	/*
 	 * THIS CLASS IS ONLY FOR LEGACY SUPPORT!!! and will be fazed out slowly.
@@ -126,6 +128,7 @@ public class EncoderOmit extends LegacyEncoder {
 
 	public MatrixBlock apply(FrameBlock in, MatrixBlock out) {
 		// local rmRows for broadcasting encoder in spark
+		long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
 		boolean[] rmRows;
 		if(_federated)
 			rmRows = _rmRows;
@@ -148,7 +151,8 @@ public class EncoderOmit extends LegacyEncoder {
 		}
 
 		_rmRows = rmRows;
-
+		if(DMLScript.STATISTICS)
+			Statistics.incTransformOmitApplyTime(System.nanoTime()-t0);
 		return ret;
 	}
 
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
index 6db63f5..73d33a9 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
@@ -28,6 +28,8 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.function.Consumer;
@@ -36,22 +38,31 @@ import java.util.stream.Collectors;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.common.Types;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.data.SparseBlock;
 import org.apache.sysds.runtime.data.SparseBlockMCSR;
+import org.apache.sysds.runtime.data.SparseRowVector;
 import org.apache.sysds.runtime.matrix.data.FrameBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.util.DependencyTask;
 import org.apache.sysds.runtime.util.DependencyThreadPool;
 import org.apache.sysds.runtime.util.DependencyWrapperTask;
 import org.apache.sysds.runtime.util.IndexRange;
+import org.apache.sysds.utils.Statistics;
 
 public class MultiColumnEncoder implements Encoder {
 
 	protected static final Log LOG = LogFactory.getLog(MultiColumnEncoder.class.getName());
 	private static final boolean MULTI_THREADED = true;
-	public static boolean MULTI_THREADED_STAGES = true;
+	// If true, build and apply are executed separately with a synchronization barrier in between
+	public static boolean MULTI_THREADED_STAGES = false;
+
+	// Only takes effect if MULTI_THREADED_STAGES is true:
+	// if true, the apply tasks for each column will complete
+	// before the tasks of the next column start.
+	public static boolean APPLY_ENCODER_SEPARATE_STAGES = false;
 
 	private List<ColumnEncoderComposite> _columnEncoders;
 	// These encoders are deprecated and will be phased out soon.
@@ -60,18 +71,6 @@ public class MultiColumnEncoder implements Encoder {
 	private int _colOffset = 0; // offset for federated Workers who are using subrange encoders
 	private FrameBlock _meta = null;
 
-	// TEMP CONSTANTS for testing only
-	//private int APPLY_BLOCKSIZE = 0; // temp only for testing until automatic calculation of block size
-	public static int BUILD_BLOCKSIZE = 0;
-
-	/*public void setApplyBlockSize(int blk) {
-		APPLY_BLOCKSIZE = blk;
-	}*/
-
-	public void setBuildBlockSize(int blk) {
-		BUILD_BLOCKSIZE = blk;
-	}
-
 	public MultiColumnEncoder(List<ColumnEncoderComposite> columnEncoders) {
 		_columnEncoders = columnEncoders;
 	}
@@ -90,6 +89,7 @@ public class MultiColumnEncoder implements Encoder {
 			if(MULTI_THREADED && k > 1 && !MULTI_THREADED_STAGES && !hasLegacyEncoder()) {
 				out = new MatrixBlock();
 				DependencyThreadPool pool = new DependencyThreadPool(k);
+				LOG.debug("Encoding with full DAG on " + k + " Threads");
 				try {
 					pool.submitAllAndWait(getEncodeTasks(in, out, pool));
 				}
@@ -98,10 +98,10 @@ public class MultiColumnEncoder implements Encoder {
 					e.printStackTrace();
 				}
 				pool.shutdown();
-				out.recomputeNonZeros();
-				return out;
+				outputMatrixPostProcessing(out);
 			}
 			else {
+				LOG.debug("Encoding with staged approach on: " + k + " Threads");
 				build(in, k);
 				if(_legacyMVImpute != null) {
 					// These operations are redundant for every encoder excluding the legacyMVImpute, the workaround to
@@ -127,9 +127,10 @@ public class MultiColumnEncoder implements Encoder {
 		List<DependencyTask<?>> applyTAgg = null;
 		Map<Integer[], Integer[]> depMap = new HashMap<>();
 		boolean hasDC = getColumnEncoders(ColumnEncoderDummycode.class).size() > 0;
+		boolean applyOffsetDep = false;
 		tasks.add(DependencyThreadPool.createDependencyTask(new InitOutputMatrixTask(this, in, out)));
 		for(ColumnEncoderComposite e : _columnEncoders) {
-			List<DependencyTask<?>> buildTasks = e.getBuildTasks(in, BUILD_BLOCKSIZE);
+			List<DependencyTask<?>> buildTasks = e.getBuildTasks(in);
 
 			tasks.addAll(buildTasks);
 			if(buildTasks.size() > 0) {
@@ -152,11 +153,14 @@ public class MultiColumnEncoder implements Encoder {
 				// colUpdateTask can start when all domain sizes, because it can now calculate the offsets for
 				// each column
 				depMap.put(new Integer[] {-2, -1}, new Integer[] {tasks.size() - 1, tasks.size()});
+				buildTasks.forEach(t -> t.setPriority(5));
+				applyOffsetDep = true;
 			}
 
-			if(hasDC) {
+			if(hasDC && applyOffsetDep) {
 				// Apply Task dependency to output col update task (is last in list)
-				// All ApplyTasks need to wait for this task so they all have the correct offsets.
+				// All ApplyTasks need to wait for this task, so they all have the correct offsets.
+				// Only columns after the first dummy-code encoder need to wait, since earlier columns have no offset
 				depMap.put(new Integer[] {tasks.size(), tasks.size() + 1}, new Integer[] {-2, -1});
 
 				applyTAgg = applyTAgg == null ? new ArrayList<>() : applyTAgg;
@@ -195,7 +199,7 @@ public class MultiColumnEncoder implements Encoder {
 	private List<DependencyTask<?>> getBuildTasks(FrameBlock in) {
 		List<DependencyTask<?>> tasks = new ArrayList<>();
 		for(ColumnEncoderComposite columnEncoder : _columnEncoders) {
-			tasks.addAll(columnEncoder.getBuildTasks(in, BUILD_BLOCKSIZE));
+			tasks.addAll(columnEncoder.getBuildTasks(in));
 		}
 		return tasks;
 	}
@@ -241,23 +245,23 @@ public class MultiColumnEncoder implements Encoder {
 		if(in.getNumColumns() != numEncoders)
 			throw new DMLRuntimeException("Not every column in has a CompositeEncoder. Please make sure every column "
 				+ "has a encoder or slice the input accordingly");
-		// Block allocation for MT access
-		outputMatrixPreProcessing(out, in);
 		// TODO smart checks
 		if(MULTI_THREADED && k > 1) {
+			// Block allocation for MT access
+			outputMatrixPreProcessing(out, in);
 			applyMT(in, out, outputCol, k);
 		}
 		else {
 			int offset = outputCol;
 			for(ColumnEncoderComposite columnEncoder : _columnEncoders) {
 				columnEncoder.apply(in, out, columnEncoder._colID - 1 + offset);
-				if(columnEncoder.hasEncoder(ColumnEncoderDummycode.class))
+				if (columnEncoder.hasEncoder(ColumnEncoderDummycode.class))
 					offset += columnEncoder.getEncoder(ColumnEncoderDummycode.class)._domainSize - 1;
 			}
 		}
 		// Recomputing NNZ since we access the block directly
 		// TODO set NNZ explicit count them in the encoders
-		out.recomputeNonZeros();
+		outputMatrixPostProcessing(out);
 		if(_legacyOmit != null)
 			out = _legacyOmit.apply(in, out);
 		if(_legacyMVImpute != null)
@@ -280,16 +284,26 @@ public class MultiColumnEncoder implements Encoder {
 	private void applyMT(FrameBlock in, MatrixBlock out, int outputCol, int k) {
 		DependencyThreadPool pool = new DependencyThreadPool(k);
 		try {
-			pool.submitAllAndWait(getApplyTasks(in, out, outputCol));
+			if(APPLY_ENCODER_SEPARATE_STAGES){
+				int offset = outputCol;
+				for (ColumnEncoderComposite e : _columnEncoders) {
+					pool.submitAllAndWait(e.getApplyTasks(in, out, e._colID - 1 + offset));
+					if (e.hasEncoder(ColumnEncoderDummycode.class))
+						offset += e.getEncoder(ColumnEncoderDummycode.class)._domainSize - 1;
+				}
+			}else{
+				pool.submitAllAndWait(getApplyTasks(in, out, outputCol));
+			}
 		}
 		catch(ExecutionException | InterruptedException e) {
-			LOG.error("MT Column encode failed");
+			LOG.error("MT Column apply failed");
 			e.printStackTrace();
 		}
 		pool.shutdown();
 	}
 
 	private static void outputMatrixPreProcessing(MatrixBlock output, FrameBlock input) {
+		long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
 		output.allocateBlock();
 		if(output.isInSparseFormat()) {
 			SparseBlock block = output.getSparseBlock();
@@ -300,8 +314,33 @@ public class MultiColumnEncoder implements Encoder {
 				// allocate all sparse rows so MT sync can be done.
 				// should be rare that rows have only 0
 				block.allocate(r, input.getNumColumns());
+				// Setting the size here makes it possible to run all sparse apply tasks without any sync.
+				// This could become problematic if the input is very sparse,
+				// since we allocate the same size as the input.
+				((SparseRowVector)block.get(r)).setSize(input.getNumColumns());
+			}
+		}
+		if(DMLScript.STATISTICS)
+			Statistics.incTransformOutMatrixPreProcessingTime(System.nanoTime()-t0);
+	}
+
+	private void outputMatrixPostProcessing(MatrixBlock output){
+		long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
+		Set<Integer> indexSet = getColumnEncoders(ColumnEncoderPassThrough.class).stream()
+				.map(ColumnEncoderPassThrough::getSparseRowsWZeros).flatMap(l -> {
+					if(l == null)
+						return null;
+					return l.stream();
+				}).collect(Collectors.toSet());
+		if(!indexSet.stream().allMatch(Objects::isNull)){
+			for(Integer row : indexSet){
+				// TODO: Maybe MT in special cases when the number of rows is large
+				output.getSparseBlock().get(row).compact();
 			}
 		}
+		output.recomputeNonZeros();
+		if(DMLScript.STATISTICS)
+			Statistics.incTransformOutMatrixPostProcessingTime(System.nanoTime()-t0);
 	}
 
 	@Override
@@ -660,7 +699,7 @@ public class MultiColumnEncoder implements Encoder {
 	}
 
 	/*
-	 * Currently not in use will be integrated in the future
+	 * Currently not in use; will be integrated in the future.
 	 */
 	@SuppressWarnings("unused")
 	private static class MultiColumnLegacyBuildTask implements Callable<Object> {
diff --git a/src/main/java/org/apache/sysds/runtime/util/DependencyTask.java b/src/main/java/org/apache/sysds/runtime/util/DependencyTask.java
index 5aff39b..17351a6 100644
--- a/src/main/java/org/apache/sysds/runtime/util/DependencyTask.java
+++ b/src/main/java/org/apache/sysds/runtime/util/DependencyTask.java
@@ -25,16 +25,20 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.runtime.DMLRuntimeException;
 
-public class DependencyTask<E> implements Callable<E> {
+public class DependencyTask<E> implements Comparable<DependencyTask<?>>, Callable<E> {
 	public static final boolean ENABLE_DEBUG_DATA = false;
+	protected static final Log LOG = LogFactory.getLog(DependencyTask.class.getName());
 
 	private final Callable<E> _task;
 	protected final List<DependencyTask<?>> _dependantTasks;
 	public List<DependencyTask<?>> _dependencyTasks = null; // only for debugging
 	private CompletableFuture<Future<?>> _future;
 	private int _rdy = 0;
+	private Integer _priority = 0;
 	private ExecutorService _pool;
 
 	public DependencyTask(Callable<E> task, List<DependencyTask<?>> dependantTasks) {
@@ -54,6 +58,10 @@ public class DependencyTask<E> implements Callable<E> {
 		return _rdy == 0;
 	}
 
+	public void setPriority(int priority) {
+		_priority = priority;
+	}
+
 	private boolean decrease() {
 		synchronized(this) {
 			_rdy -= 1;
@@ -68,7 +76,11 @@ public class DependencyTask<E> implements Callable<E> {
 
 	@Override
 	public E call() throws Exception {
+		LOG.debug("Executing Task: " + this);
+		long t0 = System.nanoTime();
 		E ret = _task.call();
+		LOG.debug("Finished Task: " + this + " in: " +
+				(String.format("%.3f", (System.nanoTime()-t0)*1e-9)) + "sec.");
 		_dependantTasks.forEach(t -> {
 			if(t.decrease()) {
 				if(_pool == null)
@@ -79,4 +91,14 @@ public class DependencyTask<E> implements Callable<E> {
 
 		return ret;
 	}
+
+	@Override
+	public String toString(){
+		return _task.toString() + "<Prio: " + _priority + ">" + "<Waiting: " + _dependantTasks.size() + ">";
+	}
+
+	@Override
+	public int compareTo(DependencyTask<?> task) {
+		return -1 * this._priority.compareTo(task._priority);
+	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/util/DependencyThreadPool.java b/src/main/java/org/apache/sysds/runtime/util/DependencyThreadPool.java
index 4fdd63a..50675d6 100644
--- a/src/main/java/org/apache/sysds/runtime/util/DependencyThreadPool.java
+++ b/src/main/java/org/apache/sysds/runtime/util/DependencyThreadPool.java
@@ -19,7 +19,12 @@
 
 package org.apache.sysds.runtime.util;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.runtime.DMLRuntimeException;
+
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -31,11 +36,10 @@ import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import org.apache.sysds.runtime.DMLRuntimeException;
-
 
 public class DependencyThreadPool {
 
+	protected static final Log LOG = LogFactory.getLog(DependencyThreadPool.class.getName());
 	private final ExecutorService _pool;
 
 	public DependencyThreadPool(int k) {
@@ -50,6 +54,8 @@ public class DependencyThreadPool {
 		List<Future<Future<?>>> futures = new ArrayList<>();
 		List<Integer> rdyTasks = new ArrayList<>();
 		int i = 0;
+		// sort by priority
+		Collections.sort(dtasks);
 		for(DependencyTask<?> t : dtasks) {
 			CompletableFuture<Future<?>> f = new CompletableFuture<>();
 			t.addPool(_pool);
@@ -63,6 +69,8 @@ public class DependencyThreadPool {
 			futures.add(f);
 			i++;
 		}
+		LOG.debug("Initial Starting tasks: \n\t" +
+				rdyTasks.stream().map(index -> dtasks.get(index).toString()).collect(Collectors.joining("\n\t")));
 		// Two stages to avoid race condition!
 		for(Integer index : rdyTasks) {
 			synchronized(_pool) {
diff --git a/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java b/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java
index de6d7d6..7431a82 100644
--- a/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java
+++ b/src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java
@@ -989,6 +989,15 @@ public class UtilFunctions {
 		return (blockSize <= 0)? arrayLength: Math.min(arrayLength, startIndex + blockSize);
 	}
 
+	public static int[] getBlockSizes(int num, int numBlocks){
+		int[] blockSizes = new int[numBlocks];
+		Arrays.fill(blockSizes, num/numBlocks);
+		for (int i = 0; i < num%numBlocks; i++){
+			blockSizes[i]++;
+		}
+		return blockSizes;
+	}
+
 	public static String[] splitRecodeEntry(String s) {
 		//forward to column encoder, as UtilFunctions available in map context
 		return ColumnEncoderRecode.splitRecodeMapEntry(s);
diff --git a/src/main/java/org/apache/sysds/utils/Statistics.java b/src/main/java/org/apache/sysds/utils/Statistics.java
index cacd2f7..d91d9c5 100644
--- a/src/main/java/org/apache/sysds/utils/Statistics.java
+++ b/src/main/java/org/apache/sysds/utils/Statistics.java
@@ -19,20 +19,6 @@
 
 package org.apache.sysds.utils;
 
-import java.lang.management.CompilationMXBean;
-import java.lang.management.GarbageCollectorMXBean;
-import java.lang.management.ManagementFactory;
-import java.text.DecimalFormat;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.DoubleAdder;
-import java.util.concurrent.atomic.LongAdder;
-
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.sysds.api.DMLScript;
@@ -51,6 +37,20 @@ import org.apache.sysds.runtime.lineage.LineageCacheConfig.ReuseCacheType;
 import org.apache.sysds.runtime.lineage.LineageCacheStatistics;
 import org.apache.sysds.runtime.privacy.CheckedConstraintsLog;
 
+import java.lang.management.CompilationMXBean;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.text.DecimalFormat;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.DoubleAdder;
+import java.util.concurrent.atomic.LongAdder;
+
 /**
  * This class captures all statistics.
  */
@@ -149,7 +149,28 @@ public class Statistics
 	private static final LongAdder lTotalUIPVar = new LongAdder();
 	private static final LongAdder lTotalLix = new LongAdder();
 	private static final LongAdder lTotalLixUIP = new LongAdder();
-	
+
+	// Transformencode stats
+	private static final LongAdder transformEncoderCount = new LongAdder();
+
+	//private static final LongAdder transformBuildTime = new LongAdder();
+	private static final LongAdder transformRecodeBuildTime = new LongAdder();
+	private static final LongAdder transformBinningBuildTime = new LongAdder();
+	private static final LongAdder transformImputeBuildTime = new LongAdder();
+
+	//private static final LongAdder transformApplyTime = new LongAdder();
+	private static final LongAdder transformRecodeApplyTime = new LongAdder();
+	private static final LongAdder transformDummyCodeApplyTime = new LongAdder();
+	private static final LongAdder transformPassThroughApplyTime = new LongAdder();
+	private static final LongAdder transformFeatureHashingApplyTime = new LongAdder();
+	private static final LongAdder transformBinningApplyTime = new LongAdder();
+	private static final LongAdder transformOmitApplyTime = new LongAdder();
+	private static final LongAdder transformImputeApplyTime = new LongAdder();
+
+
+	private static final LongAdder transformOutMatrixPreProcessingTime = new LongAdder();
+	private static final LongAdder transformOutMatrixPostProcessingTime = new LongAdder();
+
 	// Federated stats
 	private static final LongAdder federatedReadCount = new LongAdder();
 	private static final LongAdder federatedPutCount = new LongAdder();
@@ -649,6 +670,70 @@ public class Statistics
 
 	public static void accFedPSCommunicationTime(long t) { fedPSCommunicationTime.add(t);}
 
+	public static void incTransformEncoderCount(long encoders){
+		transformEncoderCount.add(encoders);
+	}
+
+	public static void incTransformRecodeApplyTime(long t){
+		transformRecodeApplyTime.add(t);
+	}
+
+	public static void incTransformDummyCodeApplyTime(long t){
+		transformDummyCodeApplyTime.add(t);
+	}
+
+	public static void incTransformBinningApplyTime(long t){
+		transformBinningApplyTime.add(t);
+	}
+
+	public static void incTransformPassThroughApplyTime(long t){
+		transformPassThroughApplyTime.add(t);
+	}
+
+	public static void incTransformFeatureHashingApplyTime(long t){
+		transformFeatureHashingApplyTime.add(t);
+	}
+
+	public static void incTransformOmitApplyTime(long t) {
+		transformOmitApplyTime.add(t);
+	}
+
+	public static void incTransformImputeApplyTime(long t) {
+		transformImputeApplyTime.add(t);
+	}
+
+	public static void incTransformRecodeBuildTime(long t){
+		transformRecodeBuildTime.add(t);
+	}
+
+	public static void incTransformBinningBuildTime(long t){
+		transformBinningBuildTime.add(t);
+	}
+
+	public static void incTransformImputeBuildTime(long t) {
+		transformImputeBuildTime.add(t);
+	}
+
+	public static void incTransformOutMatrixPreProcessingTime(long t){
+		transformOutMatrixPreProcessingTime.add(t);
+	}
+
+	public static void incTransformOutMatrixPostProcessingTime(long t){
+		transformOutMatrixPostProcessingTime.add(t);
+	}
+
+	public static long getTransformEncodeBuildTime(){
+		return transformBinningBuildTime.longValue() + transformImputeBuildTime.longValue() +
+				transformRecodeBuildTime.longValue();
+	}
+
+	public static long getTransformEncodeApplyTime(){
+		return transformDummyCodeApplyTime.longValue() + transformBinningApplyTime.longValue() +
+				transformFeatureHashingApplyTime.longValue() + transformPassThroughApplyTime.longValue() +
+				transformRecodeApplyTime.longValue() + transformOmitApplyTime.longValue() +
+				transformImputeApplyTime.longValue();
+	}
+
 	public static String getCPHeavyHitterCode( Instruction inst )
 	{
 		String opcode = null;
@@ -1129,6 +1214,50 @@ public class Statistics
 					federatedExecuteInstructionCount.longValue() + "/" +
 					federatedExecuteUDFCount.longValue() + ".\n");
 			}
+			if( transformEncoderCount.longValue() > 0) {
+				//TODO: Cleanup and condense
+				sb.append("TransformEncode num. encoders:\t").append(transformEncoderCount.longValue()).append("\n");
+				sb.append("TransformEncode build time:\t").append(String.format("%.3f",
+						getTransformEncodeBuildTime()*1e-9)).append(" sec.\n");
+				if(transformRecodeBuildTime.longValue() > 0)
+					sb.append("\tRecode build time:\t").append(String.format("%.3f",
+							transformRecodeBuildTime.longValue()*1e-9)).append(" sec.\n");
+				if(transformBinningBuildTime.longValue() > 0)
+					sb.append("\tBinning build time:\t").append(String.format("%.3f",
+							transformBinningBuildTime.longValue()*1e-9)).append(" sec.\n");
+				if(transformImputeBuildTime.longValue() > 0)
+					sb.append("\tImpute build time:\t").append(String.format("%.3f",
+							transformImputeBuildTime.longValue()*1e-9)).append(" sec.\n");
+
+				sb.append("TransformEncode apply time:\t").append(String.format("%.3f",
+						getTransformEncodeApplyTime()*1e-9)).append(" sec.\n");
+				if(transformRecodeApplyTime.longValue() > 0)
+					sb.append("\tRecode apply time:\t").append(String.format("%.3f",
+							transformRecodeApplyTime.longValue()*1e-9)).append(" sec.\n");
+				if(transformBinningApplyTime.longValue() > 0)
+					sb.append("\tBinning apply time:\t").append(String.format("%.3f",
+							transformBinningApplyTime.longValue()*1e-9)).append(" sec.\n");
+				if(transformDummyCodeApplyTime.longValue() > 0)
+					sb.append("\tDummyCode apply time:\t").append(String.format("%.3f",
+							transformDummyCodeApplyTime.longValue()*1e-9)).append(" sec.\n");
+				if(transformFeatureHashingApplyTime.longValue() > 0)
+					sb.append("\tHashing apply time:\t").append(String.format("%.3f",
+							transformFeatureHashingApplyTime.longValue()*1e-9)).append(" sec.\n");
+				if(transformPassThroughApplyTime.longValue() > 0)
+					sb.append("\tPassThrough apply time:\t").append(String.format("%.3f",
+							transformPassThroughApplyTime.longValue()*1e-9)).append(" sec.\n");
+				if(transformOmitApplyTime.longValue() > 0)
+					sb.append("\tOmit apply time:\t").append(String.format("%.3f",
+							transformOmitApplyTime.longValue()*1e-9)).append(" sec.\n");
+				if(transformImputeApplyTime.longValue() > 0)
+					sb.append("\tImpute apply time:\t").append(String.format("%.3f",
+							transformImputeApplyTime.longValue()*1e-9)).append(" sec.\n");
+
+				sb.append("TransformEncode PreProc. time:\t").append(String.format("%.3f",
+						transformOutMatrixPreProcessingTime.longValue()*1e-9)).append(" sec.\n");
+				sb.append("TransformEncode PostProc. time:\t").append(String.format("%.3f",
+						transformOutMatrixPostProcessingTime.longValue()*1e-9)).append(" sec.\n");
+			}
 
 			if(ConfigurationManager.isCompressionEnabled()){
 				DMLCompressionStatistics.display(sb);
diff --git a/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameBuildMultithreadedTest.java b/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameBuildMultithreadedTest.java
index 8824b9d..b70571b 100644
--- a/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameBuildMultithreadedTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameBuildMultithreadedTest.java
@@ -30,6 +30,7 @@ import org.apache.sysds.common.Types;
 import org.apache.sysds.runtime.io.FileFormatPropertiesCSV;
 import org.apache.sysds.runtime.io.FrameReaderFactory;
 import org.apache.sysds.runtime.matrix.data.FrameBlock;
+import org.apache.sysds.runtime.transform.encode.ColumnEncoder;
 import org.apache.sysds.runtime.transform.encode.ColumnEncoderBin;
 import org.apache.sysds.runtime.transform.encode.ColumnEncoderRecode;
 import org.apache.sysds.runtime.transform.encode.EncoderFactory;
@@ -173,6 +174,7 @@ public class TransformFrameBuildMultithreadedTest extends AutomatedTestBase {
 				.readFrameFromHDFS(DATASET, -1L, -1L);
 			StringBuilder specSb = new StringBuilder();
 			Files.readAllLines(Paths.get(SPEC)).forEach(s -> specSb.append(s).append("\n"));
+			ColumnEncoder.BUILD_ROW_BLOCKS_PER_COLUMN = Math.max(blockSize, 1);
 			MultiColumnEncoder encoderS = EncoderFactory.createEncoder(specSb.toString(), input.getColumnNames(),
 				input.getNumColumns(), null);
 			MultiColumnEncoder encoderM = EncoderFactory.createEncoder(specSb.toString(), input.getColumnNames(),
diff --git a/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameEncodeMultithreadedTest.java b/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameEncodeMultithreadedTest.java
index 2156250..6679f36 100644
--- a/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameEncodeMultithreadedTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameEncodeMultithreadedTest.java
@@ -48,7 +48,7 @@ public class TransformFrameEncodeMultithreadedTest extends AutomatedTestBase {
 	private final static String DATASET1 = "homes3/homes.csv";
 	private final static String SPEC1 = "homes3/homes.tfspec_recode.json";
 	private final static String SPEC2 = "homes3/homes.tfspec_dummy.json";
-	private final static String SPEC2all = "homes3/homes.tfspec_dummy_all.json";
+	private final static String SPEC2sparse = "homes3/homes.tfspec_dummy_sparse.json";
 	private final static String SPEC3 = "homes3/homes.tfspec_bin.json"; // recode
 	private final static String SPEC6 = "homes3/homes.tfspec_recode_dummy.json";
 	private final static String SPEC7 = "homes3/homes.tfspec_binDummy.json"; // recode+dummy
@@ -164,7 +164,7 @@ public class TransformFrameEncodeMultithreadedTest extends AutomatedTestBase {
 				DATASET = DATASET1;
 				break;
 			case DUMMY_ALL:
-				SPEC = SPEC2all;
+				SPEC = SPEC2sparse;
 				DATASET = DATASET1;
 				break;
 			case BIN:
diff --git a/src/test/resources/datasets/homes3/homes.tfspec_dummy_all.json b/src/test/resources/datasets/homes3/homes.tfspec_dummy_all.json
deleted file mode 100644
index 65b8fee..0000000
--- a/src/test/resources/datasets/homes3/homes.tfspec_dummy_all.json
+++ /dev/null
@@ -1 +0,0 @@
-{"ids": true, "dummycode": [ 2, 7, 1, 3, 4, 5, 6, 8, 9 ] }
\ No newline at end of file
diff --git a/src/test/resources/datasets/homes3/homes.tfspec_dummy_sparse.json b/src/test/resources/datasets/homes3/homes.tfspec_dummy_sparse.json
new file mode 100644
index 0000000..ed48308
--- /dev/null
+++ b/src/test/resources/datasets/homes3/homes.tfspec_dummy_sparse.json
@@ -0,0 +1 @@
+{"ids": true, "dummycode": [ 2, 7, 1, 3, 4, 6, 8, 9 ] }
\ No newline at end of file