You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/05/28 08:15:30 UTC

incubator-systemml git commit: [SYSTEMML-1289] Codegen cell-wise operations over compressed matrices

Repository: incubator-systemml
Updated Branches:
  refs/heads/master 14fd9da1b -> 5174fbc00


[SYSTEMML-1289] Codegen cell-wise operations over compressed matrices

This patch generalizes the codegen cell-wise operations (no aggregation,
row aggregates, full aggregates) to support compressed matrix blocks as
main or side inputs, without full decompression. The approach is to use
column-wise access in the operator skeletons to make a single pass over
the compressed representation. As a basis for these operations, this
patch introduces a column group iterator and encoding-format-specific
iterators for convenient access to row partitions of compressed values
with or without zero values. These iterators can be reused for all other
codegen templates as well as special cases of decompression. 


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/5174fbc0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/5174fbc0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/5174fbc0

Branch: refs/heads/master
Commit: 5174fbc0029bcddea26e52dd3150193aad6fa18c
Parents: 14fd9da
Author: Matthias Boehm <mb...@gmail.com>
Authored: Sun May 28 01:01:42 2017 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Sun May 28 01:01:42 2017 -0700

----------------------------------------------------------------------
 .../sysml/runtime/codegen/SpoofCellwise.java    | 262 +++++++++++----
 .../sysml/runtime/codegen/SpoofOperator.java    |  31 +-
 .../runtime/compress/BitmapDecoderOLE.java      | 129 --------
 .../runtime/compress/BitmapDecoderRLE.java      | 119 -------
 .../apache/sysml/runtime/compress/ColGroup.java |   4 +
 .../sysml/runtime/compress/ColGroupDDC.java     |  52 +++
 .../sysml/runtime/compress/ColGroupOLE.java     |  76 ++++-
 .../sysml/runtime/compress/ColGroupOffset.java  | 143 +++++++-
 .../sysml/runtime/compress/ColGroupRLE.java     |  69 +++-
 .../runtime/compress/ColGroupUncompressed.java  |  52 +++
 .../runtime/compress/CompressedMatrixBlock.java |  51 +++
 .../codegen/CompressedCellwiseTest.java         | 331 +++++++++++++++++++
 .../functions/codegen/CompressedCellwiseMain.R  |  31 ++
 .../codegen/CompressedCellwiseMain.dml          |  27 ++
 .../functions/codegen/CompressedCellwiseSide.R  |  33 ++
 .../codegen/CompressedCellwiseSide.dml          |  29 ++
 .../SystemML-config-codegen-compress.xml        |  62 ++++
 .../functions/codegen/ZPackageSuite.java        |   1 +
 18 files changed, 1152 insertions(+), 350 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/5174fbc0/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java b/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java
index eb45cc4..3e1dcad 100644
--- a/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java
+++ b/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java
@@ -22,6 +22,8 @@ package org.apache.sysml.runtime.codegen;
 import java.io.Serializable;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
@@ -29,6 +31,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.compress.CompressedMatrixBlock;
 import org.apache.sysml.runtime.functionobjects.Builtin;
 import org.apache.sysml.runtime.functionobjects.Builtin.BuiltinCode;
 import org.apache.sysml.runtime.functionobjects.KahanFunction;
@@ -38,6 +41,7 @@ import org.apache.sysml.runtime.functionobjects.ValueFunction;
 import org.apache.sysml.runtime.instructions.cp.DoubleObject;
 import org.apache.sysml.runtime.instructions.cp.KahanObject;
 import org.apache.sysml.runtime.instructions.cp.ScalarObject;
+import org.apache.sysml.runtime.matrix.data.IJV;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.SparseBlock;
 import org.apache.sysml.runtime.util.UtilFunctions;
@@ -125,9 +129,12 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl
 		double ret = 0;
 		if( k <= 1 ) //SINGLE-THREADED
 		{
-			ret = ( !inputs.get(0).isInSparseFormat() ) ?
-				executeDenseAndAgg(inputs.get(0).getDenseBlock(), b, scalars, m, n, sparseSafe, 0, m) :
-				executeSparseAndAgg(inputs.get(0).getSparseBlock(), b, scalars, m, n, sparseSafe, 0, m);
+			if( inputs.get(0) instanceof CompressedMatrixBlock )
+				ret = executeCompressedAndAgg((CompressedMatrixBlock)inputs.get(0), b, scalars, m, n, sparseSafe, 0, m);
+			else if( !inputs.get(0).isInSparseFormat() )
+				ret = executeDenseAndAgg(inputs.get(0).getDenseBlock(), b, scalars, m, n, sparseSafe, 0, m);
+			else
+				ret = executeSparseAndAgg(inputs.get(0).getSparseBlock(), b, scalars, m, n, sparseSafe, 0, m);
 		}
 		else  //MULTI-THREADED
 		{
@@ -199,7 +206,8 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl
 				&& genexec( 0, b, scalars, m, n, 0, 0 ) == 0);
 		
 		//result allocation and preparations
-		boolean sparseOut = sparseSafe && inputs.get(0).isInSparseFormat() && _type == CellType.NO_AGG;
+		boolean sparseOut = sparseSafe && inputs.get(0).isInSparseFormat()
+				&& _type == CellType.NO_AGG && !(inputs.get(0) instanceof CompressedMatrixBlock);
 		out.reset(inputs.get(0).getNumRows(), _type == CellType.NO_AGG ?
 				inputs.get(0).getNumColumns() : 1, sparseOut);
 		out.allocateDenseOrSparseBlock();
@@ -207,9 +215,12 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl
 		long lnnz = 0;
 		if( k <= 1 ) //SINGLE-THREADED
 		{
-			lnnz = (!inputs.get(0).isInSparseFormat()) ?
-				executeDense(inputs.get(0).getDenseBlock(), b, scalars, out, m, n, sparseSafe, 0, m) :
-				executeSparse(inputs.get(0).getSparseBlock(), b, scalars, out, m, n, sparseSafe, 0, m);
+			if( inputs.get(0) instanceof CompressedMatrixBlock )
+				lnnz = executeCompressed((CompressedMatrixBlock)inputs.get(0), b, scalars, out, m, n, sparseSafe, 0, m);
+			else if( !inputs.get(0).isInSparseFormat() )
+				lnnz = executeDense(inputs.get(0).getDenseBlock(), b, scalars, out, m, n, sparseSafe, 0, m);
+			else
+				lnnz = executeSparse(inputs.get(0).getSparseBlock(), b, scalars, out, m, n, sparseSafe, 0, m);
 		}
 		else  //MULTI-THREADED
 		{
@@ -239,6 +250,9 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl
 		out.examSparsity();
 	}
 	
+	/////////
+	//function dispatch
+	
 	private long executeDense(double[] a, SideInput[] b, double[] scalars, MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru) 
 		throws DMLRuntimeException 
 	{
@@ -299,6 +313,128 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl
 			return executeSparseAggMxx(sblock, b, scalars, m, n, sparseSafe, rl, ru);
 	}
 	
+	private long executeCompressed(CompressedMatrixBlock a, SideInput[] b, double[] scalars, MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru) 
+		throws DMLRuntimeException 
+	{
+		double[] c = out.getDenseBlock();
+		
+		if( _type == CellType.NO_AGG ) {
+			return executeCompressedNoAgg(a, b, scalars, c, m, n, sparseSafe, rl, ru);
+		}
+		else if( _type == CellType.ROW_AGG ) {
+			if( _aggOp == AggOp.SUM || _aggOp == AggOp.SUM_SQ )
+				return executeCompressedRowAggSum(a, b, scalars, c, m, n, sparseSafe, rl, ru);
+			else
+				return executeCompressedRowAggMxx(a, b, scalars, c, m, n, sparseSafe, rl, ru);
+		}
+		return -1;
+	}
+	
+	private double executeCompressedAndAgg(CompressedMatrixBlock a, SideInput[] b, double[] scalars, int m, int n, boolean sparseSafe, int rl, int ru) throws DMLRuntimeException 
+	{
+		//numerically stable aggregation for sum/sum_sq
+		if( _aggOp == AggOp.SUM || _aggOp == AggOp.SUM_SQ )
+			return executeCompressedAggSum(a, b, scalars, m, n, sparseSafe, rl, ru);
+		else
+			return executeCompressedAggMxx(a, b, scalars, m, n, sparseSafe, rl, ru);
+	}
+	
+	/////////
+	//core operator skeletons for dense, sparse, and compressed
+
+	private long executeDenseNoAgg(double[] a, SideInput[] b, double[] scalars, double[] c, int m, int n, boolean sparseSafe, int rl, int ru) 
+		throws DMLRuntimeException 
+	{
+		long lnnz = 0;
+		for( int i=rl, ix=rl*n; i<ru; i++ ) 
+			for( int j=0; j<n; j++, ix++ ) {
+				double aval = (a != null) ? a[ix] : 0;
+				if( aval != 0 || !sparseSafe) {
+					c[ix] = genexec( aval, b, scalars, m, n, i, j); 
+					lnnz += (c[ix]!=0) ? 1 : 0;
+				}
+			}
+		return lnnz;
+	}
+	
+	private long executeDenseRowAggSum(double[] a, SideInput[] b, double[] scalars, double[] c, int m, int n, boolean sparseSafe, int rl, int ru) 
+		throws DMLRuntimeException 
+	{
+		KahanFunction kplus = (KahanFunction) getAggFunction();
+		KahanObject kbuff = new KahanObject(0, 0);
+		long lnnz = 0;
+		for( int i=rl, ix=rl*n; i<ru; i++ ) {
+			kbuff.set(0, 0);
+			for( int j=0; j<n; j++, ix++ ) {
+				double aval = (a != null) ? a[ix] : 0;
+				if( aval != 0 || !sparseSafe)
+					kplus.execute2(kbuff, genexec(aval, b, scalars, m, n, i, j));
+			}
+			lnnz += ((c[i] = kbuff._sum)!=0) ? 1 : 0;
+		}
+		return lnnz;
+	}
+	
+	private long executeDenseRowAggMxx(double[] a, SideInput[] b, double[] scalars, double[] c, int m, int n, boolean sparseSafe, int rl, int ru) 
+		throws DMLRuntimeException 
+	{
+		double initialVal = (_aggOp==AggOp.MIN) ? Double.MAX_VALUE : -Double.MAX_VALUE;
+		ValueFunction vfun = getAggFunction();
+		long lnnz = 0;
+		if( a == null && !sparseSafe ) { //empty
+			for( int i=rl; i<ru; i++ ) { 
+				double tmp = initialVal;
+				for( int j=0; j<n; j++ )
+					tmp = vfun.execute(tmp, genexec( 0, b, scalars, m, n, i, j ));
+				lnnz += ((c[i] = tmp)!=0) ? 1 : 0;
+			}
+		}
+		else if( a != null ) { //general case
+			for( int i=rl, ix=rl*n; i<ru; i++ ) {
+				double tmp = initialVal;
+				for( int j=0; j<n; j++, ix++ )
+					if( a[ix] != 0 || !sparseSafe)
+						tmp = vfun.execute(tmp, genexec( a[ix], b, scalars, m, n, i, j ));
+				if( sparseSafe && UtilFunctions.containsZero(a, ix-n, n) )
+					tmp = vfun.execute(tmp, 0);
+				lnnz += ((c[i] = tmp)!=0) ? 1 : 0;
+			}
+		}
+		return lnnz;
+	}
+	
+	private double executeDenseAggSum(double[] a, SideInput[] b, double[] scalars, int m, int n, boolean sparseSafe, int rl, int ru) 
+		throws DMLRuntimeException 
+	{
+		KahanFunction kplus = (KahanFunction) getAggFunction();
+		KahanObject kbuff = new KahanObject(0, 0);
+		
+		for( int i=rl, ix=rl*n; i<ru; i++ ) 
+			for( int j=0; j<n; j++, ix++ ) {
+				double aval = (a != null) ? a[ix] : 0;
+				if( aval != 0 || !sparseSafe)
+					kplus.execute2(kbuff, genexec(aval, b, scalars, m, n, i, j));
+			}
+		return kbuff._sum;
+	}
+	
+	private double executeDenseAggMxx(double[] a, SideInput[] b, double[] scalars, int m, int n, boolean sparseSafe, int rl, int ru) 
+		throws DMLRuntimeException 
+	{
+		//safe aggregation for min/max w/ handling of zero entries
+		//note: sparse safe with zero value as min/max handled outside
+		double ret = (_aggOp==AggOp.MIN) ? Double.MAX_VALUE : -Double.MAX_VALUE; 
+		ValueFunction vfun = getAggFunction();
+		
+		for( int i=rl, ix=rl*n; i<ru; i++ ) 
+			for( int j=0; j<n; j++, ix++ ) {
+				double aval = (a != null) ? a[ix] : 0;
+				if( aval != 0 || !sparseSafe)
+					ret = vfun.execute(ret, genexec(aval, b, scalars, m, n, i, j));
+			}
+		return ret;
+	}
+	
 	private long executeSparseNoAggSparse(SparseBlock sblock, SideInput[] b, double[] scalars, MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru) 
 		throws DMLRuntimeException 
 	{
@@ -513,83 +649,72 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl
 		return ret;
 	}
 	
-	private long executeDenseNoAgg(double[] a, SideInput[] b, double[] scalars, double[] c, int m, int n, boolean sparseSafe, int rl, int ru) 
+	private long executeCompressedNoAgg(CompressedMatrixBlock a, SideInput[] b, double[] scalars, double[] c, int m, int n, boolean sparseSafe, int rl, int ru) 
 		throws DMLRuntimeException 
 	{
 		long lnnz = 0;
-		for( int i=rl, ix=rl*n; i<ru; i++ ) 
-			for( int j=0; j<n; j++, ix++ ) {
-				double aval = (a != null) ? a[ix] : 0;
-				if( aval != 0 || !sparseSafe) {
-					c[ix] = genexec( aval, b, scalars, m, n, i, j); 
-					lnnz += (c[ix]!=0) ? 1 : 0;
-				}
-			}
+		Iterator<IJV> iter = a.getIterator(rl, ru, !sparseSafe);
+		while( iter.hasNext() ) {
+			IJV cell = iter.next();
+			double val = genexec(cell.getV(), b, scalars, m, n, cell.getI(), cell.getJ());
+			c[cell.getI()*n+cell.getJ()] = val; 
+			lnnz += (val!=0) ? 1 : 0;
+		}
 		return lnnz;
 	}
 	
-	private long executeDenseRowAggSum(double[] a, SideInput[] b, double[] scalars, double[] c, int m, int n, boolean sparseSafe, int rl, int ru) 
+	private long executeCompressedRowAggSum(CompressedMatrixBlock a, SideInput[] b, double[] scalars, double[] c, int m, int n, boolean sparseSafe, int rl, int ru) 
 		throws DMLRuntimeException 
 	{
 		KahanFunction kplus = (KahanFunction) getAggFunction();
 		KahanObject kbuff = new KahanObject(0, 0);
 		long lnnz = 0;
-		for( int i=rl, ix=rl*n; i<ru; i++ ) {
-			kbuff.set(0, 0);
-			for( int j=0; j<n; j++, ix++ ) {
-				double aval = (a != null) ? a[ix] : 0;
-				if( aval != 0 || !sparseSafe)
-					kplus.execute2(kbuff, genexec(aval, b, scalars, m, n, i, j));
-			}
-			lnnz += ((c[i] = kbuff._sum)!=0) ? 1 : 0;
+		Iterator<IJV> iter = a.getIterator(rl, ru, !sparseSafe);
+		while( iter.hasNext() ) {
+			IJV cell = iter.next();
+			double val = genexec(cell.getV(), b, scalars, m, n, cell.getI(), cell.getJ());
+			kbuff.set(c[cell.getI()], 0);
+			kplus.execute2(kbuff, val);
+			c[cell.getI()] = kbuff._sum;
 		}
+		for( int i=rl; i<ru; i++ )
+			lnnz += (c[i]!=0) ? 1 : 0;
 		return lnnz;
 	}
 	
-	private long executeDenseRowAggMxx(double[] a, SideInput[] b, double[] scalars, double[] c, int m, int n, boolean sparseSafe, int rl, int ru) 
+	private long executeCompressedRowAggMxx(CompressedMatrixBlock a, SideInput[] b, double[] scalars, double[] c, int m, int n, boolean sparseSafe, int rl, int ru) 
 		throws DMLRuntimeException 
 	{
-		double initialVal = (_aggOp==AggOp.MIN) ? Double.MAX_VALUE : -Double.MAX_VALUE;
+		Arrays.fill(c, rl, ru, (_aggOp==AggOp.MIN) ? Double.MAX_VALUE : -Double.MAX_VALUE);
 		ValueFunction vfun = getAggFunction();
 		long lnnz = 0;
-		if( a == null && !sparseSafe ) { //empty
-			for( int i=rl; i<ru; i++ ) { 
-				double tmp = initialVal;
-				for( int j=0; j<n; j++ )
-					tmp = vfun.execute(tmp, genexec( 0, b, scalars, m, n, i, j ));
-				lnnz += ((c[i] = tmp)!=0) ? 1 : 0;
-			}
-		}
-		else if( a != null ) { //general case
-			for( int i=rl, ix=rl*n; i<ru; i++ ) {
-				double tmp = initialVal;
-				for( int j=0; j<n; j++, ix++ )
-					if( a[ix] != 0 || !sparseSafe)
-						tmp = vfun.execute(tmp, genexec( a[ix], b, scalars, m, n, i, j ));
-				if( sparseSafe && UtilFunctions.containsZero(a, ix-n, n) )
-					tmp = vfun.execute(tmp, 0);
-				lnnz += ((c[i] = tmp)!=0) ? 1 : 0;
-			}
+		Iterator<IJV> iter = a.getIterator(rl, ru, !sparseSafe);
+		while( iter.hasNext() ) {
+			IJV cell = iter.next();
+			double val = genexec(cell.getV(), b, scalars, m, n, cell.getI(), cell.getJ());
+			c[cell.getI()] = vfun.execute(c[cell.getI()], val);
 		}
+		for( int i=rl; i<ru; i++ )
+			lnnz += (c[i]!=0) ? 1 : 0;
 		return lnnz;
 	}
 	
-	private double executeDenseAggSum(double[] a, SideInput[] b, double[] scalars, int m, int n, boolean sparseSafe, int rl, int ru) 
+	private double executeCompressedAggSum(CompressedMatrixBlock a, SideInput[] b, double[] scalars, int m, int n, boolean sparseSafe, int rl, int ru) 
 		throws DMLRuntimeException 
 	{
 		KahanFunction kplus = (KahanFunction) getAggFunction();
 		KahanObject kbuff = new KahanObject(0, 0);
 		
-		for( int i=rl, ix=rl*n; i<ru; i++ ) 
-			for( int j=0; j<n; j++, ix++ ) {
-				double aval = (a != null) ? a[ix] : 0;
-				if( aval != 0 || !sparseSafe)
-					kplus.execute2(kbuff, genexec(aval, b, scalars, m, n, i, j));
-			}
+		Iterator<IJV> iter = a.getIterator(rl, ru, !sparseSafe);
+		while( iter.hasNext() ) {
+			IJV cell = iter.next();
+			double val = genexec(cell.getV(), b, scalars, m, n, cell.getI(), cell.getJ()); 
+			kplus.execute2(kbuff, val);
+		}
 		return kbuff._sum;
 	}
 	
-	private double executeDenseAggMxx(double[] a, SideInput[] b, double[] scalars, int m, int n, boolean sparseSafe, int rl, int ru) 
+	private double executeCompressedAggMxx(CompressedMatrixBlock a, SideInput[] b, double[] scalars, int m, int n, boolean sparseSafe, int rl, int ru) 
 		throws DMLRuntimeException 
 	{
 		//safe aggregation for min/max w/ handling of zero entries
@@ -597,16 +722,15 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl
 		double ret = (_aggOp==AggOp.MIN) ? Double.MAX_VALUE : -Double.MAX_VALUE; 
 		ValueFunction vfun = getAggFunction();
 		
-		for( int i=rl, ix=rl*n; i<ru; i++ ) 
-			for( int j=0; j<n; j++, ix++ ) {
-				double aval = (a != null) ? a[ix] : 0;
-				if( aval != 0 || !sparseSafe)
-					ret = vfun.execute(ret, genexec(aval, b, scalars, m, n, i, j));
-			}
+		Iterator<IJV> iter = a.getIterator(rl, ru, !sparseSafe);
+		while( iter.hasNext() ) {
+			IJV cell = iter.next();
+			double val = genexec(cell.getV(), b, scalars, m, n, cell.getI(), cell.getJ());
+			ret = vfun.execute(ret, val);
+		}
 		return ret;
 	}
 	
-	
 	protected abstract double genexec( double a, SideInput[] b, double[] scalars, int m, int n, int rowIndex, int colIndex);
 	
 	private class ParAggTask implements Callable<Double> 
@@ -634,9 +758,12 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl
 		
 		@Override
 		public Double call() throws DMLRuntimeException {
-			return ( !_a.isInSparseFormat()) ?
-				executeDenseAndAgg(_a.getDenseBlock(), _b, _scalars, _rlen, _clen, _safe, _rl, _ru) :
-				executeSparseAndAgg(_a.getSparseBlock(), _b, _scalars, _rlen, _clen, _safe, _rl, _ru);
+			if( _a instanceof CompressedMatrixBlock )
+				return executeCompressedAndAgg((CompressedMatrixBlock)_a, _b, _scalars, _rlen, _clen, _safe, _rl, _ru);
+			else if (!_a.isInSparseFormat())
+				return executeDenseAndAgg(_a.getDenseBlock(), _b, _scalars, _rlen, _clen, _safe, _rl, _ru);
+			else
+				return executeSparseAndAgg(_a.getSparseBlock(), _b, _scalars, _rlen, _clen, _safe, _rl, _ru);
 		}
 	}
 
@@ -667,9 +794,12 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl
 		
 		@Override
 		public Long call() throws DMLRuntimeException {
-			return (!_a.isInSparseFormat()) ?
-					executeDense(_a.getDenseBlock(), _b, _scalars, _c, _rlen, _clen, _safe, _rl, _ru) :
-					executeSparse(_a.getSparseBlock(), _b, _scalars,  _c, _rlen, _clen, _safe, _rl, _ru);
+			if( _a instanceof CompressedMatrixBlock )
+				return executeCompressed((CompressedMatrixBlock)_a, _b, _scalars, _c, _rlen, _clen, _safe, _rl, _ru);
+			else if( !_a.isInSparseFormat() )
+				return executeDense(_a.getDenseBlock(), _b, _scalars, _c, _rlen, _clen, _safe, _rl, _ru);
+			else
+				return executeSparse(_a.getSparseBlock(), _b, _scalars,  _c, _rlen, _clen, _safe, _rl, _ru);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/5174fbc0/src/main/java/org/apache/sysml/runtime/codegen/SpoofOperator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/codegen/SpoofOperator.java b/src/main/java/org/apache/sysml/runtime/codegen/SpoofOperator.java
index 9499319..1f3920f 100644
--- a/src/main/java/org/apache/sysml/runtime/codegen/SpoofOperator.java
+++ b/src/main/java/org/apache/sysml/runtime/codegen/SpoofOperator.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.compress.CompressedMatrixBlock;
 import org.apache.sysml.runtime.instructions.cp.ScalarObject;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.util.DataConverter;
@@ -57,17 +58,26 @@ public abstract class SpoofOperator implements Serializable
 		return execute(inputs, scalars);
 	}
 	
-	protected double[][] prepInputMatricesDense(ArrayList<MatrixBlock> inputs) {
+	protected double[][] prepInputMatricesDense(ArrayList<MatrixBlock> inputs) 
+		throws DMLRuntimeException 
+	{
 		return prepInputMatricesDense(inputs, 1, inputs.size()-1);
 	}
 	
-	protected double[][] prepInputMatricesDense(ArrayList<MatrixBlock> inputs, int offset) {
+	protected double[][] prepInputMatricesDense(ArrayList<MatrixBlock> inputs, int offset) 
+		throws DMLRuntimeException 
+	{
 		return prepInputMatricesDense(inputs, offset, inputs.size()-offset);
 	}
 	
-	protected double[][] prepInputMatricesDense(ArrayList<MatrixBlock> inputs, int offset, int len) {
+	protected double[][] prepInputMatricesDense(ArrayList<MatrixBlock> inputs, int offset, int len) 
+		throws DMLRuntimeException 
+	{
 		double[][] b = new double[len][]; 
 		for(int i=offset; i<offset+len; i++) {
+			if( inputs.get(i) instanceof CompressedMatrixBlock ) 
+				inputs.set(i, ((CompressedMatrixBlock)inputs.get(i)).decompress());
+			
 			//convert empty or sparse to dense temporary block (note: we don't do
 			//this in place because this block might be used by multiple threads)
 			if( inputs.get(i).isInSparseFormat() && inputs.get(i).isAllocated() ) {
@@ -85,17 +95,26 @@ public abstract class SpoofOperator implements Serializable
 		return b;
 	}
 	
-	protected SideInput[] prepInputMatricesAbstract(ArrayList<MatrixBlock> inputs) {
+	protected SideInput[] prepInputMatricesAbstract(ArrayList<MatrixBlock> inputs) 
+		throws DMLRuntimeException 
+	{
 		return prepInputMatricesAbstract(inputs, 1, inputs.size()-1);
 	}
 	
-	protected SideInput[] prepInputMatricesAbstract(ArrayList<MatrixBlock> inputs, int offset) {
+	protected SideInput[] prepInputMatricesAbstract(ArrayList<MatrixBlock> inputs, int offset) 
+		throws DMLRuntimeException 
+	{
 		return prepInputMatricesAbstract(inputs, offset, inputs.size()-offset);
 	}
 	
-	protected SideInput[] prepInputMatricesAbstract(ArrayList<MatrixBlock> inputs, int offset, int len) {
+	protected SideInput[] prepInputMatricesAbstract(ArrayList<MatrixBlock> inputs, int offset, int len) 
+		throws DMLRuntimeException 
+	{
 		SideInput[] b = new SideInput[len]; 
 		for(int i=offset; i<offset+len; i++) {
+			if( inputs.get(i) instanceof CompressedMatrixBlock ) 
+				inputs.set(i, ((CompressedMatrixBlock)inputs.get(i)).decompress());
+			
 			if( inputs.get(i).isInSparseFormat() && inputs.get(i).isAllocated() )
 				b[i-offset] = new SideInput(null, inputs.get(i));
 			else

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/5174fbc0/src/main/java/org/apache/sysml/runtime/compress/BitmapDecoderOLE.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/BitmapDecoderOLE.java b/src/main/java/org/apache/sysml/runtime/compress/BitmapDecoderOLE.java
deleted file mode 100644
index 3dc8462..0000000
--- a/src/main/java/org/apache/sysml/runtime/compress/BitmapDecoderOLE.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.compress;
-
-import java.util.Arrays;
-import java.util.Iterator;
-
-/**
- * General-purpose iterator to decode a compressed OLE bitmap.
- *  
- */
-public final class BitmapDecoderOLE implements Iterator<Integer> 
-{
-	// pointer to the compressed bitmap
-	private int _bmOff;
-	private int _bmLen;
-	private char[] _bmPtr;
-
-	// The index of the current block. Block 0 covers bits 1 through 2^16
-	private int _blockIx;
-
-	// The offset where the current block starts within the bitmap.
-	private int _blockStartOffset;
-
-	// The number of offsets in the current block.
-	private int _curBlockSize;
-
-	// The offset <b>in the current block</b> the <b>next</b> element we will
-	// read from the bitmap, or bmPtr.length if we are done.
-	private int _nextBmOffset;
-
-	/**
-	 * Point this object at the beginning of a particular bitmap. After a call
-	 * to this method, the next call to {@link #next()} will return the
-	 * offset of the first bit in the specified bitmap.
-	 * 
-	 * @param bmPtr
-	 *            pointer to a compressed bitmap
-	 * @param off offset
-	 * @param len length
-	 */
-	public BitmapDecoderOLE(char[] bmPtr, int off, int len) {
-		_bmOff = off;
-		_bmLen = len;
-		_bmPtr = bmPtr;
-		_blockIx = 0;
-		_blockStartOffset = 0;
-		_curBlockSize = _bmPtr[_bmOff+_blockStartOffset];
-		if (_curBlockSize < 0) {
-			throw new RuntimeException(String.format(
-					"Negative block size %d at position %d of %s",
-					_curBlockSize, _blockStartOffset, Arrays.toString(bmPtr)));
-		}
-		_nextBmOffset = 0;
-
-		// Advance past any zero-length blocks at the beginning of the array
-		while (_blockStartOffset < _bmLen
-				&& _nextBmOffset >= _curBlockSize) {
-			advanceToNextBlock();
-		}
-	}
-
-	@Override
-	public Integer next() {
-		if( !hasNext() )
-			throw new RuntimeException("No next offset existing.");
-		
-		// Grab the lookahead value Note the +1 in the array indexing; 
-		// the first number in a block is the block size
-		int offsetFromBlockBegin = _bmPtr[_bmOff+_blockStartOffset + 1 + _nextBmOffset];
-		int ret = (_blockIx * BitmapEncoder.BITMAP_BLOCK_SZ)
-				+ offsetFromBlockBegin;
-		_nextBmOffset++;
-
-		// Advance to next non-empty block if we reached the end of the block.
-		while (_blockStartOffset < _bmLen && _nextBmOffset >= _curBlockSize) {
-			advanceToNextBlock();
-		}
-
-		return ret;
-	}
-
-	@Override
-	public boolean hasNext() {
-		return _blockStartOffset < _bmLen;
-	}
-
-	@Override
-	public void remove() {
-		throw new RuntimeException("Not implemented for BitmapDecoderOLE.");
-	}
-
-	/**
-	 * Move forward to the next block. Does not skip empty blocks.
-	 */
-	private void advanceToNextBlock() {
-		_blockStartOffset += (1 + _curBlockSize);
-		_blockIx++;
-		if (_blockStartOffset >= _bmLen) {
-			// Read past last block
-			return;
-		}
-
-		_curBlockSize = _bmPtr[_bmOff+_blockStartOffset];
-		if (_curBlockSize < 0) {
-			throw new RuntimeException(String.format(
-					"Negative block size %d at position %d of %s",
-					_curBlockSize, _blockStartOffset, Arrays.toString(_bmPtr)));
-		}
-		_nextBmOffset = 0;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/5174fbc0/src/main/java/org/apache/sysml/runtime/compress/BitmapDecoderRLE.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/BitmapDecoderRLE.java b/src/main/java/org/apache/sysml/runtime/compress/BitmapDecoderRLE.java
deleted file mode 100644
index d599c9e..0000000
--- a/src/main/java/org/apache/sysml/runtime/compress/BitmapDecoderRLE.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.compress;
-
-import java.util.Iterator;
-
-/**
- * General-purpose iterator to decode a compressed OLE bitmap.
- * 
- */
-public final class BitmapDecoderRLE implements Iterator<Integer>
-{
-	// pointer to the compressed bitmap
-	private int _bmOff;
-	private int _bmLen;
-	private char[] _bmPtr;
-
-	// The offset of the <b>next</b> element we will read from the bitmap, or
-	// bmPtr.length if we are done.
-	private int _nextBmOffset;
-
-	// The offset in the matrix column of the beginning of the current run
-	private int _runStartOffset;
-
-	// The length of the current run
-	private int _curRunLen;
-
-	// The number of bits that we have returned from the current run.
-	private int _runBitsReturned;
-
-	/**
-	 * Point this object at the beginning of a particular bitmap. After a call
-	 * to this method, the next call to {@link #next()} will return the
-	 * offset of the first bit in the specified bitmap.
-	 * 
-	 * @param bmPtr
-	 *            pointer to a compressed bitmap
-	 * @param off offset
-	 * @param len length
-	 */
-	public BitmapDecoderRLE(char[] bmPtr, int off, int len) {
-		_bmOff = off;
-		_bmLen = len;
-		_bmPtr = bmPtr;
-		_nextBmOffset = 0;
-		_runStartOffset = 0;
-		_curRunLen = 0;
-		_runBitsReturned = 0;
-
-		if (0 == _bmLen) {
-			return; //no runs
-		}
-
-		// Advance to the beginning of the first non-empty run.
-		advanceToNextRun();
-	}
-
-	@Override
-	public Integer next() {
-		if( !hasNext() )
-			throw new RuntimeException("No next offset existing.");
-		
-		// Grab the lookahead value
-		int ret = _runStartOffset + _runBitsReturned;
-
-		_runBitsReturned++;
-
-		// Check for end of run
-		if (_runBitsReturned == _curRunLen) {
-			advanceToNextRun();
-		}
-
-		return ret;
-	}
-
-	@Override
-	public boolean hasNext() {
-		return _runBitsReturned < _curRunLen;
-	}
-	
-	@Override
-	public void remove() {
-		throw new RuntimeException("Not implemented for BitmapDecoderRLE.");
-	}
-	
-	/** Move forward to the next non-empty run. */
-	private void advanceToNextRun() {
-		// While loop needed because some runs are of length 0
-		while (_runBitsReturned == _curRunLen && _nextBmOffset < _bmLen) {
-
-			_runBitsReturned = 0;
-
-			// Read the distance to the next run
-			char delta = _bmPtr[_bmOff + _nextBmOffset];
-
-			// Run length is stored in the next element of the array
-			_runStartOffset += delta + _curRunLen;
-			_curRunLen = _bmPtr[_bmOff + _nextBmOffset + 1];
-			_nextBmOffset += 2;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/5174fbc0/src/main/java/org/apache/sysml/runtime/compress/ColGroup.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroup.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroup.java
index bf1b822..2f9d1de 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroup.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroup.java
@@ -23,9 +23,11 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.matrix.data.IJV;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
 import org.apache.sysml.runtime.matrix.operators.ScalarOperator;
@@ -259,6 +261,8 @@ public abstract class ColGroup implements Serializable
 	public abstract void unaryAggregateOperations(AggregateUnaryOperator op, MatrixBlock result)
 		throws DMLRuntimeException;
 	
+	public abstract Iterator<IJV> getIterator(int rl, int ru, boolean inclZeros);
+	
 	/**
 	 * Count the number of non-zeros per row
 	 * 

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/5174fbc0/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC.java
index 9a6067b..9a4a982 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC.java
@@ -20,6 +20,7 @@
 package org.apache.sysml.runtime.compress;
 
 import java.util.Arrays;
+import java.util.Iterator;
 
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.functionobjects.Builtin;
@@ -31,6 +32,7 @@ import org.apache.sysml.runtime.functionobjects.ReduceCol;
 import org.apache.sysml.runtime.functionobjects.ReduceRow;
 import org.apache.sysml.runtime.functionobjects.Builtin.BuiltinCode;
 import org.apache.sysml.runtime.instructions.cp.KahanObject;
+import org.apache.sysml.runtime.matrix.data.IJV;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
 
@@ -235,4 +237,54 @@ public abstract class ColGroupDDC extends ColGroupValue
 	public long estimateInMemorySize() {
 		return super.estimateInMemorySize();
 	}
+	
+	@Override
+	public Iterator<IJV> getIterator(int rl, int ru, boolean inclZeros) {
+		return new DDCIterator(rl, ru, inclZeros);
+	}
+	
+	private class DDCIterator implements Iterator<IJV>
+	{
+		//iterator configuration 
+		private final int _ru;
+		private final boolean _inclZeros;
+		
+		//iterator state
+		private final IJV _buff = new IJV(); 
+		private int _rpos = -1;
+		private int _cpos = -1;
+		private double _value = 0;
+		
+		public DDCIterator(int rl, int ru, boolean inclZeros) {
+			_ru = ru;
+			_inclZeros = inclZeros;
+			_rpos = rl;
+			_cpos = -1;
+			getNextValue();
+		}
+
+		@Override
+		public boolean hasNext() {
+			return (_rpos < _ru);
+		}
+
+		@Override
+		public IJV next() {
+			_buff.set(_rpos, _colIndexes[_cpos], _value);
+			getNextValue();
+			return _buff;
+		}
+		
+		private void getNextValue() {
+			do {
+				boolean nextRow = (_cpos+1 >= getNumCols());
+				_rpos += nextRow ? 1 : 0; 
+				_cpos = nextRow ? 0 : _cpos+1;
+				if( _rpos >= _ru )
+					return; //reached end
+				_value = getData(_rpos, _cpos);
+			}
+			while( !_inclZeros && _value==0);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/5174fbc0/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java
index 70f759e..f100495 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java
@@ -112,11 +112,6 @@ public class ColGroupOLE extends ColGroupOffset
 	public CompressionType getCompType() {
 		return CompressionType.OLE_BITMAP;
 	}
-
-	@Override
-	public Iterator<Integer> getDecodeIterator(int k) {
-		return new BitmapDecoderOLE(_data, _ptr[k], len(k));
-	}
 	
 	@Override
 	public void decompressToBlock(MatrixBlock target, int rl, int ru) 
@@ -643,10 +638,9 @@ public class ColGroupOLE extends ColGroupOffset
 	 * Utility function of sparse-unsafe operations.
 	 * 
 	 * @return zero indicator vector
-	 * @throws DMLRuntimeException if DMLRuntimeException occurs
 	 */
-	private boolean[] computeZeroIndicatorVector()
-		throws DMLRuntimeException 
+	@Override
+	protected boolean[] computeZeroIndicatorVector()
 	{
 		boolean[] ret = new boolean[_numRows];
 		final int blksz = BitmapEncoder.BITMAP_BLOCK_SZ;
@@ -762,4 +756,70 @@ public class ColGroupOLE extends ColGroupOffset
 		
 		return 0;
 	}
+
+	@Override
+	public Iterator<Integer> getIterator(int k) {
+		return new OLEValueIterator(k, 0, getNumRows());
+	}
+	
+	@Override
+	public Iterator<Integer> getIterator(int k, int rl, int ru) {
+		return new OLEValueIterator(k, rl, ru);
+	}
+
+	private class OLEValueIterator implements Iterator<Integer>
+	{
+		private final int _ru;
+		private final int _boff;
+		private final int _blen;
+		private int _bix;
+		private int _start;
+		private int _slen;
+		private int _spos;
+		private int _rpos;
+		
+		public OLEValueIterator(int k, int rl, int ru) {
+			_ru = ru;
+			_boff = _ptr[k];
+			_blen = len(k);
+			_bix = 0;
+			_start = 0; //init first segment
+			_slen = _data[_boff + _bix];
+			_spos = 0;
+			_rpos = _data[_boff + _bix + 1];
+			while( _rpos < rl )
+				nextRowOffset();
+		}
+
+		@Override
+		public boolean hasNext() {
+			return (_rpos < _ru);
+		}
+
+		@Override
+		public Integer next() {
+			int ret = _rpos;
+			nextRowOffset();
+			return ret;
+		}
+		
+		private void nextRowOffset() {
+			if( _spos+1 < _slen ) {
+				_spos++;
+				_rpos = _start + _data[_boff + _bix + _spos + 1];
+			}
+			else {
+				_start += BitmapEncoder.BITMAP_BLOCK_SZ;
+				_bix += _slen+1;
+				if( _bix < _blen ) {
+					_slen = _data[_boff + _bix];
+					_spos = 0;
+					_rpos = _start + _data[_boff + _bix + 1];
+				}
+				else {
+					_rpos = _ru;
+				}
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/5174fbc0/src/main/java/org/apache/sysml/runtime/compress/ColGroupOffset.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupOffset.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupOffset.java
index 41d7184..184c69a 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupOffset.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupOffset.java
@@ -35,6 +35,7 @@ import org.apache.sysml.runtime.functionobjects.ReduceAll;
 import org.apache.sysml.runtime.functionobjects.ReduceCol;
 import org.apache.sysml.runtime.functionobjects.ReduceRow;
 import org.apache.sysml.runtime.functionobjects.Builtin.BuiltinCode;
+import org.apache.sysml.runtime.matrix.data.IJV;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
 
@@ -144,7 +145,7 @@ public abstract class ColGroupOffset extends ColGroupValue
 		
 		// Run through the bitmaps for this column group
 		for (int i = 0; i < numVals; i++) {
-			Iterator<Integer> decoder = getDecodeIterator(i);
+			Iterator<Integer> decoder = getIterator(i);
 			int valOff = i*numCols;
 
 			while (decoder.hasNext()) {
@@ -167,7 +168,7 @@ public abstract class ColGroupOffset extends ColGroupValue
 		
 		// Run through the bitmaps for this column group
 		for (int i = 0; i < numVals; i++) {
-			Iterator<Integer> decoder = getDecodeIterator(i);
+			Iterator<Integer> decoder = getIterator(i);
 			int valOff = i*numCols;
 
 			while (decoder.hasNext()) {
@@ -190,7 +191,7 @@ public abstract class ColGroupOffset extends ColGroupValue
 		
 		// Run through the bitmaps for this column group
 		for (int i = 0; i < numVals; i++) {
-			Iterator<Integer> decoder = getDecodeIterator(i);
+			Iterator<Integer> decoder = getIterator(i);
 			int valOff = i*numCols;
 
 			while (decoder.hasNext()) {
@@ -213,7 +214,7 @@ public abstract class ColGroupOffset extends ColGroupValue
 		final int numCols = getNumCols();
 		final int numVals = getNumValues();
 		for (int i = 0; i < numVals; i++) {
-			Iterator<Integer> decoder = getDecodeIterator(i);
+			Iterator<Integer> decoder = getIterator(i);
 			int valOff = i*numCols;
 			while (decoder.hasNext()) {
 				int row = decoder.next();
@@ -260,18 +261,6 @@ public abstract class ColGroupOffset extends ColGroupValue
 	public boolean hasZeros() {
 		return _zeros;
 	}
-	
-	/**
-	 * @param k
-	 *            index of a specific compressed bitmap (stored in subclass,
-	 *            index same as {@link #getValues})
-	 * @return an object for iterating over the row offsets in this bitmap. Only
-	 *         valid until the next call to this method. May be reused across
-	 *         calls.
-	 */
-	public abstract Iterator<Integer> getDecodeIterator(int k);
-
-	//TODO getDecodeIterator(int k, int rl, int ru)
 
 	/**
 	 * Utility function of sparse-unsafe operations.
@@ -420,5 +409,127 @@ public abstract class ColGroupOffset extends ColGroupValue
 	protected abstract void computeColSums(MatrixBlock result, KahanFunction kplus);
 	
 	protected abstract void computeRowMxx(MatrixBlock result, Builtin builtin, int rl, int ru);
+
+	protected abstract boolean[] computeZeroIndicatorVector();
+	
+	@Override
+	public Iterator<IJV> getIterator(int rl, int ru, boolean inclZeros) {
+		return new OffsetIterator(rl, ru, inclZeros);
+	}
+	
+	/**
+	 * @param k index of value tuple with associated bitmap
+	 * @return an iterator over the row offsets in this bitmap
+	 */
+	public abstract Iterator<Integer> getIterator(int k);
+
+	/**
+	 * 
+	 * @param k index of value tuple with associated bitmap
+	 * @param rl row lower index, inclusive
+	 * @param ru row upper index, exclusive
+	 * @return an iterator over the row offsets in this bitmap
+	 */
+	public abstract Iterator<Integer> getIterator(int k, int rl, int ru);
+
+	
+	protected class OffsetIterator implements Iterator<IJV>
+	{
+		//iterator configuration
+		private final int _rl;
+		private final int _ru;
+		private final boolean _inclZeros;
+		
+		//iterator state
+		private final IJV _buff = new IJV();
+		private Iterator<Integer> _viter = null;
+		private int _vpos = -1;
+		private int _rpos = -1;
+		private int _cpos = -1;
+		
+		public OffsetIterator(int rl, int ru, boolean inclZeros) {
+			_rl = rl;
+			_ru = ru;
+			_inclZeros = inclZeros;
+			_vpos = 0;
+			_rpos = -1;
+			_cpos = 0;
+			getNextValue();
+		}
+
+		@Override
+		public boolean hasNext() {
+			return (_rpos < _ru);
+		}
+
+		@Override
+		public IJV next() {
+			_buff.set(_rpos, _colIndexes[_cpos], (_vpos >= getNumValues()) ? 
+				0 : _values[_vpos*getNumCols()+_cpos]);
+			getNextValue();
+			return _buff;
+		}
+		
+		private void getNextValue() {
+			//advance to next value iterator if required
+			if( _viter == null ) {
+				_viter = (getNumValues()>0) ? //first iterator
+					getIterator(_vpos, _rl, _ru) : new ZeroIterator(_rl, _ru);
+			}
+			else if( _viter instanceof ZeroIterator && !_viter.hasNext() ) {
+				_rpos = _ru; //end after zero iterator
+				return;
+			}
+			else if( _cpos+1 >= getNumCols() && !_viter.hasNext() ) {
+				_vpos++; //advance to next value tuple
+				if( _vpos < getNumValues() )
+					_viter = getIterator(_vpos, _rl, _ru);
+				else if( _inclZeros && _zeros)
+					_viter = new ZeroIterator(_rl, _ru);
+				else
+					_rpos = _ru; //end w/o zero iterator
+				_rpos = -1;
+			}
+			
+			//get next value
+			if( _rpos < 0 || _cpos+1 >= getNumCols()) {
+				_rpos = _viter.next();
+				_cpos = 0;
+			}
+			else {
+				_cpos++;
+			}
+		}
+	}
 	
+	protected class ZeroIterator implements Iterator<Integer>
+	{
+		private final boolean[] _zeros;
+		private final int _ru;
+		private int _rpos; 
+		
+		public ZeroIterator(int rl, int ru) {
+			_zeros = computeZeroIndicatorVector();
+			_ru = ru;
+			_rpos = rl-1;
+			getNextValue();
+		}
+
+		@Override
+		public boolean hasNext() {
+			return (_rpos < _ru);
+		}
+
+		@Override
+		public Integer next() {
+			int ret = _rpos;
+			getNextValue();
+			return ret;
+		}
+		
+		private void getNextValue() {
+			do { _rpos++; }
+			while( _rpos < _ru && !_zeros[_rpos] );
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/5174fbc0/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java
index 6d1fb9f..7f0dcf9 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java
@@ -91,11 +91,6 @@ public class ColGroupRLE extends ColGroupOffset
 	public CompressionType getCompType() {
 		return CompressionType.RLE_BITMAP;
 	}
-
-	@Override
-	public Iterator<Integer> getDecodeIterator(int k) {
-		return new BitmapDecoderRLE(_data, _ptr[k], len(k)); 
-	}
 	
 	@Override
 	public void decompressToBlock(MatrixBlock target, int rl, int ru) 
@@ -656,8 +651,8 @@ public class ColGroupRLE extends ColGroupOffset
 		}
 	}
 	
+	@Override
 	public boolean[] computeZeroIndicatorVector()
-		throws DMLRuntimeException 
 	{	
 		boolean[] ret = new boolean[_numRows];
 		final int numVals = getNumValues();
@@ -769,4 +764,66 @@ public class ColGroupRLE extends ColGroupOffset
 		
 		return new Pair<Integer,Integer>(apos, astart);
 	}
+	
+	@Override
+	public Iterator<Integer> getIterator(int k) {
+		return new RLEValueIterator(k, 0, getNumRows());
+	}
+	
+	@Override
+	public Iterator<Integer> getIterator(int k, int rl, int ru) {
+		return new RLEValueIterator(k, rl, ru);
+	}
+
+	private class RLEValueIterator implements Iterator<Integer>
+	{
+		private final int _ru;
+		private final int _boff;
+		private final int _blen;
+		private int _bix;
+		private int _start;
+		private int _rpos;
+		
+		public RLEValueIterator(int k, int rl, int ru) {
+			_ru = ru;
+			_boff = _ptr[k];
+			_blen = len(k);
+			_bix = 0; 
+			_start = 0; //init first run
+			_rpos = _data[_boff+_bix]; 
+			while( _rpos < rl )
+				nextRowOffset();
+		}
+
+		@Override
+		public boolean hasNext() {
+			return (_rpos < _ru);
+		}
+
+		@Override
+		public Integer next() {
+			int ret = _rpos;
+			nextRowOffset();
+			return ret;
+		}
+		
+		private void nextRowOffset() {
+			if( !hasNext() )
+			  return;
+			//get current run information
+			int lstart = _data[_boff + _bix]; //start
+			int llen = _data[_boff + _bix + 1]; //len
+			//advance to next run if necessary
+			if( _rpos - _start - lstart + 1 >= llen ) {
+				_start += lstart + llen;
+				_bix +=2;
+				_rpos = (_bix>=_blen) ? _ru : 
+					_start + _data[_boff + _bix];
+			}
+			//increment row index within run
+			else {
+				_rpos++;
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/5174fbc0/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java
index 6445c52..526df16 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java
@@ -25,10 +25,12 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.functionobjects.ReduceRow;
+import org.apache.sysml.runtime.matrix.data.IJV;
 import org.apache.sysml.runtime.matrix.data.LibMatrixAgg;
 import org.apache.sysml.runtime.matrix.data.LibMatrixMult;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
@@ -407,4 +409,54 @@ public class ColGroupUncompressed extends ColGroup
 		for( int i=rl; i<ru; i++ )
 			rnnz[i-rl] += _data.recomputeNonZeros(i, i, 0, _data.getNumColumns()-1);
 	}
+	
+	@Override
+	public Iterator<IJV> getIterator(int rl, int ru, boolean inclZeros) {
+		return new UCIterator(rl, ru, inclZeros);
+	}
+	
+	private class UCIterator implements Iterator<IJV>
+	{
+		//iterator configuration 
+		private final int _ru;
+		private final boolean _inclZeros;
+		
+		//iterator state
+		private final IJV _buff = new IJV(); 
+		private int _rpos = -1;
+		private int _cpos = -1;
+		private double _value = 0;
+		
+		public UCIterator(int rl, int ru, boolean inclZeros) {
+			_ru = ru;
+			_inclZeros = inclZeros;
+			_rpos = rl;
+			_cpos = -1;
+			getNextValue();
+		}
+
+		@Override
+		public boolean hasNext() {
+			return (_rpos < _ru);
+		}
+
+		@Override
+		public IJV next() {
+			_buff.set(_rpos, _colIndexes[_cpos], _value);
+			getNextValue();
+			return _buff;
+		}
+		
+		private void getNextValue() {
+			do {
+				boolean nextRow = (_cpos+1 >= getNumCols());
+				_rpos += nextRow ? 1 : 0; 
+				_cpos = nextRow ? 0 : _cpos+1;
+				if( _rpos >= _ru )
+					return; //reached end
+				_value = _data.quickGetValue(_rpos, _cpos);
+			}
+			while( !_inclZeros && _value==0);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/5174fbc0/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
index ed2ab27..d3bdcca 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
@@ -29,6 +29,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.PriorityQueue;
 import java.util.concurrent.Callable;
@@ -68,6 +69,7 @@ import org.apache.sysml.runtime.instructions.cp.CM_COV_Object;
 import org.apache.sysml.runtime.instructions.cp.KahanObject;
 import org.apache.sysml.runtime.instructions.cp.ScalarObject;
 import org.apache.sysml.runtime.matrix.data.CTableMap;
+import org.apache.sysml.runtime.matrix.data.IJV;
 import org.apache.sysml.runtime.matrix.data.LibMatrixBincell;
 import org.apache.sysml.runtime.matrix.data.LibMatrixReorg;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
@@ -799,6 +801,9 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		write(os);	
 	}
 	
+	public Iterator<IJV> getIterator(int rl, int ru, boolean inclZeros) {
+		return new ColumnGroupIterator(rl, ru, inclZeros);
+	}
 	
 	//////////////////////////////////////////
 	// Operations (overwrite existing ops for seamless integration)
@@ -2218,4 +2223,50 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 			ret.add(i);
 		return ret;
 	}
+	
+	private class ColumnGroupIterator implements Iterator<IJV>
+	{
+		//iterator configuration 
+		private final int _rl;
+		private final int _ru;
+		private final boolean _inclZeros;
+		
+		//iterator state
+		private int _posColGroup = -1;
+		private Iterator<IJV> _iterColGroup = null;
+		private boolean _noNext = false;
+		
+		public ColumnGroupIterator(int rl, int ru, boolean inclZeros) {
+			_rl = rl;
+			_ru = ru;
+			_inclZeros = inclZeros;
+			getNextIterator();
+		}
+
+		@Override
+		public boolean hasNext() {
+			return !_noNext;
+		}
+
+		@Override
+		public IJV next() {
+			if( _noNext )
+				throw new RuntimeException("No more entries.");
+			IJV ret = _iterColGroup.next(); 
+			if( !_iterColGroup.hasNext() )
+				getNextIterator();
+			return ret;
+		}
+		
+		private void getNextIterator() {
+			while( _posColGroup+1 < _colGroups.size() ) {
+				_posColGroup++;
+				_iterColGroup = _colGroups.get(_posColGroup)
+					.getIterator(_rl, _ru, _inclZeros);
+				if( _iterColGroup.hasNext() )
+					return;
+			}
+			_noNext = true;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/5174fbc0/src/test/java/org/apache/sysml/test/integration/functions/codegen/CompressedCellwiseTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/codegen/CompressedCellwiseTest.java b/src/test/java/org/apache/sysml/test/integration/functions/codegen/CompressedCellwiseTest.java
new file mode 100644
index 0000000..a94cdb2
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/codegen/CompressedCellwiseTest.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.codegen;
+
+import java.io.File;
+import java.util.HashMap;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
+import org.apache.sysml.hops.OptimizerUtils;
+import org.apache.sysml.lops.LopProperties.ExecType;
+import org.apache.sysml.runtime.compress.CompressedMatrixBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixValue.CellIndex;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.apache.sysml.test.utils.TestUtils;
+
+public class CompressedCellwiseTest extends AutomatedTestBase 
+{	
+	private static final String TEST_NAME1 = "CompressedCellwiseMain";
+	private static final String TEST_NAME2 = "CompressedCellwiseSide";
+	private static final String TEST_DIR = "functions/codegen/";
+	private static final String TEST_CLASS_DIR = TEST_DIR + CompressedCellwiseTest.class.getSimpleName() + "/";
+	private final static String TEST_CONF = "SystemML-config-codegen-compress.xml";
+	private final static File   TEST_CONF_FILE = new File(SCRIPT_DIR + TEST_DIR, TEST_CONF);
+	
+	private static final int rows = 2023;
+	private static final int cols = 20;
+	private static final double sparsity1 = 0.9;
+	private static final double sparsity2 = 0.1;
+	private static final double sparsity3 = 0.0;
+	private static final double eps = Math.pow(10, -8);
+	
+	public enum SparsityType {
+		DENSE,
+		SPARSE,
+		EMPTY,
+	}
+	
+	public enum ValueType {
+		RAND, //UC
+		CONST, //RLE
+		RAND_ROUND_OLE, //OLE
+		RAND_ROUND_DDC, //DDC
+	}
+	
+	@Override
+	public void setUp() {
+		TestUtils.clearAssertionInformation();
+		addTestConfiguration( TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] { "R" }) );
+		addTestConfiguration( TEST_NAME2, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME2, new String[] { "R" }) );
+	}
+		
+	@Test
+	public void testCompressedCellwiseMainDenseConstCP() {
+		testCompressedCellwise( TEST_NAME1, SparsityType.DENSE, ValueType.CONST, ExecType.CP );
+	}
+	
+	@Test
+	public void testCompressedCellwiseMainDenseRandCP() {
+		testCompressedCellwise( TEST_NAME1, SparsityType.DENSE, ValueType.RAND, ExecType.CP );
+	}
+	
+	@Test
+	public void testCompressedCellwiseMainDenseRand2CP() {
+		testCompressedCellwise( TEST_NAME1, SparsityType.DENSE, ValueType.RAND_ROUND_DDC, ExecType.CP );
+	}
+	
+	@Test
+	public void testCompressedCellwiseMainDenseRand3CP() {
+		testCompressedCellwise( TEST_NAME1, SparsityType.DENSE, ValueType.RAND_ROUND_OLE, ExecType.CP );
+	}
+	
+	@Test
+	public void testCompressedCellwiseMainSparseConstCP() {
+		testCompressedCellwise( TEST_NAME1, SparsityType.SPARSE, ValueType.CONST, ExecType.CP );
+	}
+	
+	@Test
+	public void testCompressedCellwiseMainSparseRandCP() {
+		testCompressedCellwise( TEST_NAME1, SparsityType.SPARSE, ValueType.RAND, ExecType.CP );
+	}
+	
+	@Test
+	public void testCompressedCellwiseMainSparseRand2CP() {
+		testCompressedCellwise( TEST_NAME1, SparsityType.SPARSE, ValueType.RAND_ROUND_DDC, ExecType.CP );
+	}
+	
+	@Test
+	public void testCompressedCellwiseMainSparseRand3CP() {
+		testCompressedCellwise( TEST_NAME1, SparsityType.SPARSE, ValueType.RAND_ROUND_OLE, ExecType.CP );
+	}
+	
+	@Test
+	public void testCompressedCellwiseMainEmptyConstCP() {
+		testCompressedCellwise( TEST_NAME1, SparsityType.EMPTY, ValueType.CONST, ExecType.CP );
+	}
+	
+	@Test
+	public void testCompressedCellwiseMainEmptyRandCP() {
+		testCompressedCellwise( TEST_NAME1, SparsityType.EMPTY, ValueType.RAND, ExecType.CP );
+	}
+	
+	@Test
+	public void testCompressedCellwiseMainEmptyRand2CP() {
+		testCompressedCellwise( TEST_NAME1, SparsityType.EMPTY, ValueType.RAND_ROUND_DDC, ExecType.CP );
+	}
+	
+	@Test
+	public void testCompressedCellwiseMainEmptyRand3CP() {
+		testCompressedCellwise( TEST_NAME1, SparsityType.EMPTY, ValueType.RAND_ROUND_OLE, ExecType.CP );
+	}
+	
+	@Test
+	public void testCompressedCellwiseSideDenseConstCP() {
+		testCompressedCellwise( TEST_NAME2, SparsityType.DENSE, ValueType.CONST, ExecType.CP );
+	}
+	
+	@Test
+	public void testCompressedCellwiseSideDenseRandCP() {
+		testCompressedCellwise( TEST_NAME2, SparsityType.DENSE, ValueType.RAND, ExecType.CP );
+	}
+	
+	@Test
+	public void testCompressedCellwiseSideDenseRand2CP() {
+		testCompressedCellwise( TEST_NAME2, SparsityType.DENSE, ValueType.RAND_ROUND_DDC, ExecType.CP );
+	}
+	
+	@Test
+	public void testCompressedCellwiseSideDenseRand3CP() {
+		testCompressedCellwise( TEST_NAME2, SparsityType.DENSE, ValueType.RAND_ROUND_OLE, ExecType.CP );
+	}
+	
+	@Test
+	public void testCompressedCellwiseSideSparseConstCP() {
+		testCompressedCellwise( TEST_NAME2, SparsityType.SPARSE, ValueType.CONST, ExecType.CP );
+	}
+	
+	@Test
+	public void testCompressedCellwiseSideSparseRandCP() {
+		testCompressedCellwise( TEST_NAME2, SparsityType.SPARSE, ValueType.RAND, ExecType.CP );
+	}
+	
+	@Test
+	public void testCompressedCellwiseSideSparseRand2CP() {
+		testCompressedCellwise( TEST_NAME2, SparsityType.SPARSE, ValueType.RAND_ROUND_DDC, ExecType.CP );
+	}
+	
+	@Test
+	public void testCompressedCellwiseSideSparseRand3CP() {
+		testCompressedCellwise( TEST_NAME2, SparsityType.SPARSE, ValueType.RAND_ROUND_OLE, ExecType.CP );
+	}
+	
+	@Test
+	public void testCompressedCellwiseSideEmptyConstCP() {
+		testCompressedCellwise( TEST_NAME2, SparsityType.EMPTY, ValueType.CONST, ExecType.CP );
+	}
+	
+	@Test
+	public void testCompressedCellwiseSideEmptyRandCP() {
+		testCompressedCellwise( TEST_NAME2, SparsityType.EMPTY, ValueType.RAND, ExecType.CP );
+	}
+	
+	@Test
+	public void testCompressedCellwiseSideEmptyRand2CP() {
+		testCompressedCellwise( TEST_NAME2, SparsityType.EMPTY, ValueType.RAND_ROUND_DDC, ExecType.CP );
+	}
+	
+	@Test
+	public void testCompressedCellwiseSideEmptyRand3CP() {
+		testCompressedCellwise( TEST_NAME2, SparsityType.EMPTY, ValueType.RAND_ROUND_OLE, ExecType.CP );
+	}
+	
+	@Test
+	public void testCompressedCellwiseMainDenseConstSP() {
+		testCompressedCellwise( TEST_NAME1, SparsityType.DENSE, ValueType.CONST, ExecType.SPARK );
+	}
+	
+	@Test
+	public void testCompressedCellwiseMainDenseRandSP() {
+		testCompressedCellwise( TEST_NAME1, SparsityType.DENSE, ValueType.RAND, ExecType.SPARK );
+	}
+	
+	@Test
+	public void testCompressedCellwiseMainDenseRand2SP() {
+		testCompressedCellwise( TEST_NAME1, SparsityType.DENSE, ValueType.RAND_ROUND_DDC, ExecType.SPARK );
+	}
+	
+	@Test
+	public void testCompressedCellwiseMainDenseRand3SP() {
+		testCompressedCellwise( TEST_NAME1, SparsityType.DENSE, ValueType.RAND_ROUND_OLE, ExecType.SPARK );
+	}
+	
+	@Test
+	public void testCompressedCellwiseMainSparseConstSP() {
+		testCompressedCellwise( TEST_NAME1, SparsityType.SPARSE, ValueType.CONST, ExecType.SPARK );
+	}
+	
+	@Test
+	public void testCompressedCellwiseMainSparseRandSP() {
+		testCompressedCellwise( TEST_NAME1, SparsityType.SPARSE, ValueType.RAND, ExecType.SPARK );
+	}
+	
+	@Test
+	public void testCompressedCellwiseMainSparseRand2SP() {
+		testCompressedCellwise( TEST_NAME1, SparsityType.SPARSE, ValueType.RAND_ROUND_DDC, ExecType.SPARK );
+	}
+	
+	@Test
+	public void testCompressedCellwiseMainSparseRand3SP() {
+		testCompressedCellwise( TEST_NAME1, SparsityType.SPARSE, ValueType.RAND_ROUND_OLE, ExecType.SPARK );
+	}
+	
+	@Test
+	public void testCompressedCellwiseMainEmptyConstSP() {
+		testCompressedCellwise( TEST_NAME1, SparsityType.EMPTY, ValueType.CONST, ExecType.SPARK );
+	}
+	
+	@Test
+	public void testCompressedCellwiseMainEmptyRandSP() {
+		testCompressedCellwise( TEST_NAME1, SparsityType.EMPTY, ValueType.RAND, ExecType.SPARK );
+	}
+	
+	@Test
+	public void testCompressedCellwiseMainEmptyRand2SP() {
+		testCompressedCellwise( TEST_NAME1, SparsityType.EMPTY, ValueType.RAND_ROUND_DDC, ExecType.SPARK );
+	}
+	
+	@Test
+	public void testCompressedCellwiseMainEmptyRand3SP() {
+		testCompressedCellwise( TEST_NAME1, SparsityType.EMPTY, ValueType.RAND_ROUND_OLE, ExecType.SPARK );
+	}
+	
+	//TODO compressed side inputs in spark
+	
+	
+	private void testCompressedCellwise(String testname, SparsityType stype, ValueType vtype, ExecType et)
+	{	
+		boolean oldRewrites = OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION;
+		RUNTIME_PLATFORM platformOld = rtplatform;
+		switch( et ){
+			case MR: rtplatform = RUNTIME_PLATFORM.HADOOP; break;
+			case SPARK: rtplatform = RUNTIME_PLATFORM.SPARK; break;
+			default: rtplatform = RUNTIME_PLATFORM.HYBRID_SPARK; break;
+		}
+	
+		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+		if( rtplatform == RUNTIME_PLATFORM.SPARK || rtplatform == RUNTIME_PLATFORM.HYBRID_SPARK )
+			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+		
+		try
+		{
+			OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION = true;
+			TestConfiguration config = getTestConfiguration(testname);
+			loadTestConfiguration(config);
+			
+			String HOME = SCRIPT_DIR + TEST_DIR;
+			fullDMLScriptName = HOME + testname + ".dml";
+			programArgs = new String[]{"-explain", "-stats", 
+					"-args", input("X"), output("R") };
+			
+			fullRScriptName = HOME + testname + ".R";
+			rCmd = getRCmd(inputDir(), expectedDir());			
+
+			//select input sparsity
+			double sparsity = -1;
+			switch( stype ){
+				case DENSE: sparsity = sparsity1; break;
+				case SPARSE: sparsity = sparsity2; break;
+				case EMPTY: sparsity = sparsity3; break;
+			}
+			
+			//generate input data
+			double min = (vtype==ValueType.CONST)? 10 : -10;
+			double[][] X = TestUtils.generateTestMatrix(rows, cols, min, 10, sparsity, 7);
+			if( vtype==ValueType.RAND_ROUND_OLE || vtype==ValueType.RAND_ROUND_DDC ) {
+				CompressedMatrixBlock.ALLOW_DDC_ENCODING = (vtype==ValueType.RAND_ROUND_DDC);
+				X = TestUtils.round(X);
+			}
+			writeInputMatrixWithMTD("X", X, true);
+			
+			//run tests
+			runTest(true, false, null, -1); 
+			runRScript(true); 
+			
+			//compare matrices 
+			HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromHDFS("R");
+			HashMap<CellIndex, Double> rfile  = readRMatrixFromFS("R");	
+			TestUtils.compareMatrices(dmlfile, rfile, eps, "Stat-DML", "Stat-R");
+			Assert.assertTrue(heavyHittersContainsSubString("spoofCell") 
+				|| heavyHittersContainsSubString("sp_spoofCell"));
+		}
+		finally {
+			rtplatform = platformOld;
+			DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+			OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION = oldRewrites;
+			OptimizerUtils.ALLOW_AUTO_VECTORIZATION = true;
+			OptimizerUtils.ALLOW_OPERATOR_FUSION = true;
+		}
+	}	
+
+	/**
+	 * Override default configuration with custom test configuration to ensure
+	 * scratch space and local temporary directory locations are also updated.
+	 */
+	@Override
+	protected File getConfigTemplateFile() {
+		// Instrumentation in this test's output log to show custom configuration file used for template.
+		System.out.println("This test case overrides default configuration with " + TEST_CONF_FILE.getPath());
+		return TEST_CONF_FILE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/5174fbc0/src/test/scripts/functions/codegen/CompressedCellwiseMain.R
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/codegen/CompressedCellwiseMain.R b/src/test/scripts/functions/codegen/CompressedCellwiseMain.R
new file mode 100644
index 0000000..63670b4
--- /dev/null
+++ b/src/test/scripts/functions/codegen/CompressedCellwiseMain.R
@@ -0,0 +1,31 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+args <- commandArgs(TRUE)
+library("Matrix")
+library("matrixStats")
+
+X = readMM(paste(args[1], "X.mtx", sep=""));  # input X, read from <args[1]>X.mtx
+
+# two fused cell-wise expressions over X: one with aggregation (sum) and one without
+R = sum(X/3 * X/4 * X/5) - (X * X/2)
+
+writeMM(as(R,"CsparseMatrix"), paste(args[2], "R", sep=""));  # result written to <args[2]>R

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/5174fbc0/src/test/scripts/functions/codegen/CompressedCellwiseMain.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/codegen/CompressedCellwiseMain.dml b/src/test/scripts/functions/codegen/CompressedCellwiseMain.dml
new file mode 100644
index 0000000..458cc31
--- /dev/null
+++ b/src/test/scripts/functions/codegen/CompressedCellwiseMain.dml
@@ -0,0 +1,27 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X = read($1)  # input X, location given by the first script argument
+
+# two fused cell-wise expressions over X: one with aggregation (sum) and one without
+R = sum(X/3 * X/4 * X/5) - (X * X/2)
+
+write(R, $2)  # result written to the location given by the second script argument

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/5174fbc0/src/test/scripts/functions/codegen/CompressedCellwiseSide.R
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/codegen/CompressedCellwiseSide.R b/src/test/scripts/functions/codegen/CompressedCellwiseSide.R
new file mode 100644
index 0000000..19349c9
--- /dev/null
+++ b/src/test/scripts/functions/codegen/CompressedCellwiseSide.R
@@ -0,0 +1,33 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+args <- commandArgs(TRUE)
+library("Matrix")
+library("matrixStats")
+
+X = readMM(paste(args[1], "X.mtx", sep=""));  # input X, read from <args[1]>X.mtx
+M = matrix(0, nrow(X), ncol(X));  # all-zero matrix with the dimensions of X
+M[7,7] = 7;  # single non-zero cell
+
+# two fused cell-wise expressions over M and X: one with aggregation (sum) and one without
+R = sum(M/3 * M/4 * X) - (M * M/2 * X)
+
+writeMM(as(R,"CsparseMatrix"), paste(args[2], "R", sep=""));  # result written to <args[2]>R

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/5174fbc0/src/test/scripts/functions/codegen/CompressedCellwiseSide.dml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/codegen/CompressedCellwiseSide.dml b/src/test/scripts/functions/codegen/CompressedCellwiseSide.dml
new file mode 100644
index 0000000..457c164
--- /dev/null
+++ b/src/test/scripts/functions/codegen/CompressedCellwiseSide.dml
@@ -0,0 +1,29 @@
+#-------------------------------------------------------------
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#-------------------------------------------------------------
+
+X = read($1)  # input X, location given by the first script argument
+M = matrix(0, nrow(X), ncol(X));  # all-zero matrix with the dimensions of X
+M[7,7] = 7;  # single non-zero cell
+
+# two fused cell-wise expressions over M and X: one with aggregation (sum) and one without
+R = sum(M/3 * M/4 * X) - (M * M/2 * X)
+
+write(R, $2)  # result written to the location given by the second script argument

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/5174fbc0/src/test/scripts/functions/codegen/SystemML-config-codegen-compress.xml
----------------------------------------------------------------------
diff --git a/src/test/scripts/functions/codegen/SystemML-config-codegen-compress.xml b/src/test/scripts/functions/codegen/SystemML-config-codegen-compress.xml
new file mode 100644
index 0000000..ffdbaac
--- /dev/null
+++ b/src/test/scripts/functions/codegen/SystemML-config-codegen-compress.xml
@@ -0,0 +1,62 @@
+<!--
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+-->
+
+<root>
+   <!-- local fs tmp working directory-->
+   <localtmpdir>/tmp/systemml</localtmpdir>
+
+   <!-- hdfs tmp working directory--> 
+   <scratch>scratch_space</scratch> 
+
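+   <!-- compiler optimization level, documented values: 0 | 1 | 2 | 3 | 4, default: 2; NOTE(review): the value 7 used below is outside this documented range, presumably a hidden level for codegen tests; confirm -->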
+   <optlevel>7</optlevel>  
+
+   <!-- default number of reduce tasks per MR job, default: 2 x number of nodes -->
+   <numreducers>10</numreducers> 
+   
+   <!-- override jvm reuse flag for specific MR jobs, valid values: true | false  -->
+   <jvmreuse>false</jvmreuse> 
+
+   <!-- default block dim for binary block files -->
+   <defaultblocksize>1000</defaultblocksize> 
+
+   <!-- run systemml control program as yarn appmaster, in case of MR1 always falls back to client, please disable for debug mode -->
+   <dml.yarn.appmaster>false</dml.yarn.appmaster>
+
+   <!-- maximum jvm heap size of the dml yarn appmaster in MB, the requested memory is 1.5x this parameter -->
+   <dml.yarn.appmaster.mem>2048</dml.yarn.appmaster.mem>
+
+   <!-- maximum jvm heap size of the map/reduce tasks in MB, the requested memory is 1.5x this parameter, negative values ignored  -->
+   <dml.yarn.mapreduce.mem>2048</dml.yarn.mapreduce.mem>
+
+   <!-- yarn application submission queue, relevant for default capacity scheduler -->
+   <dml.yarn.app.queue>default</dml.yarn.app.queue>
+   
+   <!-- enables multi-threaded matrix multiplications in singlenode control program -->
+   <cp.parallel.matrixmult>true</cp.parallel.matrixmult>
+   
+   <!-- enables multi-threaded read/write of text formats in singlenode control program -->
+   <cp.parallel.textio>true</cp.parallel.textio>
+   
+   <!-- enables compressed linear algebra and automatic code generation -->
+   <compressed.linalg>true</compressed.linalg>
+   <codegen.enabled>true</codegen.enabled>
+   <codegen.plancache>true</codegen.plancache>
+   <codegen.literals>1</codegen.literals>
+</root>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/5174fbc0/src/test_suites/java/org/apache/sysml/test/integration/functions/codegen/ZPackageSuite.java
----------------------------------------------------------------------
diff --git a/src/test_suites/java/org/apache/sysml/test/integration/functions/codegen/ZPackageSuite.java b/src/test_suites/java/org/apache/sysml/test/integration/functions/codegen/ZPackageSuite.java
index d063728..6b0b5be 100644
--- a/src/test_suites/java/org/apache/sysml/test/integration/functions/codegen/ZPackageSuite.java
+++ b/src/test_suites/java/org/apache/sysml/test/integration/functions/codegen/ZPackageSuite.java
@@ -33,6 +33,7 @@ import org.junit.runners.Suite;
 	AlgorithmMLogreg.class,
 	AlgorithmPNMF.class,
 	CellwiseTmplTest.class,
+	CompressedCellwiseTest.class,
 	DAGCellwiseTmplTest.class,
 	MultiAggTmplTest.class,
 	OuterProdTmplTest.class,