You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by ba...@apache.org on 2021/06/01 10:43:11 UTC

[systemds] 04/07: [SYSTEMDS-2992] CLA init workload cost functions

This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit e8da72fa0c607fe191265a962f39b79420d06611
Author: baunsgaard <ba...@tugraz.at>
AuthorDate: Fri May 14 10:13:40 2021 +0200

    [SYSTEMDS-2992] CLA init workload cost functions
    
    Initial version of cost based co-coding functions.
    This commit adds costs for compressed tsmm and compressed mm.
---
 .../runtime/compress/CompressedMatrixBlock.java    |   2 +
 .../compress/CompressedMatrixBlockFactory.java     |  17 +-
 .../runtime/compress/CompressionSettings.java      |   8 +-
 .../compress/CompressionSettingsBuilder.java       |  15 +-
 .../runtime/compress/CompressionStatistics.java    |  18 +-
 .../runtime/compress/cocode/AColumnCoCoder.java    |   6 +-
 .../runtime/compress/cocode/CoCodeBinPacking.java  |   4 +-
 .../sysds/runtime/compress/cocode/CoCodeCost.java  |   4 +-
 .../compress/cocode/CoCodeCostMatrixMult.java      |  72 ++--
 .../runtime/compress/cocode/CoCodeStatic.java      |   4 +-
 .../runtime/compress/cocode/PlanningCoCoder.java   |  16 +-
 .../sysds/runtime/compress/colgroup/AColGroup.java |  16 +-
 .../compress/colgroup/ColGroupCompressed.java      |   8 +-
 .../runtime/compress/colgroup/ColGroupConst.java   |   5 -
 .../runtime/compress/colgroup/ColGroupDDC.java     |   5 +-
 .../runtime/compress/colgroup/ColGroupEmpty.java   |   4 -
 .../runtime/compress/colgroup/ColGroupFactory.java | 102 ++---
 .../runtime/compress/colgroup/ColGroupOLE.java     |   7 +-
 .../runtime/compress/colgroup/ColGroupOffset.java  |  10 +-
 .../runtime/compress/colgroup/ColGroupSDC.java     |   2 +-
 .../compress/colgroup/ColGroupSDCSingle.java       |   2 +-
 .../compress/colgroup/ColGroupSDCSingleZeros.java  |  20 +-
 .../compress/colgroup/ColGroupSDCZeros.java        |  16 +-
 .../runtime/compress/colgroup/ColGroupSizes.java   |  57 ++-
 .../compress/colgroup/ColGroupUncompressed.java    |   5 +
 .../runtime/compress/colgroup/ColGroupValue.java   |  66 +--
 .../compress/colgroup/dictionary/ADictionary.java  |  36 +-
 .../compress/colgroup/dictionary/Dictionary.java   |  22 +
 .../colgroup/dictionary/DictionaryFactory.java     | 135 +++++-
 .../colgroup/dictionary/MatrixBlockDictionary.java | 456 +++++++++++++++++++++
 .../compress/colgroup/dictionary/QDictionary.java  |  10 +
 .../colgroup/dictionary/SparseDictionary.java      | 218 ----------
 .../compress/estim/CompressedSizeEstimator.java    |  87 +++-
 .../estim/CompressedSizeEstimatorExact.java        |   3 +-
 .../estim/CompressedSizeEstimatorFactory.java      |  41 +-
 .../estim/CompressedSizeEstimatorSample.java       |  38 +-
 .../compress/estim/CompressedSizeInfoColGroup.java |  27 +-
 .../sysds/runtime/compress/lib/BitmapEncoder.java  |  56 +--
 .../runtime/compress/lib/BitmapLossyEncoder.java   |  30 +-
 .../runtime/compress/lib/CLALibLeftMultBy.java     |   2 +-
 .../runtime/compress/lib/CLALibRightMultBy.java    |   6 +-
 .../sysds/runtime/compress/lib/CLALibSquash.java   |  11 +-
 .../sysds/runtime/compress/utils/Bitmap.java       |  29 +-
 .../utils/{Bitmap.java => MultiColBitmap.java}     |  43 +-
 .../org/apache/sysds/utils/MemoryEstimates.java    |   2 +
 .../compress/AbstractCompressedUnaryTests.java     |   3 -
 .../component/compress/CompressedTestBase.java     |   7 +-
 .../compress/CompressibleInputGenerator.java       |  15 +
 .../sysds/test/component/compress/TestBase.java    |  13 +-
 .../test/component/compress/TestConstants.java     |   1 +
 .../compress/colgroup/JolEstimateTest.java         |  74 ++--
 .../compress/estim/SampleEstimatorTest.java        | 119 ++++++
 52 files changed, 1314 insertions(+), 661 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
index bb7781e..249ad3e 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
@@ -208,6 +208,7 @@ public class CompressedMatrixBlock extends MatrixBlock {
 		ret.allocateDenseBlock();
 		decompress(ret);
 
+		ret.examSparsity();
 		if(DMLScript.STATISTICS || LOG.isDebugEnabled()) {
 			double t = time.stop();
 			LOG.debug("decompressed block w/ k=" + 1 + " in " + t + "ms.");
@@ -256,6 +257,7 @@ public class CompressedMatrixBlock extends MatrixBlock {
 		ret.allocateDenseBlock();
 		decompress(ret, k);
 
+		ret.examSparsity();
 		if(DMLScript.STATISTICS || LOG.isDebugEnabled()) {
 			double t = time.stop();
 			LOG.debug("decompressed block w/ k=" + k + " in " + time.stop() + "ms.");
diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java
index bb0cf8a..1756018 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java
@@ -28,6 +28,7 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.runtime.compress.cocode.PlanningCoCoder;
+import org.apache.sysds.runtime.compress.cocode.PlanningCoCoder.PartitionerType;
 import org.apache.sysds.runtime.compress.colgroup.AColGroup;
 import org.apache.sysds.runtime.compress.colgroup.ColGroupConst;
 import org.apache.sysds.runtime.compress.colgroup.ColGroupEmpty;
@@ -161,18 +162,18 @@ public class CompressedMatrixBlockFactory {
 			_stats.estimatedSizeCols = sizeInfos.memoryEstimate();
 
 		logPhase();
-		long memoryEstimate = sizeInfos.memoryEstimate();
 
-		if(memoryEstimate < _stats.originalSize)
+		if(_stats.estimatedSizeCols < _stats.originalSize || compSettings.columnPartitioner == PartitionerType.COST_MATRIX_MULT)
 			coCodePhase(sizeEstimator, sizeInfos, mb.getNumRows());
 		else {
-			LOG.info("Estimated Size of singleColGroups: " + memoryEstimate);
+			LOG.info("Estimated Size of singleColGroups: " + _stats.estimatedSizeCols);
 			LOG.info("Original size                    : " + _stats.originalSize);
 		}
 	}
 
 	private void coCodePhase(CompressedSizeEstimator sizeEstimator, CompressedSizeInfo sizeInfos, int numRows) {
 		coCodeColGroups = PlanningCoCoder.findCoCodesByPartitioning(sizeEstimator, sizeInfos, numRows, k, compSettings);
+		_stats.estimatedSizeCoCoded = coCodeColGroups.memoryEstimate();
 		logPhase();
 	}
 
@@ -206,6 +207,7 @@ public class CompressedMatrixBlockFactory {
 
 	private void compressPhase() {
 		res.allocateColGroupList(ColGroupFactory.compressColGroups(mb, coCodeColGroups, compSettings, k));
+		_stats.compressedInitialSize = res.getInMemorySize();
 		logPhase();
 	}
 
@@ -229,6 +231,7 @@ public class CompressedMatrixBlockFactory {
 			o.add(combineConst(c));
 
 		res.allocateColGroupList(o);
+
 		logPhase();
 	}
 
@@ -276,12 +279,11 @@ public class CompressedMatrixBlockFactory {
 	private void cleanupPhase() {
 
 		res.cleanupBlock(true, true);
-		mb.cleanupBlock(true, true);
 
 		_stats.size = res.estimateCompressedSizeInMemory();
 		
 		final double ratio = _stats.getRatio();
-		if(ratio < 1) {
+		if(ratio < 1 && compSettings.columnPartitioner != PartitionerType.COST_MATRIX_MULT)  {
 			LOG.info("--dense size:        " + _stats.denseSize);
 			LOG.info("--original size:     " + _stats.originalSize);
 			LOG.info("--compressed size:   " + _stats.size);
@@ -291,6 +293,8 @@ public class CompressedMatrixBlockFactory {
 			return;
 		}
 
+		mb.cleanupBlock(true, true);
+
 		_stats.setColGroupsCounts(res.getColGroups());
 
 		logPhase();
@@ -311,10 +315,12 @@ public class CompressedMatrixBlockFactory {
 			switch(phase) {
 				case 0:
 					LOG.debug("--compression phase " + phase + " Classify  : " + getLastTimePhase());
+					LOG.debug("--Individual Columns Estimated Compression: " + _stats.estimatedSizeCols);
 					break;
 				case 1:
 					LOG.debug("--compression phase " + phase + " Grouping  : " + getLastTimePhase());
 					LOG.debug("Grouping using: " + compSettings.columnPartitioner);
+					LOG.debug("--Cocoded Columns estimated Compression:" + _stats.estimatedSizeCoCoded);
 					break;
 				case 2:
 					LOG.debug("--compression phase " + phase + " Transpose : " + getLastTimePhase());
@@ -324,6 +330,7 @@ public class CompressedMatrixBlockFactory {
 					LOG.debug("--compression phase " + phase + " Compress  : " + getLastTimePhase());
 					LOG.debug("--compression Hash collisions:" + DblArrayIntListHashMap.hashMissCount);
 					DblArrayIntListHashMap.hashMissCount = 0;
+					LOG.debug("--compressed initial actual size:" + _stats.compressedInitialSize);
 					break;
 				case 4:
 					LOG.debug("--compression phase " + phase + " Share     : " + getLastTimePhase());
diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java
index ddbc60b..895600d 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java
@@ -97,10 +97,15 @@ public class CompressionSettings {
 	 */
 	public final EnumSet<CompressionType> validCompressions;
 
+	/**
+	 * The minimum size of the sample extracted.
+	 */
+	public final int minimumSampleSize;
+
 	protected CompressionSettings(double samplingRatio, boolean allowSharedDictionary, String transposeInput,
 		boolean skipList, int seed, boolean investigateEstimate, boolean lossy,
 		EnumSet<CompressionType> validCompressions, boolean sortValuesByLength, PartitionerType columnPartitioner,
-		int maxColGroupCoCode, double coCodePercentage) {
+		int maxColGroupCoCode, double coCodePercentage, int minimumSampleSize) {
 		this.samplingRatio = samplingRatio;
 		this.allowSharedDictionary = allowSharedDictionary;
 		this.transposeInput = transposeInput;
@@ -113,6 +118,7 @@ public class CompressionSettings {
 		this.columnPartitioner = columnPartitioner;
 		this.maxColGroupCoCode = maxColGroupCoCode;
 		this.coCodePercentage = coCodePercentage;
+		this.minimumSampleSize = minimumSampleSize;
 		LOG.debug(this);
 	}
 
diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java
index 216267e..83d01e5 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java
@@ -43,6 +43,7 @@ public class CompressionSettingsBuilder {
 	private PartitionerType columnPartitioner;
 	private int maxStaticColGroupCoCode = 10000;
 	private double coCodePercentage = 0.01;
+	private int minimumSampleSize = 2000;
 
 	private final static double defaultSampleRate = 0.01;
 
@@ -255,6 +256,18 @@ public class CompressionSettingsBuilder {
 	}
 
 	/**
+	 * Set the minimum sample size to extract from a given matrix, this overrules the sample percentage if the sample
+	 * percentage extracted is lower than this minimum bound.
+	 * 
+	 * @param minimumSampleSize The minimum sample size to extract
+	 * @return The CompressionSettingsBuilder
+	 */
+	public CompressionSettingsBuilder setMinimumSampleSize(int minimumSampleSize) {
+		this.minimumSampleSize = minimumSampleSize;
+		return this;
+	}
+
+	/**
 	 * Create the CompressionSettings object to use in the compression.
 	 * 
 	 * @return The CompressionSettings
@@ -262,6 +275,6 @@ public class CompressionSettingsBuilder {
 	public CompressionSettings create() {
 		return new CompressionSettings(samplingRatio, allowSharedDictionary, transposeInput, skipList, seed,
 			investigateEstimate, lossy, validCompressions, sortValuesByLength, columnPartitioner,
-			maxStaticColGroupCoCode, coCodePercentage);
+			maxStaticColGroupCoCode, coCodePercentage, minimumSampleSize);
 	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressionStatistics.java b/src/main/java/org/apache/sysds/runtime/compress/CompressionStatistics.java
index 5c7d815..466953a 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressionStatistics.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressionStatistics.java
@@ -31,10 +31,16 @@ import org.apache.sysds.runtime.compress.colgroup.AColGroup;
  */
 public class CompressionStatistics {
 
+	// sizes while compressing
+	public long estimatedSizeCoCoded;
+	public long estimatedSizeCols;
+	public long compressedInitialSize;
+
+	// sizes before compression
 	public long originalSize;
 	public long denseSize;
-	public long estimatedSizeColGroups;
-	public long estimatedSizeCols;
+
+	// compressed size
 	public long size;
 
 	private Map<String, int[]> colGroupCounts;
@@ -74,20 +80,20 @@ public class CompressionStatistics {
 
 		for(String ctKey : colGroupCounts.keySet())
 			sb.append(ctKey + ":" + colGroupCounts.get(ctKey)[0] + " ");
-		
+
 		return sb.toString();
 	}
 
 	public String getGroupsSizesString() {
 		StringBuilder sb = new StringBuilder();
 
-		for(String ctKey : colGroupCounts.keySet()) 
+		for(String ctKey : colGroupCounts.keySet())
 			sb.append(ctKey + ":" + colGroupCounts.get(ctKey)[1] + " ");
-		
+
 		return sb.toString();
 	}
 
-	public double getRatio(){
+	public double getRatio() {
 		return (double) originalSize / size;
 	}
 
diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/AColumnCoCoder.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/AColumnCoCoder.java
index 1ede652..124e3a4 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/cocode/AColumnCoCoder.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/AColumnCoCoder.java
@@ -32,12 +32,10 @@ public abstract class AColumnCoCoder {
 
 	final protected CompressedSizeEstimator _est;
 	final protected CompressionSettings _cs;
-	// final protected int _numRows;
 
-	protected AColumnCoCoder(CompressedSizeEstimator sizeEstimator, CompressionSettings cs, int numRows) {
+	protected AColumnCoCoder(CompressedSizeEstimator sizeEstimator, CompressionSettings cs) {
 		_est = sizeEstimator;
 		_cs = cs;
-		// _numRows = numRows;
 	}
 
 	/**
@@ -57,7 +55,7 @@ public abstract class AColumnCoCoder {
 	protected CompressedSizeInfoColGroup joinWithAnalysis(CompressedSizeInfoColGroup lhs,
 		CompressedSizeInfoColGroup rhs) {
 		int[] joined = Util.join(lhs.getColumns(), rhs.getColumns());
-		return _est.estimateCompressedColGroupSize(joined);
+		return _est.estimateCompressedColGroupSize(joined,( lhs.getNumVals() + 1) * (rhs.getNumVals() + 1));
 	}
 
 	protected CompressedSizeInfoColGroup joinWithoutAnalysis(CompressedSizeInfoColGroup lhs,
diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeBinPacking.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeBinPacking.java
index 0116edc..e6dace1 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeBinPacking.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeBinPacking.java
@@ -46,8 +46,8 @@ public class CoCodeBinPacking extends AColumnCoCoder {
 	 */
 	public static double BIN_CAPACITY = 0.000032;
 
-	protected CoCodeBinPacking(CompressedSizeEstimator sizeEstimator, CompressionSettings cs, int numRows) {
-		super(sizeEstimator, cs, numRows);
+	protected CoCodeBinPacking(CompressedSizeEstimator sizeEstimator, CompressionSettings cs) {
+		super(sizeEstimator, cs);
 	}
 
 	@Override
diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeCost.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeCost.java
index 66ad209..3f7185a 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeCost.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeCost.java
@@ -49,8 +49,8 @@ public class CoCodeCost extends AColumnCoCoder {
 
 	private final static int toSmallForAnalysis = 64;
 
-	protected CoCodeCost(CompressedSizeEstimator sizeEstimator, CompressionSettings cs, int numRows) {
-		super(sizeEstimator, cs, numRows);
+	protected CoCodeCost(CompressedSizeEstimator sizeEstimator, CompressionSettings cs) {
+		super(sizeEstimator, cs);
 		largestDistinct = Math.min(4096, Math.max(256, (int) (sizeEstimator.getNumRows() * cs.coCodePercentage)));
 	}
 
diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeCostMatrixMult.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeCostMatrixMult.java
index 910c94a..836e4d0 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeCostMatrixMult.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeCostMatrixMult.java
@@ -26,28 +26,35 @@ import java.util.PriorityQueue;
 import java.util.Queue;
 
 import org.apache.sysds.runtime.compress.CompressionSettings;
-import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType;
 import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimator;
+import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimatorSample;
 import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo;
 import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup;
 
-/**
- * Column group partitioning by number of distinct items estimated. This allows us to join columns based on the worst
- * case estimate of the joined sizes. Then once we decide to join, if the worst case is okay, we then analyze the actual
- * cardinality of the join.
- * 
- * This method allows us to compress many more columns than the BinPacking
- * 
- */
 public class CoCodeCostMatrixMult extends AColumnCoCoder {
 
-	protected CoCodeCostMatrixMult(CompressedSizeEstimator e, CompressionSettings cs, int numRows) {
-		super(e, cs, numRows);
+	protected CoCodeCostMatrixMult(CompressedSizeEstimator e, CompressionSettings cs) {
+		super(e, cs);
 	}
 
 	@Override
 	protected CompressedSizeInfo coCodeColumns(CompressedSizeInfo colInfos, int k) {
-		colInfos.setInfo(join(colInfos.getInfo()));
+
+		List<CompressedSizeInfoColGroup> joinRes = join(colInfos.getInfo());
+
+		if(_cs.samplingRatio < 0.1 && _est instanceof CompressedSizeEstimatorSample) {
+			LOG.debug("Performing second join with double sample rate");
+			CompressedSizeEstimatorSample estS = (CompressedSizeEstimatorSample) _est;
+			estS.sampleData(estS.getSample().getNumRows() * 2);
+			List<int[]> colG = new ArrayList<>(joinRes.size());
+			for(CompressedSizeInfoColGroup g : joinRes)
+				colG.add(g.getColumns());
+
+			joinRes = join(estS.computeCompressedSizeInfos(colG, k));
+		}
+
+		colInfos.setInfo(joinRes);
+
 		return colInfos;
 	}
 
@@ -66,6 +73,8 @@ public class CoCodeCostMatrixMult extends AColumnCoCoder {
 					final CostOfJoin r = que.poll();
 					final double costIndividual = (l.cost + r.cost);
 					final CostOfJoin g = new CostOfJoin(joinWithAnalysis(l.elm, r.elm));
+					if(LOG.isDebugEnabled())
+						LOG.debug("\nl:      " + l + "\nr:      " + r + "\njoined: " + g);
 					if(g.cost < costIndividual)
 						que.add(g);
 					else {
@@ -81,6 +90,7 @@ public class CoCodeCostMatrixMult extends AColumnCoCoder {
 			else
 				break;
 		}
+
 		for(CostOfJoin g : que)
 			ret.add(g.elm);
 
@@ -94,30 +104,16 @@ public class CoCodeCostMatrixMult extends AColumnCoCoder {
 		protected CostOfJoin(CompressedSizeInfoColGroup elm) {
 			this.elm = elm;
 
-			final double constantOverheadForColGroup = 5;
-			final double nCols = elm.getColumns().length;
+			final int nCols = elm.getColumns().length;
 			final double nRows = _est.getNumRows();
-			if(elm.getBestCompressionType() == CompressionType.UNCOMPRESSED)
-				this.cost = nRows * nCols * 2 + constantOverheadForColGroup;
-			else {
-				final int blksz = CompressionSettings.BITMAP_BLOCK_SZ;
-
-				// LOG.error(constantOverheadForColGroup);
-				final double commonFraction = elm.getMostCommonFraction();
-				final double rowsCost = commonFraction > 0.2 ? nRows * (1 - commonFraction) : nRows;
-				// this.cost = rowsToProcess + elm.getNumVals() * nCols + constantOverheadForColGroup;
-				// this.cost = rowsToProcess + elm.getNumVals() * nCols * (1 - commonFraction) +
-				// constantOverheadForColGroup;
-				// final double sparsity_tuple_effect = elm.getTupleSparsity() > 0.4 ? 1 -
-				// Math.min(elm.getTupleSparsity(), 0.9) : 1;
-				final int numberTuples = elm.getNumVals();
-				final double tuplesCost = (numberTuples < blksz) ? numberTuples : numberTuples * 2;
-
-				// this.cost = elementsCost;
-				// this.cost = rowsCost + tuplesCost * sparsity_tuple_effect + constantOverheadForColGroup;
-
-				this.cost = rowsCost + tuplesCost + constantOverheadForColGroup;
-			}
+			final double preAggregateCost = nRows;
+
+			final int numberTuples = elm.getNumVals();
+			final double tupleSparsity = elm.getTupleSparsity();
+			final double postScalingCost = (nCols > 1 && elm.getTupleSparsity() > 0.4) ? numberTuples *
+				nCols : numberTuples * nCols * tupleSparsity;
+
+			this.cost = preAggregateCost + postScalingCost;
 		}
 
 		@Override
@@ -128,10 +124,14 @@ public class CoCodeCostMatrixMult extends AColumnCoCoder {
 		@Override
 		public String toString() {
 			StringBuilder sb = new StringBuilder();
-			sb.append("\n");
 			sb.append(cost);
 			sb.append(" - ");
+			sb.append(elm.getBestCompressionType());
+			sb.append(" nrVals: ");
+			sb.append(elm.getNumVals());
+			sb.append(" ");
 			sb.append(Arrays.toString(elm.getColumns()));
+
 			return sb.toString();
 		}
 	}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeStatic.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeStatic.java
index 06e01b4..674a819 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeStatic.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeStatic.java
@@ -29,8 +29,8 @@ import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo;
  */
 public class CoCodeStatic extends AColumnCoCoder {
 
-	protected CoCodeStatic(CompressedSizeEstimator sizeEstimator, CompressionSettings cs, int numRows) {
-		super(sizeEstimator, cs, numRows);
+	protected CoCodeStatic(CompressedSizeEstimator sizeEstimator, CompressionSettings cs) {
+		super(sizeEstimator, cs);
 	}
 
 	@Override
diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/PlanningCoCoder.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/PlanningCoCoder.java
index 94572c9..3bae0e5 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/cocode/PlanningCoCoder.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/PlanningCoCoder.java
@@ -67,7 +67,7 @@ public class PlanningCoCoder {
 			List<CompressedSizeInfoColGroup> newGroups = new ArrayList<>();
 			mem = new Memorizer();
 			for(CompressedSizeInfoColGroup g : colInfos.getInfo()) {
-				if(g.getBestCompressionType() == CompressionType.CONST)
+				if(g.getBestCompressionType(cs) == CompressionType.CONST)
 					constantGroups.add(g);
 				else {
 					mem.put(g);
@@ -93,13 +93,13 @@ public class PlanningCoCoder {
 		CompressionSettings cs, int numRows) {
 		switch(type) {
 			case BIN_PACKING:
-				return new CoCodeBinPacking(est, cs, numRows);
+				return new CoCodeBinPacking(est, cs);
 			case STATIC:
-				return new CoCodeStatic(est, cs, numRows);
+				return new CoCodeStatic(est, cs);
 			case COST:
-				return new CoCodeCost(est, cs, numRows);
+				return new CoCodeCost(est, cs);
 			case COST_MATRIX_MULT:
-				return new CoCodeCostMatrixMult(est, cs, numRows);
+				return new CoCodeCostMatrixMult(est, cs);
 			default:
 				throw new RuntimeException("Unsupported column group partitioner: " + type.toString());
 		}
@@ -186,7 +186,7 @@ public class PlanningCoCoder {
 				break;
 		}
 
-		LOG.error(mem.stats());
+		LOG.debug(mem.stats());
 		mem.resetStats();
 
 		List<CompressedSizeInfoColGroup> ret = new ArrayList<>(workset.size());
@@ -226,9 +226,9 @@ public class PlanningCoCoder {
 			if(g == null) {
 				final CompressedSizeInfoColGroup left = mem.get(new ColIndexes(c1));
 				final CompressedSizeInfoColGroup right = mem.get(new ColIndexes(c2));
-				final boolean leftConst = left.getBestCompressionType() == CompressionType.CONST &&
+				final boolean leftConst = left.getBestCompressionType(cs) == CompressionType.CONST &&
 					left.getNumOffs() == 0;
-				final boolean rightConst = right.getBestCompressionType() == CompressionType.CONST &&
+				final boolean rightConst = right.getBestCompressionType(cs) == CompressionType.CONST &&
 					right.getNumOffs() == 0;
 				if(leftConst)
 					g = CompressedSizeInfoColGroup.addConstGroup(c, right, cs.validCompressions);
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java
index ddc6195..6b74136 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java
@@ -32,6 +32,7 @@ import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator;
 import org.apache.sysds.runtime.matrix.operators.BinaryOperator;
 import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
+import org.apache.sysds.utils.MemoryEstimates;
 
 import edu.emory.mathcs.backport.java.util.Arrays;
 
@@ -143,6 +144,15 @@ public abstract class AColGroup implements Serializable {
 	public abstract int getNumRows();
 
 	/**
+	 * Obtain number of distinct tuples in contained sets of values associated with this column group.
+	 * 
+	 * If the column group is uncompressed the number or rows is returned.
+	 * 
+	 * @return the number of distinct sets of values associated with the bitmaps in this column group
+	 */
+	public abstract int getNumValues();
+
+	/**
 	 * Obtain the number of columns in this column group.
 	 * 
 	 * @return number of columns in this column group
@@ -183,7 +193,11 @@ public abstract class AColGroup implements Serializable {
 	 * 
 	 * @return an upper bound on the number of bytes used to store this ColGroup in memory.
 	 */
-	public abstract long estimateInMemorySize();
+	public long estimateInMemorySize(){
+		long size = 16; // object header
+		size += MemoryEstimates.intArrayCost(_colIndexes.length);
+		return size;
+	}
 
 	/**
 	 * Decompress the contents of this column group into the specified full matrix block.
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupCompressed.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupCompressed.java
index c5d29ba..3b598e6 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupCompressed.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupCompressed.java
@@ -58,8 +58,6 @@ public abstract class ColGroupCompressed extends AColGroup {
 		_numRows = numRows;
 	}
 
-	public abstract int getNumValues();
-
 	public abstract double[] getValues();
 
 	public abstract void addMinMax(double[] ret);
@@ -147,4 +145,10 @@ public abstract class ColGroupCompressed extends AColGroup {
 		return _numRows;
 	}
 
+	@Override
+	public long estimateInMemorySize() {
+		long size = super.estimateInMemorySize();
+		size += 4;
+		return size;
+	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupConst.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupConst.java
index 8f6299c..e3c2965 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupConst.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupConst.java
@@ -113,11 +113,6 @@ public class ColGroupConst extends ColGroupValue {
 	}
 
 	@Override
-	public long estimateInMemorySize() {
-		return ColGroupSizes.estimateInMemorySizeCONST(getNumCols(), getNumValues(), isLossy());
-	}
-
-	@Override
 	public void decompressToBlockSafe(MatrixBlock target, int rl, int ru, int offT, double[] values) {
 		decompressToBlockUnSafe(target, rl, ru, offT, values);
 		target.setNonZeros(_colIndexes.length * target.getNumRows() + target.getNonZeros());
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java
index df45f56..fa967da 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java
@@ -555,8 +555,9 @@ public class ColGroupDDC extends ColGroupValue {
 
 	@Override
 	public long estimateInMemorySize() {
-		return ColGroupSizes.estimateInMemorySizeDDC(getNumCols(), getNumValues(), _numRows, isLossy());
-
+		long size = super.estimateInMemorySize();
+		size += _data.getInMemorySize();
+		return size;
 	}
 
 	@Override
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupEmpty.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupEmpty.java
index c873b1a..be33491 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupEmpty.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupEmpty.java
@@ -83,10 +83,6 @@ public class ColGroupEmpty extends ColGroupCompressed {
 		return ColGroupType.EMPTY;
 	}
 
-	@Override
-	public long estimateInMemorySize() {
-		return ColGroupSizes.estimateInMemorySizeEMPTY(getNumCols());
-	}
 
 	@Override
 	public void decompressToBlockSafe(MatrixBlock target, int rl, int ru, int offT, double[] values) {
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java
index 62acf71..ffcc2fb 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java
@@ -34,9 +34,10 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.runtime.DMLCompressionException;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.compress.CompressionSettings;
+import org.apache.sysds.runtime.compress.cocode.PlanningCoCoder.PartitionerType;
 import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType;
 import org.apache.sysds.runtime.compress.colgroup.dictionary.ADictionary;
-import org.apache.sysds.runtime.compress.colgroup.dictionary.Dictionary;
+import org.apache.sysds.runtime.compress.colgroup.dictionary.DictionaryFactory;
 import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
 import org.apache.sysds.runtime.compress.colgroup.mapping.MapToFactory;
 import org.apache.sysds.runtime.compress.colgroup.tree.AInsertionSorter;
@@ -47,7 +48,6 @@ import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo;
 import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup;
 import org.apache.sysds.runtime.compress.lib.BitmapEncoder;
 import org.apache.sysds.runtime.compress.utils.ABitmap;
-import org.apache.sysds.runtime.compress.utils.Bitmap;
 import org.apache.sysds.runtime.compress.utils.IntArrayList;
 import org.apache.sysds.runtime.data.SparseBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
@@ -155,8 +155,9 @@ public class ColGroupFactory {
 
 	private static Collection<AColGroup> compressColGroup(MatrixBlock in, int[] colIndexes,
 		CompressionSettings compSettings) {
-		if(in.isInSparseFormat() && compSettings.transposed) {
-
+		if(in.isEmpty())
+			return Collections.singletonList(new ColGroupEmpty(colIndexes, compSettings.transposed ? in.getNumColumns(): in.getNumRows()));
+		else if(in.isInSparseFormat() && compSettings.transposed) {
 			final SparseBlock sb = in.getSparseBlock();
 			for(int col : colIndexes)
 				if(sb.isEmpty(col))
@@ -191,7 +192,7 @@ public class ColGroupFactory {
 
 	private static AColGroup compressColGroupForced(MatrixBlock in, int[] colIndexes,
 		CompressionSettings compSettings) {
-			
+
 		ABitmap ubm = BitmapEncoder.extractBitmap(colIndexes, in, compSettings.transposed);
 
 		CompressedSizeEstimator estimator = new CompressedSizeEstimatorExact(in, compSettings);
@@ -200,7 +201,8 @@ public class ColGroupFactory {
 			estimator.estimateCompressedColGroupSize(ubm, colIndexes), compSettings.validCompressions);
 
 		int numRows = compSettings.transposed ? in.getNumColumns() : in.getNumRows();
-		return compress(colIndexes, numRows, ubm, sizeInfo.getBestCompressionType(), compSettings, in);
+		return compress(colIndexes, numRows, ubm, sizeInfo.getBestCompressionType(compSettings), compSettings, in,
+			sizeInfo.getTupleSparsity());
 	}
 
 	// private static AColGroup compressColGroupCorrecting(MatrixBlock in,
@@ -275,17 +277,21 @@ public class ColGroupFactory {
 	 * @param compType       The CompressionType selected
 	 * @param cs             The compression Settings used for the given compression
 	 * @param rawMatrixBlock The copy of the original input (maybe transposed) MatrixBlock
+	 * @param tupleSparsity  The sparsity of the ubs entries.
 	 * @return A Compressed ColGroup
 	 */
 	public static AColGroup compress(int[] colIndexes, int rlen, ABitmap ubm, CompressionType compType,
-		CompressionSettings cs, MatrixBlock rawMatrixBlock) {
+		CompressionSettings cs, MatrixBlock rawMatrixBlock, double tupleSparsity) {
+
+		if(compType == CompressionType.UNCOMPRESSED && cs.columnPartitioner == PartitionerType.COST_MATRIX_MULT)
+			compType = CompressionType.DDC;
 
 		final IntArrayList[] of = ubm.getOffsetList();
 
 		if(of == null)
 			return new ColGroupEmpty(colIndexes, rlen);
 		else if(of.length == 1 && of[0].size() == rlen)
-			return new ColGroupConst(colIndexes, rlen, ADictionary.getDictionary(ubm));
+			return new ColGroupConst(colIndexes, rlen, DictionaryFactory.create(ubm));
 
 		if(LOG.isTraceEnabled())
 			LOG.trace("compressing to: " + compType);
@@ -295,13 +301,13 @@ public class ColGroupFactory {
 
 			switch(compType) {
 				case DDC:
-					return compressDDC(colIndexes, rlen, ubm, cs);
+					return compressDDC(colIndexes, rlen, ubm, cs, tupleSparsity);
 				case RLE:
-					return compressRLE(colIndexes, rlen, ubm, cs);
+					return compressRLE(colIndexes, rlen, ubm, cs, tupleSparsity);
 				case OLE:
-					return compressOLE(colIndexes, rlen, ubm, cs);
+					return compressOLE(colIndexes, rlen, ubm, cs, tupleSparsity);
 				case SDC:
-					return compressSDC(colIndexes, rlen, ubm, cs);
+					return compressSDC(colIndexes, rlen, ubm, cs, tupleSparsity);
 				case UNCOMPRESSED:
 					return new ColGroupUncompressed(colIndexes, rawMatrixBlock, cs.transposed);
 				default:
@@ -316,7 +322,8 @@ public class ColGroupFactory {
 		}
 	}
 
-	private static AColGroup compressSDC(int[] colIndexes, int rlen, ABitmap ubm, CompressionSettings cs) {
+	private static AColGroup compressSDC(int[] colIndexes, int rlen, ABitmap ubm, CompressionSettings cs,
+		double tupleSparsity) {
 
 		int numZeros = (int) ((long) rlen - (int) ubm.getNumOffsets());
 		int largestOffset = 0;
@@ -330,17 +337,17 @@ public class ColGroupFactory {
 			index++;
 		}
 		AColGroup cg;
-		ADictionary dict = new Dictionary(((Bitmap) ubm).getValues());
+		ADictionary dict = DictionaryFactory.create(ubm, tupleSparsity);
 		if(numZeros >= largestOffset && ubm.getOffsetList().length == 1)
 			cg = new ColGroupSDCSingleZeros(colIndexes, rlen, dict, ubm.getOffsetList()[0].extractValues(true), null);
 		else if(ubm.getOffsetList().length == 1) {// todo
-			dict = moveFrequentToLastDictionaryEntry(dict, ubm, rlen, largestIndex);
+			dict = DictionaryFactory.moveFrequentToLastDictionaryEntry(dict, ubm, rlen, largestIndex);
 			cg = setupSingleValueSDCColGroup(colIndexes, rlen, ubm, dict);
 		}
 		else if(numZeros >= largestOffset)
 			cg = setupMultiValueZeroColGroup(colIndexes, ubm, rlen, dict);
 		else {
-			dict = moveFrequentToLastDictionaryEntry(dict, ubm, rlen, largestIndex);
+			dict = DictionaryFactory.moveFrequentToLastDictionaryEntry(dict, ubm, rlen, largestIndex);
 			cg = setupMultiValueColGroup(colIndexes, numZeros, largestOffset, ubm, rlen, largestIndex, dict);
 		}
 		return cg;
@@ -397,62 +404,17 @@ public class ColGroupFactory {
 		return new ColGroupSDCSingle(colIndexes, numRows, dict, _indexes, null);
 	}
 
-	private static ADictionary moveFrequentToLastDictionaryEntry(ADictionary dict, ABitmap ubm, int numRows,
-		int largestIndex) {
-		final double[] dictValues = dict.getValues();
-		final int zeros = numRows - (int) ubm.getNumOffsets();
-		final int nCol = ubm.getNumColumns();
-		final int offsetToLargest = largestIndex * nCol;
-
-		if(zeros == 0) {
-			final double[] swap = new double[nCol];
-			System.arraycopy(dictValues, offsetToLargest, swap, 0, nCol);
-			for(int i = offsetToLargest; i < dictValues.length - nCol; i++) {
-				dictValues[i] = dictValues[i + nCol];
-			}
-			System.arraycopy(swap, 0, dictValues, dictValues.length - nCol, nCol);
-			return dict;
-		}
-
-		final int largestIndexSize = ubm.getOffsetsList(largestIndex).size();
-		final double[] newDict = new double[dictValues.length + nCol];
-
-		if(zeros > largestIndexSize)
-			System.arraycopy(dictValues, 0, newDict, 0, dictValues.length);
-		else {
-			System.arraycopy(dictValues, 0, newDict, 0, offsetToLargest);
-			System.arraycopy(dictValues, offsetToLargest + nCol, newDict, offsetToLargest,
-				dictValues.length - offsetToLargest - nCol);
-			System.arraycopy(dictValues, offsetToLargest, newDict, newDict.length - nCol, nCol);
-		}
-		return new Dictionary(newDict);
-	}
-
-	private static AColGroup compressDDC(int[] colIndexes, int rlen, ABitmap ubm, CompressionSettings cs) {
+	private static AColGroup compressDDC(int[] colIndexes, int rlen, ABitmap ubm, CompressionSettings cs,
+		double tupleSparsity) {
 
 		boolean _zeros = ubm.getNumOffsets() < (long) rlen;
-		ADictionary dict = ADictionary.getDictionary(ubm);
-		double[] values = dict.getValues();
-		if(_zeros) {
-			double[] appendedZero = new double[values.length + colIndexes.length];
-			System.arraycopy(values, 0, appendedZero, 0, values.length);
-			dict = new Dictionary(appendedZero);
-		}
-		else
-			dict = new Dictionary(values);
-
+		ADictionary dict = (_zeros) ? DictionaryFactory.createWithAppendedZeroTuple(ubm,
+			tupleSparsity) : DictionaryFactory.create(ubm, tupleSparsity);
 		int numVals = ubm.getNumValues();
 		AMapToData _data = MapToFactory.create(rlen, numVals + (_zeros ? 1 : 0));
 		if(_zeros)
 			_data.fill(numVals);
 
-		// for(int i = 0; i < numVals; i++) {
-		// int[] tmpList = ubm.getOffsetsList(i).extractValues();
-		// int tmpListSize = ubm.getNumOffsets(i);
-		// for(int k = 0; k < tmpListSize; k++)
-		// _data[tmpList[k]] = (char) i;
-		// }
-
 		for(int i = 0; i < numVals; i++) {
 			IntArrayList tmpList = ubm.getOffsetsList(i);
 			final int sz = tmpList.size();
@@ -464,9 +426,10 @@ public class ColGroupFactory {
 
 	}
 
-	private static AColGroup compressOLE(int[] colIndexes, int rlen, ABitmap ubm, CompressionSettings cs) {
+	private static AColGroup compressOLE(int[] colIndexes, int rlen, ABitmap ubm, CompressionSettings cs,
+		double tupleSparsity) {
 
-		ADictionary dict = ADictionary.getDictionary(ubm);
+		ADictionary dict = DictionaryFactory.create(ubm, tupleSparsity);
 		ColGroupOLE ole = new ColGroupOLE(rlen);
 
 		final int numVals = ubm.getNumValues();
@@ -485,9 +448,10 @@ public class ColGroupFactory {
 		return ole;
 	}
 
-	private static AColGroup compressRLE(int[] colIndexes, int rlen, ABitmap ubm, CompressionSettings cs) {
+	private static AColGroup compressRLE(int[] colIndexes, int rlen, ABitmap ubm, CompressionSettings cs,
+		double tupleSparsity) {
 
-		ADictionary dict = ADictionary.getDictionary(ubm);
+		ADictionary dict = DictionaryFactory.create(ubm, tupleSparsity);
 		ColGroupRLE rle = new ColGroupRLE(rlen);
 		// compress the bitmaps
 		final int numVals = ubm.getNumValues();
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOLE.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOLE.java
index 2df2878..bb4f325 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOLE.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOLE.java
@@ -329,11 +329,6 @@ public class ColGroupOLE extends ColGroupOffset {
 	}
 
 	@Override
-	public long estimateInMemorySize() {
-		return ColGroupSizes.estimateInMemorySizeOLE(getNumCols(), getNumValues(), _data.length, _numRows, isLossy());
-	}
-
-	@Override
 	public AColGroup scalarOperation(ScalarOperator op) {
 		double val0 = op.executeScalar(0);
 
@@ -1209,7 +1204,7 @@ public class ColGroupOLE extends ColGroupOffset {
 	}
 
 	@Override
-	public Dictionary preAggregateThatSDCSingleStructure(ColGroupSDCSingle that, Dictionary ret, boolean preModified){
+	public Dictionary preAggregateThatSDCSingleStructure(ColGroupSDCSingle that, Dictionary ret, boolean preModified) {
 		throw new NotImplementedException();
 	}
 
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOffset.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOffset.java
index d33e816..7b7c595 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOffset.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOffset.java
@@ -27,6 +27,7 @@ import java.util.Arrays;
 import org.apache.sysds.runtime.compress.colgroup.dictionary.ADictionary;
 import org.apache.sysds.runtime.compress.utils.LinearAlgebraUtils;
 import org.apache.sysds.runtime.functionobjects.Builtin;
+import org.apache.sysds.utils.MemoryEstimates;
 
 /**
  * Base class for column groups encoded with various types of bitmap encoding.
@@ -75,11 +76,10 @@ public abstract class ColGroupOffset extends ColGroupValue {
 
 	@Override
 	public long estimateInMemorySize() {
-		// Could use a ternary operator, but it looks odd with our code formatter here.
-
-		return ColGroupSizes.estimateInMemorySizeOffset(getNumCols(), getNumValues(), _ptr.length, _data.length,
-			isLossy());
-
+		long size = super.estimateInMemorySize();
+		size += MemoryEstimates.intArrayCost(_ptr.length);
+		size += MemoryEstimates.charArrayCost(_data.length);
+		return size;
 	}
 
 	protected final void sumAllValues(double[] b, double[] c) {
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDC.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDC.java
index e449259..d2cebf5 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDC.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDC.java
@@ -330,7 +330,7 @@ public class ColGroupSDC extends ColGroupValue {
 
 	@Override
 	public long estimateInMemorySize() {
-		long size = ColGroupSizes.estimateInMemorySizeGroupValue(_colIndexes.length, getNumValues(), isLossy());
+		long size = super.estimateInMemorySize();
 		size += _indexes.getInMemorySize();
 		size += _data.getInMemorySize();
 		return size;
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCSingle.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCSingle.java
index 58649c3..1424072 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCSingle.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCSingle.java
@@ -314,7 +314,7 @@ public class ColGroupSDCSingle extends ColGroupValue {
 
 	@Override
 	public long estimateInMemorySize() {
-		long size = ColGroupSizes.estimateInMemorySizeGroupValue(_colIndexes.length, getNumValues(), isLossy());
+		long size = super.estimateInMemorySize();
 		size += _indexes.getInMemorySize();
 		return size;
 	}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCSingleZeros.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCSingleZeros.java
index 94be81c..997c42a 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCSingleZeros.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCSingleZeros.java
@@ -187,10 +187,18 @@ public class ColGroupSDCSingleZeros extends ColGroupValue {
 		final double vals = _dict.aggregateTuples(builtin, _colIndexes.length)[0];
 		final AIterator it = _indexes.getIterator();
 		it.skipTo(rl);
-		while(it.hasNext() && it.value() < ru) {
-			final int idx = it.value();
-			it.next();
-			c[idx] = builtin.execute(c[idx], vals);
+		int rix = rl;
+		for(; rix < ru && it.hasNext(); rix++) {
+			if(it.value() != rix)
+				c[rix] = builtin.execute(c[rix], 0);
+			else {
+				c[rix] = builtin.execute(c[rix], vals);
+				it.next();
+			}
+		}
+
+		for(; rix < ru; rix++) {
+			c[rix] = builtin.execute(c[rix], 0);
 		}
 	}
 
@@ -260,7 +268,7 @@ public class ColGroupSDCSingleZeros extends ColGroupValue {
 
 	@Override
 	public long estimateInMemorySize() {
-		long size = ColGroupSizes.estimateInMemorySizeGroupValue(_colIndexes.length, getNumValues(), isLossy());
+		long size = super.estimateInMemorySize();
 		size += _indexes.getInMemorySize();
 		return size;
 	}
@@ -450,7 +458,7 @@ public class ColGroupSDCSingleZeros extends ColGroupValue {
 	}
 
 	@Override
-	public Dictionary preAggregateThatSDCSingleStructure(ColGroupSDCSingle that, Dictionary ret, boolean preModified){
+	public Dictionary preAggregateThatSDCSingleStructure(ColGroupSDCSingle that, Dictionary ret, boolean preModified) {
 		throw new NotImplementedException();
 	}
 
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCZeros.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCZeros.java
index 23b06a9..77cf6e2 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCZeros.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCZeros.java
@@ -198,9 +198,17 @@ public class ColGroupSDCZeros extends ColGroupValue {
 		final double[] vals = _dict.aggregateTuples(builtin, _colIndexes.length);
 		final AIterator it = _indexes.getIterator();
 		it.skipTo(rl);
-		while(it.hasNext() && it.value() < ru) {
-			final int idx = it.value();
-			c[idx] = builtin.execute(c[idx], vals[getIndex(it.getDataIndexAndIncrement())]);
+
+		int rix = rl;
+		for(; rix < ru && it.hasNext(); rix++) {
+			if(it.value() != rix)
+				c[rix] = builtin.execute(c[rix], 0);
+			else
+				c[rix] = builtin.execute(c[rix], vals[_data.getIndex(it.getDataIndexAndIncrement())]);
+		}
+
+		for(; rix < ru; rix++) {
+			c[rix] = builtin.execute(c[rix], 0);
 		}
 	}
 
@@ -277,7 +285,7 @@ public class ColGroupSDCZeros extends ColGroupValue {
 
 	@Override
 	public long estimateInMemorySize() {
-		long size = ColGroupSizes.estimateInMemorySizeGroupValue(_colIndexes.length, getNumValues(), isLossy());
+		long size = super.estimateInMemorySize();
 		size += _indexes.getInMemorySize();
 		size += _data.getInMemorySize();
 		return size;
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSizes.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSizes.java
index 3c8c50a..faa3bb3 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSizes.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSizes.java
@@ -32,67 +32,68 @@ public class ColGroupSizes {
 	protected static final Log LOG = LogFactory.getLog(ColGroupSizes.class.getName());
 
 	public static long estimateInMemorySizeGroup(int nrColumns) {
-		long size = 0;
-		size += 16; // Object header
+		long size = 16; // Object header
 		size += MemoryEstimates.intArrayCost(nrColumns);
 		return size;
 	}
 
-	public static long estimateInMemorySizeGroupValue(int nrColumns, int nrValues, boolean lossy) {
-		long size = estimateInMemorySizeGroup(nrColumns);
+	public static long estimateInMoemorySizeCompressedColumn(int nrColumns) {
+		return estimateInMemorySizeGroup(nrColumns) + 4; // 4 for num Rows;
+	}
+
+	public static long estimateInMemorySizeGroupValue(int nrColumns, int nrValues, double tupleSparsity,
+		boolean lossy) {
+		long size = estimateInMoemorySizeCompressedColumn(nrColumns);
 		// LOG.error("MemorySize Group Value: " + nrColumns + " " + nrValues + " " + lossy);
 		size += 8; // Dictionary Reference.
 		size += 8; // Counts reference
-		size += 4; // int numRows
 		size += 1; // _zeros boolean reference
 		size += 1; // _lossy boolean reference
 		size += 2; // padding
-		size += DictionaryFactory.getInMemorySize(nrValues, nrColumns, lossy);
+		size += DictionaryFactory.getInMemorySize(nrValues, nrColumns, tupleSparsity, lossy);
 		return size;
 	}
 
-	public static long estimateInMemorySizeDDC(int nrCols, int numTuples, int dataLength, boolean lossy) {
+	public static long estimateInMemorySizeDDC(int nrCols, int numTuples, int dataLength, double tupleSparsity,
+		boolean lossy) {
 		// LOG.error("Arguments for DDC memory Estimate " + nrCols + " " + numTuples + " " + dataLength + " " + lossy);
-		long size = estimateInMemorySizeGroupValue(nrCols, numTuples, lossy);
-		size += 8; // Map toFactory reference;
+		long size = estimateInMemorySizeGroupValue(nrCols, numTuples, tupleSparsity, lossy);
 		size += MapToFactory.estimateInMemorySize(dataLength, numTuples);
 		return size;
 	}
 
 	public static long estimateInMemorySizeOffset(int nrColumns, int nrValues, int pointers, int offsetLength,
-		boolean lossy) {
+		double tupleSparsity, boolean lossy) {
 		// LOG.error("Offset Size: " + nrColumns + " " + nrValues + " " + pointers + " " + offsetLength);
-		long size = estimateInMemorySizeGroupValue(nrColumns, nrValues, lossy);
+		long size = estimateInMemorySizeGroupValue(nrColumns, nrValues, tupleSparsity, lossy);
 		size += MemoryEstimates.intArrayCost(pointers);
 		size += MemoryEstimates.charArrayCost(offsetLength);
 		return size;
 	}
 
 	public static long estimateInMemorySizeOLE(int nrColumns, int nrValues, int offsetLength, int nrRows,
-		boolean lossy) {
+		double tupleSparsity, boolean lossy) {
 		// LOG.error(nrColumns + " " + nrValues + " " + offsetLength + " " + nrRows + " " + lossy);
 		nrColumns = nrColumns > 0 ? nrColumns : 1;
 		offsetLength += (nrRows / CompressionSettings.BITMAP_BLOCK_SZ) * 2;
-		long size = 0;
-		size = estimateInMemorySizeOffset(nrColumns, nrValues, (nrValues / nrColumns) + 1, offsetLength, lossy);
-		if(nrRows > CompressionSettings.BITMAP_BLOCK_SZ * 2) {
-			size += MemoryEstimates.intArrayCost((int) nrValues / nrColumns);
-		}
+		long size = estimateInMemorySizeOffset(nrColumns, nrValues, nrValues  + 1, offsetLength,
+			tupleSparsity, lossy);
 		return size;
 	}
 
-	public static long estimateInMemorySizeRLE(int nrColumns, int nrValues, int nrRuns, int nrRows, boolean lossy) {
+	public static long estimateInMemorySizeRLE(int nrColumns, int nrValues, int nrRuns, int nrRows,
+		double tupleSparsity, boolean lossy) {
 		// LOG.error("RLE Size: " + nrColumns + " " + nrValues + " " + nrRuns + " " + nrRows);
 		int offsetLength = (nrRuns) * 2;
-		long size = estimateInMemorySizeOffset(nrColumns, nrValues, (nrValues) + 1, offsetLength, lossy);
+		long size = estimateInMemorySizeOffset(nrColumns, nrValues, (nrValues) + 1, offsetLength, tupleSparsity, lossy);
 
 		return size;
 	}
 
 	public static long estimateInMemorySizeSDC(int nrColumns, int nrValues, int nrRows, int largestOff,
-		boolean largestOffIsZero, boolean containNoZeroValues, boolean lossy) {
+		boolean largestOffIsZero, boolean containNoZeroValues, double tupleSparsity, boolean lossy) {
 		long size = estimateInMemorySizeGroupValue(nrColumns,
-			nrValues + (largestOffIsZero || containNoZeroValues ? 0 : 1), lossy);
+			nrValues + (largestOffIsZero || containNoZeroValues ? 0 : 1), tupleSparsity, lossy);
 		// LOG.error("SDC Estimation values: " + nrColumns + " " + nrValues + " " + nrRows + " " + largestOff);
 		size += OffsetFactory.estimateInMemorySize(nrRows - largestOff - 1, nrRows);
 		if(nrValues > 1)
@@ -101,22 +102,20 @@ public class ColGroupSizes {
 	}
 
 	public static long estimateInMemorySizeSDCSingle(int nrColumns, int nrValues, int nrRows, int largestOff,
-		boolean largestOffIsZero, boolean containNoZeroValues, boolean lossy) {
+		boolean largestOffIsZero, boolean containNoZeroValues, double tupleSparsity, boolean lossy) {
 		long size = estimateInMemorySizeGroupValue(nrColumns,
-			nrValues + (largestOffIsZero || containNoZeroValues ? 0 : 1), lossy);
+			nrValues + (largestOffIsZero || containNoZeroValues ? 0 : 1), tupleSparsity, lossy);
 		size += OffsetFactory.estimateInMemorySize(nrRows - largestOff, nrRows);
 		return size;
 	}
 
-	public static long estimateInMemorySizeCONST(int nrColumns, int nrValues, boolean lossy) {
-		long size = estimateInMemorySizeGroupValue(nrColumns, nrValues, lossy);
+	public static long estimateInMemorySizeCONST(int nrColumns, int nrValues, double tupleSparsity, boolean lossy) {
+		long size = estimateInMemorySizeGroupValue(nrColumns, nrValues, tupleSparsity, lossy);
 		return size;
 	}
 
 	public static long estimateInMemorySizeEMPTY(int nrColumns) {
-		long size = estimateInMemorySizeGroup(nrColumns);
-		size += 8; // null pointer to _dict
-		return size;
+		return estimateInMoemorySizeCompressedColumn(nrColumns);
 	}
 
 	public static long estimateInMemorySizeUncompressed(int nrRows, int nrColumns, double sparsity) {
@@ -124,7 +123,7 @@ public class ColGroupSizes {
 		// Since the Object is a col group the overhead from the Memory Size group is added
 		size += estimateInMemorySizeGroup(nrColumns);
 		size += 8; // reference to MatrixBlock.
-		size += MatrixBlock.estimateSizeInMemory(nrRows, nrColumns, (nrColumns > 1) ?  sparsity : 1);
+		size += MatrixBlock.estimateSizeInMemory(nrRows, nrColumns, (nrColumns > 1) ? sparsity : 1);
 		return size;
 	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java
index 3cec6fb..c8f9bf6 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java
@@ -660,4 +660,9 @@ public class ColGroupUncompressed extends AColGroup {
 		LibMatrixMult.matrixMult(_data, right, out, InfrastructureAnalyzer.getLocalParallelism());
 		return new ColGroupUncompressed(outputCols, out, false);
 	}
+
+	@Override
+	public int getNumValues() {
+		return _data.getNumRows();
+	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupValue.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupValue.java
index fc7da24..a0c127a 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupValue.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupValue.java
@@ -33,6 +33,7 @@ import org.apache.sysds.runtime.DMLCompressionException;
 import org.apache.sysds.runtime.compress.colgroup.dictionary.ADictionary;
 import org.apache.sysds.runtime.compress.colgroup.dictionary.Dictionary;
 import org.apache.sysds.runtime.compress.colgroup.dictionary.DictionaryFactory;
+import org.apache.sysds.runtime.compress.colgroup.dictionary.MatrixBlockDictionary;
 import org.apache.sysds.runtime.compress.colgroup.pre.ArrPreAggregate;
 import org.apache.sysds.runtime.compress.colgroup.pre.IPreAggregate;
 import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
@@ -93,11 +94,7 @@ public abstract class ColGroupValue extends ColGroupCompressed implements Clonea
 		decompressToBlock(target, rl, ru, offT, getValues());
 	}
 
-	/**
-	 * Obtain number of distinct sets of values associated with the bitmaps in this column group.
-	 * 
-	 * @return the number of distinct sets of values associated with the bitmaps in this column group
-	 */
+	@Override
 	public final int getNumValues() {
 		return _dict.getNumberOfValues(_colIndexes.length);
 	}
@@ -250,22 +247,21 @@ public abstract class ColGroupValue extends ColGroupCompressed implements Clonea
 		return aggregateColumns;
 	}
 
-	private Pair<int[], double[]> preaggValuesFromDense(final int numVals, final double[] b, double[] dictVals,
-		final int cl, final int cu, final int cut) {
+	private Pair<int[], double[]> preaggValuesFromDense(final int numVals, final double[] b, final int cl, final int cu,
+		final int cut) {
 
-		int[] aggregateColumns = getAggregateColumnsSetDense(b, cl, cu, cut);
-		double[] ret = new double[numVals * aggregateColumns.length];
+		final int[] aggregateColumns = getAggregateColumnsSetDense(b, cl, cu, cut);
+		final double[] ret = new double[numVals * aggregateColumns.length];
 
 		for(int k = 0, off = 0;
 			k < numVals * _colIndexes.length;
 			k += _colIndexes.length, off += aggregateColumns.length) {
 			for(int h = 0; h < _colIndexes.length; h++) {
 				int idb = _colIndexes[h] * cut;
-				double v = dictVals[k + h];
-				for(int i = 0; i < aggregateColumns.length; i++) {
-					ret[off + i] += v * b[idb + aggregateColumns[i]];
-				}
-
+				double v = _dict.getValue(k + h);
+				if(v != 0)
+					for(int i = 0; i < aggregateColumns.length; i++)
+						ret[off + i] += v * b[idb + aggregateColumns[i]];
 			}
 		}
 
@@ -290,8 +286,7 @@ public abstract class ColGroupValue extends ColGroupCompressed implements Clonea
 		return aggregateColumns;
 	}
 
-	private Pair<int[], double[]> preaggValuesFromSparse(int numVals, SparseBlock b, double[] dictVals, int cl, int cu,
-		int cut) {
+	private Pair<int[], double[]> preaggValuesFromSparse(int numVals, SparseBlock b, int cl, int cu, int cut) {
 
 		int[] aggregateColumns = getAggregateColumnsSetSparse(b);
 
@@ -310,7 +305,7 @@ public abstract class ColGroupValue extends ColGroupCompressed implements Clonea
 						for(int j = 0, offOrg = h;
 							j < numVals * aggregateColumns.length;
 							j += aggregateColumns.length, offOrg += _colIndexes.length) {
-							ret[j + retIdx] += dictVals[offOrg] * sValues[i];
+							ret[j + retIdx] += _dict.getValue(offOrg) * sValues[i];
 						}
 				}
 			}
@@ -318,18 +313,18 @@ public abstract class ColGroupValue extends ColGroupCompressed implements Clonea
 		return new ImmutablePair<>(aggregateColumns, ret);
 	}
 
-	public Pair<int[], double[]> preaggValues(int numVals, MatrixBlock b, double[] dictVals, int cl, int cu, int cut) {
-		return b.isInSparseFormat() ? preaggValuesFromSparse(numVals, b.getSparseBlock(), dictVals, cl, cu,
-			cut) : preaggValuesFromDense(numVals, b.getDenseBlockValues(), dictVals, cl, cu, cut);
+	public Pair<int[], double[]> preaggForRightMultiplyValues(int numVals, MatrixBlock b, int cl, int cu, int cut) {
+		return b.isInSparseFormat() ? preaggValuesFromSparse(numVals, b.getSparseBlock(), cl, cu,
+			cut) : preaggValuesFromDense(numVals, b.getDenseBlockValues(), cl, cu, cut);
 	}
 
-	protected static double[] sparsePreaggValues(int numVals, double v, boolean allocNew, double[] dictVals) {
-		double[] ret = allocNew ? new double[numVals + 1] : allocDVector(numVals + 1, true);
+	// protected static double[] sparsePreaggValues(int numVals, double v, boolean allocNew, ADictionary dict) {
+	// double[] ret = allocNew ? new double[numVals + 1] : allocDVector(numVals + 1, true);
 
-		for(int k = 0; k < numVals; k++)
-			ret[k] = dictVals[k] * v;
-		return ret;
-	}
+	// for(int k = 0; k < numVals; k++)
+	// ret[k] = dictVals[k] * v;
+	// return ret;
+	// }
 
 	protected double computeMxx(double c, Builtin builtin) {
 		if(_zeros)
@@ -1016,9 +1011,10 @@ public abstract class ColGroupValue extends ColGroupCompressed implements Clonea
 	 */
 	public void leftMultByMatrix(MatrixBlock matrix, MatrixBlock result, double[] values, int rl, int ru) {
 		final int numVals = getNumValues();
+		if(!(_dict instanceof MatrixBlockDictionary))
+			_dict = _dict.getAsMatrixBlockDictionary(_colIndexes.length);
 
-		DenseBlock dictV = new DenseBlockFP64(new int[] {numVals, _colIndexes.length}, values);
-		MatrixBlock dictM = new MatrixBlock(numVals, _colIndexes.length, dictV);
+		MatrixBlock dictM = ((MatrixBlockDictionary) _dict).getMatrixBlock();
 		dictM.examSparsity();
 		MatrixBlock tmpRes = new MatrixBlock(1, _colIndexes.length, false);
 		for(int i = rl; i < ru; i++) {
@@ -1064,10 +1060,22 @@ public abstract class ColGroupValue extends ColGroupCompressed implements Clonea
 	}
 
 	public AColGroup rightMultByMatrix(MatrixBlock right) {
-		Pair<int[], double[]> pre = preaggValues(getNumValues(), right, getValues(), 0, right.getNumColumns(),
+		Pair<int[], double[]> pre = preaggForRightMultiplyValues(getNumValues(), right, 0, right.getNumColumns(),
 			right.getNumColumns());
 		if(pre.getLeft().length > 0)
 			return copyAndSet(pre.getLeft(), pre.getRight());
 		return null;
 	}
+
+	@Override
+	public long estimateInMemorySize() {
+		long size = super.estimateInMemorySize();
+		size += 8; // Dictionary Reference.
+		size += 8; // Counts reference
+		size += 1; // _zeros boolean reference
+		size += 1; // _lossy boolean reference
+		size += 2; // padding
+		size += _dict.getInMemorySize();
+		return size;
+	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/ADictionary.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/ADictionary.java
index 352e96b..e5d2a15 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/ADictionary.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/ADictionary.java
@@ -24,9 +24,6 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.sysds.runtime.compress.utils.ABitmap;
-import org.apache.sysds.runtime.compress.utils.Bitmap;
-import org.apache.sysds.runtime.compress.utils.BitmapLossy;
 import org.apache.sysds.runtime.functionobjects.Builtin;
 import org.apache.sysds.runtime.functionobjects.ValueFunction;
 import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
@@ -143,13 +140,7 @@ public abstract class ADictionary {
 	 * @param fn         The function to apply to individual columns
 	 * @param colIndexes The mapping to the target columns from the individual columns
 	 */
-	public void aggregateCols(double[] c, Builtin fn, int[] colIndexes) {
-		int ncol = colIndexes.length;
-		int vlen = size() / ncol;
-		for(int k = 0; k < vlen; k++)
-			for(int j = 0, valOff = k * ncol; j < ncol; j++)
-				c[colIndexes[j]] = fn.execute(c[colIndexes[j]], getValue(valOff + j));
-	}
+	public abstract void aggregateCols(double[] c, Builtin fn, int[] colIndexes);
 
 	/**
 	 * Write the dictionary to a DataOutput.
@@ -178,7 +169,7 @@ public abstract class ADictionary {
 	public abstract boolean isLossy();
 
 	/**
-	 * Get the number of values given that the column group has n columns
+	 * Get the number of distinct tuples given that the column group has n columns
 	 * 
 	 * @param ncol The number of Columns in the ColumnGroup.
 	 * @return the number of value tuples contained in the dictionary.
@@ -254,6 +245,14 @@ public abstract class ADictionary {
 
 	public abstract boolean containsValue(double pattern);
 
+	/**
+	 * Calculate the number of non zeros in the dictionary. The number of non zeros should be scaled with the counts
+	 * given
+	 * 
+	 * @param counts The counts of each dictionary entry
+	 * @param nCol   The number of columns in this dictionary
+	 * @return The nonZero count
+	 */
 	public abstract long getNumberNonZeros(int[] counts, int nCol);
 
 	public abstract long getNumberNonZerosContained();
@@ -268,13 +267,6 @@ public abstract class ADictionary {
 	 */
 	public abstract void addToEntry(Dictionary d, int fr, int to, int nCol);
 
-	public static ADictionary getDictionary(ABitmap ubm) {
-		if(ubm instanceof BitmapLossy)
-			return new QDictionary((BitmapLossy) ubm).makeDoubleDictionary();
-		else
-			return new Dictionary(((Bitmap) ubm).getValues());
-	}
-
 	/**
 	 * Get the most common tuple element contained in the dictionary
 	 * 
@@ -293,4 +285,12 @@ public abstract class ADictionary {
 	 * @return a new instance of dictionary with the tuple subtracted.
 	 */
 	public abstract ADictionary subtractTuple(double[] tuple);
+
+	/**
+	 * Get this dictionary as a matrixBlock dictionary. This allows us to use optimized kernels coded elsewhere in the
+	 * system, such as matrix multiplication.
+	 * 
+	 * @return A Dictionary containing a MatrixBlock.
+	 */
+	public abstract MatrixBlockDictionary getAsMatrixBlockDictionary(int nCol);
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/Dictionary.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/Dictionary.java
index 5a32823..595c250 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/Dictionary.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/Dictionary.java
@@ -25,8 +25,11 @@ import java.io.IOException;
 import java.util.Arrays;
 
 import org.apache.sysds.runtime.DMLCompressionException;
+import org.apache.sysds.runtime.data.DenseBlock;
+import org.apache.sysds.runtime.data.DenseBlockFP64;
 import org.apache.sysds.runtime.functionobjects.Builtin;
 import org.apache.sysds.runtime.functionobjects.ValueFunction;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
 import org.apache.sysds.utils.MemoryEstimates;
 
@@ -473,4 +476,23 @@ public class Dictionary extends ADictionary {
 		}
 		return new Dictionary(newValues);
 	}
+
+	@Override
+	public MatrixBlockDictionary getAsMatrixBlockDictionary(int nCol) {
+		final int nRow = _values.length / nCol;
+		DenseBlock dictV = new DenseBlockFP64(new int[] {nRow, nCol}, _values);
+		MatrixBlock dictM = new MatrixBlock(nRow, nCol, dictV);
+		dictM.examSparsity();
+		return new MatrixBlockDictionary(dictM);
+	}
+
+	@Override
+	public void aggregateCols(double[] c, Builtin fn, int[] colIndexes) {
+		int ncol = colIndexes.length;
+		int vlen = size() / ncol;
+		for(int k = 0; k < vlen; k++)
+			for(int j = 0, valOff = k * ncol; j < ncol; j++)
+				c[colIndexes[j]] = fn.execute(c[colIndexes[j]], getValue(valOff + j));
+		
+	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/DictionaryFactory.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/DictionaryFactory.java
index 328e145..bab32b0 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/DictionaryFactory.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/DictionaryFactory.java
@@ -22,8 +22,20 @@ package org.apache.sysds.runtime.compress.colgroup.dictionary;
 import java.io.DataInput;
 import java.io.IOException;
 
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.runtime.compress.utils.ABitmap;
+import org.apache.sysds.runtime.compress.utils.Bitmap;
+import org.apache.sysds.runtime.compress.utils.BitmapLossy;
+import org.apache.sysds.runtime.compress.utils.MultiColBitmap;
+import org.apache.sysds.runtime.data.SparseBlock;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+
 public class DictionaryFactory {
 
+	protected static final Log LOG = LogFactory.getLog(DictionaryFactory.class.getName());
+
 	public static ADictionary read(DataInput in) throws IOException {
 		boolean lossy = in.readBoolean();
 		if(lossy) {
@@ -46,10 +58,131 @@ public class DictionaryFactory {
 		}
 	}
 
-	public static long getInMemorySize(int nrValues, int nrColumns, boolean lossy) {
+	public static long getInMemorySize(int nrValues, int nrColumns, double tupleSparsity, boolean lossy) {
 		if(lossy)
 			return QDictionary.getInMemorySize(nrValues * nrColumns);
+		else if(nrColumns > 1 && tupleSparsity < 0.4)
+			return MatrixBlockDictionary.getInMemorySize(nrValues, nrColumns, tupleSparsity);
 		else
 			return Dictionary.getInMemorySize(nrValues * nrColumns);
 	}
+
+	public static ADictionary create(ABitmap ubm) {
+		return create(ubm, 1.0);
+	}
+
+	public static ADictionary create(ABitmap ubm, double sparsity) {
+		if(ubm instanceof BitmapLossy)
+			return new QDictionary((BitmapLossy) ubm);
+		else if(ubm instanceof Bitmap)
+			return new Dictionary(((Bitmap) ubm).getValues());
+		else if(sparsity < 0.4 && ubm instanceof MultiColBitmap) {
+			final int nCols = ubm.getNumColumns();
+			final int nRows = ubm.getNumValues();
+			final MultiColBitmap mcbm = (MultiColBitmap) ubm;
+
+			final MatrixBlock m = new MatrixBlock(nRows, nCols, true);
+			m.allocateSparseRowsBlock();
+			final SparseBlock sb = m.getSparseBlock();
+
+			final int nVals = ubm.getNumValues();
+			for(int i = 0; i < nVals; i++) {
+				final double[] tuple = mcbm.getValues(i);
+				for(int col = 0; col < nCols; col++)
+					sb.append(i, col, tuple[col]);
+			}
+			m.recomputeNonZeros();
+			return new MatrixBlockDictionary(m);
+		}
+		else if(ubm instanceof MultiColBitmap) {
+			MultiColBitmap mcbm = (MultiColBitmap) ubm;
+			final int nCol = ubm.getNumColumns();
+			final int nVals = ubm.getNumValues();
+			double[] resValues = new double[nVals * nCol];
+			for(int i = 0; i < nVals; i++)
+				System.arraycopy(mcbm.getValues(i), 0, resValues, i * nCol, nCol);
+
+			return new Dictionary(resValues);
+		}
+		throw new NotImplementedException(
+			"Not implemented creation of bitmap type : " + ubm.getClass().getSimpleName());
+	}
+
+	public static ADictionary createWithAppendedZeroTuple(ABitmap ubm) {
+		return createWithAppendedZeroTuple(ubm, 1.0);
+	}
+
+	public static ADictionary createWithAppendedZeroTuple(ABitmap ubm, double sparsity) {
+		// Log.warn("Inefficient creation of dictionary, to then allocate again.");
+		final int nRows = ubm.getNumValues() + 1;
+		final int nCols = ubm.getNumColumns();
+		if(ubm instanceof Bitmap) {
+			Bitmap bm = (Bitmap) ubm;
+			double[] resValues = new double[ubm.getNumValues() + 1];
+			double[] from = bm.getValues();
+			System.arraycopy(from, 0, resValues, 0, from.length);
+			return new Dictionary(resValues);
+		}
+		else if(sparsity < 0.4 && ubm instanceof MultiColBitmap) {
+			final MultiColBitmap mcbm = (MultiColBitmap) ubm;
+			final MatrixBlock m = new MatrixBlock(nRows, nCols, true);
+			m.allocateSparseRowsBlock();
+			final SparseBlock sb = m.getSparseBlock();
+
+			final int nVals = ubm.getNumValues();
+			for(int i = 0; i < nVals; i++) {
+				final double[] tuple = mcbm.getValues(i);
+				for(int col = 0; col < nCols; col++)
+					sb.append(i, col, tuple[col]);
+			}
+			m.recomputeNonZeros();
+			return new MatrixBlockDictionary(m);
+		}
+		else if(ubm instanceof MultiColBitmap) {
+			MultiColBitmap mcbm = (MultiColBitmap) ubm;
+			final int nVals = ubm.getNumValues();
+			double[] resValues = new double[nRows * nCols];
+			for(int i = 0; i < nVals; i++)
+				System.arraycopy(mcbm.getValues(i), 0, resValues, i * nCols, nCols);
+
+			return new Dictionary(resValues);
+		}
+		else {
+			throw new NotImplementedException(
+				"Not implemented creation of bitmap type : " + ubm.getClass().getSimpleName());
+		}
+
+	}
+
+	public static ADictionary moveFrequentToLastDictionaryEntry(ADictionary dict, ABitmap ubm, int numRows,
+		int largestIndex) {
+		LOG.warn("Inefficient creation of dictionary, to then allocate again to move one entry to end.");
+		final double[] dictValues = dict.getValues();
+		final int zeros = numRows - (int) ubm.getNumOffsets();
+		final int nCol = ubm.getNumColumns();
+		final int offsetToLargest = largestIndex * nCol;
+
+		if(zeros == 0) {
+			final double[] swap = new double[nCol];
+			System.arraycopy(dictValues, offsetToLargest, swap, 0, nCol);
+			for(int i = offsetToLargest; i < dictValues.length - nCol; i++) {
+				dictValues[i] = dictValues[i + nCol];
+			}
+			System.arraycopy(swap, 0, dictValues, dictValues.length - nCol, nCol);
+			return dict;
+		}
+
+		final int largestIndexSize = ubm.getOffsetsList(largestIndex).size();
+		final double[] newDict = new double[dictValues.length + nCol];
+
+		if(zeros > largestIndexSize)
+			System.arraycopy(dictValues, 0, newDict, 0, dictValues.length);
+		else {
+			System.arraycopy(dictValues, 0, newDict, 0, offsetToLargest);
+			System.arraycopy(dictValues, offsetToLargest + nCol, newDict, offsetToLargest,
+				dictValues.length - offsetToLargest - nCol);
+			System.arraycopy(dictValues, offsetToLargest, newDict, newDict.length - nCol, nCol);
+		}
+		return new Dictionary(newDict);
+	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/MatrixBlockDictionary.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/MatrixBlockDictionary.java
new file mode 100644
index 0000000..322e56b
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/MatrixBlockDictionary.java
@@ -0,0 +1,456 @@
+package org.apache.sysds.runtime.compress.colgroup.dictionary;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.sysds.runtime.data.SparseBlock;
+import org.apache.sysds.runtime.functionobjects.Builtin;
+import org.apache.sysds.runtime.functionobjects.Builtin.BuiltinCode;
+import org.apache.sysds.runtime.functionobjects.ValueFunction;
+import org.apache.sysds.runtime.matrix.data.LibMatrixReorg;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
+
+public class MatrixBlockDictionary extends ADictionary {
+
+    private MatrixBlock _data;
+
+    public MatrixBlockDictionary(MatrixBlock data) {
+        _data = data;
+    }
+
+    public MatrixBlock getMatrixBlock() {
+        return _data;
+    }
+
+    @Override
+    public double[] getValues() {
+        LOG.warn("Inefficient force dense format.");
+        _data.sparseToDense();
+        return _data.getDenseBlockValues();
+    }
+
+    @Override
+    public double getValue(int i) {
+        final int nCol = _data.getNumColumns();
+        LOG.warn("inefficient get value at index");
+        return _data.quickGetValue(i / nCol, i % nCol);
+    }
+
+    @Override
+    public int hasZeroTuple(int nCol) {
+        if(_data.isInSparseFormat()) {
+            SparseBlock sb = _data.getSparseBlock();
+            for(int i = 0; i < _data.getNumRows(); i++) {
+                if(sb.isEmpty(i)) {
+                    return i;
+                }
+            }
+        }
+        else {
+            throw new NotImplementedException();
+        }
+        return -1;
+    }
+
+    @Override
+    public long getInMemorySize() {
+        return 8 + _data.estimateSizeInMemory();
+    }
+
+    public static long getInMemorySize(int numberValues, int numberColumns, double sparsity) {
+        return 8 + MatrixBlock.estimateSizeInMemory(numberValues, numberColumns, sparsity);
+    }
+
+    @Override
+    public double aggregate(double init, Builtin fn) {
+        if(fn.getBuiltinCode() == BuiltinCode.MAX)
+            return fn.execute(init, _data.max());
+        else if(fn.getBuiltinCode() == BuiltinCode.MIN)
+            return fn.execute(init, _data.min());
+        else
+            throw new NotImplementedException();
+    }
+
+    @Override
+    public double[] aggregateTuples(Builtin fn, int nCol) {
+        double[] ret = new double[_data.getNumRows()];
+        if(_data.isEmpty())
+            return ret;
+        else if(_data.isInSparseFormat()) {
+            SparseBlock sb = _data.getSparseBlock();
+            for(int i = 0; i < _data.getNumRows(); i++) {
+                if(!sb.isEmpty(i)) {
+                    final int apos = sb.pos(i);
+                    final int alen = sb.size(i) + apos;
+                    final double[] avals = sb.values(i);
+                    ret[i] = avals[apos];
+                    for(int j = apos + 1; j < alen; j++)
+                        ret[i] = fn.execute(ret[i], avals[j]);
+
+                    if(sb.size(i) < _data.getNumColumns())
+                        ret[i] = fn.execute(ret[i], 0);
+                }
+                else
+                    ret[i] = fn.execute(ret[i], 0);
+            }
+        }
+        else if(nCol == 1)
+            return _data.getDenseBlockValues();
+        else {
+            double[] values = _data.getDenseBlockValues();
+            int off = 0;
+            for(int k = 0; k < _data.getNumRows(); k++) {
+                ret[k] = values[off++];
+                for(int j = 1; j < _data.getNumColumns(); j++)
+                    ret[k] = fn.execute(ret[k], values[off++]);
+            }
+        }
+        return ret;
+    }
+
+    @Override
+    public void aggregateCols(double[] c, Builtin fn, int[] colIndexes) {
+        if(_data.isEmpty()) {
+            for(int j = 0; j < colIndexes.length; j++) {
+                final int idx = colIndexes[j];
+                c[idx] = fn.execute(c[idx], 0);
+            }
+        }
+        else if(_data.isInSparseFormat()) {
+            MatrixBlock t = LibMatrixReorg.transposeInPlace(_data, 1);
+            if(!t.isInSparseFormat()) {
+                throw new NotImplementedException();
+            }
+            SparseBlock sbt = t.getSparseBlock();
+
+            for(int i = 0; i < _data.getNumColumns(); i++) {
+                final int idx = colIndexes[i];
+                if(!sbt.isEmpty(i)) {
+                    final int apos = sbt.pos(i);
+                    final int alen = sbt.size(i) + apos;
+                    final double[] avals = sbt.values(i);
+                    for(int j = apos; j < alen; j++)
+                        c[idx] = fn.execute(c[idx], avals[j]);
+                    if(avals.length != _data.getNumRows())
+                        c[idx] = fn.execute(c[idx], 0);
+                }
+                else
+                    c[idx] = fn.execute(c[idx], 0);
+            }
+        }
+        else {
+            double[] values = _data.getDenseBlockValues();
+            int off = 0;
+            for(int k = 0; k < _data.getNumRows(); k++) {
+                for(int j = 0; j < _data.getNumColumns(); j++) {
+                    final int idx = colIndexes[j];
+                    c[idx] = fn.execute(c[idx], values[off++]);
+                }
+            }
+        }
+    }
+
+    @Override
+    public int size() {
+        return (int) _data.getNonZeros();
+    }
+
+    @Override
+    public ADictionary apply(ScalarOperator op) {
+        MatrixBlock res = _data.scalarOperations(op, new MatrixBlock());
+        return new MatrixBlockDictionary(res);
+    }
+
+    @Override
+    public ADictionary applyScalarOp(ScalarOperator op, double newVal, int numCols) {
+        MatrixBlock res = _data.scalarOperations(op, new MatrixBlock());
+        MatrixBlock res2 = res.append(new MatrixBlock(1, 1, newVal), new MatrixBlock());
+        return new MatrixBlockDictionary(res2);
+    }
+
+    @Override
+    public ADictionary applyBinaryRowOpLeft(ValueFunction fn, double[] v, boolean sparseSafe, int[] colIndexes) {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public ADictionary applyBinaryRowOpRight(ValueFunction fn, double[] v, boolean sparseSafe, int[] colIndexes) {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public ADictionary clone() {
+        MatrixBlock ret = new MatrixBlock();
+        ret.copy(_data);
+        return new MatrixBlockDictionary(ret);
+    }
+
+    @Override
+    public ADictionary cloneAndExtend(int len) {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public boolean isLossy() {
+        return false;
+    }
+
+    @Override
+    public int getNumberOfValues(int ncol) {
+        return _data.getNumRows();
+    }
+
+    @Override
+    public double[] sumAllRowsToDouble(boolean square, int nrColumns) {
+        double[] ret = new double[_data.getNumRows()];
+
+        if(_data.isEmpty())
+            return ret;
+        else if(_data.isInSparseFormat()) {
+            SparseBlock sb = _data.getSparseBlock();
+            for(int i = 0; i < _data.getNumRows(); i++) {
+                if(!sb.isEmpty(i)) {
+                    final int apos = sb.pos(i);
+                    final int alen = sb.size(i) + apos;
+                    final double[] avals = sb.values(i);
+                    for(int j = apos; j < alen; j++) {
+                        ret[i] += (square) ? avals[j] * avals[j] : avals[j];
+                    }
+                }
+            }
+        }
+        else {
+            double[] values = _data.getDenseBlockValues();
+            int off = 0;
+            for(int k = 0; k < _data.getNumRows(); k++) {
+                for(int j = 0; j < _data.getNumColumns(); j++) {
+                    final double v = values[off++];
+                    ret[k] += (square) ? v * v : v;
+                }
+            }
+        }
+        return ret;
+    }
+
+    @Override
+    public double sumRow(int k, boolean square, int nrColumns) {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public double[] colSum(int[] counts, int nCol) {
+        if(_data.isEmpty())
+            return null;
+        double[] ret = new double[nCol];
+        if(_data.isInSparseFormat()) {
+            SparseBlock sb = _data.getSparseBlock();
+            for(int i = 0; i < _data.getNumRows(); i++) {
+                if(!sb.isEmpty(i)) {
+                    // double tmpSum = 0;
+                    final int count = counts[i];
+                    final int apos = sb.pos(i);
+                    final int alen = sb.size(i) + apos;
+                    final int[] aix = sb.indexes(i);
+                    final double[] avals = sb.values(i);
+                    for(int j = apos; j < alen; j++) {
+                        ret[aix[j]] += count * avals[j];
+                    }
+                }
+            }
+        }
+        else {
+            double[] values = _data.getDenseBlockValues();
+            int off = 0;
+            for(int k = 0; k < _data.getNumRows(); k++) {
+                final int countK = counts[k];
+                for(int j = 0; j < _data.getNumColumns(); j++) {
+                    final double v = values[off++];
+                    ret[j] += v * countK;
+                }
+            }
+        }
+        return ret;
+    }
+
+    @Override
+    public void colSum(double[] c, int[] counts, int[] colIndexes, boolean square) {
+        if(_data.isEmpty())
+            return;
+        if(_data.isInSparseFormat()) {
+            SparseBlock sb = _data.getSparseBlock();
+            for(int i = 0; i < _data.getNumRows(); i++) {
+                if(!sb.isEmpty(i)) {
+                    // double tmpSum = 0;
+                    final int count = counts[i];
+                    final int apos = sb.pos(i);
+                    final int alen = sb.size(i) + apos;
+                    final int[] aix = sb.indexes(i);
+                    final double[] avals = sb.values(i);
+                    for(int j = apos; j < alen; j++) {
+                        c[colIndexes[aix[j]]] += square ? count * avals[j] * avals[j] : count * avals[j];
+                    }
+                }
+            }
+        }
+        else {
+            double[] values = _data.getDenseBlockValues();
+            int off = 0;
+            for(int k = 0; k < _data.getNumRows(); k++) {
+                final int countK = counts[k];
+                for(int j = 0; j < _data.getNumColumns(); j++) {
+                    final double v = values[off++];
+                    c[colIndexes[j]] += square ? v * v * countK : v * countK;
+                }
+            }
+        }
+    }
+
+    @Override
+    public double sum(int[] counts, int ncol) {
+        double tmpSum = 0;
+        if(_data.isEmpty())
+            return tmpSum;
+        if(_data.isInSparseFormat()) {
+            SparseBlock sb = _data.getSparseBlock();
+            for(int i = 0; i < _data.getNumRows(); i++) {
+                if(!sb.isEmpty(i)) {
+                    final int count = counts[i];
+                    final int apos = sb.pos(i);
+                    final int alen = sb.size(i) + apos;
+                    final double[] avals = sb.values(i);
+                    for(int j = apos; j < alen; j++) {
+                        tmpSum += count * avals[j];
+                    }
+                }
+            }
+        }
+        else {
+            double[] values = _data.getDenseBlockValues();
+            int off = 0;
+            for(int k = 0; k < _data.getNumRows(); k++) {
+                final int countK = counts[k];
+                for(int j = 0; j < _data.getNumColumns(); j++) {
+                    final double v = values[off++];
+                    tmpSum += v * countK;
+                }
+            }
+        }
+        return tmpSum;
+    }
+
+    @Override
+    public double sumsq(int[] counts, int ncol) {
+        double tmpSum = 0;
+        if(_data.isEmpty())
+            return tmpSum;
+        if(_data.isInSparseFormat()) {
+            SparseBlock sb = _data.getSparseBlock();
+            for(int i = 0; i < _data.getNumRows(); i++) {
+                if(!sb.isEmpty(i)) {
+                    final int count = counts[i];
+                    final int apos = sb.pos(i);
+                    final int alen = sb.size(i) + apos;
+                    final double[] avals = sb.values(i);
+                    for(int j = apos; j < alen; j++) {
+                        tmpSum += count * avals[j] * avals[j];
+                    }
+                }
+            }
+        }
+        else {
+            double[] values = _data.getDenseBlockValues();
+            int off = 0;
+            for(int k = 0; k < _data.getNumRows(); k++) {
+                final int countK = counts[k];
+                for(int j = 0; j < _data.getNumColumns(); j++) {
+                    final double v = values[off++];
+                    tmpSum += v * v * countK;
+                }
+            }
+        }
+        return tmpSum;
+    }
+
+    @Override
+    public String getString(int colIndexes) {
+        return _data.toString();
+    }
+
+    @Override
+    public void addMaxAndMin(double[] ret, int[] colIndexes) {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public ADictionary sliceOutColumnRange(int idxStart, int idxEnd, int previousNumberOfColumns) {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public ADictionary reExpandColumns(int max) {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public boolean containsValue(double pattern) {
+        return _data.containsValue(pattern);
+    }
+
+    @Override
+    public long getNumberNonZeros(int[] counts, int nCol) {
+        if(_data.isEmpty())
+            return 0;
+        long nnz = 0;
+        if(_data.isInSparseFormat()) {
+            SparseBlock sb = _data.getSparseBlock();
+            for(int i = 0; i < _data.getNumRows(); i++)
+                if(sb.isEmpty(i))
+                    nnz += sb.size(i) * counts[i];
+
+        }
+        else {
+            double[] values = _data.getDenseBlockValues();
+            int off = 0;
+            for(int i = 0; i < _data.getNumRows(); i++) {
+                int countThisTuple = 0;
+                for(int j = 0; j < _data.getNumColumns(); j++) {
+                    double v = values[off++];
+                    if(v != 0)
+                        countThisTuple++;
+                }
+                nnz += countThisTuple * counts[i];
+            }
+        }
+        return nnz;
+    }
+
+    @Override
+    public long getNumberNonZerosContained() {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public void addToEntry(Dictionary d, int fr, int to, int nCol) {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public double[] getMostCommonTuple(int[] counts, int nCol) {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public ADictionary subtractTuple(double[] tuple) {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public MatrixBlockDictionary getAsMatrixBlockDictionary(int nCol) {
+        // Simply return this.
+        return this;
+    }
+
+    @Override
+    public String toString() {
+        return "MatrixBlock Dictionary :" + _data.toString();
+    }
+}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/QDictionary.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/QDictionary.java
index 7b01fb5..70836ca 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/QDictionary.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/QDictionary.java
@@ -499,4 +499,14 @@ public class QDictionary extends ADictionary {
 	public ADictionary subtractTuple(double[] tuple) {
 		throw new NotImplementedException();
 	}
+
+	@Override
+	public MatrixBlockDictionary getAsMatrixBlockDictionary(int nCol) {
+		throw new NotImplementedException();
+	}
+
+	@Override
+	public void aggregateCols(double[] c, Builtin fn, int[] colIndexes) {
+		throw new NotImplementedException();	
+	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/SparseDictionary.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/SparseDictionary.java
deleted file mode 100644
index 3a400f3..0000000
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/SparseDictionary.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysds.runtime.compress.colgroup.dictionary;
-
-import org.apache.sysds.runtime.functionobjects.Builtin;
-import org.apache.sysds.runtime.functionobjects.ValueFunction;
-import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
-
-/**
- * A sparse dictionary implementation, use if the tuples are sparse.
- */
-public class SparseDictionary extends ADictionary {
-
-	@Override
-	public double[] getValues() {
-		LOG.warn("Inefficient materialization of sparse Dictionary.");
-
-		return null;
-	}
-
-	@Override
-	public double getValue(int i) {
-		// TODO Auto-generated method stub
-		return 0;
-	}
-
-	@Override
-	public int hasZeroTuple(int nCol) {
-		// TODO Auto-generated method stub
-		return 0;
-	}
-
-	@Override
-	public long getInMemorySize() {
-		// TODO Auto-generated method stub
-		return 0;
-	}
-
-	@Override
-	public double aggregate(double init, Builtin fn) {
-		// TODO Auto-generated method stub
-		return 0;
-	}
-
-	@Override
-	public double[] aggregateTuples(Builtin fn, int nCol) {
-		// TODO Auto-generated method stub
-		return null;
-	}
-
-	@Override
-	public int size() {
-		// TODO Auto-generated method stub
-		return 0;
-	}
-
-	@Override
-	public ADictionary apply(ScalarOperator op) {
-		// TODO Auto-generated method stub
-		return null;
-	}
-
-	@Override
-	public ADictionary applyScalarOp(ScalarOperator op, double newVal, int numCols) {
-		// TODO Auto-generated method stub
-		return null;
-	}
-
-	@Override
-	public ADictionary applyBinaryRowOpLeft(ValueFunction fn, double[] v, boolean sparseSafe, int[] colIndexes) {
-		// TODO Auto-generated method stub
-		return null;
-	}
-
-	@Override
-	public ADictionary applyBinaryRowOpRight(ValueFunction fn, double[] v, boolean sparseSafe, int[] colIndexes) {
-		// TODO Auto-generated method stub
-		return null;
-	}
-
-	@Override
-	public ADictionary clone() {
-		// TODO Auto-generated method stub
-		return null;
-	}
-
-	@Override
-	public ADictionary cloneAndExtend(int len) {
-		// TODO Auto-generated method stub
-		return null;
-	}
-
-	@Override
-	public boolean isLossy() {
-		// TODO Auto-generated method stub
-		return false;
-	}
-
-	@Override
-	public int getNumberOfValues(int ncol) {
-		// TODO Auto-generated method stub
-		return 0;
-	}
-
-	@Override
-	public double[] sumAllRowsToDouble(boolean square, int nrColumns) {
-		// TODO Auto-generated method stub
-		return null;
-	}
-
-	@Override
-	public double sumRow(int k, boolean square, int nrColumns) {
-		// TODO Auto-generated method stub
-		return 0;
-	}
-
-	@Override
-	public double[] colSum(int[] counts, int nCol) {
-		// TODO Auto-generated method stub
-		return null;
-	}
-
-	@Override
-	public void colSum(double[] c, int[] counts, int[] colIndexes, boolean square) {
-		// TODO Auto-generated method stub
-
-	}
-
-	@Override
-	public double sum(int[] counts, int ncol) {
-		// TODO Auto-generated method stub
-		return 0;
-	}
-
-	@Override
-	public double sumsq(int[] counts, int ncol) {
-		// TODO Auto-generated method stub
-		return 0;
-	}
-
-	@Override
-	public String getString(int colIndexes) {
-		// TODO Auto-generated method stub
-		return null;
-	}
-
-	@Override
-	public void addMaxAndMin(double[] ret, int[] colIndexes) {
-		// TODO Auto-generated method stub
-
-	}
-
-	@Override
-	public ADictionary sliceOutColumnRange(int idxStart, int idxEnd, int previousNumberOfColumns) {
-		// TODO Auto-generated method stub
-		return null;
-	}
-
-	@Override
-	public ADictionary reExpandColumns(int max) {
-		// TODO Auto-generated method stub
-		return null;
-	}
-
-	@Override
-	public boolean containsValue(double pattern) {
-		// TODO Auto-generated method stub
-		return false;
-	}
-
-	@Override
-	public long getNumberNonZeros(int[] counts, int nCol) {
-		// TODO Auto-generated method stub
-		return 0;
-	}
-
-	@Override
-	public long getNumberNonZerosContained() {
-		// TODO Auto-generated method stub
-		return 0;
-	}
-
-	@Override
-	public void addToEntry(Dictionary d, int fr, int to, int nCol) {
-		// TODO Auto-generated method stub
-
-	}
-
-	@Override
-	public double[] getMostCommonTuple(int[] counts, int nCol) {
-		// TODO Auto-generated method stub
-		return null;
-	}
-
-	@Override
-	public ADictionary subtractTuple(double[] tuple) {
-		// TODO Auto-generated method stub
-		return null;
-	}
-
-}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimator.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimator.java
index d4b2a63..f46ee53 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimator.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimator.java
@@ -21,6 +21,7 @@ package org.apache.sysds.runtime.compress.estim;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -91,6 +92,52 @@ public abstract class CompressedSizeEstimator {
 		return new CompressedSizeInfo(sizeInfos);
 	}
 
+	/**
+	 * Multi threaded version of extracting Compression Size info from list of specified columns
+	 * 
+	 * @return
+	 */
+
+	/**
+	 * Multi threaded version of extracting Compression Size info from list of specified columns
+	 * 
+	 * @param columnLists The specified columns to extract.
+	 * @param k           The parallelization degree
+	 * @return The Compression information from the specified column groups.
+	 */
+	public List<CompressedSizeInfoColGroup> computeCompressedSizeInfos(Collection<int[]> columnLists, int k) {
+		if(k == 1)
+			return computeCompressedSizeInfos(columnLists);
+		try {
+			ExecutorService pool = CommonThreadPool.get(k);
+			ArrayList<SizeEstimationTask> tasks = new ArrayList<>();
+			for(int[] g : columnLists)
+				tasks.add(new SizeEstimationTask(this, g));
+			List<Future<CompressedSizeInfoColGroup>> rtask = pool.invokeAll(tasks);
+			ArrayList<CompressedSizeInfoColGroup> ret = new ArrayList<>();
+			for(Future<CompressedSizeInfoColGroup> lrtask : rtask)
+				ret.add(lrtask.get());
+			pool.shutdown();
+			return ret;
+		}
+		catch(InterruptedException | ExecutionException e) {
+			return computeCompressedSizeInfos(columnLists);
+		}
+	}
+
+	/**
+	 * Compression Size info from list of specified columns
+	 * 
+	 * @param columnLists The specified columns to extract.
+	 * @return The Compression information from the specified column groups.
+	 */
+	public List<CompressedSizeInfoColGroup> computeCompressedSizeInfos(Collection<int[]> columnLists) {
+		ArrayList<CompressedSizeInfoColGroup> ret = new ArrayList<>();
+		for(int[] g : columnLists)
+			ret.add(estimateCompressedColGroupSize(g));
+		return ret;
+	}
+
 	private CompressedSizeInfoColGroup[] estimateIndividualColumnGroupSizes(int k) {
 		return (k > 1) ? CompressedSizeInfoColGroup(_numCols, k) : CompressedSizeInfoColGroup(_numCols);
 	}
@@ -106,19 +153,36 @@ public abstract class CompressedSizeEstimator {
 	}
 
 	/**
-	 * Abstract method for extracting Compressed Size Info of specified columns, together in a single ColGroup
+	 * Method for extracting Compressed Size Info of specified columns, together in a single ColGroup
 	 * 
-	 * @param colIndexes The Colums to group together inside a ColGroup
+	 * @param colIndexes The columns to group together inside a ColGroup
 	 * @return The CompressedSizeInformation associated with the selected ColGroups.
 	 */
-	public abstract CompressedSizeInfoColGroup estimateCompressedColGroupSize(int[] colIndexes);
+	public CompressedSizeInfoColGroup estimateCompressedColGroupSize(int[] colIndexes) {
+		return estimateCompressedColGroupSize(colIndexes, Integer.MAX_VALUE);
+	}
+
+	/**
+	 * A method to extract the Compressed Size Info for a given list of columns, This method further limits the
+	 * estimated number of unique values, since in some cases the estimated number of uniques is estimated higher than
+	 * the number estimated in sub groups of the given colIndexes.
+	 * 
+	 * @param colIndexes         The columns to extract compression information from
+	 * @param nrUniqueUpperBound The upper bound of unique elements allowed in the estimate, can be calculated from the
+	 *                           number of unique elements estimated in sub columns multiplied together. This is
+	 *                           flexible in the sense that if the sample is small then this unique can be manually
+	 *                           edited like in CoCodeCostMatrixMult.
+	 * 
+	 * @return The CompressedSizeInfoColGroup fro the given column indexes.
+	 */
+	public abstract CompressedSizeInfoColGroup estimateCompressedColGroupSize(int[] colIndexes, int nrUniqueUpperBound);
 
 	/**
 	 * Method used to extract the CompressedSizeEstimationFactors from an constructed UncompressedBitmap. Note this
 	 * method works both for the sample based estimator and the exact estimator, since the bitmap, can be extracted from
 	 * a sample or from the entire dataset.
 	 * 
-	 * @param ubm        The UncompressedBitmap, either extracted from a sample or from the entier dataset
+	 * @param ubm        The UncompressedBitmap, either extracted from a sample or from the entire dataset
 	 * @param colIndexes The columns that is compressed together.
 	 * @return The size factors estimated from the Bit Map.
 	 */
@@ -154,16 +218,21 @@ public abstract class CompressedSizeEstimator {
 
 	private static class SizeEstimationTask implements Callable<CompressedSizeInfoColGroup> {
 		private final CompressedSizeEstimator _estimator;
-		private final int _col;
+		private final int[] _cols;
 
 		protected SizeEstimationTask(CompressedSizeEstimator estimator, int col) {
 			_estimator = estimator;
-			_col = col;
+			_cols = new int[] {col};
+		}
+
+		protected SizeEstimationTask(CompressedSizeEstimator estimator, int[] cols) {
+			_estimator = estimator;
+			_cols = cols;
 		}
 
 		@Override
 		public CompressedSizeInfoColGroup call() {
-			return _estimator.estimateCompressedColGroupSize(new int[] {_col});
+			return _estimator.estimateCompressedColGroupSize(_cols);
 		}
 	}
 
@@ -174,8 +243,4 @@ public abstract class CompressedSizeEstimator {
 		}
 		return colIndexes;
 	}
-
-	public MatrixBlock getSample() {
-		return _data;
-	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorExact.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorExact.java
index 40677e9..5fdf8e2 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorExact.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorExact.java
@@ -34,7 +34,8 @@ public class CompressedSizeEstimatorExact extends CompressedSizeEstimator {
 	}
 
 	@Override
-	public CompressedSizeInfoColGroup estimateCompressedColGroupSize(int[] colIndexes) {
+	public CompressedSizeInfoColGroup estimateCompressedColGroupSize(int[] colIndexes, int nrUniqueUpperBound) {
+		// exact estimator can ignore upper bound.
 		ABitmap entireBitMap = BitmapEncoder.extractBitmap(colIndexes, _data, _transposed);
 		return new CompressedSizeInfoColGroup(estimateCompressedColGroupSize(entireBitMap, colIndexes),
 			_cs.validCompressions);
diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorFactory.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorFactory.java
index cf5437e..ebeef16 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorFactory.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorFactory.java
@@ -27,23 +27,42 @@ import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 public class CompressedSizeEstimatorFactory {
 	protected static final Log LOG = LogFactory.getLog(CompressedSizeEstimatorFactory.class.getName());
 
-	public static final int minimumSampleSize = 2000;
-
 	public static CompressedSizeEstimator getSizeEstimator(MatrixBlock data, CompressionSettings cs) {
 
-		final long nRows = cs.transposed ? data.getNumColumns() : data.getNumRows();
+		final int nRows = cs.transposed ? data.getNumColumns() : data.getNumRows();
+		final int nCols = cs.transposed ? data.getNumRows() : data.getNumColumns();
+		final int nnzRows = (int) Math.ceil(data.getNonZeros() / nCols);
 		
-		// Calculate the sample size.
-		// If the sample size is very small, set it to the minimum size
-		final int sampleSize = Math.max((int) Math.ceil(nRows * cs.samplingRatio), minimumSampleSize);
+		final double sampleRatio = cs.samplingRatio;
+		final int sampleSize = getSampleSize(sampleRatio, nRows, cs.minimumSampleSize);
 
-		CompressedSizeEstimator est;
-		if(cs.samplingRatio >= 1.0 || nRows < minimumSampleSize || sampleSize > nRows)
-			est = new CompressedSizeEstimatorExact(data, cs);
-		else
-			est = new CompressedSizeEstimatorSample(data, cs, sampleSize);
+		final CompressedSizeEstimator est = (shouldUseExactEstimator(cs, nRows, sampleSize,
+			nnzRows)) ? new CompressedSizeEstimatorExact(data,
+				cs) : tryToMakeSampleEstimator(data, cs, sampleRatio, sampleSize, nRows, nnzRows);
 
 		LOG.debug("Estimating using: " + est);
 		return est;
 	}
+
+	private static CompressedSizeEstimator tryToMakeSampleEstimator(MatrixBlock data, CompressionSettings cs,
+		double sampleRatio, int sampleSize, int nRows, int nnzRows) {
+		CompressedSizeEstimatorSample estS = new CompressedSizeEstimatorSample(data, cs, sampleSize);
+		while(estS.getSample() == null) {
+			LOG.warn("Doubling sample size");
+			sampleSize = sampleSize * 2;
+			if(shouldUseExactEstimator(cs, nRows, sampleSize, nnzRows))
+				return new CompressedSizeEstimatorExact(data, cs);
+			else
+				estS.sampleData(sampleSize);
+		}
+		return estS;
+	}
+
+	private static boolean shouldUseExactEstimator(CompressionSettings cs, int nRows, int sampleSize, int nnzRows) {
+		return cs.samplingRatio >= 1.0 || nRows < cs.minimumSampleSize || sampleSize > nnzRows;
+	}
+
+	private static int getSampleSize(double sampleRatio, int nRows, int minimumSampleSize) {
+		return Math.max((int) Math.ceil(nRows * sampleRatio), minimumSampleSize);
+	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java
index 3098394..9fe57d7 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java
@@ -21,7 +21,6 @@ package org.apache.sysds.runtime.compress.estim;
 
 import java.util.HashMap;
 
-import org.apache.sysds.runtime.DMLCompressionException;
 import org.apache.sysds.runtime.compress.CompressionSettings;
 import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType;
 import org.apache.sysds.runtime.compress.estim.sample.HassAndStokes;
@@ -38,8 +37,8 @@ import org.apache.sysds.runtime.util.UtilFunctions;
 
 public class CompressedSizeEstimatorSample extends CompressedSizeEstimator {
 
-	private final int[] _sampleRows;
-	private final MatrixBlock _sample;
+	private int[] _sampleRows;
+	private MatrixBlock _sample;
 	private HashMap<Integer, Double> _solveCache = null;
 
 	/**
@@ -51,12 +50,16 @@ public class CompressedSizeEstimatorSample extends CompressedSizeEstimator {
 	 */
 	public CompressedSizeEstimatorSample(MatrixBlock data, CompressionSettings cs, int sampleSize) {
 		super(data, cs);
-		_sampleRows = CompressedSizeEstimatorSample.getSortedUniformSample(_numRows, sampleSize, _cs.seed);
-		_solveCache = new HashMap<>();
-		_sample = sampleData();
+		_sample = sampleData(sampleSize);
 	}
 
-	protected MatrixBlock sampleData() {
+	public MatrixBlock getSample() {
+		return _sample;
+	}
+
+	public MatrixBlock sampleData(int sampleSize) {
+		_sampleRows = CompressedSizeEstimatorSample.getSortedUniformSample(_numRows, sampleSize, _cs.seed);
+		_solveCache = new HashMap<>();
 		MatrixBlock sampledMatrixBlock;
 		if(_data.isInSparseFormat() && !_cs.transposed) {
 			sampledMatrixBlock = new MatrixBlock(_sampleRows.length, _data.getNumColumns(), true);
@@ -77,18 +80,17 @@ public class CompressedSizeEstimatorSample extends CompressedSizeEstimator {
 				select.appendValue(_sampleRows[i], 0, 1);
 
 			sampledMatrixBlock = _data.removeEmptyOperations(new MatrixBlock(), !_cs.transposed, true, select);
-
 		}
 
 		if(sampledMatrixBlock.isEmpty())
-			throw new DMLCompressionException("Empty sample block");
-
-		return sampledMatrixBlock;
+			return null;
+		else
+			return sampledMatrixBlock;
 
 	}
 
 	@Override
-	public CompressedSizeInfoColGroup estimateCompressedColGroupSize(int[] colIndexes) {
+	public CompressedSizeInfoColGroup estimateCompressedColGroupSize(int[] colIndexes, int nrUniqueUpperBound) {
 		final int sampleSize = _sampleRows.length;
 		// final int numCols = colIndexes.length;
 		final double scalingFactor = ((double) _numRows / sampleSize);
@@ -109,7 +111,7 @@ public class CompressedSizeEstimatorSample extends CompressedSizeEstimator {
 		// Estimate number of distinct values (incl fixes for anomalies w/ large sample fraction)
 		int totalCardinality = getNumDistinctValues(ubm, _numRows, sampleSize, _solveCache);
 		// Number of unique is trivially bounded by the sampled number of uniques and the number of rows.
-		totalCardinality = Math.min(Math.max(totalCardinality, fact.numVals), _numRows);
+		totalCardinality = Math.min(Math.min(Math.max(totalCardinality, fact.numVals), _numRows), nrUniqueUpperBound);
 
 		// estimate number of non-zeros (conservatively round up)
 		final double C = Math.max(1 - (double) fact.numSingle / sampleSize, (double) sampleSize / _numRows);
@@ -127,7 +129,6 @@ public class CompressedSizeEstimatorSample extends CompressedSizeEstimator {
 		for(IntArrayList a : ubm.getOffsetList())
 			if(a.size() > largestInstanceCount)
 				largestInstanceCount = a.size();
-		
 
 		final boolean zeroIsMostFrequent = largestInstanceCount == numZerosInSample;
 
@@ -307,12 +308,12 @@ public class CompressedSizeEstimatorSample extends CompressedSizeEstimator {
 	/**
 	 * Returns a sorted array of n integers, drawn uniformly from the range [0,range).
 	 * 
-	 * @param range    the range
-	 * @param smplSize sample size
+	 * @param range      the range
+	 * @param sampleSize sample size
 	 * @return sorted array of integers
 	 */
-	private static int[] getSortedUniformSample(int range, int smplSize, long seed) {
-		return UtilFunctions.getSortedSampleIndexes(range, smplSize, seed);
+	private static int[] getSortedUniformSample(int range, int sampleSize, long seed) {
+		return UtilFunctions.getSortedSampleIndexes(range, sampleSize, seed);
 	}
 
 	@Override
@@ -329,4 +330,5 @@ public class CompressedSizeEstimatorSample extends CompressedSizeEstimator {
 		sb.append(_numRows);
 		return sb.toString();
 	}
+
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java
index 50aeb7e..8b984ff 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java
@@ -27,6 +27,8 @@ import java.util.Set;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.runtime.compress.CompressionSettings;
+import org.apache.sysds.runtime.compress.cocode.PlanningCoCoder.PartitionerType;
 import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType;
 import org.apache.sysds.runtime.compress.colgroup.ColGroupSizes;
 
@@ -59,7 +61,7 @@ public class CompressedSizeInfoColGroup {
 		_cardinalityRatio = (double) numVals / numRows;
 		_sizes = null;
 		_bestCompressionType = null;
-		_minSize = ColGroupSizes.estimateInMemorySizeDDC(columns.length, numVals, numRows, false);
+		_minSize = ColGroupSizes.estimateInMemorySizeDDC(columns.length, numVals, numRows, 1.0, false);
 	}
 
 	/**
@@ -106,6 +108,15 @@ public class CompressedSizeInfoColGroup {
 		return _sizes.get(ct);
 	}
 
+	public CompressionType getBestCompressionType(CompressionSettings cs) {
+		if(cs.columnPartitioner == PartitionerType.COST_MATRIX_MULT) {
+			// if(_sizes.get(CompressionType.SDC) * 0.8 < _sizes.get(_bestCompressionType))
+			if(getMostCommonFraction() > 0.4)
+				return CompressionType.SDC;
+		}
+		return _bestCompressionType;
+	}
+
 	public CompressionType getBestCompressionType() {
 		return _bestCompressionType;
 	}
@@ -171,26 +182,28 @@ public class CompressedSizeInfoColGroup {
 			case DDC:
 				// + 1 if the column contains zero
 				return ColGroupSizes.estimateInMemorySizeDDC(numCols,
-					fact.numVals + (fact.numOffs < fact.numRows ? 1 : 0), fact.numRows, fact.lossy);
+					fact.numVals + (fact.numOffs < fact.numRows ? 1 : 0), fact.numRows, fact.tupleSparsity, fact.lossy);
 			case RLE:
 				return ColGroupSizes.estimateInMemorySizeRLE(numCols, fact.numVals, fact.numRuns, fact.numRows,
-					fact.lossy);
+					fact.tupleSparsity, fact.lossy);
 			case OLE:
 				return ColGroupSizes.estimateInMemorySizeOLE(numCols, fact.numVals, fact.numOffs + fact.numVals,
-					fact.numRows, fact.lossy);
+					fact.numRows, fact.tupleSparsity, fact.lossy);
 			case UNCOMPRESSED:
 				return ColGroupSizes.estimateInMemorySizeUncompressed(fact.numRows, numCols, fact.overAllSparsity);
 			case SDC:
 				if(fact.numOffs <= 1)
 					return ColGroupSizes.estimateInMemorySizeSDCSingle(numCols, fact.numVals, fact.numRows,
-						fact.largestOff, fact.zeroIsMostFrequent, fact.containNoZeroValues, fact.lossy);
+						fact.largestOff, fact.zeroIsMostFrequent, fact.containNoZeroValues, fact.tupleSparsity,
+						fact.lossy);
 				return ColGroupSizes.estimateInMemorySizeSDC(numCols, fact.numVals, fact.numRows, fact.largestOff,
-					fact.zeroIsMostFrequent, fact.containNoZeroValues, fact.lossy);
+					fact.zeroIsMostFrequent, fact.containNoZeroValues, fact.tupleSparsity, fact.lossy);
 			case CONST:
 				if(fact.numOffs == 0)
 					return ColGroupSizes.estimateInMemorySizeEMPTY(numCols);
 				else if(fact.numOffs == fact.numRows && fact.numVals == 1)
-					return ColGroupSizes.estimateInMemorySizeCONST(numCols, fact.numVals, fact.lossy);
+					return ColGroupSizes.estimateInMemorySizeCONST(numCols, fact.numVals, fact.tupleSparsity,
+						fact.lossy);
 				else
 					return -1;
 			default:
diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/BitmapEncoder.java b/src/main/java/org/apache/sysds/runtime/compress/lib/BitmapEncoder.java
index 310b236..64b0bdd 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/BitmapEncoder.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/BitmapEncoder.java
@@ -34,6 +34,7 @@ import org.apache.sysds.runtime.compress.utils.DblArrayIntListHashMap.DArrayILis
 import org.apache.sysds.runtime.compress.utils.DoubleIntListHashMap;
 import org.apache.sysds.runtime.compress.utils.DoubleIntListHashMap.DIListEntry;
 import org.apache.sysds.runtime.compress.utils.IntArrayList;
+import org.apache.sysds.runtime.compress.utils.MultiColBitmap;
 import org.apache.sysds.runtime.data.SparseBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 
@@ -52,29 +53,28 @@ public class BitmapEncoder {
 	 * @param transposed Boolean specifying if the rawblock was transposed.
 	 * @return uncompressed bitmap representation of the columns
 	 */
-	public static Bitmap extractBitmap(int[] colIndices, MatrixBlock rawBlock, boolean transposed) {
-
-		Bitmap res = null;
-		if(colIndices.length == 1) {
-			res = extractBitmap(colIndices[0], rawBlock, transposed);
-		}
-		// multiple column selection (general case)
-		else {
+	public static ABitmap extractBitmap(int[] colIndices, MatrixBlock rawBlock, boolean transposed) {
+		try {
 			final int numRows = transposed ? rawBlock.getNumColumns() : rawBlock.getNumRows();
-			try {
-				res = extractBitmap(colIndices, ReaderColumnSelection.createReader(rawBlock, colIndices, transposed), numRows);
-			}
-			catch(Exception e) {
-				throw new DMLRuntimeException("Failed to extract bitmap", e);
+			if(rawBlock.isEmpty() && colIndices.length == 1)
+				return new Bitmap(null, null, numRows);
+			else if(colIndices.length == 1)
+				return extractBitmap(colIndices[0], rawBlock, transposed);
+			else if(rawBlock.isEmpty())
+				return new MultiColBitmap(colIndices.length, null, null, numRows);
+			else {
+				ReaderColumnSelection reader = ReaderColumnSelection.createReader(rawBlock, colIndices, transposed);
+				return extractBitmap(colIndices, reader, numRows);
 			}
 		}
-		return res;
-
+		catch(Exception e) {
+			throw new DMLRuntimeException("Failed to extract bitmap", e);
+		}
 	}
 
 	public static ABitmap extractBitmap(int[] colIndices, int rows, BitSet rawBlock, CompressionSettings compSettings) {
 		ReaderColumnSelection reader = new ReaderColumnSelectionBitSet(rawBlock, rows, colIndices);
-		Bitmap res = extractBitmap(colIndices, reader, rows);
+		ABitmap res = extractBitmap(colIndices, reader, rows);
 		return res;
 	}
 
@@ -88,7 +88,7 @@ public class BitmapEncoder {
 	 * @param transposed Boolean specifying if the rawBlock is transposed or not.
 	 * @return Bitmap containing the Information of the column.
 	 */
-	private static Bitmap extractBitmap(int colIndex, MatrixBlock rawBlock, boolean transposed) {
+	private static ABitmap extractBitmap(int colIndex, MatrixBlock rawBlock, boolean transposed) {
 		DoubleIntListHashMap hashMap = transposed ? extractHashMapTransposed(colIndex,
 			rawBlock) : extractHashMap(colIndex, rawBlock);
 		return makeBitmap(hashMap, transposed ? rawBlock.getNumColumns() : rawBlock.getNumRows());
@@ -187,7 +187,7 @@ public class BitmapEncoder {
 	 * @param numRows    The number of contained rows
 	 * @return The Bitmap
 	 */
-	protected static Bitmap extractBitmap(int[] colIndices, ReaderColumnSelection rowReader, int numRows) {
+	protected static ABitmap extractBitmap(int[] colIndices, ReaderColumnSelection rowReader, int numRows) {
 		// probe map for distinct items (for value or value groups)
 		DblArrayIntListHashMap distinctVals = new DblArrayIntListHashMap();
 
@@ -216,24 +216,24 @@ public class BitmapEncoder {
 	 * @param numCols      Number of columns
 	 * @return The Bitmap.
 	 */
-	private static Bitmap makeBitmap(DblArrayIntListHashMap distinctVals, int numRows, int numCols) {
+	private static ABitmap makeBitmap(DblArrayIntListHashMap distinctVals, int numRows, int numCols) {
 		// added for one pass bitmap construction
 		// Convert inputs to arrays
 		ArrayList<DArrayIListEntry> mapEntries = distinctVals.extractValues();
 		if(!mapEntries.isEmpty()) {
 			int numVals = distinctVals.size();
-			double[] values = new double[numVals * numCols];
+			double[][] values = new double[numVals][];
 			IntArrayList[] offsetsLists = new IntArrayList[numVals];
 			int bitmapIx = 0;
 			for(DArrayIListEntry val : mapEntries) {
-				System.arraycopy(val.key.getData(), 0, values, bitmapIx * numCols, numCols);
+				values[bitmapIx] = val.key.getData();
 				offsetsLists[bitmapIx++] = val.value;
 			}
 
-			return new Bitmap(numCols, offsetsLists, values, numRows);
+			return new MultiColBitmap(numCols, offsetsLists, values, numRows);
 		}
 		else
-			return new Bitmap(numCols, null, null, numRows);
+			return new MultiColBitmap(numCols, null, null, numRows);
 
 	}
 
@@ -248,7 +248,7 @@ public class BitmapEncoder {
 		// added for one pass bitmap construction
 		// Convert inputs to arrays
 		int numVals = distinctVals.size();
-		if(numVals > 0){
+		if(numVals > 0) {
 
 			double[] values = new double[numVals];
 			IntArrayList[] offsetsLists = new IntArrayList[numVals];
@@ -257,11 +257,11 @@ public class BitmapEncoder {
 				values[bitmapIx] = val.key;
 				offsetsLists[bitmapIx++] = val.value;
 			}
-	
-			return new Bitmap(1, offsetsLists, values, numRows);
+
+			return new Bitmap(offsetsLists, values, numRows);
 		}
-		else{
-			return new Bitmap(1, null, null, numRows);
+		else {
+			return new Bitmap(null, null, numRows);
 		}
 	}
 
diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/BitmapLossyEncoder.java b/src/main/java/org/apache/sysds/runtime/compress/lib/BitmapLossyEncoder.java
index 81789f1..4e74d69 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/BitmapLossyEncoder.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/BitmapLossyEncoder.java
@@ -30,6 +30,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Queue;
 
+import org.apache.commons.lang.NotImplementedException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
@@ -68,20 +69,21 @@ public class BitmapLossyEncoder {
 	 * @param numRows The number of rows contained in the ubm.
 	 * @return A bitmap.
 	 */
-	public static ABitmap makeBitmapLossy(Bitmap ubm, int numRows) {
-		final double[] fp = ubm.getValues();
-		if(fp.length == 0) {
-			return ubm;
-		}
-		Stats stats = new Stats(fp);
-		// TODO make better decisions than just a 8 Bit encoding.
-		if(Double.isInfinite(stats.max) || Double.isInfinite(stats.min)) {
-			LOG.warn("Defaulting to incompressable colGroup");
-			return ubm;
-		}
-		else {
-			return make8BitLossy(ubm, stats, numRows);
-		}
+	public static ABitmap makeBitmapLossy(ABitmap ubm, int numRows) {
+		throw new NotImplementedException();
+		// final double[] fp = ubm.getValues();
+		// if(fp.length == 0) {
+		// 	return ubm;
+		// }
+		// Stats stats = new Stats(fp);
+		// // TODO make better decisions than just a 8 Bit encoding.
+		// if(Double.isInfinite(stats.max) || Double.isInfinite(stats.min)) {
+		// 	LOG.warn("Defaulting to incompressable colGroup");
+		// 	return ubm;
+		// }
+		// else {
+		// 	return make8BitLossy(ubm, stats, numRows);
+		// }
 	}
 
 	/**
diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibLeftMultBy.java b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibLeftMultBy.java
index 9c73f7c..ba22642 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibLeftMultBy.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibLeftMultBy.java
@@ -255,7 +255,7 @@ public class CLALibLeftMultBy {
 				ExecutorService pool = CommonThreadPool.get(k);
 				// compute remaining compressed column groups in parallel
 				ArrayList<Callable<Object>> tasks = new ArrayList<>();
-				int rowBlockSize = 8;
+				int rowBlockSize = 1;
 				if(overlapping) {
 					for(int blo = 0; blo < that.getNumRows(); blo += rowBlockSize) {
 						tasks.add(new LeftMatrixMatrixMultTask(colGroups, that, ret, blo,
diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibRightMultBy.java b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibRightMultBy.java
index 5b0a105..61e275c 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibRightMultBy.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibRightMultBy.java
@@ -141,14 +141,14 @@ public class CLALibRightMultBy {
 			ret.setOverlapping(true);
 
 		if(containsNull) {
-			ColGroupEmpty cge = findEmptyColumnsAndMakeEmptyColGroup(retCg, ret.getNumColumns());
+			ColGroupEmpty cge = findEmptyColumnsAndMakeEmptyColGroup(retCg, ret.getNumColumns(), ret.getNumRows());
 			if(cge != null)
 				retCg.add(cge);
 		}
 		return ret;
 	}
 
-	private static ColGroupEmpty findEmptyColumnsAndMakeEmptyColGroup(List<AColGroup> colGroups, int nCols) {
+	private static ColGroupEmpty findEmptyColumnsAndMakeEmptyColGroup(List<AColGroup> colGroups, int nCols, int nRows) {
 		Set<Integer> emptyColumns = new HashSet<Integer>(nCols);
 		for(int i = 0; i < nCols; i++)
 			emptyColumns.add(i);
@@ -159,7 +159,7 @@ public class CLALibRightMultBy {
 
 		if(emptyColumns.size() != 0) {
 			int[] emptyColumnsFinal = emptyColumns.stream().mapToInt(Integer::intValue).toArray();
-			return new ColGroupEmpty(emptyColumnsFinal, colGroups.get(0).getNumRows());
+			return new ColGroupEmpty(emptyColumnsFinal, nRows);
 		}
 		else
 			return null;
diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibSquash.java b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibSquash.java
index aacea4a..fecd08e 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibSquash.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibSquash.java
@@ -39,7 +39,6 @@ import org.apache.sysds.runtime.compress.colgroup.ColGroupFactory;
 import org.apache.sysds.runtime.compress.colgroup.ColGroupValue;
 import org.apache.sysds.runtime.compress.readers.ReaderColumnSelection;
 import org.apache.sysds.runtime.compress.utils.ABitmap;
-import org.apache.sysds.runtime.compress.utils.Bitmap;
 import org.apache.sysds.runtime.util.CommonThreadPool;
 
 public class CLALibSquash {
@@ -123,18 +122,16 @@ public class CLALibSquash {
 			map = extractBitmap(columnIds, m);
 		}
 		else
-			map = BitmapLossyEncoder.extractMapFromCompressedSingleColumn(m,
-				columnIds[0],
-				minMaxes[columnIds[0] * 2],
+			map = BitmapLossyEncoder.extractMapFromCompressedSingleColumn(m, columnIds[0], minMaxes[columnIds[0] * 2],
 				minMaxes[columnIds[0] * 2 + 1], m.getNumRows());
 
-		AColGroup newGroup = ColGroupFactory.compress(columnIds, m.getNumRows(), map, CompressionType.DDC, cs, m);
+		AColGroup newGroup = ColGroupFactory.compress(columnIds, m.getNumRows(), map, CompressionType.DDC, cs, m, 1);
 		return newGroup;
 	}
 
 	private static ABitmap extractBitmap(int[] colIndices, CompressedMatrixBlock compressedBlock) {
-		Bitmap x = BitmapEncoder.extractBitmap(colIndices,
-			ReaderColumnSelection.createCompressedReader(compressedBlock, colIndices),  compressedBlock.getNumRows());
+		ABitmap x = BitmapEncoder.extractBitmap(colIndices,
+			ReaderColumnSelection.createCompressedReader(compressedBlock, colIndices), compressedBlock.getNumRows());
 		return BitmapLossyEncoder.makeBitmapLossy(x, compressedBlock.getNumRows());
 	}
 
diff --git a/src/main/java/org/apache/sysds/runtime/compress/utils/Bitmap.java b/src/main/java/org/apache/sysds/runtime/compress/utils/Bitmap.java
index 09354f6..db0c2b6 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/utils/Bitmap.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/utils/Bitmap.java
@@ -34,8 +34,8 @@ public final class Bitmap extends ABitmap {
 	 */
 	private double[] _values;
 
-	public Bitmap(int numCols, IntArrayList[] offsetsLists, double[] values, int rows) {
-		super(numCols, offsetsLists, rows);
+	public Bitmap(IntArrayList[] offsetsLists, double[] values, int rows) {
+		super(1, offsetsLists, rows);
 		_values = values;
 	}
 
@@ -48,33 +48,16 @@ public final class Bitmap extends ABitmap {
 		return _values;
 	}
 
-	/**
-	 * Obtain tuple of column values associated with index.
-	 * 
-	 * @param ix index of a particular distinct value
-	 * @return the tuple of column values associated with the specified index
-	 */
-	public double[] getValues(int ix) {
-		return Arrays.copyOfRange(_values, ix * _numCols, (ix + 1) * _numCols);
-	}
-
 	public int getNumNonZerosInOffset(int idx){
-		if(_numCols == 1)
-			return  _values[0] != 0 ? 1 : 0;
-		int nz = 0;
-		for(int i = idx * _numCols; i < (idx+1) * _numCols; i++)
-			nz += _values[i] == 0 ? 0 : 1;
-		
-		return nz;
+		return  _values[idx] != 0 ? 1 : 0;
 	}
 
 	public int getNumValues() {
-		return (_values == null) ? 0: _values.length / _numCols;
+		return (_values == null) ? 0: _values.length;
 	}
 
 	public void sortValuesByFrequency() {
 		int numVals = getNumValues();
-		int numCols = getNumColumns();
 
 		double[] freq = new double[numVals];
 		int[] pos = new int[numVals];
@@ -90,10 +73,10 @@ public final class Bitmap extends ABitmap {
 		ArrayUtils.reverse(pos);
 
 		// create new value and offset list arrays
-		double[] lvalues = new double[numVals * numCols];
+		double[] lvalues = new double[numVals];
 		IntArrayList[] loffsets = new IntArrayList[numVals];
 		for(int i = 0; i < numVals; i++) {
-			System.arraycopy(_values, pos[i] * numCols, lvalues, i * numCols, numCols);
+			lvalues[i]  = _values[pos[i]];
 			loffsets[i] = _offsetsLists[pos[i]];
 		}
 		_values = lvalues;
diff --git a/src/main/java/org/apache/sysds/runtime/compress/utils/Bitmap.java b/src/main/java/org/apache/sysds/runtime/compress/utils/MultiColBitmap.java
similarity index 70%
copy from src/main/java/org/apache/sysds/runtime/compress/utils/Bitmap.java
copy to src/main/java/org/apache/sysds/runtime/compress/utils/MultiColBitmap.java
index 09354f6..80d98df 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/utils/Bitmap.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/utils/MultiColBitmap.java
@@ -27,14 +27,12 @@ import org.apache.sysds.runtime.util.SortUtils;
 /**
  * Uncompressed representation of one or more columns in bitmap format.
  */
-public final class Bitmap extends ABitmap {
+public final class MultiColBitmap extends ABitmap {
 
-	/**
-	 * Distinct values that appear in the column. Linearized as value groups <v11 v12> <v21 v22>.
-	 */
-	private double[] _values;
+	/** Distinct tuples that appear in the columnGroup */
+	private double[][] _values;
 
-	public Bitmap(int numCols, IntArrayList[] offsetsLists, double[] values, int rows) {
+	public MultiColBitmap(int numCols, IntArrayList[] offsetsLists, double[][] values, int rows) {
 		super(numCols, offsetsLists, rows);
 		_values = values;
 	}
@@ -44,7 +42,7 @@ public final class Bitmap extends ABitmap {
 	 * 
 	 * @return dictionary of value tuples
 	 */
-	public double[] getValues() {
+	public double[][] getValues() {
 		return _values;
 	}
 
@@ -55,29 +53,26 @@ public final class Bitmap extends ABitmap {
 	 * @return the tuple of column values associated with the specified index
 	 */
 	public double[] getValues(int ix) {
-		return Arrays.copyOfRange(_values, ix * _numCols, (ix + 1) * _numCols);
+		return _values[ix];
 	}
 
-	public int getNumNonZerosInOffset(int idx){
-		if(_numCols == 1)
-			return  _values[0] != 0 ? 1 : 0;
+	public int getNumNonZerosInOffset(int idx) {
 		int nz = 0;
-		for(int i = idx * _numCols; i < (idx+1) * _numCols; i++)
-			nz += _values[i] == 0 ? 0 : 1;
-		
+		for(double v : getValues(idx))
+			nz += v == 0 ? 0 : 1;
+
 		return nz;
 	}
 
 	public int getNumValues() {
-		return (_values == null) ? 0: _values.length / _numCols;
+		return (_values == null) ? 0 : _values.length;
 	}
 
 	public void sortValuesByFrequency() {
-		int numVals = getNumValues();
-		int numCols = getNumColumns();
-
-		double[] freq = new double[numVals];
-		int[] pos = new int[numVals];
+		final int numVals = getNumValues();
+		
+		final double[] freq = new double[numVals];
+		final int[] pos = new int[numVals];
 
 		// populate the temporary arrays
 		for(int i = 0; i < numVals; i++) {
@@ -90,10 +85,10 @@ public final class Bitmap extends ABitmap {
 		ArrayUtils.reverse(pos);
 
 		// create new value and offset list arrays
-		double[] lvalues = new double[numVals * numCols];
+		double[][] lvalues = new double[numVals][];
 		IntArrayList[] loffsets = new IntArrayList[numVals];
 		for(int i = 0; i < numVals; i++) {
-			System.arraycopy(_values, pos[i] * numCols, lvalues, i * numCols, numCols);
+			lvalues[i]  = _values[pos[i]];
 			loffsets[i] = _offsetsLists[pos[i]];
 		}
 		_values = lvalues;
@@ -104,7 +99,9 @@ public final class Bitmap extends ABitmap {
 	public String toString() {
 		StringBuilder sb = new StringBuilder();
 		sb.append(super.toString());
-		sb.append("\nValues: " + Arrays.toString(_values));
+		sb.append("\nValues:");
+		for(double[] vv : _values)
+			sb.append("\n" + Arrays.toString(vv));
 		return sb.toString();
 	}
 
diff --git a/src/main/java/org/apache/sysds/utils/MemoryEstimates.java b/src/main/java/org/apache/sysds/utils/MemoryEstimates.java
index 8401d1a..8e0bac8 100644
--- a/src/main/java/org/apache/sysds/utils/MemoryEstimates.java
+++ b/src/main/java/org/apache/sysds/utils/MemoryEstimates.java
@@ -29,6 +29,8 @@ package org.apache.sysds.utils;
  */
 public class MemoryEstimates {
 
+	// private static final Log LOG = LogFactory.getLog(MemoryEstimates.class.getName());
+
 	/**
 	 * Get the worst case memory usage of an java.util.BitSet java object.
 	 * 
diff --git a/src/test/java/org/apache/sysds/test/component/compress/AbstractCompressedUnaryTests.java b/src/test/java/org/apache/sysds/test/component/compress/AbstractCompressedUnaryTests.java
index 94ba730..80df1e5 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/AbstractCompressedUnaryTests.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/AbstractCompressedUnaryTests.java
@@ -228,9 +228,6 @@ public abstract class AbstractCompressedUnaryTests extends CompressedTestBase {
 			}
 
 		}
-		catch(NotImplementedException e) {
-			throw e;
-		}
 		catch(Exception e) {
 			e.printStackTrace();
 			throw new RuntimeException(this.toString() + "\n" + e.getMessage(), e);
diff --git a/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java b/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java
index 1167512..7b9d871 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java
@@ -78,7 +78,7 @@ public abstract class CompressedTestBase extends TestBase {
 	protected static SparsityType[] usedSparsityTypes = new SparsityType[] {SparsityType.FULL, SparsityType.SPARSE,};
 
 	protected static ValueType[] usedValueTypes = new ValueType[] {ValueType.RAND_ROUND, ValueType.OLE_COMPRESSIBLE,
-		ValueType.RLE_COMPRESSIBLE,};
+		ValueType.RLE_COMPRESSIBLE};
 
 	protected static ValueRange[] usedValueRanges = new ValueRange[] {ValueRange.SMALL, ValueRange.NEGATIVE,
 		ValueRange.BYTE};
@@ -120,6 +120,9 @@ public abstract class CompressedTestBase extends TestBase {
 		new CompressionSettingsBuilder().setSamplingRatio(0.1).setSeed(compressionSeed).setTransposeInput("true")
 			.setColumnPartitioner(PartitionerType.STATIC).setInvestigateEstimate(true),
 
+		new CompressionSettingsBuilder().setSamplingRatio(0.1).setSeed(compressionSeed).setTransposeInput("true")
+			.setColumnPartitioner(PartitionerType.COST_MATRIX_MULT).setInvestigateEstimate(true),
+
 		// Forced Uncompressed tests
 		new CompressionSettingsBuilder().setValidCompressions(EnumSet.of(CompressionType.UNCOMPRESSED)),
 
@@ -302,6 +305,8 @@ public abstract class CompressedTestBase extends TestBase {
 				for(OverLapping ov : overLapping) {
 					tests.add(new Object[] {SparsityType.EMPTY, ValueType.RAND, ValueRange.BOOLEAN, cs, mt, ov});
 					tests.add(new Object[] {SparsityType.FULL, ValueType.CONST, ValueRange.LARGE, cs, mt, ov});
+					tests.add(
+						new Object[] {SparsityType.FULL, ValueType.ONE_HOT_ENCODED, ValueRange.BOOLEAN, cs, mt, ov});
 				}
 		return tests;
 	}
diff --git a/src/test/java/org/apache/sysds/test/component/compress/CompressibleInputGenerator.java b/src/test/java/org/apache/sysds/test/component/compress/CompressibleInputGenerator.java
index 4666327..a8a2a3e 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/CompressibleInputGenerator.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/CompressibleInputGenerator.java
@@ -82,6 +82,21 @@ public class CompressibleInputGenerator {
 		return output;
 	}
 
+	public static double[][] getInputOneHotMatrix(int rows, int cols, int seed) {
+		double[][] variations = new double[cols][];
+		for(int i = 0; i < cols; i++) {
+			variations[i] = new double[cols];
+			variations[i][i] = 1;
+		}
+
+		double[][] matrix = new double[rows][];
+		Random r = new Random(seed);
+		for(int i = 0; i < rows; i++)
+			matrix[i] = variations[r.nextInt(cols)];
+
+		return matrix;
+	}
+
 	private static double[][] rle(int rows, int cols, int nrUnique, int max, int min, double sparsity, int seed,
 		boolean transpose) {
 
diff --git a/src/test/java/org/apache/sysds/test/component/compress/TestBase.java b/src/test/java/org/apache/sysds/test/component/compress/TestBase.java
index f2ebbba..29a2a87 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/TestBase.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/TestBase.java
@@ -35,6 +35,7 @@ import org.apache.sysds.test.component.compress.TestConstants.ValueRange;
 import org.apache.sysds.test.component.compress.TestConstants.ValueType;
 
 public class TestBase {
+	// private static final Log LOG = LogFactory.getLog(TestBase.class.getName());
 
 	protected ValueType valType;
 	protected ValueRange valRange;
@@ -70,20 +71,23 @@ public class TestBase {
 					this.min = this.max;
 					// Do not Break, utilize the RAND afterwards.
 				case RAND:
-					this.input = TestUtils.generateTestMatrix(rows, cols, min, max, sparsity, 7);
+					this.input = TestUtils.generateTestMatrix(rows, cols, min, max, sparsity, seed);
 					break;
 				case RAND_ROUND:
-					this.input = TestUtils.round(TestUtils.generateTestMatrix(rows, cols, min, max, sparsity, 7));
+					this.input = TestUtils.round(TestUtils.generateTestMatrix(rows, cols, min, max, sparsity, seed));
 					break;
 				case OLE_COMPRESSIBLE:
 					// Note the Compressible Input generator, generates an already Transposed input
 					// normally, therefore last argument is true, to build a non transposed matrix.
 					this.input = CompressibleInputGenerator.getInputDoubleMatrix(rows, cols, CompressionType.OLE,
-						(max - min), max, min, sparsity, 7, true);
+						(max - min), max, min, sparsity, seed, true);
 					break;
 				case RLE_COMPRESSIBLE:
 					this.input = CompressibleInputGenerator.getInputDoubleMatrix(rows, cols, CompressionType.RLE,
-						(max - min), max, min, sparsity, 7, true);
+						(max - min), max, min, sparsity, seed, true);
+					break;
+				case ONE_HOT_ENCODED:
+					this.input = CompressibleInputGenerator.getInputOneHotMatrix(rows, cols, seed);
 					break;
 				default:
 					throw new NotImplementedException("Not Implemented Test Value type input generator");
@@ -94,6 +98,7 @@ public class TestBase {
 			this.compressionSettings = compressionSettings.create();
 
 			mb = DataConverter.convertToMatrixBlock(this.input);
+
 		}
 		catch(Exception e) {
 			e.printStackTrace();
diff --git a/src/test/java/org/apache/sysds/test/component/compress/TestConstants.java b/src/test/java/org/apache/sysds/test/component/compress/TestConstants.java
index c936c1b..649e96c 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/TestConstants.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/TestConstants.java
@@ -38,6 +38,7 @@ public class TestConstants {
 		RAND_ROUND, // Values rounded to nearest whole numbers.
 		OLE_COMPRESSIBLE, // Ideal inputs for OLE Compression.
 		RLE_COMPRESSIBLE, // Ideal inputs for RLE Compression.
+		ONE_HOT_ENCODED,
 	}
 
 	public enum MatrixTypology {
diff --git a/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateTest.java b/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateTest.java
index c32ccbe..b370451 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateTest.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateTest.java
@@ -58,7 +58,7 @@ public abstract class JolEstimateTest {
 	public abstract CompressionType getCT();
 
 	private final long actualSize;
-	// The actual compressed column group
+	private final int actualNumberUnique;
 	private final AColGroup cg;
 
 	public JolEstimateTest(MatrixBlock mbt) {
@@ -72,8 +72,9 @@ public abstract class JolEstimateTest {
 				.setValidCompressions(EnumSet.of(getCT())).create();
 			cs.transposed = true;
 			ABitmap ubm = BitmapEncoder.extractBitmap(colIndexes, mbt, true);
-			cg = ColGroupFactory.compress(colIndexes, mbt.getNumColumns(), ubm, getCT(), cs, mbt);
+			cg = ColGroupFactory.compress(colIndexes, mbt.getNumColumns(), ubm, getCT(), cs, mbt, 1);
 			actualSize = cg.estimateInMemorySize();
+			actualNumberUnique = cg.getNumValues();
 		}
 		catch(Exception e) {
 			e.printStackTrace();
@@ -83,21 +84,7 @@ public abstract class JolEstimateTest {
 
 	@Test
 	public void compressedSizeInfoEstimatorExact() {
-		try {
-			CompressionSettings cs = new CompressionSettingsBuilder().setSamplingRatio(1.0)
-				.setValidCompressions(EnumSet.of(getCT())).setSeed(seed).create();
-			cs.transposed = true;
-
-			final long estimateCSI = CompressedSizeEstimatorFactory.getSizeEstimator(mbt, cs)
-				.estimateCompressedColGroupSize().getCompressionSize(cg.getCompType());
-
-			boolean res = Math.abs(estimateCSI - actualSize) <= 0;
-			assertTrue("CSI estimate " + estimateCSI + " should be exactly " + actualSize + "\n" + cg.toString(), res);
-		}
-		catch(Exception e) {
-			e.printStackTrace();
-			assertTrue("Failed exact test " + getCT(), false);
-		}
+		compressedSizeInfoEstimatorSample(1.0, 1.0);
 	}
 
 	@Test
@@ -107,48 +94,59 @@ public abstract class JolEstimateTest {
 
 	@Test
 	public void compressedSizeInfoEstimatorSample_50() {
-		compressedSizeInfoEstimatorSample(0.5, 0.90);
+		compressedSizeInfoEstimatorSample(0.5, 0.8);
 	}
 
 	@Test
 	public void compressedSizeInfoEstimatorSample_20() {
-		compressedSizeInfoEstimatorSample(0.2, 0.8);
+		compressedSizeInfoEstimatorSample(0.2, 0.7);
 	}
 
-	@Test
-	public void compressedSizeInfoEstimatorSample_10() {
-		compressedSizeInfoEstimatorSample(0.1, 0.75);
-	}
+	// @Test
+	// public void compressedSizeInfoEstimatorSample_10() {
+	// 	compressedSizeInfoEstimatorSample(0.1, 0.6);
+	// }
 
-	@Test
-	public void compressedSizeInfoEstimatorSample_5() {
-		compressedSizeInfoEstimatorSample(0.05, 0.7);
-	}
+	// @Test
+	// public void compressedSizeInfoEstimatorSample_5() {
+	// 	compressedSizeInfoEstimatorSample(0.05, 0.5);
+	// }
 
-	@Test
-	public void compressedSizeInfoEstimatorSample_1() {
-		compressedSizeInfoEstimatorSample(0.01, 0.6);
-	}
+	// @Test
+	// public void compressedSizeInfoEstimatorSample_1() {
+	// 	compressedSizeInfoEstimatorSample(0.01, 0.4);
+	// }
 
 	public void compressedSizeInfoEstimatorSample(double ratio, double tolerance) {
 		try {
-			if(mbt.getNumColumns() < CompressedSizeEstimatorFactory.minimumSampleSize)
-				return; // Skip the tests that anyway wouldn't use the sample based approach.
 
 			CompressionSettings cs = new CompressionSettingsBuilder().setSamplingRatio(ratio)
-				.setValidCompressions(EnumSet.of(getCT())).setSeed(seed).create();
+				.setValidCompressions(EnumSet.of(getCT())).setMinimumSampleSize(100).setSeed(seed).create();
 			cs.transposed = true;
 
 			final CompressedSizeInfoColGroup cgsi = CompressedSizeEstimatorFactory.getSizeEstimator(mbt, cs)
 				.estimateCompressedColGroupSize();
+
+			if(cg.getCompType() != CompressionType.UNCOMPRESSED && actualNumberUnique > 10) {
+
+				final int estimateNUniques = cgsi.getNumVals();
+				final double minToleranceNUniques = actualNumberUnique * tolerance;
+				final double maxToleranceNUniques = actualNumberUnique / tolerance;
+				final String uniqueString = minToleranceNUniques + " < " + estimateNUniques + " < "
+					+ maxToleranceNUniques;
+				final boolean withinToleranceOnNUniques = minToleranceNUniques <= actualNumberUnique &&
+					actualNumberUnique <= maxToleranceNUniques;
+				assertTrue("CSI Sampled estimate of number of unique values not in range " + uniqueString + "\n" + cgsi,
+					withinToleranceOnNUniques);
+			}
+
 			final long estimateCSI = cgsi.getCompressionSize(cg.getCompType());
 			final double minTolerance = actualSize * tolerance;
 			final double maxTolerance = actualSize / tolerance;
 			final String rangeString = minTolerance + " < " + estimateCSI + " < " + maxTolerance;
-			boolean res = minTolerance < estimateCSI && estimateCSI < maxTolerance;
-			assertTrue(
-				"CSI Sampled estimate is not in tolerance range " + rangeString + "\n" + cgsi + "\n" + cg.toString(),
-				res);
+			final boolean withinToleranceOnSize = minTolerance <= estimateCSI && estimateCSI <= maxTolerance;
+			assertTrue("CSI Sampled estimate is not in tolerance range " + rangeString + " Actual number uniques:"
+				+ actualNumberUnique + "\n" + cgsi, withinToleranceOnSize);
 
 		}
 		catch(Exception e) {
diff --git a/src/test/java/org/apache/sysds/test/component/compress/estim/SampleEstimatorTest.java b/src/test/java/org/apache/sysds/test/component/compress/estim/SampleEstimatorTest.java
new file mode 100644
index 0000000..fba1088
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/component/compress/estim/SampleEstimatorTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.component.compress.estim;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.runtime.compress.CompressionSettings;
+import org.apache.sysds.runtime.compress.CompressionSettingsBuilder;
+import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimator;
+import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimatorFactory;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.util.DataConverter;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Test;
+
+public class SampleEstimatorTest {
+
+	protected static final Log LOG = LogFactory.getLog(SampleEstimatorTest.class.getName());
+
+	private static final int seed = 1512314;
+
+	final MatrixBlock mbt;
+
+	public SampleEstimatorTest() {
+		// matrix block 2 columns
+		mbt = DataConverter
+			.convertToMatrixBlock(TestUtils.round(TestUtils.generateTestMatrix(2, 500000, 0, 299, 1.0, seed + 1)));
+	}
+
+	@Test
+	public void compressedSizeInfoEstimatorFull() {
+		testSampleEstimateIsAtMaxEstimatedElementsInEachColumnsProduct(1.0, 1.0);
+	}
+
+	@Test
+	public void compressedSizeInfoEstimatorSample_90() {
+		testSampleEstimateIsAtMaxEstimatedElementsInEachColumnsProduct(0.9, 0.9);
+	}
+
+	@Test
+	public void compressedSizeInfoEstimatorSample_50() {
+		testSampleEstimateIsAtMaxEstimatedElementsInEachColumnsProduct(0.5, 0.90);
+	}
+
+	@Test
+	public void compressedSizeInfoEstimatorSample_20() {
+		testSampleEstimateIsAtMaxEstimatedElementsInEachColumnsProduct(0.2, 0.8);
+	}
+
+	@Test
+	public void compressedSizeInfoEstimatorSample_10() {
+		testSampleEstimateIsAtMaxEstimatedElementsInEachColumnsProduct(0.1, 0.75);
+	}
+
+	@Test
+	public void compressedSizeInfoEstimatorSample_5() {
+		testSampleEstimateIsAtMaxEstimatedElementsInEachColumnsProduct(0.05, 0.7);
+	}
+
+	@Test
+	public void compressedSizeInfoEstimatorSample_1() {
+		testSampleEstimateIsAtMaxEstimatedElementsInEachColumnsProduct(0.01, 0.6);
+	}
+
+	@Test
+	public void compressedSizeInfoEstimatorSample_p1() {
+		testSampleEstimateIsAtMaxEstimatedElementsInEachColumnsProduct(0.001, 0.5);
+	}
+
+	/**
+	 * This test verify that the estimated number or unique values in individual columns is adhered to when analyzing
+	 * multi columns.
+	 * 
+	 * The important part here is not if the number of unique elements is estimated correctly, but that the relation
+	 * between observations is preserved.
+	 * 
+	 * @param ratio     Ratio to sample
+	 * @param tolerance A percentage tolerance in number of unique element estimated
+	 */
+	private void testSampleEstimateIsAtMaxEstimatedElementsInEachColumnsProduct(double ratio, double tolerance) {
+
+		final CompressionSettings cs_estimate = new CompressionSettingsBuilder().setMinimumSampleSize(100)
+			.setSamplingRatio(ratio).setSeed(seed).create();
+
+		cs_estimate.transposed = true;
+
+		final CompressedSizeEstimator estimate = CompressedSizeEstimatorFactory.getSizeEstimator(mbt, cs_estimate);
+		final int estimate_1 = estimate.estimateCompressedColGroupSize(new int[] {0}).getNumVals() + 1;
+		final int estimate_2 = estimate.estimateCompressedColGroupSize(new int[] {1}).getNumVals() + 1;
+
+		final int estimate_full = estimate.estimateCompressedColGroupSize(new int[] {0, 1}, estimate_1 * estimate_2)
+			.getNumVals();
+		assertTrue(
+			"Estimate of all columns should be upper bounded by distinct of each column multiplied: " + estimate_full
+				+ " * " + tolerance + " <= " + estimate_1 * estimate_2,
+			estimate_full * tolerance <= estimate_1 * estimate_2);
+
+	}
+
+}