You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by ba...@apache.org on 2020/10/20 15:35:57 UTC

[systemds] branch master updated: [SYSTEMDS-2689] Decompress Lop Operation

This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new dfa5285  [SYSTEMDS-2689] Decompress Lop Operation
dfa5285 is described below

commit dfa528532254aff2fb709f42564d57271407b0f2
Author: baunsgaard <ba...@tugraz.at>
AuthorDate: Tue Oct 20 17:31:25 2020 +0200

    [SYSTEMDS-2689] Decompress Lop Operation
    
    Add decompress instruction to lops, to enable future cost based rewrite,
    that will try to compress based on cost estimations of the execution plan.
    
    The new rewrite plan is included and is named "COST", furthermore test
    for this rewrite plan are added as functional tests.
---
 .github/workflows/functionsTests.yml               |   1 +
 src/main/java/org/apache/sysds/conf/DMLConfig.java |   2 +-
 src/main/java/org/apache/sysds/hops/Hop.java       |  20 ++
 .../hops/rewrite/RewriteCompressedReblock.java     | 319 +++++++++++----------
 .../java/org/apache/sysds/lops/Compression.java    |  30 +-
 .../lops/{Compression.java => DeCompression.java}  |   8 +-
 .../compress/CompressedMatrixBlockFactory.java     |  21 +-
 .../runtime/compress/CompressionStatistics.java    |  11 +-
 .../runtime/instructions/CPInstructionParser.java  |   5 +
 .../runtime/instructions/SPInstructionParser.java  |   2 +
 .../runtime/instructions/cp/CPInstruction.java     |   2 +-
 .../cp/DeCompressionCPInstruction.java             |  58 ++++
 .../spark/DeCompressionSPInstruction.java          |  74 +++++
 .../runtime/instructions/spark/SPInstruction.java  |   2 +-
 .../sysds/utils/DMLCompressionStatistics.java      |  35 ++-
 .../org/apache/sysds/test/AutomatedTestBase.java   |  18 +-
 src/test/java/org/apache/sysds/test/TestUtils.java |  29 ++
 .../test/functions/compress/CompressBase.java      | 107 +++++++
 .../test/functions/compress/CompressCost.java      |  69 +++++
 .../test/functions/compress/CompressForce.java     |  69 +++++
 .../test/functions/compress/CompressLossy.java     |  52 ++++
 .../test/functions/compress/CompressLossyCost.java |  52 ++++
 .../functions/compress/compress_row_min.dml        |  24 ++
 .../scripts/functions/compress/compress_sum.dml    |  24 ++
 .../functions/compress/compress_transpose.dml      |  24 ++
 .../cost/SystemDS-config-compress-cost.xml         |  24 ++
 .../lossy/SystemDS-config-compress-lossy-cost.xml  |  25 ++
 .../compress/force/SystemDS-config-compress.xml    |  24 ++
 .../force/lossy/SystemDS-config-compress-lossy.xml |  25 ++
 29 files changed, 967 insertions(+), 189 deletions(-)

diff --git a/.github/workflows/functionsTests.yml b/.github/workflows/functionsTests.yml
index a49e6fc..8da90ee 100644
--- a/.github/workflows/functionsTests.yml
+++ b/.github/workflows/functionsTests.yml
@@ -50,6 +50,7 @@ jobs:
           codegen,
           codegenalg.partone,
           codegenalg.parttwo,
+          compress,
           countDistinct,
           data.misc,
           data.rand,
diff --git a/src/main/java/org/apache/sysds/conf/DMLConfig.java b/src/main/java/org/apache/sysds/conf/DMLConfig.java
index 184e50f..7bba416 100644
--- a/src/main/java/org/apache/sysds/conf/DMLConfig.java
+++ b/src/main/java/org/apache/sysds/conf/DMLConfig.java
@@ -66,7 +66,7 @@ public class DMLConfig
 	public static final String DEFAULT_BLOCK_SIZE   = "sysds.defaultblocksize";
 	public static final String CP_PARALLEL_OPS      = "sysds.cp.parallel.ops";
 	public static final String CP_PARALLEL_IO       = "sysds.cp.parallel.io";
-	public static final String COMPRESSED_LINALG    = "sysds.compressed.linalg"; //auto, true, false
+	public static final String COMPRESSED_LINALG    = "sysds.compressed.linalg"; //auto, cost, true, false
 	public static final String COMPRESSED_LOSSY     = "sysds.compressed.lossy";
 	public static final String COMPRESSED_VALID_COMPRESSIONS = "sysds.compressed.valid.compressions";
 	public static final String NATIVE_BLAS          = "sysds.native.blas";
diff --git a/src/main/java/org/apache/sysds/hops/Hop.java b/src/main/java/org/apache/sysds/hops/Hop.java
index 030bfa8..60a4dc5 100644
--- a/src/main/java/org/apache/sysds/hops/Hop.java
+++ b/src/main/java/org/apache/sysds/hops/Hop.java
@@ -36,6 +36,7 @@ import org.apache.sysds.lops.CSVReBlock;
 import org.apache.sysds.lops.Checkpoint;
 import org.apache.sysds.lops.Compression;
 import org.apache.sysds.lops.Data;
+import org.apache.sysds.lops.DeCompression;
 import org.apache.sysds.lops.Lop;
 import org.apache.sysds.lops.LopProperties.ExecType;
 import org.apache.sysds.lops.LopsException;
@@ -101,6 +102,9 @@ public abstract class Hop implements ParseInfo {
 	// indicates if the output of this hop needs to be compressed
 	// (this happens on persistent reads after reblock but before checkpoint)
 	protected boolean _requiresCompression = false;
+
+	/** Boolean specifying if decompression is required.*/
+	protected boolean _requiresDeCompression = false;
 	
 	// indicates if the output of this hop needs to be checkpointed (cached)
 	// (the default storage level for caching is not yet exposed here)
@@ -259,6 +263,10 @@ public abstract class Hop implements ParseInfo {
 	public void setRequiresCompression(boolean flag) {
 		_requiresCompression = flag;
 	}
+
+	public void setRequiresDeCompression(boolean flag){
+		_requiresDeCompression = flag;
+	}
 	
 	public boolean requiresCompression() {
 		return _requiresCompression;
@@ -404,6 +412,18 @@ public abstract class Hop implements ParseInfo {
 				throw new HopsException(ex);
 			}
 		}
+
+		if( _requiresDeCompression ){
+			try{
+				Lop decompress = new DeCompression(getLops(), getDataType(), getValueType(), et);
+				setOutputDimensions(decompress);
+				setLineNumbers(decompress);
+				setLops(decompress);
+			}
+			catch(LopsException ex){
+				throw new HopsException(ex);
+			}
+		}
 	}
 
 	public static Lop createOffsetLop( Hop hop, boolean repCols ) {
diff --git a/src/main/java/org/apache/sysds/hops/rewrite/RewriteCompressedReblock.java b/src/main/java/org/apache/sysds/hops/rewrite/RewriteCompressedReblock.java
index b9bd5e4..c861663 100644
--- a/src/main/java/org/apache/sysds/hops/rewrite/RewriteCompressedReblock.java
+++ b/src/main/java/org/apache/sysds/hops/rewrite/RewriteCompressedReblock.java
@@ -24,16 +24,18 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.common.Types.AggOp;
+import org.apache.sysds.common.Types.OpOp1;
+import org.apache.sysds.common.Types.OpOp2;
+import org.apache.sysds.common.Types.OpOpData;
 import org.apache.sysds.conf.ConfigurationManager;
 import org.apache.sysds.conf.DMLConfig;
 import org.apache.sysds.hops.AggBinaryOp;
 import org.apache.sysds.hops.FunctionOp;
 import org.apache.sysds.hops.Hop;
 import org.apache.sysds.hops.OptimizerUtils;
-import org.apache.sysds.common.Types.AggOp;
-import org.apache.sysds.common.Types.OpOp1;
-import org.apache.sysds.common.Types.OpOp2;
-import org.apache.sysds.common.Types.OpOpData;
 import org.apache.sysds.lops.Compression.CompressConfig;
 import org.apache.sysds.lops.MMTSJ.MMTSJType;
 import org.apache.sysds.parser.DMLProgram;
@@ -50,39 +52,35 @@ import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
 import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 
 /**
- * Rule: CompressedReblock: If config compressed.linalg is enabled, we
- * inject compression directions after pread of matrices w/ both dims &gt; 1
- * (i.e., multi-column matrices). In case of 'auto' compression, we apply
- * compression if the datasize is known to exceed aggregate cluster memory,
- * the matrix is used in loops, and all operations are supported over 
- * compressed matrices.
+ * Rule: Compressed Re block if config compressed.linalg is enabled, we inject compression directions after read of
+ * matrices if number of rows is above 1000 and cols at least 1. In case of 'auto' compression, we apply compression if
+ * the data size is known to exceed aggregate cluster memory, the matrix is used in loops, and all operations are
+ * supported over compressed matrices.
  */
-public class RewriteCompressedReblock extends StatementBlockRewriteRule
-{
+public class RewriteCompressedReblock extends StatementBlockRewriteRule {
+	private static final Log LOG = LogFactory.getLog(RewriteCompressedReblock.class.getName());
+
 	private static final String TMP_PREFIX = "__cmtx";
-	
+
 	@Override
 	public boolean createsSplitDag() {
 		return false;
 	}
-	
+
 	@Override
-	public List<StatementBlock> rewriteStatementBlock(StatementBlock sb, ProgramRewriteStatus sate)
-	{
-		//check for inapplicable statement blocks
-		if( !HopRewriteUtils.isLastLevelStatementBlock(sb)
-			|| sb.getHops() == null )
+	public List<StatementBlock> rewriteStatementBlock(StatementBlock sb, ProgramRewriteStatus sate) {
+		// check for inapplicable statement blocks
+		if(!HopRewriteUtils.isLastLevelStatementBlock(sb) || sb.getHops() == null)
 			return Arrays.asList(sb);
-		
-		//parse compression config
+
+		// parse compression config
 		DMLConfig conf = ConfigurationManager.getDMLConfig();
-		CompressConfig compress = CompressConfig.valueOf(
-			conf.getTextValue(DMLConfig.COMPRESSED_LINALG).toUpperCase());
-		
-		//perform compressed reblock rewrite
-		if( compress.isEnabled() ) {
+		CompressConfig compress = CompressConfig.valueOf(conf.getTextValue(DMLConfig.COMPRESSED_LINALG).toUpperCase());
+
+		// perform compressed reblock rewrite
+		if(compress.isEnabled()) {
 			Hop.resetVisitStatus(sb.getHops());
-			for( Hop h : sb.getHops() ) 
+			for(Hop h : sb.getHops())
 				injectCompressionDirective(h, compress, sb.getDMLProg());
 			Hop.resetVisitStatus(sb.getHops());
 		}
@@ -93,212 +91,243 @@ public class RewriteCompressedReblock extends StatementBlockRewriteRule
 	public List<StatementBlock> rewriteStatementBlocks(List<StatementBlock> sbs, ProgramRewriteStatus sate) {
 		return sbs;
 	}
-	
+
 	private static void injectCompressionDirective(Hop hop, CompressConfig compress, DMLProgram prog) {
-		if( hop.isVisited() || hop.requiresCompression() )
+		if(hop.isVisited() || hop.requiresCompression())
 			return;
-		
+
 		// recursively process children
-		for( Hop hi : hop.getInput() )
+		for(Hop hi : hop.getInput())
 			injectCompressionDirective(hi, compress, prog);
 		// check for compression conditions
-		if( compress == CompressConfig.TRUE && satisfiesCompressionCondition(hop) 
-			|| compress == CompressConfig.AUTO && satisfiesAutoCompressionCondition(hop, prog) )
-		{
-			hop.setRequiresCompression(true);
+		switch(compress) {
+			case TRUE:
+				if(satisfiesCompressionCondition(hop))
+					hop.setRequiresCompression(true);
+				break;
+			case AUTO:
+				if(satisfiesAutoCompressionCondition(hop, prog))
+					hop.setRequiresCompression(true);
+			case COST:
+				if(satisfiesCostCompressionCondition(hop, prog))
+					hop.setRequiresCompression(true);
+			default:
+				break;
 		}
-		
+
+		if(satisfiesDeCompressionCondition(hop)) {
+			hop.setRequiresDeCompression(true);
+		}
+
 		hop.setVisited();
 	}
-	
+
 	private static boolean satisfiesCompressionCondition(Hop hop) {
-		return HopRewriteUtils.isData(hop, OpOpData.PERSISTENTREAD)
-			&& hop.getDim1() > 1 && hop.getDim2() > 1; //multi-column matrix
+		return HopRewriteUtils.isData(hop, OpOpData.PERSISTENTREAD) && (hop.getDim1() >= 1000 && hop.getDim2() >= 1);
 	}
-	
+
+	private static boolean satisfiesDeCompressionCondition(Hop hop) {
+		// TODO decompression Condition
+		return false;
+	}
+
 	private static boolean satisfiesAutoCompressionCondition(Hop hop, DMLProgram prog) {
-		//check for basic compression condition
-		if( !(satisfiesCompressionCondition(hop) 
-			&& hop.getMemEstimate() >= OptimizerUtils.getLocalMemBudget()
-			&& OptimizerUtils.isSparkExecutionMode()) )
+		// check for basic compression condition
+		if(!(satisfiesCompressionCondition(hop) && hop.getMemEstimate() >= OptimizerUtils.getLocalMemBudget() &&
+			OptimizerUtils.isSparkExecutionMode()))
 			return false;
-		
-		//determine if data size exceeds aggregate cluster storage memory
-		double matrixPSize = OptimizerUtils.estimatePartitionedSizeExactSparsity(
-			hop.getDim1(), hop.getDim2(), hop.getBlocksize(), hop.getNnz());
+
+		// determine if data size exceeds aggregate cluster storage memory
+		double matrixPSize = OptimizerUtils
+			.estimatePartitionedSizeExactSparsity(hop.getDim1(), hop.getDim2(), hop.getBlocksize(), hop.getNnz());
 		double cacheSize = SparkExecutionContext.getDataMemoryBudget(true, true);
 		boolean outOfCore = matrixPSize > cacheSize;
-		
-		//determine if matrix is ultra sparse (and hence serialized)
+
+		// determine if matrix is ultra sparse (and hence serialized)
 		double sparsity = OptimizerUtils.getSparsity(hop.getDim1(), hop.getDim2(), hop.getNnz());
 		boolean ultraSparse = sparsity < MatrixBlock.ULTRA_SPARSITY_TURN_POINT;
-		
-		//determine if all operations are supported over compressed matrices,
-		//but conditionally only if all other conditions are met
-		if( hop.dimsKnown(true) && outOfCore && !ultraSparse ) {
-			//analyze program recursively, including called functions
+
+		// determine if all operations are supported over compressed matrices,
+		// but conditionally only if all other conditions are met
+		if(hop.dimsKnown(true) && outOfCore && !ultraSparse) {
+			// analyze program recursively, including called functions
 			ProbeStatus status = new ProbeStatus(hop.getHopID(), prog);
-			for( StatementBlock sb : prog.getStatementBlocks() )
+			for(StatementBlock sb : prog.getStatementBlocks())
 				rAnalyzeProgram(sb, status);
-			
-			//applicable if used in loop (amortized compressed costs), 
+
+			// applicable if used in loop (amortized compressed costs),
 			// no conditional updates in if-else branches
 			// and all operations are applicable (no decompression costs)
-			boolean ret = status.foundStart && status.usedInLoop 
-				&& !status.condUpdate && !status.nonApplicable;
-			if( LOG.isDebugEnabled() ) {
-				LOG.debug("Auto compression: "+ret+" (dimsKnown="+hop.dimsKnown(true)
-					+ ", outOfCore="+outOfCore+", !ultraSparse="+!ultraSparse
-					+", foundStart="+status.foundStart+", usedInLoop="+status.foundStart
-					+", !condUpdate="+!status.condUpdate+", !nonApplicable="+!status.nonApplicable+")");
+			boolean ret = status.foundStart && status.usedInLoop && !status.condUpdate && !status.nonApplicable;
+			if(LOG.isDebugEnabled()) {
+				LOG.debug("Auto compression: " + ret + " (dimsKnown=" + hop.dimsKnown(true) + ", outOfCore=" + outOfCore
+					+ ", !ultraSparse=" + !ultraSparse + ", foundStart=" + status.foundStart + ", usedInLoop="
+					+ status.foundStart + ", !condUpdate=" + !status.condUpdate + ", !nonApplicable="
+					+ !status.nonApplicable + ")");
 			}
 			return ret;
 		}
-		else if( LOG.isDebugEnabled() ) {
-			LOG.debug("Auto compression: false (dimsKnown="+hop.dimsKnown(true)
-				+ ", outOfCore="+outOfCore+", !ultraSparse="+!ultraSparse+")");
+		else if(LOG.isDebugEnabled()) {
+			LOG.debug("Auto compression: false (dimsKnown=" + hop.dimsKnown(true) + ", outOfCore=" + outOfCore
+				+ ", !ultraSparse=" + !ultraSparse + ")");
 		}
 		return false;
 	}
-	
-	private static void rAnalyzeProgram(StatementBlock sb, ProbeStatus status) 
-	{
+
+	private static boolean satisfiesCostCompressionCondition(Hop hop, DMLProgram prog) {
+		// TODO Cost compression Condition
+		return false;
+	}
+
+	private static void rAnalyzeProgram(StatementBlock sb, ProbeStatus status) {
 		if(sb instanceof FunctionStatementBlock) {
 			FunctionStatementBlock fsb = (FunctionStatementBlock) sb;
-			FunctionStatement fstmt = (FunctionStatement)fsb.getStatement(0);
-			for (StatementBlock csb : fstmt.getBody())
+			FunctionStatement fstmt = (FunctionStatement) fsb.getStatement(0);
+			for(StatementBlock csb : fstmt.getBody())
 				rAnalyzeProgram(csb, status);
 		}
 		else if(sb instanceof WhileStatementBlock) {
 			WhileStatementBlock wsb = (WhileStatementBlock) sb;
-			WhileStatement wstmt = (WhileStatement)wsb.getStatement(0);
-			for (StatementBlock csb : wstmt.getBody())
+			WhileStatement wstmt = (WhileStatement) wsb.getStatement(0);
+			for(StatementBlock csb : wstmt.getBody())
 				rAnalyzeProgram(csb, status);
-			if( wsb.variablesRead().containsAnyName(status.compMtx) )
+			if(wsb.variablesRead().containsAnyName(status.compMtx))
 				status.usedInLoop = true;
-		}	
+		}
 		else if(sb instanceof IfStatementBlock) {
 			IfStatementBlock isb = (IfStatementBlock) sb;
-			IfStatement istmt = (IfStatement)isb.getStatement(0);
-			for (StatementBlock csb : istmt.getIfBody())
+			IfStatement istmt = (IfStatement) isb.getStatement(0);
+			for(StatementBlock csb : istmt.getIfBody())
 				rAnalyzeProgram(csb, status);
-			for (StatementBlock csb : istmt.getElseBody())
+			for(StatementBlock csb : istmt.getElseBody())
 				rAnalyzeProgram(csb, status);
-			if( isb.variablesUpdated().containsAnyName(status.compMtx) )
+			if(isb.variablesUpdated().containsAnyName(status.compMtx))
 				status.condUpdate = true;
 		}
-		else if(sb instanceof ForStatementBlock) { //incl parfor
+		else if(sb instanceof ForStatementBlock) { // incl parfor
 			ForStatementBlock fsb = (ForStatementBlock) sb;
-			ForStatement fstmt = (ForStatement)fsb.getStatement(0);
-			for (StatementBlock csb : fstmt.getBody())
+			ForStatement fstmt = (ForStatement) fsb.getStatement(0);
+			for(StatementBlock csb : fstmt.getBody())
 				rAnalyzeProgram(csb, status);
-			if( fsb.variablesRead().containsAnyName(status.compMtx) )
+			if(fsb.variablesRead().containsAnyName(status.compMtx))
 				status.usedInLoop = true;
 		}
-		else if( sb.getHops() != null ) { //generic (last-level)
+		else if(sb.getHops() != null) { // generic (last-level)
 			ArrayList<Hop> roots = sb.getHops();
 			Hop.resetVisitStatus(roots);
-			//process entire HOP DAG starting from the roots
-			for( Hop root : roots )
+			// process entire HOP DAG starting from the roots
+			for(Hop root : roots)
 				rAnalyzeHopDag(root, status);
-			//remove temporary variables
+			// remove temporary variables
 			status.compMtx.removeIf(n -> n.startsWith(TMP_PREFIX));
 			Hop.resetVisitStatus(roots);
 		}
 	}
-	
-	private static void rAnalyzeHopDag(Hop current, ProbeStatus status) 
-	{
-		if( current.isVisited() )
+
+	private static void rAnalyzeHopDag(Hop current, ProbeStatus status) {
+		if(current.isVisited())
 			return;
-		
-		//process children recursively
-		for( Hop input : current.getInput() )
+
+		// process children recursively
+		for(Hop input : current.getInput())
 			rAnalyzeHopDag(input, status);
-		
-		//handle source persistent read
-		if( current.getHopID() == status.startHopID ) {
+
+		// handle source persistent read
+		if(current.getHopID() == status.startHopID) {
 			status.compMtx.add(getTmpName(current));
 			status.foundStart = true;
 		}
+
+		// handle individual hops
 		
-		//handle individual hops
-		//a) handle function calls
-		if( current instanceof FunctionOp 
-			&& hasCompressedInput(current, status) )
-		{
-			//TODO handle of functions in a more fine-grained manner
-			//to cover special cases multiple calls where compressed
-			//inputs might occur for different input parameters
-			
+		// a) handle function calls
+		if(current instanceof FunctionOp && hasCompressedInput(current, status)) {
+			// TODO handle of functions in a more fine-grained manner
+			// to cover special cases multiple calls where compressed
+			// inputs might occur for different input parameters
+
 			FunctionOp fop = (FunctionOp) current;
 			String fkey = fop.getFunctionKey();
-			if( !status.procFn.contains(fkey) ) {
-				//memoization to avoid redundant analysis and recursive calls
+			if(!status.procFn.contains(fkey)) {
+				// memoization to avoid redundant analysis and recursive calls
 				status.procFn.add(fkey);
-				//map inputs to function inputs
+				// map inputs to function inputs
 				FunctionStatementBlock fsb = status.prog.getFunctionStatementBlock(fkey);
 				FunctionStatement fstmt = (FunctionStatement) fsb.getStatement(0);
 				ProbeStatus status2 = new ProbeStatus(status);
-				for(int i=0; i<fop.getInput().size(); i++)
-					if( status.compMtx.contains(getTmpName(fop.getInput().get(i))) )
+				for(int i = 0; i < fop.getInput().size(); i++)
+					if(status.compMtx.contains(getTmpName(fop.getInput().get(i))))
 						status2.compMtx.add(fstmt.getInputParams().get(i).getName());
-				//analyze function and merge meta info
+				// analyze function and merge meta info
 				rAnalyzeProgram(fsb, status2);
 				status.foundStart |= status2.foundStart;
 				status.usedInLoop |= status2.usedInLoop;
 				status.condUpdate |= status2.condUpdate;
 				status.nonApplicable |= status2.nonApplicable;
-				//map function outputs to outputs
+				// map function outputs to outputs
 				String[] outputs = fop.getOutputVariableNames();
-				for( int i=0; i<outputs.length; i++ )
-					if( status2.compMtx.contains(fstmt.getOutputParams().get(i).getName()) )
+				for(int i = 0; i < outputs.length; i++)
+					if(status2.compMtx.contains(fstmt.getOutputParams().get(i).getName()))
 						status.compMtx.add(outputs[i]);
 			}
 		}
-		//b) handle transient reads and writes (name mapping)
-		else if( HopRewriteUtils.isData(current, OpOpData.TRANSIENTWRITE)
-			&& status.compMtx.contains(getTmpName(current.getInput().get(0))))
+
+		// b) handle transient reads and writes (name mapping)
+		else if(HopRewriteUtils.isData(current, OpOpData.TRANSIENTWRITE) &&
+			status.compMtx.contains(getTmpName(current.getInput().get(0))))
 			status.compMtx.add(current.getName());
-		else if( HopRewriteUtils.isData(current, OpOpData.TRANSIENTREAD)
-		&& status.compMtx.contains(current.getName()) )
+		else if(HopRewriteUtils.isData(current, OpOpData.TRANSIENTREAD) && status.compMtx.contains(current.getName()))
 			status.compMtx.add(getTmpName(current));
-		//c) handle applicable operations
-		else if( hasCompressedInput(current, status) ) {
-			boolean compUCOut = //valid with uncompressed outputs
-				(current instanceof AggBinaryOp && current.getDim2()<= current.getBlocksize() //tsmm
-					&& ((AggBinaryOp)current).checkTransposeSelf()==MMTSJType.LEFT)
-				|| (current instanceof AggBinaryOp && (current.getDim1()==1 || current.getDim2()==1)) //mvmm
-				|| (HopRewriteUtils.isTransposeOperation(current) && current.getParent().size()==1
-					&& current.getParent().get(0) instanceof AggBinaryOp 
-					&& (current.getParent().get(0).getDim1()==1 || current.getParent().get(0).getDim2()==1))
-				|| HopRewriteUtils.isAggUnaryOp(current, AggOp.SUM, AggOp.SUM_SQ, AggOp.MIN, AggOp.MAX);
-				// AggOp.SUM, Direction.Col
-			boolean compCOut = //valid with compressed outputs
-				HopRewriteUtils.isBinaryMatrixScalarOperation(current)
-				|| HopRewriteUtils.isBinary(current, OpOp2.CBIND);
+
+		// c) handle applicable operations
+		else if(hasCompressedInput(current, status)) {
+			// Valid with uncompressed outputs
+			// tsmm
+			boolean compUCOut = (current instanceof AggBinaryOp && current.getDim2() <= current.getBlocksize() &&
+				((AggBinaryOp) current).checkTransposeSelf() == MMTSJType.LEFT);
+
+			// mvmm
+			compUCOut |= (current instanceof AggBinaryOp && (current.getDim1() == 1 || current.getDim2() == 1));
+			compUCOut |= (HopRewriteUtils.isTransposeOperation(current) && current.getParent().size() == 1 &&
+				current.getParent().get(0) instanceof AggBinaryOp &&
+				(current.getParent().get(0).getDim1() == 1 || current.getParent().get(0).getDim2() == 1));
+			compUCOut |= HopRewriteUtils.isAggUnaryOp(current, AggOp.SUM, AggOp.SUM_SQ, AggOp.MIN, AggOp.MAX);
+
+			// Valid compressed
+			// Compressed Output if the operation is Binary scalar
+			boolean compCOut = HopRewriteUtils.isBinaryMatrixScalarOperation(current);
+			// Compressed Output if the operation is right Matrix Multiply
+			compCOut |= HopRewriteUtils.isBinaryMatrixMatrixOperation(current) &&
+				isCompressed(current.getInput().get(0), status);
+			// Compressed Output if the operation is binary.
+			compCOut |= HopRewriteUtils.isBinary(current, OpOp2.CBIND);
+
 			boolean metaOp = HopRewriteUtils.isUnary(current, OpOp1.NROW, OpOp1.NCOL);
 			status.nonApplicable |= !(compUCOut || compCOut || metaOp);
-			if( compCOut )
+			if(compCOut)
 				status.compMtx.add(getTmpName(current));
 		}
-		
+
 		current.setVisited();
 	}
-	
+
 	private static String getTmpName(Hop hop) {
 		return TMP_PREFIX + hop.getHopID();
 	}
-	
+
 	private static boolean hasCompressedInput(Hop hop, ProbeStatus status) {
-		if( status.compMtx.isEmpty() )
+		if(status.compMtx.isEmpty())
 			return false;
-		for( Hop input : hop.getInput() )
-			if( status.compMtx.contains(getTmpName(input)) )
+		for(Hop input : hop.getInput())
+			if(status.compMtx.contains(getTmpName(input)))
 				return true;
 		return false;
 	}
-	
+
+	private static boolean isCompressed(Hop hop, ProbeStatus status) {
+		return status.compMtx.contains(getTmpName(hop));
+	}
+
 	private static class ProbeStatus {
 		private final long startHopID;
 		private final DMLProgram prog;
@@ -308,10 +337,12 @@ public class RewriteCompressedReblock extends StatementBlockRewriteRule
 		private boolean nonApplicable = false;
 		private HashSet<String> procFn = new HashSet<>();
 		private HashSet<String> compMtx = new HashSet<>();
+
 		public ProbeStatus(long hopID, DMLProgram p) {
 			startHopID = hopID;
 			prog = p;
 		}
+
 		public ProbeStatus(ProbeStatus status) {
 			startHopID = status.startHopID;
 			prog = status.prog;
diff --git a/src/main/java/org/apache/sysds/lops/Compression.java b/src/main/java/org/apache/sysds/lops/Compression.java
index 4270e34..9e556a9 100644
--- a/src/main/java/org/apache/sysds/lops/Compression.java
+++ b/src/main/java/org/apache/sysds/lops/Compression.java
@@ -23,19 +23,17 @@ import org.apache.sysds.lops.LopProperties.ExecType;
 import org.apache.sysds.common.Types.DataType;
 import org.apache.sysds.common.Types.ValueType;
 
-public class Compression extends Lop 
-{
+public class Compression extends Lop {
 	public static final String OPCODE = "compress";
-	
+
 	public enum CompressConfig {
-		TRUE,
-		FALSE,
-		AUTO;
+		TRUE, FALSE, COST, AUTO;
+
 		public boolean isEnabled() {
-			return this == TRUE || this == AUTO;
+			return this != FALSE;
 		}
 	}
-	
+
 	public Compression(Lop input, DataType dt, ValueType vt, ExecType et) {
 		super(Lop.Type.Checkpoint, dt, vt);
 		addInput(input);
@@ -47,17 +45,17 @@ public class Compression extends Lop
 	public String toString() {
 		return "Compress";
 	}
-	
+
 	@Override
 	public String getInstructions(String input1, String output) {
 		StringBuilder sb = new StringBuilder();
-		sb.append( getExecType() );
-		sb.append( Lop.OPERAND_DELIMITOR );
-		sb.append( OPCODE );
-		sb.append( OPERAND_DELIMITOR );
-		sb.append( getInputs().get(0).prepInputOperand(input1));
-		sb.append( OPERAND_DELIMITOR );
-		sb.append( prepOutputOperand(output));
+		sb.append(getExecType());
+		sb.append(Lop.OPERAND_DELIMITOR);
+		sb.append(OPCODE);
+		sb.append(OPERAND_DELIMITOR);
+		sb.append(getInputs().get(0).prepInputOperand(input1));
+		sb.append(OPERAND_DELIMITOR);
+		sb.append(prepOutputOperand(output));
 		return sb.toString();
 	}
 }
diff --git a/src/main/java/org/apache/sysds/lops/Compression.java b/src/main/java/org/apache/sysds/lops/DeCompression.java
similarity index 89%
copy from src/main/java/org/apache/sysds/lops/Compression.java
copy to src/main/java/org/apache/sysds/lops/DeCompression.java
index 4270e34..d97f501 100644
--- a/src/main/java/org/apache/sysds/lops/Compression.java
+++ b/src/main/java/org/apache/sysds/lops/DeCompression.java
@@ -23,9 +23,9 @@ import org.apache.sysds.lops.LopProperties.ExecType;
 import org.apache.sysds.common.Types.DataType;
 import org.apache.sysds.common.Types.ValueType;
 
-public class Compression extends Lop 
+public class DeCompression extends Lop 
 {
-	public static final String OPCODE = "compress";
+	public static final String OPCODE = "decompress";
 	
 	public enum CompressConfig {
 		TRUE,
@@ -36,7 +36,7 @@ public class Compression extends Lop
 		}
 	}
 	
-	public Compression(Lop input, DataType dt, ValueType vt, ExecType et) {
+	public DeCompression(Lop input, DataType dt, ValueType vt, ExecType et) {
 		super(Lop.Type.Checkpoint, dt, vt);
 		addInput(input);
 		input.addOutput(this);
@@ -45,7 +45,7 @@ public class Compression extends Lop
 
 	@Override
 	public String toString() {
-		return "Compress";
+		return "DeCompress";
 	}
 	
 	@Override
diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java
index ae29535..32ae51f 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java
@@ -113,6 +113,9 @@ public class CompressedMatrixBlockFactory {
 			_stats.estimatedSizeCols = sizeInfos.memoryEstimate();
 
 		_stats.setNextTimePhase(time.stop());
+		if (DMLScript.STATISTICS ){
+			DMLCompressionStatistics.addCompressionTime(_stats.getLastTimePhase(), 1);
+		}
 		if(LOG.isDebugEnabled()){
 			LOG.debug("Compression statistics:");
 			LOG.debug("--compression phase 1: " + _stats.getLastTimePhase());
@@ -130,8 +133,10 @@ public class CompressedMatrixBlockFactory {
 		List<int[]> coCodeColGroups = PlanningCoCoder
 			.findCoCodesByPartitioning(sizeEstimator, sizeInfos, numRows, k, compSettings);
 		_stats.setNextTimePhase(time.stop());
+		if (DMLScript.STATISTICS ){
+			DMLCompressionStatistics.addCompressionTime(_stats.getLastTimePhase(), 2);
+		}
 		if(LOG.isDebugEnabled()) {
-
 			LOG.debug("--compression phase 2: " + _stats.getLastTimePhase());
 			StringBuilder sb = new StringBuilder();
 			for(int[] group : coCodeColGroups)
@@ -157,6 +162,9 @@ public class CompressedMatrixBlockFactory {
 		List<ColGroup> colGroupList = ColGroupFactory.assignColumns(numCols, colGroups, rawBlock, compSettings);
 		res.allocateColGroupList(colGroupList);
 		_stats.setNextTimePhase(time.stop());
+		if (DMLScript.STATISTICS ){
+			DMLCompressionStatistics.addCompressionTime(_stats.getLastTimePhase(), 3);
+		}
 		if(LOG.isDebugEnabled()) {
 			LOG.debug("Hash overlap count:" + DblArrayIntListHashMap.hashMissCount);
 			DblArrayIntListHashMap.hashMissCount = 0;
@@ -173,6 +181,9 @@ public class CompressedMatrixBlockFactory {
 		// res._sharedDDC1Dict = true;
 		// }
 		_stats.setNextTimePhase(time.stop());
+		if (DMLScript.STATISTICS ){
+			DMLCompressionStatistics.addCompressionTime(_stats.getLastTimePhase(), 4);
+		}
 		if(LOG.isDebugEnabled()) {
 			LOG.debug("--compression phase 4: " + _stats.getLastTimePhase());
 		}
@@ -197,7 +208,9 @@ public class CompressedMatrixBlockFactory {
 		_stats.setNextTimePhase(time.stop());
 		_stats.setColGroupsCounts(colGroupList);
 
-		
+		if (DMLScript.STATISTICS ){
+			DMLCompressionStatistics.addCompressionTime(_stats.getLastTimePhase(), 5);
+		}
 		if(LOG.isDebugEnabled()) {
 			LOG.debug("--num col groups: " + colGroupList.size() + ", -- num input cols: " + numCols);
 			LOG.debug("--compression phase 5: " + _stats.getLastTimePhase());
@@ -215,9 +228,7 @@ public class CompressedMatrixBlockFactory {
 			}
 		}
 
-		if (DMLScript.STATISTICS ){
-			DMLCompressionStatistics.addCompressionTimes(_stats.getTimeArrayList());
-		}
+		
 		return new ImmutablePair<>(res, _stats);
 		// --------------------------------------------------
 	}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressionStatistics.java b/src/main/java/org/apache/sysds/runtime/compress/CompressionStatistics.java
index 140644e..48093eb 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressionStatistics.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressionStatistics.java
@@ -19,7 +19,6 @@
 
 package org.apache.sysds.runtime.compress;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -29,7 +28,7 @@ import org.apache.sysds.runtime.compress.colgroup.ColGroup.CompressionType;
 
 public class CompressionStatistics {
 
-	private ArrayList<Double> timePhases = new ArrayList<>(5);
+	private double lastPhase;
 	public double ratio;
 	public long originalSize;
 	public long estimatedSizeColGroups;
@@ -42,11 +41,11 @@ public class CompressionStatistics {
 	}
 
 	public void setNextTimePhase(double time) {
-		timePhases.add(time);
+		lastPhase = time;
 	}
 
 	public double getLastTimePhase() {
-		return timePhases.get(timePhases.size() - 1);
+		return lastPhase;
 	}
 
 	/**
@@ -79,10 +78,6 @@ public class CompressionStatistics {
 		return colGroupCounts;
 	}
 
-	public List<Double> getTimeArrayList() {
-		return timePhases;
-	}
-
 	public String getGroupsTypesString() {
 		StringBuilder sb = new StringBuilder();
 
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/CPInstructionParser.java b/src/main/java/org/apache/sysds/runtime/instructions/CPInstructionParser.java
index f7838ae..da86189 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/CPInstructionParser.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/CPInstructionParser.java
@@ -42,6 +42,7 @@ import org.apache.sysds.runtime.instructions.cp.CompressionCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.CovarianceCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.CtableCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.DataGenCPInstruction;
+import org.apache.sysds.runtime.instructions.cp.DeCompressionCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.DnnCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.FunctionCallCPInstruction;
 import org.apache.sysds.runtime.instructions.cp.IndexingCPInstruction;
@@ -309,6 +310,7 @@ public class CPInstructionParser extends InstructionParser
 
 		String2CPInstructionType.put( "partition", CPType.Partition);
 		String2CPInstructionType.put( "compress",  CPType.Compression);
+		String2CPInstructionType.put( "decompress", CPType.DeCompression);
 		String2CPInstructionType.put( "spoof",     CPType.SpoofFused);
 		
 		String2CPInstructionType.put( "sql", CPType.Sql);
@@ -439,6 +441,9 @@ public class CPInstructionParser extends InstructionParser
 			case Compression:
 				return CompressionCPInstruction.parseInstruction(str);
 			
+			case DeCompression:
+				return DeCompressionCPInstruction.parseInstruction(str);
+				
 			case SpoofFused:
 				return SpoofCPInstruction.parseInstruction(str);
 				
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/SPInstructionParser.java b/src/main/java/org/apache/sysds/runtime/instructions/SPInstructionParser.java
index 70512ec..b8fdfe8 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/SPInstructionParser.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/SPInstructionParser.java
@@ -25,6 +25,7 @@ import org.apache.sysds.common.Types;
 import org.apache.sysds.lops.Checkpoint;
 import org.apache.sysds.lops.Compression;
 import org.apache.sysds.lops.DataGen;
+import org.apache.sysds.lops.DeCompression;
 import org.apache.sysds.lops.LeftIndex;
 import org.apache.sysds.lops.RightIndex;
 import org.apache.sysds.lops.WeightedCrossEntropy;
@@ -226,6 +227,7 @@ public class SPInstructionParser extends InstructionParser
 		// Spark-specific instructions
 		String2SPInstructionType.put( Checkpoint.OPCODE, SPType.Checkpoint);
 		String2SPInstructionType.put( Compression.OPCODE, SPType.Compression);
+		String2SPInstructionType.put( DeCompression.OPCODE, SPType.DeCompression);
 		
 		// Builtin Instruction Opcodes 
 		String2SPInstructionType.put( "log"  , SPType.Builtin);
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/CPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/CPInstruction.java
index 53dd274..565ac8f 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/cp/CPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/CPInstruction.java
@@ -39,7 +39,7 @@ public abstract class CPInstruction extends Instruction
 		Unary, Binary, Ternary, Quaternary, BuiltinNary, Ctable,
 		MultiReturnParameterizedBuiltin, ParameterizedBuiltin, MultiReturnBuiltin,
 		Builtin, Reorg, Variable, FCall, Append, Rand, QSort, QPick,
-		MatrixIndexing, MMTSJ, PMMJ, MMChain, Reshape, Partition, Compression, SpoofFused,
+		MatrixIndexing, MMTSJ, PMMJ, MMChain, Reshape, Partition, Compression, DeCompression, SpoofFused,
 		StringInit, CentralMoment, Covariance, UaggOuterChain, Dnn, Sql }
 
 	protected final CPType _cptype;
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/DeCompressionCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/DeCompressionCPInstruction.java
new file mode 100644
index 0000000..27b7bc2
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/DeCompressionCPInstruction.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.cp;
+
+import org.apache.sysds.hops.OptimizerUtils;
+import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.operators.Operator;
+
+public class DeCompressionCPInstruction extends ComputationCPInstruction {
+
+	private DeCompressionCPInstruction(Operator op, CPOperand in, CPOperand out, String opcode, String istr) {
+		super(CPType.Compression, op, in, null, null, out, opcode, istr);
+	}
+
+	public static DeCompressionCPInstruction parseInstruction(String str) {
+		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
+		String opcode = parts[0];
+		CPOperand in1 = new CPOperand(parts[1]);
+		CPOperand out = new CPOperand(parts[2]);
+		return new DeCompressionCPInstruction(null, in1, out, opcode, str);
+	}
+
+	@Override
+	public void processInstruction(ExecutionContext ec) {
+		// Get matrix block input
+		MatrixBlock in = ec.getMatrixInput(input1.getName());
+
+		if(in instanceof CompressedMatrixBlock){
+			MatrixBlock out = ((CompressedMatrixBlock)in).decompress(OptimizerUtils.getConstrainedNumThreads(-1));
+			ec.releaseMatrixInput(input1.getName());
+			ec.setMatrixOutput(output.getName(), out);
+		} else{
+			MatrixBlock out = in;
+			ec.releaseMatrixInput(input1.getName());
+			ec.setMatrixOutput(output.getName(), out);
+		}
+	}
+}
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/DeCompressionSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/DeCompressionSPInstruction.java
new file mode 100644
index 0000000..bd64775
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/DeCompressionSPInstruction.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.instructions.spark;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.sysds.hops.OptimizerUtils;
+import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
+import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
+import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.instructions.cp.CPOperand;
+import org.apache.sysds.runtime.instructions.spark.CompressionSPInstruction.CompressionFunction;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysds.runtime.matrix.operators.Operator;
+
+public class DeCompressionSPInstruction extends UnarySPInstruction {
+
+	private DeCompressionSPInstruction(Operator op, CPOperand in, CPOperand out, String opcode, String istr) {
+		super(SPType.DeCompression, op, in, out, opcode, istr);
+	}
+
+	public static DeCompressionSPInstruction parseInstruction(String str) {
+		InstructionUtils.checkNumFields(str, 2);
+		String[] parts = InstructionUtils.getInstructionPartsWithValueType(str);
+		return new DeCompressionSPInstruction(null, new CPOperand(parts[1]), new CPOperand(parts[2]), parts[0], str);
+	}
+
+	@Override
+	public void processInstruction(ExecutionContext ec) {
+		SparkExecutionContext sec = (SparkExecutionContext) ec;
+
+		// get input rdd handle
+		JavaPairRDD<MatrixIndexes, MatrixBlock> in = sec.getBinaryMatrixBlockRDDHandleForVariable(input1.getName());
+
+		// execute compression
+		JavaPairRDD<MatrixIndexes, MatrixBlock> out = in.mapValues(new CompressionFunction());
+
+		// set outputs
+		sec.setRDDHandleForVariable(output.getName(), out);
+		sec.addLineageRDD(input1.getName(), output.getName());
+	}
+
+	public static class DeCompressionFunction implements Function<MatrixBlock, MatrixBlock> {
+		private static final long serialVersionUID = -6528833083609413922L;
+
+		@Override
+		public MatrixBlock call(MatrixBlock arg0) throws Exception {
+			if(arg0 instanceof CompressedMatrixBlock){
+				return ((CompressedMatrixBlock) arg0).decompress(OptimizerUtils.getConstrainedNumThreads(-1));
+			}else{
+				return arg0;
+			}
+		}
+	}
+}
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/SPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/SPInstruction.java
index 9cb2a1e..297bb5b 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/SPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/SPInstruction.java
@@ -33,7 +33,7 @@ public abstract class SPInstruction extends Instruction {
 		MAPMM, MAPMMCHAIN, CPMM, RMM, TSMM, TSMM2, PMM, ZIPMM, PMAPMM, //matrix multiplication instructions  
 		MatrixIndexing, Reorg, Binary, Ternary,
 		AggregateUnary, AggregateTernary, Reblock, CSVReblock, 
-		Builtin, Unary, BuiltinNary, MultiReturnBuiltin, Checkpoint, Compression, Cast,
+		Builtin, Unary, BuiltinNary, MultiReturnBuiltin, Checkpoint, Compression, DeCompression, Cast,
 		CentralMoment, Covariance, QSort, QPick, 
 		ParameterizedBuiltin, MAppend, RAppend, GAppend, GAlignedAppend, Rand, 
 		MatrixReshape, Ctable, Quaternary, CumsumAggregate, CumsumOffset, BinUaggChain, UaggOuterChain, 
diff --git a/src/main/java/org/apache/sysds/utils/DMLCompressionStatistics.java b/src/main/java/org/apache/sysds/utils/DMLCompressionStatistics.java
index 2b08a0a..26ae97d 100644
--- a/src/main/java/org/apache/sysds/utils/DMLCompressionStatistics.java
+++ b/src/main/java/org/apache/sysds/utils/DMLCompressionStatistics.java
@@ -19,8 +19,6 @@
 
 package org.apache.sysds.utils;
 
-import java.util.List;
-
 public class DMLCompressionStatistics {
 
 	// Compute compressed size info
@@ -39,12 +37,25 @@ public class DMLCompressionStatistics {
 	private static int DecompressMTCount = 0;
 	private static double DecompressMT = 0.0;
 
-	public static void addCompressionTimes(List<Double> times) {
-		Phase1 += times.get(0);
-		Phase2 += times.get(1);
-		Phase3 += times.get(2);
-		Phase4 += times.get(3);
-		Phase5 += times.get(4);
+	public static void addCompressionTime(double time, int phase) {
+		switch(phase) {
+			case 1:
+				Phase1 += time;
+				break;
+			case 2:
+				Phase2 += time;
+				break;
+			case 3:
+				Phase3 += time;
+				break;
+			case 4:
+				Phase4 += time;
+				break;
+			case 5:
+				Phase5 += time;
+				break;
+
+		}
 	}
 
 	public static void addDecompressTime(double time, int threads) {
@@ -58,6 +69,14 @@ public class DMLCompressionStatistics {
 		}
 	}
 
+	public static int getDecompressionCount() {
+		return DecompressMTCount;
+	}
+
+	public static int getDecompressionSTCount() {
+		return DecompressSTCount;
+	}
+
 	public static void display(StringBuilder sb) {
 		sb.append(String.format(
 			"CLA Compression Phases (classify, group, compress, share, clean) :\t%.3f/%.3f/%.3f/%.3f/%.3f\n",
diff --git a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
index fffe6fe..570b61a 100644
--- a/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
+++ b/src/test/java/org/apache/sysds/test/AutomatedTestBase.java
@@ -379,7 +379,23 @@ public abstract class AutomatedTestBase {
 	protected double[][] getRandomMatrix(int rows, int cols, double min, double max, double sparsity, long seed) {
 		return TestUtils.generateTestMatrix(rows, cols, min, max, sparsity, seed);
 	}
-
+	/**
+	 * <p>
+	 * Generates a random matrix with the specified characteristics and returns it as a two dimensional array.
+	 * </p>
+	 *
+	 * @param rows     number of rows
+	 * @param cols     number of columns
+	 * @param min      minimum value
+	 * @param max      maximum value
+	 * @param sparsity sparsity
+	 * @param seed     seed
+	 * @param delta    The minimum value in between values.
+	 * @return two dimensional array containing random matrix
+	 */
+	protected double[][] getRandomMatrix(int rows, int cols, double min, double max, double sparsity, long seed, double delta) {
+		return TestUtils.generateTestMatrix(rows, cols, min, max, sparsity, seed, delta);
+	}
 	/**
 	 * <p>
 	 * Generates a random matrix with the specified characteristics which does not contain any zero values and returns
diff --git a/src/test/java/org/apache/sysds/test/TestUtils.java b/src/test/java/org/apache/sysds/test/TestUtils.java
index f01cf61..6a7d8e6 100644
--- a/src/test/java/org/apache/sysds/test/TestUtils.java
+++ b/src/test/java/org/apache/sysds/test/TestUtils.java
@@ -1531,6 +1531,35 @@ public class TestUtils
 	}
 
 	/**
+	 * Generates a test matrix with the specified parameters as a two
+	 * dimensional array.
+	 * Set seed to -1 to use the current time as seed.
+	 * 
+	 * @param rows number of rows
+	 * @param cols number of columns
+	 * @param min minimum value
+	 * @param max maximum value
+	 * @param sparsity sparsity
+	 * @param seed seed
+	 * @param delta The minimum delta between values.
+	 * @return random matrix
+	 */
+	public static double[][] generateTestMatrix(int rows, int cols, double min, double max, double sparsity, long seed, double delta) {
+		double[][] matrix = new double[rows][cols];
+		Random random = (seed == -1) ? TestUtils.random : new Random(seed);
+		for (int i = 0; i < rows; i++) {
+			for (int j = 0; j < cols; j++) {
+				if (random.nextDouble() > sparsity)
+					continue;
+				double v = (random.nextDouble()) * (max - min);
+				matrix[i][j] = (v + min ) - v % delta;
+			}
+		}
+
+		return matrix;
+	}
+
+	/**
 	 * 
 	 * Generates a test matrix, but only containing real numbers, in the range specified.
 	 * 
diff --git a/src/test/java/org/apache/sysds/test/functions/compress/CompressBase.java b/src/test/java/org/apache/sysds/test/functions/compress/CompressBase.java
new file mode 100644
index 0000000..88a7ead
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/compress/CompressBase.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.compress;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.sysds.common.Types;
+import org.apache.sysds.lops.LopProperties;
+import org.apache.sysds.runtime.meta.MatrixCharacteristics;
+import org.apache.sysds.test.AutomatedTestBase;
+import org.apache.sysds.test.TestConfiguration;
+import org.apache.sysds.test.TestUtils;
+import org.apache.sysds.utils.DMLCompressionStatistics;
+import org.apache.sysds.utils.Statistics;
+import org.junit.Assert;
+
+public abstract class CompressBase extends AutomatedTestBase {
+	// private static final Log LOG = LogFactory.getLog(CompressBase.class.getName());
+
+	protected abstract String getTestClassDir();
+
+	protected abstract String getTestName();
+
+	protected abstract String getTestDir();
+
+	@Override
+	public void setUp() {
+		TestUtils.clearAssertionInformation();
+		addTestConfiguration(getTestName(), new TestConfiguration(getTestClassDir(), getTestName()));
+	}
+
+	public void transpose(int decompressCount, int compressCount) {
+		// Currently the transpose would decompress the compression.
+		// But since this script only contain one operation on potentially compressed, it should not try to compress but
+		// will if forced.
+		LopProperties.ExecType ex = LopProperties.ExecType.CP;
+		compressTest(1, 1000, 1.0, ex, 1, 10, 1, decompressCount, compressCount, "transpose");
+	}
+
+	public void sum(int decompressCount, int compressCount) {
+		// Only using sum operations the compression should not be decompressed.
+		// But since this script only contain one operation on potentially compressed, it should not try to compress but
+		// will if forced.
+		LopProperties.ExecType ex = LopProperties.ExecType.CP;
+		compressTest(1, 1000, 1.0, ex, 1, 10, 1, decompressCount, compressCount, "sum");
+	}
+
+	public void rowAggregate(int decompressCount, int compressCount) {
+		// If we use row aggregates, it is preferable not to compress at all.
+		// But since this script only contain one operation on potentially compressed, it should not try to compress but
+		// will if forced.
+		LopProperties.ExecType ex = LopProperties.ExecType.CP;
+		compressTest(1, 1000, 1.0, ex, 1, 10, 1, decompressCount, compressCount, "row_min");
+	}
+
+	public void compressTest(int cols, int rows, double sparsity, LopProperties.ExecType instType, int min, int max,
+		double delta, int decompressionCountExpected, int compressionCountsExpected, String name) {
+
+		Types.ExecMode platformOld = setExecMode(instType);
+		try {
+
+			loadTestConfiguration(getTestConfiguration(getTestName()));
+
+			double[][] A = getRandomMatrix(rows, cols, min, max, sparsity, 42, delta);
+			writeInputMatrixWithMTD("A", A, false, new MatrixCharacteristics(rows, cols, 1024, rows * cols));
+
+			fullDMLScriptName = SCRIPT_DIR + "/functions/compress/compress_" + name + ".dml";
+
+			programArgs = new String[] {"-stats", "100", "-nvargs", "A=" + input("A")};
+
+			runTest(null);
+
+			int decompressCount = 0;
+			decompressCount += DMLCompressionStatistics.getDecompressionCount();
+			decompressCount += DMLCompressionStatistics.getDecompressionSTCount();
+			long compressionCount = Statistics.getCPHeavyHitterCount("compress");
+
+			Assert.assertEquals(compressionCount, compressionCountsExpected);
+			Assert.assertEquals(decompressionCountExpected, decompressCount);
+
+		}
+		catch(Exception e) {
+			e.printStackTrace();
+			assertTrue("Exception in execution: " + e.getMessage(), false);
+		}
+		finally {
+			rtplatform = platformOld;
+		}
+	}
+}
diff --git a/src/test/java/org/apache/sysds/test/functions/compress/CompressCost.java b/src/test/java/org/apache/sysds/test/functions/compress/CompressCost.java
new file mode 100644
index 0000000..84545e3
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/compress/CompressCost.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.compress;
+
+import java.io.File;
+
+import org.junit.Test;
+
+public class CompressCost extends CompressBase {
+
+	public String TEST_NAME = "compress";
+	public String TEST_DIR = "functions/compress/cost/";
+	public String TEST_CLASS_DIR = TEST_DIR + CompressCost.class.getSimpleName() + "/";
+	private String TEST_CONF = "SystemDS-config-compress-cost.xml";
+	private File TEST_CONF_FILE = new File(SCRIPT_DIR + TEST_DIR, TEST_CONF);
+
+	protected String getTestClassDir() {
+		return TEST_CLASS_DIR;
+	}
+
+	protected String getTestName() {
+		return TEST_NAME;
+	}
+
+	protected String getTestDir() {
+		return TEST_DIR;
+	}
+
+	@Test
+	public void testTranspose() {
+		transpose(0, 0);
+	}
+
+	@Test
+	public void testSum() {
+		sum(0, 0);
+	}
+
+	@Test
+	public void testRowAggregate() {
+		rowAggregate(0, 0);
+	}
+
+	/**
+	 * Override default configuration with custom test configuration to ensure scratch space and local temporary
+	 * directory locations are also updated.
+	 */
+	@Override
+	protected File getConfigTemplateFile() {
+		return TEST_CONF_FILE;
+	}
+}
diff --git a/src/test/java/org/apache/sysds/test/functions/compress/CompressForce.java b/src/test/java/org/apache/sysds/test/functions/compress/CompressForce.java
new file mode 100644
index 0000000..47990d5
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/compress/CompressForce.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.compress;
+
+import java.io.File;
+
+import org.junit.Test;
+
+public class CompressForce extends CompressBase {
+
+	public String TEST_NAME = "compress";
+	public String TEST_DIR = "functions/compress/force/";
+	public String TEST_CLASS_DIR = TEST_DIR + CompressForce.class.getSimpleName() + "/";
+	private String TEST_CONF = "SystemDS-config-compress.xml";
+	private File TEST_CONF_FILE = new File(SCRIPT_DIR + TEST_DIR, TEST_CONF);
+
+	protected String getTestClassDir() {
+		return TEST_CLASS_DIR;
+	}
+
+	protected String getTestName() {
+		return TEST_NAME;
+	}
+
+	protected String getTestDir() {
+		return TEST_DIR;
+	}
+
+	@Test
+	public void testTranspose() {
+		transpose(1, 1);
+	}
+
+	@Test
+	public void testSum(){
+		sum(0,1);
+	}
+	
+	@Test
+	public void testRowAggregate() {
+		rowAggregate(0,1);
+	}
+
+	/**
+	 * Override default configuration with custom test configuration to ensure scratch space and local temporary
+	 * directory locations are also updated.
+	 */
+	@Override
+	protected File getConfigTemplateFile() {
+		return TEST_CONF_FILE;
+	}
+}
diff --git a/src/test/java/org/apache/sysds/test/functions/compress/CompressLossy.java b/src/test/java/org/apache/sysds/test/functions/compress/CompressLossy.java
new file mode 100644
index 0000000..8b1cccc
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/compress/CompressLossy.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.compress;
+
+import java.io.File;
+
+public class CompressLossy extends CompressForce {
+
+	public String TEST_NAME = "compress";
+	public String TEST_DIR = "functions/compress/force/lossy/";
+	public String TEST_CLASS_DIR = TEST_DIR + CompressLossy.class.getSimpleName() + "/";
+	private String TEST_CONF = "SystemDS-config-compress-lossy.xml";
+	private File TEST_CONF_FILE = new File(SCRIPT_DIR + TEST_DIR, TEST_CONF);
+
+	protected String getTestClassDir() {
+		return TEST_CLASS_DIR;
+	}
+
+	protected String getTestName() {
+		return TEST_NAME;
+	}
+
+	protected String getTestDir() {
+		return TEST_DIR;
+	}
+
+	/**
+	 * Override default configuration with custom test configuration to ensure scratch space and local temporary
+	 * directory locations are also updated.
+	 */
+	@Override
+	protected File getConfigTemplateFile() {
+		return TEST_CONF_FILE;
+	}
+}
diff --git a/src/test/java/org/apache/sysds/test/functions/compress/CompressLossyCost.java b/src/test/java/org/apache/sysds/test/functions/compress/CompressLossyCost.java
new file mode 100644
index 0000000..42d68bc
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/functions/compress/CompressLossyCost.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.functions.compress;
+
+import java.io.File;
+
+public class CompressLossyCost extends CompressCost {
+
+	public String TEST_NAME = "compress";
+	public String TEST_DIR = "functions/compress/cost/lossy";
+	public String TEST_CLASS_DIR = TEST_DIR + CompressLossyCost.class.getSimpleName() + "/";
+	private String TEST_CONF = "SystemDS-config-compress-lossy-cost.xml";
+	private File TEST_CONF_FILE = new File(SCRIPT_DIR + TEST_DIR, TEST_CONF);
+
+	protected String getTestClassDir() {
+		return TEST_CLASS_DIR;
+	}
+
+	protected String getTestName() {
+		return TEST_NAME;
+	}
+
+	protected String getTestDir() {
+		return TEST_DIR;
+	}
+
+	/**
+	 * Override default configuration with custom test configuration to ensure scratch space and local temporary
+	 * directory locations are also updated.
+	 */
+	@Override
+	protected File getConfigTemplateFile() {
+		return TEST_CONF_FILE;
+	}
+}
diff --git a/src/test/scripts/functions/compress/compress_row_min.dml b/src/test/scripts/functions/compress/compress_row_min.dml
new file mode 100644
index 0000000..daac5b8
--- /dev/null
+++ b/src/test/scripts/functions/compress/compress_row_min.dml
@@ -0,0 +1,24 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+A = read($A)
+r = rowMins(A)
+print(toString(r, rows = 1, cols = 1))
diff --git a/src/test/scripts/functions/compress/compress_sum.dml b/src/test/scripts/functions/compress/compress_sum.dml
new file mode 100644
index 0000000..78d0f66
--- /dev/null
+++ b/src/test/scripts/functions/compress/compress_sum.dml
@@ -0,0 +1,24 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+A = read($A)
+r = sum(A)
+print(r)
diff --git a/src/test/scripts/functions/compress/compress_transpose.dml b/src/test/scripts/functions/compress/compress_transpose.dml
new file mode 100644
index 0000000..a6b5aaa
--- /dev/null
+++ b/src/test/scripts/functions/compress/compress_transpose.dml
@@ -0,0 +1,24 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+A = read($A)
+r = t(A)
+print(toString(r, rows = 1, cols = 1))
diff --git a/src/test/scripts/functions/compress/cost/SystemDS-config-compress-cost.xml b/src/test/scripts/functions/compress/cost/SystemDS-config-compress-cost.xml
new file mode 100644
index 0000000..988774b
--- /dev/null
+++ b/src/test/scripts/functions/compress/cost/SystemDS-config-compress-cost.xml
@@ -0,0 +1,24 @@
+<!--
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+-->
+
+<root>
+	<sysds.compressed.linalg>cost</sysds.compressed.linalg>
+	<sysds.cp.parallel.ops>true</sysds.cp.parallel.ops>
+	<sysds.scratch>cost_scratch_space</sysds.scratch>
+</root>
diff --git a/src/test/scripts/functions/compress/cost/lossy/SystemDS-config-compress-lossy-cost.xml b/src/test/scripts/functions/compress/cost/lossy/SystemDS-config-compress-lossy-cost.xml
new file mode 100644
index 0000000..097d577
--- /dev/null
+++ b/src/test/scripts/functions/compress/cost/lossy/SystemDS-config-compress-lossy-cost.xml
@@ -0,0 +1,25 @@
+<!--
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+-->
+
+<root>
+	<sysds.compressed.linalg>cost</sysds.compressed.linalg>
+	<sysds.compressed.lossy>true</sysds.compressed.lossy>
+	<sysds.cp.parallel.ops>true</sysds.cp.parallel.ops>
+	<sysds.scratch>cost_lossy_scratch_space</sysds.scratch>
+</root>
diff --git a/src/test/scripts/functions/compress/force/SystemDS-config-compress.xml b/src/test/scripts/functions/compress/force/SystemDS-config-compress.xml
new file mode 100644
index 0000000..b1b6807
--- /dev/null
+++ b/src/test/scripts/functions/compress/force/SystemDS-config-compress.xml
@@ -0,0 +1,24 @@
+<!--
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+-->
+
+<root>
+	<sysds.compressed.linalg>true</sysds.compressed.linalg>
+	<sysds.cp.parallel.ops>true</sysds.cp.parallel.ops>
+	<sysds.scratch>comp_scratch_space</sysds.scratch>
+</root>
diff --git a/src/test/scripts/functions/compress/force/lossy/SystemDS-config-compress-lossy.xml b/src/test/scripts/functions/compress/force/lossy/SystemDS-config-compress-lossy.xml
new file mode 100644
index 0000000..452c561
--- /dev/null
+++ b/src/test/scripts/functions/compress/force/lossy/SystemDS-config-compress-lossy.xml
@@ -0,0 +1,25 @@
+<!--
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+-->
+
+<root>
+	<sysds.compressed.linalg>true</sysds.compressed.linalg>
+	<sysds.compressed.lossy>true</sysds.compressed.lossy>
+	<sysds.cp.parallel.ops>true</sysds.cp.parallel.ops>
+	<sysds.scratch>lossy_scratch_space</sysds.scratch>
+</root>