You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by ba...@apache.org on 2022/01/25 21:20:42 UTC

[systemds] 01/02: [SYSTEMDS-3252] CLA Cocode Greedy Parallel

This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit 7f67226999161e6a6a6b035e9abea8757c0c78de
Author: baunsgaard <ba...@tugraz.at>
AuthorDate: Tue Dec 14 13:10:12 2021 +0100

    [SYSTEMDS-3252] CLA Cocode Greedy Parallel
    
    This commit adds a new method for sparse joining of column groups in
    the co-coding algorithm.
    
    Closes #1485
---
 .../runtime/compress/cocode/CoCodeBinPacking.java  | 130 ++------
 .../runtime/compress/cocode/CoCodeGreedy.java      | 154 +++------
 .../sysds/runtime/compress/cocode/ColIndexes.java} |  25 +-
 .../sysds/runtime/compress/cocode/Memorizer.java   |  92 ++++++
 .../runtime/compress/colgroup/AColGroupValue.java  |  12 +-
 .../runtime/compress/colgroup/ColGroupConst.java   |  10 +-
 .../runtime/compress/colgroup/ColGroupFactory.java |  45 ++-
 .../runtime/compress/colgroup/ColGroupSizes.java   |  24 +-
 .../compress/colgroup/dictionary/Dictionary.java   |   6 +-
 .../colgroup/dictionary/DictionaryFactory.java     |  59 +---
 .../colgroup/dictionary/MatrixBlockDictionary.java |  42 ++-
 .../compress/colgroup/mapping/MapToBit.java        |   2 +-
 .../compress/colgroup/mapping/MapToByte.java       |   2 +-
 .../compress/colgroup/mapping/MapToChar.java       |   2 +-
 .../compress/colgroup/mapping/MapToFactory.java    |  45 ++-
 .../compress/colgroup/offset/OffsetByte.java       |  50 +--
 .../compress/colgroup/offset/OffsetChar.java       |  15 +-
 .../compress/colgroup/offset/OffsetFactory.java    |  59 +++-
 .../compress/cost/ComputationCostEstimator.java    |  16 +-
 .../compress/estim/CompressedSizeEstimator.java    |  23 +-
 .../estim/CompressedSizeEstimatorExact.java        |  35 +-
 .../estim/CompressedSizeEstimatorFactory.java      |  31 +-
 .../estim/CompressedSizeEstimatorSample.java       |  47 +--
 .../estim/CompressedSizeEstimatorUltraSparse.java  |   5 +
 .../runtime/compress/estim/CompressedSizeInfo.java |  18 -
 .../compress/estim/CompressedSizeInfoColGroup.java | 109 ++-----
 .../runtime/compress/estim/EstimationFactors.java  | 113 +------
 .../compress/estim/encoding/ConstEncoding.java     |  66 ++++
 .../compress/estim/encoding/DenseEncoding.java     | 171 ++++++++++
 .../compress/estim/encoding/EmptyEncoding.java     |  67 ++++
 .../runtime/compress/estim/encoding/IEncode.java   | 361 +++++++++++++++++++++
 .../compress/estim/encoding/SparseEncoding.java    | 245 ++++++++++++++
 .../runtime/compress/lib/CLALibBinaryCellOp.java   |   2 +-
 .../compress/utils/DblArrayCountHashMap.java       |  25 ++
 .../runtime/compress/utils/DoubleCountHashMap.java |  67 +++-
 .../sysds/runtime/compress/utils/IntArrayList.java |  40 +--
 .../component/compress/CompressedTestBase.java     |  23 +-
 .../compress/CompressibleInputGenerator.java       |  73 ++++-
 .../compress/colgroup/JolEstimateOLETest.java      | 169 +++++-----
 .../compress/colgroup/JolEstimateRLETest.java      | 217 ++++++-------
 .../compress/colgroup/JolEstimateSDCTest.java      | 120 ++++++-
 .../compress/colgroup/JolEstimateTest.java         | 121 ++++---
 .../colgroup/JolEstimateUncompressedTest.java      |  30 +-
 .../encoding/EncodeSampleDenseNonUniform.java      |  83 +++++
 .../estim/encoding/EncodeSampleMultiColTest.java   | 145 +++++++++
 .../estim/encoding/EncodeSampleSingleColTest.java  | 124 +++++++
 .../compress/estim/encoding/EncodeSampleTest.java  | 179 ++++++++++
 .../estim/encoding/EncodeSampleUnbalancedTest.java | 116 +++++++
 .../estim/encoding/EncodeSampleUniformTest.java    | 120 +++++++
 .../compress/mapping/StandAloneTests.java          |  30 ++
 .../component/compress/offset/OffsetTests.java     |  10 +-
 51 files changed, 2747 insertions(+), 1028 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeBinPacking.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeBinPacking.java
index 1731ef2..4007ed9 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeBinPacking.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeBinPacking.java
@@ -22,9 +22,7 @@ package org.apache.sysds.runtime.compress.cocode;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.sysds.runtime.compress.CompressionSettings;
 import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType;
@@ -56,7 +54,7 @@ public class CoCodeBinPacking extends AColumnCoCoder {
 	protected CoCodeBinPacking(CompressedSizeEstimator sizeEstimator, ICostEstimate costEstimator,
 		CompressionSettings cs) {
 		super(sizeEstimator, costEstimator, cs);
-		mem = new Memorizer();
+		mem = new Memorizer(sizeEstimator);
 	}
 
 	@Override
@@ -141,7 +139,7 @@ public class CoCodeBinPacking extends AColumnCoCoder {
 			for(int j = 0; j < bins.size(); j++) {
 				double newBinWeight = binWeights[j] - c.getCardinalityRatio();
 				if(newBinWeight >= 0 && bins.get(j).getColumns().length < MAX_COL_PER_GROUP - 1) {
-					bins.set(j, joinWithoutAnalysis(Util.join(bins.get(j).getColumns(), c.getColumns()),bins.get(j), c));
+					bins.set(j, joinWithoutAnalysis(Util.join(bins.get(j).getColumns(), c.getColumns()), bins.get(j), c));
 					binWeights[j] = newBinWeight;
 					assigned = true;
 					break;
@@ -190,20 +188,20 @@ public class CoCodeBinPacking extends AColumnCoCoder {
 
 	private List<CompressedSizeInfoColGroup> coCodeBruteForce(CompressedSizeInfoColGroup bin) {
 
-		List<int[]> workset = new ArrayList<>(bin.getColumns().length);
+		List<ColIndexes> workSet = new ArrayList<>(bin.getColumns().length);
 
-		for(int i = 0; i < bin.getColumns().length; i++)
-			workset.add(new int[] {bin.getColumns()[i]});
+		for(int b : bin.getColumns())
+			workSet.add(new ColIndexes(new int[] {b}));
 
 		// process merging iterations until no more change
-		while(workset.size() > 1) {
+		while(workSet.size() > 1) {
 			long changeInSize = 0;
 			CompressedSizeInfoColGroup tmp = null;
-			int[] selected1 = null, selected2 = null;
-			for(int i = 0; i < workset.size(); i++) {
-				for(int j = i + 1; j < workset.size(); j++) {
-					final int[] c1 = workset.get(i);
-					final int[] c2 = workset.get(j);
+			ColIndexes selected1 = null, selected2 = null;
+			for(int i = 0; i < workSet.size(); i++) {
+				for(int j = i + 1; j < workSet.size(); j++) {
+					final ColIndexes c1 = workSet.get(i);
+					final ColIndexes c2 = workSet.get(j);
 					final long sizeC1 = mem.get(c1).getMinSize();
 					final long sizeC2 = mem.get(c2).getMinSize();
 
@@ -222,8 +220,8 @@ public class CoCodeBinPacking extends AColumnCoCoder {
 					long newSizeChangeIfSelected = sizeC1C2 - sizeC1 - sizeC2;
 					// Select the best join of either the currently selected
 					// or keep the old one.
-					if((tmp == null && newSizeChangeIfSelected < changeInSize) || tmp != null &&
-						(newSizeChangeIfSelected < changeInSize || newSizeChangeIfSelected == changeInSize &&
+					if((tmp == null && newSizeChangeIfSelected < changeInSize) ||
+						tmp != null && (newSizeChangeIfSelected < changeInSize || newSizeChangeIfSelected == changeInSize &&
 							c1c2Inf.getColumns().length < tmp.getColumns().length)) {
 						changeInSize = newSizeChangeIfSelected;
 						tmp = c1c2Inf;
@@ -234,111 +232,23 @@ public class CoCodeBinPacking extends AColumnCoCoder {
 			}
 
 			if(tmp != null) {
-				workset.remove(selected1);
-				workset.remove(selected2);
-				workset.add(tmp.getColumns());
+				workSet.remove(selected1);
+				workSet.remove(selected2);
+				workSet.add(new ColIndexes(tmp.getColumns()));
 			}
 			else
 				break;
 		}
 
-		LOG.debug(mem.stats());
+		if(LOG.isDebugEnabled())
+			LOG.debug("Memorizer stats:" + mem.stats());
 		mem.resetStats();
 
-		List<CompressedSizeInfoColGroup> ret = new ArrayList<>(workset.size());
+		List<CompressedSizeInfoColGroup> ret = new ArrayList<>(workSet.size());
 
-		for(int[] w : workset)
+		for(ColIndexes w : workSet)
 			ret.add(mem.get(w));
 
 		return ret;
 	}
-
-	protected class Memorizer {
-		private final Map<ColIndexes, CompressedSizeInfoColGroup> mem;
-		private int st1 = 0, st2 = 0, st3 = 0, st4 = 0;
-
-		public Memorizer() {
-			mem = new HashMap<>();
-		}
-
-		public void put(CompressedSizeInfoColGroup g) {
-			mem.put(new ColIndexes(g.getColumns()), g);
-		}
-
-		public CompressedSizeInfoColGroup get(CompressedSizeInfoColGroup g) {
-			return mem.get(new ColIndexes(g.getColumns()));
-		}
-
-		public CompressedSizeInfoColGroup get(int[] c) {
-			return mem.get(new ColIndexes(c));
-		}
-
-		public CompressedSizeInfoColGroup getOrCreate(int[] c1, int[] c2) {
-			final int[] c = Util.join(c1, c2);
-			final ColIndexes cI = new ColIndexes(Util.join(c1, c2));
-			CompressedSizeInfoColGroup g = mem.get(cI);
-			st2++;
-			if(g == null) {
-				final CompressedSizeInfoColGroup left = mem.get(new ColIndexes(c1));
-				final CompressedSizeInfoColGroup right = mem.get(new ColIndexes(c2));
-				final boolean leftConst = left.getBestCompressionType(_cs) == CompressionType.CONST &&
-					left.getNumOffs() == 0;
-				final boolean rightConst = right.getBestCompressionType(_cs) == CompressionType.CONST &&
-					right.getNumOffs() == 0;
-				if(leftConst)
-					g = CompressedSizeInfoColGroup.addConstGroup(c, right, _cs.validCompressions);
-				else if(rightConst)
-					g = CompressedSizeInfoColGroup.addConstGroup(c, left, _cs.validCompressions);
-				else {
-					st3++;
-					g = _sest.estimateJoinCompressedSize(c, left, right);
-				}
-
-				if(leftConst || rightConst)
-					st4++;
-
-				mem.put(cI, g);
-			}
-			return g;
-		}
-
-		public void incst1() {
-			st1++;
-		}
-
-		public String stats() {
-			return st1 + " " + st2 + " " + st3 + " " + st4;
-		}
-
-		public void resetStats() {
-			st1 = 0;
-			st2 = 0;
-			st3 = 0;
-			st4 = 0;
-		}
-
-		@Override
-		public String toString() {
-			return mem.toString();
-		}
-	}
-
-	private static class ColIndexes {
-		final int[] _indexes;
-
-		public ColIndexes(int[] indexes) {
-			_indexes = indexes;
-		}
-
-		@Override
-		public int hashCode() {
-			return Arrays.hashCode(_indexes);
-		}
-
-		@Override
-		public boolean equals(Object that) {
-			ColIndexes thatGrp = (ColIndexes) that;
-			return Arrays.equals(_indexes, thatGrp._indexes);
-		}
-	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeGreedy.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeGreedy.java
index 1e47ff3..3b59fef 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeGreedy.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeGreedy.java
@@ -20,19 +20,19 @@
 package org.apache.sysds.runtime.compress.cocode;
 
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
+import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.compress.CompressionSettings;
-import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType;
 import org.apache.sysds.runtime.compress.cost.ComputationCostEstimator;
 import org.apache.sysds.runtime.compress.cost.ICostEstimate;
 import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimator;
 import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo;
 import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup;
-import org.apache.sysds.runtime.compress.utils.Util;
+import org.apache.sysds.runtime.util.CommonThreadPool;
 
 public class CoCodeGreedy extends AColumnCoCoder {
 
@@ -48,31 +48,34 @@ public class CoCodeGreedy extends AColumnCoCoder {
 
 	protected static List<CompressedSizeInfoColGroup> join(List<CompressedSizeInfoColGroup> inputColumns,
 		CompressedSizeEstimator sEst, ICostEstimate cEst, CompressionSettings cs, int k) {
-		Memorizer mem = new Memorizer(cs, sEst);
+		Memorizer mem = new Memorizer(sEst);
 		for(CompressedSizeInfoColGroup g : inputColumns)
 			mem.put(g);
 
-		return coCodeBruteForce(inputColumns, cEst, mem);
+		return coCodeBruteForce(inputColumns, cEst, mem, k);
 	}
 
 	private static List<CompressedSizeInfoColGroup> coCodeBruteForce(List<CompressedSizeInfoColGroup> inputColumns,
-		ICostEstimate cEst, Memorizer mem) {
+		ICostEstimate cEst, Memorizer mem, int k) {
 
-		List<ColIndexes> workset = new ArrayList<>(inputColumns.size());
+		List<ColIndexes> workSet = new ArrayList<>(inputColumns.size());
 
 		final boolean workloadCost = cEst instanceof ComputationCostEstimator;
 
 		for(int i = 0; i < inputColumns.size(); i++)
-			workset.add(new ColIndexes(inputColumns.get(i).getColumns()));
+			workSet.add(new ColIndexes(inputColumns.get(i).getColumns()));
+
+		parallelFirstJoin(workSet, mem, cEst, k);
+
 		// process merging iterations until no more change
-		while(workset.size() > 1) {
+		while(workSet.size() > 1) {
 			double changeInCost = 0;
 			CompressedSizeInfoColGroup tmp = null;
 			ColIndexes selected1 = null, selected2 = null;
-			for(int i = 0; i < workset.size(); i++) {
-				for(int j = i + 1; j < workset.size(); j++) {
-					final ColIndexes c1 = workset.get(i);
-					final ColIndexes c2 = workset.get(j);
+			for(int i = 0; i < workSet.size(); i++) {
+				for(int j = i + 1; j < workSet.size(); j++) {
+					final ColIndexes c1 = workSet.get(i);
+					final ColIndexes c2 = workSet.get(j);
 					final double costC1 = cEst.getCostOfColumnGroup(mem.get(c1));
 					final double costC2 = cEst.getCostOfColumnGroup(mem.get(c2));
 
@@ -95,8 +98,8 @@ public class CoCodeGreedy extends AColumnCoCoder {
 
 					// Select the best join of either the currently selected
 					// or keep the old one.
-					if((tmp == null && newSizeChangeIfSelected < changeInCost) || tmp != null &&
-						(newSizeChangeIfSelected < changeInCost || newSizeChangeIfSelected == changeInCost &&
+					if((tmp == null && newSizeChangeIfSelected < changeInCost) ||
+						tmp != null && (newSizeChangeIfSelected < changeInCost || newSizeChangeIfSelected == changeInCost &&
 							c1c2Inf.getColumns().length < tmp.getColumns().length)) {
 						changeInCost = newSizeChangeIfSelected;
 						tmp = c1c2Inf;
@@ -107,10 +110,10 @@ public class CoCodeGreedy extends AColumnCoCoder {
 			}
 
 			if(tmp != null) {
-				workset.remove(selected1);
-				workset.remove(selected2);
+				workSet.remove(selected1);
+				workSet.remove(selected2);
 				mem.remove(selected1, selected2);
-				workset.add(new ColIndexes(tmp.getColumns()));
+				workSet.add(new ColIndexes(tmp.getColumns()));
 			}
 			else
 				break;
@@ -119,107 +122,56 @@ public class CoCodeGreedy extends AColumnCoCoder {
 			LOG.debug("Memorizer stats:" + mem.stats());
 		mem.resetStats();
 
-		List<CompressedSizeInfoColGroup> ret = new ArrayList<>(workset.size());
+		List<CompressedSizeInfoColGroup> ret = new ArrayList<>(workSet.size());
 
-		for(ColIndexes w : workset)
+		for(ColIndexes w : workSet)
 			ret.add(mem.get(w));
 
 		return ret;
 	}
 
-	protected static class Memorizer {
-		private final CompressionSettings _cs;
-		private final CompressedSizeEstimator _sEst;
-		private final Map<ColIndexes, CompressedSizeInfoColGroup> mem;
-		private int st1 = 0, st2 = 0, st3 = 0, st4 = 0;
-
-		public Memorizer(CompressionSettings cs, CompressedSizeEstimator sEst) {
-			_cs = cs;
-			_sEst = sEst;
-			mem = new HashMap<>();
-		}
+	protected static void parallelFirstJoin(List<ColIndexes> workSet, Memorizer mem, ICostEstimate cEst, int k) {
+		try {
 
-		public void put(CompressedSizeInfoColGroup g) {
-			mem.put(new ColIndexes(g.getColumns()), g);
-		}
+			ExecutorService pool = CommonThreadPool.get(k);
+			List<JoinTask> tasks = new ArrayList<>();
+			for(int i = 0; i < workSet.size(); i++)
+				for(int j = i + 1; j < workSet.size(); j++) {
+					final ColIndexes c1 = workSet.get(i);
+					final ColIndexes c2 = workSet.get(j);
 
-		public CompressedSizeInfoColGroup get(ColIndexes c) {
-			return mem.get(c);
-		}
+					final int csi1 = mem.get(c1).getNumVals();
+					final int csi2 = mem.get(c2).getNumVals();
 
-		public void remove(ColIndexes c1, ColIndexes c2) {
-			mem.remove(c1);
-			mem.remove(c2);
-		}
+					if(csi1 * csi2 > 10000)
+						continue;
 
-		public CompressedSizeInfoColGroup getOrCreate(ColIndexes c1, ColIndexes c2) {
-			final int[] c = Util.join(c1._indexes, c2._indexes);
-			final ColIndexes cI = new ColIndexes(c);
-			CompressedSizeInfoColGroup g = mem.get(cI);
-			st2++;
-			if(g == null) {
-				final CompressedSizeInfoColGroup left = mem.get(c1);
-				final CompressedSizeInfoColGroup right = mem.get(c2);
-				final boolean leftConst = left.getBestCompressionType(_cs) == CompressionType.CONST &&
-					left.getNumOffs() == 0;
-				final boolean rightConst = right.getBestCompressionType(_cs) == CompressionType.CONST &&
-					right.getNumOffs() == 0;
-				if(leftConst)
-					g = CompressedSizeInfoColGroup.addConstGroup(c, right, _cs.validCompressions);
-				else if(rightConst)
-					g = CompressedSizeInfoColGroup.addConstGroup(c, left, _cs.validCompressions);
-				else {
-					st3++;
-					g = _sEst.estimateJoinCompressedSize(c, left, right);
+					tasks.add(new JoinTask(workSet.get(i), workSet.get(j), mem));
 				}
 
-				if(leftConst || rightConst)
-					st4++;
-
-				mem.put(cI, g);
-			}
-			return g;
+			for(Future<Object> t : pool.invokeAll(tasks))
+				t.get();
+			pool.shutdown();
 		}
-
-		public void incst1() {
-			st1++;
-		}
-
-		public String stats() {
-			return st1 + " " + st2 + " " + st3 + " " + st4;
-		}
-
-		public void resetStats() {
-			st1 = 0;
-			st2 = 0;
-			st3 = 0;
-			st4 = 0;
-		}
-
-		@Override
-		public String toString() {
-			return mem.toString();
+		catch(Exception e) {
+			throw new DMLRuntimeException("failed to join column groups", e);
 		}
 	}
 
-	private static class ColIndexes {
-		final int[] _indexes;
-		final int _hash;
+	protected static class JoinTask implements Callable<Object> {
+		private final ColIndexes _c1, _c2;
+		private final Memorizer _m;
 
-		public ColIndexes(int[] indexes) {
-			_indexes = indexes;
-			_hash = Arrays.hashCode(_indexes);
-		}
-
-		@Override
-		public int hashCode() {
-			return _hash;
+		protected JoinTask(ColIndexes c1, ColIndexes c2, Memorizer m) {
+			_c1 = c1;
+			_c2 = c2;
+			_m = m;
 		}
 
 		@Override
-		public boolean equals(Object that) {
-			ColIndexes thatGrp = (ColIndexes) that;
-			return Arrays.equals(_indexes, thatGrp._indexes);
+		public Object call() {
+			_m.getOrCreate(_c1, _c2);
+			return null;
 		}
 	}
 }
diff --git a/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateSDCTest.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/ColIndexes.java
similarity index 63%
copy from src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateSDCTest.java
copy to src/main/java/org/apache/sysds/runtime/compress/cocode/ColIndexes.java
index b302242..0800a86 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateSDCTest.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/ColIndexes.java
@@ -17,22 +17,27 @@
  * under the License.
  */
 
-package org.apache.sysds.test.component.compress.colgroup;
+package org.apache.sysds.runtime.compress.cocode;
 
-import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType;
-import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import java.util.Arrays;
 
-public class JolEstimateSDCTest extends JolEstimateOLETest {
+public class ColIndexes {
+	final int[] _indexes;
+	final int _hash;
 
-	// Just use the same test cases as OLE.
-	// This is fine because SDC exhibit the same characteristics as OLE.
+	public ColIndexes(int[] indexes) {
+		_indexes = indexes;
+		_hash = Arrays.hashCode(_indexes);
+	}
 
-	public JolEstimateSDCTest(MatrixBlock mb) {
-		super(mb);
+	@Override
+	public int hashCode() {
+		return _hash;
 	}
 
 	@Override
-	public CompressionType getCT() {
-		return sdc;
+	public boolean equals(Object that) {
+		ColIndexes thatGrp = (ColIndexes) that;
+		return Arrays.equals(_indexes, thatGrp._indexes);
 	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/Memorizer.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/Memorizer.java
new file mode 100644
index 0000000..f70de7c
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/Memorizer.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.compress.cocode;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimator;
+import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup;
+import org.apache.sysds.runtime.compress.utils.Util;
+
+public class Memorizer {
+	private final CompressedSizeEstimator _sEst;
+	private final Map<ColIndexes, CompressedSizeInfoColGroup> mem;
+	private int st1 = 0, st2 = 0, st3 = 0;
+
+	public Memorizer(CompressedSizeEstimator sEst) {
+		_sEst = sEst;
+		mem = new HashMap<>();
+	}
+
+	public void put(CompressedSizeInfoColGroup g) {
+		mem.put(new ColIndexes(g.getColumns()), g);
+	}
+
+	public CompressedSizeInfoColGroup get(ColIndexes c) {
+		return mem.get(c);
+	}
+
+	public void remove(ColIndexes c1, ColIndexes c2) {
+		mem.remove(c1);
+		mem.remove(c2);
+	}
+
+	public CompressedSizeInfoColGroup getOrCreate(ColIndexes c1, ColIndexes c2) {
+		final int[] c = Util.join(c1._indexes, c2._indexes);
+		final ColIndexes cI = new ColIndexes(c);
+		CompressedSizeInfoColGroup g = mem.get(cI);
+		st2++;
+		if(g == null) {
+			final CompressedSizeInfoColGroup left = mem.get(c1);
+			final CompressedSizeInfoColGroup right = mem.get(c2);
+			if(left != null && right != null) {
+
+				st3++;
+				g = _sEst.estimateJoinCompressedSize(c, left, right);
+
+				synchronized(this) {
+					mem.put(cI, g);
+				}
+			}
+
+		}
+		return g;
+	}
+
+	public void incst1() {
+		st1++;
+	}
+
+	public String stats() {
+		return " possible: " + st1 + " requests: " + st2 + " joined: " + st3;
+	}
+
+	public void resetStats() {
+		st1 = 0;
+		st2 = 0;
+		st3 = 0;
+	}
+
+	@Override
+	public String toString() {
+		return mem.toString();
+	}
+}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroupValue.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroupValue.java
index 4e44191..2eaeb99 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroupValue.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroupValue.java
@@ -355,22 +355,22 @@ public abstract class AColGroupValue extends AColGroupCompressed implements Clon
 	}
 
 	@Override
-	protected double[] preAggSumRows(){
+	protected double[] preAggSumRows() {
 		return _dict.sumAllRowsToDouble(_colIndexes.length);
 	}
 
 	@Override
-	protected double[] preAggSumSqRows(){
+	protected double[] preAggSumSqRows() {
 		return _dict.sumAllRowsToDoubleSq(_colIndexes.length);
 	}
 
 	@Override
-	protected double[] preAggProductRows(){
+	protected double[] preAggProductRows() {
 		throw new NotImplementedException();
 	}
 
 	@Override
-	protected double[] preAggBuiltinRows(Builtin builtin){
+	protected double[] preAggBuiltinRows(Builtin builtin) {
 		return _dict.aggregateRows(builtin, _colIndexes.length);
 	}
 
@@ -488,13 +488,13 @@ public abstract class AColGroupValue extends AColGroupCompressed implements Clon
 	@Override
 	public long estimateInMemorySize() {
 		long size = super.estimateInMemorySize();
-		size += 8; // Dictionary Reference.
 		size += 8; // Counts reference
 		size += 4; // Int nRows
 		size += 1; // _zeros boolean reference
 		size += 1; // _lossy boolean reference
 		size += 2; // padding
 		size += _dict.getInMemorySize();
+		size += 8; // dict reference
 		return size;
 	}
 
@@ -508,7 +508,7 @@ public abstract class AColGroupValue extends AColGroupCompressed implements Clon
 	public String toString() {
 		StringBuilder sb = new StringBuilder();
 		sb.append(super.toString());
-		sb.append(String.format("\n%15s%s", "Values: " , _dict.getClass().getSimpleName()));
+		sb.append(String.format("\n%15s%s", "Values: ", _dict.getClass().getSimpleName()));
 		sb.append(_dict.getString(_colIndexes.length));
 		return sb.toString();
 	}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupConst.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupConst.java
index 45e4afc..336ce08 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupConst.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupConst.java
@@ -208,7 +208,7 @@ public class ColGroupConst extends AColGroupCompressed {
 			LibMatrixMult.matrixMult(left, right, ret);
 			if(ret.isEmpty())
 				return null;
-			ADictionary d = new MatrixBlockDictionary(ret);
+			ADictionary d = new MatrixBlockDictionary(ret, cr);
 			return ColGroupFactory.genColGroupConst(cr, d);
 		}
 		else {
@@ -346,4 +346,12 @@ public class ColGroupConst extends AColGroupCompressed {
 	protected double[] preAggBuiltinRows(Builtin builtin) {
 		return _dict.aggregateRows(builtin, _colIndexes.length);
 	}
+
+	@Override
+	public long estimateInMemorySize() {
+		long size = super.estimateInMemorySize();
+		size += _dict.getInMemorySize();
+		size += 8; // dict reference
+		return size;
+	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java
index 7cd2b1a..29a4a21 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java
@@ -255,8 +255,8 @@ public class ColGroupFactory {
 			Timing time = new Timing(true);
 			time.start();
 			Collection<AColGroup> ret = compressColGroupExecute(in, compSettings, tmpMap, cg, k);
-			LOG.debug(String.format("time[ms]: %10.2f %25s %s cols:%s", time.stop(), getColumnTypesString(ret),
-				getEstimateVsActualSize(ret, cg), Arrays.toString(cg.getColumns())));
+			LOG.debug(String.format("time[ms]: %10.2f %50s %s cols:%s wanted:%s", time.stop(), getColumnTypesString(ret),
+				getEstimateVsActualSize(ret, cg), Arrays.toString(cg.getColumns()), cg.getBestCompressionType()));
 			return ret;
 		}
 		return compressColGroupExecute(in, compSettings, tmpMap, cg, k);
@@ -355,6 +355,8 @@ public class ColGroupFactory {
 		if(of.length == 1 && of[0].size() == rlen) // If this always constant
 			return ColGroupConst.create(colIndexes, DictionaryFactory.create(ubm));
 
+		// only consider sparse dictionaries if cocoded more than 4 columns.
+		tupleSparsity = colIndexes.length > 4 ? tupleSparsity : 1.0;
 		switch(compType) {
 			case DDC:
 				return compressDDC(colIndexes, rlen, ubm, cs, tupleSparsity);
@@ -364,8 +366,6 @@ public class ColGroupFactory {
 				return compressOLE(colIndexes, rlen, ubm, cs, tupleSparsity);
 			case SDC:
 				return compressSDC(colIndexes, rlen, ubm, cs, tupleSparsity);
-			// CONST and EMPTY are handled above switch statement.
-			// UNCOMPRESSED is handled before extraction of ubm
 			default:
 				throw new DMLCompressionException("Not implemented compression of " + compType + "in factory.");
 		}
@@ -506,21 +506,32 @@ public class ColGroupFactory {
 			}
 		}
 
-		ADictionary dict = DictionaryFactory.create(ubm, tupleSparsity);
+		// Currently not effecient allocation of the dictionary.
 		if(ubm.getNumValues() == 1) {
 			if(numZeros >= largestOffset) {
+				ADictionary dict = DictionaryFactory.create(ubm, tupleSparsity);
 				final AOffset off = OffsetFactory.createOffset(ubm.getOffsetList()[0].extractValues(true));
 				return new ColGroupSDCSingleZeros(colIndexes, rlen, dict, off, null);
 			}
 			else {
+				LOG.warn("fix three dictionary allocations");
+				ADictionary dict = DictionaryFactory.create(ubm, 1.0);
 				dict = DictionaryFactory.moveFrequentToLastDictionaryEntry(dict, ubm, rlen, largestIndex);
+				if(tupleSparsity < 0.4)
+					dict = dict.getMBDict(colIndexes.length);
 				return setupSingleValueSDCColGroup(colIndexes, rlen, ubm, dict);
 			}
 		}
-		else if(numZeros >= largestOffset)
+		else if(numZeros >= largestOffset) {
+			ADictionary dict = DictionaryFactory.create(ubm, tupleSparsity);
 			return setupMultiValueZeroColGroup(colIndexes, rlen, ubm, dict, cs);
+		}
 		else {
+			LOG.warn("fix three dictionary allocations");
+			ADictionary dict = DictionaryFactory.create(ubm, 1.0);
 			dict = DictionaryFactory.moveFrequentToLastDictionaryEntry(dict, ubm, rlen, largestIndex);
+			if(tupleSparsity < 0.4 && colIndexes.length > 4)
+				dict = dict.getMBDict(colIndexes.length);
 			return setupMultiValueColGroup(colIndexes, numZeros, rlen, ubm, largestIndex, dict, cs);
 		}
 	}
@@ -636,35 +647,35 @@ public class ColGroupFactory {
 		for(int j = apos; j < alen; j++)
 			map.increment(vals[j]);
 
-		List<DCounts> entries = map.extractValues();
-		Collections.sort(entries, Comparator.comparing(x -> -x.count));
+		DCounts[] entries = map.extractValues();
+		Arrays.sort(entries, Comparator.comparing(x -> -x.count));
 
-		if(entries.get(0).count < rlen - sb.size(sbRow)) {
+		if(entries[0].count < rlen - sb.size(sbRow)) {
 			// If the zero is the default value.
-			final int[] counts = new int[entries.size() + 1];
-			final double[] dict = new double[entries.size()];
+			final int[] counts = new int[entries.length + 1];
+			final double[] dict = new double[entries.length];
 			int sum = 0;
-			for(int i = 0; i < entries.size(); i++) {
-				final DCounts x = entries.get(i);
+			for(int i = 0; i < entries.length; i++) {
+				final DCounts x = entries[i];
 				counts[i] = x.count;
 				sum += x.count;
 				dict[i] = x.key;
 				x.count = i;
 			}
 
-			counts[entries.size()] = rlen - sum;
+			counts[entries.length] = rlen - sum;
 			final AOffset offsets = OffsetFactory.createOffset(sb.indexes(sbRow), apos, alen);
-			if(entries.size() <= 1)
+			if(entries.length <= 1)
 				return new ColGroupSDCSingleZeros(cols, rlen, new Dictionary(dict), offsets, counts);
 			else {
-				final AMapToData mapToData = MapToFactory.create((alen - apos), entries.size());
+				final AMapToData mapToData = MapToFactory.create((alen - apos), entries.length);
 				for(int j = apos; j < alen; j++)
 					mapToData.set(j - apos, map.get(vals[j]));
 				return ColGroupSDCZeros.create(cols, rlen, new Dictionary(dict), offsets, mapToData, counts);
 			}
 		}
 		else {
-			final ABitmap ubm = BitmapEncoder.extractBitmap(cols, mb, true, entries.size(), true);
+			final ABitmap ubm = BitmapEncoder.extractBitmap(cols, mb, true, entries.length, true);
 			// zero is not the default value fall back to the standard compression path.
 			return compressSDC(cols, rlen, ubm, cs, 1.0);
 		}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSizes.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSizes.java
index 49d3197..dc69ae7 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSizes.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSizes.java
@@ -39,13 +39,13 @@ public final class ColGroupSizes {
 
 	public static long estimateInMemorySizeGroupValue(int nrColumns, int nrValues, double tupleSparsity, boolean lossy) {
 		long size = estimateInMemorySizeGroup(nrColumns);
-		size += 8; // Dictionary Reference.
 		size += 8; // Counts reference
+		size += 4; // Int nRows
 		size += 1; // _zeros boolean reference
 		size += 1; // _lossy boolean reference
 		size += 2; // padding
-		size += 4; // num Rows
 		size += DictionaryFactory.getInMemorySize(nrValues, nrColumns, tupleSparsity, lossy);
+		size += 8; // Reference to Dict.
 		return size;
 	}
 
@@ -80,26 +80,18 @@ public final class ColGroupSizes {
 	}
 
 	public static long estimateInMemorySizeSDC(int nrColumns, int nrValues, int nrRows, int largestOff,
-		boolean largestOffIsZero, boolean containNoZeroValues, double tupleSparsity, boolean lossy) {
-		final int nVals = nrValues ;
-		long size = estimateInMemorySizeGroupValue(nrColumns, nVals, tupleSparsity, lossy);
+		double tupleSparsity, boolean largestOffZero, boolean lossy) {
+		long size = estimateInMemorySizeGroupValue(nrColumns, nrValues, tupleSparsity, lossy);
 		size += OffsetFactory.estimateInMemorySize(nrRows - largestOff, nrRows);
-		if(nrValues > 1)
+		if(nrValues > 1 + (largestOffZero ? 0 : 1))
 			size += MapToFactory.estimateInMemorySize(nrRows - largestOff, nrValues);
 		return size;
 	}
 
-	public static long estimateInMemorySizeSDCSingle(int nrColumns, int nrValues, int nrRows, int largestOff,
-		boolean largestOffIsZero, boolean containNoZeroValues, double tupleSparsity, boolean lossy) {
-		final int nVals = nrValues ;
-		long size = estimateInMemorySizeGroupValue(nrColumns, nVals, tupleSparsity, lossy);
-		size += OffsetFactory.estimateInMemorySize(nrRows - largestOff, nrRows);
-		return size;
-	}
-
-	public static long estimateInMemorySizeCONST(int nrColumns, int nrValues, double tupleSparsity, boolean lossy) {
+	public static long estimateInMemorySizeCONST(int nrColumns, double tupleSparsity, boolean lossy) {
 		long size = estimateInMemorySizeGroup(nrColumns);
-		size += DictionaryFactory.getInMemorySize(nrValues, nrColumns, tupleSparsity, lossy);
+		size += DictionaryFactory.getInMemorySize(1, nrColumns, tupleSparsity, lossy);
+		size += 8; // reference to dictionary.
 		return size;
 	}
 
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/Dictionary.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/Dictionary.java
index c3faa6c..0d4eaec 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/Dictionary.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/Dictionary.java
@@ -306,7 +306,7 @@ public class Dictionary extends ADictionary {
 	}
 
 	@Override
-	public double[] sumAllRowsToDouble(double[] reference){
+	public double[] sumAllRowsToDouble(double[] reference) {
 		final int nCol = reference.length;
 		final int numVals = getNumberOfValues(nCol);
 		double[] ret = new double[numVals + 1];
@@ -646,9 +646,9 @@ public class Dictionary extends ADictionary {
 	@Override
 	public ADictionary subtractTuple(double[] tuple) {
 		double[] newValues = new double[_values.length - tuple.length];
-		for(int i = 0; i < _values.length- tuple.length; i++)
+		for(int i = 0; i < _values.length - tuple.length; i++)
 			newValues[i] = _values[i] - tuple[i % tuple.length];
-		
+
 		return new Dictionary(newValues);
 	}
 
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/DictionaryFactory.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/DictionaryFactory.java
index 236c0f4..05784e4 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/DictionaryFactory.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/DictionaryFactory.java
@@ -33,7 +33,6 @@ import org.apache.sysds.runtime.compress.bitmap.MultiColBitmap;
 import org.apache.sysds.runtime.compress.utils.DArrCounts;
 import org.apache.sysds.runtime.compress.utils.DblArrayCountHashMap;
 import org.apache.sysds.runtime.data.SparseBlock;
-import org.apache.sysds.runtime.data.SparseRow;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 
 public class DictionaryFactory {
@@ -104,7 +103,7 @@ public class DictionaryFactory {
 					sb.append(i, col, tuple[col]);
 			}
 			m.recomputeNonZeros();
-			return new MatrixBlockDictionary(m);
+			return new MatrixBlockDictionary(m, nCols);
 		}
 		else if(ubm instanceof MultiColBitmap) {
 			MultiColBitmap mcbm = (MultiColBitmap) ubm;
@@ -143,7 +142,7 @@ public class DictionaryFactory {
 					sb.append(i, col, tuple[col]);
 			}
 			m.recomputeNonZeros();
-			return new MatrixBlockDictionary(m);
+			return new MatrixBlockDictionary(m, nCols);
 		}
 
 		final double[] resValues = new double[nRows * nCols];
@@ -156,28 +155,17 @@ public class DictionaryFactory {
 
 	public static ADictionary moveFrequentToLastDictionaryEntry(ADictionary dict, ABitmap ubm, int nRow,
 		int largestIndex) {
+		LOG.warn("Inefficient moving of tuples.");
 		final int zeros = nRow - (int) ubm.getNumOffsets();
 		final int nCol = ubm.getNumColumns();
 		final int largestIndexSize = ubm.getOffsetsList(largestIndex).size();
 		if(dict instanceof MatrixBlockDictionary) {
 			MatrixBlockDictionary mbd = (MatrixBlockDictionary) dict;
 			MatrixBlock mb = mbd.getMatrixBlock();
-			if(mb.isEmpty()) {
-				if(zeros == 0)
-					return dict;
-				else
-					return new MatrixBlockDictionary(new MatrixBlock(mb.getNumRows() + 1, mb.getNumColumns(), true));
-			}
+			if(mb.isEmpty()) 
+				throw new DMLCompressionException("Should not construct or use a empty dictionary ever.");
 			else if(mb.isInSparseFormat()) {
-				MatrixBlockDictionary mbdn = moveToLastDictionaryEntrySparse(mb.getSparseBlock(), largestIndex, zeros, nCol,
-					largestIndexSize);
-				if(mbdn == null)
-					return null;
-				MatrixBlock mbn = mbdn.getMatrixBlock();
-				mbn.setNonZeros(mb.getNonZeros());
-				if(mbn.getNonZeros() == 0)
-					mbn.recomputeNonZeros();
-				return mbdn;
+				throw new NotImplementedException(); // and should not be
 			}
 			else
 				return moveToLastDictionaryEntryDense(mb.getDenseBlockValues(), largestIndex, zeros, nCol,
@@ -188,41 +176,6 @@ public class DictionaryFactory {
 
 	}
 
-	private static MatrixBlockDictionary moveToLastDictionaryEntrySparse(SparseBlock sb, int indexToMove, int zeros,
-		int nCol, int largestIndexSize) {
-
-		if(zeros == 0) {
-			MatrixBlock ret = new MatrixBlock(sb.numRows(), nCol, true);
-			ret.setSparseBlock(sb);
-			final SparseRow swap = sb.get(indexToMove);
-			for(int i = indexToMove + 1; i < sb.numRows(); i++)
-				sb.set(i - 1, sb.get(i), false);
-			sb.set(sb.numRows() - 1, swap, false);
-			if(ret.isEmpty())
-				return null;
-			return new MatrixBlockDictionary(ret);
-		}
-
-		MatrixBlock ret = new MatrixBlock(sb.numRows() + 1, nCol, true);
-		ret.allocateSparseRowsBlock();
-		final SparseBlock retB = ret.getSparseBlock();
-		if(zeros > largestIndexSize) {
-			for(int i = 0; i < sb.numRows(); i++)
-				retB.set(i, sb.get(i), false);
-		}
-		else {
-			for(int i = 0; i < indexToMove; i++)
-				retB.set(i, sb.get(i), false);
-
-			retB.set(sb.numRows(), sb.get(indexToMove), false);
-			for(int i = indexToMove + 1; i < sb.numRows(); i++)
-				retB.set(i - 1, sb.get(i), false);
-		}
-		if(ret.isEmpty())
-			return null;
-		return new MatrixBlockDictionary(ret);
-	}
-
 	private static ADictionary moveToLastDictionaryEntryDense(double[] values, int indexToMove, int zeros, int nCol,
 		int largestIndexSize) {
 		final int offsetToLargest = indexToMove * nCol;
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/MatrixBlockDictionary.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/MatrixBlockDictionary.java
index b69bcf2..6c56ca6 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/MatrixBlockDictionary.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/MatrixBlockDictionary.java
@@ -49,10 +49,14 @@ public class MatrixBlockDictionary extends ADictionary {
 			throw new DMLCompressionException("Invalid construction of empty dictionary");
 	}
 
-	public MatrixBlockDictionary(MatrixBlock data) {
+	public MatrixBlockDictionary(MatrixBlock data, int nCol) {
+
 		_data = data;
 		if(_data.isEmpty())
 			throw new DMLCompressionException("Invalid construction of empty dictionary");
+
+		if(_data.getNumColumns() != nCol)
+			throw new DMLCompressionException("Invalid construction expected nCol: "+ nCol + " but matrix block contains: " + _data.getNumColumns());
 	}
 
 	public MatrixBlock getMatrixBlock() {
@@ -79,10 +83,12 @@ public class MatrixBlockDictionary extends ADictionary {
 
 	@Override
 	public long getInMemorySize() {
+		// object reference to a matrix block + matrix block size.
 		return 8 + _data.estimateSizeInMemory();
 	}
 
 	public static long getInMemorySize(int numberValues, int numberColumns, double sparsity) {
+		// object reference to a matrix block + matrix block size.
 		return 8 + MatrixBlock.estimateSizeInMemory(numberValues, numberColumns, sparsity);
 	}
 
@@ -303,7 +309,7 @@ public class MatrixBlockDictionary extends ADictionary {
 		if(res.isEmpty())
 			return null;
 		else
-			return new MatrixBlockDictionary(res);
+			return new MatrixBlockDictionary(res, _data.getNumColumns());
 	}
 
 	@Override
@@ -350,7 +356,7 @@ public class MatrixBlockDictionary extends ADictionary {
 		if(ret.isEmpty())
 			return null;
 		else
-			return new MatrixBlockDictionary(ret);
+			return new MatrixBlockDictionary(ret, nCol);
 
 	}
 
@@ -373,7 +379,7 @@ public class MatrixBlockDictionary extends ADictionary {
 		if(res2.isEmpty())
 			return null;
 		else
-			return new MatrixBlockDictionary(res2);
+			return new MatrixBlockDictionary(res2, _data.getNumColumns());
 	}
 
 	@Override
@@ -394,13 +400,13 @@ public class MatrixBlockDictionary extends ADictionary {
 	public ADictionary applyBinaryRowOpLeftAppendNewEntry(BinaryOperator op, double[] v, int[] colIndexes) {
 		MatrixBlock rowVector = Util.extractValues(v, colIndexes);
 		MatrixBlock tmp = _data.append(new MatrixBlock(1, _data.getNumColumns(), 0), null, false);
-		return new MatrixBlockDictionary(rowVector.binaryOperations(op, tmp, null));
+		return new MatrixBlockDictionary(rowVector.binaryOperations(op, tmp, null), _data.getNumColumns());
 	}
 
 	@Override
 	public ADictionary binOpRight(BinaryOperator op, double[] v, int[] colIndexes) {
 		MatrixBlock rowVector = Util.extractValues(v, colIndexes);
-		return new MatrixBlockDictionary(_data.binaryOperations(op, rowVector, null));
+		return new MatrixBlockDictionary(_data.binaryOperations(op, rowVector, null), _data.getNumColumns());
 	}
 
 	@Override
@@ -413,14 +419,14 @@ public class MatrixBlockDictionary extends ADictionary {
 	public ADictionary applyBinaryRowOpRightAppendNewEntry(BinaryOperator op, double[] v, int[] colIndexes) {
 		MatrixBlock rowVector = Util.extractValues(v, colIndexes);
 		MatrixBlock tmp = _data.append(new MatrixBlock(1, _data.getNumColumns(), 0), null, false);
-		return new MatrixBlockDictionary(tmp.binaryOperations(op, rowVector, null));
+		return new MatrixBlockDictionary(tmp.binaryOperations(op, rowVector, null), _data.getNumColumns());
 	}
 
 	@Override
 	public ADictionary clone() {
 		MatrixBlock ret = new MatrixBlock();
 		ret.copy(_data);
-		return new MatrixBlockDictionary(ret);
+		return new MatrixBlockDictionary(ret, _data.getNumColumns());
 	}
 
 	@Override
@@ -875,7 +881,7 @@ public class MatrixBlockDictionary extends ADictionary {
 	@Override
 	public ADictionary sliceOutColumnRange(int idxStart, int idxEnd, int previousNumberOfColumns) {
 		MatrixBlock retBlock = _data.slice(0, _data.getNumRows() - 1, idxStart, idxEnd - 1);
-		return new MatrixBlockDictionary(retBlock);
+		return new MatrixBlockDictionary(retBlock, idxEnd  - idxStart );
 	}
 
 	@Override
@@ -1088,7 +1094,7 @@ public class MatrixBlockDictionary extends ADictionary {
 			final int nRow = _data.getNumRows() - 1;
 			final int nCol = _data.getNumColumns();
 			double[] values = _data.getDenseBlockValues();
-			MatrixBlock res = new MatrixBlock(nCol, nRow, false);
+			MatrixBlock res = new MatrixBlock(nRow, nCol, false);
 			res.allocateBlock();
 			double[] resVals = res.getDenseBlockValues();
 			for(int i = 0, off = 0; i < nRow; i++)
@@ -1098,7 +1104,7 @@ public class MatrixBlockDictionary extends ADictionary {
 			res.examSparsity();
 			if(res.isEmpty())
 				return null;
-			return new MatrixBlockDictionary(res);
+			return new MatrixBlockDictionary(res, nCol);
 		}
 	}
 
@@ -1147,7 +1153,7 @@ public class MatrixBlockDictionary extends ADictionary {
 				}
 			}
 			retBlock.setNonZeros(_data.getNonZeros());
-			return new MatrixBlockDictionary(retBlock);
+			return new MatrixBlockDictionary(retBlock, _data.getNumColumns());
 		}
 		else {
 			final double[] _values = _data.getDenseBlockValues();
@@ -1163,7 +1169,7 @@ public class MatrixBlockDictionary extends ADictionary {
 			DenseBlockFP64 db = new DenseBlockFP64(new int[] {_data.getNumRows(), _data.getNumColumns()}, scaledValues);
 			MatrixBlock retBlock = new MatrixBlock(_data.getNumRows(), _data.getNumColumns(), db);
 			retBlock.setNonZeros(_data.getNonZeros());
-			return new MatrixBlockDictionary(retBlock);
+			return new MatrixBlockDictionary(retBlock, _data.getNumColumns());
 		}
 	}
 
@@ -1176,7 +1182,7 @@ public class MatrixBlockDictionary extends ADictionary {
 	public static MatrixBlockDictionary read(DataInput in) throws IOException {
 		MatrixBlock ret = new MatrixBlock();
 		ret.readFields(in);
-		return new MatrixBlockDictionary(ret);
+		return new MatrixBlockDictionary(ret, ret.getNumColumns());
 	}
 
 	@Override
@@ -1228,7 +1234,7 @@ public class MatrixBlockDictionary extends ADictionary {
 		MatrixBlock dictM = new MatrixBlock(numVals, aggregateColumns.length, dictV);
 		dictM.recomputeNonZeros();
 		dictM.examSparsity();
-		return new MatrixBlockDictionary(dictM);
+		return new MatrixBlockDictionary(dictM, aggregateColumns.length);
 
 	}
 
@@ -1237,7 +1243,7 @@ public class MatrixBlockDictionary extends ADictionary {
 		final MatrixBlock ret = _data.replaceOperations(new MatrixBlock(), pattern, replace);
 		if(ret.isEmpty())
 			return null;
-		return new MatrixBlockDictionary(ret);
+		return new MatrixBlockDictionary(ret, _data.getNumColumns());
 	}
 
 	@Override
@@ -1284,7 +1290,7 @@ public class MatrixBlockDictionary extends ADictionary {
 		if(ret.isEmpty())
 			return null;
 		else
-			return new MatrixBlockDictionary(ret);
+			return new MatrixBlockDictionary(ret, _data.getNumColumns());
 
 	}
 
@@ -1329,7 +1335,7 @@ public class MatrixBlockDictionary extends ADictionary {
 			for(int h = nRows * nCols; h < nonZerosOut; h++)
 				retValues[h] = replace;
 		}
-		return new MatrixBlockDictionary(ret);
+		return new MatrixBlockDictionary(ret, _data.getNumColumns());
 	}
 
 	@Override
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToBit.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToBit.java
index baaf378..be742f5 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToBit.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToBit.java
@@ -66,7 +66,7 @@ public class MapToBit extends AMapToData {
 		return getInMemorySize(_data.size());
 	}
 
-	protected static long getInMemorySize(int dataLength) {
+	public static long getInMemorySize(int dataLength) {
 		long size = 16 + 8 + 4; // object header + object reference + int size
 		size += MemoryEstimates.bitSetCost(dataLength);
 		return size;
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToByte.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToByte.java
index 5564cca..993cd32 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToByte.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToByte.java
@@ -63,7 +63,7 @@ public class MapToByte extends AMapToData {
 		return getInMemorySize(_data.length);
 	}
 
-	protected static long getInMemorySize(int dataLength) {
+	public static long getInMemorySize(int dataLength) {
 		long size = 16 + 8; // object header + object reference
 		size += MemoryEstimates.byteArrayCost(dataLength);
 		return size;
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToChar.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToChar.java
index 64130bd..1c8c8e5 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToChar.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToChar.java
@@ -63,7 +63,7 @@ public class MapToChar extends AMapToData {
 		return getInMemorySize(_data.length);
 	}
 
-	protected static long getInMemorySize(int dataLength) {
+	public static long getInMemorySize(int dataLength) {
 		long size = 16 + 8; // object header + object reference
 		size += MemoryEstimates.charArrayCost(dataLength);
 		return size;
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToFactory.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToFactory.java
index 1915cb8..60c1100 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToFactory.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToFactory.java
@@ -22,12 +22,14 @@ package org.apache.sysds.runtime.compress.colgroup.mapping;
 import java.io.DataInput;
 import java.io.IOException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.runtime.compress.DMLCompressionException;
 import org.apache.sysds.runtime.compress.bitmap.ABitmap;
 import org.apache.sysds.runtime.compress.utils.IntArrayList;
 
 public class MapToFactory {
-	// protected static final Log LOG = LogFactory.getLog(MapToFactory.class.getName());
+	protected static final Log LOG = LogFactory.getLog(MapToFactory.class.getName());
 
 	public enum MAP_TYPE {
 		BIT, BYTE, CHAR, INT;
@@ -54,6 +56,13 @@ public class MapToFactory {
 		return _data;
 	}
 
+	public static AMapToData create(int size, int[] values, int nUnique) {
+		AMapToData _data = MapToFactory.create(size, nUnique);
+		for(int i = 0; i < size; i++)
+			_data.set(i, values[i]);
+		return _data;
+	}
+
 	/**
 	 * Create and allocate a map with the given size and support for upto the num tuples argument of values
 	 * 
@@ -62,7 +71,7 @@ public class MapToFactory {
 	 * @return A new map
 	 */
 	public static AMapToData create(int size, int numTuples) {
-		if(numTuples <= 2)
+		if(numTuples <= 2 && size > 32)
 			return new MapToBit(numTuples, size);
 		else if(numTuples <= 256)
 			return new MapToByte(numTuples, size);
@@ -87,7 +96,7 @@ public class MapToFactory {
 		AMapToData ret;
 		if(d instanceof MapToBit)
 			return d;
-		else if(numTuples <= 2)
+		else if(numTuples <= 2 && size > 32)
 			ret = new MapToBit(numTuples, size);
 		else if(d instanceof MapToByte)
 			return d;
@@ -136,7 +145,7 @@ public class MapToFactory {
 	}
 
 	public static long estimateInMemorySize(int size, int numTuples) {
-		if(numTuples <= 2)
+		if(numTuples <= 2 && size > 32)
 			return MapToBit.getInMemorySize(size);
 		else if(numTuples <= 256)
 			return MapToByte.getInMemorySize(size);
@@ -161,6 +170,20 @@ public class MapToFactory {
 		}
 	}
 
+	public static int getUpperBoundValue(MAP_TYPE t) {
+		switch(t) {
+			case BIT:
+				return 1;
+			case BYTE:
+				return 255;
+			case CHAR:
+				return Character.MAX_VALUE;
+			case INT:
+			default:
+				return Integer.MAX_VALUE;
+		}
+	}
+
 	public static AMapToData join(AMapToData left, AMapToData right) {
 		if(left == null)
 			return right;
@@ -202,18 +225,4 @@ public class MapToFactory {
 		tmp.setUnique(newUID - 1);
 		return tmp;
 	}
-
-	public static int getUpperBoundValue(MAP_TYPE t) {
-		switch(t) {
-			case BIT:
-				return 1;
-			case BYTE:
-				return 255;
-			case CHAR:
-				return Character.MAX_VALUE;
-			case INT:
-			default:
-				return Integer.MAX_VALUE;
-		}
-	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/OffsetByte.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/OffsetByte.java
index 7fb7b2b..074239b 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/OffsetByte.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/OffsetByte.java
@@ -126,13 +126,6 @@ public class OffsetByte extends AOffset {
 	}
 
 	@Override
-	public long getInMemorySize() {
-		long size = 16 + 4 + 4 + 8; // object header plus ints plus reference
-		size += MemoryEstimates.byteArrayCost(offsets.length);
-		return size;
-	}
-
-	@Override
 	public long getExactSizeOnDisk() {
 		return 1 + 4 + 4 + offsets.length;
 	}
@@ -162,9 +155,14 @@ public class OffsetByte extends AOffset {
 		return offsets.length;
 	}
 
-	public static long estimateInMemorySize(int nOffs, int nRows) {
+	@Override
+	public long getInMemorySize() {
+		return estimateInMemorySize(offsets.length);
+	}
+
+	public static long estimateInMemorySize(int nOffs) {
 		long size = 16 + 4 + 4 + 8; // object header plus int plus reference
-		size += MemoryEstimates.byteArrayCost(Math.max(nOffs, nRows / maxV));
+		size += MemoryEstimates.byteArrayCost(nOffs);
 		return size;
 	}
 
@@ -540,23 +538,27 @@ public class OffsetByte extends AOffset {
 	private void preAggregateDenseMapRowsByteEnd(DenseBlock db, double[] preAV, int rl, int ru, int cl, int cu, int nVal,
 		byte[] data, IterateByteOffset it) {
 		final int maxId = data.length - 1;
-		final int offsetStart = it.offset;
-		final int indexStart = it.getOffsetsIndex();
-		final int dataIndexStart = it.getDataIndex();
-		// all the way to the end of offsets.
-		for(int r = rl; r < ru; r++) {
-			final int offOut = (r - rl) * nVal;
-			final int off = db.pos(r);
-			final double[] vals = db.values(r);
-			it.offset = offsetStart + off;
-			it.index = indexStart;
-			it.dataIndex = dataIndexStart;
-			preAV[offOut + data[it.getDataIndex()] & 0xFF] += vals[it.offset];
-			while(it.getDataIndex() < maxId) {
-				it.next();
-				preAV[offOut + data[it.getDataIndex()] & 0xFF] += vals[it.offset];
+		final int nCol = db.getCumODims(0);
+
+		int dataOffset = data[it.getDataIndex()] & 0xFF;
+		int start = it.offset + nCol * rl;
+		int end = it.offset + nCol * ru;
+		double[] vals = db.values(rl);
+		for(int offOut = dataOffset, off = start; off < end; offOut += nVal, off += nCol) {
+			preAV[offOut] += vals[off];
+		}
+
+		while(it.getDataIndex() < maxId) {
+			it.next();
+			dataOffset = data[it.getDataIndex()] & 0xFF;
+			start = it.offset + nCol * rl;
+			end = it.offset + nCol * ru;
+			vals = db.values(rl);
+			for(int offOut = dataOffset, off = start; off < end; offOut += nVal, off += nCol) {
+				preAV[offOut] += vals[off];
 			}
 		}
+
 	}
 
 	@Override
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/OffsetChar.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/OffsetChar.java
index f633c53..5ba9aed 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/OffsetChar.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/OffsetChar.java
@@ -93,11 +93,15 @@ public class OffsetChar extends AOffset {
 
 	@Override
 	public long getInMemorySize() {
-		long size = 16 + 4 + 8; // object header plus int plus reference
-		size += MemoryEstimates.charArrayCost(offsets.length);
+		return estimateInMemorySize(offsets.length);
+	}
+	
+	public static long estimateInMemorySize(int nOffs) {
+		long size = 16 + 4 + 4 + 8; // object header plus int plus reference
+		size += MemoryEstimates.charArrayCost(nOffs);
 		return size;
 	}
-
+	
 	@Override
 	public long getExactSizeOnDisk() {
 		return 1 + 4 + 4 + offsets.length * 2;
@@ -140,11 +144,6 @@ public class OffsetChar extends AOffset {
 		return new OffsetChar(offsets, offsetToFirst, offsetToLast);
 	}
 
-	public static long estimateInMemorySize(int nOffs, int nRows) {
-		long size = 16 + 4 + 8; // object header plus int plus reference
-		size += MemoryEstimates.charArrayCost(Math.max(nOffs, nRows / maxV));
-		return size;
-	}
 
 	@Override
 	protected final void preAggregateDenseMapRowByte(double[] mV, int off, double[] preAV, int cu, int nVal, byte[] data,
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/OffsetFactory.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/OffsetFactory.java
index 9904be3..28d7bb0 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/OffsetFactory.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/OffsetFactory.java
@@ -22,9 +22,13 @@ package org.apache.sysds.runtime.compress.colgroup.offset;
 import java.io.DataInput;
 import java.io.IOException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.runtime.compress.utils.IntArrayList;
+
 public interface OffsetFactory {
 
-	// static final Log LOG = LogFactory.getLog(OffsetFactory.class.getName());
+	static final Log LOG = LogFactory.getLog(OffsetFactory.class.getName());
 
 	/** The specific underlying types of offsets. */
 	public enum OFF_TYPE {
@@ -34,8 +38,8 @@ public interface OffsetFactory {
 	/**
 	 * Main factory pattern creator for Offsets.
 	 * 
-	 * Note this creator is unsafe in the sense it is assumed that the input index list only contain a sequential non
-	 * duplicate incrementing values.
+	 * Note this creator is unsafe: it is assumed that the input index list only contains sequential,
+	 * non-duplicate, incrementing values.
 	 * 
 	 * @param indexes List of indexes, that is assumed to be sorted and have no duplicates
 	 * @return AOffset object containing offsets to the next value.
@@ -45,13 +49,26 @@ public interface OffsetFactory {
 	}
 
 	/**
+	 * Create the offsets based on our primitive IntArrayList.
+	 * 
+	 * Note this creator is unsafe: it is assumed that the input index list only contains sequential,
+	 * non-duplicate, incrementing values.
+	 * 
+	 * @param indexes The List of indexes, that is assumed to be sorted and have no duplicates
+	 * @return AOffset object containing offsets to the next value.
+	 */
+	public static AOffset createOffset(IntArrayList indexes) {
+		return createOffset(indexes.extractValues(), 0, indexes.size());
+	}
+
+	/**
 	 * Create a Offset based on a subset of the indexes given.
 	 * 
 	 * This is useful if the input is created from a CSR matrix, since it allows us to not reallocate the indexes[] but
 	 * use the shared indexes from the entire CSR representation.
 	 * 
-	 * Note this creator is unsafe in the sense it is assumed that the input indexes in the range from apos to alen only
-	 * contain a sequential non duplicate incrementing values.
+	 * Note this creator is unsafe: it is assumed that the input index list only contains sequential,
+	 * non-duplicate, incrementing values.
 	 * 
 	 * @param indexes The indexes from which to take the offsets.
 	 * @param apos    The position to start looking from in the indexes.
@@ -62,9 +79,15 @@ public interface OffsetFactory {
 		final int minValue = indexes[apos];
 		final int maxValue = indexes[alen - 1];
 		final int range = maxValue - minValue;
-		final int endLength = alen - apos;
-		final long byteSize = OffsetByte.estimateInMemorySize(endLength, range);
-		final long charSize = OffsetChar.estimateInMemorySize(endLength, range);
+		final int endLength = alen - apos - 1;
+		// -1 because one index is skipped; the first index is allocated as an int.
+
+		final int correctionByte = correctionByte(range, endLength);
+		final int correctionChar = correctionChar(range, endLength);
+	
+		final long byteSize = OffsetByte.estimateInMemorySize(endLength + correctionByte);
+		final long charSize = OffsetChar.estimateInMemorySize(endLength + correctionChar);
+
 		if(byteSize < charSize)
 			return new OffsetByte(indexes, apos, alen);
 		else
@@ -106,10 +129,22 @@ public interface OffsetFactory {
 			return 8; // If this is the case, then the compression results in constant col groups
 		else {
 			final int avgDiff = nRows / size;
-			if(avgDiff < 256)
-				return OffsetByte.estimateInMemorySize(size - 1, nRows);
-			else
-				return OffsetChar.estimateInMemorySize(size - 1, nRows);
+			if(avgDiff < 256) {
+				final int correctionByte = correctionByte(nRows, size);
+				return OffsetByte.estimateInMemorySize(size - 1 + correctionByte);
+			}
+			else {
+				final int correctionChar = correctionChar(nRows, size);
+				return OffsetChar.estimateInMemorySize(size - 1 + correctionChar);
+			}
 		}
 	}
+
+	public static int correctionByte(int nRows, int size) {
+		return Math.max((nRows - size * 256), 0) / 256;
+	}
+
+	public static int correctionChar(int nRows, int size) {
+		return Math.max((nRows - size * Character.MAX_VALUE), 0) / Character.MAX_VALUE;
+	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/cost/ComputationCostEstimator.java b/src/main/java/org/apache/sysds/runtime/compress/cost/ComputationCostEstimator.java
index 4252b70..f0676b4 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/cost/ComputationCostEstimator.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/cost/ComputationCostEstimator.java
@@ -30,6 +30,8 @@ public class ComputationCostEstimator implements ICostEstimate {
 	private static final double commonValueImpact = 0.75;
 	/** The threshold before the commonValueImpact is tarting. */
 	private static final double cvThreshold = 0.2;
+	/** The uncertainty added per column in a group */
+	private static final double uncertaintyPerCol = 0.001;
 
 	/** A factor for when the number of distinct tuples start scaling the cost. */
 	private static final int scalingStart = 1000;
@@ -91,19 +93,19 @@ public class ComputationCostEstimator implements ICostEstimate {
 	public double getCostOfColumnGroup(CompressedSizeInfoColGroup g) {
 		if(g == null)
 			return Double.POSITIVE_INFINITY;
+		final double scalingFactor = getScalingFactor(g.getNumVals());
+
 		double cost = 0;
+		final int rowsCols = 16;
 		cost += _scans * scanCost(g);
 		cost += _decompressions * decompressionCost(g);
 		cost += _overlappingDecompressions * overlappingDecompressionCost(g);
-
-		final int rowsCols = 16;
-
-		final double scalingFactor = getScalingFactor(g.getNumVals());
 		cost += _leftMultiplications * leftMultCost(g) * rowsCols;
 		cost += _rightMultiplications * rightMultCost(g) * rowsCols;
 		cost += _dictionaryOps * dictionaryOpsCost(g);
 		cost += _compressedMultiplication * _compressedMultCost(g) * rowsCols;
-
+		for(int i = 0; i < g.getColumns().length; i++)
+			cost += cost * uncertaintyPerCol;
 		return cost * scalingFactor;
 	}
 
@@ -117,7 +119,9 @@ public class ComputationCostEstimator implements ICostEstimate {
 	}
 
 	private double scanCost(CompressedSizeInfoColGroup g) {
-		return _nRows;
+		final int nColsInGroup = g.getColumns().length;
+		final double numberTuples = g.getNumVals();
+		return _nRows + nColsInGroup * numberTuples * 10;
 	}
 
 	private double leftMultCost(CompressedSizeInfoColGroup g) {
diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimator.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimator.java
index 7d46e20..fd69fb6 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimator.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimator.java
@@ -31,8 +31,6 @@ import java.util.concurrent.Future;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.runtime.compress.CompressionSettings;
-import org.apache.sysds.runtime.compress.bitmap.ABitmap;
-import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType;
 import org.apache.sysds.runtime.compress.utils.Util;
 import org.apache.sysds.runtime.controlprogram.parfor.stat.Timing;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
@@ -223,28 +221,11 @@ public abstract class CompressedSizeEstimator {
 
 	protected abstract int worstCaseUpperBound(int[] columns);
 
+	public abstract int getSampleSize();
+
 	protected abstract CompressedSizeInfoColGroup estimateJoinCompressedSize(int[] joinedcols,
 		CompressedSizeInfoColGroup g1, CompressedSizeInfoColGroup g2, int joinedMaxDistinct);
 
-	/**
-	 * Method used to extract the CompressedSizeEstimationFactors from an constructed UncompressedBitmap. Note this
-	 * method works both for the sample based estimator and the exact estimator, since the bitmap, can be extracted from
-	 * a sample or from the entire dataset.
-	 * 
-	 * @param ubm        The UncompressedBitmap, either extracted from a sample or from the entire dataset
-	 * @param colIndexes The columns that is compressed together.
-	 * @return The size factors estimated from the Bit Map.
-	 */
-	public EstimationFactors estimateCompressedColGroupSize(ABitmap ubm, int[] colIndexes) {
-		return estimateCompressedColGroupSize(ubm, colIndexes, getNumRows(), _cs);
-	}
-
-	public static EstimationFactors estimateCompressedColGroupSize(ABitmap ubm, int[] colIndexes, int nrRows,
-		CompressionSettings cs) {
-		return EstimationFactors.computeSizeEstimationFactors(ubm, nrRows,
-			cs.validCompressions.contains(CompressionType.RLE), colIndexes);
-	}
-
 	protected CompressedSizeInfoColGroup[] CompressedSizeInfoColGroup(int clen) {
 		CompressedSizeInfoColGroup[] ret = new CompressedSizeInfoColGroup[clen];
 		for(int col = 0; col < clen; col++)
diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorExact.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorExact.java
index 80305fc..c3c6595 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorExact.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorExact.java
@@ -20,11 +20,7 @@
 package org.apache.sysds.runtime.compress.estim;
 
 import org.apache.sysds.runtime.compress.CompressionSettings;
-import org.apache.sysds.runtime.compress.bitmap.ABitmap;
-import org.apache.sysds.runtime.compress.bitmap.BitmapEncoder;
-import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType;
-import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
-import org.apache.sysds.runtime.compress.colgroup.mapping.MapToFactory;
+import org.apache.sysds.runtime.compress.estim.encoding.IEncode;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 
 /**
@@ -32,36 +28,30 @@ import org.apache.sysds.runtime.matrix.data.MatrixBlock;
  */
 public class CompressedSizeEstimatorExact extends CompressedSizeEstimator {
 
-	protected CompressedSizeEstimatorExact(MatrixBlock data, CompressionSettings compSettings) {
+	public CompressedSizeEstimatorExact(MatrixBlock data, CompressionSettings compSettings) {
 		super(data, compSettings);
 	}
 
 	@Override
 	public CompressedSizeInfoColGroup estimateCompressedColGroupSize(int[] colIndexes, int estimate,
 		int nrUniqueUpperBound) {
-		// exact estimator can ignore upper bound since it returns the accurate values.
-		final ABitmap entireBitMap = BitmapEncoder.extractBitmap(colIndexes, _data, _cs.transposed, estimate, false);
-		EstimationFactors em = null;
-		if(entireBitMap != null)
-			em = estimateCompressedColGroupSize(entireBitMap, colIndexes);
-		if(em == null)
-			em = EstimationFactors.emptyFactors(colIndexes.length, getNumRows());
-
-		return new CompressedSizeInfoColGroup(colIndexes, em, _cs.validCompressions, entireBitMap, getNumRows());
+		final int _numRows = getNumRows();
+		final IEncode map = IEncode.createFromMatrixBlock(_data, _cs.transposed, colIndexes);
+		final EstimationFactors em = map.computeSizeEstimation(colIndexes, _numRows, _data.getSparsity(),
+			_data.getSparsity());
+		return new CompressedSizeInfoColGroup(colIndexes, em, _cs.validCompressions, map);
 	}
 
 	@Override
 	protected CompressedSizeInfoColGroup estimateJoinCompressedSize(int[] joined, CompressedSizeInfoColGroup g1,
 		CompressedSizeInfoColGroup g2, int joinedMaxDistinct) {
 		final int _numRows = getNumRows();
-		AMapToData map = MapToFactory.join(g1.getMap(), g2.getMap());
+		final IEncode map = g1.getMap().join(g2.getMap());
 		EstimationFactors em = null;
 		if(map != null)
-			em = EstimationFactors.computeSizeEstimation(joined, map, _cs.validCompressions.contains(CompressionType.RLE),
-				_numRows, false);
-
+			em = map.computeSizeEstimation(joined, _numRows, _data.getSparsity(), _data.getSparsity());
 		if(em == null)
-			em = EstimationFactors.emptyFactors(joined.length, getNumRows());
+			em = EstimationFactors.emptyFactors(joined.length, _numRows);
 
 		return new CompressedSizeInfoColGroup(joined, em, _cs.validCompressions, map);
 	}
@@ -70,4 +60,9 @@ public class CompressedSizeEstimatorExact extends CompressedSizeEstimator {
 	protected int worstCaseUpperBound(int[] columns) {
 		return getNumRows();
 	}
+
+	@Override
+	public final int getSampleSize() {
+		return getNumRows();
+	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorFactory.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorFactory.java
index b5d2502..91bc163 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorFactory.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorFactory.java
@@ -32,13 +32,21 @@ public class CompressedSizeEstimatorFactory {
 
 		final int nRows = cs.transposed ? data.getNumColumns() : data.getNumRows();
 		final int nCols = cs.transposed ? data.getNumRows() : data.getNumColumns();
-		final int nnzRows = Math.min(nRows, (int) Math.ceil(data.getNonZeros() / nCols));
+		final double sparsity = data.getSparsity();
 		final double sampleRatio = cs.samplingRatio;
 		final int minSample = cs.minimumSampleSize;
 		final int maxSample = Math.min(cs.maxSampleSize, nRows);
-		final int sampleSize = getSampleSize(sampleRatio, nRows, nCols, nnzRows, minSample, maxSample);
+		final int sampleSize = getSampleSize(sampleRatio, nRows, nCols, sparsity, minSample, maxSample);
 
-		if(sampleRatio >= 1.0 || sampleSize >= nRows) {
+		return getSizeEstimator(data, cs, sampleSize, k);
+	}
+
+	public static CompressedSizeEstimator getSizeEstimator(MatrixBlock data, CompressionSettings cs, int sampleSize,
+		int k) {
+		final int nRows = cs.transposed ? data.getNumColumns() : data.getNumRows();
+		final int nCols = cs.transposed ? data.getNumRows() : data.getNumColumns();
+
+		if(sampleSize >= (double) nRows * 0.8) {
 			if(nRows > 10000 && nCols > 10 && data.isInSparseFormat() && !cs.transposed) {
 				if(!cs.isInSparkInstruction)
 					LOG.debug("Transposing for exact estimator");
@@ -70,22 +78,27 @@ public class CompressedSizeEstimatorFactory {
 	 * @param sampleRatio       The sample ratio
 	 * @param nRows             The number of rows
 	 * @param nCols             The number of columns
-	 * @param nnzRows           The number of nonzero rows
+	 * @param sparsity          The sparsity of the input
 	 * @param minimumSampleSize The minimum sample size
 	 * @param maxSampleSize     The maximum sample size
 	 * @return The sample size to use.
 	 */
-	private static int getSampleSize(double sampleRatio, int nRows, int nCols, int nnzRows, int minSampleSize,
+	private static int getSampleSize(double sampleRatio, int nRows, int nCols, double sparsity, int minSampleSize,
 		int maxSampleSize) {
+
 		// Start sample size at the min sample size.
 		int sampleSize = minSampleSize;
+
 		// Scale the sample size disproportionally with the number of rows in the input.
 		// Since the the number of rows needed to classify the contained values in a population doesn't scale linearly.
 		sampleSize += (int) Math.ceil(Math.pow(nRows, 0.65));
-		// Scale the sample size with the number of nonzero rows.
-		// This tries to make the sample bigger when there is less nonzero values in the matrix.
-		// This is done to increase the likelihood that the sample is big enough to contain some of the values.
-		sampleSize = (int) Math.min((double) sampleSize * ((double) nRows / nnzRows), maxSampleSize);
+
+		// Scale sample size based on overall sparsity so that if the input is very sparse, increase the sample size.
+		sampleSize = (int) (sampleSize * (1.0 / Math.min(sparsity + 0.2, 1.0)));
+
+		// adhere to maximum sample size.
+		sampleSize = (int) Math.max(minSampleSize, Math.min(sampleSize, maxSampleSize));
+
 		return sampleSize;
 	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java
index 0911d63..9c1363c 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java
@@ -26,10 +26,8 @@ import java.util.Random;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.sysds.runtime.compress.CompressionSettings;
 import org.apache.sysds.runtime.compress.bitmap.ABitmap;
-import org.apache.sysds.runtime.compress.bitmap.BitmapEncoder;
 import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType;
-import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
-import org.apache.sysds.runtime.compress.colgroup.mapping.MapToFactory;
+import org.apache.sysds.runtime.compress.estim.encoding.IEncode;
 import org.apache.sysds.runtime.compress.estim.sample.SampleEstimatorFactory;
 import org.apache.sysds.runtime.controlprogram.parfor.stat.Timing;
 import org.apache.sysds.runtime.data.DenseBlock;
@@ -56,7 +54,7 @@ public class CompressedSizeEstimatorSample extends CompressedSizeEstimator {
 	 * @param sampleSize The size to sample from the data.
 	 * @param k          The parallelization degree allowed.
 	 */
-	protected CompressedSizeEstimatorSample(MatrixBlock data, CompressionSettings cs, int sampleSize, int k) {
+	public CompressedSizeEstimatorSample(MatrixBlock data, CompressionSettings cs, int sampleSize, int k) {
 		super(data, cs);
 		_k = k;
 		_sampleSize = sampleSize;
@@ -76,6 +74,7 @@ public class CompressedSizeEstimatorSample extends CompressedSizeEstimator {
 		return _sample;
 	}
 
+	@Override
 	public final int getSampleSize() {
 		return _sampleSize;
 	}
@@ -84,18 +83,13 @@ public class CompressedSizeEstimatorSample extends CompressedSizeEstimator {
 	public CompressedSizeInfoColGroup estimateCompressedColGroupSize(int[] colIndexes, int estimate,
 		int nrUniqueUpperBound) {
 
-		// extract statistics from sample
-		final ABitmap ubm = BitmapEncoder.extractBitmap(colIndexes, _sample, _transposed, estimate, false);
-		final EstimationFactors sampleFacts = EstimationFactors.computeSizeEstimationFactors(ubm, _sampleSize, false,
-			colIndexes);
-		final AMapToData map = MapToFactory.create(_sampleSize, ubm);
-
-		// result scaled
+		final IEncode map = IEncode.createFromMatrixBlock(_sample, _transposed, colIndexes);
+		final EstimationFactors sampleFacts = map.computeSizeEstimation(colIndexes, _sampleSize, _data.getSparsity(), _data.getSparsity());
+		// EstimationFactors.computeSizeEstimation(colIndexes, map, false, _sampleSize,
+			// false);
 		final EstimationFactors em = estimateCompressionFactors(sampleFacts, map, colIndexes, nrUniqueUpperBound);
-
-		// LOG.error("Sample vs Scaled:\n" + sampleFacts + "\n" + em + "\n");
-
 		return new CompressedSizeInfoColGroup(colIndexes, em, _cs.validCompressions, map);
+
 	}
 
 	@Override
@@ -111,19 +105,17 @@ public class CompressedSizeEstimatorSample extends CompressedSizeEstimator {
 		if((long) g1.getNumVals() * g2.getNumVals() > (long) Integer.MAX_VALUE)
 			return null;
 
-		final AMapToData map = MapToFactory.join(g1.getMap(), g2.getMap());
-		final EstimationFactors sampleFacts = EstimationFactors.computeSizeEstimation(joined, map,
-			_cs.validCompressions.contains(CompressionType.RLE), map.size(), false);
+		final IEncode map = g1.getMap().join(g2.getMap());
+		final EstimationFactors sampleFacts = map.computeSizeEstimation(joined, _sampleSize,_data.getSparsity(), _data.getSparsity());
+		//  EstimationFactors.computeSizeEstimation(joined, map,
+			// _cs.validCompressions.contains(CompressionType.RLE), map.size(), false);
 
 		// result facts
 		final EstimationFactors em = estimateCompressionFactors(sampleFacts, map, joined, joinedMaxDistinct);
-
-		// LOG.error("Sample vs Scaled Join:\n" + sampleFacts + "\n" + em + "\n");
-
 		return new CompressedSizeInfoColGroup(joined, em, _cs.validCompressions, map);
 	}
 
-	private EstimationFactors estimateCompressionFactors(EstimationFactors sampleFacts, AMapToData map, int[] colIndexes,
+	private EstimationFactors estimateCompressionFactors(EstimationFactors sampleFacts, IEncode map, int[] colIndexes,
 		int nrUniqueUpperBound) {
 		final int numRows = getNumRows();
 		if(map == null || sampleFacts == null) {
@@ -142,7 +134,7 @@ public class CompressedSizeEstimatorSample extends CompressedSizeEstimator {
 
 			final int numOffs = calculateOffs(sampleFacts, _sampleSize, numRows, scalingFactor, numZerosInSample);
 
-			final int totalCardinality = getEstimatedDistinctCount(sampleFacts.frequencies, nrUniqueUpperBound, numOffs,
+			final int totalCardinality = getEstimatedDistinctCount(sampleFacts.frequencies, nrUniqueUpperBound, numRows,
 				sampleFacts.numOffs);
 
 			final int totalNumRuns = getNumRuns(map, sampleFacts.numVals, _sampleSize, numRows);
@@ -190,7 +182,7 @@ public class CompressedSizeEstimatorSample extends CompressedSizeEstimator {
 
 			// add one to make sure that Uncompressed columns are considered as containing at least one value.
 			if(nnzCount == 0)
-				nnzCount += 1;
+				nnzCount += colIndexes.length;
 			return nnzCount / ((double) getNumRows() * colIndexes.length);
 		}
 		else
@@ -204,7 +196,7 @@ public class CompressedSizeEstimatorSample extends CompressedSizeEstimator {
 		return Math.min(est, upperBound);
 	}
 
-	private int getNumRuns(AMapToData map, int numVals, int sampleSize, int totalNumRows) {
+	private int getNumRuns(IEncode map, int numVals, int sampleSize, int totalNumRows) {
 		// estimate number of segments and number of runs incl correction for
 		// empty segments and empty runs (via expected mean of offset value)
 		// int numUnseenSeg = (int) (unseenVals * Math.ceil((double) _numRows / BitmapEncoder.BITMAP_BLOCK_SZ / 2));
@@ -372,7 +364,7 @@ public class CompressedSizeEstimatorSample extends CompressedSizeEstimator {
 		return (int) Math.min(Math.round(numRuns), Integer.MAX_VALUE);
 	}
 
-	private static int getNumRuns(AMapToData map, int sampleSize, int totalNumRows) {
+	private static int getNumRuns(IEncode map, int sampleSize, int totalNumRows) {
 		throw new NotImplementedException("Not Supported ever since the ubm was replaced by the map");
 	}
 
@@ -426,10 +418,7 @@ public class CompressedSizeEstimatorSample extends CompressedSizeEstimator {
 		else
 			sampledMatrixBlock = defaultSlowSamplingPath(sampleRows);
 
-		if(sampledMatrixBlock.isEmpty())
-			return null;
-		else
-			return sampledMatrixBlock;
+		return sampledMatrixBlock;
 	}
 
 	private MatrixBlock sparseNotTransposedSamplePath(int[] sampleRows) {
diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorUltraSparse.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorUltraSparse.java
index 96a937f..5915fcd 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorUltraSparse.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorUltraSparse.java
@@ -104,4 +104,9 @@ public class CompressedSizeEstimatorUltraSparse extends CompressedSizeEstimator
 	protected int worstCaseUpperBound(int[] columns) {
 		return getNumRows();
 	}
+
+	@Override
+	public final int getSampleSize() {
+		return getNumRows();
+	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfo.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfo.java
index 6c85b26..10f1fad 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfo.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfo.java
@@ -56,24 +56,6 @@ public class CompressedSizeInfo {
 		compressionInfo = info;
 	}
 
-	public void joinEmpty(int nRows) {
-		List<CompressedSizeInfoColGroup> ng = new ArrayList<>();
-		List<Integer> empty = new ArrayList<>();
-		for(CompressedSizeInfoColGroup g : compressionInfo) {
-			if(g.isEmpty())
-				empty.add(g.getColumns()[0]);
-			else
-				ng.add(g);
-		}
-		int[] emptyColumns = new int[empty.size()];
-		for(int i = 0; i < empty.size(); i++)
-			emptyColumns[i] = empty.get(i);
-		if(empty.size() > 0) {
-			ng.add(new CompressedSizeInfoColGroup(emptyColumns, nRows));
-			compressionInfo = ng;
-		}
-	}
-
 	/**
 	 * Method for returning the calculated memory usage from this specific compression plan.
 	 * 
diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java
index 78eeb31..04a2248 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java
@@ -28,11 +28,10 @@ import org.apache.commons.lang.NotImplementedException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.runtime.compress.CompressionSettings;
-import org.apache.sysds.runtime.compress.bitmap.ABitmap;
+import org.apache.sysds.runtime.compress.DMLCompressionException;
 import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType;
 import org.apache.sysds.runtime.compress.colgroup.ColGroupSizes;
-import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
-import org.apache.sysds.runtime.compress.colgroup.mapping.MapToFactory;
+import org.apache.sysds.runtime.compress.estim.encoding.IEncode;
 
 /**
  * Information collected about a specific ColGroup's compression size.
@@ -51,7 +50,7 @@ public class CompressedSizeInfoColGroup {
 	/**
 	 * Map containing a mapping to unique values, but not necessarily the actual values contained in this column group
 	 */
-	private AMapToData _map;
+	private IEncode _map;
 
 	/**
 	 * Join columns without analyzing the content. This only specify the compression ratio if encoded in DDC since this
@@ -81,51 +80,12 @@ public class CompressedSizeInfoColGroup {
 		_sizes = null;
 		_bestCompressionType = CompressionType.SDC;
 		_minSize = ColGroupSizes.estimateInMemorySizeSDC(columns.length, facts.numVals, facts.numRows, facts.largestOff,
-			true, false, sparsity, false);
-		_map = null;
-	}
-
-	protected CompressedSizeInfoColGroup(int[] columns, int numRows) {
-		_facts = new EstimationFactors(columns.length, 1, numRows);
-		_cardinalityRatio = (double) 1 / numRows;
-		_sizes = null;
-		_cols = columns;
-		_bestCompressionType = null;
-		_minSize = ColGroupSizes.estimateInMemorySizeCONST(columns.length, numRows, 1.0, false);
-		_map = null;
-	}
-
-	public CompressedSizeInfoColGroup(int[] columns, EstimationFactors facts, Set<CompressionType> validCompressionTypes,
-		ABitmap ubm, int sampleSize) {
-		_facts = facts;
-		_cols = columns;
-		_cardinalityRatio = (double) facts.numVals / facts.numRows;
-		_sizes = calculateCompressionSizes(_cols.length, facts, validCompressionTypes);
-		Map.Entry<CompressionType, Long> bestEntry = null;
-		for(Map.Entry<CompressionType, Long> ent : _sizes.entrySet()) {
-			if(bestEntry == null || ent.getValue() < bestEntry.getValue())
-				bestEntry = ent;
-		}
-
-		_bestCompressionType = bestEntry.getKey();
-		_minSize = bestEntry.getValue();
-		_map = MapToFactory.create(sampleSize, ubm);
-		if(LOG.isTraceEnabled())
-			LOG.trace(this);
-	}
-
-	public CompressedSizeInfoColGroup(int[] columns, EstimationFactors facts, CompressionType bestType) {
-		_facts = facts;
-		_cols = columns;
-		_cardinalityRatio = 1.0;
-		_sizes = null;
-		_bestCompressionType = bestType;
-		_minSize = 0;
+			sparsity, facts.zeroIsMostFrequent, false);
 		_map = null;
 	}
 
 	protected CompressedSizeInfoColGroup(int[] columns, EstimationFactors facts,
-		Set<CompressionType> validCompressionTypes, AMapToData map) {
+		Set<CompressionType> validCompressionTypes, IEncode map) {
 		_cols = columns;
 		_facts = facts;
 		_cardinalityRatio = (double) facts.numVals / facts.numRows;
@@ -143,27 +103,11 @@ public class CompressedSizeInfoColGroup {
 			LOG.trace(this);
 	}
 
-	/**
-	 * This method adds a column group without having to analyze. This is because the columns added are constant groups.
-	 * 
-	 * NOTE THIS IS ONLY VALID IF THE COLUMN ADDED IS EMPTY OR CONSTANT!
-	 * 
-	 * @param columns               The columns of the colgroups together
-	 * @param oneSide               One of the sides, this may contain something, but the other side (not part of the
-	 *                              argument) should not.
-	 * @param validCompressionTypes The List of valid compression techniques to use
-	 * @return A Combined estimate of the column group.
-	 */
-	public static CompressedSizeInfoColGroup addConstGroup(int[] columns, CompressedSizeInfoColGroup oneSide,
-		Set<CompressionType> validCompressionTypes) {
-		EstimationFactors fact = new EstimationFactors(columns.length, oneSide._facts);
-		CompressedSizeInfoColGroup ret = new CompressedSizeInfoColGroup(columns, fact, validCompressionTypes,
-			oneSide._map);
-		return ret;
-	}
-
 	public long getCompressionSize(CompressionType ct) {
-		return _sizes.get(ct);
+		if(_sizes != null)
+			return _sizes.get(ct);
+		else
+			throw new DMLCompressionException("There was no encodings analyzed");
 	}
 
 	public CompressionType getBestCompressionType(CompressionSettings cs) {
@@ -220,7 +164,7 @@ public class CompressedSizeInfoColGroup {
 		return _facts.tupleSparsity;
 	}
 
-	public AMapToData getMap() {
+	public IEncode getMap() {
 		return _map;
 	}
 
@@ -233,9 +177,8 @@ public class CompressedSizeInfoColGroup {
 		Map<CompressionType, Long> res = new HashMap<>();
 		for(CompressionType ct : validCompressionTypes) {
 			long compSize = getCompressionSize(numCols, ct, fact);
-			if(compSize > 0) {
+			if(compSize > 0)
 				res.put(ct, compSize);
-			}
 		}
 		return res;
 	}
@@ -245,31 +188,31 @@ public class CompressedSizeInfoColGroup {
 	}
 
 	private static long getCompressionSize(int numCols, CompressionType ct, EstimationFactors fact) {
-
+		int nv;
 		switch(ct) {
 			case DDC:
+				nv = fact.numVals + (fact.zeroIsMostFrequent ? 1 : 0);
 				// + 1 if the column contains zero
-				return ColGroupSizes.estimateInMemorySizeDDC(numCols, fact.numVals + (fact.numOffs < fact.numRows ? 1 : 0),
-					fact.numRows, fact.tupleSparsity, fact.lossy);
+				return ColGroupSizes.estimateInMemorySizeDDC(numCols, nv, fact.numRows, fact.tupleSparsity, fact.lossy);
 			case RLE:
-				return ColGroupSizes.estimateInMemorySizeRLE(numCols, fact.numVals, fact.numRuns, fact.numRows,
-					fact.tupleSparsity, fact.lossy);
+				nv = fact.numVals + (fact.zeroIsMostFrequent ? 1 : 0);
+				return ColGroupSizes.estimateInMemorySizeRLE(numCols, nv, fact.numRuns, fact.numRows, fact.tupleSparsity,
+					fact.lossy);
 			case OLE:
-				return ColGroupSizes.estimateInMemorySizeOLE(numCols, fact.numVals, fact.numOffs + fact.numVals,
-					fact.numRows, fact.tupleSparsity, fact.lossy);
+				nv = fact.numVals + (fact.zeroIsMostFrequent ? 1 : 0);
+				return ColGroupSizes.estimateInMemorySizeOLE(numCols, nv, fact.numOffs + fact.numVals, fact.numRows,
+					fact.tupleSparsity, fact.lossy);
 			case UNCOMPRESSED:
-				return ColGroupSizes.estimateInMemorySizeUncompressed(fact.numRows, numCols, fact.overAllSparsity);
+				return ColGroupSizes.estimateInMemorySizeUncompressed(fact.numRows, numCols,
+					Math.min(Math.max(fact.overAllSparsity, 0.1) + numCols / 10, 1));
 			case SDC:
-				if(fact.numOffs <= 1)
-					return ColGroupSizes.estimateInMemorySizeSDCSingle(numCols, fact.numVals, fact.numRows, fact.largestOff,
-						fact.zeroIsMostFrequent, fact.containNoZeroValues, fact.tupleSparsity, fact.lossy);
 				return ColGroupSizes.estimateInMemorySizeSDC(numCols, fact.numVals, fact.numRows, fact.largestOff,
-					fact.zeroIsMostFrequent, fact.containNoZeroValues, fact.tupleSparsity, fact.lossy);
+					fact.tupleSparsity, fact.zeroIsMostFrequent, fact.lossy);
 			case CONST:
 				if(fact.numOffs == 0)
 					return ColGroupSizes.estimateInMemorySizeEMPTY(numCols);
 				else if(fact.numOffs == fact.numRows && fact.numVals == 1)
-					return ColGroupSizes.estimateInMemorySizeCONST(numCols, fact.numVals, fact.tupleSparsity, fact.lossy);
+					return ColGroupSizes.estimateInMemorySizeCONST(numCols, fact.tupleSparsity, fact.lossy);
 				else
 					return -1;
 			default:
@@ -284,7 +227,8 @@ public class CompressedSizeInfoColGroup {
 	@Override
 	public String toString() {
 		StringBuilder sb = new StringBuilder();
-		sb.append("\ncols: " + Arrays.toString(_cols));
+		sb.append(this.getClass().getSimpleName());
+		sb.append("cols: " + Arrays.toString(_cols));
 		sb.append(" Best Type: " + _bestCompressionType);
 		sb.append(" Cardinality: ");
 		sb.append(_cardinalityRatio);
@@ -293,6 +237,7 @@ public class CompressedSizeInfoColGroup {
 		sb.append(" Sizes: ");
 		sb.append(_sizes);
 		sb.append(" facts: " + _facts);
+		sb.append("\n" + _map);
 		return sb.toString();
 	}
 
diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/EstimationFactors.java b/src/main/java/org/apache/sysds/runtime/compress/estim/EstimationFactors.java
index 0920577..7cfbf23 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/estim/EstimationFactors.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/estim/EstimationFactors.java
@@ -19,13 +19,9 @@
 
 package org.apache.sysds.runtime.compress.estim;
 
-import org.apache.commons.lang.NotImplementedException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.sysds.runtime.compress.CompressionSettings;
 import org.apache.sysds.runtime.compress.DMLCompressionException;
-import org.apache.sysds.runtime.compress.bitmap.ABitmap;
-import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
 
 /**
  * Compressed Size Estimation factors. Contains meta information used to estimate the compression sizes of given columns
@@ -100,25 +96,7 @@ public class EstimationFactors {
 		this.tupleSparsity = 1;
 	}
 
-	protected EstimationFactors(int nCols, EstimationFactors old) {
-		if(nCols >= 5 && old.numVals == 0)
-			this.numVals = 1;
-		else
-			this.numVals = old.numVals;
-		this.numRows = old.numRows;
-		this.numOffs = old.numOffs;
-		this.largestOff = old.largestOff;
-		this.frequencies = old.frequencies;
-		this.numRuns = old.numRuns;
-		this.numSingle = old.numSingle;
-		this.lossy = old.lossy;
-		this.zeroIsMostFrequent = old.zeroIsMostFrequent;
-		this.containNoZeroValues = old.containNoZeroValues;
-		this.overAllSparsity = old.overAllSparsity;
-		this.tupleSparsity = old.tupleSparsity;
-	}
-
-	protected EstimationFactors(int nCols, int numVals, int numOffs, int largestOff, int[] frequencies, int numRuns,
+	public EstimationFactors(int nCols, int numVals, int numOffs, int largestOff, int[] frequencies, int numRuns,
 		int numSingle, int numRows, boolean lossy, boolean zeroIsMostFrequent, double overAllSparsity,
 		double tupleSparsity) {
 		// Safety in numbers, if the estimation factor is saying that there is no values in 5 columns,
@@ -139,7 +117,7 @@ public class EstimationFactors {
 		this.containNoZeroValues = numOffs == numRows && overAllSparsity < 1;
 		this.overAllSparsity = overAllSparsity;
 		this.tupleSparsity = tupleSparsity;
-			
+
 		if(overAllSparsity > 1 || overAllSparsity < 0)
 			throw new DMLCompressionException("Invalid OverAllSparsity of: " + overAllSparsity);
 		if(tupleSparsity > 1 || tupleSparsity < 0)
@@ -150,95 +128,14 @@ public class EstimationFactors {
 					+ " > numRows: " + numRows);
 	}
 
-	protected static EstimationFactors emptyFactors(int nCols, int nRows){
-		return new EstimationFactors(nCols, 0, 0, 0 , null, 0, 0, nRows, false, true, 0, 0);
-	}
-
-	protected static EstimationFactors computeSizeEstimationFactors(ABitmap ubm, int rlen, boolean inclRLE, int[] cols) {
-		if(ubm == null || ubm.getOffsetList() == null)
-			return null;
-
-		int numVals = ubm.getNumValues();
-		int numRuns = 0;
-		int numOffs = 0;
-		int numSingle = 0;
-		int largestOffs = 0;
-		long tupleNonZeroCount = 0;
-		long overallNonZeroCount = 0;
-		// compute size estimation factors
-		for(int i = 0; i < numVals; i++) {
-			final int listSize = ubm.getNumOffsets(i);
-			final int numZerosInTuple = ubm.getNumNonZerosInOffset(i);
-			tupleNonZeroCount += numZerosInTuple;
-			overallNonZeroCount += numZerosInTuple * listSize;
-			numOffs += listSize;
-			if(listSize > largestOffs)
-				largestOffs = listSize;
-
-			numSingle += (listSize == 1) ? 1 : 0;
-			if(inclRLE) {
-				int[] list = ubm.getOffsetsList(i).extractValues();
-				int lastOff = -2;
-				numRuns += list[listSize - 1] / (CompressionSettings.BITMAP_BLOCK_SZ);
-				for(int j = 0; j < listSize; j++) {
-					if(list[j] != lastOff + 1)
-						numRuns++;
-					lastOff = list[j];
-				}
-			}
-		}
-
-		final int zerosOffs = rlen - numOffs;
-		int[] frequencies = new int[numVals];
-		for(int i = 0; i < numVals; i++)
-			frequencies[i] = ubm.getNumOffsets(i);
-		final boolean zerosLargestOffset = zerosOffs > largestOffs;
-		if(zerosLargestOffset)
-			largestOffs = zerosOffs;
-
-		double overAllSparsity = (double) overallNonZeroCount / (rlen * cols.length);
-		double tupleSparsity = (double) tupleNonZeroCount / (numVals * cols.length);
-
-		return new EstimationFactors(cols.length, numVals, numOffs, largestOffs, frequencies, numRuns, numSingle, rlen,
-			false, zerosLargestOffset, overAllSparsity, tupleSparsity);
-
-	}
-
-	protected static EstimationFactors computeSizeEstimation(final int[] cols, final AMapToData map,
-		final boolean inclRLE, final int numRows, final boolean lastEntryAllZero) {
-		final boolean lossy = false;
-		if(map == null)
-			return null;
-
-		final int nUnique = map.getUnique();
-		if(lastEntryAllZero || inclRLE)
-			throw new NotImplementedException();
-
-		final boolean zerosLargestOffset = false;
-		final double overAllSparsity = 1.0;
-		final double tupleSparsity = 1.0;
-		final int numOffs = map.size();
-		final int numSingle = 0; // unknown
-
-		final int numRuns = 0;
-		int[] counts = new int[nUnique];
-		for(int i = 0; i < map.size(); i++)
-			counts[map.getIndex(i)]++;
-
-		int largestOffs = 0;
-		for(int i = 0; i < nUnique; i++)
-			if(counts[i] > largestOffs)
-				largestOffs = counts[i];
-
-		return new EstimationFactors(cols.length, nUnique, numOffs, largestOffs, counts, numRuns, numSingle, numRows,
-			lossy, zerosLargestOffset, overAllSparsity, tupleSparsity);
-
+	protected static EstimationFactors emptyFactors(int nCols, int nRows) {
+		return new EstimationFactors(nCols, 0, 0, 0, null, 0, 0, nRows, false, true, 0, 0);
 	}
 
 	@Override
 	public String toString() {
 		StringBuilder sb = new StringBuilder();
-		sb.append(" rows:" + numRows);
+		sb.append("rows:" + numRows);
 		sb.append(" num Offsets:" + numOffs);
 		sb.append(" LargestOffset:" + largestOff);
 		sb.append(" num Singles:" + numSingle);
diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/ConstEncoding.java b/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/ConstEncoding.java
new file mode 100644
index 0000000..3f18350
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/ConstEncoding.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.compress.estim.encoding;
+
+import org.apache.sysds.runtime.compress.estim.EstimationFactors;
+
+/** Const encoding for cases where the entire group of columns is the same value */
+public class ConstEncoding implements IEncode {
+
+	private final int[] counts;
+
+	protected ConstEncoding(int nRows) {
+		this.counts = new int[] {nRows};
+	}
+
+	@Override
+	public IEncode join(IEncode e) {
+		return e;
+	}
+
+	@Override
+	public int getUnique() {
+		return 1;
+	}
+
+	@Override
+	public int size() {
+		return 1;
+	}
+
+	@Override
+	public int[] getCounts() {
+		return counts;
+	}
+
+	@Override
+	public String toString() {
+		StringBuilder sb = new StringBuilder();
+		sb.append(this.getClass().getSimpleName());
+		return sb.toString();
+	}
+
+	@Override
+	public EstimationFactors computeSizeEstimation(int[] cols, int nRows, double tupleSparsity, double matrixSparsity) {
+		return new EstimationFactors(cols.length, 1, nRows, nRows, counts, 0, 0, nRows, false, false, matrixSparsity,
+			tupleSparsity);
+	}
+
+}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/DenseEncoding.java b/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/DenseEncoding.java
new file mode 100644
index 0000000..45d2879
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/DenseEncoding.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.compress.estim.encoding;
+
+import java.util.Arrays;
+
+import org.apache.sysds.runtime.compress.DMLCompressionException;
+import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
+import org.apache.sysds.runtime.compress.colgroup.mapping.MapToFactory;
+import org.apache.sysds.runtime.compress.colgroup.offset.AIterator;
+import org.apache.sysds.runtime.compress.estim.EstimationFactors;
+
+public class DenseEncoding implements IEncode {
+
+	private final AMapToData map;
+	private final int[] counts;
+
+	protected DenseEncoding(AMapToData map, int[] counts) {
+		this.map = map;
+		this.counts = counts;
+		if(map.getUnique() == 0)
+			throw new DMLCompressionException("Invalid Dense Encoding");
+	}
+
+	/**
+	 * Protected constructor that also counts the frequencies of the values.
+	 * 
+	 * @param map The Map.
+	 */
+	protected DenseEncoding(AMapToData map) {
+		this.map = map;
+		final int nUnique = map.getUnique();
+		if(nUnique == 0)
+			throw new DMLCompressionException("Invalid Dense Encoding");
+		this.counts = new int[nUnique];
+		for(int i = 0; i < map.size(); i++)
+			counts[map.getIndex(i)]++;
+	}
+
+	@Override
+	public DenseEncoding join(IEncode e) {
+		if(e instanceof EmptyEncoding || e instanceof ConstEncoding)
+			return this;
+		else if(e instanceof SparseEncoding)
+			return joinSparse((SparseEncoding) e);
+		else
+			return joinDense((DenseEncoding) e);
+	}
+
+	protected DenseEncoding joinSparse(SparseEncoding e) {
+
+		final long maxUnique = (long) e.getUnique() * getUnique();
+
+		if(maxUnique > (long) Integer.MAX_VALUE)
+			throw new DMLCompressionException(
+				"Joining impossible using linearized join, since each side has a large number of unique values");
+
+		final int nRows = size();
+		final int nVl = getUnique();
+		final int defR = (e.getUnique() - 1) * nVl;
+		final int[] m = new int[(int) maxUnique];
+		final AMapToData d = MapToFactory.create(nRows, (int) maxUnique);
+
+		final AIterator itr = e.off.getIterator();
+		final int fr = e.off.getOffsetToLast();
+
+		int newUID = 1;
+		int r = 0;
+		for(; r < fr; r++) {
+			final int ir = itr.value();
+			if(ir == r) {
+				
+				final int nv = map.getIndex(r) + e.map.getIndex(itr.getDataIndex()) * nVl;
+				itr.next();
+				newUID = addVal(nv, r, m, newUID, d);
+			}
+			else {
+				final int nv = map.getIndex(r) + defR;
+				newUID = addVal(nv, r, m, newUID, d);
+			}
+		}
+		// add last offset
+		newUID = addVal(map.getIndex(r) + e.map.getIndex(itr.getDataIndex()) * nVl, r++, m, newUID, d);
+
+		// add remaining rows.
+		for(; r < nRows; r++) {
+			final int nv = map.getIndex(r) + defR;
+			newUID = addVal(nv, r, m, newUID, d);
+		}
+
+		// set unique.
+		d.setUnique(newUID - 1);
+		return new DenseEncoding(d);
+	}
+
+	protected static int addVal(int nv, int r, int[] m, int newUID, AMapToData d) {
+		final int mapV = m[nv];
+		if(mapV == 0) {
+			d.set(r, newUID - 1);
+			m[nv] = newUID++;
+		}
+		else
+			d.set(r, mapV - 1);
+		return newUID;
+	}
+
+	protected DenseEncoding joinDense(DenseEncoding e) {
+		if(map == e.map)
+			return this; // unlikely to happen but cheap to compute
+		return new DenseEncoding(MapToFactory.join(map, e.map));
+	}
+
+	@Override
+	public int getUnique() {
+		return counts.length;
+	}
+
+	@Override
+	public int size() {
+		return map.size();
+	}
+
+	@Override
+	public int[] getCounts() {
+		return counts;
+	}
+
+	@Override
+	public EstimationFactors computeSizeEstimation(int[] cols, int nRows, double tupleSparsity, double matrixSparsity) {
+		int largestOffs = 0;
+
+		for(int i = 0; i < counts.length; i++)
+			if(counts[i] > largestOffs)
+				largestOffs = counts[i];
+
+		return new EstimationFactors(cols.length, counts.length, nRows, largestOffs, counts, 0, 0, nRows, false, false,
+			matrixSparsity, tupleSparsity);
+
+	}
+
+	@Override
+	public String toString() {
+		StringBuilder sb = new StringBuilder();
+		sb.append(this.getClass().getSimpleName());
+		sb.append("\n");
+		sb.append("mapping: ");
+		sb.append(map);
+		sb.append("\n");
+		sb.append("counts:  ");
+		sb.append(Arrays.toString(counts));
+		return sb.toString();
+	}
+
+}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/EmptyEncoding.java b/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/EmptyEncoding.java
new file mode 100644
index 0000000..6fa7863
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/EmptyEncoding.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.compress.estim.encoding;
+
+import org.apache.sysds.runtime.compress.estim.EstimationFactors;
+
+/** Empty encoding for cases where the entire group of columns is zero */
+public class EmptyEncoding implements IEncode {
+
+	/** always an empty int array */
+	private static final int[] counts = new int[] {};
+
+	// empty constructor
+	protected EmptyEncoding() {
+
+	}
+
+	@Override
+	public IEncode join(IEncode e) {
+		return e;
+	}
+
+	@Override
+	public int getUnique() {
+		return 1;
+	}
+
+	@Override
+	public int size() {
+		return 0;
+	}
+
+	@Override
+	public int[] getCounts() {
+		return counts;
+	}
+
+	@Override
+	public String toString() {
+		StringBuilder sb = new StringBuilder();
+		sb.append(this.getClass().getSimpleName());
+		return sb.toString();
+	}
+
+	@Override
+	public EstimationFactors computeSizeEstimation(int[] cols, int nRows, double tupleSparsity, double matrixSparsity) {
+		return new EstimationFactors(cols.length, 0, 0, nRows, counts, 0, 0, nRows, false, true, 0, 0);
+	}
+
+}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/IEncode.java b/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/IEncode.java
new file mode 100644
index 0000000..fc991ff
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/IEncode.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.compress.estim.encoding;
+
+import java.util.Arrays;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
+import org.apache.sysds.runtime.compress.colgroup.mapping.MapToFactory;
+import org.apache.sysds.runtime.compress.colgroup.offset.AOffset;
+import org.apache.sysds.runtime.compress.colgroup.offset.OffsetFactory;
+import org.apache.sysds.runtime.compress.estim.EstimationFactors;
+import org.apache.sysds.runtime.compress.readers.ReaderColumnSelection;
+import org.apache.sysds.runtime.compress.utils.DblArray;
+import org.apache.sysds.runtime.compress.utils.DblArrayCountHashMap;
+import org.apache.sysds.runtime.compress.utils.DoubleCountHashMap;
+import org.apache.sysds.runtime.compress.utils.IntArrayList;
+import org.apache.sysds.runtime.data.DenseBlock;
+import org.apache.sysds.runtime.data.SparseBlock;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+
+/**
+ * This interface covers an intermediate encoding for the samples to improve the efficiency of the joining of sample
+ * column groups.
+ */
+public interface IEncode {
+	static final Log LOG = LogFactory.getLog(IEncode.class.getName());
+
+	public static IEncode createFromMatrixBlock(MatrixBlock m, boolean transposed, int[] rowCols) {
+		if(m.isEmpty())
+			return new EmptyEncoding();
+		else if(rowCols.length == 1)
+			return createFromMatrixBlock(m, transposed, rowCols[0]);
+		else
+			return createWithReader(m, rowCols, transposed);
+	}
+
+	public static IEncode createFromMatrixBlock(MatrixBlock m, boolean transposed, int rowCol) {
+		if(m.isEmpty())
+			return new EmptyEncoding();
+		else if(transposed) {
+			if(m.isInSparseFormat())
+				return createFromSparseTransposed(m, rowCol);
+			else
+				return createFromDenseTransposed(m, rowCol);
+		}
+		else if(m.isInSparseFormat())
+			return createFromSparse(m, rowCol);
+		else
+			return createFromDense(m, rowCol);
+	}
+
+	public static IEncode createFromDenseTransposed(MatrixBlock m, int row) {
+		final DoubleCountHashMap map = new DoubleCountHashMap(16);
+		final DenseBlock db = m.getDenseBlock();
+		final int off = db.pos(row);
+		final int nCol = m.getNumColumns();
+		final int end = off + nCol;
+		final double[] vals = db.values(row);
+
+		// Iteration 1, make Count HashMap.
+		for(int i = off; i < end; i++) // sequential access
+			map.increment(vals[i]);
+
+		final int nUnique = map.size();
+
+		if(nUnique == 1)
+			return new ConstEncoding(m.getNumColumns());
+
+		if(map.getOrDefault(0, -1) * 10 > nCol * 4) { // 40 %
+			final int[] counts = map.getUnorderedCountsAndReplaceWithUIDsWithout0(); // map.getUnorderedCountsAndReplaceWithUIDs();
+			final int zeroCount = map.get(0);
+			final int nV = nCol - zeroCount;
+			final IntArrayList offsets = new IntArrayList(nV);
+
+			final AMapToData d = MapToFactory.create(nV, nUnique - 1);
+
+			// for(int i = off, r = 0, di = 0; i < end; i += nCol, r++){
+			for(int i = off, r = 0, di = 0; i < end; i++, r++) {
+				if(vals[i] != 0) {
+					offsets.appendValue(r);
+					d.set(di++, map.get(vals[i]) );
+				}
+			}
+
+			final AOffset o = OffsetFactory.createOffset(offsets);
+			return new SparseEncoding(d, o, zeroCount, counts, nCol);
+		}
+		else {
+			// Allocate counts, and iterate once to replace counts with u ids
+			final int[] counts = map.getUnorderedCountsAndReplaceWithUIDs();
+
+			// Create output map
+			final AMapToData d = MapToFactory.create(nCol, nUnique);
+
+			// Iteration 2, make final map
+			for(int i = off, r = 0; i < end; i++, r++)
+				d.set(r, map.get(vals[i]));
+
+			return new DenseEncoding(d, counts);
+		}
+	}
+
+	public static IEncode createFromSparseTransposed(MatrixBlock m, int row) {
+		final DoubleCountHashMap map = new DoubleCountHashMap(16);
+		final SparseBlock sb = m.getSparseBlock();
+		if(sb.isEmpty(row))
+			return new EmptyEncoding();
+		final int apos = sb.pos(row);
+		final int alen = sb.size(row) + apos;
+		final double[] avals = sb.values(row);
+
+		// Iteration 1 of non zero values, make Count HashMap.
+		for(int i = apos; i < alen; i++) // sequential of non zero cells.
+			map.increment(avals[i]);
+
+		final int nUnique = map.size();
+
+		// Allocate counts
+		final int[] counts = map.getUnorderedCountsAndReplaceWithUIDs();
+
+		// Create output map
+		final AMapToData d = MapToFactory.create(alen - apos, nUnique);
+
+		// Iteration 2 of non zero values, make either a IEncode Dense or sparse map.
+		for(int i = apos, j = 0; i < alen; i++, j++)
+			d.set(j, map.get(avals[i]));
+
+		// Iteration 3 of non zero indexes, make an Offset Encoding to know what cells are zero and not.
+		// not done yet
+		AOffset o = OffsetFactory.createOffset(sb.indexes(row), apos, alen);
+
+		final int zero = m.getNumColumns() - o.getSize();
+
+		return new SparseEncoding(d, o, zero, counts, m.getNumColumns());
+	}
+
+	public static IEncode createFromDense(MatrixBlock m, int col) {
+		final DenseBlock db = m.getDenseBlock();
+		if(!db.isContiguous())
+			throw new NotImplementedException("Not Implemented non contiguous dense matrix encoding for sample");
+		final DoubleCountHashMap map = new DoubleCountHashMap(16);
+		final int off = col;
+		final int nCol = m.getNumColumns();
+		final int nRow = m.getNumRows();
+		final int end = off + nRow * nCol;
+		final double[] vals = m.getDenseBlockValues();
+
+		// Iteration 1, make Count HashMap.
+		for(int i = off; i < end; i += nCol) // jump down through rows.
+			map.increment(vals[i]);
+
+		final int nUnique = map.size();
+		if(nUnique == 1)
+			return new ConstEncoding(m.getNumColumns());
+
+		if(map.getOrDefault(0, -1) * 10 > nRow * 4) { // 40 %
+			final int[] counts = map.getUnorderedCountsAndReplaceWithUIDsWithout0();
+			final int zeroCount = map.get(0);
+			final int nV = m.getNumRows() - zeroCount;
+			final IntArrayList offsets = new IntArrayList(nV);
+
+			final AMapToData d = MapToFactory.create(nV, nUnique);
+
+			for(int i = off, r = 0, di = 0; i < end; i += nCol, r++) {
+				if(vals[i] != 0) {
+					offsets.appendValue(r);
+					d.set(di++, map.get(vals[i]));
+				}
+			}
+
+			final AOffset o = OffsetFactory.createOffset(offsets);
+
+			return new SparseEncoding(d, o, zeroCount, counts, nRow);
+		}
+		else {
+			// Allocate counts, and iterate once to replace counts with u ids
+			final int[] counts = map.getUnorderedCountsAndReplaceWithUIDs();
+
+			final AMapToData d = MapToFactory.create(nRow, nUnique);
+			// Iteration 2, make final map
+			for(int i = off, r = 0; i < end; i += nCol, r++)
+				d.set(r, map.get(vals[i]));
+
+			return new DenseEncoding(d, counts);
+		}
+	}
+
+	public static IEncode createFromSparse(MatrixBlock m, int col) {
+
+		final DoubleCountHashMap map = new DoubleCountHashMap(16);
+		final SparseBlock sb = m.getSparseBlock();
+
+		final double guessedNumberOfNonZero = Math.min(4, Math.ceil((double)m.getNumRows() * m.getSparsity()));
+		final IntArrayList offsets = new IntArrayList((int)guessedNumberOfNonZero);
+
+		// Iteration 1 of non zero values, make Count HashMap.
+		for(int r = 0; r < m.getNumRows(); r++) { // Inefficient: scans every row with a per-row binary search, but correct.
+			if(sb.isEmpty(r))
+				continue;
+			final int apos = sb.pos(r);
+			final int alen = sb.size(r) + apos;
+			final int[] aix = sb.indexes(r);
+			final int index = Arrays.binarySearch(aix, apos, alen, col);
+			if(index >= 0) {
+				offsets.appendValue(r);
+				map.increment(sb.values(r)[index]);
+			}
+		}
+		if(offsets.size() == 0)
+			return new EmptyEncoding();
+
+		final int nUnique = map.size();
+		final int[] counts = map.getUnorderedCountsAndReplaceWithUIDs();
+
+		int sumCounts = 0;
+		for(int c : counts)
+			sumCounts += c;
+
+		final AMapToData d = MapToFactory.create(sumCounts, nUnique);
+
+		// Iteration 2 of non zero values, make either an IEncode Dense or sparse map.
+		for(int off = 0, r = 0; off < sumCounts; r++) {
+			if(sb.isEmpty(r))
+				continue;
+			final int apos = sb.pos(r);
+			final int alen = sb.size(r) + apos;
+			final int[] aix = sb.indexes(r);
+			// Performance hit because of binary search for each row.
+			final int index = Arrays.binarySearch(aix, apos, alen, col);
+			if(index >= 0)
+				d.set(off++, map.get(sb.values(r)[index]));
+		}
+
+		// Iteration 3 of non zero indexes, make an Offset Encoding to know what cells are zero and not.
+		AOffset o = OffsetFactory.createOffset(offsets);
+
+		final int zero = m.getNumRows() - sumCounts;
+
+		return new SparseEncoding(d, o, zero, counts, m.getNumRows());
+	}
+
+	public static IEncode createWithReader(MatrixBlock m, int[] rowCols, boolean transposed) {
+		final ReaderColumnSelection reader1 = ReaderColumnSelection.createReader(m, rowCols, transposed);
+		final int nRows = transposed ? m.getNumColumns() : m.getNumRows();
+		final DblArrayCountHashMap map = new DblArrayCountHashMap(16);
+		final IntArrayList offsets = new IntArrayList();
+		DblArray cellVals = reader1.nextRow();
+
+		// Iteration 1, make Count HashMap, and offsets.
+		while(cellVals != null) {
+			map.increment(cellVals);
+			offsets.appendValue(reader1.getCurrentRowIndex());
+			cellVals = reader1.nextRow();
+		}
+
+		if(offsets.size() == 0)
+			return new EmptyEncoding();
+
+		if(map.size() == 1 && offsets.size() == nRows)
+			return new ConstEncoding(nRows);
+
+		if(offsets.size() < nRows) {
+			// there was fewer offsets than rows.
+			if(offsets.size() < nRows / 2) {
+				// Output encoded Sparse since there is more than half empty.
+				final int[] counts = map.getUnorderedCountsAndReplaceWithUIDs();
+				final int zeros = nRows - offsets.size();
+				return createWithReaderSparse(m, map, zeros, counts, rowCols, offsets, nRows, transposed);
+			}
+			else {
+				// Output Encoded dense since there is not enough common values.
+				// TODO add Common group, that allows to not allocate this extra cell
+				final int[] counts = map.getUnorderedCountsAndReplaceWithUIDsWithExtraCell();
+				counts[counts.length - 1] = nRows - offsets.size();
+				return createWithReaderDense(m, map, counts, rowCols, nRows, transposed);
+			}
+		}
+		else {
+			// TODO add Common group, that allows to allocate with one of the map entries as the common value.
+			// the input was fully dense.
+			final int[] counts = map.getUnorderedCountsAndReplaceWithUIDs();
+			return createWithReaderDense(m, map, counts, rowCols, nRows, transposed);
+		}
+	}
+
+	public static IEncode createWithReaderDense(MatrixBlock m, DblArrayCountHashMap map, int[] counts, int[] rowCols,
+		int nRows, boolean transposed) {
+		// Iteration 2,
+		final ReaderColumnSelection reader2 = ReaderColumnSelection.createReader(m, rowCols, transposed);
+		final AMapToData d = MapToFactory.create(nRows, counts.length);
+		final int def = counts.length - 1;
+
+		DblArray cellVals = reader2.nextRow();
+		int r = 0;
+		while(r < nRows && cellVals != null) {
+			final int row = reader2.getCurrentRowIndex();
+			if(row == r) {
+				d.set(row, map.get(cellVals));
+				cellVals = reader2.nextRow();
+			}
+			else
+				d.set(r, def);
+			r++;
+		}
+
+		while(r < nRows)
+			d.set(r++, def);
+		return new DenseEncoding(d, counts);
+	}
+
+	public static IEncode createWithReaderSparse(MatrixBlock m, DblArrayCountHashMap map, int zeros, int[] counts,
+		int[] rowCols, IntArrayList offsets, int nRows, boolean transposed) {
+		final ReaderColumnSelection reader2 = ReaderColumnSelection.createReader(m, rowCols, transposed);
+		DblArray cellVals = reader2.nextRow();
+
+		final AMapToData d = MapToFactory.create(offsets.size(), map.size());
+
+		int i = 0;
+		// Iterator 2 of non zero tuples.
+		while(cellVals != null) {
+			d.set(i++, map.get(cellVals));
+			cellVals = reader2.nextRow();
+		}
+
+		// Iteration 3 of non zero indexes: build the Offset Encoding.
+		final AOffset o = OffsetFactory.createOffset(offsets);
+
+		return new SparseEncoding(d, o, zeros, counts, nRows);
+	}
+
+	public IEncode join(IEncode e);
+
+	public int getUnique();
+
+	public int size();
+
+	public int[] getCounts();
+
+	public EstimationFactors computeSizeEstimation(int[] cols, int nRows, double tupleSparsity, double matrixSparsity);
+
+}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/SparseEncoding.java b/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/SparseEncoding.java
new file mode 100644
index 0000000..51520b3
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/compress/estim/encoding/SparseEncoding.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.compress.estim.encoding;
+
+import java.util.Arrays;
+
+import org.apache.sysds.runtime.compress.DMLCompressionException;
+import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
+import org.apache.sysds.runtime.compress.colgroup.mapping.MapToFactory;
+import org.apache.sysds.runtime.compress.colgroup.offset.AIterator;
+import org.apache.sysds.runtime.compress.colgroup.offset.AOffset;
+import org.apache.sysds.runtime.compress.colgroup.offset.OffsetFactory;
+import org.apache.sysds.runtime.compress.estim.EstimationFactors;
+import org.apache.sysds.runtime.compress.utils.IntArrayList;
+
+/** Most common is zero */
+public class SparseEncoding implements IEncode {
+
+	protected final AMapToData map;
+	protected final AOffset off;
+	protected final int nRows;
+
+	/** Count of Zero tuples in this encoding */
+	protected final int zeroCount;
+
+	/** Count of non zero tuples in this encoding */
+	protected final int[] counts;
+
+	protected SparseEncoding(AMapToData map, AOffset off, int zeroCount, int[] counts, int nRows) {
+		this.map = map;
+		this.off = off;
+		this.zeroCount = zeroCount;
+		this.counts = counts;
+		this.nRows = nRows;
+
+		// final int u = getUnique();
+		// for(int i = 0; i < map.size();i ++){
+		// 	if(map.getIndex(i) > u){
+		// 		throw new DMLCompressionException("Invalid allocation");
+		// 	}
+		// }
+	}
+
+	@Override
+	public IEncode join(IEncode e) {
+		if(e instanceof EmptyEncoding || e instanceof ConstEncoding)
+			return this;
+		else if(e instanceof SparseEncoding)
+			return joinSparse((SparseEncoding) e);
+		else
+			return ((DenseEncoding) e).joinSparse(this);
+	}
+
+	protected IEncode joinSparse(SparseEncoding e) {
+		if(e.map == map && e.off == off)
+			return this; // unlikely to happen but cheap to compute therefore this skip is included.
+
+		final long maxUnique = (long) e.getUnique() * getUnique();
+		if(maxUnique > (long) Integer.MAX_VALUE)
+			throw new DMLCompressionException(
+				"Joining impossible using linearized join, since each side has a large number of unique values");
+
+		final int[] d = new int[(int) maxUnique - 1];
+
+		// We need at least this size of offsets, but i don't know if i need more.
+		final IntArrayList retOff = new IntArrayList(Math.max(e.size(), this.size()));
+		final IntArrayList tmpVals = new IntArrayList(Math.max(e.size(), this.size()));
+
+		final int fl = off.getOffsetToLast();
+		final int fr = e.off.getOffsetToLast();
+		final AIterator itl = off.getIterator();
+		final AIterator itr = e.off.getIterator();
+
+		final int nVl = getUnique();
+		final int nVr = e.getUnique();
+		final int defR = (nVr - 1) * nVl;
+		final int defL = nVl - 1;
+
+
+		boolean doneL = false;
+		boolean doneR = false;
+		int newUID = 1;
+		while(true) {
+			final int il = itl.value();
+			final int ir = itr.value();
+			if(il == ir) {
+				// Both sides have a value.
+				final int nv = map.getIndex(itl.getDataIndex()) + e.map.getIndex(itr.getDataIndex()) * nVl;
+				newUID = addVal(nv, il, d, newUID, tmpVals, retOff);
+				if(il >= fl || ir >= fr) {
+					if(il < fl)
+						itl.next();
+					else
+						doneL = true;
+					if(ir < fr)
+						itr.next();
+					else
+						doneR = true;
+					break;
+				}
+				// both sides.
+				itl.next();
+				itr.next();
+			}
+			else if(il < ir) {
+				// left side have a value before right
+				final int nv = map.getIndex(itl.getDataIndex()) + defR;
+				newUID = addVal(nv, il, d, newUID, tmpVals, retOff);
+				if(il >= fl) {
+					doneL = true;
+					break;
+				}
+				itl.next();
+			}
+			else {
+				// right side have a value before left
+				final int nv = e.map.getIndex(itr.getDataIndex()) * nVl + defL;
+				newUID = addVal(nv, ir, d, newUID, tmpVals, retOff);
+				if(ir >= fr) {
+					doneR = true;
+					break;
+				}
+				itr.next();
+			}
+		}
+
+		// process stragglers
+		if(!doneL) { // If there is stragglers in the left side
+			while(true) {
+				final int il = itl.value();
+				final int ir = itr.value();
+				int nv;
+				if(ir == il)
+					nv = map.getIndex(itl.getDataIndex()) + e.map.getIndex(itr.getDataIndex()) * nVl;
+				else
+					nv = map.getIndex(itl.getDataIndex()) + defR;
+				newUID = addVal(nv, il, d, newUID, tmpVals, retOff);
+				if(il >= fl)
+					break;
+				itl.next();
+			}
+		}
+		else if(!doneR) {// If there is stragglers in the right side
+			while(true) {
+				final int il = itl.value();
+				final int ir = itr.value();
+				int nv;
+				if(ir == il)
+					nv = map.getIndex(itl.getDataIndex()) + e.map.getIndex(itr.getDataIndex()) * nVl;
+				else
+					nv = e.map.getIndex(itr.getDataIndex()) * nVl + defL;
+
+				newUID = addVal(nv, ir, d, newUID, tmpVals, retOff);
+				if(ir >= fr)
+					break;
+				itr.next();
+			}
+		}
+
+		if(retOff.size() < nRows * 0.4) {
+			final AOffset o = OffsetFactory.createOffset(retOff);
+			final AMapToData retMap = MapToFactory.create(tmpVals.size(), tmpVals.extractValues(), newUID);
+			return new SparseEncoding(retMap, o, nRows - retOff.size(),
+				retMap.getCounts(new int[newUID - 1], retOff.size()), nRows);
+		}
+		else {
+			final AMapToData retMap = MapToFactory.create(nRows, newUID);
+			retMap.fill(newUID - 1);
+			for(int i = 0; i < retOff.size(); i++)
+				retMap.set(retOff.get(i), tmpVals.get(i));
+
+			// add values.
+			IEncode ret = new DenseEncoding(retMap);
+			return ret;
+		}
+	}
+
+	private static int addVal(int nv, int offset, int[] d, int newUID, IntArrayList tmpVals, IntArrayList offsets) {
+		final int mapV = d[nv];
+		if(mapV == 0) {
+			tmpVals.appendValue(newUID - 1);
+			d[nv] = newUID++;
+		}
+		else
+			tmpVals.appendValue(mapV - 1);
+		offsets.appendValue(offset);
+		return newUID;
+	}
+
+	@Override
+	public int getUnique() {
+		return counts.length + 1;
+	}
+
+	@Override
+	public int size() {
+		return map.size();
+	}
+
+	@Override
+	public int[] getCounts() {
+		return counts;
+	}
+
+	@Override
+	public EstimationFactors computeSizeEstimation(int[] cols, int nRows, double tupleSparsity, double matrixSparsity) {
+		final int largestOffs = nRows - map.size(); // known largest off is zero tuples
+		tupleSparsity = Math.min((double) map.size() / (double) nRows, tupleSparsity);
+		return new EstimationFactors(cols.length, counts.length, map.size(), largestOffs, counts, 0, 0, nRows, false,
+			true, matrixSparsity, tupleSparsity);
+	}
+
+	@Override
+	public String toString() {
+		StringBuilder sb = new StringBuilder();
+		sb.append(this.getClass().getSimpleName());
+		sb.append("\n");
+		sb.append("mapping: ");
+		sb.append(map);
+		sb.append("\n");
+		sb.append("offsets: ");
+		sb.append(off);
+		sb.append("\n");
+		sb.append("counts:  ");
+		sb.append(Arrays.toString(counts));
+		return sb.toString();
+	}
+}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibBinaryCellOp.java b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibBinaryCellOp.java
index d8f582d..7fb0118 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibBinaryCellOp.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibBinaryCellOp.java
@@ -299,7 +299,7 @@ public class CLALibBinaryCellOp {
 		// apply overlap
 		if(smallestSize == Integer.MAX_VALUE) {
 			// if there was no smallest colgroup
-			ADictionary newDict = new MatrixBlockDictionary(m2);
+			ADictionary newDict = new MatrixBlockDictionary(m2, nCol);
 			newColGroups.add(ColGroupFactory.genColGroupConst(nCol, newDict));
 		}
 		else {
diff --git a/src/main/java/org/apache/sysds/runtime/compress/utils/DblArrayCountHashMap.java b/src/main/java/org/apache/sysds/runtime/compress/utils/DblArrayCountHashMap.java
index 7ccbf95..bf76873 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/utils/DblArrayCountHashMap.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/utils/DblArrayCountHashMap.java
@@ -125,6 +125,31 @@ public class DblArrayCountHashMap {
 		return ret;
 	}
 
+	public int[] getUnorderedCountsAndReplaceWithUIDs() {
+		final int[] counts = new int[_size];
+		int i = 0;
+		for(Bucket e : _data)
+			while(e != null) {
+				counts[i] = e.v.count;
+				e.v.count = i++;
+				e = e.n;
+			}
+
+		return counts;
+	}
+
+	public int[] getUnorderedCountsAndReplaceWithUIDsWithExtraCell() {
+		final int[] counts = new int[_size + 1];
+		int i = 0;
+		for(Bucket e : _data)
+			while(e != null) {
+				counts[i] = e.v.count;
+				e.v.count = i++;
+				e = e.n;
+			}
+		return counts;
+	}
+
 	private void resize() {
 		// check for integer overflow on resize
 		if(_data.length > Integer.MAX_VALUE / RESIZE_FACTOR)
diff --git a/src/main/java/org/apache/sysds/runtime/compress/utils/DoubleCountHashMap.java b/src/main/java/org/apache/sysds/runtime/compress/utils/DoubleCountHashMap.java
index 4c7f21f..4a83b42 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/utils/DoubleCountHashMap.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/utils/DoubleCountHashMap.java
@@ -19,7 +19,6 @@
 
 package org.apache.sysds.runtime.compress.utils;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
@@ -66,7 +65,7 @@ public class DoubleCountHashMap {
 		int hash = hash(key);
 		int ix = indexFor(hash, _data.length);
 		Bucket l = _data[ix];
-		while(l != null && !(l.v.key == key)){
+		while(l != null && !(l.v.key == key)) {
 			hashMissCount++;
 			l = l.n;
 		}
@@ -100,11 +99,23 @@ public class DoubleCountHashMap {
 		return l.v.count;
 	}
 
-	public ArrayList<DCounts> extractValues() {
-		ArrayList<DCounts> ret = new ArrayList<>(_size);
-		for(Bucket e : _data){
+	public int getOrDefault(double key, int def){
+		int hash = hash(key);
+		int ix = indexFor(hash, _data.length);
+		Bucket l = _data[ix];
+		while(l != null && !(l.v.key == key))
+			l = l.n;
+		if (l == null)
+			return def;
+		return l.v.count;
+	}
+
+	public DCounts[] extractValues() {
+		DCounts[] ret = new DCounts[_size];
+		int i = 0;
+		for(Bucket e : _data) {
 			while(e != null) {
-				ret.add(e.v);
+				ret[i++] = e.v;
 				e = e.n;
 			}
 		}
@@ -112,6 +123,48 @@ public class DoubleCountHashMap {
 		return ret;
 	}
 
+	public int[] getUnorderedCountsAndReplaceWithUIDs() {
+		final int[] counts = new int[_size];
+		int i = 0;
+		for(Bucket e : _data)
+			while(e != null) {
+				counts[i] = e.v.count;
+				e.v.count = i++;
+				e = e.n;
+			}
+
+		return counts;
+	}
+
+	public int[] getUnorderedCountsAndReplaceWithUIDsWithout0(){
+		final int[] counts = new int[_size - 1];
+		int i = 0;
+		for(Bucket e : _data){
+			while(e != null) {
+				if(e.v.key != 0){
+					counts[i] = e.v.count;
+					e.v.count = i++;
+				}
+				e = e.n;
+			}
+		}
+
+		return counts;
+	}
+
+	// public int[] getUnorderedCountsAndReplaceWithUIDsWithExtraCell() {
+	// 	final int[] counts = new int[_size + 1];
+	// 	int i = 0;
+	// 	for(Bucket e : _data)
+	// 		while(e != null) {
+	// 			counts[i] = e.v.count;
+	// 			e.v.count = i++;
+	// 			e = e.n;
+	// 		}
+
+	// 	return counts;
+	// }
+
 	private void resize() {
 		// check for integer overflow on resize
 		if(_data.length > Integer.MAX_VALUE / RESIZE_FACTOR)
@@ -177,7 +230,7 @@ public class DoubleCountHashMap {
 		return sb.toString();
 	}
 
-	public void reset(int size){
+	public void reset(int size) {
 		int p2 = Util.getPow2(size);
 		if(_data.length > 2 * p2)
 			_data = new Bucket[p2];
diff --git a/src/main/java/org/apache/sysds/runtime/compress/utils/IntArrayList.java b/src/main/java/org/apache/sysds/runtime/compress/utils/IntArrayList.java
index f5c4e7d..a78e73d 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/utils/IntArrayList.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/utils/IntArrayList.java
@@ -21,25 +21,21 @@ package org.apache.sysds.runtime.compress.utils;
 
 import java.util.Arrays;
 
-/**
- * This class provides a memory-efficient replacement for {@code ArrayList<Integer>} for restricted use cases.
- */
 public class IntArrayList {
 	private static final int INIT_CAPACITY = 4;
 	private static final int RESIZE_FACTOR = 2;
 
 	private int[] _data = null;
 	private int _size;
-	private int _val0;
 
 	public IntArrayList() {
 		_data = null;
 		_size = 0;
 	}
 
-	public IntArrayList(int value) {
-		this();
-		appendValue(value);
+	public IntArrayList(int initialSize) {
+		_data = new int[initialSize];
+		_size = 0;
 	}
 
 	public IntArrayList(int[] values) {
@@ -52,17 +48,9 @@ public class IntArrayList {
 	}
 
 	public void appendValue(int value) {
-		// embedded value (no array allocation)
-		if(_size == 0) {
-			_val0 = value;
-			_size = 1;
-			return;
-		}
-
 		// allocate or resize array if necessary
 		if(_data == null) {
 			_data = new int[INIT_CAPACITY];
-			_data[0] = _val0;
 		}
 		else if(_size + 1 >= _data.length)
 			resize();
@@ -79,17 +67,12 @@ public class IntArrayList {
 	 * @return integer array of offsets, the physical array length may be larger than the length of the offset list
 	 */
 	public int[] extractValues() {
-		if(_size == 1)
-			return new int[] {_val0};
-		else
-			return _data;
+		return _data;
 	}
 
 	public int get(int index) {
 		if(_data != null)
 			return _data[index];
-		else if(index == 0)
-			return _val0;
 		else
 			throw new RuntimeException("invalid index to get");
 	}
@@ -112,16 +95,13 @@ public class IntArrayList {
 	public String toString() {
 		StringBuilder sb = new StringBuilder();
 
-		if(_size == 1)
-			sb.append(_val0);
-		else {
-			sb.append("[");
-			int i = 0;
-			for(; i < _size - 1; i++)
-				sb.append(_data[i] + ",");
+		sb.append("[");
+		int i = 0;
+		for(; i < _size - 1; i++)
+			sb.append(_data[i] + ",");
+
+		sb.append(_data[i] + "]");
 
-			sb.append(_data[i] + "]");
-		}
 		return sb.toString();
 	}
 }
diff --git a/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java b/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java
index 5d201c0..aef68e6 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java
@@ -43,18 +43,15 @@ import org.apache.sysds.runtime.compress.CompressedMatrixBlockFactory;
 import org.apache.sysds.runtime.compress.CompressionSettings;
 import org.apache.sysds.runtime.compress.CompressionSettingsBuilder;
 import org.apache.sysds.runtime.compress.CompressionStatistics;
-import org.apache.sysds.runtime.compress.bitmap.ABitmap;
-import org.apache.sysds.runtime.compress.bitmap.BitmapEncoder;
 import org.apache.sysds.runtime.compress.cocode.CoCoderFactory.PartitionerType;
 import org.apache.sysds.runtime.compress.colgroup.AColGroup;
 import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType;
 import org.apache.sysds.runtime.compress.colgroup.ColGroupFactory;
 import org.apache.sysds.runtime.compress.colgroup.ColGroupUncompressed;
 import org.apache.sysds.runtime.compress.cost.CostEstimatorFactory.CostType;
-import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimator;
+import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimatorFactory;
 import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo;
 import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup;
-import org.apache.sysds.runtime.compress.estim.EstimationFactors;
 import org.apache.sysds.runtime.compress.lib.CLALibAppend;
 import org.apache.sysds.runtime.functionobjects.Builtin;
 import org.apache.sysds.runtime.functionobjects.Builtin.BuiltinCode;
@@ -170,10 +167,8 @@ public abstract class CompressedTestBase extends TestBase {
 						colIndexes[x] = y;
 					}
 
-					ABitmap ubm = BitmapEncoder.extractBitmap(colIndexes, mb, false, 8, true);
-					EstimationFactors ef = CompressedSizeEstimator.estimateCompressedColGroupSize(ubm, colIndexes,
-						mb.getNumRows(), cs);
-					CompressedSizeInfoColGroup cgi = new CompressedSizeInfoColGroup(colIndexes, ef, c);
+					CompressedSizeInfoColGroup cgi = CompressedSizeEstimatorFactory.getSizeEstimator(mb, cs, _k)
+						.estimateCompressedColGroupSize(colIndexes);
 					CompressedSizeInfo csi = new CompressedSizeInfo(cgi);
 					for(AColGroup cg : ColGroupFactory.compressColGroups(mb, csi, cs, 1))
 						colGroups.add(cg);
@@ -455,6 +450,13 @@ public abstract class CompressedTestBase extends TestBase {
 		testLeftMatrixMatrix(matrix);
 	}
 
+
+	@Test
+	public void testLeftMatrixMatrixMultConst() {
+		MatrixBlock matrix = TestUtils.generateTestMatrixBlock(3, rows, 1.0, 1.0, 1.0, 3);
+		testLeftMatrixMatrix(matrix);
+	}
+
 	@Test
 	public void testLeftMatrixMatrixMultSparse() {
 		MatrixBlock matrix = TestUtils.generateTestMatrixBlock(2, rows, 0.9, 1.5, .1, 3);
@@ -881,14 +883,14 @@ public abstract class CompressedTestBase extends TestBase {
 			compareResultMatrices(ucRet, ret2, 2);
 		}
 		catch(NotImplementedException e) {
-			if(! printedErrorForNotImplementedTestBinaryVMPlus.get()){
+			if(!printedErrorForNotImplementedTestBinaryVMPlus.get()) {
 				LOG.error("Failed Binary VM Plus: " + e.getMessage());
 				printedErrorForNotImplementedTestBinaryVMPlus.set(true);
 			}
 		}
 		catch(Exception e) {
 			if(e.getCause() instanceof ExecutionException && e.getCause().getCause() instanceof NotImplementedException) {
-				if(! printedErrorForNotImplementedTestBinaryVMPlus.get()){
+				if(!printedErrorForNotImplementedTestBinaryVMPlus.get()) {
 					LOG.error("Failed Binary VM Plus: " + e.getMessage());
 					printedErrorForNotImplementedTestBinaryVMPlus.set(true);
 				}
@@ -900,7 +902,6 @@ public abstract class CompressedTestBase extends TestBase {
 		}
 	}
 
-
 	public void testBinaryMV(ValueFunction vf, MatrixBlock matrix) {
 		testBinaryMV(vf, matrix, true);
 	}
diff --git a/src/test/java/org/apache/sysds/test/component/compress/CompressibleInputGenerator.java b/src/test/java/org/apache/sysds/test/component/compress/CompressibleInputGenerator.java
index 963b956..4a4aab9 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/CompressibleInputGenerator.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/CompressibleInputGenerator.java
@@ -28,6 +28,7 @@ import org.apache.commons.lang3.NotImplementedException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType;
+import org.apache.sysds.runtime.data.SparseBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 
 /**
@@ -42,6 +43,11 @@ public class CompressibleInputGenerator {
 		return getInputDoubleMatrix(rows, cols, ct, nrUnique, 1000000, -1000000, sparsity, seed, false);
 	}
 
+	public static MatrixBlock getInput(int rows, int cols, CompressionType ct, int nrUnique, double sparsity, int seed,
+		boolean transposed) {
+		return getInputDoubleMatrix(rows, cols, ct, nrUnique, 1000000, -1000000, sparsity, seed, transposed);
+	}
+
 	public static MatrixBlock getInput(int rows, int cols, CompressionType ct, int nrUnique, int max, int min,
 		double sparsity, int seed) {
 		return getInputDoubleMatrix(rows, cols, ct, nrUnique, max, min, sparsity, seed, false);
@@ -149,23 +155,37 @@ public class CompressibleInputGenerator {
 			}
 		}
 	}
-
+	
 	private static void ole(MatrixBlock output, int nrUnique, int max, int min, double sparsity, int seed,
-		boolean transpose) {
-
+	boolean transpose) {
+		
 		// chose some random values
-		final Random r = new Random(seed);
-		final List<Double> values = getNRandomValues(nrUnique, r, max, min);
+			final Random r = new Random(seed);
+			final List<Double> values = getNRandomValues(nrUnique, r, max, min);
+		if(transpose && output.isInSparseFormat() && output.getNumRows() == 1){
+			int nV = (int)Math.round((double)output.getNumColumns() * sparsity);
+
+			for(int i = 0 ; i < nV; i ++){
+				double d =  values.get(r.nextInt(nrUnique));
+				output.appendValue(0, r.nextInt(output.getNumColumns()), d);
+			}
+			output.getSparseBlock().sort();
+			return;
+		}
+
 
 		final int cols = transpose ? output.getNumRows() : output.getNumColumns();
 		final int rows = transpose ? output.getNumColumns() : output.getNumRows();
 
 		// Generate the first column.
 		for(int x = 0; x < rows; x++) {
-			if(transpose)
-				output.setValue(0, x, values.get(r.nextInt(nrUnique)));
+			double d =  values.get(r.nextInt(nrUnique));
+			if(transpose && output.isInSparseFormat())
+				output.appendValue(0, x, d);
+			else if(transpose)
+				output.setValue(0, x, d);
 			else
-				output.setValue(x, 0, values.get(r.nextInt(nrUnique)));
+				output.setValue(x, 0, d);
 		}
 
 		int diff = max - min;
@@ -174,24 +194,43 @@ public class CompressibleInputGenerator {
 		for(int y = 1; y < cols; y++) {
 			for(int x = 0; x < rows; x++) {
 				if(r.nextDouble() < sparsity) {
-					if(transpose) {
+					if(transpose && output.isInSparseFormat()){
 						int v = (int) (output.getValue(0, x) * (double) y);
-						output.setValue(y, x, Math.abs(v % ((int) (diff))) + min);
+						double d = Math.abs(v % ((int) (diff))) + min;
+						output.appendValue(y, x, d);
+					}
+					else if(transpose) {
+						int v = (int) (output.getValue(0, x) * (double) y);
+						double d = Math.abs(v % ((int) (diff))) + min;
+						output.setValue(y, x, d);
 					}
 					else {
 						int v = (int) (output.getValue(x, 0) * (double) y);
-						output.setValue(x, y, Math.abs(v % ((int) (diff))) + min);
+						double d = Math.abs(v % ((int) (diff))) + min;
+						output.setValue(x, y, d);
 					}
 				}
 			}
 		}
 
-		for(int x = 0; x < rows; x++) {
-			if(r.nextDouble() > sparsity) {
-				if(transpose)
-					output.setValue(0, x, 0);
-				else
-					output.setValue(x, 0, 0);
+		if(transpose && output.isInSparseFormat()){
+			SparseBlock sb = output.getSparseBlock();
+			double[] r0 = sb.values(0);
+			for(int i = 0; i < r0.length; i++){
+				if(r.nextDouble() > sparsity) {
+					r0[i] = 0;
+				}
+			}
+			sb.get(0).compact();
+		}
+		else{
+			for(int x = 0; x < rows; x++) {
+				if(r.nextDouble() > sparsity) {
+					if(transpose)
+						output.setValue(0, x, 0);
+					else
+						output.setValue(x, 0, 0);
+				}
 			}
 		}
 
diff --git a/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateOLETest.java b/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateOLETest.java
index 22385cd..cc64469 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateOLETest.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateOLETest.java
@@ -24,9 +24,6 @@ import java.util.Collection;
 
 import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
-import org.apache.sysds.runtime.util.DataConverter;
-import org.apache.sysds.test.TestUtils;
-import org.apache.sysds.test.component.compress.CompressibleInputGenerator;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
@@ -38,93 +35,93 @@ public class JolEstimateOLETest extends JolEstimateTest {
 	public static Collection<Object[]> data() {
 		ArrayList<Object[]> tests = new ArrayList<>();
 
-		MatrixBlock mb;
+		// MatrixBlock mb;
 		// base tests
-		mb = DataConverter.convertToMatrixBlock(new double[][] {{1}});
-		tests.add(new Object[] {mb});
-		mb = DataConverter.convertToMatrixBlock(new double[][] {{0}});
-		tests.add(new Object[] {mb});
-		mb = DataConverter.convertToMatrixBlock(new double[][] {{0, 0, 0, 0, 0}});
-		tests.add(new Object[] {mb});
-
-		// The size of the compression increase at repeated values.
-		mb = DataConverter.convertToMatrixBlock(new double[][] {{0, 0, 0, 0, 5, 0}});
-		tests.add(new Object[] {mb});
-		mb = DataConverter.convertToMatrixBlock(new double[][] {{0, 0, 0, 0, 5, 5, 0}});
-		tests.add(new Object[] {mb});
-		mb = DataConverter.convertToMatrixBlock(new double[][] {{0, 0, 0, 0, 5, 5, 5, 0}});
-		tests.add(new Object[] {mb});
-		mb = DataConverter.convertToMatrixBlock(new double[][] {{0, 0, 0, 0, 5, 5, 5, 5, 5, 5}});
-		tests.add(new Object[] {mb});
-
-		// all values grow by 1 if new value is introduced
-		mb = DataConverter.convertToMatrixBlock(new double[][] {{0, 0, 0, 0, 5, 7, 0}});
-		tests.add(new Object[] {mb});
-		mb = DataConverter.convertToMatrixBlock(new double[][] {{0, 0, 0, 0, 5, 2, 1, 0}});
-		tests.add(new Object[] {mb});
-		mb = DataConverter.convertToMatrixBlock(new double[][] {{0, 0, 0, 0, 5, 2, 1, 3, 6, 7}});
-		tests.add(new Object[] {mb});
-
-		// Dense random... Horrible compression at full precision
-		mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 100, 0, 100, 1.0, 7));
-		tests.add(new Object[] {mb});
-		mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 1000, 0, 100, 1.0, 7));
-		tests.add(new Object[] {mb});
-		mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 10000, 0, 100, 1.0, 7));
-		tests.add(new Object[] {mb});
+		// mb = DataConverter.convertToMatrixBlock(new double[][] {{1}});
+		// tests.add(new Object[] {mb});
+		// mb = DataConverter.convertToMatrixBlock(new double[][] {{0}});
+		// tests.add(new Object[] {mb});
+		// mb = DataConverter.convertToMatrixBlock(new double[][] {{0, 0, 0, 0, 0}});
+		// tests.add(new Object[] {mb});
+
+		// // The size of the compression increase at repeated values.
+		// mb = DataConverter.convertToMatrixBlock(new double[][] {{0, 0, 0, 0, 5, 0}});
+		// tests.add(new Object[] {mb});
+		// mb = DataConverter.convertToMatrixBlock(new double[][] {{0, 0, 0, 0, 5, 5, 0}});
+		// tests.add(new Object[] {mb});
+		// mb = DataConverter.convertToMatrixBlock(new double[][] {{0, 0, 0, 0, 5, 5, 5, 0}});
+		// tests.add(new Object[] {mb});
+		// mb = DataConverter.convertToMatrixBlock(new double[][] {{0, 0, 0, 0, 5, 5, 5, 5, 5, 5}});
+		// tests.add(new Object[] {mb});
+
+		// // all values grow by 1 if new value is introduced
+		// mb = DataConverter.convertToMatrixBlock(new double[][] {{0, 0, 0, 0, 5, 7, 0}});
+		// tests.add(new Object[] {mb});
+		// mb = DataConverter.convertToMatrixBlock(new double[][] {{0, 0, 0, 0, 5, 2, 1, 0}});
+		// tests.add(new Object[] {mb});
+		// mb = DataConverter.convertToMatrixBlock(new double[][] {{0, 0, 0, 0, 5, 2, 1, 3, 6, 7}});
+		// tests.add(new Object[] {mb});
+
+		// // Dense random... Horrible compression at full precision
+		// mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 100, 0, 100, 1.0, 7));
+		// tests.add(new Object[] {mb});
+		// mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 1000, 0, 100, 1.0, 7));
+		// tests.add(new Object[] {mb});
+		// mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 10000, 0, 100, 1.0, 7));
+		// tests.add(new Object[] {mb});
 
 		// Random rounded numbers dense
-		mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 1523, 0, 99, 1.0, 7));
-		tests.add(new Object[] {mb});
-		mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 4000, 0, 100, 1.0, 7));
-		tests.add(new Object[] {mb});
-
-		// Sparse rounded numbers
-		mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 1523, 0, 99, 0.1, 7));
-		tests.add(new Object[] {mb});
-		mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 1621, 0, 99, 0.1, 142));
-		tests.add(new Object[] {mb});
-		mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 2321, 0, 99, 0.1, 512));
-		tests.add(new Object[] {mb});
-		mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 4000, 0, 100, 0.1, 7));
-		tests.add(new Object[] {mb});
-
-		mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 1523, 0, 99, 0.5, 7));
-		tests.add(new Object[] {mb});
-		mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 1621, 0, 99, 0.5, 142));
-		tests.add(new Object[] {mb});
-		mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 2321, 0, 99, 0.5, 512));
-		tests.add(new Object[] {mb});
-		mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 4000, 0, 100, 0.5, 7));
-		tests.add(new Object[] {mb});
+		// mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 1523, 0, 99, 1.0, 7));
+		// tests.add(new Object[] {mb});
+		// mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 4000, 0, 100, 1.0, 7));
+		// tests.add(new Object[] {mb});
+
+		// // Sparse rounded numbers
+		// mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 1523, 0, 99, 0.1, 7));
+		// tests.add(new Object[] {mb});
+		// mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 1621, 0, 99, 0.1, 142));
+		// tests.add(new Object[] {mb});
+		// mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 2321, 0, 99, 0.1, 512));
+		// tests.add(new Object[] {mb});
+		// mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 4000, 0, 100, 0.1, 7));
+		// tests.add(new Object[] {mb});
+
+		// mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 1523, 0, 99, 0.5, 7));
+		// tests.add(new Object[] {mb});
+		// mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 1621, 0, 99, 0.5, 142));
+		// tests.add(new Object[] {mb});
+		// mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 2321, 0, 99, 0.5, 512));
+		// tests.add(new Object[] {mb});
+		// mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 4000, 0, 100, 0.6, 7));
+		// tests.add(new Object[] {mb});
 
 		// Paper
-		mb = DataConverter
-			.convertToMatrixBlock(new double[][] {{7, 3, 7, 7, 3, 7, 3, 3, 7, 3}, {6, 4, 6, 5, 4, 5, 4, 4, 6, 4}});
-		tests.add(new Object[] {mb});
-
-		// Dream Inputs
-		int[] cols = new int[] {2, 6, 111};
-		int[] rows = new int[] {10, 121, 513};
-		int[] unique = new int[] {3, 5};
-		for(int y : cols) {
-			for(int x : rows) {
-				for(int u : unique) {
-					mb = CompressibleInputGenerator.getInput(x, y, CompressionType.OLE, u, 1.0, 5);
-					tests.add(new Object[] {mb});
-				}
-			}
-		}
-
-		// Sparse test.
-		mb = CompressibleInputGenerator.getInput(571, 1, CompressionType.OLE, 40, 0.6, 5);
-		tests.add(new Object[] {mb});
-
-		// Multi Columns
-		mb = CompressibleInputGenerator.getInput(412,5,CompressionType.OLE, 20, 0.4, 5);
-		tests.add(new Object[] {mb});
-		mb = CompressibleInputGenerator.getInput(1000,5,CompressionType.OLE, 20, 0.4, 5);
-		tests.add(new Object[] {mb});
+		// mb = DataConverter
+		// .convertToMatrixBlock(new double[][] {{7, 3, 7, 7, 3, 7, 3, 3, 7, 3}, {6, 4, 6, 5, 4, 5, 4, 4, 6, 4}});
+		// tests.add(new Object[] {mb});
+
+		// // Dream Inputs
+		// int[] cols = new int[] {2, 6, 111};
+		// int[] rows = new int[] {10, 121, 513};
+		// int[] unique = new int[] {3, 5};
+		// for(int y : cols) {
+		// for(int x : rows) {
+		// for(int u : unique) {
+		// mb = CompressibleInputGenerator.getInput(x, y, CompressionType.OLE, u, 1.0, 5);
+		// tests.add(new Object[] {mb});
+		// }
+		// }
+		// }
+
+		// // Sparse test.
+		// mb = CompressibleInputGenerator.getInput(571, 1, CompressionType.OLE, 40, 0.6, 5);
+		// tests.add(new Object[] {mb});
+
+		// // Multi Columns
+		// mb = CompressibleInputGenerator.getInput(412,5,CompressionType.OLE, 20, 0.4, 5);
+		// tests.add(new Object[] {mb});
+		// mb = CompressibleInputGenerator.getInput(1000,5,CompressionType.OLE, 20, 0.4, 5);
+		// tests.add(new Object[] {mb});
 
 		return tests;
 	}
@@ -137,4 +134,4 @@ public class JolEstimateOLETest extends JolEstimateTest {
 	public CompressionType getCT() {
 		return ole;
 	}
-}
\ No newline at end of file
+}
diff --git a/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateRLETest.java b/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateRLETest.java
index f6b4c10..d803a3d 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateRLETest.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateRLETest.java
@@ -25,9 +25,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
-import org.apache.sysds.runtime.util.DataConverter;
-import org.apache.sysds.test.TestUtils;
-import org.apache.sysds.test.component.compress.CompressibleInputGenerator;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
@@ -39,113 +36,113 @@ public class JolEstimateRLETest extends JolEstimateTest {
 	public static Collection<Object[]> data() {
 		ArrayList<Object[]> tests = new ArrayList<>();
 
-		MatrixBlock mb;
-		mb = DataConverter.convertToMatrixBlock(new double[][] {{1}});
-		tests.add(new Object[] {mb});
-
-		// The size of the compression is the same even at different numbers of repeated values.
-		mb = DataConverter.convertToMatrixBlock(new double[][] {{0, 0, 0, 0, 5, 0}});
-		tests.add(new Object[] {mb});
-		mb = DataConverter.convertToMatrixBlock(new double[][] {{0, 0, 0, 0, 5, 5, 0}});
-		tests.add(new Object[] {mb});
-		mb = DataConverter.convertToMatrixBlock(new double[][] {{0, 0, 0, 0, 5, 5, 5, 0}});
-		tests.add(new Object[] {mb});
-		mb = DataConverter.convertToMatrixBlock(new double[][] {{0, 0, 0, 0, 5, 5, 5, 5, 5, 5}});
-		tests.add(new Object[] {mb});
-
-		// Worst case all random numbers dense.
-		mb = TestUtils.generateTestMatrixBlock(1, 100, 0, 100, 1.0, 7);
-		tests.add(new Object[] {mb});
-		mb = TestUtils.generateTestMatrixBlock(1, 1000, 0, 100, 1.0, 7);
-		tests.add(new Object[] {mb});
-		mb = TestUtils.generateTestMatrixBlock(1, 10000, 0, 100, 1.0, 7);
-		tests.add(new Object[] {mb});
-
-		// Random rounded numbers dense
-		mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 1523, 0, 99, 1.0, 7));
-		tests.add(new Object[] {mb});
-		mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 4000, 0, 255, 1.0, 7));
-		tests.add(new Object[] {mb});
-
-		// Sparse rounded numbers
-		// Scale directly with sparsity
-		mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 1523, 0, 99, 0.1, 7));
-		tests.add(new Object[] {mb});
-		mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 1621, 0, 99, 0.1, 142));
-		tests.add(new Object[] {mb});
-		mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 2321, 0, 99, 0.1, 512));
-		tests.add(new Object[] {mb});
-		mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 4000, 0, 255, 0.1, 7));
-		tests.add(new Object[] {mb});
-
-		// Medium sparsity
-		mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 1523, 0, 99, 0.5, 7));
-		tests.add(new Object[] {mb});
-		mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 1621, 0, 99, 0.5, 142));
-		tests.add(new Object[] {mb});
-		mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 2321, 0, 99, 0.5, 512));
-		tests.add(new Object[] {mb});
-		mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 4000, 0, 255, 0.5, 7));
-		tests.add(new Object[] {mb});
-
-		// Dream inputs.
-		// 1 unique value
-		mb = CompressibleInputGenerator.getInput(10000, 1, CompressionType.RLE, 1, 1.0, 132);
-		tests.add(new Object[] {mb});
-
-		// when the rows length is larger than overflowing the character value,
-		// the run gets split into two
-		// char overflows into the next position increasing size by 1 char.
-		int charMax = Character.MAX_VALUE;
-		mb = CompressibleInputGenerator.getInput(charMax, 1, CompressionType.RLE, 1, 1.0, 132);
-		tests.add(new Object[] {mb});
-		mb = CompressibleInputGenerator.getInput(charMax + 1, 1, CompressionType.RLE, 1, 1.0, 132);
-		tests.add(new Object[] {mb});
-		mb = CompressibleInputGenerator.getInput(charMax * 2 + 1, 1, CompressionType.RLE, 1, 1.0, 132);
-		tests.add(new Object[] {mb});
-
-		// 10 unique values ordered such that all 10 instances is in the same run.
-		// Results in same size no matter the number of original rows.
-		mb = CompressibleInputGenerator.getInput(100, 1, CompressionType.RLE, 10, 1.0, 1);
-		tests.add(new Object[] {mb});
-		mb = CompressibleInputGenerator.getInput(1000, 1, CompressionType.RLE, 10, 1.0, 1312);
-		tests.add(new Object[] {mb});
-		mb = CompressibleInputGenerator.getInput(10000, 1, CompressionType.RLE, 10, 1.0, 14512);
-		tests.add(new Object[] {mb});
-		mb = CompressibleInputGenerator.getInput(100000, 1, CompressionType.RLE, 10, 1.0, 132);
-		tests.add(new Object[] {mb});
-
-		// Sparse Dream inputs.
-		mb = CompressibleInputGenerator.getInput(100, 1, CompressionType.RLE, 10, 0.1, 1);
-		tests.add(new Object[] {mb});
-		mb = CompressibleInputGenerator.getInput(1000, 1, CompressionType.RLE, 10, 0.1, 1312);
-		tests.add(new Object[] {mb});
-		mb = CompressibleInputGenerator.getInput(10000, 1, CompressionType.RLE, 10, 0.1, 14512);
-		tests.add(new Object[] {mb});
-		mb = CompressibleInputGenerator.getInput(100000, 1, CompressionType.RLE, 10, 0.1, 132);
-		tests.add(new Object[] {mb});
-		mb = CompressibleInputGenerator.getInput(1000000, 1, CompressionType.RLE, 10, 0.1, 132);
-		tests.add(new Object[] {mb});
-
-		mb = CompressibleInputGenerator.getInput(1000000, 1, CompressionType.RLE, 1, 1.0, 132);
-		tests.add(new Object[] {mb});
-
-		// Multi Column
-		// two identical columns
-		mb = CompressibleInputGenerator.getInput(10, 2, CompressionType.RLE, 2, 1.0, 132);
-		tests.add(new Object[] {mb});
-
-		mb = CompressibleInputGenerator.getInput(10, 6, CompressionType.RLE, 2, 1.0, 132);
-		tests.add(new Object[] {mb});
-
-		mb = CompressibleInputGenerator.getInput(10, 100, CompressionType.RLE, 2, 1.0, 132);
-		tests.add(new Object[] {mb});
-
-		mb = CompressibleInputGenerator.getInput(101, 17, CompressionType.RLE, 2, 1.0, 132);
-		tests.add(new Object[] {mb});
-
-		mb = CompressibleInputGenerator.getInput(101, 17, CompressionType.RLE, 3, 1.0, 132);
-		tests.add(new Object[] {mb});
+		// MatrixBlock mb;
+		// mb = DataConverter.convertToMatrixBlock(new double[][] {{1}});
+		// tests.add(new Object[] {mb});
+
+		// // The size of the compression is the same even at different numbers of repeated values.
+		// mb = DataConverter.convertToMatrixBlock(new double[][] {{0, 0, 0, 0, 5, 0}});
+		// tests.add(new Object[] {mb});
+		// mb = DataConverter.convertToMatrixBlock(new double[][] {{0, 0, 0, 0, 5, 5, 0}});
+		// tests.add(new Object[] {mb});
+		// mb = DataConverter.convertToMatrixBlock(new double[][] {{0, 0, 0, 0, 5, 5, 5, 0}});
+		// tests.add(new Object[] {mb});
+		// mb = DataConverter.convertToMatrixBlock(new double[][] {{0, 0, 0, 0, 5, 5, 5, 5, 5, 5}});
+		// tests.add(new Object[] {mb});
+
+		// // Worst case all random numbers dense.
+		// mb = TestUtils.generateTestMatrixBlock(1, 100, 0, 100, 1.0, 7);
+		// tests.add(new Object[] {mb});
+		// mb = TestUtils.generateTestMatrixBlock(1, 1000, 0, 100, 1.0, 7);
+		// tests.add(new Object[] {mb});
+		// mb = TestUtils.generateTestMatrixBlock(1, 10000, 0, 100, 1.0, 7);
+		// tests.add(new Object[] {mb});
+
+		// // Random rounded numbers dense
+		// mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 1523, 0, 99, 1.0, 7));
+		// tests.add(new Object[] {mb});
+		// mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 4000, 0, 255, 1.0, 7));
+		// tests.add(new Object[] {mb});
+
+		// // Sparse rounded numbers
+		// // Scale directly with sparsity
+		// mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 1523, 0, 99, 0.1, 7));
+		// tests.add(new Object[] {mb});
+		// mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 1621, 0, 99, 0.1, 142));
+		// tests.add(new Object[] {mb});
+		// mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 2321, 0, 99, 0.1, 512));
+		// tests.add(new Object[] {mb});
+		// mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 4000, 0, 255, 0.1, 7));
+		// tests.add(new Object[] {mb});
+
+		// // Medium sparsity
+		// mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 1523, 0, 99, 0.5, 7));
+		// tests.add(new Object[] {mb});
+		// mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 1621, 0, 99, 0.5, 142));
+		// tests.add(new Object[] {mb});
+		// mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 2321, 0, 99, 0.5, 512));
+		// tests.add(new Object[] {mb});
+		// mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 4000, 0, 255, 0.5, 7));
+		// tests.add(new Object[] {mb});
+
+		// // Dream inputs.
+		// // 1 unique value
+		// mb = CompressibleInputGenerator.getInput(10000, 1, CompressionType.RLE, 1, 1.0, 132);
+		// tests.add(new Object[] {mb});
+
+		// // when the rows length is larger than overflowing the character value,
+		// // the run gets split into two
+		// // char overflows into the next position increasing size by 1 char.
+		// int charMax = Character.MAX_VALUE;
+		// mb = CompressibleInputGenerator.getInput(charMax, 1, CompressionType.RLE, 1, 1.0, 132);
+		// tests.add(new Object[] {mb});
+		// mb = CompressibleInputGenerator.getInput(charMax + 1, 1, CompressionType.RLE, 1, 1.0, 132);
+		// tests.add(new Object[] {mb});
+		// mb = CompressibleInputGenerator.getInput(charMax * 2 + 1, 1, CompressionType.RLE, 1, 1.0, 132);
+		// tests.add(new Object[] {mb});
+
+		// // 10 unique values ordered such that all 10 instances is in the same run.
+		// // Results in same size no matter the number of original rows.
+		// mb = CompressibleInputGenerator.getInput(100, 1, CompressionType.RLE, 10, 1.0, 1);
+		// tests.add(new Object[] {mb});
+		// mb = CompressibleInputGenerator.getInput(1000, 1, CompressionType.RLE, 10, 1.0, 1312);
+		// tests.add(new Object[] {mb});
+		// mb = CompressibleInputGenerator.getInput(10000, 1, CompressionType.RLE, 10, 1.0, 14512);
+		// tests.add(new Object[] {mb});
+		// mb = CompressibleInputGenerator.getInput(100000, 1, CompressionType.RLE, 10, 1.0, 132);
+		// tests.add(new Object[] {mb});
+
+		// // Sparse Dream inputs.
+		// mb = CompressibleInputGenerator.getInput(100, 1, CompressionType.RLE, 10, 0.1, 1);
+		// tests.add(new Object[] {mb});
+		// mb = CompressibleInputGenerator.getInput(1000, 1, CompressionType.RLE, 10, 0.1, 1312);
+		// tests.add(new Object[] {mb});
+		// mb = CompressibleInputGenerator.getInput(10000, 1, CompressionType.RLE, 10, 0.1, 14512);
+		// tests.add(new Object[] {mb});
+		// mb = CompressibleInputGenerator.getInput(100000, 1, CompressionType.RLE, 10, 0.1, 132);
+		// tests.add(new Object[] {mb});
+		// mb = CompressibleInputGenerator.getInput(1000000, 1, CompressionType.RLE, 10, 0.1, 132);
+		// tests.add(new Object[] {mb});
+
+		// mb = CompressibleInputGenerator.getInput(1000000, 1, CompressionType.RLE, 1, 1.0, 132);
+		// tests.add(new Object[] {mb});
+
+		// // Multi Column
+		// // two identical columns
+		// mb = CompressibleInputGenerator.getInput(10, 2, CompressionType.RLE, 2, 1.0, 132);
+		// tests.add(new Object[] {mb});
+
+		// mb = CompressibleInputGenerator.getInput(10, 6, CompressionType.RLE, 2, 1.0, 132);
+		// tests.add(new Object[] {mb});
+
+		// mb = CompressibleInputGenerator.getInput(10, 100, CompressionType.RLE, 2, 1.0, 132);
+		// tests.add(new Object[] {mb});
+
+		// mb = CompressibleInputGenerator.getInput(101, 17, CompressionType.RLE, 2, 1.0, 132);
+		// tests.add(new Object[] {mb});
+
+		// mb = CompressibleInputGenerator.getInput(101, 17, CompressionType.RLE, 3, 1.0, 132);
+		// tests.add(new Object[] {mb});
 
 		return tests;
 	}
diff --git a/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateSDCTest.java b/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateSDCTest.java
index b302242..ea80e74 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateSDCTest.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateSDCTest.java
@@ -19,10 +19,128 @@
 
 package org.apache.sysds.test.component.compress.colgroup;
 
+import java.util.ArrayList;
+import java.util.Collection;
+
 import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.util.DataConverter;
+import org.apache.sysds.test.TestUtils;
+import org.apache.sysds.test.component.compress.CompressibleInputGenerator;
+import org.junit.runners.Parameterized.Parameters;
+
+public class JolEstimateSDCTest extends JolEstimateTest {
+
+	@Parameters
+	public static Collection<Object[]> data() {
+		ArrayList<Object[]> tests = new ArrayList<>();
+
+		MatrixBlock mb;
+		// base tests
+		mb = DataConverter.convertToMatrixBlock(new double[][] {{1}});
+		tests.add(new Object[] {mb});
+		mb = DataConverter.convertToMatrixBlock(new double[][] {{0}});
+		tests.add(new Object[] {mb});
+		mb = DataConverter.convertToMatrixBlock(new double[][] {{0, 0, 0, 0, 0}});
+		tests.add(new Object[] {mb});
+
+		// The size of the compression increase at repeated values.
+		mb = DataConverter.convertToMatrixBlock(new double[][] {{0, 0, 0, 0, 5, 0}});
+		tests.add(new Object[] {mb});
+		mb = DataConverter.convertToMatrixBlock(new double[][] {{0, 0, 0, 0, 5, 5, 0}});
+		tests.add(new Object[] {mb});
+		mb = DataConverter.convertToMatrixBlock(new double[][] {{0, 0, 0, 0, 5, 5, 5, 0}});
+		tests.add(new Object[] {mb});
+		mb = DataConverter.convertToMatrixBlock(new double[][] {{0, 0, 0, 0, 5, 5, 5, 5, 5, 5}});
+		tests.add(new Object[] {mb});
+
+		// all values grow by 1 if new value is introduced
+		mb = DataConverter.convertToMatrixBlock(new double[][] {{0, 0, 0, 0, 5, 7, 0}});
+		tests.add(new Object[] {mb});
+		mb = DataConverter.convertToMatrixBlock(new double[][] {{0, 0, 0, 0, 5, 2, 1, 0}});
+		tests.add(new Object[] {mb});
+		mb = DataConverter.convertToMatrixBlock(new double[][] {{0, 0, 0, 0, 5, 2, 1, 3, 6, 7}});
+		tests.add(new Object[] {mb});
+
+		// Dense random... Horrible compression at full precision
+		mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 100, 0, 100, 1.0, 7));
+		tests.add(new Object[] {mb});
+		mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 1000, 0, 100, 1.0, 7));
+		tests.add(new Object[] {mb});
+		mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 10000, 0, 100, 1.0, 7));
+		tests.add(new Object[] {mb});
+
+		// Random rounded numbers dense
+		mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 1523, 0, 99, 1.0, 7));
+		tests.add(new Object[] {mb});
+		mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 4000, 0, 100, 1.0, 7));
+		tests.add(new Object[] {mb});
+
+		// Sparse rounded numbers
+		mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 1523, 0, 99, 0.1, 7));
+		tests.add(new Object[] {mb});
+		mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 1621, 0, 99, 0.1, 142));
+		tests.add(new Object[] {mb});
+		mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 2321, 0, 99, 0.1, 512));
+		tests.add(new Object[] {mb});
+		mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 4000, 0, 100, 0.1, 7));
+		tests.add(new Object[] {mb});
+
+		mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 1523, 0, 99, 0.5, 7));
+		tests.add(new Object[] {mb});
+		mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 1621, 0, 99, 0.5, 142));
+		tests.add(new Object[] {mb});
+		mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 2321, 0, 99, 0.5, 512));
+		tests.add(new Object[] {mb});
+		mb = TestUtils.round(TestUtils.generateTestMatrixBlock(1, 4000, 0, 100, 0.6, 7));
+		tests.add(new Object[] {mb});
 
-public class JolEstimateSDCTest extends JolEstimateOLETest {
+		// Paper
+		mb = DataConverter
+			.convertToMatrixBlock(new double[][] {
+				{7, 3, 7, 7, 3, 7, 3, 3, 7, 3},
+				{6, 4, 6, 5, 4, 5, 4, 4, 6, 4}});
+		tests.add(new Object[] {mb});
+
+		// Dream Inputs
+		int[] cols = new int[] {3, 6, 111};
+		int[] rows = new int[] {10, 121, 513};
+		int[] unique = new int[] {3, 5};
+		for(int y : cols) {
+			for(int x : rows) {
+				for(int u : unique) {
+					mb = CompressibleInputGenerator.getInput(x, y, CompressionType.OLE, u, 1.0, 5);
+					tests.add(new Object[] {mb});
+				}
+			}
+		}
+
+		// Sparse test.
+		mb = CompressibleInputGenerator.getInput(1421, 1, CompressionType.OLE, 40, 0.6, 5, true);
+		tests.add(new Object[] {mb});
+
+		mb = CompressibleInputGenerator.getInput(10000, 1, CompressionType.OLE, 12, 0.06, 5, true);
+		tests.add(new Object[] {mb});
+
+		// Hard input to get 100% right on exact estimation, since the offsets are irregular
+		// and therefore not easy to know
+		// mb = CompressibleInputGenerator.getInput(100000, 1, CompressionType.OLE, 4, 0.006, 5, true);
+		// tests.add(new Object[] {mb});
+
+		mb = CompressibleInputGenerator.getInput(1000000, 1, CompressionType.OLE, 4, 0.0001, 5, true);
+		tests.add(new Object[] {mb});
+
+		mb = CompressibleInputGenerator.getInput(1000000, 1, CompressionType.OLE, 4, 0.00001, 5, true);
+		tests.add(new Object[] {mb});
+
+		// Multi Columns
+		// mb = CompressibleInputGenerator.getInput(412, 5, CompressionType.OLE, 20, 0.4, 5);
+		// tests.add(new Object[] {mb});
+		// mb = CompressibleInputGenerator.getInput(1000, 5, CompressionType.OLE, 20, 0.4, 5);
+		// tests.add(new Object[] {mb});
+
+		return tests;
+	}
 
 	// Just use the same test cases as OLE.
 	// This is fine because SDC exhibit the same characteristics as OLE.
diff --git a/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateTest.java b/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateTest.java
index 2e801c7..dcecacb 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateTest.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateTest.java
@@ -23,26 +23,22 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.util.EnumSet;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.compress.CompressionSettings;
 import org.apache.sysds.runtime.compress.CompressionSettingsBuilder;
-import org.apache.sysds.runtime.compress.bitmap.ABitmap;
-import org.apache.sysds.runtime.compress.bitmap.BitmapEncoder;
 import org.apache.sysds.runtime.compress.colgroup.AColGroup;
 import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType;
 import org.apache.sysds.runtime.compress.colgroup.ColGroupFactory;
 import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimator;
 import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimatorExact;
 import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimatorFactory;
-import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimatorSample;
 import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo;
 import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup;
-import org.apache.sysds.runtime.compress.estim.EstimationFactors;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -77,20 +73,30 @@ public abstract class JolEstimateTest {
 		for(int x = 0; x < mbt.getNumRows(); x++)
 			colIndexes[x] = x;
 
+		mbt.recomputeNonZeros();
+		mbt.examSparsity();
 		try {
 			CompressionSettings cs = new CompressionSettingsBuilder().setSamplingRatio(1.0)
 				.setValidCompressions(EnumSet.of(getCT())).create();
 			cs.transposed = true;
-			ABitmap ubm = BitmapEncoder.extractBitmap(colIndexes, mbt, true, 8, false);
-
-			EstimationFactors ef = CompressedSizeEstimator.estimateCompressedColGroupSize(ubm, colIndexes,
-				mbt.getNumColumns(), cs);
-			CompressedSizeInfoColGroup cgi = new CompressedSizeInfoColGroup(colIndexes, ef, getCT());
-			CompressedSizeInfo csi = new CompressedSizeInfo(cgi);
-			cg = ColGroupFactory.compressColGroups(mbt, csi, cs, 1).get(0);
-			// cg = ColGroupFactory.compress(colIndexes, mbt.getNumColumns(), ubm, getCT(), cs, mbt, 1);
-			actualSize = cg.estimateInMemorySize();
-			actualNumberUnique = cg.getNumValues();
+
+			final CompressedSizeInfoColGroup cgi = new CompressedSizeEstimatorExact(mbt, cs)
+				.estimateCompressedColGroupSize(colIndexes);
+
+			final CompressedSizeInfo csi = new CompressedSizeInfo(cgi);
+			final List<AColGroup> groups = ColGroupFactory.compressColGroups(mbt, csi, cs, 1);
+
+			if(groups.size() == 1) {
+				cg = groups.get(0);
+				actualSize = cg.estimateInMemorySize();
+				actualNumberUnique = cg.getNumValues();
+			}
+			else {
+				cg = null;
+				actualSize = groups.stream().mapToLong(x -> x.estimateInMemorySize()).sum();
+				actualNumberUnique = groups.stream().mapToInt(x -> x.getNumValues()).sum();
+			}
+
 		}
 		catch(Exception e) {
 			e.printStackTrace();
@@ -104,86 +110,75 @@ public abstract class JolEstimateTest {
 	}
 
 	@Test
-	@Ignore
 	public void compressedSizeInfoEstimatorSample_90() {
 		compressedSizeInfoEstimatorSample(0.9, 0.9);
 	}
 
 	@Test
-	@Ignore
 	public void compressedSizeInfoEstimatorSample_50() {
 		compressedSizeInfoEstimatorSample(0.5, 0.8);
 	}
 
 	@Test
-	@Ignore
 	public void compressedSizeInfoEstimatorSample_20() {
-		compressedSizeInfoEstimatorSample(0.2, 0.7);
+		compressedSizeInfoEstimatorSample(0.2, 0.6);
 	}
 
 	@Test
 	public void compressedSizeInfoEstimatorSample_10() {
-		compressedSizeInfoEstimatorSample(0.1, 0.6);
+		compressedSizeInfoEstimatorSample(0.1, 0.5);
 	}
 
-	@Test
-	@Ignore
-	public void compressedSizeInfoEstimatorSample_5() {
-		compressedSizeInfoEstimatorSample(0.05, 0.5);
-	}
+	// @Test
+	// public void compressedSizeInfoEstimatorSample_5() {
+	// compressedSizeInfoEstimatorSample(0.05, 0.5);
+	// }
 
-	@Test
-	@Ignore
-	public void compressedSizeInfoEstimatorSample_1() {
-		compressedSizeInfoEstimatorSample(0.01, 0.4);
-	}
+	// @Test
+	// public void compressedSizeInfoEstimatorSample_1() {
+	// compressedSizeInfoEstimatorSample(0.01, 0.4);
+	// }
 
-	@Test 
-	public void testToString(){
+	@Test
+	public void testToString() {
 		// just to add a tests to verify that the to String method does not crash
-		cg.toString();
+		if(cg != null)
+			cg.toString();
 	}
 
 	public void compressedSizeInfoEstimatorSample(double ratio, double tolerance) {
+		if(cg == null)
+			return;
 		try {
-
-			CompressionSettings cs = csb.setSamplingRatio(ratio).setValidCompressions(EnumSet.of(getCT())).create();
+			if( mbt.getNumColumns() > 10000 )
+				tolerance = tolerance * 0.95;
+			final CompressionSettings cs = csb.setSamplingRatio(ratio).setMinimumSampleSize(10)
+				.setValidCompressions(EnumSet.of(getCT())).create();
 			cs.transposed = true;
 
-			CompressedSizeEstimator est = CompressedSizeEstimatorFactory.getSizeEstimator(mbt, cs, 1);
-			final int sampleSize = (est instanceof CompressedSizeEstimatorSample) ? ((CompressedSizeEstimatorSample) est)
-				.getSampleSize() : est.getNumRows();
-
-			if(est instanceof CompressedSizeEstimatorExact)
-				return;
-			final CompressedSizeInfoColGroup cgsi = est.estimateCompressedColGroupSize();
-
-			if(cg.getCompType() != CompressionType.UNCOMPRESSED && actualNumberUnique > 10) {
-
-				final int estimateNUniques = cgsi.getNumVals();
-				final double minToleranceNUniques = actualNumberUnique * tolerance;
-				final double maxToleranceNUniques = actualNumberUnique / tolerance;
-				final boolean withinToleranceOnNUniques = minToleranceNUniques <= estimateNUniques &&
-					estimateNUniques <= maxToleranceNUniques;
-
-				if(!withinToleranceOnNUniques) {
-					final String uniqueString = String.format("%.0f <= %d <= %.0f, Actual %d", minToleranceNUniques,
-						estimateNUniques, maxToleranceNUniques, actualNumberUnique);
-					fail("CSI Sampled estimate of number of unique values not in range\n" + uniqueString);
-				}
-			}
-
-			final long estimateCSI = cgsi.getCompressionSize(cg.getCompType());
-			final double minTolerance = actualSize * tolerance;
-			final double maxTolerance = actualSize / tolerance;
+			final CompressedSizeEstimator est = CompressedSizeEstimatorFactory.getSizeEstimator(mbt, cs,
+				Math.max(10, (int) (mbt.getNumColumns() * ratio)), 1);
+
+			final int sampleSize = est.getSampleSize();
+			final CompressedSizeInfoColGroup cInfo = est.estimateCompressedColGroupSize();
+			// LOG.error(cg);
+			final int estimateNUniques = cInfo.getNumVals();
+			final long estimateCSI = cInfo.getCompressionSize(cg.getCompType());
+			final double minTolerance = actualSize * tolerance *
+				(ratio < 1 && mbt.getSparsity() < 0.8 ? mbt.getSparsity() + 0.2 : 1);
+			final double maxTolerance = actualSize / tolerance +
+				(cg.getCompType() == CompressionType.SDC ? +8 * mbt.getNumRows() : 0);
 			final boolean withinToleranceOnSize = minTolerance <= estimateCSI && estimateCSI <= maxTolerance;
+			// LOG.error(cg);
 			if(!withinToleranceOnSize) {
 				final String rangeString = String.format("%.0f <= %d <= %.0f , Actual Size %d", minTolerance, estimateCSI,
 					maxTolerance, actualSize);
 
 				fail("CSI Sampled estimate size is not in tolerance range \n" + rangeString + "\nActual number uniques:"
-					+ actualNumberUnique + "\nSampleSize of total rows:: " + sampleSize + " " + mbt.getNumColumns() + "\n"
-					+ cg);
+					+ actualNumberUnique + " estimated Uniques: " + estimateNUniques + "\nSampleSize of total rows:: "
+					+ sampleSize + " " + mbt.getNumColumns() + "\n" + cInfo
+				// + "\n" + mbt + "\n" + cg
+				);
 			}
 
 		}
diff --git a/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateUncompressedTest.java b/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateUncompressedTest.java
index 5975667..f45c3c1 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateUncompressedTest.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/colgroup/JolEstimateUncompressedTest.java
@@ -40,18 +40,38 @@ public class JolEstimateUncompressedTest extends JolEstimateTest {
 	public static Collection<Object[]> data() {
 
 		ArrayList<Object[]> tests = new ArrayList<>();
-
-		// mb.add(DataConverter.convertToMatrixBlock(new double[][] {{0}}));
+		
+		// single cell
+		tests.add(new Object[] {DataConverter.convertToMatrixBlock(new double[][] {{0}})});
 		tests.add(new Object[] {DataConverter.convertToMatrixBlock(new double[][] {{1}})});
+		tests.add(new Object[] {DataConverter.convertToMatrixBlock(new double[][] {{42151}})});
+		
+		// Const
+		tests.add(new Object[] {DataConverter.convertToMatrixBlock(new double[][] {{1,1,1}})});
+		
+		// Empty
+		tests.add(new Object[] {TestUtils.generateTestMatrixBlock(1, 1, 0, 0, 0.0, 7)});
+		tests.add(new Object[] {TestUtils.generateTestMatrixBlock(1, 10, 0, 0, 0.0, 7)});
+		tests.add(new Object[] {TestUtils.generateTestMatrixBlock(1, 100, 0, 0, 0.0, 7)});
+		
+		// Small
 		tests.add(new Object[] {TestUtils.generateTestMatrixBlock(1, 100, 0, 100, 1.0, 7)});
 		tests.add(new Object[] {TestUtils.generateTestMatrixBlock(1, 1000, 0, 100, 0.2, 7)});
-		// tests.add(new Object[] {TestUtils.generateTestMatrixBlock(1, 100000, 0, 100, 0.01, 7)});
-
+		
 		// Multi column
 		tests.add(new Object[] {TestUtils.generateTestMatrixBlock(2, 10, 0, 100, 1.0, 7)});
 		tests.add(new Object[] {TestUtils.generateTestMatrixBlock(13, 100, 0, 100, 1.0, 7)});
 
-		// sparse
+		// Const multi column
+		tests.add(new Object[] {DataConverter.convertToMatrixBlock(new double[][] {{1,1,1},{1,1,1},{1,1,1}})});
+		tests.add(new Object[] {TestUtils.generateTestMatrixBlock(13, 100, 1, 1, 1.0, 7)});
+		tests.add(new Object[] {TestUtils.generateTestMatrixBlock(30, 100, 1, 1, 1.0, 7)});
+		
+		// empty multi column
+		tests.add(new Object[] {TestUtils.generateTestMatrixBlock(10, 100, 0, 0, 0.0, 7)});
+		tests.add(new Object[] {TestUtils.generateTestMatrixBlock(100, 100, 0, 0, 0.0, 7)});
+
+		// sparse multi column
 		tests.add(new Object[] {TestUtils.generateTestMatrixBlock(13, 100, 0, 100, 0.3, 7)});
 		tests.add(new Object[] {TestUtils.generateTestMatrixBlock(100, 100, 0, 100, 0.01, 7)});
 
diff --git a/src/test/java/org/apache/sysds/test/component/compress/estim/encoding/EncodeSampleDenseNonUniform.java b/src/test/java/org/apache/sysds/test/component/compress/estim/encoding/EncodeSampleDenseNonUniform.java
new file mode 100644
index 0000000..1621d7e
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/component/compress/estim/encoding/EncodeSampleDenseNonUniform.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.component.compress.estim.encoding;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Random;
+
+import org.apache.sysds.runtime.compress.estim.encoding.IEncode;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(value = Parameterized.class)
+public class EncodeSampleDenseNonUniform extends EncodeSampleMultiColTest {
+
+	public EncodeSampleDenseNonUniform(MatrixBlock m, boolean t, int u, IEncode e, IEncode fh, IEncode sh) {
+		super(m, t, u, e, fh, sh);
+	}
+
+	@Parameters
+	public static Collection<Object[]> data() {
+		ArrayList<Object[]> tests = new ArrayList<>();
+		// allocate two matrices of either row or col size always dense.
+		tests.add(create(10, 10, 0.1, 231, false));
+		tests.add(create(100, 10, 0.1, 231, false));
+		tests.add(create(10, 100, 0.1, 231, false));
+
+		tests.add(create(10, 100, 0.1, 231, true));
+		tests.add(create(100, 10, 0.1, 231, true));
+
+		tests.add(create(10,10, 1.0, 12341, true));
+		tests.add(create(10,10, 0.1, 12341, true));
+		tests.add(create(10, 10, 0.7, 12341, true));
+
+		tests.add(create(10, 10, 0.9, 3, true));
+		tests.add(create(4, 10, 0.9, 32, true));
+		tests.add(create(4, 10, 0.9, 4215, true));
+
+		return tests;
+	}
+
+	private static Object[] create(int nRow, int nCol, double likelihoodEntireRowIsEmpty, int seed, boolean t) {
+
+		MatrixBlock m = new MatrixBlock(nRow, nCol, false);
+		m.allocateBlock();
+
+		Random r = new Random(seed);
+		double[] mV = m.getDenseBlockValues();
+		int nnz = 0;
+
+		for(int i = 0; i < nRow; i++) {
+			if(r.nextDouble() > likelihoodEntireRowIsEmpty) {
+				for(int j = i * nCol; j < i * nCol + nCol; j++)
+					mV[j] = r.nextInt(3) + 1; // values in 1..3, never zero
+
+				nnz += nCol; // a filled row contributes exactly nCol non-zeros
+			}
+		}
+
+		m.setNonZeros(nnz); // report the exact non-zero count
+		return EncodeSampleUniformTest.create(m, t);
+	}
+
+}
diff --git a/src/test/java/org/apache/sysds/test/component/compress/estim/encoding/EncodeSampleMultiColTest.java b/src/test/java/org/apache/sysds/test/component/compress/estim/encoding/EncodeSampleMultiColTest.java
new file mode 100644
index 0000000..ef4fdfe
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/component/compress/estim/encoding/EncodeSampleMultiColTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.component.compress.estim.encoding;
+
+import static org.junit.Assert.fail;
+
+import org.apache.sysds.runtime.compress.estim.encoding.IEncode;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.junit.Test;
+
+public abstract class EncodeSampleMultiColTest extends EncodeSampleTest {
+
+	public IEncode fh;
+	public IEncode sh;
+
+	public EncodeSampleMultiColTest(MatrixBlock m, boolean t, int u, IEncode e, IEncode fh, IEncode sh) {
+		super(m, t, u, e);
+		this.fh = fh;
+		this.sh = sh;
+	}
+
+	public static int[] genRowCol(int n) {
+		int[] ret = new int[n];
+		for(int i = 0; i < n; i++)
+			ret[i] = i;
+		return ret;
+	}
+
+	public static int[] genRowCol(int s, int n) {
+		int[] ret = new int[n - s];
+		for(int i = s; i < n; i++)
+			ret[i - s] = i;
+		return ret;
+	}
+
+	@Test
+	public void testPartJoinEqualToFullRead() {
+		try {
+
+			partJoinVerification(fh.join(sh));
+		}
+		catch(Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testPartJoinEqualToFullReadLeft() {
+		try {
+
+			partJoinVerification(sh.join(fh));
+		}
+		catch(Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testJoinWithFirstSubpart() {
+		try {
+
+			// joining the full encoding with one of its own sub-parts yields an
+			// equivalent encoding, so this verifies the join handles contained parts
+			partJoinVerification(e.join(fh));
+		}
+		catch(Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testJoinWithSecondSubpart() {
+		try {
+
+			// joining with subpart results in equivalent but it is a valid test
+			partJoinVerification(e.join(sh));
+		}
+		catch(Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testJoinWithFirstSubpartLeft() {
+		try {
+
+			// joining with subpart results in equivalent but it is a valid test
+			partJoinVerification(fh.join(e));
+		}
+		catch(Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testJoinWithSecondSubpartLeft() {
+		try {
+			// joining with subpart results in equivalent but it is a valid test
+			partJoinVerification(sh.join(e));
+		}
+		catch(Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	private void partJoinVerification(IEncode er) {
+		if(e.getUnique() != er.getUnique() || e.size() != er.size()) {
+			StringBuilder sb = new StringBuilder();
+			sb.append("\nFailed joining sub parts to recreate whole.\nRead:");
+			sb.append(e);
+			sb.append("\nJoined:");
+			sb.append(er);
+			sb.append("\n");
+			sb.append(m);
+			sb.append("\nsubParts:\n");
+			sb.append(sh);
+			sb.append("\n");
+			sb.append(fh);
+			fail(sb.toString());
+		}
+	}
+}
diff --git a/src/test/java/org/apache/sysds/test/component/compress/estim/encoding/EncodeSampleSingleColTest.java b/src/test/java/org/apache/sysds/test/component/compress/estim/encoding/EncodeSampleSingleColTest.java
new file mode 100644
index 0000000..c36baad
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/component/compress/estim/encoding/EncodeSampleSingleColTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.component.compress.estim.encoding;
+
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.runtime.compress.estim.encoding.IEncode;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.test.TestUtils;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(value = Parameterized.class)
+public class EncodeSampleSingleColTest extends EncodeSampleTest {
+
+	protected static final Log LOG = LogFactory.getLog(EncodeSampleTest.class.getName());
+
+	public EncodeSampleSingleColTest(MatrixBlock m, boolean t, int u, IEncode e) {
+		super(m, t, u, e);
+	}
+
+	@Parameters
+	public static Collection<Object[]> data() {
+		ArrayList<Object[]> tests = new ArrayList<>();
+		
+		tests.add(create(1, 10, 1.0, true, 2, 2));
+		tests.add(create(1, 10, 0.5, true, 2, 131241));
+		tests.add(create(1, 100, 1.0, true, 2, 1312));
+		tests.add(create(1, 500, 0.5, true, 2, 132));
+		tests.add(create(1, 500, 0.5, true, 10, 12));
+		tests.add(create(10, 1, 1.0, false, 2, 32141));
+		tests.add(create(10, 1, 0.5, false, 2, 132));
+		tests.add(create(100, 1, 0.5, false, 2, 21));
+		tests.add(create(100, 2, 0.5, false, 2, 3131));
+		tests.add(create(100, 4, 0.5, false, 2, 32141));
+		tests.add(create(100, 4, 0.5, false, 10, 11));
+		
+		tests.add(create(1, 100, 0.2, true, 2, 13));
+		tests.add(create(1, 1000, 0.2, true, 10, 2));
+		tests.add(create(1, 10000, 0.02, true, 10, 3145));
+		tests.add(create(1, 100000, 0.002, true, 10, 3214));
+		tests.add(create(1, 1000000, 0.0002, true, 10, 3232));
+		
+		tests.add(create(100, 100, 0.02, false, 2, 32));
+		tests.add(create(1000, 100, 0.06, false, 2, 33412));
+		
+		// const
+		tests.add(create(1, 10, 1.0, true, 1, 1341));
+		tests.add(create(10, 1, 1.0, true, 1, 13));
+		// tests.add(create(1, 10, 1.0, true, 1, 2));
+		
+		// empty
+		tests.add(create(1, 10, 0.0, true, 1, 2));
+		tests.add(create(10, 1, 0.0, false, 1, 2));
+
+		tests.add(createEmptyAllocatedSparse(1, 10 ,true));
+		tests.add(createEmptyAllocatedSparse(10, 1 ,false));
+		
+		return tests;
+	}
+
+	public static Object[] create(int nRow, int nCol, double sparsity, boolean transposed, int nUnique, int seed) {
+		try {
+			int u = nUnique;
+			// Make sure that nUnique always is correct if we have a large enough matrix.
+			nUnique -= sparsity < 1.0 ? 1 : 0;
+			MatrixBlock m = TestUtils
+				.round(TestUtils.generateTestMatrixBlock(nRow, nCol, sparsity < 1.0 ? 0 : 1, nUnique, sparsity, seed));
+
+			boolean t = transposed;
+
+			IEncode e = IEncode.createFromMatrixBlock(m, t, 0);
+			return new Object[] {m, t, u, e};
+		}
+		catch(Exception e) {
+			e.printStackTrace();
+			fail("failed to initialize the Encoding test");
+			return null; // this is never executed, but Java requires it.
+		}
+	}
+
+
+	public static Object[] createEmptyAllocatedSparse(int nRow, int nCol, boolean transposed) {
+		try {
+			int u = 1;
+			MatrixBlock m = new MatrixBlock(nRow, nCol, true);
+			m.allocateBlock();
+			m.setNonZeros(1);
+
+			boolean t = transposed;
+
+			IEncode e = IEncode.createFromMatrixBlock(m, t, 0);
+			return new Object[] {m, t, u, e};
+		}
+		catch(Exception e) {
+			e.printStackTrace();
+			fail("failed to initialize the Encoding test");
+			return null; // this is never executed, but Java requires it.
+		}
+	}
+}
diff --git a/src/test/java/org/apache/sysds/test/component/compress/estim/encoding/EncodeSampleTest.java b/src/test/java/org/apache/sysds/test/component/compress/estim/encoding/EncodeSampleTest.java
new file mode 100644
index 0000000..5728d6a
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/component/compress/estim/encoding/EncodeSampleTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.component.compress.estim.encoding;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.runtime.compress.estim.encoding.IEncode;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.junit.Test;
+
+public abstract class EncodeSampleTest {
+
+	protected static final Log LOG = LogFactory.getLog(EncodeSampleTest.class.getName());
+
+	public MatrixBlock m;
+	public boolean t;
+	public int u;
+	public IEncode e;
+
+	public EncodeSampleTest(MatrixBlock m, boolean t, int u, IEncode e) {
+		this.m = m;
+		this.t = t;
+		this.u = u;
+		this.e = e;
+	}
+
+	@Test
+	public void getUnique() {
+		if(u != e.getUnique()) {
+			StringBuilder sb = new StringBuilder();
+			sb.append("invalid number of unique expected:");
+			sb.append(u);
+			sb.append(" got: ");
+			sb.append(e.getUnique());
+			sb.append("\n");
+			sb.append(e);
+			fail(sb.toString());
+		}
+	}
+
+	@Test
+	public void testToString() {
+		e.toString();
+	}
+
+	@Test
+	public void testJoinSelfEqualsSameNumberUnique() {
+		try {
+			// not that you should or would ever do this.
+			// but it is a nice and simple test.
+			IEncode j = e.join(e);
+			if(u != j.getUnique()) {
+				StringBuilder sb = new StringBuilder();
+				sb.append("invalid number of unique expected:");
+				sb.append(u);
+				sb.append(" got: ");
+				sb.append(j.getUnique());
+				sb.append("\nexpected encoding:\n");
+				sb.append(e);
+				sb.append("\ngot\n:");
+				sb.append(j);
+				fail(sb.toString());
+			}
+		}
+		catch(Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testJoinEmptyLeft() {
+		try {
+			final MatrixBlock empty = new MatrixBlock(m.getNumRows(), m.getNumColumns(), true);
+			final IEncode emptyEncoding = IEncode.createFromMatrixBlock(empty, t, 0);
+			assertEquals(u, emptyEncoding.join(e).getUnique());
+		}
+		catch(Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testJoinEmptyRight() {
+		try {
+			final MatrixBlock empty = new MatrixBlock(m.getNumRows(), m.getNumColumns(), true);
+			final IEncode emptyEncoding = IEncode.createFromMatrixBlock(empty, t, 0);
+			assertEquals(u, e.join(emptyEncoding).getUnique());
+		}
+		catch(Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testJoinConstLeft() {
+		try {
+			final MatrixBlock c = new MatrixBlock(m.getNumRows(), m.getNumColumns(), 1.0);
+			final IEncode emptyEncoding = IEncode.createFromMatrixBlock(c, t, 0);
+			assertEquals(u, emptyEncoding.join(e).getUnique());
+		}
+		catch(Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testJoinConstRight() {
+		try {
+			final MatrixBlock c = new MatrixBlock(m.getNumRows(), m.getNumColumns(), 1.0);
+
+			final IEncode emptyEncoding = IEncode.createFromMatrixBlock(c, t, 0);
+			assertEquals(u, e.join(emptyEncoding).getUnique());
+		}
+		catch(Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testGetSize() {
+		try {
+			assertTrue(e.size() <= (t ? m.getNumColumns() : m.getNumRows()));
+		}
+		catch(Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testGetSizeAfterJoinSelf() {
+		try {
+			assertTrue(e.join(e).size() <= (t ? m.getNumColumns() : m.getNumRows()));
+		}
+		catch(Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void toEstimationFactors() {
+		try {
+			int[] cols = new int[t ? m.getNumRows() : m.getNumColumns()];
+			int rows = t ? m.getNumColumns() : m.getNumRows();
+			e.computeSizeEstimation(cols, rows, 1.0, 1.0);
+		}
+		catch(Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}
diff --git a/src/test/java/org/apache/sysds/test/component/compress/estim/encoding/EncodeSampleUnbalancedTest.java b/src/test/java/org/apache/sysds/test/component/compress/estim/encoding/EncodeSampleUnbalancedTest.java
new file mode 100644
index 0000000..09a6188
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/component/compress/estim/encoding/EncodeSampleUnbalancedTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.component.compress.estim.encoding;
+
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.apache.sysds.runtime.compress.estim.encoding.IEncode;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.test.TestUtils;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(value = Parameterized.class)
+public class EncodeSampleUnbalancedTest extends EncodeSampleMultiColTest {
+
+	public EncodeSampleUnbalancedTest(MatrixBlock m, boolean t, int u, IEncode e, IEncode fh, IEncode sh) {
+		super(m, t, u, e, fh, sh);
+	}
+
+	@Parameters
+	public static Collection<Object[]> data() {
+		ArrayList<Object[]> tests = new ArrayList<>();
+		// one sparse one dense ... small size
+		tests.add(createT(1, 1.0, 2, 1, 0.1, 2, 10, 326314));
+		tests.add(createT(1, .1, 2, 1, 1.0, 2, 10, 512));
+
+		// bigger making it sparse in one and dense in another
+		tests.add(createT(1, .1, 2, 1, 1.0, 2, 100, 32141));
+		tests.add(createT(1, 1.0, 2, 1, 0.1, 2, 100, 777));
+
+		// big sparse
+		tests.add(createT(1, 0.0001, 10, 1, 0.0000001, 2, 10000000, 1231));
+		// more rows
+		tests.add(createT(3, 0.0001, 10, 10, 0.0000001, 2, 10000000, 444));
+
+		// Both Sparse and end dense joined
+		tests.add(createT(1, 0.2, 10, 10, 0.1, 2, 1000, 1231521));
+
+		return tests;
+	}
+
+	private static Object[] createT(int nRow1, double sp1, int nU1, int nRow2, double sp2, int nU2, int nCol, int seed) {
+		return create(nRow1, nCol, sp1, nU1, nRow2, nCol, sp2, nU2, seed, true);
+	}
+
+	private static Object[] create(int nRow1, int nCol1, double sp1, int nU1, int nRow2, int nCol2, double sp2, int nU2,
+		int seed, boolean t) {
+		try {
+			// Make sure that nUnique always is correct if we have a large enough matrix.
+			nU1 -= sp1 < 1.0 ? 1 : 0;
+			final int min1 = sp1 < 1.0 ? 0 : 1;
+			MatrixBlock m1 = TestUtils.round(TestUtils.generateTestMatrixBlock(nRow1, nCol1, min1, nU1, sp1, seed));
+			nU2 -= sp2 < 1.0 ? 1 : 0;
+			final int min2 = sp2 < 1.0 ? 0 : 1;
+			MatrixBlock m2 = TestUtils
+				.round(TestUtils.generateTestMatrixBlock(nRow2, nCol2, min2, nU2, sp2, seed * 21351));
+			return create(m1, m2, t);
+		}
+		catch(Exception e) {
+			e.printStackTrace();
+			fail("failed to initialize the Encoding test");
+			return null; // this is never executed but java requires it.
+		}
+	}
+
+	protected static Object[] create(MatrixBlock m1, MatrixBlock m2, boolean t) {
+
+		MatrixBlock m = m1.append(m2, null, !t);
+		return create(m,m1,m2,t);
+	}
+
+
+	protected static Object[] create(MatrixBlock m, MatrixBlock m1, MatrixBlock m2, boolean t) {
+		try {
+
+			final IEncode e = IEncode.createFromMatrixBlock(m, t, genRowCol(t ? m.getNumRows() : m.getNumColumns()));
+
+			// sub part.
+			final IEncode fh = IEncode.createFromMatrixBlock(m1, t, genRowCol(t ? m1.getNumRows() : m1.getNumColumns()));
+			final IEncode sh = IEncode.createFromMatrixBlock(m2, t, genRowCol(t ? m2.getNumRows() : m2.getNumColumns()));
+
+			// join subparts and use its unique count for tests
+			final IEncode er = fh.join(sh);
+			int u = er.getUnique();
+
+			return new Object[] {m, t, u, e, fh, sh};
+		}
+		catch(Exception e) {
+			e.printStackTrace();
+			fail("failed to initialize the Encoding test");
+			return null; // this is never executed but java requires it.
+		}
+	}
+
+}
diff --git a/src/test/java/org/apache/sysds/test/component/compress/estim/encoding/EncodeSampleUniformTest.java b/src/test/java/org/apache/sysds/test/component/compress/estim/encoding/EncodeSampleUniformTest.java
new file mode 100644
index 0000000..266ae6f
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/component/compress/estim/encoding/EncodeSampleUniformTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.component.compress.estim.encoding;
+
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.apache.sysds.runtime.compress.estim.encoding.IEncode;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.test.TestUtils;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(value = Parameterized.class)
+public class EncodeSampleUniformTest extends EncodeSampleMultiColTest {
+
+	public EncodeSampleUniformTest(MatrixBlock m, boolean t, int u, IEncode e, IEncode fh, IEncode sh) {
+		super(m, t, u, e, fh, sh);
+	}
+
+	@Parameters
+	public static Collection<Object[]> data() {
+		ArrayList<Object[]> tests = new ArrayList<>();
+
+		// row reading dense
+		tests.add(create(2, 30, 1.0, true, 2, 1251));
+		tests.add(create(3, 30, 1.0, true, 2, 13142));
+		tests.add(create(10, 30, 1.0, true, 2, 182828));
+
+		// col reading dense
+		tests.add(create(30, 2, 1.0, false, 2, 8865));
+		tests.add(create(30, 3, 1.0, false, 2, 9876));
+		tests.add(create(30, 10, 1.0, false, 2, 7654));
+
+		// row sparse
+		tests.add(create(2, 300, 0.1, true, 2, 1251));
+		tests.add(create(2, 300, 0.1, true, 2, 11));
+		tests.add(create(2, 300, 0.2, true, 2, 65));
+		tests.add(create(2, 300, 0.24, true, 2, 245));
+		tests.add(create(2, 300, 0.24, true, 4, 16));
+		tests.add(create(2, 300, 0.23, true, 4, 15));
+
+		// ultra sparse
+		tests.add(create(2, 10000, 0.001, true, 3, 215));
+		tests.add(create(2, 100000, 0.0001, true, 3, 42152));
+
+		// const
+		tests.add(create(3, 30, 1.0, true, 1, 2));
+		tests.add(create(50, 5, 1.0, false, 1, 2));
+
+		// empty
+		tests.add(create(10, 10, 0.0, false, 1, 2));
+		tests.add(create(100, 100, 0.0, false, 1, 2));
+
+		return tests;
+	}
+
+	private static Object[] create(int nRow, int nCol, double sparsity, boolean t, int nUnique, int seed) {
+		try {
+			// Make sure that nUnique always is correct if we have a large enough matrix.
+			nUnique -= sparsity < 1.0 ? 1 : 0;
+			final int min = sparsity < 1.0 ? 0 : 1;
+
+			MatrixBlock m = sparsity == 0.0 ? new MatrixBlock(nRow, nCol, true) : TestUtils
+				.round(TestUtils.generateTestMatrixBlock(nRow, nCol, min, nUnique, sparsity, seed));
+
+			return create(m,t);
+		}
+		catch(Exception e) {
+			e.printStackTrace();
+			fail("failed to initialize the Encoding test");
+			return null; // this is never executed but java requires it.
+		}
+	}
+
+	public static Object[] create(MatrixBlock m, boolean t){
+		try {
+			// Make sure that nUnique always is correct if we have a large enough matrix.
+
+			final int d = t ? m.getNumRows() : m.getNumColumns();
+			final IEncode e = IEncode.createFromMatrixBlock(m, t, genRowCol(d));
+
+			// split and read subparts individually
+			final int dfh = d / 2;
+			final IEncode fh = IEncode.createFromMatrixBlock(m, t, genRowCol(dfh));
+			final IEncode sh = IEncode.createFromMatrixBlock(m, t, genRowCol(dfh, d));
+
+			// join subparts and use its unique count for tests
+			final IEncode er = fh.join(sh);
+			int u = er.getUnique();
+
+			return new Object[] {m, t, u, e, fh, sh};
+		}
+		catch(Exception e) {
+			e.printStackTrace();
+			fail("failed to initialize the Encoding test");
+			return null; // this is never executed but java requires it.
+		}
+	}
+}
diff --git a/src/test/java/org/apache/sysds/test/component/compress/mapping/StandAloneTests.java b/src/test/java/org/apache/sysds/test/component/compress/mapping/StandAloneTests.java
index 9b43d2b..26caa47 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/mapping/StandAloneTests.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/mapping/StandAloneTests.java
@@ -27,6 +27,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.runtime.compress.DMLCompressionException;
 import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
+import org.apache.sysds.runtime.compress.colgroup.mapping.MapToBit;
+import org.apache.sysds.runtime.compress.colgroup.mapping.MapToByte;
+import org.apache.sysds.runtime.compress.colgroup.mapping.MapToChar;
 import org.apache.sysds.runtime.compress.colgroup.mapping.MapToFactory;
 import org.apache.sysds.runtime.compress.utils.IntArrayList;
 import org.junit.Test;
@@ -153,4 +156,31 @@ public class StandAloneTests {
 	private static IntArrayList gen(int[] in) {
 		return new IntArrayList(in);
 	}
+
+	@Test
+	public void sameMemoryUsageBit01() {
+		assertEquals(MapToBit.getInMemorySize(10), MapToBit.getInMemorySize(40));
+	}
+
+	@Test
+	public void sameMemoryUsageBit02() {
+		assertEquals(MapToBit.getInMemorySize(1), MapToBit.getInMemorySize(63));
+	}
+
+	@Test
+	public void sameMemoryUsageBit03() {
+		assertEquals(MapToBit.getInMemorySize(1), MapToBit.getInMemorySize(64));
+	}
+
+
+
+	@Test
+	public void sameMemoryUsageChar() {
+		assertEquals(MapToChar.getInMemorySize(9), MapToChar.getInMemorySize(10));
+	}
+
+	@Test
+	public void sameMemoryUsageByte() {
+		assertEquals(MapToByte.getInMemorySize(9), MapToByte.getInMemorySize(12));
+	}
 }
diff --git a/src/test/java/org/apache/sysds/test/component/compress/offset/OffsetTests.java b/src/test/java/org/apache/sysds/test/component/compress/offset/OffsetTests.java
index 7a476aa..2d0300c 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/offset/OffsetTests.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/offset/OffsetTests.java
@@ -208,20 +208,22 @@ public class OffsetTests {
 		try {
 			final long inMemorySize = o.getInMemorySize();
 			long estimatedSize;
+
 			switch(type) {
 				case BYTE:
-					estimatedSize = OffsetByte.estimateInMemorySize(data.length, data[data.length - 1] - data[0]);
+					final int correctionByte = OffsetFactory.correctionByte(data[data.length - 1] - data[0], data.length);
+					estimatedSize = OffsetByte.estimateInMemorySize(data.length + correctionByte);
 					break;
 				case CHAR:
-					estimatedSize = OffsetChar.estimateInMemorySize(data.length, data[data.length - 1] - data[0]);
+					final int correctionChar = OffsetFactory.correctionChar(data[data.length - 1] - data[0], data.length);
+					estimatedSize = OffsetChar.estimateInMemorySize(data.length + correctionChar);
 					break;
 				default:
 					throw new DMLCompressionException("Unknown input");
 			}
 			if(!(inMemorySize <= estimatedSize + sizeTolerance)) {
-
 				fail("in memory size: " + inMemorySize + " is not smaller than estimate: " + estimatedSize
-					+ " with tolerance " + sizeTolerance);
+					+ " with tolerance " + sizeTolerance + "\nEncoded:" + o + "\nData:" + Arrays.toString(data));
 			}
 		}
 		catch(Exception e) {