You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by ba...@apache.org on 2021/09/20 11:27:37 UTC

[systemds] 01/03: [SYSTEMDS-2610] CLA updates

This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit 67bb8034611a766ca93c0314ee735a3252e79290
Author: baunsgaard <ba...@tugraz.at>
AuthorDate: Mon Aug 30 16:28:31 2021 +0200

    [SYSTEMDS-2610] CLA updates
    
    - Compressed matrix factory improvements
    - Add decompression if the data is serialized and larger in compressed format
    - Decompress on write to HDFS
    - Abort compression after cocode if the compression sizes are bad
    - Treat cbind as decompressing in the workload tree (worst case)
    - Add a minimum compression ratio argument to the CompressionSettings
    - Reduce the sampling size in c bind compression and set high minimum compression ratio
    - Fix order of operations in compressed append
    - Add compressed output size to unary hops
    - Make more use of the cached decompressed matrix when it fits in memory,
    by checking for a soft reference to the uncompressed block in certain cases
---
 src/main/java/org/apache/sysds/hops/UnaryOp.java   |  12 +-
 .../ipa/IPAPassCompressionWorkloadAnalysis.java    |   1 +
 .../hops/rewrite/RewriteCompressedReblock.java     |   2 +-
 .../runtime/compress/CompressedMatrixBlock.java    |  29 ++-
 .../compress/CompressedMatrixBlockFactory.java     |  98 +++++++-
 .../runtime/compress/CompressionSettings.java      |   8 +-
 .../compress/CompressionSettingsBuilder.java       |   8 +-
 .../runtime/compress/cocode/AColumnCoCoder.java    |  22 +-
 .../runtime/compress/cocode/CoCodeBinPacking.java  |   4 +-
 .../runtime/compress/cocode/CoCodeGreedy.java      |   2 +-
 .../runtime/compress/cocode/CoCodePriorityQue.java |   4 +-
 .../compress/colgroup/ColGroupUncompressed.java    |  19 +-
 .../compress/colgroup/mapping/MapToFactory.java    |  12 +-
 .../compress/cost/CostEstimatorBuilder.java        |   5 +
 .../compress/cost/CostEstimatorFactory.java        |   2 +-
 .../compress/estim/CompressedSizeEstimator.java    |  43 +++-
 .../estim/CompressedSizeEstimatorExact.java        |   5 +-
 .../estim/CompressedSizeEstimatorFactory.java      |  30 ++-
 .../estim/CompressedSizeEstimatorSample.java       |  11 +-
 .../compress/estim/CompressedSizeInfoColGroup.java |   2 +-
 .../sysds/runtime/compress/lib/CLALibAppend.java   |  32 +--
 .../runtime/compress/lib/CLALibBinaryCellOp.java   |  42 ++--
 .../sysds/runtime/compress/lib/CLALibCompAgg.java  |  51 ++--
 .../runtime/compress/lib/CLALibRelationalOp.java   | 267 ---------------------
 .../sysds/runtime/compress/lib/CLALibScalar.java   |   7 -
 .../compress/workload/WorkloadAnalyzer.java        |   5 +-
 .../context/SparkExecutionContext.java             |   8 +-
 .../spark/BinUaggChainSPInstruction.java           |   6 +
 .../component/compress/CompressedMatrixTest.java   |  33 +++
 .../component/compress/CompressedTestBase.java     |  27 ---
 .../component/compress/workload/WorkloadTest.java  |  18 +-
 .../compress/workload/WorkloadAlgorithmTest.java   |   2 +-
 32 files changed, 379 insertions(+), 438 deletions(-)

diff --git a/src/main/java/org/apache/sysds/hops/UnaryOp.java b/src/main/java/org/apache/sysds/hops/UnaryOp.java
index b7df277..38199b2 100644
--- a/src/main/java/org/apache/sysds/hops/UnaryOp.java
+++ b/src/main/java/org/apache/sysds/hops/UnaryOp.java
@@ -545,7 +545,7 @@ public class UnaryOp extends MultiThreadedHop
 			setDim2(1);
 		}
 		else if(_op == OpOp1.TYPEOF || _op == OpOp1.DETECTSCHEMA || _op == OpOp1.COLNAMES) {
-			//TODO theses three builtins should rather be moved to unary aggregates
+			//TODO these three builtins should rather be moved to unary aggregates
 			setDim1(1);
 			setDim2(input.getDim2());
 		}
@@ -564,6 +564,16 @@ public class UnaryOp extends MultiThreadedHop
 			{
 				setNnz( input.getNnz() );
 			}
+
+			// if the input is compressed then set the output to be compressed as well.
+			if(input._compressedOutput && ! (_op==OpOp1.DECOMPRESS)){
+				setCompressedOutput(true);
+				// Estimate the compressed output size as 2x the compressed size of the input,
+				// in case the operation changes the compressed structure slightly.
+				// This value is overwritten with the correct size once the hop is executed.
+				// TODO handle overlapping state, since some operations would not lead to compressed output.
+				setCompressedSize(input.compressedSize() * 2);
+			}
 		}
 	}
 	
diff --git a/src/main/java/org/apache/sysds/hops/ipa/IPAPassCompressionWorkloadAnalysis.java b/src/main/java/org/apache/sysds/hops/ipa/IPAPassCompressionWorkloadAnalysis.java
index 324b272..b23397f 100644
--- a/src/main/java/org/apache/sysds/hops/ipa/IPAPassCompressionWorkloadAnalysis.java
+++ b/src/main/java/org/apache/sysds/hops/ipa/IPAPassCompressionWorkloadAnalysis.java
@@ -58,6 +58,7 @@ public class IPAPassCompressionWorkloadAnalysis extends IPAPass {
 			WTreeRoot tree = e.getValue();
 			CostEstimatorBuilder b = new CostEstimatorBuilder(tree);
 			// filter out compression plans that is known bad
+			
 			if(b.shouldTryToCompress()){
 				tree.getRoot().setRequiresCompression(tree);
 				for(Hop h : tree.getDecompressList())
diff --git a/src/main/java/org/apache/sysds/hops/rewrite/RewriteCompressedReblock.java b/src/main/java/org/apache/sysds/hops/rewrite/RewriteCompressedReblock.java
index 6b51a51..2f0f1f6c 100644
--- a/src/main/java/org/apache/sysds/hops/rewrite/RewriteCompressedReblock.java
+++ b/src/main/java/org/apache/sysds/hops/rewrite/RewriteCompressedReblock.java
@@ -128,7 +128,7 @@ public class RewriteCompressedReblock extends StatementBlockRewriteRule {
 
 	public static boolean satisfiesSizeConstraintsForCompression(Hop hop) {
 		if(hop.getDim2() >= 1) {
-			return (hop.getDim1() >= 1000 && hop.getDim2() < 100) || hop.getDim1() / hop.getDim2() >= 75;
+			return (hop.getDim1() >= 1000 && hop.getDim2() < 100) || hop.getDim1() / hop.getDim2() >= 75 || (hop.getSparsity() < 0.0001 && hop.getDim1() > 1000);
 		}
 		return false;
 	}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
index d374d3c..5dcc406 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
@@ -302,7 +302,10 @@ public class CompressedMatrixBlock extends MatrixBlock {
 		return ret;
 	}
 
-	private MatrixBlock getCachedDecompressed() {
+	/**
+	 * Get the cached decompressed matrix (if it exists otherwise null)
+	 */
+	public MatrixBlock getCachedDecompressed() {
 		if(decompressedVersion != null) {
 			final MatrixBlock mb = decompressedVersion.get();
 			if(mb != null) {
@@ -453,6 +456,16 @@ public class CompressedMatrixBlock extends MatrixBlock {
 
 	@Override
 	public void write(DataOutput out) throws IOException {
+		if(getExactSizeOnDisk() > MatrixBlock.estimateSizeOnDisk(rlen, clen, nonZeros)) {
+			// Decompress and wrap the data in an uncompressed column group, which is then used for the
+			// serialization since it is smaller.
+			// throw new NotImplementedException("Decompressing serialization is not implemented");
+
+			MatrixBlock uncompressed = getUncompressed("Decompressing serialization for smaller serialization");
+			ColGroupUncompressed cg = new ColGroupUncompressed(uncompressed);
+			allocateColGroup(cg);
+			nonZeros = cg.getNumberNonZeros();
+		}
 		// serialize compressed matrix block
 		out.writeInt(rlen);
 		out.writeInt(clen);
@@ -492,11 +505,15 @@ public class CompressedMatrixBlock extends MatrixBlock {
 
 	@Override
 	public MatrixBlock binaryOperations(BinaryOperator op, MatrixValue thatValue, MatrixValue result) {
-		return CLALibBinaryCellOp.binaryOperations(op, this, thatValue, result);
+		MatrixBlock that = thatValue == null ? null : (MatrixBlock) thatValue;
+		MatrixBlock ret = result == null ? null : (MatrixBlock) result;
+		return CLALibBinaryCellOp.binaryOperations(op, this, that, ret);
 	}
 
 	public MatrixBlock binaryOperationsLeft(BinaryOperator op, MatrixValue thatValue, MatrixValue result) {
-		return CLALibBinaryCellOp.binaryOperationsLeft(op, this, thatValue, result);
+		MatrixBlock that = thatValue == null ? null : (MatrixBlock) thatValue;
+		MatrixBlock ret = result == null ? null : (MatrixBlock) result;
+		return CLALibBinaryCellOp.binaryOperationsLeft(op, this, that, ret);
 	}
 
 	@Override
@@ -686,8 +703,8 @@ public class CompressedMatrixBlock extends MatrixBlock {
 				.aggregateUnaryOperations(op, result, blen, indexesIn, inCP);
 
 		}
-
-		return CLALibCompAgg.aggregateUnary(this, result, op, blen, indexesIn, inCP);
+		MatrixBlock ret = (result == null) ? null : (MatrixBlock) result;
+		return CLALibCompAgg.aggregateUnary(this, ret, op, blen, indexesIn, inCP);
 	}
 
 	@Override
@@ -1080,7 +1097,7 @@ public class CompressedMatrixBlock extends MatrixBlock {
 		boolean m2C = m2 instanceof CompressedMatrixBlock;
 		boolean m3C = m3 instanceof CompressedMatrixBlock;
 		printDecompressWarning("aggregateTernaryOperations " + op.aggOp.getClass().getSimpleName() + " "
-			+ op.indexFn.getClass().getSimpleName() + "  " + op.aggOp.increOp.fn.getClass().getSimpleName() + " "
+			+ op.indexFn.getClass().getSimpleName() + " " + op.aggOp.increOp.fn.getClass().getSimpleName() + " "
 			+ op.binaryFn.getClass().getSimpleName() + " m1,m2,m3 " + m1C + " " + m2C + " " + m3C);
 		MatrixBlock left = getUncompressed();
 		MatrixBlock right1 = getUncompressed(m2);
diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java
index 20beaf5..5f45d56 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java
@@ -32,18 +32,21 @@ import org.apache.sysds.runtime.compress.colgroup.AColGroup;
 import org.apache.sysds.runtime.compress.colgroup.ColGroupConst;
 import org.apache.sysds.runtime.compress.colgroup.ColGroupEmpty;
 import org.apache.sysds.runtime.compress.colgroup.ColGroupFactory;
+import org.apache.sysds.runtime.compress.colgroup.ColGroupUncompressed;
 import org.apache.sysds.runtime.compress.colgroup.ColGroupValue;
 import org.apache.sysds.runtime.compress.colgroup.dictionary.Dictionary;
+import org.apache.sysds.runtime.compress.cost.ComputationCostEstimator;
 import org.apache.sysds.runtime.compress.cost.CostEstimatorFactory;
 import org.apache.sysds.runtime.compress.cost.ICostEstimate;
-import org.apache.sysds.runtime.compress.cost.MemoryCostEstimator;
 import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimator;
 import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimatorFactory;
 import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo;
+import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup;
 import org.apache.sysds.runtime.compress.utils.DblArrayIntListHashMap;
 import org.apache.sysds.runtime.compress.utils.DoubleCountHashMap;
 import org.apache.sysds.runtime.compress.workload.WTreeRoot;
 import org.apache.sysds.runtime.controlprogram.parfor.stat.Timing;
+import org.apache.sysds.runtime.data.SparseBlock;
 import org.apache.sysds.runtime.matrix.data.LibMatrixReorg;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.utils.DMLCompressionStatistics;
@@ -65,7 +68,7 @@ public class CompressedMatrixBlockFactory {
 	private final CompressionSettings compSettings;
 	/** The main cost estimator used for the compression */
 	private final ICostEstimate costEstimator;
-	
+
 	/** Time stamp of last phase */
 	private double lastPhase;
 	/** Pointer to the original matrix Block that is about to be compressed. */
@@ -153,6 +156,20 @@ public class CompressedMatrixBlockFactory {
 	}
 
 	/**
+	 * Generate a CompressedMatrixBlock Object that contains a single uncompressed matrix block column group.
+	 * 
+	 * @param mb The matrix block to be contained in the uncompressed column group.
+	 * @return a CompressedMatrixBlock
+	 */
+	public static CompressedMatrixBlock genUncompressedCompressedMatrixBlock(MatrixBlock mb) {
+		CompressedMatrixBlock ret = new CompressedMatrixBlock(mb.getNumRows(), mb.getNumColumns());
+		AColGroup cg = new ColGroupUncompressed(mb);
+		ret.allocateColGroup(cg);
+		ret.setNonZeros(mb.getNonZeros());
+		return ret;
+	}
+
+	/**
 	 * Method for constructing a compressed matrix out of an constant input.
 	 * 
 	 * Since the input is a constant value it is trivially compressable, therefore we skip the entire compression
@@ -191,9 +208,13 @@ public class CompressedMatrixBlockFactory {
 
 		res = new CompressedMatrixBlock(mb); // copy metadata and allocate soft reference
 
-		classifyPhase();
-		if(coCodeColGroups == null)
-			return abortCompression();
+		looksLikeOneHot();
+
+		if(coCodeColGroups == null) {
+			classifyPhase();
+			if(coCodeColGroups == null)
+				return abortCompression();
+		}
 
 		transposePhase();
 		compressPhase();
@@ -217,7 +238,13 @@ public class CompressedMatrixBlockFactory {
 		_stats.estimatedSizeCols = sizeInfos.memoryEstimate();
 		logPhase();
 
-		if(!(costEstimator instanceof MemoryCostEstimator) || _stats.estimatedSizeCols < _stats.originalSize)
+		final boolean isValidForComputeBasedCompression = isComputeBasedCompression() &&
+			(compSettings.minimumCompressionRatio != 1.0) ? _stats.estimatedSizeCols *
+				compSettings.minimumCompressionRatio < _stats.originalSize : true;
+		final boolean isValidForMemoryBasedCompression = _stats.estimatedSizeCols *
+			compSettings.minimumCompressionRatio < _stats.originalSize;
+
+		if(isValidForComputeBasedCompression || isValidForMemoryBasedCompression)
 			coCodePhase(sizeEstimator, sizeInfos, costEstimator);
 		else {
 			LOG.info("Estimated Size of singleColGroups: " + _stats.estimatedSizeCols);
@@ -225,13 +252,72 @@ public class CompressedMatrixBlockFactory {
 		}
 	}
 
+	private boolean isComputeBasedCompression() {
+		return costEstimator instanceof ComputationCostEstimator;
+	}
+
 	private void coCodePhase(CompressedSizeEstimator sizeEstimator, CompressedSizeInfo sizeInfos,
 		ICostEstimate costEstimator) {
 		coCodeColGroups = CoCoderFactory.findCoCodesByPartitioning(sizeEstimator, sizeInfos, k, costEstimator,
 			compSettings);
 
 		_stats.estimatedSizeCoCoded = coCodeColGroups.memoryEstimate();
+
 		logPhase();
+
+		// Abort compute-based compression if the cocoded estimate, scaled by the minimum ratio, exceeds the original size.
+		if(isComputeBasedCompression() &&
+			_stats.estimatedSizeCoCoded * compSettings.minimumCompressionRatio > _stats.originalSize) {
+
+			coCodeColGroups = null;
+			LOG.info("Aborting compression because the cocoded size : " + _stats.estimatedSizeCoCoded);
+			LOG.info("Vs original size                              : " + _stats.originalSize);
+		}
+
+	}
+
+	private void looksLikeOneHot() {
+		final int numColumns = mb.getNumColumns();
+		final int numRows = mb.getNumRows();
+		final long nnz = mb.getNonZeros();
+		final int colGroupSize = 100;
+		if(nnz == numRows) {
+			boolean onlyOneValues = true;
+			LOG.debug("Looks like one hot encoded.");
+			if(mb.isInSparseFormat()) {
+				final SparseBlock sb = mb.getSparseBlock();
+				for(double v : sb.get(0).values()) {
+					onlyOneValues = v == 1.0;
+					if(!onlyOneValues) {
+						break;
+					}
+				}
+			}
+			else {
+				final double[] vals = mb.getDenseBlock().values(0);
+				for(int i = 0; i < Math.min(vals.length, 1000); i++) {
+					double v = vals[i];
+					onlyOneValues = v == 1.0 || v == 0.0;
+					if(!onlyOneValues) {
+						break;
+					}
+				}
+			}
+			if(onlyOneValues) {
+				List<CompressedSizeInfoColGroup> ng = new ArrayList<>(numColumns / colGroupSize + 1);
+				for(int i = 0; i < numColumns; i += colGroupSize) {
+					int[] columnIds = new int[Math.min(colGroupSize, numColumns - i)];
+					for(int j = 0; j < columnIds.length; j++)
+						columnIds[j] = i + j;
+					ng.add(new CompressedSizeInfoColGroup(columnIds, Math.min(numColumns, colGroupSize), numRows));
+				}
+				coCodeColGroups = new CompressedSizeInfo(ng);
+
+				LOG.debug("Concluded that it probably is one hot encoded skipping analysis");
+				// skipping two phases
+				phase += 2;
+			}
+		}
 	}
 
 	private void transposePhase() {
diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java
index 6dfcdae..ea04e8e 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java
@@ -103,10 +103,15 @@ public class CompressionSettings {
 	 */
 	public boolean transposed = false;
 
+	/**
+	 * The minimum compression ratio to achieve.
+	 */
+	public final double minimumCompressionRatio;
+
 	protected CompressionSettings(double samplingRatio, boolean allowSharedDictionary, String transposeInput,
 		 int seed, boolean lossy, EnumSet<CompressionType> validCompressions,
 		boolean sortValuesByLength, PartitionerType columnPartitioner, int maxColGroupCoCode, double coCodePercentage,
-		int minimumSampleSize, EstimationType estimationType, CostType costComputationType) {
+		int minimumSampleSize, EstimationType estimationType, CostType costComputationType, double minimumCompressionRatio) {
 		this.samplingRatio = samplingRatio;
 		this.allowSharedDictionary = allowSharedDictionary;
 		this.transposeInput = transposeInput;
@@ -120,6 +125,7 @@ public class CompressionSettings {
 		this.minimumSampleSize = minimumSampleSize;
 		this.estimationType = estimationType;
 		this.costComputationType = costComputationType;
+		this.minimumCompressionRatio = minimumCompressionRatio;
 		if(LOG.isDebugEnabled())
 			LOG.debug(this);
 	}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java
index a56285b..2864118 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java
@@ -45,6 +45,7 @@ public class CompressionSettingsBuilder {
 	private EstimationType estimationType = EstimationType.HassAndStokes;
 	private PartitionerType columnPartitioner;
 	private CostType costType;
+	private double minimumCompressionRatio = 1.0;
 
 	public CompressionSettingsBuilder() {
 
@@ -267,6 +268,11 @@ public class CompressionSettingsBuilder {
 		return this;
 	}
 
+	public CompressionSettingsBuilder setMinimumCompressionRatio(double ratio){
+		this.minimumCompressionRatio = ratio;
+		return this;
+	}
+
 	/**
 	 * Create the CompressionSettings object to use in the compression.
 	 * 
@@ -275,6 +281,6 @@ public class CompressionSettingsBuilder {
 	public CompressionSettings create() {
 		return new CompressionSettings(samplingRatio, allowSharedDictionary, transposeInput, seed, lossy,
 			validCompressions, sortValuesByLength, columnPartitioner, maxColGroupCoCode, coCodePercentage,
-			minimumSampleSize, estimationType, costType);
+			minimumSampleSize, estimationType, costType,minimumCompressionRatio);
 	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/AColumnCoCoder.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/AColumnCoCoder.java
index 41d986c..3bdcf5d 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/cocode/AColumnCoCoder.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/AColumnCoCoder.java
@@ -26,7 +26,6 @@ import org.apache.sysds.runtime.compress.cost.ICostEstimate;
 import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimator;
 import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo;
 import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup;
-import org.apache.sysds.runtime.compress.utils.Util;
 
 public abstract class AColumnCoCoder {
 
@@ -52,26 +51,17 @@ public abstract class AColumnCoCoder {
 	 */
 	protected abstract CompressedSizeInfo coCodeColumns(CompressedSizeInfo colInfos, int k);
 
-	protected CompressedSizeInfoColGroup join(CompressedSizeInfoColGroup lhs, CompressedSizeInfoColGroup rhs,
-		boolean analyze) {
-		return analyze ? joinWithAnalysis(lhs, rhs) : joinWithoutAnalysis(lhs, rhs);
+	protected CompressedSizeInfoColGroup join(int[] joined, CompressedSizeInfoColGroup lhs,
+		CompressedSizeInfoColGroup rhs, boolean analyze) {
+		return analyze ? _sest.estimateJoinCompressedSize(joined, lhs, rhs) : joinWithoutAnalysis(joined, lhs, rhs);
 	}
 
-	protected CompressedSizeInfoColGroup joinWithAnalysis(CompressedSizeInfoColGroup lhs,
+	protected CompressedSizeInfoColGroup joinWithoutAnalysis(int[] joined, CompressedSizeInfoColGroup lhs,
 		CompressedSizeInfoColGroup rhs) {
-		return _sest.estimateJoinCompressedSize(lhs, rhs);
-	}
-
-	protected CompressedSizeInfoColGroup joinWithoutAnalysis(CompressedSizeInfoColGroup lhs,
-		CompressedSizeInfoColGroup rhs) {
-		int[] joined = Util.join(lhs.getColumns(), rhs.getColumns());
 		final int lhsV = lhs.getNumVals();
 		final int rhsV = rhs.getNumVals();
-		final int numVals = lhsV * rhsV;
-		if(numVals < 0 || numVals > _sest.getNumRows())
-			return null;
-		else
-			return new CompressedSizeInfoColGroup(joined, numVals, _sest.getNumRows());
+		final int joinedMaxDistinct = (int) Math.min((long) lhsV * (long) rhsV, (long) _sest.getNumRows());
+		return new CompressedSizeInfoColGroup(joined, joinedMaxDistinct, _sest.getNumRows());
 	}
 
 	protected CompressedSizeInfoColGroup analyze(CompressedSizeInfoColGroup g) {
diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeBinPacking.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeBinPacking.java
index 1887bdc..1731ef2 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeBinPacking.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeBinPacking.java
@@ -141,7 +141,7 @@ public class CoCodeBinPacking extends AColumnCoCoder {
 			for(int j = 0; j < bins.size(); j++) {
 				double newBinWeight = binWeights[j] - c.getCardinalityRatio();
 				if(newBinWeight >= 0 && bins.get(j).getColumns().length < MAX_COL_PER_GROUP - 1) {
-					bins.set(j, joinWithoutAnalysis(bins.get(j), c));
+					bins.set(j, joinWithoutAnalysis(Util.join(bins.get(j).getColumns(), c.getColumns()),bins.get(j), c));
 					binWeights[j] = newBinWeight;
 					assigned = true;
 					break;
@@ -291,7 +291,7 @@ public class CoCodeBinPacking extends AColumnCoCoder {
 					g = CompressedSizeInfoColGroup.addConstGroup(c, left, _cs.validCompressions);
 				else {
 					st3++;
-					g = _sest.estimateJoinCompressedSize(left, right);
+					g = _sest.estimateJoinCompressedSize(c, left, right);
 				}
 
 				if(leftConst || rightConst)
diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeGreedy.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeGreedy.java
index 51bfa46..3091ba0 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeGreedy.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeGreedy.java
@@ -163,7 +163,7 @@ public class CoCodeGreedy extends AColumnCoCoder {
 					g = CompressedSizeInfoColGroup.addConstGroup(c, left, _cs.validCompressions);
 				else {
 					st3++;
-					g = _sest.estimateJoinCompressedSize(left, right);
+					g = _sest.estimateJoinCompressedSize(c, left, right);
 				}
 
 				if(leftConst || rightConst)
diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodePriorityQue.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodePriorityQue.java
index 2e0e7fb..27b678c 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodePriorityQue.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodePriorityQue.java
@@ -68,7 +68,7 @@ public class CoCodePriorityQue extends AColumnCoCoder {
 			while(que.peek() != null) {
 
 				CompressedSizeInfoColGroup r = que.poll();
-				final CompressedSizeInfoColGroup g = joinWithAnalysis(l, r);
+				final CompressedSizeInfoColGroup g = _sest.estimateJoinCompressedSize(l, r);
 				if(g != null) {
 					final double costOfJoin = _cest.getCostOfCollectionOfGroups(que, g);
 					if(costOfJoin < costBeforeJoin) {
@@ -93,7 +93,7 @@ public class CoCodePriorityQue extends AColumnCoCoder {
 			while(que.peek() != null) {
 				CompressedSizeInfoColGroup r = que.peek();
 				if(_cest.shouldTryJoin(l, r)) {
-					CompressedSizeInfoColGroup g = joinWithAnalysis(l, r);
+					CompressedSizeInfoColGroup g = _sest.estimateJoinCompressedSize(l, r);
 					if(g != null) {
 						double costOfJoin = _cest.getCostOfColumnGroup(g);
 						double costIndividual = _cest.getCostOfColumnGroup(l) + _cest.getCostOfColumnGroup(r);
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java
index ab94646..b1b0540 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java
@@ -132,6 +132,23 @@ public class ColGroupUncompressed extends AColGroup {
 		_data = data;
 	}
 
+	/**
+	 * Constructor for allocating a single uncompressed column group.
+	 * 
+	 * @param data matrix block
+	 */
+	public ColGroupUncompressed(MatrixBlock data) {
+		super(generateColumnList(data.getNumColumns()));
+		_data = data;
+	}
+
+	private static int[] generateColumnList(int nCol){
+		int[] cols = new int[nCol];
+		for(int i = 0; i< nCol; i++)
+			cols[i] = i;
+		return cols;
+	}
+
 	@Override
 	public CompressionType getCompType() {
 		return CompressionType.UNCOMPRESSED;
@@ -632,7 +649,7 @@ public class ColGroupUncompressed extends AColGroup {
 			double[] dv = colSum.getDenseBlockValues();
 			for(int i = 0; i < _colIndexes.length; i++)
 				c[_colIndexes[i]] += dv[i];
-			
+
 		}
 	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToFactory.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToFactory.java
index 5a72ff7..6c0b9fa 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToFactory.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToFactory.java
@@ -108,16 +108,13 @@ public final class MapToFactory {
 		final int nVL = left.getUnique();
 		final int nVR = right.getUnique();
 		final int size = left.size();
-		final int maxUnique = nVL * nVR;
+		final long maxUnique = nVL * nVR;
+		if(maxUnique > (long)Integer.MAX_VALUE)
+			throw new DMLCompressionException("Joining impossible using linearized join, since each side has a large number of unique values");
 		if(size != right.size())
 			throw new DMLCompressionException("Invalid input maps to join, must contain same number of rows");
 
-		try {
-			return computeJoin(left, right, size, nVL, maxUnique);
-		}
-		catch(Exception e) {
-			throw new DMLCompressionException("Joining failed max unique expected:" + maxUnique, e);
-		}
+		return computeJoin(left, right, size, nVL, (int)maxUnique);
 	}
 
 	private static AMapToData computeJoin(AMapToData left, AMapToData right, int size, int nVL, int maxUnique) {
@@ -141,7 +138,6 @@ public final class MapToFactory {
 		}
 
 		tmp.setUnique(newUID-1);
-		// LOG.error(Arrays.toString(map));
 		return tmp;
 	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/cost/CostEstimatorBuilder.java b/src/main/java/org/apache/sysds/runtime/compress/cost/CostEstimatorBuilder.java
index d7f305c..3d58bff 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/cost/CostEstimatorBuilder.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/cost/CostEstimatorBuilder.java
@@ -125,6 +125,11 @@ public final class CostEstimatorBuilder implements Serializable {
 		numberOps += counter.scans + counter.leftMultiplications * 2 + counter.rightMultiplications +
 			counter.compressedMultiplications * 4 + counter.dictionaryOps;
 		numberOps -= counter.decompressions + counter.overlappingDecompressions;
+
+		if(counter.decompressions > 1 &&
+			counter.leftMultiplications + counter.rightMultiplications + counter.compressedMultiplications < 1)
+			// This condition was added for the y dataset in l2svm and mLogReg, which was compressed although it should not be.
+			return false;
 		return numberOps > 4;
 
 	}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/cost/CostEstimatorFactory.java b/src/main/java/org/apache/sysds/runtime/compress/cost/CostEstimatorFactory.java
index e8bff06..83b794e 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/cost/CostEstimatorFactory.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/cost/CostEstimatorFactory.java
@@ -45,7 +45,7 @@ public final class CostEstimatorFactory {
 					return b.create(nRows, nCols);
 				}
 				else
-					return new DistinctCostEstimator(nRows, cs);
+					return new MemoryCostEstimator();
 			case MEMORY:
 			default:
 				return new MemoryCostEstimator();
diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimator.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimator.java
index 3dd092c..ef7d9dd 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimator.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimator.java
@@ -82,7 +82,7 @@ public abstract class CompressedSizeEstimator {
 		return _numCols;
 	}
 
-	public MatrixBlock getData(){
+	public MatrixBlock getData() {
 		return _data;
 	}
 
@@ -163,7 +163,7 @@ public abstract class CompressedSizeEstimator {
 	 * @param colIndexes The columns to group together inside a ColGroup
 	 * @return The CompressedSizeInformation associated with the selected ColGroups.
 	 */
-	public CompressedSizeInfoColGroup estimateCompressedColGroupSize(int[] colIndexes){
+	public CompressedSizeInfoColGroup estimateCompressedColGroupSize(int[] colIndexes) {
 		return estimateCompressedColGroupSize(colIndexes, 8, getNumRows());
 	}
 
@@ -173,7 +173,7 @@ public abstract class CompressedSizeEstimator {
 	 * the number estimated in sub groups of the given colIndexes.
 	 * 
 	 * @param colIndexes         The columns to extract compression information from
-	 * @param estimate 			 An estimate of number of unique elements in these columns
+	 * @param estimate           An estimate of number of unique elements in these columns
 	 * @param nrUniqueUpperBound The upper bound of unique elements allowed in the estimate, can be calculated from the
 	 *                           number of unique elements estimated in sub columns multiplied together. This is
 	 *                           flexible in the sense that if the sample is small then this unique can be manually
@@ -181,13 +181,16 @@ public abstract class CompressedSizeEstimator {
 	 * 
 	 * @return The CompressedSizeInfoColGroup fro the given column indexes.
 	 */
-	public abstract CompressedSizeInfoColGroup estimateCompressedColGroupSize(int[] colIndexes, int estimate, int nrUniqueUpperBound);
+	public abstract CompressedSizeInfoColGroup estimateCompressedColGroupSize(int[] colIndexes, int estimate,
+		int nrUniqueUpperBound);
 
 	/**
 	 * Join two analyzed column groups together. without materializing the dictionaries of either side.
 	 * 
-	 * If either side was constructed without analysis then fall back to default materialization of double arrays.
+	 * If the product of the number of distinct elements on both sides exceeds Integer.MAX_VALUE, return null.
 	 * 
+	 * If either side was constructed without analysis then fall back to default materialization of double arrays.
+	 * 
 	 * @param g1 First group
 	 * @param g2 Second group
 	 * @return A joined compressed size estimation for the group.
@@ -195,19 +198,39 @@ public abstract class CompressedSizeEstimator {
 	public CompressedSizeInfoColGroup estimateJoinCompressedSize(CompressedSizeInfoColGroup g1,
 		CompressedSizeInfoColGroup g2) {
 		final int[] joined = Util.join(g1.getColumns(), g2.getColumns());
+		return estimateJoinCompressedSize(joined, g1, g2);
+	}
+
+	/**
+	 * Join two analyzed column groups together without materializing the dictionaries of either side.
+	 * 
+	 * If the product of the number of distinct elements on both sides exceeds Integer.MAX_VALUE, return null.
+	 * 
+	 * If either side was constructed without analysis then fall back to default materialization of double arrays.
+	 * 
+	 * @param joined The joined column indexes.
+	 * @param g1     First group
+	 * @param g2     Second group
+	 * @return A joined compressed size estimation for the group, or null if the distinct-value product overflows.
+	 */
+	public CompressedSizeInfoColGroup estimateJoinCompressedSize(int[] joined, CompressedSizeInfoColGroup g1,
+		CompressedSizeInfoColGroup g2) {
 		final int g1V = g1.getNumVals();
 		final int g2V = g2.getNumVals();
-		if(g1V * g2V < 0 || g1V * g2V > getNumRows())
+		if((long) g1V * g2V > (long) Integer.MAX_VALUE)
 			return null;
-		else if((g1.getMap() == null && g2V != 0) || (g2.getMap() == null && g2V != 0))
-			return estimateCompressedColGroupSize(joined, Math.max(g1V + 1, g2V+ 1), Math.min((g1V + 1) * (g2V + 1), getNumRows()));
+
+		final int joinedMaxDistinct = (int) Math.min((long) g1V * (long) g2V, getNumRows());
+		if((g1.getMap() == null && g2V != 0) || (g2.getMap() == null && g2V != 0))
+			return estimateCompressedColGroupSize(joined, Math.max(g1V + 1, g2V + 1),
+				(int) Math.min(((long) g1V + 1) * ((long) g2V + 1), getNumRows())); // long math: the +1 product can exceed int
 		else
-			return estimateJoinCompressedSize(joined, g1, g2);
+			return estimateJoinCompressedSize(joined, g1, g2, joinedMaxDistinct);
 
 	}
 
 	protected abstract CompressedSizeInfoColGroup estimateJoinCompressedSize(int[] joinedcols,
-		CompressedSizeInfoColGroup g1, CompressedSizeInfoColGroup g2);
+		CompressedSizeInfoColGroup g1, CompressedSizeInfoColGroup g2, int joinedMaxDistinct);
 
 	/**
 	 * Method used to extract the CompressedSizeEstimationFactors from an constructed UncompressedBitmap. Note this
diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorExact.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorExact.java
index 824034a..1531781 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorExact.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorExact.java
@@ -45,8 +45,9 @@ public class CompressedSizeEstimatorExact extends CompressedSizeEstimator {
 	}
 
 	@Override
-	public CompressedSizeInfoColGroup estimateJoinCompressedSize(int[] joined, CompressedSizeInfoColGroup g1,
-		CompressedSizeInfoColGroup g2) {
+	protected CompressedSizeInfoColGroup estimateJoinCompressedSize(int[] joined, CompressedSizeInfoColGroup g1,
+		CompressedSizeInfoColGroup g2, int joinedMaxDistinct) {
+
 		AMapToData map = MapToFactory.join(g1.getMap(), g2.getMap());
 		EstimationFactors em = EstimationFactors.computeSizeEstimation(joined, map,
 			_cs.validCompressions.contains(CompressionType.RLE), _numRows, false);
diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorFactory.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorFactory.java
index 8b42258..6e9208a 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorFactory.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorFactory.java
@@ -31,22 +31,31 @@ public class CompressedSizeEstimatorFactory {
 
 	public static CompressedSizeEstimator getSizeEstimator(MatrixBlock data, CompressionSettings cs, int k) {
 
-		final int nRows = cs.transposed ? data.getNumColumns() :  data.getNumRows();
+		final int nRows = cs.transposed ? data.getNumColumns() : data.getNumRows();
 		final int nCols = cs.transposed ? data.getNumRows() : data.getNumColumns();
 		final int nnzRows = (int) Math.ceil(data.getNonZeros() / nCols);
 
 		final double sampleRatio = cs.samplingRatio;
 		final int sampleSize = Math.min(getSampleSize(sampleRatio, nRows, cs.minimumSampleSize), maxSampleSize);
-		if(shouldUseExactEstimator(cs, nRows, sampleSize, nnzRows)) {
-			if(sampleSize > nnzRows && nRows > 10000 && nCols > 10 && !cs.transposed) {
-				data = LibMatrixReorg.transpose(data,
-					new MatrixBlock(data.getNumColumns(), data.getNumRows(), data.isInSparseFormat()), k);
-				cs.transposed = true;
-			}
-			return new CompressedSizeEstimatorExact(data, cs);
+
+		if(nCols > 1000) {
+			return tryToMakeSampleEstimator(data, cs, sampleRatio, sampleSize / 10, nRows, nnzRows, k);
 		}
 		else {
-			return tryToMakeSampleEstimator(data, cs, sampleRatio, sampleSize, nRows, nnzRows, k);
+			if(shouldUseExactEstimator(cs, nRows, sampleSize, nnzRows)) {
+				if(sampleSize > nnzRows && nRows > 10000 && nCols > 10 && !cs.transposed) {
+					LOG.info("Transposing for exact estimator");
+					data = LibMatrixReorg.transpose(data,
+						new MatrixBlock(data.getNumColumns(), data.getNumRows(), data.isInSparseFormat()), k);
+					cs.transposed = true;
+				}
+				LOG.info("Using Exact estimator");
+				return new CompressedSizeEstimatorExact(data, cs);
+			}
+			else {
+				LOG.info("Trying sample size: " + sampleSize);
+				return tryToMakeSampleEstimator(data, cs, sampleRatio, sampleSize, nRows, nnzRows, k);
+			}
 		}
 
 	}
@@ -54,8 +63,9 @@ public class CompressedSizeEstimatorFactory {
 	private static CompressedSizeEstimator tryToMakeSampleEstimator(MatrixBlock data, CompressionSettings cs,
 		double sampleRatio, int sampleSize, int nRows, int nnzRows, int k) {
 		CompressedSizeEstimatorSample estS = new CompressedSizeEstimatorSample(data, cs, sampleSize, k);
+		int double_number = 1;
 		while(estS.getSample() == null) {
-			LOG.warn("Doubling sample size");
+			LOG.error("Warining doubling sample size " + double_number++);
 			sampleSize = sampleSize * 2;
 			if(shouldUseExactEstimator(cs, nRows, sampleSize, nnzRows))
 				return new CompressedSizeEstimatorExact(data, cs);
diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java
index 24ee358..218e12b 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java
@@ -107,18 +107,17 @@ public class CompressedSizeEstimatorSample extends CompressedSizeEstimator {
 	}
 
 	@Override
-	public CompressedSizeInfoColGroup estimateJoinCompressedSize(int[] joined, CompressedSizeInfoColGroup g1,
-		CompressedSizeInfoColGroup g2) {
-		final int g1V = g1.getMap().getUnique();
-		final int g2V = g2.getMap().getUnique();
-		final int nrUniqueUpperBound = g1V * g2V;
+	protected CompressedSizeInfoColGroup estimateJoinCompressedSize(int[] joined, CompressedSizeInfoColGroup g1,
+		CompressedSizeInfoColGroup g2, int joinedMaxDistinct) {
+		if((long)g1.getNumVals() * g2.getNumVals() >(long)Integer.MAX_VALUE )
+			return null;
 
 		final AMapToData map = MapToFactory.join(g1.getMap(), g2.getMap());
 		EstimationFactors sampleFacts = EstimationFactors.computeSizeEstimation(joined, map,
 			_cs.validCompressions.contains(CompressionType.RLE), map.size(), false);
 
 		// result facts
-		EstimationFactors em = estimateCompressionFactors(sampleFacts, map, joined, nrUniqueUpperBound);
+		EstimationFactors em = estimateCompressionFactors(sampleFacts, map, joined, joinedMaxDistinct);
 		return new CompressedSizeInfoColGroup(em, _cs.validCompressions, map);
 	}
 
diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java
index e2b7491..1026aa2 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java
@@ -66,7 +66,7 @@ public class CompressedSizeInfoColGroup {
 		_facts = new EstimationFactors(columns, numVals, numRows);
 		_cardinalityRatio = (double) numVals / numRows;
 		_sizes = null;
-		_bestCompressionType = null;
+		_bestCompressionType = CompressionType.DDC;
 		_minSize = ColGroupSizes.estimateInMemorySizeDDC(columns.length, numVals, numRows, 1.0, false);
 		_map = null;
 	}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibAppend.java b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibAppend.java
index fc086e6..ab60d9f 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibAppend.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibAppend.java
@@ -22,12 +22,10 @@ package org.apache.sysds.runtime.compress.lib;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
 import org.apache.sysds.runtime.compress.CompressedMatrixBlockFactory;
-import org.apache.sysds.runtime.compress.CompressionStatistics;
 import org.apache.sysds.runtime.compress.colgroup.AColGroup;
 import org.apache.sysds.runtime.compress.colgroup.ColGroupEmpty;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
@@ -41,24 +39,18 @@ public class CLALibAppend {
 		if(left.isEmpty() && right instanceof CompressedMatrixBlock)
 			return appendLeftEmpty(left, (CompressedMatrixBlock) right);
 		else if(right.isEmpty() && left instanceof CompressedMatrixBlock)
-			return appendRightEmpty((CompressedMatrixBlock)left, right);
+			return appendRightEmpty((CompressedMatrixBlock) left, right);
 
 		final int m = left.getNumRows();
 		final int n = left.getNumColumns() + right.getNumColumns();
 
-		// try to compress both sides (if not already compressed).
 		if(!(left instanceof CompressedMatrixBlock) && m > 1000) {
-			LOG.warn("Compressing left for append operation");
-			Pair<MatrixBlock, CompressionStatistics> x = CompressedMatrixBlockFactory.compress(left);
-			if(x.getRight().getRatio() > 3.0)
-				left = x.getLeft();
-
+			LOG.info("Appending uncompressed column group left");
+			left = CompressedMatrixBlockFactory.genUncompressedCompressedMatrixBlock(left);
 		}
 		if(!(right instanceof CompressedMatrixBlock) && m > 1000) {
-			LOG.warn("Compressing right for append operation");
-			Pair<MatrixBlock, CompressionStatistics> x = CompressedMatrixBlockFactory.compress(right);
-			if(x.getRight().getRatio() > 3.0)
-				right = x.getLeft();
+			LOG.warn("Appending uncompressed column group right");
+			right = CompressedMatrixBlockFactory.genUncompressedCompressedMatrixBlock(right); // wrap right itself; must not overwrite left
 		}
 
 		// if compression failed then use default append method.
@@ -72,14 +64,22 @@ public class CLALibAppend {
 		CompressedMatrixBlock ret = new CompressedMatrixBlock(m, n);
 
 		ret = appendColGroups(ret, leftC.getColGroups(), rightC.getColGroups(), leftC.getNumColumns());
-		return ret;
+
+		double compressedSize = ret.getInMemorySize();
+		double uncompressedSize = MatrixBlock.estimateSizeInMemory(m,n, ret.getSparsity());
+
+		
+		if(compressedSize * 10 < uncompressedSize)
+			return ret;
+		else
+			return ret.getUncompressed("Decompressing c bind matrix");
 	}
 
 	private static MatrixBlock appendRightEmpty(CompressedMatrixBlock left, MatrixBlock right) {
 
 		final int m = left.getNumRows();
 		final int n = left.getNumColumns() + right.getNumColumns();
-		CompressedMatrixBlock ret = new CompressedMatrixBlock(m,n);
+		CompressedMatrixBlock ret = new CompressedMatrixBlock(m, n);
 
 		List<AColGroup> newGroup = new ArrayList<>(1);
 		newGroup.add(ColGroupEmpty.generate(right.getNumColumns(), right.getNumRows()));
@@ -91,7 +91,7 @@ public class CLALibAppend {
 	private static MatrixBlock appendLeftEmpty(MatrixBlock left, CompressedMatrixBlock right) {
 		final int m = left.getNumRows();
 		final int n = left.getNumColumns() + right.getNumColumns();
-		CompressedMatrixBlock ret = new CompressedMatrixBlock(m,n);
+		CompressedMatrixBlock ret = new CompressedMatrixBlock(m, n);
 
 		List<AColGroup> newGroup = new ArrayList<>(1);
 		newGroup.add(ColGroupEmpty.generate(left.getNumColumns(), left.getNumRows()));
diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibBinaryCellOp.java b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibBinaryCellOp.java
index 79cf1c3..99d9c92 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibBinaryCellOp.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibBinaryCellOp.java
@@ -19,7 +19,6 @@
 
 package org.apache.sysds.runtime.compress.lib;
 
-import java.lang.ref.SoftReference;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
@@ -59,33 +58,41 @@ public class CLALibBinaryCellOp {
 
 	private static final Log LOG = LogFactory.getLog(CLALibBinaryCellOp.class.getName());
 
-	public static MatrixBlock binaryOperations(BinaryOperator op, CompressedMatrixBlock m1, MatrixValue thatValue,
-		MatrixValue result) {
+	public static MatrixBlock binaryOperations(BinaryOperator op, CompressedMatrixBlock m1, MatrixBlock thatValue,
+		MatrixBlock result) {
 		MatrixBlock that = CompressedMatrixBlock.getUncompressed(thatValue, "Decompressing right side in BinaryOps");
 		if(m1.getNumRows() <= 0)
 			LOG.error(m1);
-		if(thatValue.getNumRows() <= 0)
-			LOG.error(thatValue);
+		if(that.getNumRows() <= 0)
+			LOG.error(that);
 		LibMatrixBincell.isValidDimensionsBinary(m1, that);
-		thatValue = that;
 		BinaryAccessType atype = LibMatrixBincell.getBinaryAccessType(m1, that);
-		return selectProcessingBasedOnAccessType(op, m1, thatValue, result, atype, false);
+		return selectProcessingBasedOnAccessType(op, m1, that, result, atype, false);
 	}
 
-	public static MatrixBlock binaryOperationsLeft(BinaryOperator op, CompressedMatrixBlock m1, MatrixValue thatValue,
-		MatrixValue result) {
+	public static MatrixBlock binaryOperationsLeft(BinaryOperator op, CompressedMatrixBlock m1, MatrixBlock thatValue,
+		MatrixBlock result) {
 		MatrixBlock that = CompressedMatrixBlock.getUncompressed(thatValue, "Decompressing left side in BinaryOps");
 		LibMatrixBincell.isValidDimensionsBinary(that, m1);
 		thatValue = that;
 		BinaryAccessType atype = LibMatrixBincell.getBinaryAccessType(that, m1);
-		return selectProcessingBasedOnAccessType(op, m1, thatValue, result, atype, true);
+		return selectProcessingBasedOnAccessType(op, m1, that, result, atype, true);
 	}
 
 	private static MatrixBlock selectProcessingBasedOnAccessType(BinaryOperator op, CompressedMatrixBlock m1,
-		MatrixValue thatValue, MatrixValue result, BinaryAccessType atype, boolean left) {
-		MatrixBlock that = (MatrixBlock) thatValue;
-		if(atype == BinaryAccessType.MATRIX_COL_VECTOR)
-			return binaryMVCol(m1, that, op, left);
+		MatrixBlock that, MatrixBlock result, BinaryAccessType atype, boolean left) {
+		if(atype == BinaryAccessType.MATRIX_COL_VECTOR) {
+			MatrixBlock d_compressed = m1.getCachedDecompressed();
+			if(d_compressed != null) {
+				if(left)
+					return that.binaryOperations(op, d_compressed, result);
+				else
+					return d_compressed.binaryOperations(op, that, result);
+			}
+			else
+				return binaryMVCol(m1, that, op, left);
+
+		}
 		else if(atype == BinaryAccessType.MATRIX_MATRIX) {
 			if(that.isEmpty()) {
 				ScalarOperator sop = left ? new LeftScalarOperator(op.fn, 0, -1) : new RightScalarOperator(op.fn, 0,
@@ -93,8 +100,7 @@ public class CLALibBinaryCellOp {
 				return CLALibScalar.scalarOperations(sop, m1, result);
 			}
 			else {
-				SoftReference<MatrixBlock> msf = m1.getSoftReferenceToDecompressed();
-				MatrixBlock d_compressed = msf != null ? msf.get() : null;
+				MatrixBlock d_compressed = m1.getCachedDecompressed();
 				if(d_compressed != null) {
 					// copy the decompressed matrix if there is a decompressed matrix already.
 					MatrixBlock tmp = d_compressed;
@@ -117,7 +123,7 @@ public class CLALibBinaryCellOp {
 			return bincellOp(m1, that, setupCompressedReturnMatrixBlock(m1, result), op, left);
 		else {
 			LOG.warn("Decompressing since Binary Ops" + op.fn + " is not supported compressed");
-			return CompressedMatrixBlock.getUncompressed(m1).binaryOperations(op, thatValue, result);
+			return CompressedMatrixBlock.getUncompressed(m1).binaryOperations(op, that, result);
 		}
 	}
 
@@ -295,7 +301,7 @@ public class CLALibBinaryCellOp {
 		final int blkz = CompressionSettings.BITMAP_BLOCK_SZ;
 		final int k = op.getNumThreads();
 		long nnz = 0;
-		;
+
 		if(k <= 1) {
 			for(int i = 0; i * blkz < m1.getNumRows(); i++) {
 				if(left)
diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibCompAgg.java b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibCompAgg.java
index 892ec95..5c9f7b2 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibCompAgg.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibCompAgg.java
@@ -34,6 +34,7 @@ import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
 import org.apache.sysds.runtime.compress.CompressionSettings;
 import org.apache.sysds.runtime.compress.DMLCompressionException;
 import org.apache.sysds.runtime.compress.colgroup.AColGroup;
+import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
 import org.apache.sysds.runtime.functionobjects.Builtin;
 import org.apache.sysds.runtime.functionobjects.Builtin.BuiltinCode;
 import org.apache.sysds.runtime.functionobjects.IndexFunction;
@@ -50,7 +51,6 @@ import org.apache.sysds.runtime.matrix.data.LibMatrixAgg;
 import org.apache.sysds.runtime.matrix.data.LibMatrixBincell;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
-import org.apache.sysds.runtime.matrix.data.MatrixValue;
 import org.apache.sysds.runtime.matrix.data.MatrixValue.CellIndex;
 import org.apache.sysds.runtime.matrix.operators.AggregateOperator;
 import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator;
@@ -71,43 +71,64 @@ public class CLALibCompAgg {
 		}
 	};
 
-	public static MatrixBlock aggregateUnary(CompressedMatrixBlock inputMatrix, MatrixValue result,
+	public static MatrixBlock aggregateUnary(CompressedMatrixBlock inputMatrix, MatrixBlock result,
 		AggregateUnaryOperator op, int blen, MatrixIndexes indexesIn, boolean inCP) {
 
 		// prepare output dimensions
 		CellIndex tempCellIndex = new CellIndex(-1, -1);
 		op.indexFn.computeDimension(inputMatrix.getNumRows(), inputMatrix.getNumColumns(), tempCellIndex);
 
+		if(requireDecompression(inputMatrix, op)) {
+			// Decide if we should use the cached decompressed Version, or we should decompress.
+			final double denseSize = MatrixBlock.estimateSizeDenseInMemory(inputMatrix.getNumRows(),
+				inputMatrix.getNumColumns());
+			final double currentSize = inputMatrix.getInMemorySize();
+			final double localMaxMemory = InfrastructureAnalyzer.getLocalMaxMemory();
+
+			if(denseSize < 5 * currentSize && inputMatrix.getColGroups().size() > 5 &&
+				denseSize <= localMaxMemory / 2) {
+				LOG.info("Decompressing for unaryAggregate because of overlapping state");
+				inputMatrix.decompress(op.getNumThreads());
+			}
+			MatrixBlock decomp = inputMatrix.getCachedDecompressed();
+			if(decomp != null)
+				return decomp.aggregateUnaryOperations(op, result, blen, indexesIn, inCP);
+		}
+		
 		// initialize and allocate the result
 		if(result == null)
 			result = new MatrixBlock(tempCellIndex.row, tempCellIndex.column, false);
 		else
 			result.reset(tempCellIndex.row, tempCellIndex.column, false);
-		MatrixBlock ret = (MatrixBlock) result;
 
-		ret.allocateDenseBlock();
+		result.allocateDenseBlock();
 
 		AggregateUnaryOperator opm = replaceKahnOperations(op);
 
 		if(inputMatrix.getColGroups() != null) {
-			fillStart(ret, opm);
+			fillStart(result, opm);
 
-			if(inputMatrix.isOverlapping() &&
-				(opm.aggOp.increOp.fn instanceof KahanPlusSq || (opm.aggOp.increOp.fn instanceof Builtin &&
-					(((Builtin) opm.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN ||
-						((Builtin) opm.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MAX))))
-				aggregateUnaryOverlapping(inputMatrix, ret, opm, indexesIn, inCP);
+			if(requireDecompression(inputMatrix, opm))
+				aggregateUnaryOverlapping(inputMatrix, result, opm, indexesIn, inCP);
 			else
-				aggregateUnaryNormalCompressedMatrixBlock(inputMatrix, ret, opm, blen, indexesIn, inCP);
+				aggregateUnaryNormalCompressedMatrixBlock(inputMatrix, result, opm, blen, indexesIn, inCP);
 		}
-		ret.recomputeNonZeros();
+		
+		result.recomputeNonZeros();
 		if(op.aggOp.existsCorrection() && !inCP) {
-			ret = addCorrection(ret, op);
+			result = addCorrection(result, op);
 			if(op.aggOp.increOp.fn instanceof Mean)
-				ret = addCellCount(ret, op, inputMatrix.getNumRows(), inputMatrix.getNumColumns());
+				result = addCellCount(result, op, inputMatrix.getNumRows(), inputMatrix.getNumColumns());
 		}
-		return ret;
+		return result;
+
+	}
 
+	private static boolean requireDecompression(CompressedMatrixBlock inputMatrix, AggregateUnaryOperator op) {
+		return inputMatrix.isOverlapping() &&
+			(op.aggOp.increOp.fn instanceof KahanPlusSq || (op.aggOp.increOp.fn instanceof Builtin &&
+				(((Builtin) op.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN ||
+					((Builtin) op.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MAX)));
 	}
 
 	private static MatrixBlock addCorrection(MatrixBlock ret, AggregateUnaryOperator op) {
diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibRelationalOp.java b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibRelationalOp.java
deleted file mode 100644
index d4fbb7e..0000000
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibRelationalOp.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-// package org.apache.sysds.runtime.compress.lib;
-
-// import java.util.ArrayList;
-// import java.util.Arrays;
-// import java.util.List;
-// import java.util.concurrent.Callable;
-// import java.util.concurrent.ExecutionException;
-// import java.util.concurrent.ExecutorService;
-// import java.util.concurrent.Future;
-
-// import org.apache.sysds.hops.OptimizerUtils;
-// import org.apache.sysds.runtime.DMLRuntimeException;
-// import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
-// import org.apache.sysds.runtime.compress.CompressionSettings;
-// import org.apache.sysds.runtime.compress.colgroup.AColGroup;
-// import org.apache.sysds.runtime.compress.colgroup.ColGroupConst;
-// import org.apache.sysds.runtime.compress.colgroup.dictionary.Dictionary;
-// import org.apache.sysds.runtime.functionobjects.Equals;
-// import org.apache.sysds.runtime.functionobjects.GreaterThan;
-// import org.apache.sysds.runtime.functionobjects.GreaterThanEquals;
-// import org.apache.sysds.runtime.functionobjects.LessThan;
-// import org.apache.sysds.runtime.functionobjects.LessThanEquals;
-// import org.apache.sysds.runtime.functionobjects.NotEquals;
-// import org.apache.sysds.runtime.matrix.data.MatrixBlock;
-// import org.apache.sysds.runtime.matrix.operators.LeftScalarOperator;
-// import org.apache.sysds.runtime.matrix.operators.RightScalarOperator;
-// import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
-// import org.apache.sysds.runtime.util.CommonThreadPool;
-
-// /**
-//  * This class is used for relational operators that return binary values depending on individual cells values in the
-//  * compression. This indicate that the resulting vectors/matrices are amenable to compression since they only contain
-//  * two distinct values, true or false.
-//  * 
-//  */
-// public class CLALibRelationalOp {
-// 	// private static final Log LOG = LogFactory.getLog(LibRelationalOp.class.getName());
-
-// 	/** Thread pool matrix Block for materializing decompressed groups. */
-// 	private static ThreadLocal<MatrixBlock> memPool = new ThreadLocal<MatrixBlock>() {
-// 		@Override
-// 		protected MatrixBlock initialValue() {
-// 			return null;
-// 		}
-// 	};
-
-// 	protected static boolean isValidForRelationalOperation(ScalarOperator sop, CompressedMatrixBlock m1) {
-// 		return m1.isOverlapping() &&
-// 			(sop.fn instanceof LessThan || sop.fn instanceof LessThanEquals || sop.fn instanceof GreaterThan ||
-// 				sop.fn instanceof GreaterThanEquals || sop.fn instanceof Equals || sop.fn instanceof NotEquals);
-// 	}
-
-// 	// public static MatrixBlock overlappingRelativeRelationalOperation(ScalarOperator sop, CompressedMatrixBlock m1) {
-
-// 	// 	List<AColGroup> colGroups = m1.getColGroups();
-// 	// 	boolean less = ((sop.fn instanceof LessThan || sop.fn instanceof LessThanEquals) &&
-// 	// 		sop instanceof LeftScalarOperator) ||
-// 	// 		(sop instanceof RightScalarOperator &&
-// 	// 			(sop.fn instanceof GreaterThan || sop.fn instanceof GreaterThanEquals));
-// 	// 	double v = sop.getConstant();
-// 	// 	double min = m1.min();
-// 	// 	double max = m1.max();
-
-// 	// 	// Shortcut:
-// 	// 	// If we know worst case min and worst case max and the values to compare to in all cases is
-// 	// 	// less then or greater than worst then we can return a full matrix with either 1 or 0.
-
-// 	// 	if(v < min || v > max) {
-// 	// 		if(sop.fn instanceof Equals) {
-// 	// 			return makeConstZero(m1.getNumRows(), m1.getNumColumns());
-// 	// 		}
-// 	// 		else if(sop.fn instanceof NotEquals) {
-// 	// 			return makeConstOne(m1.getNumRows(), m1.getNumColumns());
-// 	// 		}
-// 	// 		else if(less) {
-// 	// 			if(v < min || ((sop.fn instanceof LessThanEquals || sop.fn instanceof GreaterThan) && v <= min))
-// 	// 				return makeConstOne(m1.getNumRows(), m1.getNumColumns());
-// 	// 			else
-// 	// 				return makeConstZero(m1.getNumRows(), m1.getNumColumns());
-// 	// 		}
-// 	// 		else {
-// 	// 			if(v > max || ((sop.fn instanceof LessThanEquals || sop.fn instanceof GreaterThan) && v >= max))
-// 	// 				return makeConstOne(m1.getNumRows(), m1.getNumColumns());
-// 	// 			else
-// 	// 				return makeConstZero(m1.getNumRows(), m1.getNumColumns());
-// 	// 		}
-// 	// 	}
-// 	// 	else {
-// 	// 		return processNonConstant(sop, minMax, min, max, m1.getNumRows(), m1.getNumColumns(), less);
-// 	// 	}
-
-// 	// }
-
-// 	private static MatrixBlock makeConstOne(int rows, int cols) {
-// 	// 	List<AColGroup> newColGroups = new ArrayList<>();
-// 	// 	int[] colIndexes = new int[cols];
-// 	// 	for(int i = 0; i < colIndexes.length; i++) {
-// 	// 		colIndexes[i] = i;
-// 	// 	}
-// 	// 	double[] values = new double[cols];
-// 	// 	Arrays.fill(values, 1);
-
-// 	// 	newColGroups.add(new ColGroupConst(colIndexes, rows, new Dictionary(values)));
-// 	// 	CompressedMatrixBlock ret = new CompressedMatrixBlock(rows, cols);
-// 	// 	ret.allocateColGroupList(newColGroups);
-// 	// 	ret.setNonZeros(cols * rows);
-// 	// 	ret.setOverlapping(false);
-// 	// 	return ret;
-// 	// }
-
-// 	// private static MatrixBlock makeConstZero(int rows, int cols) {
-// 	// 	MatrixBlock sb = new MatrixBlock(rows, cols, true, 0);
-// 	// 	return sb;
-// 	// }
-
-// 	// private static MatrixBlock processNonConstant(ScalarOperator sop, MinMaxGroup[] minMax, double minS, double maxS,
-// 	// 	final int rows, final int cols, boolean less) {
-
-// 	// 	// BitSet res = new BitSet(ret.getNumColumns() * ret.getNumRows());
-// 	// 	MatrixBlock res = new MatrixBlock(rows, cols, true, 0).allocateBlock();
-// 	// 	int k = OptimizerUtils.getConstrainedNumThreads(-1);
-// 	// 	int outRows = rows;
-// 	// 	long nnz = 0;
-// 	// 	if(k == 1) {
-// 	// 		final int b = CompressionSettings.BITMAP_BLOCK_SZ / cols;
-// 	// 		final int blkz = (outRows < b) ? outRows : b;
-
-// 	// 		MatrixBlock tmp = new MatrixBlock(blkz, cols, false, -1).allocateBlock();
-// 	// 		for(int i = 0; i * blkz < outRows; i++) {
-// 	// 			for(MinMaxGroup mmg : minMax) 
-// 	// 				mmg.g.decompressToBlockUnSafe(tmp, i * blkz, Math.min((i + 1) * blkz, rows), 0);
-				
-// 	// 			for(int row = 0; row < blkz && row < rows - i * blkz; row++) {
-// 	// 				int off = (row + i * blkz);
-// 	// 				for(int col = 0; col < cols; col++) {
-// 	// 					res.quickSetValue(off, col, sop.executeScalar(tmp.quickGetValue(row, col)));
-// 	// 					if(res.quickGetValue(off, col) != 0) {
-// 	// 						nnz++;
-// 	// 					}
-// 	// 				}
-// 	// 			}
-// 	// 		}
-// 	// 		tmp.reset();
-// 	// 		res.setNonZeros(nnz);
-// 	// 	}
-// 	// 	else {
-// 	// 		final int blkz = CompressionSettings.BITMAP_BLOCK_SZ / 2;
-// 	// 		ExecutorService pool = CommonThreadPool.get(k);
-// 	// 		ArrayList<RelationalTask> tasks = new ArrayList<>();
-
-// 	// 		try {
-// 	// 			for(int i = 0; i * blkz < outRows; i++) {
-// 	// 				RelationalTask rt = new RelationalTask(minMax, i, blkz, res, rows, cols, sop);
-// 	// 				tasks.add(rt);
-// 	// 			}
-// 	// 			List<Future<Object>> futures = pool.invokeAll(tasks);
-// 	// 			pool.shutdown();
-// 	// 			for(Future<Object> f : futures)
-// 	// 				f.get();
-// 	// 		}
-// 	// 		catch(InterruptedException | ExecutionException e) {
-// 	// 			e.printStackTrace();
-// 	// 			throw new DMLRuntimeException(e);
-// 	// 		}
-
-// 	// 	}
-// 	// 	memPool.remove();
-
-// 	// 	return res;
-// 	// }
-
-// 	// protected static class MinMaxGroup implements Comparable<MinMaxGroup> {
-// 	// 	double min;
-// 	// 	double max;
-// 	// 	AColGroup g;
-// 	// 	double[] values;
-
-// 	// 	public MinMaxGroup(double min, double max, AColGroup g) {
-// 	// 		this.min = min;
-// 	// 		this.max = max;
-// 	// 		this.g = g;
-
-// 	// 		this.values = g.getValues();
-// 	// 	}
-
-// 	// 	@Override
-// 	// 	public int compareTo(MinMaxGroup o) {
-// 	// 		double t = max - min;
-// 	// 		double ot = o.max - o.min;
-// 	// 		return Double.compare(t, ot);
-// 	// 	}
-
-// 	// 	@Override
-// 	// 	public String toString() {
-// 	// 		StringBuilder sb = new StringBuilder();
-// 	// 		sb.append("MMG: ");
-// 	// 		sb.append("[" + min + "," + max + "]");
-// 	// 		sb.append(" " + g.getClass().getSimpleName());
-// 	// 		return sb.toString();
-// 	// 	}
-// 	// }
-
-// 	// private static class RelationalTask implements Callable<Object> {
-// 	// 	private final MinMaxGroup[] _minMax;
-// 	// 	private final int _i;
-// 	// 	private final int _blkz;
-// 	// 	private final MatrixBlock _res;
-// 	// 	private final int _rows;
-// 	// 	private final int _cols;
-// 	// 	private final ScalarOperator _sop;
-
-// 	// 	protected RelationalTask(MinMaxGroup[] minMax, int i, int blkz, MatrixBlock res, int rows, int cols,
-// 	// 		ScalarOperator sop) {
-// 	// 		_minMax = minMax;
-// 	// 		_i = i;
-// 	// 		_blkz = blkz;
-// 	// 		_res = res;
-// 	// 		_rows = rows;
-// 	// 		_cols = cols;
-// 	// 		_sop = sop;
-// 	// 	}
-
-// 	// 	@Override
-// 	// 	public Object call() {
-// 	// 		MatrixBlock tmp = memPool.get();
-// 	// 		if(tmp == null) {
-// 	// 			memPool.set(new MatrixBlock(_blkz, _cols, false, -1).allocateBlock());
-// 	// 			tmp = memPool.get();
-// 	// 		}
-// 	// 		else {
-// 	// 			tmp = memPool.get();
-// 	// 			tmp.reset(_blkz, _cols, false, -1);
-// 	// 		}
-
-// 	// 		for(MinMaxGroup mmg : _minMax) {
-// 	// 			if(mmg.g.getNumberNonZeros() != 0)
-// 	// 				mmg.g.decompressToBlockUnSafe(tmp, _i * _blkz, Math.min((_i + 1) * _blkz, mmg.g.getNumRows()), 0);
-// 	// 		}
-
-// 	// 		for(int row = 0, off = _i * _blkz; row < _blkz && row < _rows - _i * _blkz; row++, off++) {
-// 	// 			for(int col = 0; col < _cols; col++) {
-// 	// 				_res.appendValue(off, col, _sop.executeScalar(tmp.quickGetValue(row, col)));
-// 	// 			}
-// 	// 		}
-// 	// 		return null;
-// 	// 	}
-// 	// }
-// }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibScalar.java b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibScalar.java
index aea5488..1822685 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibScalar.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibScalar.java
@@ -56,13 +56,6 @@ public class CLALibScalar {
 	private static final int MINIMUM_PARALLEL_SIZE = 8096;
 
 	public static MatrixBlock scalarOperations(ScalarOperator sop, CompressedMatrixBlock m1, MatrixValue result) {
-		// Special case handling of overlapping relational operations
-		// if(CLALibRelationalOp.isValidForRelationalOperation(sop, m1)) {
-		// 	MatrixBlock ret =  CLALibRelationalOp.overlappingRelativeRelationalOperation(sop, m1);
-		// 	ret.recomputeNonZeros();
-		// 	return ret;
-		// }
-
 		if(isInvalidForCompressedOutput(m1, sop)) {
 			LOG.warn("scalar overlapping not supported for op: " + sop.fn);
 			MatrixBlock m1d = m1.decompress(sop.getNumThreads());
diff --git a/src/main/java/org/apache/sysds/runtime/compress/workload/WorkloadAnalyzer.java b/src/main/java/org/apache/sysds/runtime/compress/workload/WorkloadAnalyzer.java
index b37acc1..31b3714 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/workload/WorkloadAnalyzer.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/workload/WorkloadAnalyzer.java
@@ -404,7 +404,10 @@ public class WorkloadAnalyzer {
 				ArrayList<Hop> in = hop.getInput();
 				if(isOverlapping(in.get(0)) || isOverlapping(in.get(1)))
 					overlapping.add(hop.getHopID());
-				return new OpNormal(hop, true);
+				// CBind is in the worst case decompressing, but can compress the other side if it is trivially compressible.
+				// To make the optimizer correct we need to mark this operation as decompressing, since it is the worst possible outcome.
+				// Currently we don't optimize for operations that are located past a cbind.
+				return new OpDecompressing(hop);
 			}
 			else if(HopRewriteUtils.isBinary(hop, OpOp2.RBIND)) {
 				ArrayList<Hop> in = hop.getInput();
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
index 880f31f..faef2c7 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
@@ -66,6 +66,7 @@ import org.apache.sysds.runtime.data.SparseBlock;
 import org.apache.sysds.runtime.data.TensorBlock;
 import org.apache.sysds.runtime.data.TensorIndexes;
 import org.apache.sysds.runtime.instructions.cp.Data;
+import org.apache.sysds.runtime.instructions.spark.DeCompressionSPInstruction;
 import org.apache.sysds.runtime.instructions.spark.data.BroadcastObject;
 import org.apache.sysds.runtime.instructions.spark.data.LineageObject;
 import org.apache.sysds.runtime.instructions.spark.data.PartitionedBlock;
@@ -1280,16 +1281,21 @@ public class SparkExecutionContext extends ExecutionContext
 		return out;
 	}
 
-	@SuppressWarnings("unchecked")
+	// @SuppressWarnings("unchecked")
 	public static long writeMatrixRDDtoHDFS( RDDObject rdd, String path, FileFormat fmt )
 	{
 		JavaPairRDD<MatrixIndexes,MatrixBlock> lrdd = (JavaPairRDD<MatrixIndexes, MatrixBlock>) rdd.getRDD();
 		InputOutputInfo oinfo = InputOutputInfo.get(DataType.MATRIX, fmt);
 		
+		// if compression is enabled decompress all blocks before writing to disk. TEMPORARY MODIFICATION UNTIL MATRIXBLOCK IS MERGED WITH COMPRESSEDMATRIXBLOCK
+		if(ConfigurationManager.isCompressionEnabled())
+			lrdd = lrdd.mapValues(new DeCompressionSPInstruction.DeCompressionFunction());
+
 		//piggyback nnz maintenance on write
 		LongAccumulator aNnz = getSparkContextStatic().sc().longAccumulator("nnz");
 		lrdd = lrdd.mapValues(new ComputeBinaryBlockNnzFunction(aNnz));
 
+
 		//save file is an action which also triggers nnz maintenance
 		lrdd.saveAsHadoopFile(path,
 			oinfo.keyClass,
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/BinUaggChainSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/BinUaggChainSPInstruction.java
index 4ea0284..39b2408 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/BinUaggChainSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/BinUaggChainSPInstruction.java
@@ -30,6 +30,12 @@ import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
 import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator;
 import org.apache.sysds.runtime.matrix.operators.BinaryOperator;
 
+/**
+ * Instruction that performs
+ * 
+ * res = X / rowsum(X)
+ * 
+ */
 public class BinUaggChainSPInstruction extends UnarySPInstruction {
 	// operators
 	private BinaryOperator _bOp = null;
diff --git a/src/test/java/org/apache/sysds/test/component/compress/CompressedMatrixTest.java b/src/test/java/org/apache/sysds/test/component/compress/CompressedMatrixTest.java
index 6115654..a998a74 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/CompressedMatrixTest.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/CompressedMatrixTest.java
@@ -29,6 +29,7 @@ import java.io.DataOutputStream;
 import java.util.Collection;
 
 import org.apache.commons.math3.random.Well1024a;
+import org.apache.sysds.common.Types.CorrectionLocationType;
 import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
@@ -36,10 +37,15 @@ import org.apache.sysds.runtime.compress.CompressionSettingsBuilder;
 import org.apache.sysds.runtime.compress.CompressionStatistics;
 import org.apache.sysds.runtime.compress.colgroup.AColGroup;
 import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType;
+import org.apache.sysds.runtime.functionobjects.KahanPlus;
+import org.apache.sysds.runtime.functionobjects.Multiply;
+import org.apache.sysds.runtime.functionobjects.ReduceAll;
 import org.apache.sysds.runtime.matrix.data.LibMatrixCountDistinct;
 import org.apache.sysds.runtime.matrix.data.LibMatrixDatagen;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.data.RandomMatrixGenerator;
+import org.apache.sysds.runtime.matrix.operators.AggregateOperator;
+import org.apache.sysds.runtime.matrix.operators.AggregateTernaryOperator;
 import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator;
 import org.apache.sysds.runtime.matrix.operators.CountDistinctOperator;
 import org.apache.sysds.runtime.matrix.operators.CountDistinctOperator.CountDistinctTypes;
@@ -570,6 +576,33 @@ public class CompressedMatrixTest extends AbstractCompressedUnaryTests {
 		cmb.copy(cmb);
 	}
 
+	@Test
+	public void testAggregateTernaryOperation() {
+		try {
+			if(!(cmb instanceof CompressedMatrixBlock) || rows * cols > 10000)
+				return;
+			CorrectionLocationType corr = CorrectionLocationType.LASTCOLUMN;
+			AggregateOperator agg = new AggregateOperator(0, KahanPlus.getKahanPlusFnObject(), corr);
+			AggregateTernaryOperator op = new AggregateTernaryOperator(Multiply.getMultiplyFnObject(), agg,
+				ReduceAll.getReduceAllFnObject());
+
+			int nrow = mb.getNumRows();
+			int ncol = mb.getNumColumns();
+
+			MatrixBlock m2 = new MatrixBlock(nrow, ncol, 13.0);
+			MatrixBlock m3 = new MatrixBlock(nrow, ncol, 14.0);
+
+			MatrixBlock ret1 = cmb.aggregateTernaryOperations(cmb, m2, m3, null, op, true);
+			MatrixBlock ret2 = mb.aggregateTernaryOperations(mb, m2, m3, null, op, true);
+
+			compareResultMatrices(ret2, ret1, 1);
+		}
+		catch(Exception e) {
+			e.printStackTrace();
+			throw new DMLRuntimeException(e);
+		}
+	}
+
 	private static long getJolSize(CompressedMatrixBlock cmb, CompressionStatistics cStat) {
 		Layouter l = new HotSpotLayouter(new X86_64_DataModel());
 		long jolEstimate = 0;
diff --git a/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java b/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java
index eb5ceca..c291235 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java
@@ -31,7 +31,6 @@ import java.util.List;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.sysds.common.Types.CorrectionLocationType;
 import org.apache.sysds.lops.MMTSJ.MMTSJType;
 import org.apache.sysds.lops.MapMultChain.ChainType;
 import org.apache.sysds.runtime.DMLRuntimeException;
@@ -55,22 +54,18 @@ import org.apache.sysds.runtime.functionobjects.Divide;
 import org.apache.sysds.runtime.functionobjects.Equals;
 import org.apache.sysds.runtime.functionobjects.GreaterThan;
 import org.apache.sysds.runtime.functionobjects.GreaterThanEquals;
-import org.apache.sysds.runtime.functionobjects.KahanPlus;
 import org.apache.sysds.runtime.functionobjects.LessThan;
 import org.apache.sysds.runtime.functionobjects.LessThanEquals;
 import org.apache.sysds.runtime.functionobjects.Minus;
 import org.apache.sysds.runtime.functionobjects.Multiply;
 import org.apache.sysds.runtime.functionobjects.Plus;
 import org.apache.sysds.runtime.functionobjects.Power2;
-import org.apache.sysds.runtime.functionobjects.ReduceAll;
 import org.apache.sysds.runtime.functionobjects.SwapIndex;
 import org.apache.sysds.runtime.functionobjects.ValueFunction;
 import org.apache.sysds.runtime.functionobjects.Xor;
 import org.apache.sysds.runtime.instructions.InstructionUtils;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 import org.apache.sysds.runtime.matrix.operators.AggregateBinaryOperator;
-import org.apache.sysds.runtime.matrix.operators.AggregateOperator;
-import org.apache.sysds.runtime.matrix.operators.AggregateTernaryOperator;
 import org.apache.sysds.runtime.matrix.operators.BinaryOperator;
 import org.apache.sysds.runtime.matrix.operators.LeftScalarOperator;
 import org.apache.sysds.runtime.matrix.operators.ReorgOperator;
@@ -1253,28 +1248,6 @@ public abstract class CompressedTestBase extends TestBase {
 	}
 
 	@Test
-	public void aggregateTernaryOperations() {
-		if(!(cmb instanceof CompressedMatrixBlock) || rows * cols > 10000)
-			return;
-
-		MatrixBlock m1 = new MatrixBlock();
-		MatrixBlock m2 = new MatrixBlock();
-		MatrixBlock m3 = null;
-		CorrectionLocationType corr = CorrectionLocationType.LASTCOLUMN;
-		AggregateOperator agg = new AggregateOperator(0, KahanPlus.getKahanPlusFnObject(), corr);
-		AggregateTernaryOperator op = new AggregateTernaryOperator(Multiply.getMultiplyFnObject(), agg,
-			ReduceAll.getReduceAllFnObject(), _k);
-
-		boolean inCP = true;
-
-		MatrixBlock ret1 = mb.aggregateTernaryOperations(m1, m2, m3, null, op, inCP);
-		MatrixBlock ret2 = cmb.aggregateTernaryOperations(m1, m2, m3, null, op, inCP);
-
-		compareResultMatrices(ret1, ret2, 1);
-
-	}
-
-	@Test
 	public void unaryOperations() {
 		if(!(cmb instanceof CompressedMatrixBlock) || cmb.getNumColumns() < 2)
 			return;
diff --git a/src/test/java/org/apache/sysds/test/component/compress/workload/WorkloadTest.java b/src/test/java/org/apache/sysds/test/component/compress/workload/WorkloadTest.java
index aacb716..9422e58 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/workload/WorkloadTest.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/workload/WorkloadTest.java
@@ -119,7 +119,7 @@ public class WorkloadTest {
 		args.put("$2", "FALSE");
 		args.put("$3", "0");
 
-		tests.add(new Object[] {0, 0, 0, 1, 1, 1, 6, 0, true, false, "functions/lmDS.dml", args});
+		tests.add(new Object[] {0, 1, 0, 1, 1, 1, 6, 0, true, false, "functions/lmDS.dml", args});
 		tests.add(new Object[] {0, 0, 0, 1, 0, 1, 0, 0, true, true, "functions/lmDS.dml", args});
 		tests.add(new Object[] {0, 0, 0, 1, 10, 10, 1, 0, true, true, "functions/lmCG.dml", args});
 
@@ -134,15 +134,15 @@ public class WorkloadTest {
 		args.put("$1", testFile);
 		args.put("$2", "TRUE");
 		args.put("$3", "1");
-		tests.add(new Object[] {0, 0, 1, 1, 1, 1, 1, 0, true, true, "functions/lmDS.dml", args});
-		tests.add(new Object[] {0, 0, 1, 1, 11, 10, 2, 0, true, true, "functions/lmCG.dml", args});
+		tests.add(new Object[] {0, 1, 0, 0, 0, 0, 1, 0, false, true, "functions/lmDS.dml", args});
+		tests.add(new Object[] {0, 1, 1, 1, 11, 10, 2, 0, true, true, "functions/lmCG.dml", args});
 
 		args = new HashMap<>();
 		args.put("$1", testFile);
 		args.put("$2", "TRUE");
 		args.put("$3", "2");
-		tests.add(new Object[] {0, 0, 1, 1, 1, 1, 3, 0, true, true, "functions/lmDS.dml", args});
-		tests.add(new Object[] {0, 0, 1, 1, 11, 10, 4, 0, true, true, "functions/lmCG.dml", args});
+		tests.add(new Object[] {0, 1, 0, 0, 0, 0, 1, 0, false, true, "functions/lmDS.dml", args});
+		tests.add(new Object[] {0, 1, 1, 1, 11, 10, 2, 0, true, true, "functions/lmCG.dml", args});
 
 		args = new HashMap<>();
 		args.put("$1", testFile);
@@ -176,7 +176,7 @@ public class WorkloadTest {
 			CostEstimatorBuilder ceb = new CostEstimatorBuilder(wtr);
 			InstructionTypeCounter itc = ceb.getCounter();
 
-			verify(wtr, itc, ceb);
+			verify(wtr, itc, ceb, scriptName, args);
 		}
 		catch(Exception e) {
 			e.printStackTrace();
@@ -184,9 +184,9 @@ public class WorkloadTest {
 		}
 	}
 
-	private void verify(WTreeRoot wtr, InstructionTypeCounter itc, CostEstimatorBuilder ceb) {
+	private void verify(WTreeRoot wtr, InstructionTypeCounter itc, CostEstimatorBuilder ceb, String name, Map<String, String> args) {
 
-		String errorString = wtr + "\n" + itc + " \n ";
+		String errorString = wtr + "\n" + itc + " \n " + name + "  -- " + args +  "\n";
 		Assert.assertEquals(errorString + "scans:", scans, itc.getScans());
 		Assert.assertEquals(errorString + "decompressions", decompressions, itc.getDecompressions());
 		Assert.assertEquals(errorString + "overlappingDecompressions", overlappingDecompressions,
@@ -197,7 +197,7 @@ public class WorkloadTest {
 			itc.getCompressedMultiplications());
 		Assert.assertEquals(errorString + "dictionaryOps", dictionaryOps, itc.getDictionaryOps());
 		Assert.assertEquals(errorString + "lookup", indexing, itc.getIndexing());
-		Assert.assertEquals(shouldCompress, ceb.shouldTryToCompress());
+		Assert.assertEquals(errorString + "Should Compresss", shouldCompress, ceb.shouldTryToCompress());
 	}
 
 	private static WTreeRoot getWorkloadTree(DMLProgram prog) {
diff --git a/src/test/java/org/apache/sysds/test/functions/compress/workload/WorkloadAlgorithmTest.java b/src/test/java/org/apache/sysds/test/functions/compress/workload/WorkloadAlgorithmTest.java
index 9ffeb02..5de8880 100644
--- a/src/test/java/org/apache/sysds/test/functions/compress/workload/WorkloadAlgorithmTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/compress/workload/WorkloadAlgorithmTest.java
@@ -73,7 +73,7 @@ public class WorkloadAlgorithmTest extends AutomatedTestBase {
 
 	@Test
 	public void testMLogRegCP() {
-		runWorkloadAnalysisTest(TEST_NAME1, ExecMode.HYBRID, 2, false);
+		runWorkloadAnalysisTest(TEST_NAME1, ExecMode.HYBRID, 1, false);
 	}
 
 	@Test