You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/02/08 02:22:30 UTC

[5/5] incubator-systemml git commit: [SYSTEMML-449] Compressed linear algebra v2

[SYSTEMML-449] Compressed linear algebra v2

This patch bundles various improvements for the experimental feature
'compressed linear algebra'. In detail, this includes the following
extensions:

* [SYSTEMML-820] New column encoding format DDC (dense dictionary
coding) with DDC1 and DDC2 for 1 and 2 byte codes as well as efficient
operations.

* [SYSTEMML-815] Hardened sample-based estimators (e.g.,
uncompressed size, empty segments, reduced population size, and
stabilization parameter as well as numerically stable implementations),
including an increased sample fraction and removal of unnecessary parameters.

* [SYSTEMML-814] Debugging tools for compression plans, compression
tracing, and compression statistics.

* New greedy column grouping algorithm with pruning and memoization.

* New static column partitioning and changed bin packing heuristics.

* Additional operations (e.g., cache-conscious rowSums).

* Various fixes and performance improvements throughout all CLA
components.

* Extended test cases to cover OLE, RLE, DDC, and UC groups as well as
combinations thereof.

* Various internal refactorings to simplify the extension and
maintenance of CLA. 

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/37a215bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/37a215bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/37a215bc

Branch: refs/heads/master
Commit: 37a215bc3be26495c351eae6be4b85eaf22daedc
Parents: 390b81c
Author: Matthias Boehm <mb...@gmail.com>
Authored: Sun Feb 5 16:22:01 2017 +0100
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Wed Feb 8 03:12:18 2017 +0100

----------------------------------------------------------------------
 .../sysml/runtime/compress/BitmapEncoder.java   |  22 +-
 .../apache/sysml/runtime/compress/ColGroup.java |  39 +-
 .../sysml/runtime/compress/ColGroupBitmap.java  | 580 ------------------
 .../sysml/runtime/compress/ColGroupDDC.java     | 227 +++++++
 .../sysml/runtime/compress/ColGroupDDC1.java    | 358 +++++++++++
 .../sysml/runtime/compress/ColGroupDDC2.java    | 312 ++++++++++
 .../sysml/runtime/compress/ColGroupOLE.java     | 173 +++---
 .../sysml/runtime/compress/ColGroupOffset.java  | 424 +++++++++++++
 .../sysml/runtime/compress/ColGroupRLE.java     | 178 +++---
 .../runtime/compress/ColGroupUncompressed.java  |  41 +-
 .../sysml/runtime/compress/ColGroupValue.java   | 303 ++++++++++
 .../runtime/compress/CompressedMatrixBlock.java | 419 ++++++++-----
 .../runtime/compress/PlanningBinPacker.java     | 112 ----
 .../sysml/runtime/compress/PlanningCoCoder.java | 257 --------
 .../runtime/compress/PlanningCoCodingGroup.java | 110 ----
 .../compress/PlanningGroupMergeAction.java      |  73 ---
 .../compress/ReaderColumnSelectionSparse.java   |   1 -
 .../runtime/compress/UncompressedBitmap.java    |  73 ++-
 .../compress/cocode/ColumnGroupPartitioner.java |  19 +
 .../ColumnGroupPartitionerBinPacking.java       | 100 +++
 .../cocode/ColumnGroupPartitionerStatic.java    |  52 ++
 .../compress/cocode/PlanningCoCoder.java        | 236 ++++++++
 .../compress/cocode/PlanningCoCodingGroup.java  | 175 ++++++
 .../compress/cocode/PlanningMemoTable.java      |  75 +++
 .../compress/estim/CompressedSizeEstimator.java |  47 +-
 .../estim/CompressedSizeEstimatorExact.java     |   5 +-
 .../estim/CompressedSizeEstimatorSample.java    | 605 ++++++++++---------
 .../compress/estim/CompressedSizeInfo.java      |  46 +-
 .../compress/estim/SizeEstimatorFactory.java    |   6 +-
 .../runtime/compress/utils/ConverterUtils.java  |  16 +
 .../runtime/compress/utils/IntArrayList.java    |  13 +-
 .../compress/utils/LinearAlgebraUtils.java      | 164 +++++
 .../compress/BasicCompressionTest.java          |  40 +-
 .../functions/compress/BasicGetValueTest.java   |  40 +-
 .../compress/BasicMatrixAppendTest.java         |  40 +-
 .../compress/BasicMatrixMultChainTest.java      |  76 ++-
 .../BasicMatrixTransposeSelfMultTest.java       |  40 +-
 .../compress/BasicMatrixVectorMultTest.java     |  40 +-
 .../BasicScalarOperationsSparseUnsafeTest.java  |  40 +-
 .../compress/BasicScalarOperationsTest.java     |  40 +-
 .../BasicTransposeSelfLeftMatrixMultTest.java   |  40 +-
 .../compress/BasicUnaryAggregateTest.java       | 326 +++++++---
 .../compress/BasicVectorMatrixMultTest.java     |  40 +-
 .../functions/compress/CompressedLinregCG.java  |   5 +-
 .../compress/CompressedSerializationTest.java   |  40 +-
 .../compress/LargeCompressionTest.java          |  40 +-
 .../compress/LargeMatrixVectorMultTest.java     |  40 +-
 .../compress/LargeParMatrixVectorMultTest.java  |  40 +-
 .../compress/LargeParUnaryAggregateTest.java    | 337 +++++++----
 .../compress/LargeVectorMatrixMultTest.java     |  40 +-
 .../functions/compress/ParCompressionTest.java  |  40 +-
 .../compress/ParMatrixMultChainTest.java        |  66 +-
 .../compress/ParMatrixVectorMultTest.java       |  40 +-
 .../ParTransposeSelfLeftMatrixMultTest.java     |  40 +-
 .../compress/ParUnaryAggregateTest.java         | 327 ++++++----
 .../compress/ParVectorMatrixMultTest.java       |  40 +-
 56 files changed, 4733 insertions(+), 2385 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/BitmapEncoder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/BitmapEncoder.java b/src/main/java/org/apache/sysml/runtime/compress/BitmapEncoder.java
index 7fd2c69..b27112f 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/BitmapEncoder.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/BitmapEncoder.java
@@ -20,7 +20,6 @@
 package org.apache.sysml.runtime.compress;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 
 import org.apache.sysml.runtime.compress.utils.DblArray;
 import org.apache.sysml.runtime.compress.utils.DblArrayIntListHashMap;
@@ -100,8 +99,8 @@ public class BitmapEncoder
 	 *            the offsets of different bits
 	 * @return compressed version of said bitmap
 	 */
-	public static char[] genRLEBitmap(int[] offsets) {
-		if( offsets.length == 0 )
+	public static char[] genRLEBitmap(int[] offsets, int len) {
+		if( len == 0 )
 			return new char[0]; //empty list
 
 		// Use an ArrayList for correctness at the expense of temp space
@@ -139,7 +138,7 @@ public class BitmapEncoder
 		curRunLen = 1;
 
 		// Process the remaining offsets
-		for (int i = 1; i < offsets.length; i++) {
+		for (int i = 1; i < len; i++) {
 
 			int absOffset = offsets[i];
 
@@ -179,9 +178,8 @@ public class BitmapEncoder
 
 		// Convert wasteful ArrayList to packed array.
 		char[] ret = new char[buf.size()];
-		for (int i = 0; i < buf.size(); i++) {
+		for(int i = 0; i < buf.size(); i++ )
 			ret[i] = buf.get(i);
-		}
 		return ret;
 	}
 
@@ -194,21 +192,19 @@ public class BitmapEncoder
 	 *            the offsets of different bits
 	 * @return compressed version of said bitmap
 	 */
-	public static char[] genOffsetBitmap(int[] offsets) 
-	{
-		int lastOffset = offsets[offsets.length - 1];
+	public static char[] genOffsetBitmap(int[] offsets, int len) 
+	{ 
+		int lastOffset = offsets[len - 1];
 
 		// Build up the blocks
 		int numBlocks = (lastOffset / BITMAP_BLOCK_SZ) + 1;
 		// To simplify the logic, we make two passes.
 		// The first pass divides the offsets by block.
 		int[] blockLengths = new int[numBlocks];
-		Arrays.fill(blockLengths, 0);
 
-		for (int ix = 0; ix < offsets.length; ix++) {
+		for (int ix = 0; ix < len; ix++) {
 			int val = offsets[ix];
 			int blockForVal = val / BITMAP_BLOCK_SZ;
-
 			blockLengths[blockForVal]++;
 		}
 
@@ -238,7 +234,7 @@ public class BitmapEncoder
 
 		return encodedBlocks;
 	}
-
+	
 	private static UncompressedBitmap extractBitmap(int colIndex, MatrixBlock rawblock, boolean skipZeros) 
 	{
 		//probe map for distinct items (for value or value groups)

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/ColGroup.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroup.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroup.java
index 586690c..bf1b822 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroup.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroup.java
@@ -40,9 +40,11 @@ public abstract class ColGroup implements Serializable
 	private static final long serialVersionUID = 2439785418908671481L;
 
 	public enum CompressionType  {
-		UNCOMPRESSED,   //uncompressed sparse/dense 
-		RLE_BITMAP,     //RLE bitmap
-		OLE_BITMAP;  //OLE bitmap
+		UNCOMPRESSED, //uncompressed sparse/dense 
+		RLE_BITMAP,  //RLE bitmap
+		OLE_BITMAP,  //OLE bitmap
+		DDC1, //DDC 1 byte
+		DDC2; //DDC 2 byte
 	}
 	
 	/**
@@ -53,23 +55,17 @@ public abstract class ColGroup implements Serializable
 
 	/** Number of rows in the matrix, for use by child classes. */
 	protected int _numRows;
-
-	/** How the elements of the column group are compressed. */
-	private CompressionType _compType;
-
 	
 	/**
 	 * Main constructor.
 	 * 
-	 * @param type compression type
 	 * @param colIndices
 	 *            offsets of the columns in the matrix block that make up the
 	 *            group
 	 * @param numRows
 	 *            total number of rows in the parent block
 	 */
-	protected ColGroup(CompressionType type, int[] colIndices, int numRows) {
-		_compType = type;
+	protected ColGroup(int[] colIndices, int numRows) {
 		_colIndexes = colIndices;
 		_numRows = numRows;
 	}
@@ -77,16 +73,15 @@ public abstract class ColGroup implements Serializable
 	/**
 	 * Convenience constructor for converting indices to a more compact format.
 	 * 
-	 * @param type compression type
 	 * @param colIndicesList list of column indices
 	 * @param numRows total number of rows in the parent block
 	 */
-	protected ColGroup(CompressionType type, List<Integer> colIndicesList, int numRows) {
-		_compType = type;
+	protected ColGroup(List<Integer> colIndicesList, int numRows) {
 		_colIndexes = new int[colIndicesList.size()];
 		int i = 0;
 		for (Integer index : colIndicesList)
 			_colIndexes[i++] = index;
+		_numRows = numRows;
 	}
 
 	/**
@@ -126,9 +121,7 @@ public abstract class ColGroup implements Serializable
 	 * 
 	 * @return How the elements of the column group are compressed.
 	 */
-	public CompressionType getCompType() {
-		return _compType;
-	}
+	public abstract CompressionType getCompType();
 
 	public void shiftColIndices(int offset)  {
 		for( int i=0; i<_colIndexes.length; i++ )
@@ -143,14 +136,12 @@ public abstract class ColGroup implements Serializable
 	 *         in memory.
 	 */
 	public long estimateInMemorySize() {
-		// int numRows (4B) , array reference colIndices (8B) + array object
-		// overhead if exists (32B) + 4B per element, CompressionType compType
-		// (2 booleans 2B + enum overhead 32B + reference to enum 8B)
-		long size = 54;
-		if (_colIndexes == null)
-			return size;
-		else
-			return size + 32 + 4 * _colIndexes.length;
+		// object (12B padded to factors of 8), int numRows (4B), 
+		// array reference colIndices (8B) 
+		//+ array object overhead if exists (32B) + 4B per element
+		long size = 24;
+		return (_colIndexes == null) ? size : 
+			size + 32 + 4 * _colIndexes.length;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/ColGroupBitmap.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupBitmap.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupBitmap.java
deleted file mode 100644
index dac18ef..0000000
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupBitmap.java
+++ /dev/null
@@ -1,580 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.compress;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.functionobjects.Builtin;
-import org.apache.sysml.runtime.functionobjects.Builtin.BuiltinCode;
-import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
-import org.apache.sysml.runtime.matrix.operators.ScalarOperator;
-
-
-/**
- * Base class for column groups encoded with various types of bitmap encoding.
- * 
- * 
- * NOTES:
- *  * OLE: separate storage segment length and bitmaps led to a 30% improvement
- *    but not applied because more difficult to support both data layouts at the
- *    same time (distributed/local as well as w/ and w/o low-level opt)
- */
-public abstract class ColGroupBitmap extends ColGroup 
-{
-	private static final long serialVersionUID = -1635828933479403125L;
-	
-	public static final boolean LOW_LEVEL_OPT = true;	
-	//sorting of values by physical length helps by 10-20%, especially for serial, while
-	//slight performance decrease for parallel incl multi-threaded, hence not applied for
-	//distributed operations (also because compression time + garbage collection increases)
-	private static final boolean SORT_VALUES_BY_LENGTH = true; 
-	protected static final boolean CREATE_SKIPLIST = true;
-	
-	protected static final int READ_CACHE_BLKSZ = 2 * BitmapEncoder.BITMAP_BLOCK_SZ;
-	protected static final int WRITE_CACHE_BLKSZ = 2 * BitmapEncoder.BITMAP_BLOCK_SZ;
-	
-	/** Distinct values associated with individual bitmaps. */
-	protected double[] _values; //linearized <numcol vals> <numcol vals>
-
-	/** Bitmaps, one per uncompressed value in {@link #_values}. */
-	protected int[] _ptr; //bitmap offsets per value
-	protected char[] _data; //linearized bitmaps (variable length)
-	protected boolean _zeros; //contains zero values
-	
-	protected int[] _skiplist;
-	
-	public ColGroupBitmap(CompressionType type) {
-		super(type, (int[]) null, -1);
-	}
-	
-	/**
-	 * Main constructor. Stores the headers for the individual bitmaps.
-	 * 
-	 * @param type column type
-	 * @param colIndices
-	 *            indices (within the block) of the columns included in this
-	 *            column
-	 * @param numRows
-	 *            total number of rows in the parent block
-	 * @param ubm
-	 *            Uncompressed bitmap representation of the block
-	 */
-	public ColGroupBitmap(CompressionType type, int[] colIndices, int numRows, UncompressedBitmap ubm) 
-	{
-		super(type, colIndices, numRows);
-
-		// Extract and store just the distinct values. The bitmaps themselves go
-		// into the subclasses.
-		final int numCols = ubm.getNumColumns();
-		final int numVals = ubm.getNumValues();
-		
-		_values = new double[numVals*numCols];
-		_zeros = (ubm.getNumOffsets() < numRows);
-		
-		for (int i=0; i<numVals; i++) {
-			//note: deep copied internally on getValues
-			double[] tmp = ubm.getValues(i);
-			System.arraycopy(tmp, 0, _values, i*numCols, numCols);
-		}
-	}
-
-	/**
-	 * Constructor for subclass methods that need to create shallow copies
-	 * 
-	 * @param type compression type
-	 * @param colIndices
-	 *            raw column index information
-	 * @param numRows
-	 *            number of rows in the block
-	 * @param zeros ?
-	 * @param values
-	 *            set of distinct values for the block (associated bitmaps are
-	 *            kept in the subclass)
-	 */
-	protected ColGroupBitmap(CompressionType type, int[] colIndices, int numRows, boolean zeros, double[] values) {
-		super(type, colIndices, numRows);
-		_zeros = zeros;
-		_values = values;
-	}
-	
-	protected final int len(int k) {
-		return _ptr[k+1] - _ptr[k];
-	}
-
-	protected void createCompressedBitmaps(int numVals, int totalLen, char[][] lbitmaps)
-	{
-		// compact bitmaps to linearized representation
-		if( LOW_LEVEL_OPT && SORT_VALUES_BY_LENGTH
-			&& _numRows > BitmapEncoder.BITMAP_BLOCK_SZ ) 
-		{
-			// sort value by num segments in descending order
-			TreeMap<Integer,ArrayList<Integer>> tree = new TreeMap<Integer, ArrayList<Integer>>();
-			for( int i=0; i<numVals; i++ ) {
-				int revlen = totalLen-lbitmaps[i].length;
-				if( !tree.containsKey(revlen) )
-					tree.put(revlen, new ArrayList<Integer>());
-				tree.get(revlen).add(i);
-			}
-			
-			// compact bitmaps to linearized representation
-			_ptr = new int[numVals+1];
-			_data = new char[totalLen];
-			int pos = 0, off = 0;
-			for( Entry<Integer,ArrayList<Integer>> e : tree.entrySet() ) {
-				for( Integer tmpix : e.getValue() ) {
-					int len = lbitmaps[tmpix].length;
-					_ptr[pos] = off;
-					System.arraycopy(lbitmaps[tmpix], 0, _data, off, len);
-					off += len;
-					pos++;
-				}
-			}
-			_ptr[numVals] = totalLen;
-			
-			// reorder values
-			double[] lvalues = new double[_values.length];
-			int off2 = 0; int numCols = _colIndexes.length;
-			for( Entry<Integer,ArrayList<Integer>> e : tree.entrySet() ) {
-				for( Integer tmpix : e.getValue() ) {
-					System.arraycopy(_values, tmpix*numCols, lvalues, off2, numCols);				
-					off2 += numCols;
-				}
-			}			
-			_values = lvalues;
-		}
-		else
-		{
-			// compact bitmaps to linearized representation
-			_ptr = new int[numVals+1];
-			_data = new char[totalLen];
-			for( int i=0, off=0; i<numVals; i++ ) {
-				int len = lbitmaps[i].length;
-				_ptr[i] = off;
-				System.arraycopy(lbitmaps[i], 0, _data, off, len);
-				off += len;
-			}
-			_ptr[numVals] = totalLen;
-		}
-	}
-	
-	@Override
-	public long estimateInMemorySize() {
-		long size = super.estimateInMemorySize();
-		
-		// adding the size of values
-		size += 8; //array reference
-		if (_values != null) {
-			size += 32 + _values.length * 8; //values
-		}
-		
-		// adding bitmaps size
-		size += 16; //array references
-		if (_data != null) {
-			size += 32 + _ptr.length * 4; // offsets
-			size += 32 + _data.length * 2;    // bitmaps
-		}
-	
-		return size;
-	}
-
-	//generic decompression for OLE/RLE, to be overwritten for performance
-	@Override
-	public void decompressToBlock(MatrixBlock target, int rl, int ru) 
-	{
-		final int numCols = getNumCols();
-		final int numVals = getNumValues();
-		int[] colIndices = getColIndices();
-		
-		// Run through the bitmaps for this column group
-		for (int i = 0; i < numVals; i++) {
-			Iterator<Integer> decoder = getDecodeIterator(i);
-			int valOff = i*numCols;
-
-			while (decoder.hasNext()) {
-				int row = decoder.next();
-				if( row<rl ) continue;
-				if( row>ru ) break;
-				
-				for (int colIx = 0; colIx < numCols; colIx++)
-					target.appendValue(row, colIndices[colIx], _values[valOff+colIx]);
-			}
-		}
-	}
-
-	//generic decompression for OLE/RLE, to be overwritten for performance
-	@Override
-	public void decompressToBlock(MatrixBlock target, int[] colIndexTargets) 
-	{
-		final int numCols = getNumCols();
-		final int numVals = getNumValues();
-		
-		// Run through the bitmaps for this column group
-		for (int i = 0; i < numVals; i++) {
-			Iterator<Integer> decoder = getDecodeIterator(i);
-			int valOff = i*numCols;
-
-			while (decoder.hasNext()) {
-				int row = decoder.next();
-				for (int colIx = 0; colIx < numCols; colIx++) {
-					int origMatrixColIx = getColIndex(colIx);
-					int targetColIx = colIndexTargets[origMatrixColIx];
-					target.quickSetValue(row, targetColIx, _values[valOff+colIx]);
-				}
-			}
-		}
-	}
-	
-	//generic decompression for OLE/RLE, to be overwritten for performance
-	@Override
-	public void decompressToBlock(MatrixBlock target, int colpos) 
-	{
-		final int numCols = getNumCols();
-		final int numVals = getNumValues();
-		
-		// Run through the bitmaps for this column group
-		for (int i = 0; i < numVals; i++) {
-			Iterator<Integer> decoder = getDecodeIterator(i);
-			int valOff = i*numCols;
-
-			while (decoder.hasNext()) {
-				int row = decoder.next();
-				target.quickSetValue(row, 0, _values[valOff+colpos]);
-			}
-		}
-	}
-
-	//generic get for OLE/RLE, to be overwritten for performance
-	//potential: skip scan (segment length agg and run length) instead of decode
-	@Override
-	public double get(int r, int c) {
-		//find local column index
-		int ix = Arrays.binarySearch(_colIndexes, c);
-		if( ix < 0 )
-			throw new RuntimeException("Column index "+c+" not in bitmap group.");
-		
-		//find row index in value offset lists via scan
-		final int numCols = getNumCols();
-		final int numVals = getNumValues();
-		for (int i = 0; i < numVals; i++) {
-			Iterator<Integer> decoder = getDecodeIterator(i);
-			int valOff = i*numCols;
-			while (decoder.hasNext()) {
-				int row = decoder.next();
-				if( row == r )
-					return _values[valOff+ix];
-				else if( row > r )
-					break; //current value
-			}
-		}		
-		return 0;
-	}
-
-	public abstract void unaryAggregateOperations(AggregateUnaryOperator op, MatrixBlock result, int rl, int ru)
-		throws DMLRuntimeException;
-
-	protected final double sumValues(int bitmapIx)
-	{
-		final int numCols = getNumCols();
-		final int valOff = bitmapIx * numCols;
-		
-		double val = 0.0;
-		for( int i = 0; i < numCols; i++ ) {
-			val += _values[valOff+i];
-		}
-		
-		return val;
-	}
-	
-	protected final double sumValues(int bitmapIx, double[] b)
-	{
-		final int numCols = getNumCols();
-		final int valOff = bitmapIx * numCols;
-		
-		double val = 0;
-		for( int i = 0; i < numCols; i++ ) {
-			val += _values[valOff+i] * b[i];
-		}
-		
-		return val;
-	}
-
-	protected final double mxxValues(int bitmapIx, Builtin builtin)
-	{
-		final int numCols = getNumCols();
-		final int valOff = bitmapIx * numCols;
-		
-		double val = Double.MAX_VALUE * ((builtin.getBuiltinCode()==BuiltinCode.MAX)?-1:1);
-		for( int i = 0; i < numCols; i++ )
-			val = builtin.execute2(val, _values[valOff+i]);
-		
-		return val;
-	}
-
-	protected final double[] preaggValues(int numVals, double[] b) {
-		double[] ret = new double[numVals];
-		for( int k = 0; k < numVals; k++ )
-			ret[k] = sumValues(k, b);
-		
-		return ret;
-	}
-	
-	/**
-	 * Method for use by subclasses. Applies a scalar operation to the value
-	 * metadata stored in the superclass.
-	 * 
-	 * @param op
-	 *            scalar operation to perform
-	 * @return transformed copy of value metadata for this column group
-	 * @throws DMLRuntimeException if DMLRuntimeException occurs
-	 */
-	protected double[] applyScalarOp(ScalarOperator op)
-			throws DMLRuntimeException 
-	{
-		//scan over linearized values
-		double[] ret = new double[_values.length];
-		for (int i = 0; i < _values.length; i++) {
-			ret[i] = op.executeScalar(_values[i]);
-		}
-
-		return ret;
-	}
-
-	protected double[] applyScalarOp(ScalarOperator op, double newVal, int numCols)
-			throws DMLRuntimeException 
-	{
-		//scan over linearized values
-		double[] ret = new double[_values.length + numCols];
-		for( int i = 0; i < _values.length; i++ ) {
-			ret[i] = op.executeScalar(_values[i]);
-		}
-		
-		//add new value to the end
-		Arrays.fill(ret, _values.length, _values.length+numCols, newVal);
-		
-		return ret;
-	}
-	
-	/**
-	 * NOTE: Shared across OLE/RLE because value-only computation. 
-	 * 
-	 * @param result matrix block
-	 * @param builtin ?
-	 */
-	protected void computeMxx(MatrixBlock result, Builtin builtin) 
-	{
-		//init and 0-value handling
-		double val = Double.MAX_VALUE * ((builtin.getBuiltinCode()==BuiltinCode.MAX)?-1:1);
-		if( _zeros )
-			val = builtin.execute2(val, 0);
-		
-		//iterate over all values only
-		final int numVals = getNumValues();
-		final int numCols = getNumCols();		
-		for (int k = 0; k < numVals; k++)
-			for( int j=0, valOff = k*numCols; j<numCols; j++ )
-				val = builtin.execute2(val, _values[ valOff+j ]);
-		
-		//compute new partial aggregate
-		val = builtin.execute2(val, result.quickGetValue(0, 0));
-		result.quickSetValue(0, 0, val);
-	}
-	
-	/**
-	 * NOTE: Shared across OLE/RLE because value-only computation. 
-	 * 
-	 * @param result matrix block
-	 * @param builtin ?
-	 */
-	protected void computeColMxx(MatrixBlock result, Builtin builtin)
-	{
-		final int numVals = getNumValues();
-		final int numCols = getNumCols();
-		
-		//init and 0-value handling
-		double[] vals = new double[numCols];
-		Arrays.fill(vals, Double.MAX_VALUE * ((builtin.getBuiltinCode()==BuiltinCode.MAX)?-1:1));
-		if( _zeros ) {
-			for( int j = 0; j < numCols; j++ )
-				vals[j] = builtin.execute2(vals[j], 0);		
-		}
-		
-		//iterate over all values only
-		for (int k = 0; k < numVals; k++) 
-			for( int j=0, valOff=k*numCols; j<numCols; j++ )
-				vals[j] = builtin.execute2(vals[j], _values[ valOff+j ]);
-		
-		//copy results to output
-		for( int j=0; j<numCols; j++ )
-			result.quickSetValue(0, _colIndexes[j], vals[j]);
-	}
-	
-
-	/**
-	 * Obtain number of distrinct sets of values associated with the bitmaps in this column group.
-	 * 
-	 * @return the number of distinct sets of values associated with the bitmaps
-	 *         in this column group
-	 */
-	public int getNumValues() {
-		return _values.length / _colIndexes.length;
-	}
-
-	public double[] getValues() {
-		return _values;
-	}
-
-	public char[] getBitmaps() {
-		return _data;
-	}
-	
-	public int[] getBitmapOffsets() {
-		return _ptr;
-	}
-
-	public boolean hasZeros() {
-		return _zeros;
-	}
-	
-	/**
-	 * @param k
-	 *            index of a specific compressed bitmap (stored in subclass,
-	 *            index same as {@link #getValues})
-	 * @return an object for iterating over the row offsets in this bitmap. Only
-	 *         valid until the next call to this method. May be reused across
-	 *         calls.
-	 */
-	public abstract Iterator<Integer> getDecodeIterator(int k);
-
-	//TODO getDecodeIterator(int k, int rl, int ru)
-
-	/**
-	 * Utility function of sparse-unsafe operations.
-	 * 
-	 * @param ind ?
-	 * @return offsets
-	 * @throws DMLRuntimeException if DMLRuntimeException occurs
-	 */
-	protected int[] computeOffsets(boolean[] ind)
-		throws DMLRuntimeException 
-	{
-		//determine number of offsets
-		int numOffsets = 0;
-		for( int i=0; i<ind.length; i++ )
-			numOffsets += ind[i] ? 1 : 0;
-		
-		//create offset lists
-		int[] ret = new int[numOffsets];
-		for( int i=0, pos=0; i<ind.length; i++ )
-			if( ind[i] )
-				ret[pos++] = i;
-		
-		return ret;
-	}
-
-	@Override
-	public void readFields(DataInput in) 
-		throws IOException 
-	{
-		_numRows = in.readInt();
-		int numCols = in.readInt();
-		int numVals = in.readInt();
-		_zeros = in.readBoolean();
-		
-		//read col indices
-		_colIndexes = new int[ numCols ];
-		for( int i=0; i<numCols; i++ )
-			_colIndexes[i] = in.readInt();
-		
-		//read distinct values
-		_values = new double[numVals*numCols];
-		for( int i=0; i<numVals*numCols; i++ )
-			_values[i] = in.readDouble();
-		
-		//read bitmaps
-		int totalLen = in.readInt();
-		_ptr = new int[numVals+1];
-		_data = new char[totalLen];		
-		for( int i=0, off=0; i<numVals; i++ ) {
-			int len = in.readInt();
-			_ptr[i] = off;
-			for( int j=0; j<len; j++ )
-				_data[off+j] = in.readChar();
-			off += len;
-		}
-		_ptr[numVals] = totalLen;
-	}
-	
-	@Override
-	public void write(DataOutput out) 
-		throws IOException 
-	{
-		int numCols = getNumCols();
-		int numVals = getNumValues();
-		out.writeInt(_numRows);
-		out.writeInt(numCols);
-		out.writeInt(numVals);
-		out.writeBoolean(_zeros);
-		
-		//write col indices
-		for( int i=0; i<_colIndexes.length; i++ )
-			out.writeInt( _colIndexes[i] );
-		
-		//write distinct values
-		for( int i=0; i<_values.length; i++ )
-			out.writeDouble(_values[i]);
-
-		//write bitmaps (lens and data, offset later recreated)
-		int totalLen = 0;
-		for( int i=0; i<numVals; i++ )
-			totalLen += len(i);
-		out.writeInt(totalLen);	
-		for( int i=0; i<numVals; i++ ) {
-			int len = len(i);
-			int off = _ptr[i];
-			out.writeInt(len);
-			for( int j=0; j<len; j++ )
-				out.writeChar(_data[off+j]);
-		}
-	}
-
-	@Override
-	public long getExactSizeOnDisk() {
-		long ret = 13; //header
-		//col indices
-		ret += 4 * _colIndexes.length; 
-		//distinct values (groups of values)
-		ret += 8 * _values.length;
-		//actual bitmaps
-		ret += 4; //total length
-		for( int i=0; i<getNumValues(); i++ )
-			ret += 4 + 2 * len(i);
-		
-		return ret;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC.java
new file mode 100644
index 0000000..1782e2e
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress;
+
+import java.util.Arrays;
+
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.functionobjects.Builtin;
+import org.apache.sysml.runtime.functionobjects.KahanFunction;
+import org.apache.sysml.runtime.functionobjects.KahanPlus;
+import org.apache.sysml.runtime.functionobjects.KahanPlusSq;
+import org.apache.sysml.runtime.functionobjects.ReduceAll;
+import org.apache.sysml.runtime.functionobjects.ReduceCol;
+import org.apache.sysml.runtime.functionobjects.ReduceRow;
+import org.apache.sysml.runtime.functionobjects.Builtin.BuiltinCode;
+import org.apache.sysml.runtime.instructions.cp.KahanObject;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
+
+/**
+ * Class to encapsulate information about a column group that is encoded with
+ * dense dictionary encoding (DDC).
+ * 
+ * NOTE: all-zero tuples are materialized as a regular entry of the value dictionary
+ * (appended at the end if not yet present), which simplifies e.g. counting non-zeros.
+ */
+public abstract class ColGroupDDC extends ColGroupValue //common base of ColGroupDDC1/ColGroupDDC2, cell access via getData
+{
+	private static final long serialVersionUID = -3204391646123465004L;
+
+	public ColGroupDDC() { //no-arg constructor, delegates to ColGroupValue
+		super();
+	}
+	
+	public ColGroupDDC(int[] colIndices, int numRows, UncompressedBitmap ubm) { //delegates to ColGroupValue
+		super(colIndices, numRows, ubm);
+	}
+	
+	protected ColGroupDDC(int[] colIndices, int numRows, double[] values) { //delegates to ColGroupValue
+		super(colIndices, numRows, values);
+	}
+	
+	@Override //writes rows [rl,ru) of all columns of this group into target
+	public void decompressToBlock(MatrixBlock target, int rl, int ru) {
+		for( int i = rl; i < ru; i++ ) {
+			for( int colIx = 0; colIx < _colIndexes.length; colIx++ ) {
+				int col = _colIndexes[colIx]; //column index in the target block
+				double cellVal = getData(i, colIx);
+				target.quickSetValue(i, col, cellVal);
+			}
+		}
+	}
+
+	@Override //writes all rows into target, with columns remapped through colIndexTargets
+	public void decompressToBlock(MatrixBlock target, int[] colIndexTargets) {
+		int nrow = getNumRows();
+		int ncol = getNumCols();
+		for( int i = 0; i < nrow; i++ ) {
+			for( int colIx = 0; colIx < ncol; colIx++ ) {
+				int origMatrixColIx = getColIndex(colIx);
+				int col = colIndexTargets[origMatrixColIx]; //target column of this group column
+				double cellVal = getData(i, colIx);
+				target.quickSetValue(i, col, cellVal);
+			}
+		}
+	}
+
+	@Override //writes the single local column colpos of all rows into column 0 of target
+	public void decompressToBlock(MatrixBlock target, int colpos) {
+		int nrow = getNumRows();
+		for( int i = 0; i < nrow; i++ ) {
+			double cellVal = getData(i, colpos);
+			target.quickSetValue(i, 0, cellVal);
+		}
+	}
+	
+	@Override //returns the value at row r and column c, throws RuntimeException if c is not in this group
+	public double get(int r, int c) {
+		//find local column index (binary search, assumes _colIndexes is sorted ascending - TODO confirm)
+		int ix = Arrays.binarySearch(_colIndexes, c);
+		if( ix < 0 )
+			throw new RuntimeException("Column index "+c+" not in DDC group.");
+		
+		//get value
+		return getData(r, ix);
+	}
+	
+
+	@Override //adds the number of non-zero cells of rows [rl,ru) to rnnz, where rnnz is indexed by i-rl
+	protected void countNonZerosPerRow(int[] rnnz, int rl, int ru) {
+		int ncol = getNumCols();
+		for( int i = rl; i < ru; i++ ) {
+			int lnnz = 0;
+			for( int colIx=0; colIx < ncol; colIx++ )
+				lnnz += (getData(i, colIx) != 0) ? 1 : 0;
+			rnnz[i-rl] += lnnz; //accumulate, rnnz may already hold counts
+		}
+	}
+	
+	@Override //dispatches sum/sumsq/min/max by index function; other combinations match no branch and are ignored
+	public void unaryAggregateOperations(AggregateUnaryOperator op, MatrixBlock result, int rl, int ru)
+		throws DMLRuntimeException 
+	{
+		//sum and sumsq (reduceall/reducerow over tuples and counts)
+		if( op.aggOp.increOp.fn instanceof KahanPlus || op.aggOp.increOp.fn instanceof KahanPlusSq ) 
+		{
+			KahanFunction kplus = (op.aggOp.increOp.fn instanceof KahanPlus) ?
+					KahanPlus.getKahanPlusFnObject() : KahanPlusSq.getKahanPlusSqFnObject();
+			
+			if( op.indexFn instanceof ReduceAll )
+				computeSum(result, kplus);
+			else if( op.indexFn instanceof ReduceCol )
+				computeRowSums(result, kplus, rl, ru); //only variant restricted to rows [rl,ru)
+			else if( op.indexFn instanceof ReduceRow )
+				computeColSums(result, kplus);
+		}
+		//min and max (reduceall/reducerow over tuples only)
+		else if(op.aggOp.increOp.fn instanceof Builtin 
+				&& (((Builtin)op.aggOp.increOp.fn).getBuiltinCode()==BuiltinCode.MAX 
+				|| ((Builtin)op.aggOp.increOp.fn).getBuiltinCode()==BuiltinCode.MIN)) 
+		{		
+			Builtin builtin = (Builtin) op.aggOp.increOp.fn;
+
+			if( op.indexFn instanceof ReduceAll )
+				computeMxx(result, builtin, false); //not defined in this class
+			else if( op.indexFn instanceof ReduceCol )
+				computeRowMxx(result, builtin, rl, ru);
+			else if( op.indexFn instanceof ReduceRow )
+				computeColMxx(result, builtin, false); //not defined in this class
+		}
+	}
+	
+	protected void computeSum(MatrixBlock result, KahanFunction kplus) { //full aggregate over all cells
+		int nrow = getNumRows();
+		int ncol = getNumCols();
+		KahanObject kbuff = new KahanObject(result.quickGetValue(0, 0), result.quickGetValue(0, 1)); //(sum, correction)
+		
+		for( int i=0; i<nrow; i++ )
+			for( int j=0; j<ncol; j++ )
+				kplus.execute2(kbuff, getData(i, j));
+		
+		result.quickSetValue(0, 0, kbuff._sum);
+		result.quickSetValue(0, 1, kbuff._correction);
+	}
+	
+	protected void computeColSums(MatrixBlock result, KahanFunction kplus) { //per-column aggregate over all rows
+		int nrow = getNumRows();
+		int ncol = getNumCols();
+		KahanObject[] kbuff = new KahanObject[getNumCols()]; //one buffer per group column
+		for( int j=0; j<ncol; j++ )
+			kbuff[j] = new KahanObject(result.quickGetValue(0, _colIndexes[j]), 
+					result.quickGetValue(1, _colIndexes[j])); //result row 0: sums, row 1: corrections
+		
+		for( int i=0; i<nrow; i++ )
+			for( int j=0; j<ncol; j++ )
+				kplus.execute2(kbuff[j], getData(i, j));
+		
+		for( int j=0; j<ncol; j++ ) {
+			result.quickSetValue(0, _colIndexes[j], kbuff[j]._sum);
+			result.quickSetValue(1, _colIndexes[j], kbuff[j]._correction);
+		}
+	}
+
+	protected void computeRowSums(MatrixBlock result, KahanFunction kplus, int rl, int ru) { //per-row aggregate, rows [rl,ru)
+		int ncol = getNumCols();
+		KahanObject kbuff = new KahanObject(0, 0); //reused across rows, reset per row via set
+		
+		for( int i=rl; i<ru; i++ ) {
+			kbuff.set(result.quickGetValue(i, 0), result.quickGetValue(i, 1)); //result col 0: sum, col 1: correction
+			for( int j=0; j<ncol; j++ )
+				kplus.execute2(kbuff, getData(i, j));
+			result.quickSetValue(i, 0, kbuff._sum);
+			result.quickSetValue(i, 1, kbuff._correction);
+		}
+	}
+	
+	protected void computeRowMxx(MatrixBlock result, Builtin builtin, int rl, int ru) { //per-row min/max, rows [rl,ru)
+		double[] c = result.getDenseBlock(); //indexed by row only, presumably a dense single-column result - TODO confirm
+		int ncol = getNumCols();
+		
+		for( int i=rl; i<ru; i++ )
+			for( int j=0; j<ncol; j++ )
+				c[i] = builtin.execute2(c[i], getData(i, j));
+	}
+	
+	
+
+	/**
+	 * Generic get value for byte-length-agnostic access.
+	 * 
+	 * @param r global row index
+	 * @param colIx local column index 
+	 * @return value
+	 */
+	protected abstract double getData(int r, int colIx);
+	
+	/**
+	 * Generic set value for byte-length-agnostic write 
+	 * of encoded value.
+	 * 
+	 * @param r global row index
+	 * @param code encoded value 
+	 */
+	protected abstract void setData(int r, int code);
+	
+	@Override //pure delegation to ColGroupValue, subclasses add the size of their code arrays
+	public long estimateInMemorySize() {
+		return super.estimateInMemorySize();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java
new file mode 100644
index 0000000..4db871f
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.compress.utils.ConverterUtils;
+import org.apache.sysml.runtime.functionobjects.KahanFunction;
+import org.apache.sysml.runtime.functionobjects.KahanPlus;
+import org.apache.sysml.runtime.instructions.cp.KahanObject;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.operators.ScalarOperator;
+
+/**
+ * Class to encapsulate information about a column group that is encoded with
+ * dense dictionary encoding (DDC) using 1 byte codes.
+ */
+public class ColGroupDDC1 extends ColGroupDDC //one byte code per row, codes are read unsigned via &0xFF
+{
+	private static final long serialVersionUID = 5204955589230760157L;
+	
+	private byte[] _data; //per-row dictionary code, i.e., index of the row's value tuple in _values
+
+	public ColGroupDDC1() { //no-arg constructor, all fields are set by readFields
+		super();
+	}
+	
+	public ColGroupDDC1(int[] colIndices, int numRows, UncompressedBitmap ubm) { //encodes the given bitmap
+		super(colIndices, numRows, ubm);
+		_data = new byte[numRows];
+		
+		int numVals = ubm.getNumValues();
+		int numCols = ubm.getNumColumns();
+		
+		//materialize zero values, if necessary (total number of offsets less than number of cells)
+		if( ubm.getNumOffsets() < (long)numRows * numCols ) {
+			int zeroIx = containsAllZeroValue();
+			if( zeroIx < 0 ) {
+				zeroIx = numVals; //append an all-zero tuple at the end (copyOf pads with 0.0)
+				_values = Arrays.copyOf(_values, _values.length+numCols);
+			}
+			Arrays.fill(_data, (byte)zeroIx); //default code, overwritten below for rows with offsets
+		}
+		
+		//iterate over values and write dictionary codes
+		for( int i=0; i<numVals; i++ ) {
+			int[] tmpList = ubm.getOffsetsList(i).extractValues();
+			int tmpListSize = ubm.getNumOffsets(i); //only the first tmpListSize entries are used
+			for( int k=0; k<tmpListSize; k++ )
+				_data[tmpList[k]] = (byte)i; //NOTE: codes >255 would wrap, caller must bound the dictionary size
+		}
+	}
+	
+	public ColGroupDDC1(int[] colIndices, int numRows, double[] values, byte[] data) { //data is shared, not copied
+		super(colIndices, numRows, values);
+		_data = data;
+	}
+
+	@Override
+	public CompressionType getCompType() {
+		return CompressionType.DDC1;
+	}
+	
+	@Override //value of row r at local column colIx: _values[code*numCols + colIx]
+	protected double getData(int r, int colIx) {
+		return _values[(_data[r]&0xFF)*getNumCols()+colIx];
+	}
+	
+	@Override //stores the low 8 bits of code for row r
+	protected void setData(int r, int code) {
+		_data[r] = (byte)code;
+	}
+	
+	@Override //layout: numRows, numCols, numVals (ints), col indexes (ints), values (doubles), codes (bytes)
+	public void write(DataOutput out) throws IOException {
+		int numCols = getNumCols();
+		int numVals = getNumValues();
+		out.writeInt(_numRows);
+		out.writeInt(numCols);
+		out.writeInt(numVals);
+		
+		//write col indices
+		for( int i=0; i<_colIndexes.length; i++ )
+			out.writeInt( _colIndexes[i] );
+		
+		//write distinct values
+		for( int i=0; i<_values.length; i++ )
+			out.writeDouble(_values[i]);
+
+		//write data (one byte per row)
+		for( int i=0; i<_numRows; i++ )
+			out.writeByte(_data[i]);
+	}
+
+	@Override //reads the layout produced by write and replaces all fields of this group
+	public void readFields(DataInput in) throws IOException {
+		_numRows = in.readInt();
+		int numCols = in.readInt();
+		int numVals = in.readInt();
+		
+		//read col indices
+		_colIndexes = new int[ numCols ];
+		for( int i=0; i<numCols; i++ )
+			_colIndexes[i] = in.readInt();
+		
+		//read distinct values
+		_values = new double[numVals*numCols];
+		for( int i=0; i<numVals*numCols; i++ )
+			_values[i] = in.readDouble();
+		
+		//read data (one byte per row)
+		_data = new byte[_numRows];
+		for( int i=0; i<_numRows; i++ )
+			_data[i] = in.readByte();
+	}
+
+	@Override //number of bytes produced by write, computed in long arithmetic to avoid int overflow
+	public long getExactSizeOnDisk() {
+		long ret = 12; //header (three ints)
+		//col indices
+		ret += 4L * _colIndexes.length; 
+		//distinct values (groups of values)
+		ret += 8L * _values.length;
+		//data
+		ret += _data.length;
+		
+		return ret;
+	}
+
+	@Override //size of the super class plus one byte per row for the codes
+	public long estimateInMemorySize() {
+		long size = super.estimateInMemorySize();
+		
+		//adding data size
+		if (_data != null)
+			size += _data.length;
+	
+		return size;
+	}
+	
+	@Override //appends the cells of rows [rl,ru) to target, including zero values
+	public void decompressToBlock(MatrixBlock target, int rl, int ru) {
+		int ncol = getNumCols();
+		for( int i = rl; i < ru; i++ )
+			for( int j=0; j<ncol; j++ )
+				target.appendValue(i, _colIndexes[j], _values[(_data[i]&0xFF)*ncol+j]);
+		//note: append ok because final sort per row 
+	}
+	
+	@Override //adds per-row nnz of rows [rl,ru) to rnnz, where rnnz is indexed by i-rl
+	protected void countNonZerosPerRow(int[] rnnz, int rl, int ru) {
+		final int ncol = getNumCols();
+		final int numVals = getNumValues();
+		
+		//pre-aggregate nnz per value tuple
+		int[] counts = new int[numVals];
+		for( int k=0, valOff=0; k<numVals; k++, valOff+=ncol )
+			for( int j=0; j<ncol; j++ )
+				counts[k] += (_values[valOff+j]!=0) ? 1 : 0;
+		
+		//scan data and add counts to output rows
+		for( int i = rl; i < ru; i++ )
+			rnnz[i-rl] += counts[_data[i]&0xFF];
+	}
+	
+	@Override //adds this group's contribution of matrix-vector multiply to result rows [rl,ru)
+	public void rightMultByVector(MatrixBlock vector, MatrixBlock result, int rl, int ru) 
+		throws DMLRuntimeException 
+	{
+		double[] b = ConverterUtils.getDenseVector(vector);
+		double[] c = result.getDenseBlock();
+		final int numCols = getNumCols();
+		final int numVals = getNumValues();
+		
+		//prepare reduced rhs w/ relevant values
+		double[] sb = new double[numCols];
+		for (int j = 0; j < numCols; j++) {
+			sb[j] = b[_colIndexes[j]];
+		}
+		
+		//pre-aggregate all distinct values (guaranteed <=255), one entry per code via preaggValues
+		double[] vals = preaggValues(numVals, sb);
+		
+		//iterate over codes and add to output
+		for( int i=rl; i<ru; i++ ) {
+			c[i] += vals[_data[i]&0xFF];
+		}
+	}
+	
+	public static void rightMultByVector(ColGroupDDC1[] grps, MatrixBlock vector, MatrixBlock result, int rl, int ru) 
+		throws DMLRuntimeException //multi-group variant, processes row blocks across all groups
+	{
+		double[] b = ConverterUtils.getDenseVector(vector);
+		double[] c = result.getDenseBlock();
+		
+		//prepare distinct values once
+		double[][] vals = new double[grps.length][];
+		for( int i=0; i<grps.length; i++ ) {
+			//prepare reduced rhs w/ relevant values
+			double[] sb = new double[grps[i].getNumCols()];
+			for (int j = 0; j < sb.length; j++) {
+				sb[j] = b[grps[i]._colIndexes[j]];
+			}	
+			//pre-aggregate all distinct values (guaranteed <=255)
+			vals[i] = grps[i].preaggValues(grps[i].getNumValues(), sb);
+		}
+		
+		//cache-conscious matrix-vector multiplication
+		//iterate over codes of all groups and add to output
+		int blksz = 2048; //16KB (2048 output doubles)
+		for( int bi=rl; bi<ru; bi+=blksz )
+			for( int j=0; j<grps.length; j++ )
+				for( int i=bi, bimax=Math.min(bi+blksz, ru); i<bimax; i++ ) //block end computed once per loop
+					c[i] += vals[j][grps[j]._data[i]&0xFF];
+	}
+
+	@Override //adds vector-matrix multiply results for the columns of this group to result
+	public void leftMultByRowVector(MatrixBlock vector, MatrixBlock result) throws DMLRuntimeException {
+		double[] a = ConverterUtils.getDenseVector(vector);
+		double[] c = result.getDenseBlock();
+		final int nrow = getNumRows();
+		final int ncol = getNumCols();
+		final int numVals = getNumValues();
+		
+		if( 8*numVals < nrow ) //pre-aggregation only if the dictionary is small relative to the rows
+		{
+			//iterate over codes and pre-aggregate inputs per code (guaranteed <=255)
+			//temporary array also avoids false sharing in multi-threaded environments
+			double[] vals = new double[numVals];
+			for( int i=0; i<nrow; i++ ) {
+				vals[_data[i]&0xFF] += a[i];
+			}
+			
+			//post-scaling of pre-aggregate with distinct values
+			for( int k=0, valOff=0; k<numVals; k++, valOff+=ncol ) {
+				double aval = vals[k];
+				for( int j=0; j<ncol; j++ ) {
+					int colIx = _colIndexes[j];
+					c[colIx] += aval * _values[valOff+j];
+				}	
+			}
+		}
+		else //general case
+		{
+			//iterate over codes, compute all, and add to the result
+			for( int i=0; i<nrow; i++ ) {
+				double aval = a[i];
+				if( aval != 0 ) { //skip rows with zero input
+					int valOff = (_data[i]&0xFF) * ncol;
+					for( int j=0; j<ncol; j++ ) {
+						int colIx = _colIndexes[j];
+						c[colIx] += aval * _values[valOff+j];
+					}
+				}
+			}	
+		}
+	}
+	
+	@Override //full aggregate via per-code counts instead of a scan over all cells
+	protected void computeSum(MatrixBlock result, KahanFunction kplus) {
+		final int nrow = getNumRows();
+		final int ncol = getNumCols();
+		final int numVals = getNumValues();
+		
+		//iterate over codes and count per code (guaranteed <=255)
+		int[] counts = new int[numVals];
+		for( int i=0; i<nrow; i++ ) {
+			counts[_data[i]&0xFF] ++;
+		}
+		
+		//post-scaling of pre-aggregate with distinct values
+		KahanObject kbuff = new KahanObject(result.quickGetValue(0, 0), result.quickGetValue(0, 1));
+		for( int k=0, valOff=0; k<numVals; k++, valOff+=ncol ) {
+			int cntk = counts[k]; //number of rows with code k
+			for( int j=0; j<ncol; j++ )
+				kplus.execute3(kbuff, _values[ valOff+j], cntk);
+		}
+		
+		result.quickSetValue(0, 0, kbuff._sum);
+		result.quickSetValue(0, 1, kbuff._correction);
+	}
+	
+	
+	@Override //per-row aggregate, result holds (sum, correction) pairs at c[2*i] and c[2*i+1]
+	protected void computeRowSums(MatrixBlock result, KahanFunction kplus, int rl, int ru) {
+		KahanObject kbuff = new KahanObject(0, 0);
+		KahanPlus kplus2 = KahanPlus.getKahanPlusFnObject();
+		double[] c = result.getDenseBlock();
+		
+		//pre-aggregate one value per value tuple via sumAllValues (presumably the kplus aggregate of the tuple)
+		double[] vals = sumAllValues(kplus, kbuff);
+		
+		//scan data and add to result (use kahan plus not general KahanFunction
+		//for correctness in case of sqk+)
+		for( int i=rl; i<ru; i++ ) {
+			kbuff.set(c[2*i], c[2*i+1]);
+			kplus2.execute2(kbuff, vals[_data[i]&0xFF]);
+			c[2*i] = kbuff._sum;
+			c[2*i+1] = kbuff._correction;
+		}
+	}
+	
+	public static void computeRowSums(ColGroupDDC1[] grps, MatrixBlock result, KahanFunction kplus, int rl, int ru) 
+		throws DMLRuntimeException //multi-group variant, processes row blocks across all groups
+	{
+		KahanObject kbuff = new KahanObject(0, 0);
+		KahanPlus kplus2 = KahanPlus.getKahanPlusFnObject();
+		double[] c = result.getDenseBlock();
+		
+		//prepare distinct values once
+		double[][] vals = new double[grps.length][];
+		for( int i=0; i<grps.length; i++ ) {
+			//pre-aggregate all distinct values (guaranteed <=255)
+			vals[i] = grps[i].sumAllValues(kplus, kbuff);
+		}
+		
+		//cache-conscious row sums operations 
+		//iterate over codes of all groups and add to output
+		//(use kahan plus not general KahanFunction for correctness in case of sqk+)
+		int blksz = 1024; //16KB (1024 rows of sum and correction doubles)
+		for( int bi=rl; bi<ru; bi+=blksz )
+			for( int j=0; j<grps.length; j++ )
+				for( int i=bi, bimax=Math.min(bi+blksz, ru); i<bimax; i++ ) { //block end computed once per loop
+					kbuff.set(c[2*i], c[2*i+1]);
+					kplus2.execute2(kbuff, vals[j][grps[j]._data[i]&0xFF]);
+					c[2*i] = kbuff._sum;
+					c[2*i+1] = kbuff._correction;
+				}
+	}
+	
+	@Override //returns a new group with transformed dictionary, the code array is shared with this group
+	public ColGroup scalarOperation(ScalarOperator op) throws DMLRuntimeException {
+		//fast path: sparse-safe and -unsafe operations
+		//as zeros are represented, it is sufficient to simply apply the scalar op
+		return new ColGroupDDC1(_colIndexes, _numRows, applyScalarOp(op), _data);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC2.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC2.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC2.java
new file mode 100644
index 0000000..5f29979
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC2.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.compress.utils.ConverterUtils;
+import org.apache.sysml.runtime.functionobjects.KahanFunction;
+import org.apache.sysml.runtime.functionobjects.KahanPlus;
+import org.apache.sysml.runtime.instructions.cp.KahanObject;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.operators.ScalarOperator;
+
+/**
+ * Class to encapsulate information about a column group that is encoded with
+ * dense dictionary encoding (DDC) using 2 byte codes.
+ */
+public class ColGroupDDC2 extends ColGroupDDC //one two-byte (char) code per row
+{
+	private static final long serialVersionUID = -3995768285207071013L;
+	
+	private static final int MAX_TMP_VALS = 32*1024; //computeSum uses per-code counts only below this dictionary size
+	
+	private char[] _data; //per-row dictionary code, i.e., index of the row's value tuple in _values
+
+	public ColGroupDDC2() { //no-arg constructor, all fields are set by readFields
+		super();
+	}
+	
+	public ColGroupDDC2(int[] colIndices, int numRows, UncompressedBitmap ubm) { //encodes the given bitmap
+		super(colIndices, numRows, ubm);
+		_data = new char[numRows];
+		
+		int numVals = ubm.getNumValues();
+		int numCols = ubm.getNumColumns();
+		
+		//materialize zero values, if necessary (total number of offsets less than number of cells)
+		if( ubm.getNumOffsets() < (long)numRows * numCols ) {
+			int zeroIx = containsAllZeroValue();
+			if( zeroIx < 0 ) {
+				zeroIx = numVals; //append an all-zero tuple at the end (copyOf pads with 0.0)
+				_values = Arrays.copyOf(_values, _values.length+numCols);
+			}
+			Arrays.fill(_data, (char)zeroIx); //default code, overwritten below for rows with offsets
+		}
+		
+		//iterate over values and write dictionary codes
+		for( int i=0; i<numVals; i++ ) {
+			int[] tmpList = ubm.getOffsetsList(i).extractValues();
+			int tmpListSize = ubm.getNumOffsets(i); //only the first tmpListSize entries are used
+			for( int k=0; k<tmpListSize; k++ )
+				_data[tmpList[k]] = (char)i; //NOTE: codes >65535 would wrap, caller must bound the dictionary size
+		}
+	}
+	
+	public ColGroupDDC2(int[] colIndices, int numRows, double[] values, char[] data) { //data is shared, not copied
+		super(colIndices, numRows, values);
+		_data = data;
+	}
+
+	@Override
+	public CompressionType getCompType() {
+		return CompressionType.DDC2;
+	}
+	
+	@Override //value of row r at local column colIx: _values[code*numCols + colIx]
+	protected double getData(int r, int colIx) {
+		return _values[_data[r]*getNumCols()+colIx];
+	}
+
+	@Override //stores the low 16 bits of code for row r
+	protected void setData(int r, int code) {
+		_data[r] = (char)code;
+	}
+	
+	@Override //layout: numRows, numCols, numVals (ints), col indexes (ints), values (doubles), codes (chars)
+	public void write(DataOutput out) throws IOException {
+		int numCols = getNumCols();
+		int numVals = getNumValues();
+		out.writeInt(_numRows);
+		out.writeInt(numCols);
+		out.writeInt(numVals);
+		
+		//write col indices
+		for( int i=0; i<_colIndexes.length; i++ )
+			out.writeInt( _colIndexes[i] );
+		
+		//write distinct values
+		for( int i=0; i<_values.length; i++ )
+			out.writeDouble(_values[i]);
+
+		//write data (one char per row)
+		for( int i=0; i<_numRows; i++ )
+			out.writeChar(_data[i]);
+	}
+
+	@Override //reads the layout produced by write and replaces all fields of this group
+	public void readFields(DataInput in) throws IOException {
+		_numRows = in.readInt();
+		int numCols = in.readInt();
+		int numVals = in.readInt();
+		
+		//read col indices
+		_colIndexes = new int[ numCols ];
+		for( int i=0; i<numCols; i++ )
+			_colIndexes[i] = in.readInt();
+		
+		//read distinct values
+		_values = new double[numVals*numCols];
+		for( int i=0; i<numVals*numCols; i++ )
+			_values[i] = in.readDouble();
+		
+		//read data (one char per row)
+		_data = new char[_numRows];
+		for( int i=0; i<_numRows; i++ )
+			_data[i] = in.readChar();
+	}
+
+	@Override //number of bytes produced by write, computed in long arithmetic to avoid int overflow
+	public long getExactSizeOnDisk() {
+		long ret = 12; //header (three ints)
+		//col indices
+		ret += 4L * _colIndexes.length; 
+		//distinct values (groups of values)
+		ret += 8L * _values.length;
+		//data (2L: int product would overflow for more than 2^30 rows)
+		ret += 2L * _data.length;
+		
+		return ret;
+	}
+	
+	@Override //size of the super class plus two bytes per row for the codes
+	public long estimateInMemorySize() {
+		long size = super.estimateInMemorySize();
+		
+		//adding data size (2L: int product would overflow for more than 2^30 rows)
+		if (_data != null)
+			size += 2L * _data.length;
+	
+		return size;
+	}
+	
+	@Override //appends the cells of rows [rl,ru) to target, including zero values
+	public void decompressToBlock(MatrixBlock target, int rl, int ru) {
+		int ncol = getNumCols();
+		for( int i = rl; i < ru; i++ )
+			for( int j=0; j<ncol; j++ )
+				target.appendValue(i, _colIndexes[j], _values[_data[i]*ncol+j]);
+		//note: append ok because final sort per row 
+	}
+	
+	@Override //adds per-row nnz of rows [rl,ru) to rnnz, where rnnz is indexed by i-rl
+	protected void countNonZerosPerRow(int[] rnnz, int rl, int ru) {
+		final int ncol = getNumCols();
+		final int numVals = getNumValues();
+		
+		//pre-aggregate nnz per value tuple
+		int[] counts = new int[numVals];
+		for( int k=0, valOff=0; k<numVals; k++, valOff+=ncol )
+			for( int j=0; j<ncol; j++ )
+				counts[k] += (_values[valOff+j]!=0) ? 1 : 0;
+		
+		//scan data and add counts to output rows
+		for( int i = rl; i < ru; i++ )
+			rnnz[i-rl] += counts[_data[i]];
+	}
+	
+	@Override //adds this group's contribution of matrix-vector multiply to result rows [rl,ru)
+	public void rightMultByVector(MatrixBlock vector, MatrixBlock result, int rl, int ru) throws DMLRuntimeException {
+		double[] b = ConverterUtils.getDenseVector(vector);
+		double[] c = result.getDenseBlock();
+		final int numCols = getNumCols();
+		final int numVals = getNumValues();
+
+		//prepare reduced rhs w/ relevant values
+		double[] sb = new double[numCols];
+		for (int j = 0; j < numCols; j++) {
+			sb[j] = b[_colIndexes[j]];
+		}
+		
+		//pre-aggregate all distinct values, one entry per code via preaggValues
+		double[] vals = preaggValues(numVals, sb);
+
+		//iterate over codes and add to output
+		for( int i=rl; i<ru; i++ )
+			c[i] += vals[_data[i]];
+	}
+
+	@Override //adds vector-matrix multiply results for the columns of this group to result
+	public void leftMultByRowVector(MatrixBlock vector, MatrixBlock result) 
+		throws DMLRuntimeException 
+	{
+		double[] a = ConverterUtils.getDenseVector(vector);
+		double[] c = result.getDenseBlock();
+		final int nrow = getNumRows();
+		final int ncol = getNumCols();
+		final int numVals = getNumValues();
+		
+		if( 8*numVals < nrow ) //pre-aggregation only if the dictionary is small relative to the rows
+		{
+			//iterate over codes and pre-aggregate inputs per code
+			//temporary array also avoids false sharing in multi-threaded environments
+			double[] vals = new double[numVals];
+			for( int i=0; i<nrow; i++ ) {
+				vals[_data[i]] += a[i];
+			}
+			
+			//post-scaling of pre-aggregate with distinct values
+			for( int k=0, valOff=0; k<numVals; k++, valOff+=ncol ) {
+				double aval = vals[k];
+				for( int j=0; j<ncol; j++ ) {
+					int colIx = _colIndexes[j];
+					c[colIx] += aval * _values[valOff+j];
+				}	
+			}
+		}
+		else //general case
+		{
+		
+			//iterate over codes, compute all, and add to the result
+			for( int i=0; i<nrow; i++ ) {
+				double aval = a[i];
+				if( aval != 0 ) { //skip rows with zero input
+					int valOff = _data[i] * ncol;
+					for( int j=0; j<ncol; j++ ) {
+						int colIx = _colIndexes[j];
+						c[colIx] += aval * _values[valOff+j];
+					}
+				}
+			}
+		}
+	}
+	
+	@Override //full aggregate, via per-code counts if the dictionary is small enough
+	protected void computeSum(MatrixBlock result, KahanFunction kplus) {
+		final int nrow = getNumRows();
+		final int ncol = getNumCols();
+		final int numVals = getNumValues();
+		
+		if( numVals < MAX_TMP_VALS )
+		{
+			//iterate over codes and count per code
+			int[] counts = new int[numVals];
+			for( int i=0; i<nrow; i++ ) {
+				counts[_data[i]] ++;
+			}
+			
+			//post-scaling of pre-aggregate with distinct values
+			KahanObject kbuff = new KahanObject(result.quickGetValue(0, 0), result.quickGetValue(0, 1));
+			for( int k=0, valOff=0; k<numVals; k++, valOff+=ncol ) {
+				int cntk = counts[k]; //number of rows with code k
+				for( int j=0; j<ncol; j++ )
+					kplus.execute3(kbuff, _values[ valOff+j], cntk);
+			}
+			
+			result.quickSetValue(0, 0, kbuff._sum);
+			result.quickSetValue(0, 1, kbuff._correction);
+		}
+		else //general case (cell-wise scan of the super class)
+		{
+			super.computeSum(result, kplus);
+		}
+	}
+	
+	
+	@Override //per-row aggregate, result holds (sum, correction) pairs at c[2*i] and c[2*i+1]
+	protected void computeRowSums(MatrixBlock result, KahanFunction kplus, int rl, int ru) {
+		KahanObject kbuff = new KahanObject(0, 0);
+		KahanPlus kplus2 = KahanPlus.getKahanPlusFnObject();
+		double[] c = result.getDenseBlock();
+		
+		//pre-aggregate one value per value tuple via sumAllValues (presumably the kplus aggregate of the tuple)
+		double[] vals = sumAllValues(kplus, kbuff);
+		
+		//scan data and add to result (use kahan plus not general KahanFunction
+		//for correctness in case of sqk+)
+		for( int i=rl; i<ru; i++ ) {
+			kbuff.set(c[2*i], c[2*i+1]);
+			kplus2.execute2(kbuff, vals[_data[i]]);
+			c[2*i] = kbuff._sum;
+			c[2*i+1] = kbuff._correction;
+		}
+	}
+	
+	@Override //returns a new group with transformed dictionary, the code array is shared with this group
+	public ColGroup scalarOperation(ScalarOperator op) throws DMLRuntimeException {
+		//fast path: sparse-safe and -unsafe operations
+		//as zeros are represented, it is sufficient to simply apply the scalar op
+		return new ColGroupDDC2(_colIndexes, _numRows, applyScalarOp(op), _data);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java
index 696adf2..f47a432 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupOLE.java
@@ -22,20 +22,16 @@ package org.apache.sysml.runtime.compress;
 import java.util.Arrays;
 import java.util.Iterator;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.compress.utils.ConverterUtils;
 import org.apache.sysml.runtime.compress.utils.LinearAlgebraUtils;
 import org.apache.sysml.runtime.functionobjects.Builtin;
-import org.apache.sysml.runtime.functionobjects.Builtin.BuiltinCode;
 import org.apache.sysml.runtime.functionobjects.KahanFunction;
 import org.apache.sysml.runtime.functionobjects.KahanPlus;
-import org.apache.sysml.runtime.functionobjects.KahanPlusSq;
-import org.apache.sysml.runtime.functionobjects.ReduceAll;
-import org.apache.sysml.runtime.functionobjects.ReduceCol;
-import org.apache.sysml.runtime.functionobjects.ReduceRow;
 import org.apache.sysml.runtime.instructions.cp.KahanObject;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
 import org.apache.sysml.runtime.matrix.operators.ScalarOperator;
 
 /**
@@ -43,12 +39,14 @@ import org.apache.sysml.runtime.matrix.operators.ScalarOperator;
  * simple lists of offsets for each set of distinct values.
  * 
  */
-public class ColGroupOLE extends ColGroupBitmap 
+public class ColGroupOLE extends ColGroupOffset 
 {
 	private static final long serialVersionUID = -9157676271360528008L;
 
+	private static final Log LOG = LogFactory.getLog(ColGroupOLE.class.getName());
+	
 	public ColGroupOLE() {
-		super(CompressionType.OLE_BITMAP);
+		super();
 	}
 	
 	/**
@@ -64,14 +62,15 @@ public class ColGroupOLE extends ColGroupBitmap
 	 */
 	public ColGroupOLE(int[] colIndices, int numRows, UncompressedBitmap ubm) 
 	{
-		super(CompressionType.OLE_BITMAP, colIndices, numRows, ubm);
+		super(colIndices, numRows, ubm);
 
 		// compress the bitmaps
 		final int numVals = ubm.getNumValues();
 		char[][] lbitmaps = new char[numVals][];
 		int totalLen = 0;
 		for( int i=0; i<numVals; i++ ) {
-			lbitmaps[i] = BitmapEncoder.genOffsetBitmap(ubm.getOffsetsList(i));
+			lbitmaps[i] = BitmapEncoder.genOffsetBitmap(
+				ubm.getOffsetsList(i).extractValues(), ubm.getNumOffsets(i));
 			totalLen += lbitmaps[i].length;
 		}
 		
@@ -95,13 +94,24 @@ public class ColGroupOLE extends ColGroupBitmap
 				_skiplist[k] = bix;
 			}		
 		}
+		
+		//debug output
+		double ucSize = MatrixBlock.estimateSizeDenseInMemory(numRows, colIndices.length);
+		if( estimateInMemorySize() > ucSize )
+			LOG.warn("OLE group larger than UC dense: "+estimateInMemorySize()+" "+ucSize);
 	}
 
 	public ColGroupOLE(int[] colIndices, int numRows, boolean zeros, double[] values, char[] bitmaps, int[] bitmapOffs) {
-		super(CompressionType.OLE_BITMAP, colIndices, numRows, zeros, values);
+		super(colIndices, numRows, zeros, values);
 		_data = bitmaps;
 		_ptr = bitmapOffs;
 	}
+	
+
+	@Override
+	public CompressionType getCompType() {
+		return CompressionType.OLE_BITMAP;
+	}
 
 	@Override
 	public Iterator<Integer> getDecodeIterator(int k) {
@@ -251,7 +261,7 @@ public class ColGroupOLE extends ColGroupBitmap
 		}
 		
 		double[] rvalues = applyScalarOp(op, val0, getNumCols());		
-		char[] lbitmap = BitmapEncoder.genOffsetBitmap(loff);
+		char[] lbitmap = BitmapEncoder.genOffsetBitmap(loff, loff.length);
 		char[] rbitmaps = Arrays.copyOf(_data, _data.length+lbitmap.length);
 		System.arraycopy(lbitmap, 0, rbitmaps, _data.length, lbitmap.length);
 		int[] rbitmapOffs = Arrays.copyOf(_ptr, _ptr.length+1);
@@ -284,7 +294,7 @@ public class ColGroupOLE extends ColGroupBitmap
 			//best configuration aligns with L3 cache size (x*vcores*64K*8B < L3)
 			//x=4 leads to a good yet slightly conservative compromise for single-/
 			//multi-threaded and typical number of cores and L3 cache sizes
-			final int blksz2 = ColGroupBitmap.WRITE_CACHE_BLKSZ;
+			final int blksz2 = ColGroupOffset.WRITE_CACHE_BLKSZ;
 			
 			//step 1: prepare position and value arrays
 			int[] apos = skipScan(numVals, rl);
@@ -365,7 +375,7 @@ public class ColGroupOLE extends ColGroupBitmap
 		if( LOW_LEVEL_OPT && numVals > 1 && _numRows > blksz )
 		{
 			//cache blocking config (see matrix-vector mult for explanation)
-			final int blksz2 = ColGroupBitmap.READ_CACHE_BLKSZ;
+			final int blksz2 = ColGroupOffset.READ_CACHE_BLKSZ;
 			
 			//step 1: prepare position and value arrays
 			
@@ -426,46 +436,7 @@ public class ColGroupOLE extends ColGroupBitmap
 	}
 
 	@Override
-	public void unaryAggregateOperations(AggregateUnaryOperator op, MatrixBlock result) 
-		throws DMLRuntimeException 
-	{
-		unaryAggregateOperations(op, result, 0, getNumRows());
-	}
-	
-	@Override
-	public void unaryAggregateOperations(AggregateUnaryOperator op, MatrixBlock result, int rl, int ru) 
-		throws DMLRuntimeException 
-	{
-		//sum and sumsq (reduceall/reducerow over tuples and counts)
-		if( op.aggOp.increOp.fn instanceof KahanPlus || op.aggOp.increOp.fn instanceof KahanPlusSq ) 
-		{
-			KahanFunction kplus = (op.aggOp.increOp.fn instanceof KahanPlus) ?
-					KahanPlus.getKahanPlusFnObject() : KahanPlusSq.getKahanPlusSqFnObject();
-			
-			if( op.indexFn instanceof ReduceAll )
-				computeSum(result, kplus);
-			else if( op.indexFn instanceof ReduceCol )
-				computeRowSums(result, kplus, rl, ru);
-			else if( op.indexFn instanceof ReduceRow )
-				computeColSums(result, kplus);
-		}
-		//min and max (reduceall/reducerow over tuples only)
-		else if(op.aggOp.increOp.fn instanceof Builtin 
-				&& (((Builtin)op.aggOp.increOp.fn).getBuiltinCode()==BuiltinCode.MAX 
-				|| ((Builtin)op.aggOp.increOp.fn).getBuiltinCode()==BuiltinCode.MIN)) 
-		{		
-			Builtin builtin = (Builtin) op.aggOp.increOp.fn;
-
-			if( op.indexFn instanceof ReduceAll )
-				computeMxx(result, builtin);
-			else if( op.indexFn instanceof ReduceCol )
-				computeRowMxx(result, builtin, rl, ru);
-			else if( op.indexFn instanceof ReduceRow )
-				computeColMxx(result, builtin);
-		}
-	}
-
-	private void computeSum(MatrixBlock result, KahanFunction kplus)
+	protected final void computeSum(MatrixBlock result, KahanFunction kplus)
 	{
 		KahanObject kbuff = new KahanObject(result.quickGetValue(0, 0), result.quickGetValue(0, 1));
 		
@@ -493,41 +464,88 @@ public class ColGroupOLE extends ColGroupBitmap
 		result.quickSetValue(0, 1, kbuff._correction);
 	}
 
-	private void computeRowSums(MatrixBlock result, KahanFunction kplus, int rl, int ru)
+	@Override
+	protected final void computeRowSums(MatrixBlock result, KahanFunction kplus, int rl, int ru)
 	{
 		KahanObject kbuff = new KahanObject(0, 0);
-	
+		KahanPlus kplus2 = KahanPlus.getKahanPlusFnObject();
+		
 		final int blksz = BitmapEncoder.BITMAP_BLOCK_SZ;
 		final int numVals = getNumValues();
 		double[] c = result.getDenseBlock();
 		
-		//iterate over all values and their bitmaps
-		for (int k = 0; k < numVals; k++) 
+		if( ALLOW_CACHE_CONSCIOUS_ROWSUMS &&
+			LOW_LEVEL_OPT && numVals > 1 && _numRows > blksz )
 		{
-			//prepare value-to-add for entire value bitmap
-			int boff = _ptr[k];
-			int blen = len(k);
-			double val = sumValues(k);
+			final int blksz2 = ColGroupOffset.WRITE_CACHE_BLKSZ/2;
 			
-			//iterate over bitmap blocks and add values
-			if (val != 0) {
-				int slen;
-				int bix = skipScanVal(k, rl);
-				for( int off=bix*blksz; bix<blen && off<ru; bix+=slen+1, off+=blksz ) {
-					slen = _data[boff+bix];
-					for (int i = 1; i <= slen; i++) {
-						int rix = off + _data[boff+bix + i];
-						kbuff.set(c[2*rix], c[2*rix+1]);
-						kplus.execute2(kbuff, val);
-						c[2*rix] = kbuff._sum;
-						c[2*rix+1] = kbuff._correction;
+			//step 1: prepare position and value arrays
+			int[] apos = skipScan(numVals, rl);
+			double[] aval = sumAllValues(kplus, kbuff);
+					
+			//step 2: cache conscious row sums via horizontal scans 
+			for( int bi=rl; bi<ru; bi+=blksz2 ) 
+			{
+				int bimax = Math.min(bi+blksz2, ru);
+				
+				//horizontal segment scan, incl pos maintenance
+				for (int k = 0; k < numVals; k++) {
+					int boff = _ptr[k];
+					int blen = len(k);
+					double val = aval[k];
+					int bix = apos[k];
+					
+					for( int ii=bi; ii<bimax && bix<blen; ii+=blksz ) {
+						//prepare length, start, and end pos
+						int len = _data[boff+bix];
+						int pos = boff+bix+1;
+						
+						//compute partial results
+						for (int i = 0; i < len; i++) {
+							int rix = ii + _data[pos + i];
+							kbuff.set(c[2*rix], c[2*rix+1]);
+							kplus2.execute2(kbuff, val);
+							c[2*rix] = kbuff._sum;
+							c[2*rix+1] = kbuff._correction;
+						}
+						bix += len + 1;
+					}
+
+					apos[k] = bix;
+				}
+			}		
+		}
+		else
+		{
+			//iterate over all values and their bitmaps
+			for (int k = 0; k < numVals; k++) 
+			{
+				//prepare value-to-add for entire value bitmap
+				int boff = _ptr[k];
+				int blen = len(k);
+				double val = sumValues(k, kplus, kbuff);
+				
+				//iterate over bitmap blocks and add values
+				if (val != 0) {
+					int slen;
+					int bix = skipScanVal(k, rl);
+					for( int off=((rl+1)/blksz)*blksz; bix<blen && off<ru; bix+=slen+1, off+=blksz ) {
+						slen = _data[boff+bix];
+						for (int i = 1; i <= slen; i++) {
+							int rix = off + _data[boff+bix + i];
+							kbuff.set(c[2*rix], c[2*rix+1]);
+							kplus2.execute2(kbuff, val);
+							c[2*rix] = kbuff._sum;
+							c[2*rix+1] = kbuff._correction;
+						}
 					}
 				}
 			}
 		}
 	}
 
-	private void computeColSums(MatrixBlock result, KahanFunction kplus)
+	@Override
+	protected final void computeColSums(MatrixBlock result, KahanFunction kplus)
 	{
 		KahanObject kbuff = new KahanObject(0, 0);
 		
@@ -555,7 +573,8 @@ public class ColGroupOLE extends ColGroupBitmap
 		}
 	}
 
-	private void computeRowMxx(MatrixBlock result, Builtin builtin, int rl, int ru)
+	@Override
+	protected final void computeRowMxx(MatrixBlock result, Builtin builtin, int rl, int ru)
 	{
 		//NOTE: zeros handled once for all column groups outside
 		final int blksz = BitmapEncoder.BITMAP_BLOCK_SZ;
@@ -624,7 +643,7 @@ public class ColGroupOLE extends ColGroupBitmap
 	protected void countNonZerosPerRow(int[] rnnz, int rl, int ru)
 	{
 		final int blksz = BitmapEncoder.BITMAP_BLOCK_SZ;
-		final int blksz2 = ColGroupBitmap.WRITE_CACHE_BLKSZ;
+		final int blksz2 = ColGroupOffset.WRITE_CACHE_BLKSZ;
 		final int numVals = getNumValues();
 		final int numCols = getNumCols();
 		

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/ColGroupOffset.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupOffset.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupOffset.java
new file mode 100644
index 0000000..e49c1a3
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupOffset.java
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.compress.utils.LinearAlgebraUtils;
+import org.apache.sysml.runtime.functionobjects.Builtin;
+import org.apache.sysml.runtime.functionobjects.KahanFunction;
+import org.apache.sysml.runtime.functionobjects.KahanPlus;
+import org.apache.sysml.runtime.functionobjects.KahanPlusSq;
+import org.apache.sysml.runtime.functionobjects.ReduceAll;
+import org.apache.sysml.runtime.functionobjects.ReduceCol;
+import org.apache.sysml.runtime.functionobjects.ReduceRow;
+import org.apache.sysml.runtime.functionobjects.Builtin.BuiltinCode;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
+
+
+/**
+ * Base class for column groups encoded with various types of bitmap encoding.
+ * 
+ * 
+ * NOTES:
+ *  * OLE: storing segment lengths separately from the bitmaps led to a 30%
+ *    improvement, but was not applied because it is more difficult to support
+ *    both data layouts at the same time (distributed/local as well as w/ and w/o low-level opt)
+ */
+public abstract class ColGroupOffset extends ColGroupValue 
+{
+	private static final long serialVersionUID = -1635828933479403125L;
+
+	protected static final boolean CREATE_SKIPLIST = true;
+	
+	protected static final int READ_CACHE_BLKSZ = 2 * BitmapEncoder.BITMAP_BLOCK_SZ;
+	public static final int WRITE_CACHE_BLKSZ = 2 * BitmapEncoder.BITMAP_BLOCK_SZ;
+	public static boolean ALLOW_CACHE_CONSCIOUS_ROWSUMS = true;
+	
+	/** Bitmaps, one per uncompressed value in {@link #_values}. */
+	protected int[] _ptr; //bitmap offsets per value
+	protected char[] _data; //linearized bitmaps (variable length)
+	protected boolean _zeros; //contains zero values
+	
+	protected int[] _skiplist;
+	
+	public ColGroupOffset() {
+		super();
+	}
+	
+	/**
+	 * Main constructor. Stores the headers for the individual bitmaps.
+	 * 
+	 * @param colIndices
+	 *            indices (within the block) of the columns included in this
+	 *            column group
+	 * @param numRows
+	 *            total number of rows in the parent block
+	 * @param ubm
+	 *            Uncompressed bitmap representation of the block
+	 */
+	public ColGroupOffset(int[] colIndices, int numRows, UncompressedBitmap ubm) {
+		super(colIndices, numRows, ubm);
+		_zeros = (ubm.getNumOffsets() < numRows); //flag zeros if the bitmap reports fewer offsets than rows
+	}
+
+	/**
+	 * Constructor for subclass methods that need to create shallow copies
+	 * 
+	 * @param colIndices
+	 *            raw column index information
+	 * @param numRows
+	 *            number of rows in the block
+	 * @param zeros
+	 *            indicator whether the group contains zero values
+	 * @param values
+	 *            set of distinct values for the block (associated bitmaps are
+	 *            kept in the subclass)
+	 */
+	protected ColGroupOffset(int[] colIndices, int numRows, boolean zeros, double[] values) {
+		super(colIndices, numRows, values);
+		_zeros = zeros;
+	}
+	
+	protected final int len(int k) {
+		return _ptr[k+1] - _ptr[k]; //distance between the start positions of bitmaps k and k+1
+	}
+
+	protected void createCompressedBitmaps(int numVals, int totalLen, char[][] lbitmaps) {
+		// compact bitmaps to linearized representation
+		_ptr = new int[numVals+1];
+		_data = new char[totalLen];
+		for( int i=0, off=0; i<numVals; i++ ) {
+			int len = lbitmaps[i].length;
+			_ptr[i] = off;
+			System.arraycopy(lbitmaps[i], 0, _data, off, len);
+			off += len;
+		}
+		_ptr[numVals] = totalLen;
+	}
+	
+	@Override
+	public long estimateInMemorySize() {
+		long size = super.estimateInMemorySize();
+		//in-memory estimate: superclass part plus the offset and bitmap arrays
+		// adding bitmaps size (long arithmetic, avoids int overflow for large arrays)
+		size += 16; //array references
+		if (_data != null) {
+			size += 32 + _ptr.length * 4L; // offsets (4 bytes per int)
+			size += 32 + _data.length * 2L; // bitmaps (2 bytes per char)
+		}
+	
+		return size;
+	}
+
+	//generic decompression of rows [rl,ru) for OLE/RLE, to be overwritten for performance
+	@Override
+	public void decompressToBlock(MatrixBlock target, int rl, int ru) 
+	{
+		final int numCols = getNumCols();
+		final int numVals = getNumValues();
+		int[] colIndices = getColIndices();
+		
+		// Run through the bitmaps for this column group
+		for (int i = 0; i < numVals; i++) {
+			Iterator<Integer> decoder = getDecodeIterator(i);
+			int valOff = i*numCols; //start of the i-th value tuple in _values
+
+			while (decoder.hasNext()) {
+				int row = decoder.next();
+				if( row<rl ) continue; //before the requested range
+				if( row>=ru ) break;   //ru is exclusive; assumes the iterator yields ascending rows
+				
+				for (int colIx = 0; colIx < numCols; colIx++)
+					target.appendValue(row, colIndices[colIx], _values[valOff+colIx]);
+			}
+		}
+	}
+
+	//iterator-based fallback for OLE/RLE that scatters every value tuple into
+	//the target columns given by colIndexTargets; subclasses may specialize it
+	@Override
+	public void decompressToBlock(MatrixBlock target, int[] colIndexTargets) 
+	{
+		final int ncol = getNumCols();
+		final int nval = getNumValues();
+		
+		for( int k = 0, tupleOff = 0; k < nval; k++, tupleOff += ncol ) {
+			//visit all row offsets recorded for the k-th value tuple
+			Iterator<Integer> rowIter = getDecodeIterator(k);
+
+			while( rowIter.hasNext() ) {
+				final int rix = rowIter.next();
+				for( int j = 0; j < ncol; j++ ) {
+					//map local column -> original column -> target column
+					final int cix = colIndexTargets[getColIndex(j)];
+					target.quickSetValue(rix, cix, _values[tupleOff+j]);
+				}
+			}
+		}
+	}
+	
+	//iterator-based fallback for OLE/RLE that writes the single group column
+	//colpos into column 0 of the target; subclasses may specialize it
+	@Override
+	public void decompressToBlock(MatrixBlock target, int colpos) 
+	{
+		final int ncol = getNumCols();
+		final int nval = getNumValues();
+		
+		for( int k = 0; k < nval; k++ ) {
+			Iterator<Integer> rowIter = getDecodeIterator(k);
+			final int tupleOff = k*ncol;
+			//every row listed for tuple k receives that tuple's colpos entry
+			while( rowIter.hasNext() ) {
+				final int rix = rowIter.next();
+				target.quickSetValue(rix, 0, _values[tupleOff+colpos]);
+			}
+		}
+	}
+
+	//generic get for OLE/RLE, to be overwritten for performance
+	//potential: skip scan (segment length agg and run length) instead of decode
+	@Override
+	public double get(int r, int c) {
+		//find local column index
+		int ix = Arrays.binarySearch(_colIndexes, c);
+		if( ix < 0 )
+			throw new RuntimeException("Column index "+c+" not in bitmap group.");
+		
+		//find row index in value offset lists via scan
+		final int numCols = getNumCols();
+		final int numVals = getNumValues();
+		for (int i = 0; i < numVals; i++) {
+			Iterator<Integer> decoder = getDecodeIterator(i);
+			int valOff = i*numCols;
+			while (decoder.hasNext()) {
+				int row = decoder.next();
+				if( row == r )
+					return _values[valOff+ix];
+				else if( row > r )
+					break; //current value
+			}
+		}		
+		return 0;
+	}
+
+	protected final void sumAllValues(double[] b, double[] c)
+	{
+		final int numVals = getNumValues();
+		final int numCols = getNumCols();
+		//NOTE(review): off=i*numVals presumably reads _values column-major, while other methods here index i*numCols+col - confirm
+		//vectMultiplyAdd over cols instead of dotProduct over vals because
+		//usually more values than columns
+		for( int i=0, off=0; i<numCols; i++, off+=numVals )
+			LinearAlgebraUtils.vectMultiplyAdd(b[i], _values, c, off, 0, numVals);
+	}
+
+	//folds the builtin (max or min) over all columns of the value tuple bitmapIx
+	protected final double mxxValues(int bitmapIx, Builtin builtin)
+	{
+		final int numCols = getNumCols();
+		final int valOff = bitmapIx * numCols;
+		//seed with the identity of max/min so that tuples of only infinite values are aggregated correctly
+		double val = (builtin.getBuiltinCode()==BuiltinCode.MAX) ? Double.NEGATIVE_INFINITY : Double.POSITIVE_INFINITY;
+		for( int i = 0; i < numCols; i++ )
+			val = builtin.execute2(val, _values[valOff+i]);
+		return val;
+	}
+
+	public char[] getBitmaps() {
+		return _data; //internal array of linearized bitmaps, returned without copying
+	}
+	
+	public int[] getBitmapOffsets() {
+		return _ptr; //internal array of bitmap offsets per value, returned without copying
+	}
+
+	public boolean hasZeros() {
+		return _zeros; //true if the group contains zero values
+	}
+	
+	/**
+	 * @param k
+	 *            index of a specific compressed bitmap (stored in subclass,
+	 *            index same as {@link #getValues})
+	 * @return an object for iterating over the row offsets in this bitmap. Only
+	 *         valid until the next call to this method. May be reused across
+	 *         calls.
+	 */
+	public abstract Iterator<Integer> getDecodeIterator(int k);
+
+	//TODO getDecodeIterator(int k, int rl, int ru)
+
+	/**
+	 * Utility for sparse-unsafe operations: collects the positions of set entries.
+	 * 
+	 * @param ind row indicator vector of non zeros
+	 * @return offsets
+	 * @throws DMLRuntimeException if DMLRuntimeException occurs
+	 */
+	protected int[] computeOffsets(boolean[] ind)
+		throws DMLRuntimeException 
+	{
+		//first pass: count the flagged rows to size the result
+		int count = 0;
+		for( boolean flag : ind )
+			if( flag )
+				count++;
+		//second pass: record the positions of the flagged rows in order
+		int[] offsets = new int[count];
+		int next = 0;
+		for( int row=0; row<ind.length; row++ )
+			if( ind[row] )
+				offsets[next++] = row;
+		return offsets;
+	}
+
+	@Override
+	public void readFields(DataInput in) 
+		throws IOException 
+	{
+		_numRows = in.readInt(); //header: numRows, numCols, numVals, zero indicator
+		int numCols = in.readInt();
+		int numVals = in.readInt();
+		_zeros = in.readBoolean();
+		
+		//read col indices (numCols ints)
+		_colIndexes = new int[ numCols ];
+		for( int i=0; i<numCols; i++ )
+			_colIndexes[i] = in.readInt();
+		
+		//read distinct values (numVals tuples of numCols doubles)
+		_values = new double[numVals*numCols];
+		for( int i=0; i<numVals*numCols; i++ )
+			_values[i] = in.readDouble();
+		
+		//read bitmaps (total length, then per value its length and chars)
+		int totalLen = in.readInt();
+		_ptr = new int[numVals+1];
+		_data = new char[totalLen];		
+		for( int i=0, off=0; i<numVals; i++ ) {
+			int len = in.readInt();
+			_ptr[i] = off; //offsets are not stored but rebuilt from the lengths
+			for( int j=0; j<len; j++ )
+				_data[off+j] = in.readChar();
+			off += len;
+		}
+		_ptr[numVals] = totalLen; //end position of the last bitmap
+	}
+	
+	@Override
+	public void write(DataOutput out) 
+		throws IOException 
+	{
+		int numCols = getNumCols();
+		int numVals = getNumValues();
+		out.writeInt(_numRows); //header: numRows, numCols, numVals, zero indicator
+		out.writeInt(numCols);
+		out.writeInt(numVals);
+		out.writeBoolean(_zeros);
+		
+		//write col indices (one int each)
+		for( int i=0; i<_colIndexes.length; i++ )
+			out.writeInt( _colIndexes[i] );
+		
+		//write distinct values (one double each)
+		for( int i=0; i<_values.length; i++ )
+			out.writeDouble(_values[i]);
+
+		//write bitmaps (lens and data, offset later recreated)
+		int totalLen = 0;
+		for( int i=0; i<numVals; i++ )
+			totalLen += len(i);
+		out.writeInt(totalLen);	
+		for( int i=0; i<numVals; i++ ) {
+			int len = len(i);
+			int off = _ptr[i]; //start of the i-th bitmap in _data
+			out.writeInt(len);
+			for( int j=0; j<len; j++ )
+				out.writeChar(_data[off+j]);
+		}
+	}
+
+	@Override
+	public long getExactSizeOnDisk() {
+		//fixed-size part: 13-byte header plus 4 bytes for the total bitmap length
+		long bytes = 13 + 4;
+		
+		//col indices (4 bytes each) and distinct values (8 bytes each)
+		bytes += 4 * _colIndexes.length; 
+		bytes += 8 * _values.length;
+		
+		//actual bitmaps: 4-byte length plus 2 bytes per char for every value
+		for( int k=0; k<getNumValues(); k++ )
+			bytes += 4 + 2 * len(k);
+		return bytes;
+	}
+	
+
+	
+	@Override
+	public void unaryAggregateOperations(AggregateUnaryOperator op, MatrixBlock result, int rl, int ru) 
+		throws DMLRuntimeException 
+	{
+		//classify the reduction direction once for both operator families
+		final boolean full = op.indexFn instanceof ReduceAll;
+		final boolean perRow = op.indexFn instanceof ReduceCol;
+		final boolean perCol = op.indexFn instanceof ReduceRow;
+		//sum and sumsq (reduceall/reducerow over tuples and counts)
+		final boolean sum = op.aggOp.increOp.fn instanceof KahanPlus;
+		if( sum || op.aggOp.increOp.fn instanceof KahanPlusSq ) {
+			KahanFunction kplus = sum ? KahanPlus.getKahanPlusFnObject() : KahanPlusSq.getKahanPlusSqFnObject();
+			if( full )
+				computeSum(result, kplus);
+			else if( perRow )
+				computeRowSums(result, kplus, rl, ru);
+			else if( perCol )
+				computeColSums(result, kplus);
+			return;
+		}
+		//min and max (reduceall/reducerow over tuples only); all else is ignored
+		if( !(op.aggOp.increOp.fn instanceof Builtin) ) return;
+		final Builtin builtin = (Builtin) op.aggOp.increOp.fn;
+		final BuiltinCode code = builtin.getBuiltinCode();
+		if( code != BuiltinCode.MAX && code != BuiltinCode.MIN ) return;
+		
+		if( full )
+			computeMxx(result, builtin, _zeros);
+		else if( perRow )
+			computeRowMxx(result, builtin, rl, ru);
+		else if( perCol )
+			computeColMxx(result, builtin, _zeros);
+	}
+	
+	protected abstract void computeSum(MatrixBlock result, KahanFunction kplus);
+	
+	protected abstract void computeRowSums(MatrixBlock result, KahanFunction kplus, int rl, int ru);
+	
+	protected abstract void computeColSums(MatrixBlock result, KahanFunction kplus);
+	
+	protected abstract void computeRowMxx(MatrixBlock result, Builtin builtin, int rl, int ru);
+	
+}