Posted to commits@systemds.apache.org by ar...@apache.org on 2021/07/30 11:51:14 UTC

[systemds] branch master updated: [SYSTEMDS-2972] Dependency Task execution for transform encode

This is an automated email from the ASF dual-hosted git repository.

arnabp20 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new ba799e8  [SYSTEMDS-2972] Dependency Task execution for transform encode
ba799e8 is described below

commit ba799e8796a411295164ebd6b3b7bbe1dff933bc
Author: Lukas Erlbacher <lu...@gmail.com>
AuthorDate: Fri Jul 30 13:50:16 2021 +0200

    [SYSTEMDS-2972] Dependency Task execution for transform encode
    
    This patch adds a framework for dependency-aware task execution,
    consisting of the DependencyTask and DependencyThreadPool classes, as
    well as implementations of build and apply on top of this framework.
    Individual encoders can now provide the tasks that best fit the given
    data characteristics by overriding the getApplyTasks and getBuildTasks
    methods. These methods may return a DAG of tasks, which is integrated
    into the DAG for the whole matrix/frame and then executed by the
    DependencyThreadPool. This approach makes it possible to try different
    execution orders without major changes to the code.
    
    Closes #1351
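    
    As an illustration of the new API, a minimal usage sketch (hypothetical
    example, not part of this patch; it assumes only the methods visible in
    the diff below: createDependencyTasks, submitAllAndWait, and shutdown)
    that mirrors the partial-build/merge pattern of ColumnEncoder.getBuildTasks:
    
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.Callable;
    
    import org.apache.sysds.runtime.util.DependencyTask;
    import org.apache.sysds.runtime.util.DependencyThreadPool;
    
    public class DependencyTaskExample {
        public static void main(String[] args) throws Exception {
            // two independent partial tasks and one merge task
            List<Callable<Object>> tasks = new ArrayList<>();
            tasks.add(() -> { System.out.println("partial build 1"); return null; });
            tasks.add(() -> { System.out.println("partial build 2"); return null; });
            tasks.add(() -> { System.out.println("merge partials"); return null; });
    
            // the last task depends on all previous tasks
            List<List<? extends Callable<?>>> dep =
                new ArrayList<>(Collections.nCopies(tasks.size() - 1, null));
            dep.add(tasks.subList(0, tasks.size() - 1));
    
            List<DependencyTask<?>> dtasks =
                DependencyThreadPool.createDependencyTasks(tasks, dep);
            DependencyThreadPool pool = new DependencyThreadPool(4);
            try {
                // executes the DAG; the partials may run in parallel,
                // the merge task runs only after both complete
                pool.submitAllAndWait(dtasks);
            }
            finally {
                pool.shutdown();
            }
        }
    }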
---
 .../apache/sysds/runtime/data/SparseRowVector.java |   6 +
 .../runtime/transform/encode/ColumnEncoder.java    | 107 +++++-
 .../runtime/transform/encode/ColumnEncoderBin.java | 109 ++++--
 .../transform/encode/ColumnEncoderComposite.java   | 104 ++++--
 .../transform/encode/ColumnEncoderDummycode.java   |  76 ++++-
 .../transform/encode/ColumnEncoderFeatureHash.java |  13 +-
 .../transform/encode/ColumnEncoderPassThrough.java |  19 +-
 .../transform/encode/ColumnEncoderRecode.java      | 104 ++++--
 .../transform/encode/MultiColumnEncoder.java       | 379 ++++++++++++++-------
 .../apache/sysds/runtime/util/DependencyTask.java  |  82 +++++
 .../sysds/runtime/util/DependencyThreadPool.java   | 170 +++++++++
 .../sysds/runtime/util/DependencyWrapperTask.java  |  65 ++++
 .../mt/TransformFrameBuildMultithreadedTest.java   | 121 ++++---
 .../mt/TransformFrameEncodeMultithreadedTest.java  | 221 +++++++-----
 .../sysds/test/util/DependencyThreadPoolTest.java  | 145 ++++++++
 .../datasets/homes3/homes.tfspec_recode_bin.json   |   2 +
 16 files changed, 1352 insertions(+), 371 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/data/SparseRowVector.java b/src/main/java/org/apache/sysds/runtime/data/SparseRowVector.java
index 91fab8eb..a48648a 100644
--- a/src/main/java/org/apache/sysds/runtime/data/SparseRowVector.java
+++ b/src/main/java/org/apache/sysds/runtime/data/SparseRowVector.java
@@ -242,6 +242,12 @@ public final class SparseRowVector extends SparseRow implements Serializable
 		return (index >= 0) ? values[index] : 0;
 	}
 
+	public int getIndex(int col) {
+		// search for an existing column index; returns -1 if not present
+		int index = Arrays.binarySearch(indexes, 0, size, col);
+		return (index >= 0) ? index : -1;
+	}
+
 	public int searchIndexesFirstLTE(int col) {
 		if( size == 0 ) return -1;
 		
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java
index 47afe0e..33bf452 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoder.java
@@ -25,16 +25,19 @@ import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.matrix.data.FrameBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.util.DependencyTask;
+import org.apache.sysds.runtime.util.DependencyThreadPool;
 
 /**
  * Base class for all transform encoders providing both a row and block interface for decoding frames to matrices.
@@ -164,12 +167,106 @@ public abstract class ColumnEncoder implements Externalizable, Encoder, Comparab
 		return Integer.compare(getEncoderType(this), getEncoderType(o));
 	}
 
-	public abstract List<Callable<Object>> getPartialBuildTasks(FrameBlock in, int blockSize);
+	/*
+	 * Returns a list of dependency tasks that, when executed, build the encoder. The last task in the list only
+	 * completes once all previous tasks are done, so that the last task can serve as a single dependency for the
+	 * whole build, reducing unnecessary dependencies.
+	 */
+	public List<DependencyTask<?>> getBuildTasks(FrameBlock in, int blockSize) {
+		List<Callable<Object>> tasks = new ArrayList<>();
+		List<List<? extends Callable<?>>> dep = null;
+		if(blockSize <= 0 || blockSize >= in.getNumRows()) {
+			tasks.add(getBuildTask(in));
+		}
+		else {
+			HashMap<Integer, Object> ret = new HashMap<>();
+			for(int i = 0; i < in.getNumRows(); i = i + blockSize)
+				tasks.add(getPartialBuildTask(in, i, blockSize, ret));
+			if(in.getNumRows() % blockSize != 0)
+				tasks.add(getPartialBuildTask(in, in.getNumRows() - in.getNumRows() % blockSize, -1, ret));
+			tasks.add(getPartialMergeBuildTask(ret));
+			dep = new ArrayList<>(Collections.nCopies(tasks.size() - 1, null));
+			dep.add(tasks.subList(0, tasks.size() - 1));
+		}
+		return DependencyThreadPool.createDependencyTasks(tasks, dep);
+	}
+
+	public Callable<Object> getBuildTask(FrameBlock in) {
+		throw new DMLRuntimeException("Trying to get the Build task of an Encoder which does not require building");
+	}
+
+	public Callable<Object> getPartialBuildTask(FrameBlock in, int startRow, int blockSize,
+		HashMap<Integer, Object> ret) {
+		throw new DMLRuntimeException(
+			"Trying to get the PartialBuild task of an Encoder which does not support " + "partial building");
+	}
+
+	public Callable<Object> getPartialMergeBuildTask(HashMap<Integer, ?> ret) {
+		throw new DMLRuntimeException(
+			"Trying to get the PartialMergeBuild task of an Encoder which does not support " + "partial building");
+	}
+
+	public List<DependencyTask<?>> getApplyTasks(FrameBlock in, MatrixBlock out, int outputCol) {
+		List<Callable<Object>> tasks = new ArrayList<>();
+		tasks.add(new ColumnApplyTask(this, in, out, outputCol));
+		return DependencyThreadPool.createDependencyTasks(tasks, null);
+	}
 
-	public abstract void mergeBuildPartial(List<Future<Object>> futurePartials, int start, int end)
-		throws ExecutionException, InterruptedException;
+	public List<DependencyTask<?>> getApplyTasks(MatrixBlock in, MatrixBlock out, int outputCol) {
+		List<Callable<Object>> tasks = new ArrayList<>();
+		tasks.add(new ColumnApplyTask(this, in, out, outputCol));
+		return DependencyThreadPool.createDependencyTasks(tasks, null);
+	}
 
 	public enum EncoderType {
 		Recode, FeatureHash, PassThrough, Bin, Dummycode, Omit, MVImpute, Composite
 	}
+
+	/*
+	 * This is the base task for each column apply. If an encoder does not implement a custom "getApplyTasks",
+	 * this task is used.
+	 */
+	private static class ColumnApplyTask implements Callable<Object> {
+
+		private final ColumnEncoder _encoder;
+		private final FrameBlock _inputF;
+		private final MatrixBlock _inputM;
+		private final MatrixBlock _out;
+		private final int _outputCol;
+
+		protected ColumnApplyTask(ColumnEncoder encoder, FrameBlock input, MatrixBlock out, int outputCol) {
+			_encoder = encoder;
+			_inputF = input;
+			_inputM = null;
+			_out = out;
+			_outputCol = outputCol;
+		}
+
+		protected ColumnApplyTask(ColumnEncoder encoder, MatrixBlock input, MatrixBlock out, int outputCol) {
+			_encoder = encoder;
+			_inputM = input;
+			_inputF = null;
+			_out = out;
+			_outputCol = outputCol;
+		}
+
+		@Override
+		public Void call() throws Exception {
+			assert _outputCol >= 0;
+			int rowStart = 0;
+			int blk = -1; // non-positive block size means: process all rows
+			if(_inputF == null)
+				_encoder.apply(_inputM, _out, _outputCol, rowStart, blk);
+			else
+				_encoder.apply(_inputF, _out, _outputCol, rowStart, blk);
+			return null;
+		}
+
+		@Override
+		public String toString() {
+			return getClass().getSimpleName() + "<Encoder: " + _encoder.getClass().getSimpleName() + "; ColId: "
+				+ _encoder._colID + ">";
+		}
+
+	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java
index 0a00a05..b4d4800 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderBin.java
@@ -24,12 +24,9 @@ import static org.apache.sysds.runtime.util.UtilFunctions.getEndIndex;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.List;
+import java.util.HashMap;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
 import org.apache.commons.lang3.tuple.MutableTriple;
 import org.apache.sysds.lops.Lop;
@@ -93,7 +90,7 @@ public class ColumnEncoderBin extends ColumnEncoder {
 		computeBins(pairMinMax[0], pairMinMax[1]);
 	}
 
-	private static double[] getMinMaxOfCol(FrameBlock in, int colID, int startRow, int blockSize){
+	private static double[] getMinMaxOfCol(FrameBlock in, int colID, int startRow, int blockSize) {
 		// derive bin boundaries from min/max per column
 		double min = Double.POSITIVE_INFINITY;
 		double max = Double.NEGATIVE_INFINITY;
@@ -102,30 +99,23 @@ public class ColumnEncoderBin extends ColumnEncoder {
 			min = Math.min(min, inVal);
 			max = Math.max(max, inVal);
 		}
-		return new double[]{min, max};
+		return new double[] {min, max};
 	}
 
 	@Override
-	public List<Callable<Object>> getPartialBuildTasks(FrameBlock in, int blockSize){
-		List<Callable<Object>> tasks = new ArrayList<>();
-		for(int i = 0; i < in.getNumRows(); i=i+blockSize)
-			tasks.add(new BinPartialBuildTask(in, _colID, i, blockSize));
-		if(in.getNumRows() % blockSize != 0)
-			tasks.add(new BinPartialBuildTask(in, _colID,
-					in.getNumRows()-in.getNumRows()%blockSize, -1));
-		return tasks;
+	public Callable<Object> getBuildTask(FrameBlock in) {
+		return new ColumnBinBuildTask(this, in);
 	}
 
 	@Override
-	public void mergeBuildPartial(List<Future<Object>> futurePartials, int start, int end) throws ExecutionException, InterruptedException {
-		double min = Double.POSITIVE_INFINITY;
-		double max = Double.NEGATIVE_INFINITY;
-		for(int i = start; i < end; i++){
-			double[] pairMinMax = (double[]) futurePartials.get(i).get();
-			min = Math.min(min, pairMinMax[0]);
-			max = Math.max(max, pairMinMax[1]);
-		}
-		computeBins(min, max);
+	public Callable<Object> getPartialBuildTask(FrameBlock in, int startRow, int blockSize,
+		HashMap<Integer, Object> ret) {
+		return new BinPartialBuildTask(in, _colID, startRow, blockSize, ret);
+	}
+
+	@Override
+	public Callable<Object> getPartialMergeBuildTask(HashMap<Integer, ?> ret) {
+		return new BinMergePartialBuildTask(this, ret);
 	}
 
 	public void computeBins(double min, double max) {
@@ -150,7 +140,7 @@ public class ColumnEncoderBin extends ColumnEncoder {
 		if(!isApplicable())
 			return;
 		// derive bin boundaries from min/max per column
-		double[] pairMinMax = getMinMaxOfCol(in, _colID, 0 ,-1);
+		double[] pairMinMax = getMinMaxOfCol(in, _colID, 0, -1);
 		_colMins = pairMinMax[0];
 		_colMaxs = pairMinMax[1];
 	}
@@ -178,7 +168,7 @@ public class ColumnEncoderBin extends ColumnEncoder {
 
 	@Override
 	public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol, int rowStart, int blk) {
-		int end = (blk <= 0)? in.getNumRows(): in.getNumRows() < rowStart + blk ? in.getNumRows() : rowStart + blk;
+		int end = getEndIndex(in.getNumRows(), rowStart, blk);
 		for(int i = rowStart; i < end; i++) {
 			double inVal = in.quickGetValueThreadSafe(i, _colID - 1);
 			int ix = Arrays.binarySearch(_binMaxs, inVal);
@@ -224,9 +214,7 @@ public class ColumnEncoderBin extends ColumnEncoder {
 		// serialize the internal state into frame meta data
 		meta.getColumnMetadata(_colID - 1).setNumDistinct(_numBin);
 		for(int i = 0; i < _binMaxs.length; i++) {
-			String sb = _binMins[i] +
-					Lop.DATATYPE_PREFIX +
-					_binMaxs[i];
+			String sb = _binMins[i] + Lop.DATATYPE_PREFIX + _binMaxs[i];
 			meta.set(i, _colID - 1, sb);
 		}
 		return meta;
@@ -282,19 +270,80 @@ public class ColumnEncoderBin extends ColumnEncoder {
 		private final int _blockSize;
 		private final int _startRow;
 		private final int _colID;
+		private final HashMap<Integer, Object> _partialMinMax;
 
 		// if a pool is passed the task may be split up into multiple smaller tasks.
-		protected BinPartialBuildTask(FrameBlock input, int colID, int startRow, int blocksize){
+		protected BinPartialBuildTask(FrameBlock input, int colID, int startRow, int blocksize,
+			HashMap<Integer, Object> partialMinMax) {
 			_input = input;
 			_blockSize = blocksize;
 			_colID = colID;
 			_startRow = startRow;
+			_partialMinMax = partialMinMax;
 		}
 
 		@Override
 		public double[] call() throws Exception {
-			return getMinMaxOfCol(_input, _colID, _startRow, _blockSize);
+			_partialMinMax.put(_startRow, getMinMaxOfCol(_input, _colID, _startRow, _blockSize));
+			return null;
+		}
+
+		@Override
+		public String toString() {
+			return getClass().getSimpleName() + "<Start row: " + _startRow + "; Block size: " + _blockSize + ">";
+		}
+
+	}
+
+	private static class BinMergePartialBuildTask implements Callable<Object> {
+		private final HashMap<Integer, ?> _partialMaps;
+		private final ColumnEncoderBin _encoder;
+
+		private BinMergePartialBuildTask(ColumnEncoderBin encoderBin, HashMap<Integer, ?> partialMaps) {
+			_partialMaps = partialMaps;
+			_encoder = encoderBin;
+		}
+
+		@Override
+		public Object call() throws Exception {
+			double min = Double.POSITIVE_INFINITY;
+			double max = Double.NEGATIVE_INFINITY;
+			for(Object minMax : _partialMaps.values()) {
+				min = Math.min(min, ((double[]) minMax)[0]);
+				max = Math.max(max, ((double[]) minMax)[1]);
+			}
+			_encoder.computeBins(min, max);
+			return null;
 		}
+
+		@Override
+		public String toString() {
+			return getClass().getSimpleName() + "<ColId: " + _encoder._colID + ">";
+		}
+
+	}
+
+	private static class ColumnBinBuildTask implements Callable<Object> {
+
+		private final ColumnEncoderBin _encoder;
+		private final FrameBlock _input;
+
+		protected ColumnBinBuildTask(ColumnEncoderBin encoder, FrameBlock input) {
+			_encoder = encoder;
+			_input = input;
+		}
+
+		@Override
+		public Void call() throws Exception {
+			_encoder.build(_input);
+			return null;
+		}
+
+		@Override
+		public String toString() {
+			return getClass().getSimpleName() + "<ColId: " + _encoder._colID + ">";
+		}
+
 	}
 
 }
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
index 1973741..54b8795 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderComposite.java
@@ -23,17 +23,19 @@ import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
+import org.apache.commons.lang.NotImplementedException;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.matrix.data.FrameBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.util.DependencyTask;
+import org.apache.sysds.runtime.util.DependencyThreadPool;
 
 /**
  * Simple composite encoder that applies a list of encoders in specified order. By implementing the default encoder API
@@ -48,7 +50,7 @@ public class ColumnEncoderComposite extends ColumnEncoder {
 	private FrameBlock _meta = null;
 
 	// map to keep track of which encoder has how many build tasks
-	private Map<ColumnEncoder, Integer> _partialBuildTaskMap;
+	//private Map<ColumnEncoder, Integer> _partialBuildTaskMap;
 
 	public ColumnEncoderComposite() {
 		super(-1);
@@ -101,29 +103,67 @@ public class ColumnEncoderComposite extends ColumnEncoder {
 	}
 
 	@Override
-	public List<Callable<Object>> getPartialBuildTasks(FrameBlock in, int blockSize) {
-		List<Callable<Object>> tasks = new ArrayList<>();
-		_partialBuildTaskMap = new HashMap<>();
-		for(ColumnEncoder columnEncoder : _columnEncoders) {
-			List<Callable<Object>> _tasks = columnEncoder.getPartialBuildTasks(in, blockSize);
-			if(_tasks != null)
-				tasks.addAll(_tasks);
-			_partialBuildTaskMap.put(columnEncoder, _tasks != null ? _tasks.size() : 0);
+	public List<DependencyTask<?>> getApplyTasks(FrameBlock in, MatrixBlock out, int outputCol) {
+		List<DependencyTask<?>> tasks = new ArrayList<>();
+		List<Integer> sizes = new ArrayList<>();
+		for(int i = 0; i < _columnEncoders.size(); i++) {
+			List<DependencyTask<?>> t;
+			if(i == 0) {
+				// 1. encoder writes data into MatrixBlock Column all others use this column for further encoding
+				t = _columnEncoders.get(i).getApplyTasks(in, out, outputCol);
+			}
+			else {
+				t = _columnEncoders.get(i).getApplyTasks(out, out, outputCol);
+			}
+			if(t == null)
+				continue;
+			sizes.add(t.size());
+			tasks.addAll(t);
+		}
+
+		List<List<? extends Callable<?>>> dep = new ArrayList<>(Collections.nCopies(tasks.size(), null));
+
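+		// make all tasks of each encoder depend on the last task of the preceding encoder (sequential chaining)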
+		for(int c = 0, i = sizes.get(c); i < tasks.size(); c++, i += sizes.get(c)) {
+			for(int k = i; k < i + sizes.get(c + 1); k++) {
+				dep.set(k, tasks.subList(i - 1, i));
+			}
 		}
-		return tasks.size() == 0 ? null : tasks;
+
+		tasks = DependencyThreadPool.createDependencyTasks(tasks, dep);
+		return tasks;
+	}
+
+	@Override
+	public List<DependencyTask<?>> getApplyTasks(MatrixBlock in, MatrixBlock out, int outputCol) {
+		throw new NotImplementedException();
 	}
 
 	@Override
-	public void mergeBuildPartial(List<Future<Object>> futurePartials, int start, int end)
-		throws ExecutionException, InterruptedException {
-		int endLocal;
+	public List<DependencyTask<?>> getBuildTasks(FrameBlock in, int blockSize) {
+		List<DependencyTask<?>> tasks = new ArrayList<>();
+		Map<Integer[], Integer[]> depMap = null;
 		for(ColumnEncoder columnEncoder : _columnEncoders) {
-			endLocal = start + _partialBuildTaskMap.get(columnEncoder);
-			columnEncoder.mergeBuildPartial(futurePartials, start, endLocal);
-			start = endLocal;
-			if(start >= end)
-				break;
+			List<DependencyTask<?>> t = columnEncoder.getBuildTasks(in, blockSize);
+			if(t == null)
+				continue;
+			// Enforce linear execution between encoders, since they cannot be built in parallel
+			if(tasks.size() != 0) {
+				// avoid unnecessary map initialization
+				depMap = (depMap == null) ? new HashMap<>() : depMap;
+				// This workaround is needed since subList is only valid on effectively final lists,
+				// otherwise the view breaks
+				depMap.put(new Integer[] {tasks.size(), tasks.size() + t.size()},
+					new Integer[] {tasks.size() - 1, tasks.size()});
+			}
+			tasks.addAll(t);
 		}
+		List<List<? extends Callable<?>>> dep = new ArrayList<>(Collections.nCopies(tasks.size(), null));
+		DependencyThreadPool.createDependencyList(tasks, depMap, dep);
+		if(hasEncoder(ColumnEncoderDummycode.class)) {
+			tasks.add(DependencyThreadPool.createDependencyTask(new ColumnCompositeUpdateDCTask(this)));
+			dep.add(tasks.subList(tasks.size() - 2, tasks.size() - 1));
+		}
+		return DependencyThreadPool.createDependencyTasks(tasks, dep);
 	}
 
 	@Override
@@ -219,7 +259,7 @@ public class ColumnEncoderComposite extends ColumnEncoder {
 		updateAllDCEncoders();
 	}
 
-	public void updateAllDCEncoders(){
+	public void updateAllDCEncoders() {
 		// update dummycode encoder domain sizes based on distinctness information from other encoders
 		ColumnEncoderDummycode dc = getEncoder(ColumnEncoderDummycode.class);
 		if(dc != null)
@@ -313,4 +353,26 @@ public class ColumnEncoderComposite extends ColumnEncoder {
 		super.shiftCol(columnOffset);
 		_columnEncoders.forEach(e -> e.shiftCol(columnOffset));
 	}
+
+	private static class ColumnCompositeUpdateDCTask implements Callable<Object> {
+
+		private final ColumnEncoderComposite _encoder;
+
+		protected ColumnCompositeUpdateDCTask(ColumnEncoderComposite encoder) {
+			_encoder = encoder;
+		}
+
+		@Override
+		public Void call() throws Exception {
+			_encoder.updateAllDCEncoders();
+			return null;
+		}
+
+		@Override
+		public String toString() {
+			return getClass().getSimpleName() + "<ColId: " + _encoder._colID + ">";
+		}
+
+	}
+
 }
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
index 708fead..25b2eb9 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderDummycode.java
@@ -24,14 +24,17 @@ import static org.apache.sysds.runtime.util.UtilFunctions.getEndIndex;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.Callable;
-import java.util.concurrent.Future;
 
 import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.data.SparseRowVector;
 import org.apache.sysds.runtime.matrix.data.FrameBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.util.DependencyTask;
+import org.apache.sysds.runtime.util.DependencyThreadPool;
 
 public class ColumnEncoderDummycode extends ColumnEncoder {
 	private static final long serialVersionUID = 5832130477659116489L;
@@ -57,17 +60,11 @@ public class ColumnEncoderDummycode extends ColumnEncoder {
 	}
 
 	@Override
-	public List<Callable<Object>> getPartialBuildTasks(FrameBlock in, int blockSize) {
-		// do nothing
+	public List<DependencyTask<?>> getBuildTasks(FrameBlock in, int blockSize) {
 		return null;
 	}
 
 	@Override
-	public void mergeBuildPartial(List<Future<Object>> futurePartials, int start, int end) {
-
-	}
-
-	@Override
 	public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol) {
 		return apply(in, out, outputCol, 0, -1);
 	}
@@ -99,6 +96,16 @@ public class ColumnEncoderDummycode extends ColumnEncoder {
 	}
 
 	@Override
+	public List<DependencyTask<?>> getApplyTasks(MatrixBlock in, MatrixBlock out, int outputCol) {
+		List<Callable<Object>> tasks = new ArrayList<>();
+		if(out.isInSparseFormat())
+			tasks.add(new DummycodeSparseApplyTask(this, in, out, outputCol));
+		else
+			return super.getApplyTasks(in, out, outputCol);
+		return DependencyThreadPool.createDependencyTasks(tasks, null);
+	}
+
+	@Override
 	public void mergeAt(ColumnEncoder other) {
 		if(other instanceof ColumnEncoderDummycode) {
 			assert other._colID == _colID;
@@ -180,4 +187,57 @@ public class ColumnEncoderDummycode extends ColumnEncoder {
 	public int getDomainSize() {
 		return _domainSize;
 	}
+
+	private static class DummycodeSparseApplyTask implements Callable<Object> {
+		private final ColumnEncoderDummycode _encoder;
+		private final MatrixBlock _input;
+		private final MatrixBlock _out;
+		private final int _outputCol;
+
+		private DummycodeSparseApplyTask(ColumnEncoderDummycode encoder, MatrixBlock input, MatrixBlock out,
+			int outputCol) {
+			_encoder = encoder;
+			_input = input;
+			_out = out;
+			_outputCol = outputCol;
+		}
+
+		public Object call() throws Exception {
+			for(int r = 0; r < _input.getNumRows(); r++) {
+				if(_out.getSparseBlock() == null)
+					return null;
+				synchronized(_out.getSparseBlock().get(r)) {
+					// Since the recoded values are already offset in the output matrix (same as input at this point),
+					// the dummycoding only needs to offset them within their column domain. This means that the
+					// indexes in the SparseRowVector no longer need to be sorted and can be updated directly.
+					//
+					// Input: Output:
+					//
+					// 1 | 0 | 2 | 0 1 | 0 | 0 | 1
+					// 2 | 0 | 1 | 0 ===> 0 | 1 | 1 | 0
+					// 1 | 0 | 2 | 0 1 | 0 | 0 | 1
+					// 1 | 0 | 1 | 0 1 | 0 | 1 | 0
+					//
+					// Example SparseRowVector Internals (1. row):
+					//
+					// indexes = [0,2] ===> indexes = [0,3]
+					// values = [1,2] values = [1,1]
+					int index = ((SparseRowVector) _out.getSparseBlock().get(r)).getIndex(_outputCol);
+					double val = _out.getSparseBlock().get(r).values()[index];
+					int nCol = _outputCol + (int) val - 1;
+
+					_out.getSparseBlock().get(r).indexes()[index] = nCol;
+					_out.getSparseBlock().get(r).values()[index] = 1;
+				}
+			}
+			return null;
+		}
+
+		@Override
+		public String toString() {
+			return getClass().getSimpleName() + "<ColId: " + _encoder._colID + ">";
+		}
+
+	}
+
 }
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java
index 84d09b4..1c74ae5 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderFeatureHash.java
@@ -25,12 +25,11 @@ import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Future;
 
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.matrix.data.FrameBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.util.DependencyTask;
 import org.apache.sysds.runtime.util.UtilFunctions;
 
 /**
@@ -66,17 +65,11 @@ public class ColumnEncoderFeatureHash extends ColumnEncoder {
 	}
 
 	@Override
-	public List<Callable<Object>> getPartialBuildTasks(FrameBlock in, int blockSize) {
-		// do nothing
+	public List<DependencyTask<?>> getBuildTasks(FrameBlock in, int blockSize) {
 		return null;
 	}
 
 	@Override
-	public void mergeBuildPartial(List<Future<Object>> futurePartials, int start, int end) {
-
-	}
-
-	@Override
 	public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol) {
 		return apply(in, out, outputCol, 0, -1);
 	}
@@ -102,7 +95,7 @@ public class ColumnEncoderFeatureHash extends ColumnEncoder {
 
 	@Override
 	public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol, int rowStart, int blk) {
-		int end = (blk <= 0) ? in.getNumRows() : in.getNumRows() < rowStart + blk ? in.getNumRows() : rowStart + blk;
+		int end = getEndIndex(in.getNumRows(), rowStart, blk);
 		// apply feature hashing column wise
 		for(int i = rowStart; i < end; i++) {
 			Object okey = in.quickGetValueThreadSafe(i, _colID - 1);
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
index 7e4a02f..7a8df24 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderPassThrough.java
@@ -22,12 +22,11 @@ package org.apache.sysds.runtime.transform.encode;
 import static org.apache.sysds.runtime.util.UtilFunctions.getEndIndex;
 
 import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Future;
 
 import org.apache.sysds.common.Types.ValueType;
 import org.apache.sysds.runtime.matrix.data.FrameBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.util.DependencyTask;
 import org.apache.sysds.runtime.util.UtilFunctions;
 
 public class ColumnEncoderPassThrough extends ColumnEncoder {
@@ -47,17 +46,11 @@ public class ColumnEncoderPassThrough extends ColumnEncoder {
 	}
 
 	@Override
-	public List<Callable<Object>> getPartialBuildTasks(FrameBlock in, int blockSize) {
-		// do nothing
+	public List<DependencyTask<?>> getBuildTasks(FrameBlock in, int blockSize) {
 		return null;
 	}
 
 	@Override
-	public void mergeBuildPartial(List<Future<Object>> futurePartials, int start, int end) {
-
-	}
-
-	@Override
 	public MatrixBlock apply(FrameBlock in, MatrixBlock out, int outputCol) {
 		return apply(in, out, outputCol, 0, -1);
 	}
@@ -74,8 +67,8 @@ public class ColumnEncoderPassThrough extends ColumnEncoder {
 		for(int i = rowStart; i < getEndIndex(in.getNumRows(), rowStart, blk); i++) {
 			Object val = in.get(i, col);
 			double v = (val == null ||
-				(vt == ValueType.STRING && val.toString().isEmpty())) 
-					? Double.NaN : UtilFunctions.objectToDouble(vt, val);
+				(vt == ValueType.STRING && val.toString().isEmpty())) ? Double.NaN : UtilFunctions.objectToDouble(vt,
+					val);
 			out.quickSetValueThreadSafe(i, outputCol, v);
 		}
 		return out;
@@ -84,8 +77,10 @@ public class ColumnEncoderPassThrough extends ColumnEncoder {
 	@Override
 	public MatrixBlock apply(MatrixBlock in, MatrixBlock out, int outputCol, int rowStart, int blk) {
 		// only transfer from in to out
-		int end = (blk <= 0) ? in.getNumRows() : in.getNumRows() < rowStart + blk ? in.getNumRows() : rowStart + blk;
+		if(in == out)
+			return out;
 		int col = _colID - 1; // 1-based
+		int end = getEndIndex(in.getNumRows(), rowStart, blk);
 		for(int i = rowStart; i < end; i++) {
 			double val = in.quickGetValueThreadSafe(i, col);
 			out.quickSetValueThreadSafe(i, outputCol, val);
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderRecode.java b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderRecode.java
index d15db5b..fd18d86 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderRecode.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/ColumnEncoderRecode.java
@@ -24,18 +24,14 @@ import static org.apache.sysds.runtime.util.UtilFunctions.getEndIndex;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
 import org.apache.sysds.lops.Lop;
 import org.apache.sysds.runtime.DMLRuntimeException;
@@ -143,30 +139,19 @@ public class ColumnEncoderRecode extends ColumnEncoder {
 	}
 
 	@Override
-	public List<Callable<Object>> getPartialBuildTasks(FrameBlock in, int blockSize) {
-		List<Callable<Object>> tasks = new ArrayList<>();
-		for(int i = 0; i < in.getNumRows(); i = i + blockSize)
-			tasks.add(new RecodePartialBuildTask(in, _colID, i, blockSize));
-		if(in.getNumRows() % blockSize != 0)
-			tasks.add(new RecodePartialBuildTask(in, _colID, in.getNumRows() - in.getNumRows() % blockSize, -1));
-		return tasks;
+	public Callable<Object> getBuildTask(FrameBlock in) {
+		return new ColumnRecodeBuildTask(this, in);
 	}
 
 	@Override
-	public void mergeBuildPartial(List<Future<Object>> futurePartials, int start, int end)
-		throws ExecutionException, InterruptedException {
-		for(int i = start; i < end; i++) {
-			Object partial = futurePartials.get(i).get();
-			if(!(partial instanceof HashMap)) {
-				throw new DMLRuntimeException(
-					"Tried to merge " + partial.getClass() + " object into RecodeEncoder. " + "HashMap was expected.");
-			}
-			HashMap<?, ?> partialMap = (HashMap<?, ?>) partial;
-			partialMap.forEach((k, v) -> {
-				if(!_rcdMap.containsKey((String) k))
-					putCode(_rcdMap, (String) k);
-			});
-		}
+	public Callable<Object> getPartialBuildTask(FrameBlock in, int startRow, int blockSize,
+		HashMap<Integer, Object> ret) {
+		return new RecodePartialBuildTask(in, _colID, startRow, blockSize, ret);
+	}
+
+	@Override
+	public Callable<Object> getPartialMergeBuildTask(HashMap<Integer, ?> ret) {
+		return new RecodeMergePartialBuildTask(this, ret);
 	}
 
 	/**
@@ -334,21 +319,84 @@ public class ColumnEncoderRecode extends ColumnEncoder {
 		private final int _blockSize;
 		private final int _startRow;
 		private final int _colID;
+		private final HashMap<Integer, Object> _partialMaps;
 
-		// if a pool is passed the task may be split up into multiple smaller tasks.
-		protected RecodePartialBuildTask(FrameBlock input, int colID, int startRow, int blocksize) {
+		protected RecodePartialBuildTask(FrameBlock input, int colID, int startRow, int blocksize,
+			HashMap<Integer, Object> partialMaps) {
 			_input = input;
 			_blockSize = blocksize;
 			_colID = colID;
 			_startRow = startRow;
+			_partialMaps = partialMaps;
 		}
 
 		@Override
 		public HashMap<String, Long> call() throws Exception {
 			HashMap<String, Long> partialMap = new HashMap<>();
 			makeRcdMap(_input, partialMap, _colID, _startRow, _blockSize);
-			return partialMap;
+			synchronized(_partialMaps) {
+				_partialMaps.put(_startRow, partialMap);
+			}
+			return null;
+		}
+
+		@Override
+		public String toString() {
+			return getClass().getSimpleName() + "<Start row: " + _startRow + "; Block size: " + _blockSize + ">";
+		}
+
+	}
+
+	private static class RecodeMergePartialBuildTask implements Callable<Object> {
+		private final HashMap<Integer, ?> _partialMaps;
+		private final ColumnEncoderRecode _encoder;
+
+		private RecodeMergePartialBuildTask(ColumnEncoderRecode encoderRecode, HashMap<Integer, ?> partialMaps) {
+			_partialMaps = partialMaps;
+			_encoder = encoderRecode;
+		}
+
+		@Override
+		public Object call() throws Exception {
+			HashMap<String, Long> rcdMap = _encoder.getRcdMap();
+			_partialMaps.forEach((start_row, map) -> {
+				((HashMap<?, ?>) map).forEach((k, v) -> {
+					if(!rcdMap.containsKey((String) k))
+						putCode(rcdMap, (String) k);
+				});
+			});
+			_encoder._rcdMap = rcdMap;
+			return null;
 		}
+
+		@Override
+		public String toString() {
+			return getClass().getSimpleName() + "<ColId: " + _encoder._colID + ">";
+		}
+
+	}
+
+	private static class ColumnRecodeBuildTask implements Callable<Object> {
+
+		private final ColumnEncoderRecode _encoder;
+		private final FrameBlock _input;
+
+		protected ColumnRecodeBuildTask(ColumnEncoderRecode encoder, FrameBlock input) {
+			_encoder = encoder;
+			_input = input;
+		}
+
+		@Override
+		public Void call() throws Exception {
+			_encoder.build(_input);
+			return null;
+		}
+
+		@Override
+		public String toString() {
+			return getClass().getSimpleName() + "<ColId: " + _encoder._colID + ">";
+		}
+
 	}
 
 }
diff --git a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
index aff078b..6db63f5 100644
--- a/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
+++ b/src/main/java/org/apache/sysds/runtime/transform/encode/MultiColumnEncoder.java
@@ -23,12 +23,13 @@ import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -41,27 +42,31 @@ import org.apache.sysds.runtime.data.SparseBlock;
 import org.apache.sysds.runtime.data.SparseBlockMCSR;
 import org.apache.sysds.runtime.matrix.data.FrameBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
-import org.apache.sysds.runtime.util.CommonThreadPool;
+import org.apache.sysds.runtime.util.DependencyTask;
+import org.apache.sysds.runtime.util.DependencyThreadPool;
+import org.apache.sysds.runtime.util.DependencyWrapperTask;
 import org.apache.sysds.runtime.util.IndexRange;
 
 public class MultiColumnEncoder implements Encoder {
 
 	protected static final Log LOG = LogFactory.getLog(MultiColumnEncoder.class.getName());
 	private static final boolean MULTI_THREADED = true;
+	public static boolean MULTI_THREADED_STAGES = true;
+
 	private List<ColumnEncoderComposite> _columnEncoders;
-	// These encoders are deprecated and will be fazed out soon.
+	// These encoders are deprecated and will be phased out soon.
 	private EncoderMVImpute _legacyMVImpute = null;
 	private EncoderOmit _legacyOmit = null;
 	private int _colOffset = 0; // offset for federated Workers who are using subrange encoders
 	private FrameBlock _meta = null;
 
 	// TEMP CONSTANTS for testing only
-	private int APPLY_BLOCKSIZE = 0; // temp only for testing until automatic calculation of block size
+	//private int APPLY_BLOCKSIZE = 0; // temp only for testing until automatic calculation of block size
 	public static int BUILD_BLOCKSIZE = 0;
 
-	public void setApplyBlockSize(int blk) {
+	/*public void setApplyBlockSize(int blk) {
 		APPLY_BLOCKSIZE = blk;
-	}
+	}*/
 
 	public void setBuildBlockSize(int blk) {
 		BUILD_BLOCKSIZE = blk;
@@ -82,16 +87,33 @@ public class MultiColumnEncoder implements Encoder {
 	public MatrixBlock encode(FrameBlock in, int k) {
 		MatrixBlock out;
 		try {
-			build(in, k);
-			if(_legacyMVImpute != null){
-				// These operations are redundant for every encoder excluding the legacyMVImpute, the workaround to fix
-				// it for this encoder would be very dirty. This will only have a performance impact if there is a lot of
-				// recoding in combination with the legacyMVImpute. But since it is legacy this should be fine
-				_meta = getMetaData(new FrameBlock(in.getNumColumns(), Types.ValueType.STRING));
-				initMetaData(_meta);
+			if(MULTI_THREADED && k > 1 && !MULTI_THREADED_STAGES && !hasLegacyEncoder()) {
+				out = new MatrixBlock();
+				DependencyThreadPool pool = new DependencyThreadPool(k);
+				try {
+					pool.submitAllAndWait(getEncodeTasks(in, out, pool));
+				}
+				catch(ExecutionException | InterruptedException e) {
+					LOG.error("MT Column encode failed");
+					e.printStackTrace();
+				}
+				pool.shutdown();
+				out.recomputeNonZeros();
+				return out;
+			}
+			else {
+				build(in, k);
+				if(_legacyMVImpute != null) {
+					// These operations are redundant for every encoder except the legacyMVImpute; the workaround to
+					// fix it for this encoder would be very dirty. This only has a performance impact if there
+					// is a lot of recoding in combination with the legacyMVImpute.
+					// But since it is legacy, this should be fine.
+					_meta = getMetaData(new FrameBlock(in.getNumColumns(), Types.ValueType.STRING));
+					initMetaData(_meta);
+				}
+				// apply meta data
+				out = apply(in, k);
 			}
-			// apply meta data
-			out = apply(in, k);
 		}
 		catch(Exception ex) {
 			LOG.error("Failed transform-encode frame with \n" + this);
@@ -100,6 +122,59 @@ public class MultiColumnEncoder implements Encoder {
 		return out;
 	}
 
+	private List<DependencyTask<?>> getEncodeTasks(FrameBlock in, MatrixBlock out, DependencyThreadPool pool) {
+		List<DependencyTask<?>> tasks = new ArrayList<>();
+		List<DependencyTask<?>> applyTAgg = null;
+		Map<Integer[], Integer[]> depMap = new HashMap<>();
+		boolean hasDC = getColumnEncoders(ColumnEncoderDummycode.class).size() > 0;
+		tasks.add(DependencyThreadPool.createDependencyTask(new InitOutputMatrixTask(this, in, out)));
+		for(ColumnEncoderComposite e : _columnEncoders) {
+			List<DependencyTask<?>> buildTasks = e.getBuildTasks(in, BUILD_BLOCKSIZE);
+
+			tasks.addAll(buildTasks);
+			if(buildTasks.size() > 0) {
+				// Apply Task dependency to build completion task
+				depMap.put(new Integer[] {tasks.size(), tasks.size() + 1},
+					new Integer[] {tasks.size() - 1, tasks.size()});
+			}
+
+			// Apply Task dependency to InitOutputMatrixTask
+			depMap.put(new Integer[] {tasks.size(), tasks.size() + 1}, new Integer[] {0, 1});
+			ApplyTasksWrapperTask applyTaskWrapper = new ApplyTasksWrapperTask(e, in, out, pool);
+
+			if(e.hasEncoder(ColumnEncoderDummycode.class)) {
+				// InitMatrix depends on the build of recode if a DC is present,
+				// since these are the only encoders that change the domain size, which influences the matrix creation
+				depMap.put(new Integer[] {0, 1}, // InitMatrix Task first in list
+					new Integer[] {tasks.size() - 1, tasks.size()});
+				// The output-column update task depends on build completion only for recode and binning,
+				// since they can change the dummycode domain size.
+				// The colUpdateTask can start once all domain sizes are known, because it can then calculate
+				// the offsets for each column.
+				depMap.put(new Integer[] {-2, -1}, new Integer[] {tasks.size() - 1, tasks.size()});
+			}
+
+			if(hasDC) {
+				// Apply Task dependency to output col update task (is last in list)
+				// All ApplyTasks need to wait for this task so they all have the correct offsets.
+				depMap.put(new Integer[] {tasks.size(), tasks.size() + 1}, new Integer[] {-2, -1});
+
+				applyTAgg = applyTAgg == null ? new ArrayList<>() : applyTAgg;
+				applyTAgg.add(applyTaskWrapper);
+			}
+			else {
+				applyTaskWrapper.setOffset(0);
+			}
+			tasks.add(applyTaskWrapper);
+		}
+		if(hasDC)
+			tasks.add(DependencyThreadPool.createDependencyTask(new UpdateOutputColTask(this, applyTAgg)));
+
+		List<List<? extends Callable<?>>> deps = new ArrayList<>(Collections.nCopies(tasks.size(), null));
+		DependencyThreadPool.createDependencyList(tasks, depMap, deps);
+		return DependencyThreadPool.createDependencyTasks(tasks, deps);
+	}
+
 	public void build(FrameBlock in) {
 		build(in, 1);
 	}
@@ -109,7 +184,7 @@ public class MultiColumnEncoder implements Encoder {
 			buildMT(in, k);
 		}
 		else {
-			for(ColumnEncoderComposite columnEncoder : _columnEncoders){
+			for(ColumnEncoderComposite columnEncoder : _columnEncoders) {
 				columnEncoder.build(in);
 				columnEncoder.updateAllDCEncoders();
 			}
@@ -117,46 +192,24 @@ public class MultiColumnEncoder implements Encoder {
 		legacyBuild(in);
 	}
 
+	private List<DependencyTask<?>> getBuildTasks(FrameBlock in) {
+		List<DependencyTask<?>> tasks = new ArrayList<>();
+		for(ColumnEncoderComposite columnEncoder : _columnEncoders) {
+			tasks.addAll(columnEncoder.getBuildTasks(in, BUILD_BLOCKSIZE));
+		}
+		return tasks;
+	}
+
 	private void buildMT(FrameBlock in, int k) {
-		int blockSize = BUILD_BLOCKSIZE <= 0 ? in.getNumRows() : BUILD_BLOCKSIZE;
-		List<Callable<Integer>> tasks = new ArrayList<>();
-		ExecutorService pool = CommonThreadPool.get(k);
+		DependencyThreadPool pool = new DependencyThreadPool(k);
 		try {
-			if(blockSize != in.getNumRows()) {
-				// Partial builds and merges
-				// Most of the time not worth it for RC with the current implementation, GC overhead is to large.
-				// Depending on unique values and rows more testing need to be done
-				List<List<Future<Object>>> partials = new ArrayList<>();
-				for(ColumnEncoderComposite encoder : _columnEncoders) {
-					List<Callable<Object>> partialBuildTasks = encoder.getPartialBuildTasks(in, blockSize);
-					if(partialBuildTasks == null) {
-						partials.add(null);
-						continue;
-					}
-					partials.add(partialBuildTasks.stream().map(pool::submit).collect(Collectors.toList()));
-				}
-				for(int e = 0; e < _columnEncoders.size(); e++) {
-					List<Future<Object>> partial = partials.get(e);
-					if(partial == null)
-						continue;
-					tasks.add(new ColumnMergeBuildPartialTask(_columnEncoders.get(e), partial));
-				}
-			}
-			else {
-				// building every column in one thread
-				for(ColumnEncoderComposite e : _columnEncoders) {
-					tasks.add(new ColumnBuildTask(e, in));
-				}
-			}
-			List<Future<Integer>> rtasks = pool.invokeAll(tasks);
-			pool.shutdown();
-			for(Future<Integer> t : rtasks)
-				t.get();
+			pool.submitAllAndWait(getBuildTasks(in));
 		}
-		catch(InterruptedException | ExecutionException e) {
-			LOG.error("MT Column encode failed");
+		catch(ExecutionException | InterruptedException e) {
+			LOG.error("MT Column build failed");
 			e.printStackTrace();
 		}
+		pool.shutdown();
 	}
 
 	public void legacyBuild(FrameBlock in) {
@@ -189,18 +242,7 @@ public class MultiColumnEncoder implements Encoder {
 			throw new DMLRuntimeException("Not every column in has a CompositeEncoder. Please make sure every column "
 				+ "has a encoder or slice the input accordingly");
 		// Block allocation for MT access
-		out.allocateBlock();
-		if(out.isInSparseFormat()) {
-			SparseBlock block = out.getSparseBlock();
-			if(!(block instanceof SparseBlockMCSR))
-				throw new RuntimeException(
-					"Transform apply currently only supported for MCSR sparse and dense output Matrices");
-			for(int r = 0; r < out.getNumRows(); r++) {
-				// allocate all sparse rows so MT sync can be done.
-				// should be rare that rows have only 0
-				block.allocate(r, in.getNumColumns());
-			}
-		}
+		outputMatrixPreProcessing(out, in);
 		// TODO smart checks
 		if(MULTI_THREADED && k > 1) {
 			applyMT(in, out, outputCol, k);
@@ -224,31 +266,42 @@ public class MultiColumnEncoder implements Encoder {
 		return out;
 	}
 
+	private List<DependencyTask<?>> getApplyTasks(FrameBlock in, MatrixBlock out, int outputCol) {
+		List<DependencyTask<?>> tasks = new ArrayList<>();
+		int offset = outputCol;
+		for(ColumnEncoderComposite e : _columnEncoders) {
+			tasks.addAll(e.getApplyTasks(in, out, e._colID - 1 + offset));
+			if(e.hasEncoder(ColumnEncoderDummycode.class))
+				offset += e.getEncoder(ColumnEncoderDummycode.class)._domainSize - 1;
+		}
+		return tasks;
+	}
+
 	private void applyMT(FrameBlock in, MatrixBlock out, int outputCol, int k) {
+		DependencyThreadPool pool = new DependencyThreadPool(k);
 		try {
-			ExecutorService pool = CommonThreadPool.get(k);
-			ArrayList<ColumnApplyTask> tasks = new ArrayList<>();
-			int offset = outputCol;
-			// TODO calculate smart blocksize
-			int blockSize = APPLY_BLOCKSIZE <= 0 ? in.getNumRows() : APPLY_BLOCKSIZE;
-			for(ColumnEncoderComposite e : _columnEncoders) {
-				for(int i = 0; i < in.getNumRows(); i = i + blockSize)
-					tasks.add(new ColumnApplyTask(e, in, out, e._colID - 1 + offset, i, blockSize));
-				if(in.getNumRows() % blockSize != 0)
-					tasks.add(new ColumnApplyTask(e, in, out, e._colID - 1 + offset,
-						in.getNumRows() - in.getNumRows() % blockSize, -1));
-				if(e.hasEncoder(ColumnEncoderDummycode.class))
-					offset += e.getEncoder(ColumnEncoderDummycode.class)._domainSize - 1;
-			}
-			List<Future<Integer>> rtasks = pool.invokeAll(tasks);
-			pool.shutdown();
-			for(Future<Integer> t : rtasks)
-				t.get();
+			pool.submitAllAndWait(getApplyTasks(in, out, outputCol));
 		}
-		catch(InterruptedException | ExecutionException e) {
+		catch(ExecutionException | InterruptedException e) {
 			LOG.error("MT Column encode failed");
 			e.printStackTrace();
 		}
+		pool.shutdown();
+	}
+
+	private static void outputMatrixPreProcessing(MatrixBlock output, FrameBlock input) {
+		output.allocateBlock();
+		if(output.isInSparseFormat()) {
+			SparseBlock block = output.getSparseBlock();
+			if(!(block instanceof SparseBlockMCSR))
+				throw new RuntimeException(
+					"Transform apply currently only supported for MCSR sparse and dense output Matrices");
+			for(int r = 0; r < output.getNumRows(); r++) {
+				// allocate all sparse rows so MT sync can be done.
+				// should be rare that rows have only 0
+				block.allocate(r, input.getNumColumns());
+			}
+		}
 	}
 
 	@Override
@@ -438,6 +491,9 @@ public class MultiColumnEncoder implements Encoder {
 		if(dc.isEmpty()) {
 			return 0;
 		}
+		if(dc.stream().anyMatch(e -> e.getDomainSize() < 0)) {
+			throw new DMLRuntimeException("Trying to get extra columns when DC encoders are not ready");
+		}
 		return dc.stream().map(ColumnEncoderDummycode::getDomainSize).mapToInt(i -> i).sum() - dc.size();
 	}
 
@@ -570,6 +626,10 @@ public class MultiColumnEncoder implements Encoder {
 		}
 	}
 
+	public <T extends LegacyEncoder> boolean hasLegacyEncoder() {
+		return hasLegacyEncoder(EncoderMVImpute.class) || hasLegacyEncoder(EncoderOmit.class);
+	}
+
 	public <T extends LegacyEncoder> boolean hasLegacyEncoder(Class<T> type) {
 		if(type.equals(EncoderMVImpute.class))
 			return _legacyMVImpute != null;
@@ -599,72 +659,151 @@ public class MultiColumnEncoder implements Encoder {
 			_legacyMVImpute.shiftCols(_colOffset);
 	}
 
-	private static class ColumnApplyTask implements Callable<Integer> {
+	/*
+	 * Currently not in use; will be integrated in the future.
+	 */
+	@SuppressWarnings("unused")
+	private static class MultiColumnLegacyBuildTask implements Callable<Object> {
 
-		private final ColumnEncoder _encoder;
+		private final MultiColumnEncoder _encoder;
 		private final FrameBlock _input;
-		private final MatrixBlock _out;
-		private final int _columnOut;
-		private int _rowStart = 0;
-		private int _blk = -1;
 
-		protected ColumnApplyTask(ColumnEncoder encoder, FrameBlock input, MatrixBlock out, int columnOut) {
+		protected MultiColumnLegacyBuildTask(MultiColumnEncoder encoder, FrameBlock input) {
 			_encoder = encoder;
 			_input = input;
-			_out = out;
-			_columnOut = columnOut;
 		}
 
-		protected ColumnApplyTask(ColumnEncoder encoder, FrameBlock input, MatrixBlock out, int columnOut, int rowStart, int blk) {
-			this(encoder, input, out, columnOut);
-			_rowStart = rowStart;
-			_blk = blk;
+		@Override
+		public Void call() throws Exception {
+			_encoder.legacyBuild(_input);
+			return null;
+		}
+	}
+
+	@SuppressWarnings("unused")
+	private static class MultiColumnLegacyMVImputeMetaPrepareTask implements Callable<Object> {
+
+		private final MultiColumnEncoder _encoder;
+		private final FrameBlock _input;
+
+		protected MultiColumnLegacyMVImputeMetaPrepareTask(MultiColumnEncoder encoder, FrameBlock input) {
+			_encoder = encoder;
+			_input = input;
 		}
 
 		@Override
-		public Integer call() throws Exception {
-			_encoder.apply(_input, _out, _columnOut, _rowStart, _blk);
-			// TODO return NNZ
-			return 1;
+		public Void call() throws Exception {
+			_encoder._meta = _encoder.getMetaData(new FrameBlock(_input.getNumColumns(), Types.ValueType.STRING));
+			_encoder.initMetaData(_encoder._meta);
+			return null;
 		}
 	}
 
-	private static class ColumnBuildTask implements Callable<Integer> {
-
-		private final ColumnEncoder _encoder;
+	private static class InitOutputMatrixTask implements Callable<Object> {
+		private final MultiColumnEncoder _encoder;
 		private final FrameBlock _input;
+		private final MatrixBlock _output;
 
-		// if a pool is passed the task may be split up into multiple smaller tasks.
-		protected ColumnBuildTask(ColumnEncoder encoder, FrameBlock input) {
+		private InitOutputMatrixTask(MultiColumnEncoder encoder, FrameBlock input, MatrixBlock output) {
 			_encoder = encoder;
 			_input = input;
+			_output = output;
+		}
+
+		@Override
+		public Object call() throws Exception {
+			int numCols = _input.getNumColumns() + _encoder.getNumExtraCols();
+			long estNNz = (long) _input.getNumColumns() * (long) _input.getNumRows();
+			boolean sparse = MatrixBlock.evalSparseFormatInMemory(_input.getNumRows(), numCols, estNNz);
+			_output.reset(_input.getNumRows(), numCols, sparse, estNNz);
+			outputMatrixPreProcessing(_output, _input);
+			return null;
 		}
 
 		@Override
-		public Integer call() throws Exception {
-			_encoder.build(_input);
-			if(_encoder instanceof ColumnEncoderComposite)
-				((ColumnEncoderComposite) _encoder).updateAllDCEncoders();
-			return 1;
+		public String toString() {
+			return getClass().getSimpleName();
 		}
+
 	}
 
-	private static class ColumnMergeBuildPartialTask implements Callable<Integer> {
+	private static class ApplyTasksWrapperTask extends DependencyWrapperTask<Object> {
+		private final ColumnEncoder _encoder;
+		private final MatrixBlock _out;
+		private final FrameBlock _in;
+		private int _offset = -1; // offset due to dummycoding in previous columns;
+									// needs to be updated by an external task!
+
+		private ApplyTasksWrapperTask(ColumnEncoder encoder, FrameBlock in, MatrixBlock out,
+			DependencyThreadPool pool) {
+			super(pool);
+			_encoder = encoder;
+			_out = out;
+			_in = in;
+		}
 
-		private final ColumnEncoderComposite _encoder;
-		private final List<Future<Object>> _partials;
+		@Override
+		public List<DependencyTask<?>> getWrappedTasks() {
+			return _encoder.getApplyTasks(_in, _out, _encoder._colID - 1 + _offset);
+		}
 
-		// if a pool is passed the task may be split up into multiple smaller tasks.
-		protected ColumnMergeBuildPartialTask(ColumnEncoderComposite encoder, List<Future<Object>> partials) {
+		@Override
+		public Object call() throws Exception {
+			// Called only once the building of the encoder is done, the output matrix is allocated,
+			// and _offset has been updated!
+			if(_offset == -1)
+				throw new DMLRuntimeException(
+					"Offset for apply task wrapper has not been updated! Most likely a " + "concurrency issue");
+			return super.call();
+		}
+
+		public void setOffset(int offset) {
+			_offset = offset;
+		}
+
+		@Override
+		public String toString() {
+			return getClass().getSimpleName() + "<ColId: " + _encoder._colID + ">";
+		}
+	}
+
+	/*
+	 * Task responsible for updating the output columns of the apply tasks after the DC encoders have been built,
+	 * so that the offsets in the output are correct.
+	 */
+	private static class UpdateOutputColTask implements Callable<Object> {
+		private final MultiColumnEncoder _encoder;
+		private final List<DependencyTask<?>> _applyTasksWrappers;
+
+		private UpdateOutputColTask(MultiColumnEncoder encoder, List<DependencyTask<?>> applyTasksWrappers) {
 			_encoder = encoder;
-			_partials = partials;
+			_applyTasksWrappers = applyTasksWrappers;
 		}
 
 		@Override
-		public Integer call() throws Exception {
-			_encoder.mergeBuildPartial(_partials, 0, _partials.size());
-			_encoder.updateAllDCEncoders();
-			return 1;
+		public String toString() {
+			return getClass().getSimpleName();
+		}
+
+		@Override
+		public Object call() throws Exception {
+			int currentCol = -1;
+			int currentOffset = 0;
+			for(DependencyTask<?> dtask : _applyTasksWrappers) {
+				int nonOffsetCol = ((ApplyTasksWrapperTask) dtask)._encoder._colID - 1;
+				if(nonOffsetCol > currentCol) {
+					currentCol = nonOffsetCol;
+					currentOffset = _encoder._columnEncoders.subList(0, nonOffsetCol).stream().mapToInt(e -> {
+						ColumnEncoderDummycode dc = e.getEncoder(ColumnEncoderDummycode.class);
+						if(dc == null)
+							return 0;
+						return dc._domainSize - 1;
+					}).sum();
+				}
+				((ApplyTasksWrapperTask) dtask).setOffset(currentOffset);
+
+			}
+			return null;
 		}
 	}
 
diff --git a/src/main/java/org/apache/sysds/runtime/util/DependencyTask.java b/src/main/java/org/apache/sysds/runtime/util/DependencyTask.java
new file mode 100644
index 0000000..5aff39b
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/util/DependencyTask.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.util;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.sysds.runtime.DMLRuntimeException;
+
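+/*
+ * A Callable that tracks its open dependencies. Each task keeps a counter of unfinished dependencies and a list of
+ * dependent tasks; when it finishes, it decreases the counter of all its dependents and submits every dependent
+ * whose counter reaches zero to the pool, completing that dependent's pre-assigned CompletableFuture.
+ */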
+public class DependencyTask<E> implements Callable<E> {
+	public static final boolean ENABLE_DEBUG_DATA = false;
+
+	private final Callable<E> _task;
+	protected final List<DependencyTask<?>> _dependantTasks;
+	public List<DependencyTask<?>> _dependencyTasks = null; // only for debugging
+	private CompletableFuture<Future<?>> _future;
+	private int _rdy = 0;
+	private ExecutorService _pool;
+
+	public DependencyTask(Callable<E> task, List<DependencyTask<?>> dependantTasks) {
+		_dependantTasks = dependantTasks;
+		_task = task;
+	}
+
+	public void addPool(ExecutorService pool) {
+		_pool = pool;
+	}
+
+	public void assignFuture(CompletableFuture<Future<?>> f) {
+		_future = f;
+	}
+
+	public boolean isReady() {
+		return _rdy == 0;
+	}
+
+	private boolean decrease() {
+		synchronized(this) {
+			_rdy -= 1;
+			return isReady();
+		}
+	}
+
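+	/* Registers a task that depends on this task and increases that task's open-dependency counter. */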
+	public void addDependent(DependencyTask<?> dependencyTask) {
+		_dependantTasks.add(dependencyTask);
+		dependencyTask._rdy += 1;
+	}
+
+	@Override
+	public E call() throws Exception {
+		E ret = _task.call();
+		_dependantTasks.forEach(t -> {
+			if(t.decrease()) {
+				if(_pool == null)
+					throw new DMLRuntimeException("ExecutorService was not set for DependencyTask");
+				t._future.complete(_pool.submit(t));
+			}
+		});
+
+		return ret;
+	}
+}
diff --git a/src/main/java/org/apache/sysds/runtime/util/DependencyThreadPool.java b/src/main/java/org/apache/sysds/runtime/util/DependencyThreadPool.java
new file mode 100644
index 0000000..4fdd63a
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/util/DependencyThreadPool.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.util;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.sysds.runtime.DMLRuntimeException;
+
+
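+/*
+ * Thread pool for executing a DAG of DependencyTasks. Tasks without open dependencies are submitted right away;
+ * every other task gets a CompletableFuture that is completed with its execution Future once the last of its
+ * dependencies has finished. A minimal usage sketch (taskA/taskB are placeholder Callables):
+ *
+ *   DependencyThreadPool pool = new DependencyThreadPool(4);
+ *   List<List<? extends Callable<?>>> deps = new ArrayList<>();
+ *   deps.add(Collections.singletonList(taskB)); // taskA runs after taskB
+ *   deps.add(null);                             // taskB has no dependencies
+ *   for(Future<Future<?>> ff : pool.submitAll(Arrays.asList(taskA, taskB), deps))
+ *     ff.get().get(); // outer future: task submitted; inner future: task finished
+ *   pool.shutdown();
+ */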
+public class DependencyThreadPool {
+
+	private final ExecutorService _pool;
+
+	public DependencyThreadPool(int k) {
+		_pool = CommonThreadPool.get(k);
+	}
+
+	public void shutdown() {
+		_pool.shutdown();
+	}
+
+	public List<Future<Future<?>>> submitAll(List<DependencyTask<?>> dtasks) {
+		List<Future<Future<?>>> futures = new ArrayList<>();
+		List<Integer> rdyTasks = new ArrayList<>();
+		int i = 0;
+		for(DependencyTask<?> t : dtasks) {
+			CompletableFuture<Future<?>> f = new CompletableFuture<>();
+			t.addPool(_pool);
+			if(!t.isReady()) {
+				t.assignFuture(f);
+			}
+			else {
+				// need to collect the ready tasks before any execution begins, otherwise a task could be submitted twice
+				rdyTasks.add(i);
+			}
+			futures.add(f);
+			i++;
+		}
+		// submit the ready tasks in a second pass to avoid a race condition
+		for(Integer index : rdyTasks) {
+			synchronized(_pool) {
+				((CompletableFuture<Future<?>>) futures.get(index)).complete(_pool.submit(dtasks.get(index)));
+			}
+		}
+		return futures;
+	}
+
+	public List<Future<Future<?>>> submitAll(List<? extends Callable<?>> tasks,
+		List<List<? extends Callable<?>>> dependencies) {
+		List<DependencyTask<?>> dtasks = createDependencyTasks(tasks, dependencies);
+		return submitAll(dtasks);
+	}
+
+	public List<Object> submitAllAndWait(List<DependencyTask<?>> dtasks)
+		throws ExecutionException, InterruptedException {
+		List<Object> res = new ArrayList<>();
+		// printDependencyGraph(dtasks);
+		List<Future<Future<?>>> futures = submitAll(dtasks);
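+		// outer futures complete on submission, inner futures on task completion; wrapper tasks
+		// expose the futures of their wrapped tasks instead of their own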
+		int i = 0;
+		for(Future<Future<?>> ff : futures) {
+			if(dtasks.get(i) instanceof DependencyWrapperTask) {
+				for(Future<Future<?>> f : ((DependencyWrapperTask<?>) dtasks.get(i)).getWrappedTaskFuture()) {
+					res.add(f.get().get());
+				}
+			}
+			else {
+				res.add(ff.get().get());
+			}
+			i++;
+		}
+		return res;
+	}
+
+	public static DependencyTask<?> createDependencyTask(Callable<?> task) {
+		return new DependencyTask<>(task, new ArrayList<>());
+	}
+
+	/*
+	 * Creates the dependency list from a map and the tasks. Each map entry maps a half-open index range [start, end)
+	 * of tasks to the half-open index range of tasks they depend on; negative indexes count back from one past the
+	 * end of the list. E.g.:
+	 * ([0, 3], [4, 6])   means the first 3 tasks in the task list depend on the tasks at indexes 4 and 5.
+	 * ([-2, -1], [0, 5]) means the last task depends on the first 5 tasks.
+	 */
+	public static List<List<? extends Callable<?>>> createDependencyList(List<? extends Callable<?>> tasks,
+		Map<Integer[], Integer[]> depMap, List<List<? extends Callable<?>>> dep) {
+		if(depMap != null) {
+			depMap.forEach((ti, di) -> {
+				ti[0] = ti[0] < 0 ? dep.size() + ti[0] + 1 : ti[0];
+				ti[1] = ti[1] < 0 ? dep.size() + ti[1] + 1 : ti[1];
+				di[0] = di[0] < 0 ? tasks.size() + di[0] + 1 : di[0];
+				di[1] = di[1] < 0 ? tasks.size() + di[1] + 1 : di[1];
+				for(int r = ti[0]; r < ti[1]; r++) {
+					if(dep.get(r) == null)
+						dep.set(r, tasks.subList(di[0], di[1]));
+					else
+						dep.set(r, Stream.concat(dep.get(r).stream(), tasks.subList(di[0], di[1]).stream())
+							.collect(Collectors.toList()));
+				}
+			});
+		}
+		return dep;
+	}
+
+	public static List<DependencyTask<?>> createDependencyTasks(List<? extends Callable<?>> tasks,
+		List<List<? extends Callable<?>>> dependencies) {
+		if(dependencies != null && tasks.size() != dependencies.size())
+			throw new DMLRuntimeException(
+				"Could not create DependencyTasks since the input list sizes are mismatched");
+		List<DependencyTask<?>> ret = new ArrayList<>();
+		Map<Callable<?>, DependencyTask<?>> map = new HashMap<>();
+		for(Callable<?> task : tasks) {
+			DependencyTask<?> dt;
+			if(task instanceof DependencyTask) {
+				dt = (DependencyTask<?>) task;
+			}
+			else {
+				dt = new DependencyTask<>(task, new ArrayList<>());
+			}
+			ret.add(dt);
+			map.put(task, dt);
+		}
+		if(dependencies == null)
+			return ret;
+
+		for(int i = 0; i < tasks.size(); i++) {
+			List<? extends Callable<?>> deps = dependencies.get(i);
+			if(deps == null)
+				continue;
+			DependencyTask<?> t = ret.get(i);
+			for(Callable<?> dep : deps) {
+				DependencyTask<?> dt = map.get(dep);
+				if(DependencyTask.ENABLE_DEBUG_DATA) {
+					t._dependencyTasks = t._dependencyTasks == null ? new ArrayList<>() : t._dependencyTasks;
+					t._dependencyTasks.add(dt);
+				}
+				if(dt != null)
+					dt.addDependent(t);
+			}
+		}
+		return ret;
+	}
+}
diff --git a/src/main/java/org/apache/sysds/runtime/util/DependencyWrapperTask.java b/src/main/java/org/apache/sysds/runtime/util/DependencyWrapperTask.java
new file mode 100644
index 0000000..12e274a
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/util/DependencyWrapperTask.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/*
+ * Abstract class for wrapping dependency tasks.
+ * Subclasses need to implement the "getWrappedTasks" method, which returns the tasks that should be run.
+ * Tasks registered as dependents of this task instead get a dependency on all wrapped child tasks.
+ */
+public abstract class DependencyWrapperTask<E> extends DependencyTask<E> {
+
+	private final List<Future<Future<?>>> _wrappedTaskFutures = new ArrayList<>();
+	private final CompletableFuture<Void> _submitted = new CompletableFuture<>();
+	private final DependencyThreadPool _pool;
+
+	public DependencyWrapperTask(DependencyThreadPool pool) {
+		super(() -> null, new ArrayList<>());
+		_pool = pool;
+	}
+
+	public void addWrappedTaskFuture(Future<Future<?>> future) {
+		_wrappedTaskFutures.add(future);
+	}
+
+	public List<Future<Future<?>>> getWrappedTaskFuture() throws ExecutionException, InterruptedException {
+		_submitted.get();
+		return _wrappedTaskFutures;
+	}
+
+	public abstract List<DependencyTask<?>> getWrappedTasks();
+
+	@Override
+	public E call() throws Exception {
+		List<DependencyTask<?>> wrappedTasks = getWrappedTasks();
+		// forward this task's dependents to all wrapped tasks
+		_dependantTasks.forEach(t -> wrappedTasks.forEach(w -> w.addDependent(t)));
+		_pool.submitAll(wrappedTasks).forEach(this::addWrappedTaskFuture);
+		_submitted.complete(null);
+		return super.call();
+	}
+
+}
diff --git a/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameBuildMultithreadedTest.java b/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameBuildMultithreadedTest.java
index f40a150..8824b9d 100644
--- a/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameBuildMultithreadedTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameBuildMultithreadedTest.java
@@ -19,6 +19,13 @@
 
 package org.apache.sysds.test.functions.transform.mt;
 
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+
 import org.apache.sysds.common.Types;
 import org.apache.sysds.runtime.io.FileFormatPropertiesCSV;
 import org.apache.sysds.runtime.io.FrameReaderFactory;
@@ -32,36 +39,32 @@ import org.apache.sysds.test.TestConfiguration;
 import org.apache.sysds.test.TestUtils;
 import org.junit.Test;
 
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.List;
-
-import static org.junit.Assert.*;
-
-public class TransformFrameBuildMultithreadedTest  extends AutomatedTestBase {
+public class TransformFrameBuildMultithreadedTest extends AutomatedTestBase {
 	private final static String TEST_NAME1 = "TransformFrameBuildMultithreadedTest";
 	private final static String TEST_DIR = "functions/transform/";
-	private final static String TEST_CLASS_DIR = TEST_DIR + TransformFrameBuildMultithreadedTest.class.getSimpleName() + "/";
+	private final static String TEST_CLASS_DIR = TEST_DIR + TransformFrameBuildMultithreadedTest.class.getSimpleName()
+		+ "/";
 
 	// dataset and transform tasks without missing values
 	private final static String DATASET1 = "homes3/homes.csv";
 	private final static String SPEC1 = "homes3/homes.tfspec_recode.json";
-	private final static String SPEC1b = "homes3/homes.tfspec_recode2.json";
+	//private final static String SPEC1b = "homes3/homes.tfspec_recode2.json";
 	private final static String SPEC2 = "homes3/homes.tfspec_dummy.json";
-	private final static String SPEC2b = "homes3/homes.tfspec_dummy2.json";
+	//private final static String SPEC2b = "homes3/homes.tfspec_dummy2.json";
 	private final static String SPEC3 = "homes3/homes.tfspec_bin.json"; // recode
-	private final static String SPEC3b = "homes3/homes.tfspec_bin2.json"; // recode
+	//private final static String SPEC3b = "homes3/homes.tfspec_bin2.json"; // recode
 	private final static String SPEC6 = "homes3/homes.tfspec_recode_dummy.json";
-	private final static String SPEC6b = "homes3/homes.tfspec_recode_dummy2.json";
+	//private final static String SPEC6b = "homes3/homes.tfspec_recode_dummy2.json";
 	private final static String SPEC7 = "homes3/homes.tfspec_binDummy.json"; // recode+dummy
-	private final static String SPEC7b = "homes3/homes.tfspec_binDummy2.json"; // recode+dummy
+	//private final static String SPEC7b = "homes3/homes.tfspec_binDummy2.json"; // recode+dummy
 	private final static String SPEC8 = "homes3/homes.tfspec_hash.json";
-	private final static String SPEC8b = "homes3/homes.tfspec_hash2.json";
+	//private final static String SPEC8b = "homes3/homes.tfspec_hash2.json";
 	private final static String SPEC9 = "homes3/homes.tfspec_hash_recode.json";
-	private final static String SPEC9b = "homes3/homes.tfspec_hash_recode2.json";
+	//private final static String SPEC9b = "homes3/homes.tfspec_hash_recode2.json";
+	private final static String SPEC10 = "homes3/homes.tfspec_recode_bin.json";
 
 	public enum TransformType {
-		RECODE, DUMMY, RECODE_DUMMY, BIN, BIN_DUMMY, HASH, HASH_RECODE,
+		RECODE, DUMMY, RECODE_DUMMY, BIN, BIN_DUMMY, HASH, HASH_RECODE, RECODE_BIN,
 	}
 
 	@Override
@@ -71,120 +74,132 @@ public class TransformFrameBuildMultithreadedTest  extends AutomatedTestBase {
 	}
 
 	@Test
-	public void testHomesRecodeIDsSingleNodeCSV() {
-		runTransformTest(Types.ExecMode.SINGLE_NODE, "csv", TransformType.RECODE, false);
+	public void testHomesBuildRecodeSingleNodeCSV() {
+		runTransformTest(Types.ExecMode.SINGLE_NODE, "csv", TransformType.RECODE, 0);
 	}
 
 	@Test
-	public void testHomesDummyCodeIDsSingleNodeCSV() {
-		runTransformTest(Types.ExecMode.SINGLE_NODE, "csv", TransformType.DUMMY, false);
+	public void testHomesBuild50RecodeSingleNodeCSV() {
+		runTransformTest(Types.ExecMode.SINGLE_NODE, "csv", TransformType.RECODE, 50);
 	}
 
 	@Test
-	public void testHomesRecodeDummyCodeIDsSingleNodeCSV() {
-		runTransformTest(Types.ExecMode.SINGLE_NODE, "csv", TransformType.RECODE_DUMMY, false);
+	public void testHomesBuildDummyCodeSingleNodeCSV() {
+		runTransformTest(Types.ExecMode.SINGLE_NODE, "csv", TransformType.DUMMY, 0);
 	}
 
 	@Test
-	public void testHomesBinIDsSingleNodeCSV() {
-		runTransformTest(Types.ExecMode.SINGLE_NODE, "csv", TransformType.BIN, false);
+	public void testHomesBuildRecodeDummyCodeSingleNodeCSV() {
+		runTransformTest(Types.ExecMode.SINGLE_NODE, "csv", TransformType.RECODE_DUMMY, 0);
 	}
 
 	@Test
-	public void testHomesBinDummyIDsSingleNodeCSV() {
-		runTransformTest(Types.ExecMode.SINGLE_NODE, "csv", TransformType.BIN_DUMMY, false);
+	public void testHomesBuildRecodeBinningSingleNodeCSV() {
+		runTransformTest(Types.ExecMode.SINGLE_NODE, "csv", TransformType.RECODE_BIN, 0);
 	}
 
 	@Test
-	public void testHomesHashIDsSingleNodeCSV() {
-		runTransformTest(Types.ExecMode.SINGLE_NODE, "csv", TransformType.HASH, false);
+	public void testHomesBuildBinSingleNodeCSV() {
+		runTransformTest(Types.ExecMode.SINGLE_NODE, "csv", TransformType.BIN, 0);
 	}
 
 	@Test
-	public void testHomesHashRecodeIDsSingleNodeCSV() {
-		runTransformTest(Types.ExecMode.SINGLE_NODE, "csv", TransformType.HASH_RECODE, false);
+	public void testHomesBuild50BinSingleNodeCSV() {
+		runTransformTest(Types.ExecMode.SINGLE_NODE, "csv", TransformType.BIN, 50);
 	}
 
+	@Test
+	public void testHomesBuildBinDummySingleNodeCSV() {
+		runTransformTest(Types.ExecMode.SINGLE_NODE, "csv", TransformType.BIN_DUMMY, 0);
+	}
 
-	private void runTransformTest(Types.ExecMode rt, String ofmt, TransformType type, boolean colnames) 
-	{
+	@Test
+	public void testHomesBuildHashRecodeSingleNodeCSV() {
+		runTransformTest(Types.ExecMode.SINGLE_NODE, "csv", TransformType.HASH_RECODE, 0);
+	}
+
+	private void runTransformTest(Types.ExecMode rt, String ofmt, TransformType type, int blockSize) {
 		// set transform specification
 		String SPEC = null;
 		String DATASET = null;
-		switch (type) {
+		switch(type) {
 			case RECODE:
-				SPEC = colnames ? SPEC1b : SPEC1;
+				SPEC = SPEC1;
 				DATASET = DATASET1;
 				break;
 			case DUMMY:
-				SPEC = colnames ? SPEC2b : SPEC2;
+				SPEC = SPEC2;
 				DATASET = DATASET1;
 				break;
 			case BIN:
-				SPEC = colnames ? SPEC3b : SPEC3;
+				SPEC = SPEC3;
 				DATASET = DATASET1;
 				break;
 			case RECODE_DUMMY:
-				SPEC = colnames ? SPEC6b : SPEC6;
+				SPEC = SPEC6;
 				DATASET = DATASET1;
 				break;
 			case BIN_DUMMY:
-				SPEC = colnames ? SPEC7b : SPEC7;
+				SPEC = SPEC7;
 				DATASET = DATASET1;
 				break;
 			case HASH:
-				SPEC = colnames ? SPEC8b : SPEC8;
+				SPEC = SPEC8;
 				DATASET = DATASET1;
 				break;
 			case HASH_RECODE:
-				SPEC = colnames ? SPEC9b : SPEC9;
+				SPEC = SPEC9;
+				DATASET = DATASET1;
+				break;
+			case RECODE_BIN:
+				SPEC = SPEC10;
 				DATASET = DATASET1;
 				break;
 		}
 
-		if (!ofmt.equals("csv"))
+		if(!ofmt.equals("csv"))
 			throw new RuntimeException("Unsupported test output format");
 
 		try {
 			getAndLoadTestConfiguration(TEST_NAME1);
 
-			//String HOME = SCRIPT_DIR + TEST_DIR;
+			// String HOME = SCRIPT_DIR + TEST_DIR;
 			DATASET = DATASET_DIR + DATASET;
 			SPEC = DATASET_DIR + SPEC;
 
 			FileFormatPropertiesCSV props = new FileFormatPropertiesCSV();
 			props.setHeader(true);
 			FrameBlock input = FrameReaderFactory.createFrameReader(Types.FileFormat.CSV, props)
-					.readFrameFromHDFS(DATASET, -1L, -1L);
+				.readFrameFromHDFS(DATASET, -1L, -1L);
 			StringBuilder specSb = new StringBuilder();
 			Files.readAllLines(Paths.get(SPEC)).forEach(s -> specSb.append(s).append("\n"));
-			MultiColumnEncoder encoderS = EncoderFactory.createEncoder(specSb.toString(), 
-					input.getColumnNames(), input.getNumColumns(), null);
-			MultiColumnEncoder encoderM = EncoderFactory.createEncoder(specSb.toString(), 
-					input.getColumnNames(), input.getNumColumns(), null);
+			MultiColumnEncoder encoderS = EncoderFactory.createEncoder(specSb.toString(), input.getColumnNames(),
+				input.getNumColumns(), null);
+			MultiColumnEncoder encoderM = EncoderFactory.createEncoder(specSb.toString(), input.getColumnNames(),
+				input.getNumColumns(), null);
 
-			encoderM.setBuildBlockSize(10);
 			encoderS.build(input, 1);
 			encoderM.build(input, 12);
-			if (type == TransformType.RECODE) {
+
+			if(type == TransformType.RECODE) {
 				List<ColumnEncoderRecode> encodersS = encoderS.getColumnEncoders(ColumnEncoderRecode.class);
 				List<ColumnEncoderRecode> encodersM = encoderM.getColumnEncoders(ColumnEncoderRecode.class);
 				assertEquals(encodersS.size(), encodersM.size());
-				for (int i = 0; i < encodersS.size(); i++) {
+				for(int i = 0; i < encodersS.size(); i++) {
 					assertEquals(encodersS.get(i).getRcdMap().keySet(), encodersM.get(i).getRcdMap().keySet());
 				}
 			}
-			else if (type == TransformType.BIN) {
+			else if(type == TransformType.BIN) {
 				List<ColumnEncoderBin> encodersS = encoderS.getColumnEncoders(ColumnEncoderBin.class);
 				List<ColumnEncoderBin> encodersM = encoderM.getColumnEncoders(ColumnEncoderBin.class);
 				assertEquals(encodersS.size(), encodersM.size());
-				for (int i = 0; i < encodersS.size(); i++) {
+				for(int i = 0; i < encodersS.size(); i++) {
 					assertArrayEquals(encodersS.get(i).getBinMins(), encodersM.get(i).getBinMins(), 0);
 					assertArrayEquals(encodersS.get(i).getBinMaxs(), encodersM.get(i).getBinMaxs(), 0);
 				}
 			}
 		}
-		catch (Exception ex) {
+		catch(Exception ex) {
 			throw new RuntimeException(ex);
 		}
 	}
diff --git a/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameEncodeMultithreadedTest.java b/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameEncodeMultithreadedTest.java
index 75ac71c..2156250 100644
--- a/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameEncodeMultithreadedTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/transform/mt/TransformFrameEncodeMultithreadedTest.java
@@ -19,6 +19,9 @@
 
 package org.apache.sysds.test.functions.transform.mt;
 
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
 import org.apache.sysds.common.Types.ExecMode;
 import org.apache.sysds.common.Types.FileFormat;
 import org.apache.sysds.runtime.io.FileFormatPropertiesCSV;
@@ -35,125 +38,176 @@ import org.apache.sysds.utils.Statistics;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.nio.file.Files;
-import java.nio.file.Paths;
-
 public class TransformFrameEncodeMultithreadedTest extends AutomatedTestBase {
 	private final static String TEST_NAME1 = "TransformFrameEncodeMultithreadedTest";
 	private final static String TEST_DIR = "functions/transform/";
-	private final static String TEST_CLASS_DIR = TEST_DIR + TransformFrameEncodeMultithreadedTest.class.getSimpleName() + "/";
-	
-	//dataset and transform tasks without missing values
+	private final static String TEST_CLASS_DIR = TEST_DIR + TransformFrameEncodeMultithreadedTest.class.getSimpleName()
+		+ "/";
+
+	// dataset and transform tasks without missing values
 	private final static String DATASET1 = "homes3/homes.csv";
-	private final static String SPEC1    = "homes3/homes.tfspec_recode.json";
-	private final static String SPEC1b   = "homes3/homes.tfspec_recode2.json";
-	private final static String SPEC2    = "homes3/homes.tfspec_dummy.json";
+	private final static String SPEC1 = "homes3/homes.tfspec_recode.json";
+	private final static String SPEC2 = "homes3/homes.tfspec_dummy.json";
 	private final static String SPEC2all = "homes3/homes.tfspec_dummy_all.json";
-	private final static String SPEC2b   = "homes3/homes.tfspec_dummy2.json";
-	private final static String SPEC3    = "homes3/homes.tfspec_bin.json"; //recode
-	private final static String SPEC3b   = "homes3/homes.tfspec_bin2.json"; //recode
-	private final static String SPEC6    = "homes3/homes.tfspec_recode_dummy.json";
-	private final static String SPEC6b   = "homes3/homes.tfspec_recode_dummy2.json";
-	private final static String SPEC7    = "homes3/homes.tfspec_binDummy.json"; //recode+dummy
-	private final static String SPEC7b   = "homes3/homes.tfspec_binDummy2.json"; //recode+dummy
-	private final static String SPEC8    = "homes3/homes.tfspec_hash.json";
-	private final static String SPEC8b   = "homes3/homes.tfspec_hash2.json";
-	private final static String SPEC9    = "homes3/homes.tfspec_hash_recode.json";
-	private final static String SPEC9b   = "homes3/homes.tfspec_hash_recode2.json";
-	
-	private static final int[] BIN_col3 = new int[]{1,4,2,3,3,2,4};
-	private static final int[] BIN_col8 = new int[]{1,2,2,2,2,2,3};
-	
+	private final static String SPEC3 = "homes3/homes.tfspec_bin.json"; // recode
+	private final static String SPEC6 = "homes3/homes.tfspec_recode_dummy.json";
+	private final static String SPEC7 = "homes3/homes.tfspec_binDummy.json"; // recode+dummy
+	private final static String SPEC8 = "homes3/homes.tfspec_hash.json";
+	private final static String SPEC9 = "homes3/homes.tfspec_hash_recode.json";
+
+	private static final int[] BIN_col3 = new int[] {1, 4, 2, 3, 3, 2, 4};
+	private static final int[] BIN_col8 = new int[] {1, 2, 2, 2, 2, 2, 3};
+
 	public enum TransformType {
-		RECODE,
-		DUMMY,
-		DUMMY_ALL, //to test sparse
-		RECODE_DUMMY,
-		BIN,
-		BIN_DUMMY,
-		HASH,
-		HASH_RECODE,
-	}
-	
+		RECODE, DUMMY, DUMMY_ALL, // to test sparse
+		RECODE_DUMMY, BIN, BIN_DUMMY, HASH, HASH_RECODE,
+	}
+
 	@Override
-	public void setUp()  {
+	public void setUp() {
 		TestUtils.clearAssertionInformation();
-		addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] { "y" }) );
+		addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {"y"}));
 	}
-	
+
 	@Test
-	public void testHomesRecodeIDsSingleNodeCSV() {
+	public void testHomesRecodeNonStaged() {
 		runTransformTest(ExecMode.SINGLE_NODE, "csv", TransformType.RECODE, false);
 	}
 
 	@Test
-	public void testHomesDummyCodeIDsSingleNodeCSV() {
+	public void testHomesDummyCodeNonStaged() {
 		runTransformTest(ExecMode.SINGLE_NODE, "csv", TransformType.DUMMY, false);
 	}
 
 	@Test
-	public void testHomesDummyAllCodeIDsSingleNodeCSV() {
+	public void testHomesDummyAllCodeNonStaged() {
 		runTransformTest(ExecMode.SINGLE_NODE, "csv", TransformType.DUMMY_ALL, false);
 	}
 
-
 	@Test
-	public void testHomesRecodeDummyCodeIDsSingleNodeCSV() {
+	public void testHomesRecodeDummyCodeNonStaged() {
 		runTransformTest(ExecMode.SINGLE_NODE, "csv", TransformType.RECODE_DUMMY, false);
 	}
 
 	@Test
-	public void testHomesBinIDsSingleNodeCSV() {
+	public void testHomesBinNonStaged() {
 		runTransformTest(ExecMode.SINGLE_NODE, "csv", TransformType.BIN, false);
 	}
 
 	@Test
-	public void testHomesBinDummyIDsSingleNodeCSV() {
+	public void testHomesBinDummyNonStaged() {
 		runTransformTest(ExecMode.SINGLE_NODE, "csv", TransformType.BIN_DUMMY, false);
 	}
 
 	@Test
-	public void testHomesHashIDsSingleNodeCSV() {
+	public void testHomesHashNonStaged() {
 		runTransformTest(ExecMode.SINGLE_NODE, "csv", TransformType.HASH, false);
 	}
 
 	@Test
-	public void testHomesHashRecodeIDsSingleNodeCSV() {
+	public void testHomesHashRecodeNonStaged() {
 		runTransformTest(ExecMode.SINGLE_NODE, "csv", TransformType.HASH_RECODE, false);
 	}
-	
-	private void runTransformTest( ExecMode rt, String ofmt, TransformType type, boolean colnames)	{
-
-		//set transform specification
-		String SPEC = null; String DATASET = null;
-		switch( type ) {
-			case RECODE: SPEC = colnames?SPEC1b:SPEC1; DATASET = DATASET1; break;
-			case DUMMY:  SPEC = colnames?SPEC2b:SPEC2; DATASET = DATASET1; break;
-			case DUMMY_ALL:  SPEC = SPEC2all; DATASET = DATASET1; break;
-			case BIN:    SPEC = colnames?SPEC3b:SPEC3; DATASET = DATASET1; break;
-			case RECODE_DUMMY: SPEC = colnames?SPEC6b:SPEC6; DATASET = DATASET1; break;
-			case BIN_DUMMY: SPEC = colnames?SPEC7b:SPEC7; DATASET = DATASET1; break;
-			case HASH:	 SPEC = colnames?SPEC8b:SPEC8; DATASET = DATASET1; break;
-			case HASH_RECODE: SPEC = colnames?SPEC9b:SPEC9; DATASET = DATASET1; break;
+
+	@Test
+	public void testHomesRecodeStaged() {
+		runTransformTest(ExecMode.SINGLE_NODE, "csv", TransformType.RECODE, true);
+	}
+
+	@Test
+	public void testHomesDummyCodeStaged() {
+		runTransformTest(ExecMode.SINGLE_NODE, "csv", TransformType.DUMMY, true);
+	}
+
+	@Test
+	public void testHomesDummyAllCodeStaged() {
+		runTransformTest(ExecMode.SINGLE_NODE, "csv", TransformType.DUMMY_ALL, true);
+	}
+
+	@Test
+	public void testHomesRecodeDummyCodeStaged() {
+		runTransformTest(ExecMode.SINGLE_NODE, "csv", TransformType.RECODE_DUMMY, true);
+	}
+
+	@Test
+	public void testHomesBinStaged() {
+		runTransformTest(ExecMode.SINGLE_NODE, "csv", TransformType.BIN, true);
+	}
+
+	@Test
+	public void testHomesBinDummyStaged() {
+		runTransformTest(ExecMode.SINGLE_NODE, "csv", TransformType.BIN_DUMMY, true);
+	}
+
+	@Test
+	public void testHomesHashStaged() {
+		runTransformTest(ExecMode.SINGLE_NODE, "csv", TransformType.HASH, true);
+	}
+
+	@Test
+	public void testHomesHashRecodeStaged() {
+		runTransformTest(ExecMode.SINGLE_NODE, "csv", TransformType.HASH_RECODE, true);
+	}
+
+	private void runTransformTest(ExecMode rt, String ofmt, TransformType type, boolean staged) {
+
+		// set transform specification
+		String SPEC = null;
+		String DATASET = null;
+		switch(type) {
+			case RECODE:
+				SPEC = SPEC1;
+				DATASET = DATASET1;
+				break;
+			case DUMMY:
+				SPEC = SPEC2;
+				DATASET = DATASET1;
+				break;
+			case DUMMY_ALL:
+				SPEC = SPEC2all;
+				DATASET = DATASET1;
+				break;
+			case BIN:
+				SPEC = SPEC3;
+				DATASET = DATASET1;
+				break;
+			case RECODE_DUMMY:
+				SPEC = SPEC6;
+				DATASET = DATASET1;
+				break;
+			case BIN_DUMMY:
+				SPEC = SPEC7;
+				DATASET = DATASET1;
+				break;
+			case HASH:
+				SPEC = SPEC8;
+				DATASET = DATASET1;
+				break;
+			case HASH_RECODE:
+				SPEC = SPEC9;
+				DATASET = DATASET1;
+				break;
 		}
 
-		if( !ofmt.equals("csv") )
+		if(!ofmt.equals("csv"))
 			throw new RuntimeException("Unsupported test output format");
-		
-		try
-		{
+
+		try {
 			getAndLoadTestConfiguration(TEST_NAME1);
-			
-			//String HOME = SCRIPT_DIR + TEST_DIR;
+
+			// String HOME = SCRIPT_DIR + TEST_DIR;
 			DATASET = DATASET_DIR + DATASET;
 			SPEC = DATASET_DIR + SPEC;
 
 			FileFormatPropertiesCSV props = new FileFormatPropertiesCSV();
 			props.setHeader(true);
-			FrameBlock input = FrameReaderFactory.createFrameReader(FileFormat.CSV, props).readFrameFromHDFS(DATASET, -1L,-1L);
+			FrameBlock input = FrameReaderFactory.createFrameReader(FileFormat.CSV, props).readFrameFromHDFS(DATASET,
+				-1L, -1L);
 			StringBuilder specSb = new StringBuilder();
 			Files.readAllLines(Paths.get(SPEC)).forEach(s -> specSb.append(s).append("\n"));
-			MultiColumnEncoder encoder = EncoderFactory.createEncoder(specSb.toString(), input.getColumnNames(), input.getNumColumns(), null);
+			MultiColumnEncoder encoder = EncoderFactory.createEncoder(specSb.toString(), input.getColumnNames(),
+				input.getNumColumns(), null);
+			MultiColumnEncoder.MULTI_THREADED_STAGES = staged;
 
 			MatrixBlock outputS = encoder.encode(input, 1);
 			MatrixBlock outputM = encoder.encode(input, 12);
@@ -164,29 +218,28 @@ public class TransformFrameEncodeMultithreadedTest extends AutomatedTestBase {
 			Assert.assertEquals(outputS.getNonZeros(), outputM.getNonZeros());
 			Assert.assertTrue(outputM.getNonZeros() > 0);
 
-			if( rt == ExecMode.HYBRID ) {
-				Assert.assertEquals("Wrong number of executed Spark instructions: " +
-					Statistics.getNoOfExecutedSPInst(), new Long(0), new Long(Statistics.getNoOfExecutedSPInst()));
+			if(rt == ExecMode.HYBRID) {
+				Assert.assertEquals(
+					"Wrong number of executed Spark instructions: " + Statistics.getNoOfExecutedSPInst(), new Long(0),
+					new Long(Statistics.getNoOfExecutedSPInst()));
 			}
-			
-			//additional checks for binning as encode-decode impossible
-			//TODO fix distributed binning as well
-			if( type == TransformType.BIN ) {
-				for(int i=0; i<7; i++) {
+
+			// additional checks for binning as encode-decode impossible
+			// TODO fix distributed binning as well
+			if(type == TransformType.BIN) {
+				for(int i = 0; i < 7; i++) {
 					Assert.assertEquals(BIN_col3[i], R1[i][2], 1e-8);
 					Assert.assertEquals(BIN_col8[i], R1[i][7], 1e-8);
 				}
 			}
-			else if( type == TransformType.BIN_DUMMY ) {
+			else if(type == TransformType.BIN_DUMMY) {
 				Assert.assertEquals(14, R1[0].length);
-				for(int i=0; i<7; i++) {
-					for(int j=0; j<4; j++) { //check dummy coded
-						Assert.assertEquals((j==BIN_col3[i]-1)?
-							1:0, R1[i][2+j], 1e-8);
+				for(int i = 0; i < 7; i++) {
+					for(int j = 0; j < 4; j++) { // check dummy coded
+						Assert.assertEquals((j == BIN_col3[i] - 1) ? 1 : 0, R1[i][2 + j], 1e-8);
 					}
-					for(int j=0; j<3; j++) { //check dummy coded
-						Assert.assertEquals((j==BIN_col8[i]-1)?
-							1:0, R1[i][10+j], 1e-8);
+					for(int j = 0; j < 3; j++) { // check dummy coded
+						Assert.assertEquals((j == BIN_col8[i] - 1) ? 1 : 0, R1[i][10 + j], 1e-8);
 					}
 				}
 			}
diff --git a/src/test/java/org/apache/sysds/test/util/DependencyThreadPoolTest.java b/src/test/java/org/apache/sysds/test/util/DependencyThreadPoolTest.java
new file mode 100644
index 0000000..eb5e244
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/util/DependencyThreadPoolTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.util;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.sysds.runtime.util.DependencyThreadPool;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.apache.sysds.test.functions.transform.mt.TransformFrameBuildMultithreadedTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class DependencyThreadPoolTest extends AutomatedTestBase {
+	private final static String TEST_NAME = "DependencyThreadPoolTest";
+	private final static String TEST_DIR = "util/";
+	private final static String TEST_CLASS_DIR = TEST_DIR + TransformFrameBuildMultithreadedTest.class.getSimpleName()
+		+ "/";
+
+	@Override
+	public void setUp() {
+		TestUtils.clearAssertionInformation();
+		addTestConfiguration(TEST_NAME, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME, new String[] {"y"}));
+	}
+
+	@Test
+	public void testSimpleDependency() throws InterruptedException, ExecutionException {
+		DependencyThreadPool pool = new DependencyThreadPool(4);
+		TestObj global = new TestObj();
+		TestTaskAdd task1 = new TestTaskAdd(1, 5, global);
+		TestTaskMult task2 = new TestTaskMult(2, 20, global);
+		List<? extends Callable<?>> tasks = Arrays.asList(task1, task2);
+		List<List<? extends Callable<?>>> dependencies = new ArrayList<>();
+		dependencies.add(Collections.singletonList(task2));
+		dependencies.add(null);
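+		// task1 (value += 5) must run after task2 (value *= 20): 0 * 20 + 5 = 5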
+		List<Future<Future<?>>> futures = pool.submitAll(tasks, dependencies);
+		for(Future<Future<?>> ff : futures) {
+			ff.get().get();
+		}
+		Assert.assertEquals(5, global.value);
+	}
+
+	@Test
+	public void testMultipleDependency() throws InterruptedException, ExecutionException {
+		DependencyThreadPool pool = new DependencyThreadPool(4);
+		TestObj global = new TestObj();
+		TestTaskMult task1 = new TestTaskMult(1, 20, global);
+		TestTaskAdd task2 = new TestTaskAdd(2, 5, global);
+		TestTaskMult task3 = new TestTaskMult(3, 20, global);
+		TestTaskAdd task4 = new TestTaskAdd(4, 10, global);
+
+		List<? extends Callable<?>> tasks = Arrays.asList(task1, task2, task3, task4);
+		List<List<? extends Callable<?>>> dependencies = new ArrayList<>();
+		dependencies.add(Collections.singletonList(task2));
+		dependencies.add(null);
+		dependencies.add(Collections.singletonList(task2));
+		dependencies.add(Arrays.asList(task3, task1));
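+		// task2 adds 5 first (5), task1 and task3 each multiply by 20 (2000), task4 finally adds 10 (2010)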
+		List<Future<Future<?>>> futures = pool.submitAll(tasks, dependencies);
+		for(Future<Future<?>> ff : futures) {
+			ff.get().get();
+		}
+		Assert.assertEquals(2010, global.value);
+	}
+
+	private static class TestObj {
+		public int value = 0;
+
+		private void add(int v) {
+			synchronized(this) {
+				value += v;
+			}
+		}
+
+		private void mult(int v) {
+			synchronized(this) {
+				value *= v;
+			}
+		}
+	}
+
+	private static class TestTaskAdd implements Callable<Integer> {
+
+		int _id;
+		int _time;
+		TestObj _global;
+
+		public TestTaskAdd(int id, int time, TestObj global) {
+			_id = id;
+			_time = time;
+			_global = global;
+		}
+
+		@Override
+		public Integer call() throws Exception {
+			Thread.sleep(_time);
+			_global.add(_time);
+			return _id;
+		}
+	}
+
+	private static class TestTaskMult implements Callable<Integer> {
+
+		int _id;
+		int _time;
+		TestObj _global;
+
+		public TestTaskMult(int id, int time, TestObj global) {
+			_id = id;
+			_time = time;
+			_global = global;
+		}
+
+		@Override
+		public Integer call() throws Exception {
+			Thread.sleep(_time);
+			_global.mult(_time);
+			return _id;
+		}
+	}
+
+}
diff --git a/src/test/resources/datasets/homes3/homes.tfspec_recode_bin.json b/src/test/resources/datasets/homes3/homes.tfspec_recode_bin.json
new file mode 100644
index 0000000..830d712
--- /dev/null
+++ b/src/test/resources/datasets/homes3/homes.tfspec_recode_bin.json
@@ -0,0 +1,2 @@
+{
+ "ids": true, "recode": [ 2, 1, 7 ], "bin": [ { "id": 1  , "method": "equi-width", "numbins": 3 }, { "id": 3  , "method": "equi-width", "numbins": 5 } ] }
\ No newline at end of file