You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/09/14 07:27:02 UTC

systemml git commit: [SYSTEMML-1906] Performance codegen row ops over compressed matrices

Repository: systemml
Updated Branches:
  refs/heads/master 4cf95c92e -> 0a984a43b


[SYSTEMML-1906] Performance codegen row ops over compressed matrices

This patch improves the performance of codegen row-wise operations over
compressed dense and sparse matrices. So far we used dense/sparse row
iterators over the compressed matrix block, which internally reused the
existing column group iterators. However, for the OLE and RLE encoding
schemes, these iterators were realized via a set of value iterators,
which did not perform very well. The key challenge is the translation
from the value-based, column-wise compressed form to uncompressed rows. 

We now use dedicated row iterators for the individual column encoding
schemes. For DDC and UC, these are trivial. For OLE and RLE, however, we
determine a vector of dictionary codes per logical segment (via a single
pass over all value offset lists) and simply read out the value tuples
for these codes on the individual next calls. Furthermore, we now use
static task partitioning (with segment alignment) for multi-threaded row
operations to avoid unnecessary iterator initialization overhead. On the
Airline78 (dense) and Mnist8m (sparse) datasets, this patch improves
performance from 4.1s to 1s, and from 66s to 21s, respectively.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/0a984a43
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/0a984a43
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/0a984a43

Branch: refs/heads/master
Commit: 0a984a43b1062aaf58bf7a2ce019adef264ba459
Parents: 4cf95c9
Author: Matthias Boehm <mb...@gmail.com>
Authored: Thu Sep 14 00:26:45 2017 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Thu Sep 14 00:27:04 2017 -0700

----------------------------------------------------------------------
 .../sysml/runtime/codegen/SpoofRowwise.java     |  33 +--
 .../apache/sysml/runtime/compress/ColGroup.java |  19 ++
 .../sysml/runtime/compress/ColGroupDDC.java     |  47 +++-
 .../sysml/runtime/compress/ColGroupDDC1.java    |  12 +-
 .../sysml/runtime/compress/ColGroupDDC2.java    |  14 +-
 .../sysml/runtime/compress/ColGroupOLE.java     |  67 ++++-
 .../sysml/runtime/compress/ColGroupRLE.java     |  78 +++++-
 .../runtime/compress/ColGroupUncompressed.java  |  51 ++++
 .../runtime/compress/CompressedMatrixBlock.java |  76 ++----
 .../CompressedRowAggregateLargeTest.java        | 270 +++++++++++++++++++
 .../functions/codegen/ZPackageSuite.java        |   1 +
 11 files changed, 589 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/0a984a43/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java b/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java
index f1cef34..659059e 100644
--- a/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java
+++ b/src/main/java/org/apache/sysml/runtime/codegen/SpoofRowwise.java
@@ -29,6 +29,7 @@ import java.util.concurrent.Future;
 import java.util.stream.IntStream;
 
 import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.compress.BitmapEncoder;
 import org.apache.sysml.runtime.compress.CompressedMatrixBlock;
 import org.apache.sysml.runtime.instructions.cp.DoubleObject;
 import org.apache.sysml.runtime.instructions.cp.ScalarObject;
@@ -185,20 +186,25 @@ public abstract class SpoofRowwise extends SpoofOperator
 			&& LibSpoofPrimitives.isFlipOuter(out.getNumRows(), out.getNumColumns());
 		
 		//input preparation
+		MatrixBlock a = inputs.get(0);
 		SideInput[] b = prepInputMatrices(inputs, 1, inputs.size()-1, true, _tB1);
 		double[] scalars = prepInputScalars(scalarObjects);
 		
 		//core parallel execute
 		ExecutorService pool = Executors.newFixedThreadPool( k );
-		int nk = UtilFunctions.roundToNext(Math.min(8*k,m/32), k);
+		int nk = (a instanceof CompressedMatrixBlock) ? k :
+			UtilFunctions.roundToNext(Math.min(8*k,m/32), k);
 		int blklen = (int)(Math.ceil((double)m/nk));
+		if( a instanceof CompressedMatrixBlock )
+			blklen = BitmapEncoder.getAlignedBlocksize(blklen);
+		
 		try
 		{
 			if( _type.isColumnAgg() || _type == RowType.FULL_AGG ) {
 				//execute tasks
 				ArrayList<ParColAggTask> tasks = new ArrayList<ParColAggTask>();
 				for( int i=0; i<nk & i*blklen<m; i++ )
-					tasks.add(new ParColAggTask(inputs.get(0), b, scalars, n, n2, i*blklen, Math.min((i+1)*blklen, m)));
+					tasks.add(new ParColAggTask(a, b, scalars, n, n2, i*blklen, Math.min((i+1)*blklen, m)));
 				List<Future<double[]>> taskret = pool.invokeAll(tasks);	
 				//aggregate partial results
 				int len = _type.isColumnAgg() ? out.getNumRows()*out.getNumColumns() : 1;
@@ -210,7 +216,7 @@ public abstract class SpoofRowwise extends SpoofOperator
 				//execute tasks
 				ArrayList<ParExecTask> tasks = new ArrayList<ParExecTask>();
 				for( int i=0; i<nk & i*blklen<m; i++ )
-					tasks.add(new ParExecTask(inputs.get(0), b, out, scalars, n, n2, i*blklen, Math.min((i+1)*blklen, m)));
+					tasks.add(new ParExecTask(a, b, out, scalars, n, n2, i*blklen, Math.min((i+1)*blklen, m)));
 				List<Future<Long>> taskret = pool.invokeAll(tasks);
 				//aggregate nnz, no need to aggregate results
 				long nnz = 0;
@@ -304,24 +310,9 @@ public abstract class SpoofRowwise extends SpoofOperator
 		if( a.isEmptyBlock(false) )
 			return;
 		
-		if( !a.isInSparseFormat() ) { //DENSE
-			Iterator<double[]> iter = a.getDenseRowIterator(rl, ru);
-			for( int i=rl; iter.hasNext(); i++ ) {
-				genexec(iter.next(), 0, b, scalars, c, n, i);
-			}
-		}
-		else { //SPARSE
-			Iterator<SparseRow> iter = a.getSparseRowIterator(rl, ru);
-			SparseRow empty = new SparseRowVector(1);
-			for( int i=rl; iter.hasNext(); i++ ) {
-				SparseRow row = iter.next();
-				if( !row.isEmpty() )
-					genexec(row.values(), 
-						row.indexes(), 0, b, scalars, c, row.size(), n, i);
-				else
-					genexec(empty.values(), 
-						empty.indexes(), 0, b, scalars, c, 0, n, i);
-			}
+		Iterator<double[]> iter = a.getDenseRowIterator(rl, ru);
+		for( int i=rl; iter.hasNext(); i++ ) {
+			genexec(iter.next(), 0, b, scalars, c, n, i);
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/systemml/blob/0a984a43/src/main/java/org/apache/sysml/runtime/compress/ColGroup.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroup.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroup.java
index dbed2b4..ba6509c 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroup.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroup.java
@@ -261,10 +261,29 @@ public abstract class ColGroup implements Serializable
 	public abstract void unaryAggregateOperations(AggregateUnaryOperator op, MatrixBlock result)
 		throws DMLRuntimeException;
 	
+	/**
+	 * Create a column group iterator for a row index range.
+	 * 
+	 * @param rl row lower index, inclusive
+	 * @param ru row upper index, exclusive
+	 * @param inclZeros include zero values into scope of iterator
+	 * @param rowMajor use a row major iteration order
+	 * @return an iterator instance
+	 */
 	public abstract Iterator<IJV> getIterator(int rl, int ru,
 			boolean inclZeros, boolean rowMajor);
 	
 	/**
+	 * Create a dense row iterator for a row index range. This iterator
+	 * implies the inclusion of zeros and row-major iteration order.
+	 * 
+	 * @param rl row lower index, inclusive
+	 * @param ru row upper index, exclusive
+	 * @return an iterator instance
+	 */
+	public abstract Iterator<double[]> getRowIterator(int rl, int ru);
+	
+	/**
 	 * Count the number of non-zeros per row
 	 * 
 	 * @param rnnz non-zeros per row

http://git-wip-us.apache.org/repos/asf/systemml/blob/0a984a43/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC.java
index 4bf7c20..3618651 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC.java
@@ -214,7 +214,16 @@ public abstract class ColGroupDDC extends ColGroupValue
 			}	
 		}
 	}
-
+	
+	/**
+	 * Generic get value for byte-length-agnostic access
+	 * to first column.
+	 * 
+	 * @param r global row index
+	 * @return value
+	 */
+	protected abstract double getData(int r);
+	
 	/**
 	 * Generic get value for byte-length-agnostic access.
 	 * 
@@ -233,6 +242,8 @@ public abstract class ColGroupDDC extends ColGroupValue
 	 */
 	protected abstract void setData(int r, int code);
 	
+	protected abstract int getCode(int r);
+	
 	@Override
 	public long estimateInMemorySize() {
 		return super.estimateInMemorySize();
@@ -244,6 +255,11 @@ public abstract class ColGroupDDC extends ColGroupValue
 		return new DDCIterator(rl, ru, inclZeros);
 	}
 	
+	@Override
+	public Iterator<double[]> getRowIterator(int rl, int ru) {
+		return new DDCRowIterator(rl, ru);
+	}
+	
 	private class DDCIterator implements Iterator<IJV>
 	{
 		//iterator configuration 
@@ -288,4 +304,33 @@ public abstract class ColGroupDDC extends ColGroupValue
 			while( !_inclZeros && _value==0);
 		}
 	}
+	
+	private class DDCRowIterator implements Iterator<double[]>
+	{
+		//iterator configuration 
+		private final int _ru;
+		//iterator state
+		private final double[] _buff = new double[getNumCols()]; 
+		private int _rpos = -1;
+		
+		public DDCRowIterator(int rl, int ru) {
+			_ru = ru;
+			_rpos = rl;
+		}
+
+		@Override
+		public boolean hasNext() {
+			return (_rpos < _ru);
+		}
+
+		@Override
+		public double[] next() {
+			//copy entire value tuple for the current row
+			final int clen = getNumCols();
+			System.arraycopy(getValues(), getCode(_rpos)*clen, _buff, 0, clen);
+			//advance position to next row
+			_rpos++;
+			return _buff;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/0a984a43/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java
index d003aa5..89ca931 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java
@@ -83,6 +83,11 @@ public class ColGroupDDC1 extends ColGroupDDC
 	}
 	
 	@Override
+	protected double getData(int r) {
+		return _values[(_data[r]&0xFF)];
+	}
+	
+	@Override
 	protected double getData(int r, int colIx) {
 		return _values[(_data[r]&0xFF)*getNumCols()+colIx];
 	}
@@ -93,6 +98,11 @@ public class ColGroupDDC1 extends ColGroupDDC
 	}
 	
 	@Override
+	protected int getCode(int r) {
+		return (_data[r]&0xFF);
+	}
+	
+	@Override
 	public void write(DataOutput out) throws IOException {
 		int numCols = getNumCols();
 		int numVals = getNumValues();
@@ -288,7 +298,7 @@ public class ColGroupDDC1 extends ColGroupDDC
 		//temporary array also avoids false sharing in multi-threaded environments
 		double[] vals = allocDVector(numVals, true);
 		for( int i=0; i<nrow; i++ )
-			vals[_data[i]&0xFF] += a.getData(i, 0);
+			vals[_data[i]&0xFF] += a.getData(i);
 		
 		//post-scaling of pre-aggregate with distinct values
 		postScaling(vals, c);

http://git-wip-us.apache.org/repos/asf/systemml/blob/0a984a43/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC2.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC2.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC2.java
index ec7aa18..d9c851d 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC2.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC2.java
@@ -85,16 +85,26 @@ public class ColGroupDDC2 extends ColGroupDDC
 	}
 	
 	@Override
+	protected double getData(int r) {
+		return _values[_data[r]];
+	}
+	
+	@Override
 	protected double getData(int r, int colIx) {
 		return _values[_data[r]*getNumCols()+colIx];
 	}
-
+	
 	@Override
 	protected void setData(int r, int code) {
 		_data[r] = (char)code;
 	}
 	
 	@Override
+	protected int getCode(int r) {
+		return _data[r];
+	}
+	
+	@Override
 	public void write(DataOutput out) throws IOException {
 		int numCols = getNumCols();
 		int numVals = getNumValues();
@@ -281,7 +291,7 @@ public class ColGroupDDC2 extends ColGroupDDC
 			//temporary array also avoids false sharing in multi-threaded environments
 			double[] vals = allocDVector(numVals, true);
 			for( int i=0; i<nrow; i++ ) {
-				vals[_data[i]] += a.getData(i, 0);
+				vals[_data[i]] += a.getData(i);
 			}
 			
 			//post-scaling of pre-aggregate with distinct values

http://git-wip-us.apache.org/repos/asf/systemml/blob/0a984a43/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java
index 7574c73..1f2cd50 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java
@@ -473,7 +473,7 @@ public class ColGroupOLE extends ColGroupOffset
 			//iterate over bitmap blocks and add partial results
 			double vsum = 0;
 			for( int j = boff+1; j < boff+1+_data[boff]; j++ )
-				vsum += a.getData(_data[j], 0);
+				vsum += a.getData(_data[j]);
 			
 			//scale partial results by values and write results
 			for( int j = 0; j < numCols; j++ )
@@ -780,7 +780,12 @@ public class ColGroupOLE extends ColGroupOffset
 	public Iterator<Integer> getIterator(int k, int rl, int ru) {
 		return new OLEValueIterator(k, rl, ru);
 	}
-
+	
+	@Override
+	public Iterator<double[]> getRowIterator(int rl, int ru) {
+		return new OLERowIterator(rl, ru);
+	}
+	
 	private class OLEValueIterator implements Iterator<Integer>
 	{
 		private final int _ru;
@@ -848,4 +853,62 @@ public class ColGroupOLE extends ColGroupOffset
 			}
 		}
 	}
+	
+	private class OLERowIterator implements Iterator<double[]>
+	{
+		//iterator configuration 
+		private final int _ru;
+		//iterator state
+		private final double[] _buff = new double[getNumCols()]; 
+		private final int[] _apos;
+		private final int[] _vcodes;
+		private int _rpos = -1;
+		
+		public OLERowIterator(int rl, int ru) {
+			_ru = ru;
+			_rpos = rl;
+			_apos = skipScan(getNumValues(), rl);
+			_vcodes = new int[Math.min(BitmapEncoder.BITMAP_BLOCK_SZ, ru-rl)];
+			getNextSegment();
+		}
+		
+		@Override
+		public boolean hasNext() {
+			return (_rpos < _ru);
+		}
+		
+		@Override
+		public double[] next() {
+			//copy entire value tuple or reset to zero
+			int ix = _rpos%BitmapEncoder.BITMAP_BLOCK_SZ;
+			final int clen = getNumCols();
+			if( _vcodes[ix] >= 0 )
+				System.arraycopy(getValues(), _vcodes[ix]*clen, _buff, 0, clen);
+			else
+				Arrays.fill(_buff, 0);
+			//advance position to next row
+			_rpos++;
+			if( _rpos%BitmapEncoder.BITMAP_BLOCK_SZ==0 && _rpos<_ru )
+				getNextSegment();
+			return _buff;
+		}
+		
+		public void getNextSegment() {
+			//materialize value codes for entire segment in a 
+			//single pass over all values (store value code by pos)
+			Arrays.fill(_vcodes, -1);
+			final int numVals = getNumValues();
+			for (int k = 0; k < numVals; k++)  {
+				int boff = _ptr[k];
+				int blen = len(k);
+				int bix = _apos[k];
+				if( bix < blen ) {
+					int slen = _data[boff+bix];
+					for(int blckIx = 1; blckIx <= slen; blckIx++)
+						_vcodes[_data[boff+bix + blckIx]] = k;
+					_apos[k] += slen+1;
+				}
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/0a984a43/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java
index 964e513..478fd31 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java
@@ -790,11 +790,17 @@ public class ColGroupRLE extends ColGroupOffset
 		return new RLEValueIterator(k, 0, getNumRows());
 	}
 	
+	
 	@Override
 	public Iterator<Integer> getIterator(int k, int rl, int ru) {
 		return new RLEValueIterator(k, rl, ru);
 	}
-
+	
+	@Override
+	public Iterator<double[]> getRowIterator(int rl, int ru) {
+		return new RLERowIterator(rl, ru);
+	}
+	
 	private class RLEValueIterator implements Iterator<Integer>
 	{
 		private final int _ru;
@@ -848,4 +854,74 @@ public class ColGroupRLE extends ColGroupOffset
 			}
 		}
 	}
+	
+	private class RLERowIterator implements Iterator<double[]>
+	{
+		//iterator configuration 
+		private final int _ru;
+		//iterator state
+		private final double[] _buff = new double[getNumCols()];
+		private final int[] _astart;
+		private final int[] _apos;
+		private final int[] _vcodes;
+		private int _rpos = -1;
+		
+		public RLERowIterator(int rl, int ru) {
+			_ru = ru;
+			_rpos = rl;
+			_astart = new int[getNumValues()];
+			_apos = skipScan(getNumValues(), rl, _astart);
+			_vcodes = new int[Math.min(BitmapEncoder.BITMAP_BLOCK_SZ, ru-rl)];
+			getNextSegment();
+		}
+		
+		@Override
+		public boolean hasNext() {
+			return (_rpos < _ru);
+		}
+		
+		@Override
+		public double[] next() {
+			//copy entire value tuple or reset to zero
+			int ix = _rpos%BitmapEncoder.BITMAP_BLOCK_SZ;
+			final int clen = getNumCols();
+			if( _vcodes[ix] >= 0 )
+				System.arraycopy(getValues(), _vcodes[ix]*clen, _buff, 0, clen);
+			else
+				Arrays.fill(_buff, 0);
+			//advance position to next row
+			_rpos++;
+			if( _rpos%BitmapEncoder.BITMAP_BLOCK_SZ==0 && _rpos<_ru )
+				getNextSegment();
+			return _buff;
+		}
+		
+		public void getNextSegment() {
+			//materialize value codes for entire segment in a 
+			//single pass over all values (store value code by pos)
+			Arrays.fill(_vcodes, -1);
+			final int numVals = getNumValues();
+			final int blksz = BitmapEncoder.BITMAP_BLOCK_SZ;
+			for (int k = 0; k < numVals; k++) {
+				int boff = _ptr[k];
+				int blen = len(k);
+				int bix = _apos[k];
+				int start = _astart[k];
+				int end = (_rpos/blksz+1)*blksz;
+				while( bix < blen && start < end ) {
+					int lstart = _data[boff + bix];
+					int llen = _data[boff + bix + 1];
+					//set codes of entire run, with awareness of unaligned runs/segments
+					Arrays.fill(_vcodes, Math.min(Math.max(_rpos, start+lstart), end)-_rpos, 
+						Math.min(start+lstart+llen,end)-_rpos, k);
+					if( start+lstart+llen >= end )
+						break;
+					start += lstart + llen;
+					bix += 2;
+				}
+				_apos[k] = bix;
+				_astart[k] = start;
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/0a984a43/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java
index 7e54e4e..b27215a 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java
@@ -34,6 +34,7 @@ import org.apache.sysml.runtime.matrix.data.IJV;
 import org.apache.sysml.runtime.matrix.data.LibMatrixAgg;
 import org.apache.sysml.runtime.matrix.data.LibMatrixMult;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.SparseBlock;
 import org.apache.sysml.runtime.matrix.data.SparseBlock.Type;
 import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
 import org.apache.sysml.runtime.matrix.operators.ScalarOperator;
@@ -416,6 +417,11 @@ public class ColGroupUncompressed extends ColGroup
 		return new UCIterator(rl, ru, inclZeros);
 	}
 	
+	@Override
+	public Iterator<double[]> getRowIterator(int rl, int ru) {
+		return new UCRowIterator(rl, ru);
+	}
+	
 	private class UCIterator implements Iterator<IJV>
 	{
 		//iterator configuration
@@ -460,4 +466,49 @@ public class ColGroupUncompressed extends ColGroup
 			while( !_inclZeros && _value==0 );
 		}
 	}
+	
+	private class UCRowIterator implements Iterator<double[]>
+	{
+		//iterator configuration
+		private final int _ru;
+		//iterator state
+		private final double[] _buff = new double[getNumCols()];
+		private int _rpos = -1;
+		
+		public UCRowIterator(int rl, int ru) {
+			_ru = ru;
+			_rpos = rl;
+		}
+		
+		@Override
+		public boolean hasNext() {
+			return (_rpos < _ru);
+		}
+		
+		@Override
+		public double[] next() {
+			//copy entire dense/sparse row
+			if( _data.isAllocated() ) {
+				if( _data.isInSparseFormat() ) {
+					Arrays.fill(_buff, 0); //reset
+					if( !_data.getSparseBlock().isEmpty(_rpos) ) {
+						SparseBlock sblock = _data.getSparseBlock();
+						int apos = sblock.pos(_rpos);
+						int alen = sblock.size(_rpos);
+						int[] aix = sblock.indexes(_rpos);
+						double[] avals = sblock.values(_rpos);
+						for(int k=apos; k<apos+alen; k++)
+							_buff[aix[k]] = avals[k];
+					}
+				}
+				else {
+					final int clen = getNumCols();
+					System.arraycopy(_data.getDenseBlock(), _rpos*clen, _buff, 0, clen);
+				}
+			}
+			//advance position to next row
+			_rpos++;
+			return _buff;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/systemml/blob/0a984a43/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
index 418394b..3299594 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
@@ -2305,9 +2305,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		protected final int _ru;
 		
 		//iterator state
-		private Iterator<IJV>[] _iters = null;
-		protected final int[] _ixbuff = new int[clen];
-		protected final double[] _vbuff = new double[clen];
+		protected Iterator<double[]>[] _iters = null;
 		protected int _rpos;
 		
 		@SuppressWarnings("unchecked")
@@ -2318,43 +2316,16 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 			//initialize array of column group iterators
 			_iters = new Iterator[_colGroups.size()];
 			for( int i=0; i<_colGroups.size(); i++ )
-				_iters[i] = _colGroups.get(i).getIterator(
-					_rl, _ru, true, true);
-			Arrays.fill(_ixbuff, -1);
+				_iters[i] = _colGroups.get(i).getRowIterator(_rl, _ru);
 			
 			//get initial row
-			_rpos = rl-1;
-			getNextRow();
+			_rpos = rl;
 		}
 		
 		@Override
 		public boolean hasNext() {
 			return (_rpos < _ru);
 		}
-		
-		@Override
-		public abstract T next();
-		
-		protected void getNextRow() {
-			_rpos++;
-			//read iterators if necessary
-			for(int j=0; j<_iters.length; j++) {
-				ColGroup grp = _colGroups.get(j);
-				if( _ixbuff[grp.getColIndex(0)] < _rpos ) {
-					if( _iters[j].hasNext() ) {
-						for( int k=0; k<grp.getNumCols(); k++ ) {
-							IJV cell = _iters[j].next();
-							_ixbuff[cell.getJ()] = cell.getI();
-							_vbuff[cell.getJ()] = cell.getV();
-						}
-					}
-					else {
-						for( int k=0; k<grp.getNumCols(); k++ )
-							_ixbuff[grp.getColIndex(k)] = _ru;
-					}
-				}
-			}
-		}
 	}
 	
 	private class DenseRowIterator extends RowIterator<double[]>
@@ -2367,13 +2338,15 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		
 		@Override
 		public double[] next() {
-			if( !hasNext() )
-				throw new RuntimeException("No more rows in row partition ["+_rl+","+_ru+")");
-			//copy currently buffered row entries
-			for( int j=0; j<clen; j++ )
-				_ret[j] = (_ixbuff[j] == _rpos) ? _vbuff[j] : 0;
+			//copy group rows into consolidated row
+			for(int j=0; j<_iters.length; j++) {
+				ColGroup grp = _colGroups.get(j);
+				double[] row = _iters[j].next();
+				for( int k=0; k<row.length; k++ )
+					_ret[grp.getColIndex(k)] = row[k];
+			}
 			//advance to next row and return buffer
-			getNextRow();
+			_rpos++;
 			return _ret;
 		}
 	}
@@ -2381,27 +2354,28 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 	private class SparseRowIterator extends RowIterator<SparseRow>
 	{
 		private final SparseRowVector _ret = new SparseRowVector(clen);
+		private final double[] _tmp = new double[clen];
 		
 		public SparseRowIterator(int rl, int ru) {
 			super(rl, ru);
 		}
-
-		@Override
-		public boolean hasNext() {
-			return (_rpos < _ru);
-		}
-
+		
 		@Override
 		public SparseRow next() {
-			if( !hasNext() )
-				throw new RuntimeException("No more rows in row partition ["+_rl+","+_ru+")");
-			//copy currently buffered row entries
+			//copy group rows into consolidated dense vector
+			//to avoid binary search+shifting or final sort
+			for(int j=0; j<_iters.length; j++) {
+				ColGroup grp = _colGroups.get(j);
+				double[] row = _iters[j].next();
+				for( int k=0; k<row.length; k++ )
+					_tmp[grp.getColIndex(k)] = row[k];
+			}
+			//append non-zero values to consolidated sparse row
 			_ret.setSize(0);
-			for( int j=0; j<clen; j++ )
-				if( _ixbuff[j] == _rpos )
-					_ret.append(j, _vbuff[j]);
+			for(int i=0; i<_tmp.length; i++)
+				_ret.append(i, _tmp[i]);
 			//advance to next row and return buffer
-			getNextRow();
+			_rpos++;
 			return _ret;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/systemml/blob/0a984a43/src/test/java/org/apache/sysml/test/integration/functions/codegen/CompressedRowAggregateLargeTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/codegen/CompressedRowAggregateLargeTest.java b/src/test/java/org/apache/sysml/test/integration/functions/codegen/CompressedRowAggregateLargeTest.java
new file mode 100644
index 0000000..13f99cc
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/codegen/CompressedRowAggregateLargeTest.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.codegen;
+
+import java.io.File;
+import java.util.HashMap;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.sysml.api.DMLScript;
+import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
+import org.apache.sysml.hops.OptimizerUtils;
+import org.apache.sysml.lops.LopProperties.ExecType;
+import org.apache.sysml.runtime.compress.BitmapEncoder;
+import org.apache.sysml.runtime.compress.CompressedMatrixBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixValue.CellIndex;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.integration.TestConfiguration;
+import org.apache.sysml.test.utils.TestUtils;
+
+public class CompressedRowAggregateLargeTest extends AutomatedTestBase 
+{	
+	private static final String TEST_NAME1 = "CompressedRowAggregateMain";
+	private static final String TEST_DIR = "functions/codegen/";
+	private static final String TEST_CLASS_DIR = TEST_DIR + CompressedRowAggregateLargeTest.class.getSimpleName() + "/";
+	private final static String TEST_CONF = "SystemML-config-codegen-compress.xml";
+	private final static File   TEST_CONF_FILE = new File(SCRIPT_DIR + TEST_DIR, TEST_CONF);
+	
+	private static final int rows = 7*BitmapEncoder.BITMAP_BLOCK_SZ;
+	private static final int cols = 7;
+	private static final double sparsity1 = 0.9;
+	private static final double sparsity2 = 0.1;
+	private static final double sparsity3 = 0.0;
+	private static final double eps = Math.pow(10, -4); //large values
+	
+	public enum SparsityType {
+		DENSE,
+		SPARSE,
+		EMPTY,
+	}
+	
+	public enum ValueType {
+		RAND, //UC
+		CONST, //RLE
+		RAND_ROUND_OLE, //OLE
+		RAND_ROUND_DDC, //DDC
+	}
+	
+	@Override
+	public void setUp() {
+		TestUtils.clearAssertionInformation();
+		addTestConfiguration( TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] { "R" }) );
+	}
+		
+	@Test
+	public void testCompressedRowAggregateMainDenseConstCP() {
+		testCompressedRowAggregate( TEST_NAME1, SparsityType.DENSE, ValueType.CONST, ExecType.CP );
+	}
+	
+	@Test
+	public void testCompressedRowAggregateMainDenseRandCP() {
+		testCompressedRowAggregate( TEST_NAME1, SparsityType.DENSE, ValueType.RAND, ExecType.CP );
+	}
+	
+	@Test
+	public void testCompressedRowAggregateMainDenseRand2CP() {
+		testCompressedRowAggregate( TEST_NAME1, SparsityType.DENSE, ValueType.RAND_ROUND_DDC, ExecType.CP );
+	}
+	
+	@Test
+	public void testCompressedRowAggregateMainDenseRand3CP() {
+		testCompressedRowAggregate( TEST_NAME1, SparsityType.DENSE, ValueType.RAND_ROUND_OLE, ExecType.CP );
+	}
+	
+	@Test
+	public void testCompressedRowAggregateMainSparseConstCP() {
+		testCompressedRowAggregate( TEST_NAME1, SparsityType.SPARSE, ValueType.CONST, ExecType.CP );
+	}
+	
+	@Test
+	public void testCompressedRowAggregateMainSparseRandCP() {
+		testCompressedRowAggregate( TEST_NAME1, SparsityType.SPARSE, ValueType.RAND, ExecType.CP );
+	}
+	
+	@Test
+	public void testCompressedRowAggregateMainSparseRand2CP() {
+		testCompressedRowAggregate( TEST_NAME1, SparsityType.SPARSE, ValueType.RAND_ROUND_DDC, ExecType.CP );
+	}
+	
+	@Test
+	public void testCompressedRowAggregateMainSparseRand3CP() {
+		testCompressedRowAggregate( TEST_NAME1, SparsityType.SPARSE, ValueType.RAND_ROUND_OLE, ExecType.CP );
+	}
+	
+	@Test
+	public void testCompressedRowAggregateMainEmptyConstCP() {
+		testCompressedRowAggregate( TEST_NAME1, SparsityType.EMPTY, ValueType.CONST, ExecType.CP );
+	}
+	
+	@Test
+	public void testCompressedRowAggregateMainEmptyRandCP() {
+		testCompressedRowAggregate( TEST_NAME1, SparsityType.EMPTY, ValueType.RAND, ExecType.CP );
+	}
+	
+	@Test
+	public void testCompressedRowAggregateMainEmptyRand2CP() {
+		testCompressedRowAggregate( TEST_NAME1, SparsityType.EMPTY, ValueType.RAND_ROUND_DDC, ExecType.CP );
+	}
+	
+	@Test
+	public void testCompressedRowAggregateMainEmptyRand3CP() {
+		testCompressedRowAggregate( TEST_NAME1, SparsityType.EMPTY, ValueType.RAND_ROUND_OLE, ExecType.CP );
+	}
+	
+	@Test
+	public void testCompressedRowAggregateMainDenseConstSP() {
+		testCompressedRowAggregate( TEST_NAME1, SparsityType.DENSE, ValueType.CONST, ExecType.SPARK );
+	}
+	
+	@Test
+	public void testCompressedRowAggregateMainDenseRandSP() {
+		testCompressedRowAggregate( TEST_NAME1, SparsityType.DENSE, ValueType.RAND, ExecType.SPARK );
+	}
+	
+	@Test
+	public void testCompressedRowAggregateMainDenseRand2SP() {
+		testCompressedRowAggregate( TEST_NAME1, SparsityType.DENSE, ValueType.RAND_ROUND_DDC, ExecType.SPARK );
+	}
+	
+	@Test
+	public void testCompressedRowAggregateMainDenseRand3SP() {
+		testCompressedRowAggregate( TEST_NAME1, SparsityType.DENSE, ValueType.RAND_ROUND_OLE, ExecType.SPARK );
+	}
+	
+	@Test
+	public void testCompressedRowAggregateMainSparseConstSP() {
+		testCompressedRowAggregate( TEST_NAME1, SparsityType.SPARSE, ValueType.CONST, ExecType.SPARK );
+	}
+	
+	@Test
+	public void testCompressedRowAggregateMainSparseRandSP() {
+		testCompressedRowAggregate( TEST_NAME1, SparsityType.SPARSE, ValueType.RAND, ExecType.SPARK );
+	}
+	
+	@Test
+	public void testCompressedRowAggregateMainSparseRand2SP() {
+		testCompressedRowAggregate( TEST_NAME1, SparsityType.SPARSE, ValueType.RAND_ROUND_DDC, ExecType.SPARK );
+	}
+	
+	@Test
+	public void testCompressedRowAggregateMainSparseRand3SP() {
+		testCompressedRowAggregate( TEST_NAME1, SparsityType.SPARSE, ValueType.RAND_ROUND_OLE, ExecType.SPARK );
+	}
+	
+	@Test
+	public void testCompressedRowAggregateMainEmptyConstSP() {
+		testCompressedRowAggregate( TEST_NAME1, SparsityType.EMPTY, ValueType.CONST, ExecType.SPARK );
+	}
+	
+	@Test
+	public void testCompressedRowAggregateMainEmptyRandSP() {
+		testCompressedRowAggregate( TEST_NAME1, SparsityType.EMPTY, ValueType.RAND, ExecType.SPARK );
+	}
+	
+	@Test
+	public void testCompressedRowAggregateMainEmptyRand2SP() {
+		testCompressedRowAggregate( TEST_NAME1, SparsityType.EMPTY, ValueType.RAND_ROUND_DDC, ExecType.SPARK );
+	}
+	
+	@Test
+	public void testCompressedRowAggregateMainEmptyRand3SP() {
+		testCompressedRowAggregate( TEST_NAME1, SparsityType.EMPTY, ValueType.RAND_ROUND_OLE, ExecType.SPARK );
+	}
+	
+	private void testCompressedRowAggregate(String testname, SparsityType stype, ValueType vtype, ExecType et)
+	{	
+		boolean oldRewrites = OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION;
+		RUNTIME_PLATFORM platformOld = rtplatform;
+		switch( et ){
+			case MR: rtplatform = RUNTIME_PLATFORM.HADOOP; break;
+			case SPARK: rtplatform = RUNTIME_PLATFORM.SPARK; break;
+			default: rtplatform = RUNTIME_PLATFORM.HYBRID_SPARK; break;
+		}
+	
+		boolean sparkConfigOld = DMLScript.USE_LOCAL_SPARK_CONFIG;
+		if( rtplatform == RUNTIME_PLATFORM.SPARK || rtplatform == RUNTIME_PLATFORM.HYBRID_SPARK )
+			DMLScript.USE_LOCAL_SPARK_CONFIG = true;
+		
+		try
+		{
+			OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION = true;
+			TestConfiguration config = getTestConfiguration(testname);
+			loadTestConfiguration(config);
+			
+			String HOME = SCRIPT_DIR + TEST_DIR;
+			fullDMLScriptName = HOME + testname + ".dml";
+			programArgs = new String[]{"-explain", "-stats", 
+					"-args", input("X"), output("R") };
+			
+			fullRScriptName = HOME + testname + ".R";
+			rCmd = getRCmd(inputDir(), expectedDir());			
+
+			//select sparsity according to the sparsity type
+			double sparsity = -1;
+			switch( stype ){
+				case DENSE: sparsity = sparsity1; break;
+				case SPARSE: sparsity = sparsity2; break;
+				case EMPTY: sparsity = sparsity3; break;
+			}
+			
+			//generate input data
+			double min = (vtype==ValueType.CONST)? 10 : -10;
+			double[][] X = TestUtils.generateTestMatrix(rows, cols, min, 10, sparsity, 7);
+			if( vtype==ValueType.RAND_ROUND_OLE || vtype==ValueType.RAND_ROUND_DDC ) {
+				CompressedMatrixBlock.ALLOW_DDC_ENCODING = (vtype==ValueType.RAND_ROUND_DDC);
+				X = TestUtils.round(X);
+			}
+			writeInputMatrixWithMTD("X", X, true);
+			
+			//run tests
+			runTest(true, false, null, -1); 
+			runRScript(true); 
+			
+			//compare matrices 
+			HashMap<CellIndex, Double> dmlfile = readDMLMatrixFromHDFS("R");
+			HashMap<CellIndex, Double> rfile  = readRMatrixFromFS("R");	
+			TestUtils.compareMatrices(dmlfile, rfile, eps, "Stat-DML", "Stat-R");
+			Assert.assertTrue(heavyHittersContainsSubString("spoofRA", 2) 
+				|| heavyHittersContainsSubString("sp_spoofRA", 2));
+			Assert.assertTrue(heavyHittersContainsSubString("compress")
+				|| heavyHittersContainsSubString("sp_compress"));
+		}
+		finally {
+			rtplatform = platformOld;
+			DMLScript.USE_LOCAL_SPARK_CONFIG = sparkConfigOld;
+			OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION = oldRewrites;
+			OptimizerUtils.ALLOW_AUTO_VECTORIZATION = true;
+			OptimizerUtils.ALLOW_OPERATOR_FUSION = true;
+			CompressedMatrixBlock.ALLOW_DDC_ENCODING = true;
+		}
+	}	
+
+	/**
+	 * Override default configuration with custom test configuration to ensure
+	 * scratch space and local temporary directory locations are also updated.
+	 */
+	@Override
+	protected File getConfigTemplateFile() {
+		// Instrumentation in this test's output log to show custom configuration file used for template.
+		System.out.println("This test case overrides default configuration with " + TEST_CONF_FILE.getPath());
+		return TEST_CONF_FILE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/systemml/blob/0a984a43/src/test_suites/java/org/apache/sysml/test/integration/functions/codegen/ZPackageSuite.java
----------------------------------------------------------------------
diff --git a/src/test_suites/java/org/apache/sysml/test/integration/functions/codegen/ZPackageSuite.java b/src/test_suites/java/org/apache/sysml/test/integration/functions/codegen/ZPackageSuite.java
index fda71a5..75b66a1 100644
--- a/src/test_suites/java/org/apache/sysml/test/integration/functions/codegen/ZPackageSuite.java
+++ b/src/test_suites/java/org/apache/sysml/test/integration/functions/codegen/ZPackageSuite.java
@@ -42,6 +42,7 @@ import org.junit.runners.Suite;
 	CompressedMultiAggregateTest.class,
 	CompressedOuterProductTest.class,
 	CompressedRowAggregateTest.class,
+	CompressedRowAggregateLargeTest.class,
 	CPlanComparisonTest.class,
 	CPlanVectorPrimitivesTest.class,
 	DAGCellwiseTmplTest.class,