You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/05/31 01:06:12 UTC

incubator-systemml git commit: [SYSTEMML-1289] Performance codegen cellwise over compressed matrices

Repository: incubator-systemml
Updated Branches:
  refs/heads/master 3f0c12ece -> b2feaf759


[SYSTEMML-1289] Performance codegen cellwise over compressed matrices

This patch makes the following performance improvements to codegen
cellwise operations (and others) over compressed matrices:

1) Sparse output matrices for sparse-safe operations, where we ensure
correctness by explicitly sorting the output sparse rows, which is done
in a thread-local manner per row partition.

2) Pre-allocation of sparse rows in order to avoid repeated
re-allocations, by determining the number of non-zeros per row of each
row partition up front. Note that this requires segment-aligned row
partitions, which we only enforce if doing so does not limit the
effective degree of parallelism.

3) Skip-scan exploitation to find rl boundaries for OLE value iterators
in order to avoid repeated scans of the entire iterator just to find the
row partition starts. This is also implicitly used by all other codegen
operations over compressed matrices.

For example, in a scenario with a pre-processed ImageNet dataset of 1.2M
images, 30x30 pixels per image, and a simple sparse-safe cellwise
generated operator for X*(Y+7), where X is the compressed input, this
patch reduced the single-node runtime from 36s to 11.5s. This is
competitive, given that the uncompressed codegen operation takes 16.3s.

Finally, this patch also includes a robustness fix of the generic
ColGroupOffset iterator for cases where an iterator over a row
partition does not have any row indices.


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/b2feaf75
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/b2feaf75
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/b2feaf75

Branch: refs/heads/master
Commit: b2feaf759b701889a0a2649608df8833eaf4737a
Parents: 3f0c12e
Author: Matthias Boehm <mb...@gmail.com>
Authored: Tue May 30 17:21:14 2017 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Tue May 30 18:06:06 2017 -0700

----------------------------------------------------------------------
 .../sysml/runtime/codegen/SpoofCellwise.java    | 124 +++++++++++++------
 .../sysml/runtime/compress/BitmapEncoder.java   |   5 +
 .../sysml/runtime/compress/ColGroupOLE.java     |  24 ++--
 .../sysml/runtime/compress/ColGroupOffset.java  |   3 +-
 .../runtime/compress/CompressedMatrixBlock.java |  22 ++--
 .../sysml/runtime/matrix/data/MatrixBlock.java  |   3 +-
 6 files changed, 125 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b2feaf75/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java b/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java
index 3e1dcad..cc8ef69 100644
--- a/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java
+++ b/src/main/java/org/apache/sysml/runtime/codegen/SpoofCellwise.java
@@ -31,6 +31,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.compress.BitmapEncoder;
 import org.apache.sysml.runtime.compress.CompressedMatrixBlock;
 import org.apache.sysml.runtime.functionobjects.Builtin;
 import org.apache.sysml.runtime.functionobjects.Builtin.BuiltinCode;
@@ -196,31 +197,32 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl
 		}
 		
 		//input preparation
+		MatrixBlock a = inputs.get(0);
 		SideInput[] b = prepInputMatricesAbstract(inputs);
 		double[] scalars = prepInputScalars(scalarObjects);
-		final int m = inputs.get(0).getNumRows();
-		final int n = inputs.get(0).getNumColumns();
+		final int m = a.getNumRows();
+		final int n = a.getNumColumns();
 		
 		//sparse safe check 
 		boolean sparseSafe = isSparseSafe() || (b.length == 0
 				&& genexec( 0, b, scalars, m, n, 0, 0 ) == 0);
 		
 		//result allocation and preparations
-		boolean sparseOut = sparseSafe && inputs.get(0).isInSparseFormat()
-				&& _type == CellType.NO_AGG && !(inputs.get(0) instanceof CompressedMatrixBlock);
-		out.reset(inputs.get(0).getNumRows(), _type == CellType.NO_AGG ?
-				inputs.get(0).getNumColumns() : 1, sparseOut);
+		boolean sparseOut = sparseSafe && a.isInSparseFormat()
+				&& _type == CellType.NO_AGG;
+		out.reset(a.getNumRows(), _type == CellType.NO_AGG ?
+				a.getNumColumns() : 1, sparseOut);
 		out.allocateDenseOrSparseBlock();
 		
 		long lnnz = 0;
 		if( k <= 1 ) //SINGLE-THREADED
 		{
 			if( inputs.get(0) instanceof CompressedMatrixBlock )
-				lnnz = executeCompressed((CompressedMatrixBlock)inputs.get(0), b, scalars, out, m, n, sparseSafe, 0, m);
+				lnnz = executeCompressed((CompressedMatrixBlock)a, b, scalars, out, m, n, sparseSafe, 0, m);
 			else if( !inputs.get(0).isInSparseFormat() )
-				lnnz = executeDense(inputs.get(0).getDenseBlock(), b, scalars, out, m, n, sparseSafe, 0, m);
+				lnnz = executeDense(a.getDenseBlock(), b, scalars, out, m, n, sparseSafe, 0, m);
 			else
-				lnnz = executeSparse(inputs.get(0).getSparseBlock(), b, scalars, out, m, n, sparseSafe, 0, m);
+				lnnz = executeSparse(a.getSparseBlock(), b, scalars, out, m, n, sparseSafe, 0, m);
 		}
 		else  //MULTI-THREADED
 		{
@@ -229,9 +231,12 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl
 				ArrayList<ParExecTask> tasks = new ArrayList<ParExecTask>();
 				int nk = UtilFunctions.roundToNext(Math.min(8*k,m/32), k);
 				int blklen = (int)(Math.ceil((double)m/nk));
+				if( a instanceof CompressedMatrixBlock && sparseOut
+					&& k/2*BitmapEncoder.BITMAP_BLOCK_SZ < m)
+					blklen = BitmapEncoder.getAlignedBlocksize(blklen);
 				for( int i=0; i<nk & i*blklen<m; i++ )
-					tasks.add(new ParExecTask(inputs.get(0), b, scalars, out, 
-						m, n, sparseSafe, i*blklen, Math.min((i+1)*blklen, m))); 
+					tasks.add(new ParExecTask(a, b, scalars, out, m, n,
+						sparseSafe, i*blklen, Math.min((i+1)*blklen, m))); 
 				//execute tasks
 				List<Future<Long>> taskret = pool.invokeAll(tasks);	
 				pool.shutdown();
@@ -253,7 +258,8 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl
 	/////////
 	//function dispatch
 	
-	private long executeDense(double[] a, SideInput[] b, double[] scalars, MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru) 
+	private long executeDense(double[] a, SideInput[] b, double[] scalars, 
+			MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru) 
 		throws DMLRuntimeException 
 	{
 		double[] c = out.getDenseBlock();
@@ -270,7 +276,8 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl
 		return -1;
 	}
 	
-	private double executeDenseAndAgg(double[] a, SideInput[] b, double[] scalars, int m, int n, boolean sparseSafe, int rl, int ru) throws DMLRuntimeException 
+	private double executeDenseAndAgg(double[] a, SideInput[] b, double[] scalars, 
+			int m, int n, boolean sparseSafe, int rl, int ru) throws DMLRuntimeException 
 	{
 		//numerically stable aggregation for sum/sum_sq
 		if( _aggOp == AggOp.SUM || _aggOp == AggOp.SUM_SQ )
@@ -279,7 +286,8 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl
 			return executeDenseAggMxx(a, b, scalars, m, n, sparseSafe, rl, ru);
 	}
 	
-	private long executeSparse(SparseBlock sblock, SideInput[] b, double[] scalars, MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru) 
+	private long executeSparse(SparseBlock sblock, SideInput[] b, double[] scalars, 
+			MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru) 
 		throws DMLRuntimeException 
 	{
 		if( sparseSafe && sblock == null )
@@ -301,7 +309,8 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl
 		return -1;
 	}
 	
-	private double executeSparseAndAgg(SparseBlock sblock, SideInput[] b, double[] scalars, int m, int n, boolean sparseSafe, int rl, int ru) 
+	private double executeSparseAndAgg(SparseBlock sblock, SideInput[] b, double[] scalars, 
+			int m, int n, boolean sparseSafe, int rl, int ru) 
 		throws DMLRuntimeException 
 	{
 		if( sparseSafe && sblock == null )
@@ -313,15 +322,18 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl
 			return executeSparseAggMxx(sblock, b, scalars, m, n, sparseSafe, rl, ru);
 	}
 	
-	private long executeCompressed(CompressedMatrixBlock a, SideInput[] b, double[] scalars, MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru) 
+	private long executeCompressed(CompressedMatrixBlock a, SideInput[] b, double[] scalars, 
+			MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru) 
 		throws DMLRuntimeException 
 	{
-		double[] c = out.getDenseBlock();
-		
 		if( _type == CellType.NO_AGG ) {
-			return executeCompressedNoAgg(a, b, scalars, c, m, n, sparseSafe, rl, ru);
+			long lnnz = executeCompressedNoAgg(a, b, scalars, out, m, n, sparseSafe, rl, ru);
+			if( out.isInSparseFormat() )
+				out.sortSparseRows(rl, ru);
+			return lnnz;
 		}
 		else if( _type == CellType.ROW_AGG ) {
+			double[] c = out.getDenseBlock();
 			if( _aggOp == AggOp.SUM || _aggOp == AggOp.SUM_SQ )
 				return executeCompressedRowAggSum(a, b, scalars, c, m, n, sparseSafe, rl, ru);
 			else
@@ -330,7 +342,8 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl
 		return -1;
 	}
 	
-	private double executeCompressedAndAgg(CompressedMatrixBlock a, SideInput[] b, double[] scalars, int m, int n, boolean sparseSafe, int rl, int ru) throws DMLRuntimeException 
+	private double executeCompressedAndAgg(CompressedMatrixBlock a, SideInput[] b, double[] scalars, 
+			int m, int n, boolean sparseSafe, int rl, int ru) throws DMLRuntimeException 
 	{
 		//numerically stable aggregation for sum/sum_sq
 		if( _aggOp == AggOp.SUM || _aggOp == AggOp.SUM_SQ )
@@ -342,7 +355,8 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl
 	/////////
 	//core operator skeletons for dense, sparse, and compressed
 
-	private long executeDenseNoAgg(double[] a, SideInput[] b, double[] scalars, double[] c, int m, int n, boolean sparseSafe, int rl, int ru) 
+	private long executeDenseNoAgg(double[] a, SideInput[] b, double[] scalars, 
+			double[] c, int m, int n, boolean sparseSafe, int rl, int ru) 
 		throws DMLRuntimeException 
 	{
 		long lnnz = 0;
@@ -357,7 +371,8 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl
 		return lnnz;
 	}
 	
-	private long executeDenseRowAggSum(double[] a, SideInput[] b, double[] scalars, double[] c, int m, int n, boolean sparseSafe, int rl, int ru) 
+	private long executeDenseRowAggSum(double[] a, SideInput[] b, double[] scalars, 
+			double[] c, int m, int n, boolean sparseSafe, int rl, int ru) 
 		throws DMLRuntimeException 
 	{
 		KahanFunction kplus = (KahanFunction) getAggFunction();
@@ -375,7 +390,8 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl
 		return lnnz;
 	}
 	
-	private long executeDenseRowAggMxx(double[] a, SideInput[] b, double[] scalars, double[] c, int m, int n, boolean sparseSafe, int rl, int ru) 
+	private long executeDenseRowAggMxx(double[] a, SideInput[] b, double[] scalars, 
+			double[] c, int m, int n, boolean sparseSafe, int rl, int ru) 
 		throws DMLRuntimeException 
 	{
 		double initialVal = (_aggOp==AggOp.MIN) ? Double.MAX_VALUE : -Double.MAX_VALUE;
@@ -403,7 +419,8 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl
 		return lnnz;
 	}
 	
-	private double executeDenseAggSum(double[] a, SideInput[] b, double[] scalars, int m, int n, boolean sparseSafe, int rl, int ru) 
+	private double executeDenseAggSum(double[] a, SideInput[] b, double[] scalars, 
+			int m, int n, boolean sparseSafe, int rl, int ru) 
 		throws DMLRuntimeException 
 	{
 		KahanFunction kplus = (KahanFunction) getAggFunction();
@@ -418,7 +435,8 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl
 		return kbuff._sum;
 	}
 	
-	private double executeDenseAggMxx(double[] a, SideInput[] b, double[] scalars, int m, int n, boolean sparseSafe, int rl, int ru) 
+	private double executeDenseAggMxx(double[] a, SideInput[] b, double[] scalars, 
+			int m, int n, boolean sparseSafe, int rl, int ru) 
 		throws DMLRuntimeException 
 	{
 		//safe aggregation for min/max w/ handling of zero entries
@@ -435,7 +453,8 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl
 		return ret;
 	}
 	
-	private long executeSparseNoAggSparse(SparseBlock sblock, SideInput[] b, double[] scalars, MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru) 
+	private long executeSparseNoAggSparse(SparseBlock sblock, SideInput[] b, double[] scalars, 
+			MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru) 
 		throws DMLRuntimeException 
 	{
 		//note: sequential scan algorithm for both sparse-safe and -unsafe 
@@ -471,7 +490,8 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl
 		return lnnz;
 	}
 	
-	private long executeSparseNoAggDense(SparseBlock sblock, SideInput[] b, double[] scalars, MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru) 
+	private long executeSparseNoAggDense(SparseBlock sblock, SideInput[] b, double[] scalars, 
+			MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru) 
 		throws DMLRuntimeException 
 	{
 		//note: sequential scan algorithm for both sparse-safe and -unsafe 
@@ -504,7 +524,8 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl
 		return lnnz;
 	}
 	
-	private long executeSparseRowAggSum(SparseBlock sblock, SideInput[] b, double[] scalars, MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru) 
+	private long executeSparseRowAggSum(SparseBlock sblock, SideInput[] b, double[] scalars, 
+			MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru) 
 		throws DMLRuntimeException 
 	{
 		KahanFunction kplus = (KahanFunction) getAggFunction();
@@ -542,7 +563,8 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl
 		return lnnz;
 	}
 	
-	private long executeSparseRowAggMxx(SparseBlock sblock, SideInput[] b, double[] scalars, MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru) 
+	private long executeSparseRowAggMxx(SparseBlock sblock, SideInput[] b, double[] scalars, 
+			MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru) 
 		throws DMLRuntimeException 
 	{
 		double initialVal = (_aggOp==AggOp.MIN) ? Double.MAX_VALUE : -Double.MAX_VALUE;
@@ -580,7 +602,8 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl
 		return lnnz;
 	}
 	
-	private double executeSparseAggSum(SparseBlock sblock, SideInput[] b, double[] scalars, int m, int n, boolean sparseSafe, int rl, int ru) 
+	private double executeSparseAggSum(SparseBlock sblock, SideInput[] b, double[] scalars, 
+			int m, int n, boolean sparseSafe, int rl, int ru) 
 		throws DMLRuntimeException 
 	{
 		KahanFunction kplus = (KahanFunction) getAggFunction();
@@ -614,7 +637,8 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl
 		return kbuff._sum;
 	}
 	
-	private double executeSparseAggMxx(SparseBlock sblock, SideInput[] b, double[] scalars, int m, int n, boolean sparseSafe, int rl, int ru) 
+	private double executeSparseAggMxx(SparseBlock sblock, SideInput[] b, double[] scalars, 
+			int m, int n, boolean sparseSafe, int rl, int ru) 
 		throws DMLRuntimeException 
 	{
 		double ret = (_aggOp==AggOp.MIN) ? Double.MAX_VALUE : -Double.MAX_VALUE;
@@ -649,21 +673,41 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl
 		return ret;
 	}
 	
-	private long executeCompressedNoAgg(CompressedMatrixBlock a, SideInput[] b, double[] scalars, double[] c, int m, int n, boolean sparseSafe, int rl, int ru) 
+	private long executeCompressedNoAgg(CompressedMatrixBlock a, SideInput[] b, double[] scalars, 
+			MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru) 
 		throws DMLRuntimeException 
 	{
+		double[] c = out.getDenseBlock();
+		SparseBlock csblock = out.getSparseBlock();
+		
+		//preallocate sparse rows to avoid reallocations
+		//note: counting nnz requires segment-aligned boundaries, which is enforced 
+		//whenever k/2 * BITMAP_BLOCK_SZ < m (i.e., it does not limit parallelism)
+		if( out.isInSparseFormat() && rl%BitmapEncoder.BITMAP_BLOCK_SZ==0
+			&& ru%BitmapEncoder.BITMAP_BLOCK_SZ==0) {
+			int[] rnnz = a.countNonZerosPerRow(rl, ru);
+			for( int i=rl; i<ru; i++ )
+				csblock.allocate(i, rnnz[i-rl]);
+		}
+		
 		long lnnz = 0;
 		Iterator<IJV> iter = a.getIterator(rl, ru, !sparseSafe);
 		while( iter.hasNext() ) {
 			IJV cell = iter.next();
 			double val = genexec(cell.getV(), b, scalars, m, n, cell.getI(), cell.getJ());
-			c[cell.getI()*n+cell.getJ()] = val; 
+			if( out.isInSparseFormat() ) {
+				csblock.allocate(cell.getI());
+				csblock.append(cell.getI(), cell.getJ(), val);
+			}
+			else
+				c[cell.getI()*n+cell.getJ()] = val; 
 			lnnz += (val!=0) ? 1 : 0;
 		}
 		return lnnz;
 	}
 	
-	private long executeCompressedRowAggSum(CompressedMatrixBlock a, SideInput[] b, double[] scalars, double[] c, int m, int n, boolean sparseSafe, int rl, int ru) 
+	private long executeCompressedRowAggSum(CompressedMatrixBlock a, SideInput[] b, double[] scalars, 
+			double[] c, int m, int n, boolean sparseSafe, int rl, int ru) 
 		throws DMLRuntimeException 
 	{
 		KahanFunction kplus = (KahanFunction) getAggFunction();
@@ -682,7 +726,8 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl
 		return lnnz;
 	}
 	
-	private long executeCompressedRowAggMxx(CompressedMatrixBlock a, SideInput[] b, double[] scalars, double[] c, int m, int n, boolean sparseSafe, int rl, int ru) 
+	private long executeCompressedRowAggMxx(CompressedMatrixBlock a, SideInput[] b, double[] scalars, 
+			double[] c, int m, int n, boolean sparseSafe, int rl, int ru) 
 		throws DMLRuntimeException 
 	{
 		Arrays.fill(c, rl, ru, (_aggOp==AggOp.MIN) ? Double.MAX_VALUE : -Double.MAX_VALUE);
@@ -699,7 +744,8 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl
 		return lnnz;
 	}
 	
-	private double executeCompressedAggSum(CompressedMatrixBlock a, SideInput[] b, double[] scalars, int m, int n, boolean sparseSafe, int rl, int ru) 
+	private double executeCompressedAggSum(CompressedMatrixBlock a, SideInput[] b, double[] scalars, 
+			int m, int n, boolean sparseSafe, int rl, int ru) 
 		throws DMLRuntimeException 
 	{
 		KahanFunction kplus = (KahanFunction) getAggFunction();
@@ -714,7 +760,8 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl
 		return kbuff._sum;
 	}
 	
-	private double executeCompressedAggMxx(CompressedMatrixBlock a, SideInput[] b, double[] scalars, int m, int n, boolean sparseSafe, int rl, int ru) 
+	private double executeCompressedAggMxx(CompressedMatrixBlock a, SideInput[] b, double[] scalars, 
+			int m, int n, boolean sparseSafe, int rl, int ru) 
 		throws DMLRuntimeException 
 	{
 		//safe aggregation for min/max w/ handling of zero entries
@@ -731,7 +778,8 @@ public abstract class SpoofCellwise extends SpoofOperator implements Serializabl
 		return ret;
 	}
 	
-	protected abstract double genexec( double a, SideInput[] b, double[] scalars, int m, int n, int rowIndex, int colIndex);
+	protected abstract double genexec( double a, SideInput[] b, 
+			double[] scalars, int m, int n, int rowIndex, int colIndex);
 	
 	private class ParAggTask implements Callable<Double> 
 	{

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b2feaf75/src/main/java/org/apache/sysml/runtime/compress/BitmapEncoder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/BitmapEncoder.java b/src/main/java/org/apache/sysml/runtime/compress/BitmapEncoder.java
index a1f7454..f93d8b3 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/BitmapEncoder.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/BitmapEncoder.java
@@ -38,6 +38,11 @@ public class BitmapEncoder
 	/** Size of the blocks used in a blocked bitmap representation. */
 	public static final int BITMAP_BLOCK_SZ = 65536;
 	
+	public static int getAlignedBlocksize(int blklen) {
+		return blklen + ((blklen%BITMAP_BLOCK_SZ != 0) ? 
+			BITMAP_BLOCK_SZ-blklen%BITMAP_BLOCK_SZ : 0);
+	}
+	
 	/**
 	 * Generate uncompressed bitmaps for a set of columns in an uncompressed
 	 * matrix block.

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b2feaf75/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java
index ac0b803..71be538 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java
@@ -782,13 +782,23 @@ public class ColGroupOLE extends ColGroupOffset
 			_ru = ru;
 			_boff = _ptr[k];
 			_blen = len(k);
-			_bix = 0;
-			_start = 0; //init first segment
-			_slen = _data[_boff + _bix];
-			_spos = 0;
-			_rpos = _data[_boff + _bix + 1];
-			while( _rpos < rl )
-				nextRowOffset();
+			
+			//initialize position via segment-aligned skip-scan
+			int lrl = rl - rl%BitmapEncoder.BITMAP_BLOCK_SZ;
+			_bix = skipScanVal(k, lrl);
+			_start = lrl; 
+			
+			//move position to actual rl boundary
+			if( _bix < _blen ) {
+				_slen = _data[_boff + _bix];
+				_spos = 0;
+				_rpos = _data[_boff + _bix + 1];
+				while( _rpos < rl )
+					nextRowOffset();
+			}
+			else {
+				_rpos = _ru;
+			}
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b2feaf75/src/main/java/org/apache/sysml/runtime/compress/ColGroupOffset.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupOffset.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupOffset.java
index 5fc4a5a..bc0b7f1 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupOffset.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupOffset.java
@@ -476,7 +476,8 @@ public abstract class ColGroupOffset extends ColGroupValue
 				_rpos = _ru; //end after zero iterator
 				return;
 			}
-			else if( _cpos+1 >= getNumCols() && !(_viter!=null && _viter.hasNext()) ) {
+			else if( (_rpos< 0 || _cpos+1 >= getNumCols()) 
+					&& !(_viter!=null && _viter.hasNext()) ) {
 				do {
 					_vpos++;
 					if( _vpos < getNumValues() )

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b2feaf75/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
index 0fbe608..ca22b63 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
@@ -594,9 +594,8 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		try {
 			ExecutorService pool = Executors.newFixedThreadPool( k );
 			int rlen = getNumRows();
-			int seqsz = BitmapEncoder.BITMAP_BLOCK_SZ;
-			int blklen = (int)(Math.ceil((double)rlen/k));
-			blklen += (blklen%seqsz != 0)?seqsz-blklen%seqsz:0;
+			int blklen = BitmapEncoder.getAlignedBlocksize(
+				(int)(Math.ceil((double)rlen/k)));
 			ArrayList<DecompressTask> tasks = new ArrayList<DecompressTask>();
 			for( int i=0; i<k & i*blklen<getNumRows(); i++ )
 				tasks.add(new DecompressTask(_colGroups, ret, i*blklen, Math.min((i+1)*blklen,rlen)));
@@ -813,6 +812,13 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		return new ColumnGroupIterator(rl, ru, cgl, cgu, inclZeros);
 	}
 	
+	public int[] countNonZerosPerRow(int rl, int ru) {
+		int[] rnnz = new int[ru-rl];
+		for (ColGroup grp : _colGroups)
+			grp.countNonZerosPerRow(rnnz, rl, ru);
+		return rnnz;
+	}
+	
 	//////////////////////////////////////////
 	// Operations (overwrite existing ops for seamless integration)
 
@@ -1096,9 +1102,8 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 				ExecutorService pool = Executors.newFixedThreadPool( op.getNumThreads() );
 				ArrayList<UnaryAggregateTask> tasks = new ArrayList<UnaryAggregateTask>();
 				if( op.indexFn instanceof ReduceCol && grpParts.length > 0 ) {
-					int seqsz = BitmapEncoder.BITMAP_BLOCK_SZ;
-					int blklen = (int)(Math.ceil((double)rlen/op.getNumThreads()));
-					blklen += (blklen%seqsz != 0)?seqsz-blklen%seqsz:0;
+					int blklen = BitmapEncoder.getAlignedBlocksize(
+						(int)(Math.ceil((double)rlen/op.getNumThreads())));
 					for( int i=0; i<op.getNumThreads() & i*blklen<rlen; i++ )
 						tasks.add(new UnaryAggregateTask(grpParts[0], ret, i*blklen, Math.min((i+1)*blklen,rlen), op));
 				}
@@ -1351,9 +1356,8 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 			//compute remaining compressed column groups in parallel
 			ExecutorService pool = Executors.newFixedThreadPool( k );
 			int rlen = getNumRows();
-			int seqsz = BitmapEncoder.BITMAP_BLOCK_SZ;
-			int blklen = (int)(Math.ceil((double)rlen/k));
-			blklen += (blklen%seqsz != 0)?seqsz-blklen%seqsz:0;
+			int blklen = BitmapEncoder.getAlignedBlocksize(
+				(int)(Math.ceil((double)rlen/k)));
 			ArrayList<RightMatrixMultTask> tasks = new ArrayList<RightMatrixMultTask>();
 			for( int i=0; i<k & i*blklen<getNumRows(); i++ )
 				tasks.add(new RightMatrixMultTask(_colGroups, vector, result, i*blklen, Math.min((i+1)*blklen,rlen)));

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/b2feaf75/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index 233350a..ac66241 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -783,7 +783,8 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 		if( !sparse || sparseBlock==null )
 			return;		
 		for( int i=rl; i<ru; i++ )
-			sparseBlock.sort(i);
+			if( !sparseBlock.isEmpty(i) )
+				sparseBlock.sort(i);
 	}
 	
 	/**