You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/02/08 02:22:29 UTC

[4/5] incubator-systemml git commit: [SYSTEMML-449] Compressed linear algebra v2

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java
index efdcc86..6d2ec43 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java
@@ -22,31 +22,29 @@ package org.apache.sysml.runtime.compress;
 import java.util.Arrays;
 import java.util.Iterator;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.compress.utils.ConverterUtils;
 import org.apache.sysml.runtime.compress.utils.LinearAlgebraUtils;
 import org.apache.sysml.runtime.functionobjects.Builtin;
 import org.apache.sysml.runtime.functionobjects.KahanFunction;
 import org.apache.sysml.runtime.functionobjects.KahanPlus;
-import org.apache.sysml.runtime.functionobjects.KahanPlusSq;
-import org.apache.sysml.runtime.functionobjects.ReduceAll;
-import org.apache.sysml.runtime.functionobjects.ReduceCol;
-import org.apache.sysml.runtime.functionobjects.ReduceRow;
-import org.apache.sysml.runtime.functionobjects.Builtin.BuiltinCode;
 import org.apache.sysml.runtime.instructions.cp.KahanObject;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 import org.apache.sysml.runtime.matrix.data.Pair;
-import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
 import org.apache.sysml.runtime.matrix.operators.ScalarOperator;
 
 
 /** A group of columns compressed with a single run-length encoded bitmap. */
-public class ColGroupRLE extends ColGroupBitmap 
+public class ColGroupRLE extends ColGroupOffset 
 {
 	private static final long serialVersionUID = 7450232907594748177L;
 
+	private static final Log LOG = LogFactory.getLog(ColGroupRLE.class.getName());
+	
 	public ColGroupRLE() {
-		super(CompressionType.RLE_BITMAP);
+		super();
 	}
 	
 	/**
@@ -62,26 +60,37 @@ public class ColGroupRLE extends ColGroupBitmap
 	 */
 	public ColGroupRLE(int[] colIndices, int numRows, UncompressedBitmap ubm) 
 	{
-		super(CompressionType.RLE_BITMAP, colIndices, numRows, ubm);
+		super(colIndices, numRows, ubm);
 		
 		// compress the bitmaps
 		final int numVals = ubm.getNumValues();
 		char[][] lbitmaps = new char[numVals][];
 		int totalLen = 0;
 		for( int k=0; k<numVals; k++ ) {
-			lbitmaps[k] = BitmapEncoder.genRLEBitmap(ubm.getOffsetsList(k));
+			lbitmaps[k] = BitmapEncoder.genRLEBitmap(
+				ubm.getOffsetsList(k).extractValues(), ubm.getNumOffsets(k));
 			totalLen += lbitmaps[k].length;
 		}
 		
 		// compact bitmaps to linearized representation
 		createCompressedBitmaps(numVals, totalLen, lbitmaps);
+		
+		//debug output
+		double ucSize = MatrixBlock.estimateSizeDenseInMemory(numRows, colIndices.length);
+		if( estimateInMemorySize() > ucSize )
+			LOG.warn("RLE group larger than UC dense: "+estimateInMemorySize()+" "+ucSize);
 	}
 
 	public ColGroupRLE(int[] colIndices, int numRows, boolean zeros, double[] values, char[] bitmaps, int[] bitmapOffs) {
-		super(CompressionType.RLE_BITMAP, colIndices, numRows, zeros, values);
+		super(colIndices, numRows, zeros, values);
 		_data = bitmaps;
 		_ptr = bitmapOffs;
 	}
+	
+	@Override
+	public CompressionType getCompType() {
+		return CompressionType.RLE_BITMAP;
+	}
 
 	@Override
 	public Iterator<Integer> getDecodeIterator(int k) {
@@ -247,7 +256,7 @@ public class ColGroupRLE extends ColGroupBitmap
 			//L3 cache alignment, see comment rightMultByVector OLE column group 
 			//core difference of RLE to OLE is that runs are not segment alignment,
 			//which requires care of handling runs crossing cache-buckets
-			final int blksz = ColGroupBitmap.WRITE_CACHE_BLKSZ; 
+			final int blksz = ColGroupOffset.WRITE_CACHE_BLKSZ; 
 			
 			//step 1: prepare position and value arrays
 			
@@ -335,7 +344,7 @@ public class ColGroupRLE extends ColGroupBitmap
 		if( LOW_LEVEL_OPT && numVals > 1 
 			&& _numRows > BitmapEncoder.BITMAP_BLOCK_SZ ) 
 		{
-			final int blksz = ColGroupBitmap.READ_CACHE_BLKSZ; 
+			final int blksz = ColGroupOffset.READ_CACHE_BLKSZ; 
 			
 			//step 1: prepare position and value arrays
 			
@@ -423,7 +432,7 @@ public class ColGroupRLE extends ColGroupBitmap
 		}
 		
 		double[] rvalues = applyScalarOp(op, val0, getNumCols());		
-		char[] lbitmap = BitmapEncoder.genRLEBitmap(loff);
+		char[] lbitmap = BitmapEncoder.genRLEBitmap(loff, loff.length);
 		char[] rbitmaps = Arrays.copyOf(_data, _data.length+lbitmap.length);
 		System.arraycopy(lbitmap, 0, rbitmaps, _data.length, lbitmap.length);
 		int[] rbitmapOffs = Arrays.copyOf(_ptr, _ptr.length+1);
@@ -432,49 +441,9 @@ public class ColGroupRLE extends ColGroupBitmap
 		return new ColGroupRLE(_colIndexes, _numRows, loff.length<_numRows,
 				rvalues, rbitmaps, rbitmapOffs);
 	}
-	
-	@Override
-	public void unaryAggregateOperations(AggregateUnaryOperator op, MatrixBlock result) 
-		throws DMLRuntimeException
-	{
-		unaryAggregateOperations(op, result, 0, getNumRows());
-	}
-	
-	
-	@Override
-	public void unaryAggregateOperations(AggregateUnaryOperator op, MatrixBlock result, int rl, int ru) 
-		throws DMLRuntimeException 
-	{
-		//sum and sumsq (reduceall/reducerow over tuples and counts)
-		if( op.aggOp.increOp.fn instanceof KahanPlus || op.aggOp.increOp.fn instanceof KahanPlusSq ) 
-		{
-			KahanFunction kplus = (op.aggOp.increOp.fn instanceof KahanPlus) ?
-					KahanPlus.getKahanPlusFnObject() : KahanPlusSq.getKahanPlusSqFnObject();
-			
-			if( op.indexFn instanceof ReduceAll )
-				computeSum(result, kplus);
-			else if( op.indexFn instanceof ReduceCol )
-				computeRowSums(result, kplus, rl, ru);
-			else if( op.indexFn instanceof ReduceRow )
-				computeColSums(result, kplus);
-		}
-		//min and max (reduceall/reducerow over tuples only)
-		else if(op.aggOp.increOp.fn instanceof Builtin 
-				&& (((Builtin)op.aggOp.increOp.fn).getBuiltinCode()==BuiltinCode.MAX 
-				|| ((Builtin)op.aggOp.increOp.fn).getBuiltinCode()==BuiltinCode.MIN)) 
-		{		
-			Builtin builtin = (Builtin) op.aggOp.increOp.fn;
 
-			if( op.indexFn instanceof ReduceAll )
-				computeMxx(result, builtin);
-			else if( op.indexFn instanceof ReduceCol )
-				computeRowMxx(result, builtin, rl, ru);
-			else if( op.indexFn instanceof ReduceRow )
-				computeColMxx(result, builtin);
-		}
-	}
-
-	private void computeSum(MatrixBlock result, KahanFunction kplus)
+	@Override
+	protected final void computeSum(MatrixBlock result, KahanFunction kplus)
 	{
 		KahanObject kbuff = new KahanObject(result.quickGetValue(0, 0), result.quickGetValue(0, 1));
 		
@@ -502,37 +471,93 @@ public class ColGroupRLE extends ColGroupBitmap
 		result.quickSetValue(0, 1, kbuff._correction);
 	}
 
-	private void computeRowSums(MatrixBlock result, KahanFunction kplus, int rl, int ru)
+	@Override
+	protected final void computeRowSums(MatrixBlock result, KahanFunction kplus, int rl, int ru)
 	{
 		KahanObject kbuff = new KahanObject(0, 0);
+		KahanPlus kplus2 = KahanPlus.getKahanPlusFnObject();
+		
 		final int numVals = getNumValues();
 		double[] c = result.getDenseBlock();
 		
-		for (int k = 0; k < numVals; k++) {
-			int boff = _ptr[k];
-			int blen = len(k);
-			double val = sumValues(k);
+		if( ALLOW_CACHE_CONSCIOUS_ROWSUMS 
+			&& LOW_LEVEL_OPT && numVals > 1 
+			&& _numRows > BitmapEncoder.BITMAP_BLOCK_SZ )
+		{
+			final int blksz = ColGroupOffset.WRITE_CACHE_BLKSZ/2; 
+			
+			//step 1: prepare position and value arrays
+			
+			//current pos / values per RLE list
+			int[] astart = new int[numVals];
+			int[] apos = skipScan(numVals, rl, astart);
+			double[] aval = sumAllValues(kplus, kbuff);
+			
+			//step 2: cache conscious row sums via horizontal scans 
+			for( int bi=rl; bi<ru; bi+=blksz ) 
+			{
+				int bimax = Math.min(bi+blksz, ru);
+					
+				//horizontal segment scan, incl pos maintenance
+				for (int k = 0; k < numVals; k++) {
+					int boff = _ptr[k];
+					int blen = len(k);
+					double val = aval[k];
+					int bix = apos[k];
+					int start = astart[k];
+					
+					//compute partial results, not aligned
+					while( bix<blen ) {
+						int lstart = _data[boff + bix];
+						int llen = _data[boff + bix + 1];
+						int from = Math.max(bi, start+lstart);
+						int to = Math.min(start+lstart+llen,bimax);
+						for (int rix=from; rix<to; rix++) {
+							kbuff.set(c[2*rix], c[2*rix+1]);
+							kplus2.execute2(kbuff, val);
+							c[2*rix] = kbuff._sum;
+							c[2*rix+1] = kbuff._correction;
+						}
+						if(start+lstart+llen >= bimax)
+							break;
+						start += lstart + llen;
+						bix += 2;
+					}
 					
-			if (val != 0.0) {
-				Pair<Integer,Integer> tmp = skipScanVal(k, rl);
-				int bix = tmp.getKey();
-				int curRunStartOff = tmp.getValue();
-				int curRunEnd = tmp.getValue();
-				for ( ; bix<blen && curRunEnd<ru; bix+=2) {
-					curRunStartOff = curRunEnd + _data[boff+bix];
-					curRunEnd = curRunStartOff + _data[boff+bix+1];
-					for (int rix=curRunStartOff; rix<curRunEnd && rix<ru; rix++) {
-						kbuff.set(c[2*rix], c[2*rix+1]);
-						kplus.execute2(kbuff, val);
-						c[2*rix] = kbuff._sum;
-						c[2*rix+1] = kbuff._correction;
+					apos[k] = bix;	
+					astart[k] = start;
+				}
+			}
+		}
+		else
+		{
+			for (int k = 0; k < numVals; k++) {
+				int boff = _ptr[k];
+				int blen = len(k);
+				double val = sumValues(k, kplus, kbuff);
+						
+				if (val != 0.0) {
+					Pair<Integer,Integer> tmp = skipScanVal(k, rl);
+					int bix = tmp.getKey();
+					int curRunStartOff = tmp.getValue();
+					int curRunEnd = tmp.getValue();
+					for ( ; bix<blen && curRunEnd<ru; bix+=2) {
+						curRunStartOff = curRunEnd + _data[boff+bix];
+						curRunEnd = curRunStartOff + _data[boff+bix+1];
+						for (int rix=curRunStartOff; rix<curRunEnd && rix<ru; rix++) {
+							kbuff.set(c[2*rix], c[2*rix+1]);
+							kplus2.execute2(kbuff, val);
+							c[2*rix] = kbuff._sum;
+							c[2*rix+1] = kbuff._correction;
+						}
 					}
 				}
 			}
 		}
 	}
 
-	private void computeColSums(MatrixBlock result, KahanFunction kplus)
+	@Override
+	protected final void computeColSums(MatrixBlock result, KahanFunction kplus)
 	{
 		KahanObject kbuff = new KahanObject(0, 0);
 		
@@ -561,7 +586,8 @@ public class ColGroupRLE extends ColGroupBitmap
 		}
 	}
 
-	private void computeRowMxx(MatrixBlock result, Builtin builtin, int rl, int ru)
+	@Override
+	protected final void computeRowMxx(MatrixBlock result, Builtin builtin, int rl, int ru)
 	{
 		//NOTE: zeros handled once for all column groups outside
 		final int numVals = getNumValues();

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java
index 9d06bf8..6445c52 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java
@@ -32,6 +32,7 @@ import org.apache.sysml.runtime.functionobjects.ReduceRow;
 import org.apache.sysml.runtime.matrix.data.LibMatrixAgg;
 import org.apache.sysml.runtime.matrix.data.LibMatrixMult;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.SparseBlock.Type;
 import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
 import org.apache.sysml.runtime.matrix.operators.ScalarOperator;
 import org.apache.sysml.runtime.util.SortUtils;
@@ -53,7 +54,7 @@ public class ColGroupUncompressed extends ColGroup
 	private MatrixBlock _data;
 
 	public ColGroupUncompressed() {
-		super(CompressionType.UNCOMPRESSED, (int[])null, -1);
+		super((int[])null, -1);
 	}
 	
 	/**
@@ -71,7 +72,7 @@ public class ColGroupUncompressed extends ColGroup
 	public ColGroupUncompressed(List<Integer> colIndicesList, MatrixBlock rawblock) 
 		throws DMLRuntimeException 
 	{
-		super(CompressionType.UNCOMPRESSED, colIndicesList, 
+		super(colIndicesList, 
 				CompressedMatrixBlock.TRANSPOSE_INPUT ? 
 				rawblock.getNumColumns() : rawblock.getNumRows());
 
@@ -97,7 +98,7 @@ public class ColGroupUncompressed extends ColGroup
 			return;
 		}
 		
-		// dense implementation for dense and sparse matrices to avoid linear search
+		//dense implementation for dense and sparse matrices to avoid linear search
 		int m = numRows;
 		int n = _colIndexes.length;
 		for( int i = 0; i < m; i++) {
@@ -109,6 +110,11 @@ public class ColGroupUncompressed extends ColGroup
 			}
 		}
 		_data.examSparsity();
+		
+		//convert sparse MCSR to read-optimized CSR representation
+		if( _data.isInSparseFormat() ) {
+			_data = new MatrixBlock(_data, Type.CSR, false);
+		}
 	}
 
 	/**
@@ -121,8 +127,7 @@ public class ColGroupUncompressed extends ColGroup
 	 */
 	public ColGroupUncompressed(ArrayList<ColGroup> groupsToDecompress) 
 	{
-		super(CompressionType.UNCOMPRESSED, 
-				mergeColIndices(groupsToDecompress),
+		super(mergeColIndices(groupsToDecompress),
 				groupsToDecompress.get(0)._numRows);
 
 		// Invert the list of column indices
@@ -152,10 +157,14 @@ public class ColGroupUncompressed extends ColGroup
 	 */
 	public ColGroupUncompressed(int[] colIndices, int numRows, MatrixBlock data) 
 	{
-		super(CompressionType.UNCOMPRESSED, colIndices, numRows);
+		super(colIndices, numRows);
 		_data = data;
 	}
 
+	@Override
+	public CompressionType getCompType() {
+		return CompressionType.UNCOMPRESSED;
+	}
 
 	/**
 	 * Access for superclass
@@ -276,6 +285,23 @@ public class ColGroupUncompressed extends ColGroup
 		LibMatrixMult.matrixMult(_data, shortVector, result, rl, ru);	
 	}
 	
+	public void rightMultByVector(MatrixBlock vector, MatrixBlock result, int k)
+			throws DMLRuntimeException 
+	{
+		// Pull out the relevant rows of the vector
+		int clen = _colIndexes.length;
+		
+		MatrixBlock shortVector = new MatrixBlock(clen, 1, false);
+		shortVector.allocateDenseBlock();
+		double[] b = shortVector.getDenseBlock();
+		for (int colIx = 0; colIx < clen; colIx++)
+			b[colIx] = vector.quickGetValue(_colIndexes[colIx], 0);
+		shortVector.recomputeNonZeros();
+		
+		// Multiply the selected columns by the appropriate parts of the vector
+		LibMatrixMult.matrixMult(_data, shortVector, result, k);	
+	}
+	
 	@Override
 	public void leftMultByRowVector(MatrixBlock vector, MatrixBlock result)
 			throws DMLRuntimeException 
@@ -377,8 +403,7 @@ public class ColGroupUncompressed extends ColGroup
 	}
 	
 	@Override
-	protected void countNonZerosPerRow(int[] rnnz, int rl, int ru)
-	{
+	protected void countNonZerosPerRow(int[] rnnz, int rl, int ru) {
 		for( int i=rl; i<ru; i++ )
 			rnnz[i-rl] += _data.recomputeNonZeros(i, i, 0, _data.getNumColumns()-1);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/ColGroupValue.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupValue.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupValue.java
new file mode 100644
index 0000000..b3b5e80
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupValue.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress;
+
+import java.util.Arrays;
+
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.functionobjects.Builtin;
+import org.apache.sysml.runtime.functionobjects.Builtin.BuiltinCode;
+import org.apache.sysml.runtime.functionobjects.KahanFunction;
+import org.apache.sysml.runtime.functionobjects.KahanPlus;
+import org.apache.sysml.runtime.instructions.cp.KahanObject;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
+import org.apache.sysml.runtime.matrix.operators.ScalarOperator;
+
+
+/**
+ * Base class for column groups encoded with a value dictionary.
+ * 
+ */
+public abstract class ColGroupValue extends ColGroup 
+{	
+	private static final long serialVersionUID = 3786247536054353658L;
+		
+	public static boolean LOW_LEVEL_OPT = true;	
+	
+	//sorting values by physical length helps by 10-20%, especially for serial execution,
+	//but slightly decreases performance for parallel (incl multi-threaded) execution; hence, it is
+	//not applied for distributed operations (also because compression time and garbage collection increase)
+	public static final boolean SORT_VALUES_BY_LENGTH = true; 
+		
+	
+	/** Distinct values associated with individual bitmaps. */
+	protected double[] _values; //linearized <numcol vals> <numcol vals>
+	
+	public ColGroupValue() {
+		super((int[]) null, -1);
+	}
+	
+	/**
+	 * Main constructor. Stores the headers for the individual bitmaps.
+	 * 
+	 * @param colIndices
+	 *            indices (within the block) of the columns included in this
+	 *            column group
+	 * @param numRows
+	 *            total number of rows in the parent block
+	 * @param ubm
+	 *            Uncompressed bitmap representation of the block
+	 */
+	public ColGroupValue(int[] colIndices, int numRows, UncompressedBitmap ubm) 
+	{
+		super(colIndices, numRows);
+
+		// sort values by frequency, if requested 
+		if( LOW_LEVEL_OPT && SORT_VALUES_BY_LENGTH 
+				&& numRows > BitmapEncoder.BITMAP_BLOCK_SZ ) {
+			ubm.sortValuesByFrequency();
+		}
+
+		// extract and store distinct values (bitmaps handled by subclasses)
+		_values = ubm.getValues();
+	}
+
+	/**
+	 * Constructor for subclass methods that need to create shallow copies
+	 * 
+	 * @param colIndices
+	 *            raw column index information
+	 * @param numRows
+	 *            number of rows in the block
+	 * @param values
+	 *            set of distinct values for the block (associated bitmaps are
+	 *            kept in the subclass)
+	 */
+	protected ColGroupValue(int[] colIndices, int numRows, double[] values) {
+		super(colIndices, numRows);
+		_values = values;
+	}
+	
+	@Override
+	public long estimateInMemorySize() {
+		long size = super.estimateInMemorySize();
+		
+		// adding the size of values
+		size += 8; //array reference
+		if (_values != null) {
+			size += 32 + _values.length * 8; //values
+		}
+	
+		return size;
+	}
+
+	/**
+	 * Obtain number of distinct sets of values associated with the bitmaps in this column group.
+	 * 
+	 * @return the number of distinct sets of values associated with the bitmaps
+	 *         in this column group
+	 */
+	public int getNumValues() {
+		return _values.length / _colIndexes.length;
+	}
+
+	public double[] getValues() {
+		return _values;
+	}
+	
+	protected int containsAllZeroValue() {
+		int numVals = getNumValues();
+		int numCols = getNumCols();
+		for( int i=0, off=0; i<numVals; i++, off+=numCols ) {
+			boolean allZeros = true;
+			for( int j=0; j<numCols; j++ )
+				allZeros &= (_values[off+j] == 0);
+			if( allZeros )
+				return i;
+		}
+		return -1;
+	}
+	
+	protected final double sumValues(int valIx) {
+		final int numCols = getNumCols();
+		final int valOff = valIx * numCols;
+		double val = 0.0;
+		for( int i = 0; i < numCols; i++ ) {
+			val += _values[valOff+i];
+		}
+		
+		return val;
+	}
+	
+	protected final double sumValues(int valIx, KahanFunction kplus, KahanObject kbuff) {
+		final int numCols = getNumCols();
+		final int valOff = valIx * numCols;
+		kbuff.set(0, 0);
+		for( int i = 0; i < numCols; i++ ) {
+			kplus.execute2(kbuff, _values[valOff+i]);
+		}
+		
+		return kbuff._sum;
+	}
+	
+	protected final double[] sumAllValues(KahanFunction kplus, KahanObject kbuff) {
+		//quick path: sum 
+		if( getNumCols()==1 && kplus instanceof KahanPlus )
+			return _values; //shallow copy of values
+		
+		//pre-aggregate value tuple 
+		final int numVals = getNumValues();
+		double[] ret = new double[numVals];
+		for( int k=0; k<numVals; k++ )
+			ret[k] = sumValues(k, kplus, kbuff);
+		
+		return ret;
+	}
+	
+	protected final double sumValues(int valIx, double[] b) {
+		final int numCols = getNumCols();
+		final int valOff = valIx * numCols;
+		double val = 0;
+		for( int i = 0; i < numCols; i++ ) {
+			val += _values[valOff+i] * b[i];
+		}
+		
+		return val;
+	}
+
+	protected final double[] preaggValues(int numVals, double[] b) {
+		double[] ret = new double[numVals];
+		for( int k = 0; k < numVals; k++ )
+			ret[k] = sumValues(k, b);
+		
+		return ret;
+	}
+	
+	/**
+	 * NOTE: Shared across OLE/RLE/DDC because value-only computation. 
+	 * 
+	 * @param result output matrix block
+	 * @param builtin function object
+	 * @param zeros indicator if column group contains zero values
+	 */
+	protected void computeMxx(MatrixBlock result, Builtin builtin, boolean zeros) 
+	{
+		//init and 0-value handling
+		double val = Double.MAX_VALUE * ((builtin.getBuiltinCode()==BuiltinCode.MAX)?-1:1);
+		if( zeros )
+			val = builtin.execute2(val, 0);
+		
+		//iterate over all values only
+		final int numVals = getNumValues();
+		final int numCols = getNumCols();		
+		for (int k = 0; k < numVals; k++)
+			for( int j=0, valOff = k*numCols; j<numCols; j++ )
+				val = builtin.execute2(val, _values[ valOff+j ]);
+		
+		//compute new partial aggregate
+		val = builtin.execute2(val, result.quickGetValue(0, 0));
+		result.quickSetValue(0, 0, val);
+	}
+	
+	/**
+	 * NOTE: Shared across OLE/RLE/DDC because value-only computation. 
+	 * 
+	 * @param result output matrix block
+	 * @param builtin function object
+	 * @param zeros indicator if column group contains zero values
+	 */
+	protected void computeColMxx(MatrixBlock result, Builtin builtin, boolean zeros)
+	{
+		final int numVals = getNumValues();
+		final int numCols = getNumCols();
+		
+		//init and 0-value handling
+		double[] vals = new double[numCols];
+		Arrays.fill(vals, Double.MAX_VALUE * ((builtin.getBuiltinCode()==BuiltinCode.MAX)?-1:1));
+		if( zeros ) {
+			for( int j = 0; j < numCols; j++ )
+				vals[j] = builtin.execute2(vals[j], 0);		
+		}
+		
+		//iterate over all values only
+		for (int k = 0; k < numVals; k++) 
+			for( int j=0, valOff=k*numCols; j<numCols; j++ )
+				vals[j] = builtin.execute2(vals[j], _values[ valOff+j ]);
+		
+		//copy results to output
+		for( int j=0; j<numCols; j++ )
+			result.quickSetValue(0, _colIndexes[j], vals[j]);
+	}
+	
+	/**
+	 * Method for use by subclasses. Applies a scalar operation to the value
+	 * metadata stored in the superclass.
+	 * 
+	 * @param op
+	 *            scalar operation to perform
+	 * @return transformed copy of value metadata for this column group
+	 * @throws DMLRuntimeException if DMLRuntimeException occurs
+	 */
+	protected double[] applyScalarOp(ScalarOperator op)
+		throws DMLRuntimeException 
+	{
+		//scan over linearized values
+		double[] ret = new double[_values.length];
+		for (int i = 0; i < _values.length; i++) {
+			ret[i] = op.executeScalar(_values[i]);
+		}
+
+		return ret;
+	}
+
+	protected double[] applyScalarOp(ScalarOperator op, double newVal, int numCols)
+		throws DMLRuntimeException 
+	{
+		//scan over linearized values
+		double[] ret = new double[_values.length + numCols];
+		for( int i = 0; i < _values.length; i++ ) {
+			ret[i] = op.executeScalar(_values[i]);
+		}
+		
+		//add new value to the end
+		Arrays.fill(ret, _values.length, _values.length+numCols, newVal);
+		
+		return ret;
+	}
+	
+	@Override
+	public void unaryAggregateOperations(AggregateUnaryOperator op, MatrixBlock result) 
+		throws DMLRuntimeException 
+	{
+		unaryAggregateOperations(op, result, 0, getNumRows());
+	}
+	
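+	/**
+	 * Computes the given unary aggregate over the row range [rl, ru) of this column group.
+	 * @param op aggregation operator
+	 * @param result output matrix block
+	 * @param rl row lower index, inclusive
+	 * @param ru row upper index, exclusive
+	 * @throws DMLRuntimeException if DMLRuntimeException occurs
+	 */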
+	public abstract void unaryAggregateOperations(AggregateUnaryOperator op, MatrixBlock result, int rl, int ru)
+		throws DMLRuntimeException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
index 48ebcc5..84c4812 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
@@ -46,6 +46,7 @@ import org.apache.sysml.lops.MMTSJ.MMTSJType;
 import org.apache.sysml.lops.MapMultChain.ChainType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.compress.ColGroup.CompressionType;
+import org.apache.sysml.runtime.compress.cocode.PlanningCoCoder;
 import org.apache.sysml.runtime.compress.estim.CompressedSizeEstimator;
 import org.apache.sysml.runtime.compress.estim.CompressedSizeInfo;
 import org.apache.sysml.runtime.compress.estim.SizeEstimatorFactory;
@@ -56,12 +57,14 @@ import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType;
 import org.apache.sysml.runtime.controlprogram.parfor.stat.Timing;
 import org.apache.sysml.runtime.functionobjects.Builtin;
 import org.apache.sysml.runtime.functionobjects.Builtin.BuiltinCode;
+import org.apache.sysml.runtime.functionobjects.KahanFunction;
 import org.apache.sysml.runtime.functionobjects.KahanPlus;
 import org.apache.sysml.runtime.functionobjects.KahanPlusSq;
 import org.apache.sysml.runtime.functionobjects.Multiply;
 import org.apache.sysml.runtime.functionobjects.ReduceAll;
 import org.apache.sysml.runtime.functionobjects.ReduceCol;
 import org.apache.sysml.runtime.instructions.cp.CM_COV_Object;
+import org.apache.sysml.runtime.instructions.cp.KahanObject;
 import org.apache.sysml.runtime.instructions.cp.ScalarObject;
 import org.apache.sysml.runtime.matrix.data.CTableMap;
 import org.apache.sysml.runtime.matrix.data.LibMatrixBincell;
@@ -98,7 +101,9 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 	public static final boolean MATERIALIZE_ZEROS = false;
 	public static final long MIN_PAR_AGG_THRESHOLD = 16*1024*1024; //16MB
 	public static final boolean INVESTIGATE_ESTIMATES = false;
-	private static final boolean LDEBUG = false; //local debug flag
+	public static boolean ALLOW_DDC_ENCODING = true;
+	private static final boolean LDEBUG = true; //local debug flag
+	private static final Level LDEBUG_LEVEL = Level.DEBUG; //DEBUG/TRACE for details
 	
 	private static final Log LOG = LogFactory.getLog(CompressedMatrixBlock.class.getName());
 	
@@ -106,7 +111,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		// for internal debugging only
 		if( LDEBUG ) {
 			Logger.getLogger("org.apache.sysml.runtime.compress")
-				  .setLevel((Level) Level.DEBUG);
+				  .setLevel((Level) LDEBUG_LEVEL);
 		}	
 	}
 	
@@ -231,7 +236,6 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		final int numRows = getNumRows();
 		final int numCols = getNumColumns();
 		final boolean sparse = isInSparseFormat();
-		final double sp = OptimizerUtils.getSparsity(numRows, numCols, getNonZeros());
 		MatrixBlock rawblock = !TRANSPOSE_INPUT ? new MatrixBlock(this) :
 			LibMatrixReorg.transpose(this, new MatrixBlock(numCols, numRows, sparse), k);
 		
@@ -239,45 +243,50 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		CompressedSizeEstimator bitmapSizeEstimator = 
 				SizeEstimatorFactory.getSizeEstimator(rawblock, numRows);
 
-		// The current implementation of this method is written for correctness,
-		// not for performance or for minimal use of temporary space.
-
-		// We start with a full set of columns.
-		HashSet<Integer> remainingCols = new HashSet<Integer>();
-		for (int i = 0; i < numCols; i++)
-			remainingCols.add(i);
-
 		// PHASE 1: Classify columns by compression type
-		// We start by determining which columns are amenable to bitmap compression
-		double uncompressedColumnSize = getUncompressedSize(numRows, 1, sp);
-
-		// information about the bitmap amenable columns
-		List<Integer> bitmapCols = new ArrayList<Integer>();
-		List<Integer> uncompressedCols = new ArrayList<Integer>();
-		List<Integer> colsCards = new ArrayList<Integer>();
-		List<Long> compressedSizes = new ArrayList<Long>();
-		HashMap<Integer, Double> compressionRatios = new HashMap<Integer, Double>();
+		// We start by determining which columns are amenable to compression
+		List<Integer> colsC = new ArrayList<Integer>();
+		List<Integer> colsUC = new ArrayList<Integer>();
+		HashMap<Integer, Double> compRatios = new HashMap<Integer, Double>();
 		
-		// Classify columns according to ration (size uncompressed / size compressed), 
+		// Classify columns according to ratio (size uncompressed / size compressed), 
 		// where a column is compressible if ratio > 1.
 		CompressedSizeInfo[] sizeInfos = (k > 1) ?
 				computeCompressedSizeInfos(bitmapSizeEstimator, numCols, k) : 
-				computeCompressedSizeInfos(bitmapSizeEstimator, numCols);		
+				computeCompressedSizeInfos(bitmapSizeEstimator, numCols);	
+		long nnzUC = 0;		
 		for (int col = 0; col < numCols; col++)  {	
-			long compressedSize = sizeInfos[col].getMinSize();
-			double compRatio = uncompressedColumnSize / compressedSize;			
-			if (compRatio > 1) {
-				bitmapCols.add(col);
-				compressionRatios.put(col, compRatio);
-				colsCards.add(sizeInfos[col].getEstCarinality());
-				compressedSizes.add(compressedSize);
+			double uncompSize = getUncompressedSize(numRows, 1, 
+				OptimizerUtils.getSparsity(numRows, 1, sizeInfos[col].getEstNnz()));
+			double compRatio = uncompSize / sizeInfos[col].getMinSize();			
+			if( compRatio > 1 ) {
+				colsC.add(col);
+				compRatios.put(col, compRatio);
+			}
+			else {
+				colsUC.add(col); 
+				nnzUC += sizeInfos[col].getEstNnz();
 			}
-			else
-				uncompressedCols.add(col);
 		}
-
-		_stats.timePhase1 = time.stop();
+		
+		// correction of column classification (reevaluate dense estimates if necessary)
+		boolean sparseUC = MatrixBlock.evalSparseFormatInMemory(numRows, colsUC.size(), nnzUC);
+		if( !sparseUC && !colsUC.isEmpty() ) {
+			for( int i=0; i<colsUC.size(); i++ ) {
+				int col = colsUC.get(i);
+				double uncompSize = getUncompressedSize(numRows, 1, 1.0);
+				double compRatio = uncompSize / sizeInfos[col].getMinSize();			
+				if( compRatio > 1 ) {
+					colsC.add(col);
+					colsUC.remove(i); i--;
+					compRatios.put(col, compRatio);
+					nnzUC -= sizeInfos[col].getEstNnz();
+				}
+			}
+		}
+		
 		if( LOG.isDebugEnabled() ) {
+			_stats.timePhase1 = time.stop();
 			LOG.debug("Compression statistics:");
 			LOG.debug("--compression phase 1: "+_stats.timePhase1);
 		}
@@ -285,26 +294,28 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		// PHASE 2: Grouping columns
 		// Divide the bitmap columns into column groups.
 		List<int[]> bitmapColGrps = PlanningCoCoder.findCocodesByPartitioning(
-				bitmapSizeEstimator, bitmapCols, colsCards, compressedSizes, numRows, 
-				isInSparseFormat() ? sp : 1, k);
+				bitmapSizeEstimator, colsC, sizeInfos, numRows, k);
 
-		_stats.timePhase2 = time.stop();
-		if( LOG.isDebugEnabled() )
+		if( LOG.isDebugEnabled() ) {
+			_stats.timePhase2 = time.stop();
 			LOG.debug("--compression phase 2: "+_stats.timePhase2);
-		
+		}
+			
 		if( INVESTIGATE_ESTIMATES ) {
 			double est = 0;
 			for( int[] groupIndices : bitmapColGrps )
 				est += bitmapSizeEstimator.estimateCompressedColGroupSize(groupIndices).getMinSize();
-			est += uncompressedCols.size() * uncompressedColumnSize;
+			est += MatrixBlock.estimateSizeInMemory(numRows, colsUC.size(), 
+					OptimizerUtils.getSparsity(numRows, colsUC.size(), nnzUC));
 			_stats.estSize = est;
 		}
 		
 		// PHASE 3: Compress and correct sample-based decisions
 		ColGroup[] colGroups = (k > 1) ?
-				compressColGroups(rawblock, bitmapSizeEstimator, compressionRatios, numRows, sp, bitmapColGrps, k) : 
-				compressColGroups(rawblock, bitmapSizeEstimator, compressionRatios, numRows, sp, bitmapColGrps); 	
+				compressColGroups(rawblock, bitmapSizeEstimator, compRatios, numRows, bitmapColGrps, colsUC.isEmpty(), k) : 
+				compressColGroups(rawblock, bitmapSizeEstimator, compRatios, numRows, bitmapColGrps, colsUC.isEmpty()); 	
 		allocateColGroupList();
+		HashSet<Integer> remainingCols = seq(0, numCols-1, 1);
 		for( int j=0; j<colGroups.length; j++ ) {
 			if( colGroups[j] != null ) {
 				for( int col : colGroups[j].getColIndices() )
@@ -313,10 +324,11 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 			}
 		}
 		
-		_stats.timePhase3 = time.stop();
-		if( LOG.isDebugEnabled() )
+		if( LOG.isDebugEnabled() ) {
+			_stats.timePhase3 = time.stop();
 			LOG.debug("--compression phase 3: "+_stats.timePhase3);
-		
+		}
+			
 		// Phase 4: Cleanup
 		// The remaining columns are stored uncompressed as one big column group
 		if( !remainingCols.isEmpty() ) {
@@ -332,10 +344,15 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		rawblock.cleanupBlock(true, true);
 		this.cleanupBlock(true, true);
 		
-		_stats.timePhase4 = time.stop();
 		if( LOG.isDebugEnabled() ) {
+			_stats.timePhase4 = time.stop();
+			int[] counts = getColGroupCounts(_colGroups);
 			LOG.debug("--compression phase 4: "+_stats.timePhase4);
 			LOG.debug("--num col groups: "+_colGroups.size());
+			LOG.debug("--col groups types (OLE,RLE,DDC1,DDC2,UC): "
+					+counts[2]+","+counts[1]+","+counts[3]+","+counts[4]+","+counts[0]);
+			LOG.debug("--col groups sizes (OLE,RLE,DDC1,DDC2,UC): "
+					+counts[7]+","+counts[6]+","+counts[8]+","+counts[9]+","+counts[5]);
 			LOG.debug("--compressed size: "+_stats.size);
 			LOG.debug("--compression ratio: "+_stats.ratio);
 		}
@@ -345,6 +362,22 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		return _stats;
 	}
 
+	/**
+	 * Counts column groups (entries 0-4) and their columns (entries 5-9) per
+	 * compression type; the offset within each half is the type's enum ordinal.
+	 * 
+	 * @param colgroups list of column groups
+	 * @return array of length 10: five group counts followed by five column counts
+	 */
+	private static int[] getColGroupCounts(ArrayList<ColGroup> colgroups) {
+		int[] ret = new int[10]; //5 x count, 5 x num_columns
+		for( ColGroup c : colgroups ) {
+			ret[c.getCompType().ordinal()] ++; //number of groups of this type
+			ret[5+c.getCompType().ordinal()] += c.getNumCols(); //number of columns of this type
+		}
+		return ret;
+	}
+	
 	private static CompressedSizeInfo[] computeCompressedSizeInfos(CompressedSizeEstimator estim, int clen) {
 		CompressedSizeInfo[] ret = new CompressedSizeInfo[clen];
 		for( int col=0; col<clen; col++ )
@@ -372,23 +405,23 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		}
 	}
 
-	private static ColGroup[] compressColGroups(MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, double sp, List<int[]> groups)
+	private static ColGroup[] compressColGroups(MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, List<int[]> groups, boolean denseEst)
 	{
 		ColGroup[] ret = new ColGroup[groups.size()];
 		for( int i=0; i<groups.size(); i++ )
-			ret[i] = compressColGroup(in, estim, compRatios, rlen, sp, groups.get(i));
+			ret[i] = compressColGroup(in, estim, compRatios, rlen, groups.get(i), denseEst);
 		
 		return ret;
 	}
 
-	private static ColGroup[] compressColGroups(MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, double sp, List<int[]> groups, int k) 
+	private static ColGroup[] compressColGroups(MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, List<int[]> groups, boolean denseEst, int k) 
 		throws DMLRuntimeException
 	{
 		try {
 			ExecutorService pool = Executors.newFixedThreadPool( k );
 			ArrayList<CompressTask> tasks = new ArrayList<CompressTask>();
 			for( int[] colIndexes : groups )
-				tasks.add(new CompressTask(in, estim, compRatios, rlen, sp, colIndexes));
+				tasks.add(new CompressTask(in, estim, compRatios, rlen, colIndexes, denseEst));
 			List<Future<ColGroup>> rtask = pool.invokeAll(tasks);	
 			ArrayList<ColGroup> ret = new ArrayList<ColGroup>();
 			for( Future<ColGroup> lrtask : rtask )
@@ -401,7 +434,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		}
 	}
 
-	private static ColGroup compressColGroup(MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, double sp, int[] colIndexes) 
+	private static ColGroup compressColGroup(MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, int[] colIndexes, boolean denseEst) 
 	{
 		int[] allGroupIndices = null;
 		int allColsCount = colIndexes.length;
@@ -416,12 +449,13 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 			//exact big list and observe compression ratio
 			ubm = BitmapEncoder.extractBitmap(colIndexes, in); 
 			sizeInfo = estim.estimateCompressedColGroupSize(ubm);	
-			double compRatio = getUncompressedSize(rlen, colIndexes.length, sp) / sizeInfo.getMinSize();
-			
+			double sp2 = denseEst ? 1.0 : OptimizerUtils.getSparsity(rlen, 1, ubm.getNumOffsets());
+			double compRatio = getUncompressedSize(rlen, colIndexes.length, sp2) / sizeInfo.getMinSize();
+		
 			if( compRatio > 1 ) {
 				break; // we have a good group
 			} 
-			
+
 			// modify the group
 			if (compRatioPQ == null) {
 				// first modification
@@ -454,9 +488,17 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		//create compressed column group
 		long rleSize = sizeInfo.getRLESize();
 		long oleSize = sizeInfo.getOLESize();
-		if( rleSize < oleSize )
+		long ddcSize = sizeInfo.getDDCSize();
+		
+		if( ALLOW_DDC_ENCODING && ddcSize < rleSize && ddcSize < oleSize ) {
+			if( ubm.getNumValues()<=255 )
+				return new ColGroupDDC1(colIndexes, rlen, ubm);
+			else
+				return new ColGroupDDC2(colIndexes, rlen, ubm);	
+		}
+		else if( rleSize < oleSize )
 			return new ColGroupRLE(colIndexes, rlen, ubm);
-		else
+		else 
 			return new ColGroupOLE(colIndexes, rlen, ubm);
 	}
 	
@@ -469,10 +511,11 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 	 * @return estimate of uncompressed size of column group
 	 */
 	private static double getUncompressedSize(int rlen, int clen, double sparsity) {
-		//we estimate the uncompressed size as 8 * nnz in order to cover both
-		//sparse and dense with moderate underestimation (which is conservative as 
-		//it is biased towards uncompressed columns)
-		return 8 * rlen * clen * sparsity;
+		//we estimate the uncompressed size as the minimum of dense representation
+		//and representation in csr, which moderately overestimates sparse representations
+		//of single columns but helps avoid anomalies with sparse columns that are
+		//eventually represented in dense
+		return Math.min(8d * rlen * clen, 4d * rlen + 12d * rlen * clen * sparsity);
 	}
 
 	/**
@@ -587,8 +630,8 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 	}
 
 	private static class CompressedColumn implements Comparable<CompressedColumn> {
-		int colIx;
-		double compRatio;
+		final int colIx;
+		final double compRatio;
 
 		public CompressedColumn(int colIx, double compRatio) {
 			this.colIx = colIx;
@@ -613,6 +656,13 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		public CompressionStatistics() {
 			//do nothing
 		}
+		
+		public CompressionStatistics(double t1, double t2, double t3, double t4){
+			timePhase1 = t1;
+			timePhase2 = t2;
+			timePhase3 = t3;
+			timePhase4 = t4;
+		}
 	} 
 
 	@Override
@@ -681,6 +731,10 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 					grp = new ColGroupOLE(); break;
 				case RLE_BITMAP:
 					grp = new ColGroupRLE(); break;
+				case DDC1:
+					grp = new ColGroupDDC1(); break;
+				case DDC2:
+					grp = new ColGroupDDC2(); break;	
 			}
 			
 			//deserialize and add column group
@@ -1040,11 +1094,22 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 				
 				//aggregate partial results
 				if( op.indexFn instanceof ReduceAll ) {
-					double val = ret.quickGetValue(0, 0);
-					for( Future<MatrixBlock> rtask : rtasks )
-						val = op.aggOp.increOp.fn.execute(val, 
-								rtask.get().quickGetValue(0, 0));
-					ret.quickSetValue(0, 0, val);
+					if( op.aggOp.increOp.fn instanceof KahanFunction ) {
+						KahanObject kbuff = new KahanObject(ret.quickGetValue(0, 0), 0);
+						for( Future<MatrixBlock> rtask : rtasks ) {
+							double tmp = rtask.get().quickGetValue(0, 0);
+							((KahanFunction) op.aggOp.increOp.fn).execute2(kbuff, tmp);
+						}
+						ret.quickSetValue(0, 0, kbuff._sum);
+					}	
+					else {
+						double val = ret.quickGetValue(0, 0);
+						for( Future<MatrixBlock> rtask : rtasks ) {
+							double tmp = rtask.get().quickGetValue(0, 0);
+							val = op.aggOp.increOp.fn.execute(val, tmp);
+						}
+						ret.quickSetValue(0, 0, val);
+					}
 				}		
 			}
 			catch(Exception ex) {
@@ -1058,9 +1123,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 					grp.unaryAggregateOperations(op, ret);
 			
 			//process OLE/RLE column groups
-			for (ColGroup grp : _colGroups)
-				if( !(grp instanceof ColGroupUncompressed) )
-					grp.unaryAggregateOperations(op, ret);
+			aggregateUnaryOperations(op, _colGroups, ret, 0, rlen);
 		}
 		
 		//special handling zeros for rowmins/rowmax
@@ -1089,6 +1152,41 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 	}
 	
 	@Override
+	public MatrixValue aggregateUnaryOperations(AggregateUnaryOperator op,
+			MatrixValue result, int blockingFactorRow, int blockingFactorCol,
+			MatrixIndexes indexesIn) throws DMLRuntimeException {
+		return aggregateUnaryOperations(op, result, 
+				blockingFactorRow, blockingFactorCol, indexesIn, false);
+	}
+	
+	private static void aggregateUnaryOperations(AggregateUnaryOperator op, 
+			ArrayList<ColGroup> groups, MatrixBlock ret, int rl, int ru) throws DMLRuntimeException 
+	{
+		boolean cacheDDC1 = ColGroupValue.LOW_LEVEL_OPT 
+				&& op.indexFn instanceof ReduceCol && op.aggOp.increOp.fn instanceof KahanPlus //rowSums
+				&& ColGroupOffset.ALLOW_CACHE_CONSCIOUS_ROWSUMS
+				&& ru-rl > ColGroupOffset.WRITE_CACHE_BLKSZ/2; //only for row ranges above half a write-cache block
+			
+		//process all DDC1 groups together via the cache-conscious rowSums kernel (adds to output)
+		if( cacheDDC1 ) {
+			ArrayList<ColGroupDDC1> tmp = new ArrayList<ColGroupDDC1>();
+			for( ColGroup grp : groups )
+				if( grp instanceof ColGroupDDC1 )
+					tmp.add((ColGroupDDC1)grp);
+			if( !tmp.isEmpty() )
+				ColGroupDDC1.computeRowSums(tmp.toArray(new ColGroupDDC1[0]), ret,
+						KahanPlus.getKahanPlusFnObject(), rl, ru);
+		}
+			
+		//process remaining groups over rows [rl,ru) (adds to output)
+		//note: UC groups are skipped here (expected to be aggregated separately by the caller)
+		for( ColGroup grp : groups )
+			if( !(grp instanceof ColGroupUncompressed) 
+				&& !(cacheDDC1 && grp instanceof ColGroupDDC1) ) //DDC1 groups already handled above
+				((ColGroupValue)grp).unaryAggregateOperations(op, ret, rl, ru);
+	}
+	
+	@Override
 	public MatrixBlock transposeSelfMatrixMultOperations(MatrixBlock out, MMTSJType tstype) 
 		throws DMLRuntimeException 
 	{
@@ -1204,12 +1302,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		result.allocateDenseBlock();
 
 		// delegate matrix-vector operation to each column group
-		for( ColGroup grp : _colGroups )
-			if( grp instanceof ColGroupUncompressed ) //overwrites output
-				grp.rightMultByVector(vector, result, 0, result.getNumRows());
-		for( ColGroup grp : _colGroups )
-			if( !(grp instanceof ColGroupUncompressed) ) //adds to output
-				grp.rightMultByVector(vector, result, 0, result.getNumRows());
+		rightMultByVector(_colGroups, vector, result, true, 0, result.getNumRows());
 		
 		// post-processing
 		result.recomputeNonZeros();
@@ -1231,6 +1324,13 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 
 		//multi-threaded execution of all groups
 		try {
+			ColGroupUncompressed uc = getUncompressedColGroup();
+			
+			//compute uncompressed column group in parallel 
+			if( uc != null )
+				uc.rightMultByVector(vector, result, k);					
+			
+			//compute remaining compressed column groups in parallel
 			ExecutorService pool = Executors.newFixedThreadPool( k );
 			int rlen = getNumRows();
 			int seqsz = BitmapEncoder.BITMAP_BLOCK_SZ;
@@ -1239,15 +1339,48 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 			ArrayList<RightMatrixMultTask> tasks = new ArrayList<RightMatrixMultTask>();
 			for( int i=0; i<k & i*blklen<getNumRows(); i++ )
 				tasks.add(new RightMatrixMultTask(_colGroups, vector, result, i*blklen, Math.min((i+1)*blklen,rlen)));
-			pool.invokeAll(tasks);	
+			List<Future<Long>> ret = pool.invokeAll(tasks);	
 			pool.shutdown();
+			
+			//error handling and nnz aggregation
+			long lnnz = 0;
+			for( Future<Long> tmp : ret )
+				lnnz += tmp.get(); 
+			result.setNonZeros(lnnz);
 		}
 		catch(Exception ex) {
 			throw new DMLRuntimeException(ex);
 		}
+	}
+	
+	private static void rightMultByVector(ArrayList<ColGroup> groups, MatrixBlock vect, MatrixBlock ret, boolean inclUC, int rl, int ru) 
+		throws DMLRuntimeException 
+	{
+		boolean cacheDDC1 = ColGroupValue.LOW_LEVEL_OPT 
+			&& ru-rl > ColGroupOffset.WRITE_CACHE_BLKSZ; //only for row ranges above one write-cache block
 		
-		// post-processing
-		result.recomputeNonZeros();
+		// process uncompressed column group (overwrites output, hence done first);
+		if( inclUC ) { //false if the caller already processed the UC group itself
+			for( ColGroup grp : groups )
+				if( grp instanceof ColGroupUncompressed )
+					grp.rightMultByVector(vect, ret, rl, ru);
+		}
+		
+		//process all DDC1 groups together via the cache-conscious kernel (adds to output)
+		if( cacheDDC1 ) {
+			ArrayList<ColGroupDDC1> tmp = new ArrayList<ColGroupDDC1>();
+			for( ColGroup grp : groups )
+				if( grp instanceof ColGroupDDC1 )
+					tmp.add((ColGroupDDC1)grp);
+			if( !tmp.isEmpty() )
+				ColGroupDDC1.rightMultByVector(tmp.toArray(new ColGroupDDC1[0]), vect, ret, rl, ru);
+		}
+		
+		//process remaining compressed groups over rows [rl,ru) (adds to output)
+		for( ColGroup grp : groups )
+			if( !(grp instanceof ColGroupUncompressed) 
+				&& !(cacheDDC1 && grp instanceof ColGroupDDC1) ) //DDC1 groups already handled above
+				grp.rightMultByVector(vect, ret, rl, ru);
 	}
 	
 	/**
@@ -1299,11 +1432,9 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 	 * @param k number of threads
 	 * @throws DMLRuntimeException if DMLRuntimeException occurs
 	 */
-	private static void leftMultByVectorTranspose(List<ColGroup> colGroups,MatrixBlock vector, MatrixBlock result, boolean doTranspose, int k) 
+	private void leftMultByVectorTranspose(List<ColGroup> colGroups,MatrixBlock vector, MatrixBlock result, boolean doTranspose, int k) 
 		throws DMLRuntimeException 
 	{
-		int kuc = Math.max(1, k - colGroups.size() + 1);
-		
 		//transpose vector if required
 		MatrixBlock rowVector = vector;
 		if (doTranspose) {
@@ -1317,12 +1448,21 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 
 		//multi-threaded execution
 		try {
-			ExecutorService pool = Executors.newFixedThreadPool( Math.min(colGroups.size(), k) );
+			//compute uncompressed column group in parallel 
+			ColGroupUncompressed uc = getUncompressedColGroup();
+			if( uc != null )
+				uc.leftMultByRowVector(vector, result, k);					
+			
+			//compute remaining compressed column groups in parallel
+			ExecutorService pool = Executors.newFixedThreadPool( Math.min(colGroups.size()-((uc!=null)?1:0), k) );
 			ArrayList<LeftMatrixMultTask> tasks = new ArrayList<LeftMatrixMultTask>();
 			for( ColGroup grp : colGroups )
-				tasks.add(new LeftMatrixMultTask(grp, rowVector, result, kuc));
-			pool.invokeAll(tasks);	
+				if( !(grp instanceof ColGroupUncompressed) )
+					tasks.add(new LeftMatrixMultTask(grp, rowVector, result));
+			List<Future<Object>> ret = pool.invokeAll(tasks);	
 			pool.shutdown();
+			for( Future<Object> tmp : ret )
+				tmp.get(); //error handling
 		}
 		catch(Exception ex) {
 			throw new DMLRuntimeException(ex);
@@ -1405,37 +1545,32 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 
 	private static class LeftMatrixMultTask implements Callable<Object> 
 	{
-		private ColGroup _group = null;
-		private MatrixBlock _vect = null;
-		private MatrixBlock _ret = null;
-		private int _kuc = 1;
+		private final ColGroup _group;
+		private final MatrixBlock _vect;
+		private final MatrixBlock _ret;
 		
-		protected LeftMatrixMultTask( ColGroup group, MatrixBlock vect, MatrixBlock ret, int kuc)  {
+		protected LeftMatrixMultTask( ColGroup group, MatrixBlock vect, MatrixBlock ret)  {
 			_group = group;
 			_vect = vect;
 			_ret = ret;
-			_kuc = kuc;
 		}
 		
 		@Override
 		public Object call() throws DMLRuntimeException 
 		{
 			// delegate matrix-vector operation to each column group
-			if( _group instanceof ColGroupUncompressed && _kuc >1 && ColGroupBitmap.LOW_LEVEL_OPT )
-				((ColGroupUncompressed)_group).leftMultByRowVector(_vect, _ret, _kuc);
-			else
-				_group.leftMultByRowVector(_vect, _ret);
+			_group.leftMultByRowVector(_vect, _ret);
 			return null;
 		}
 	}
 
-	private static class RightMatrixMultTask implements Callable<Object> 
+	private static class RightMatrixMultTask implements Callable<Long> 
 	{
-		private ArrayList<ColGroup> _groups = null;
-		private MatrixBlock _vect = null;
-		private MatrixBlock _ret = null;
-		private int _rl = -1;
-		private int _ru = -1;
+		private final ArrayList<ColGroup> _groups;
+		private final MatrixBlock _vect;
+		private final MatrixBlock _ret;
+		private final int _rl;
+		private final int _ru;
 		
 		protected RightMatrixMultTask( ArrayList<ColGroup> groups, MatrixBlock vect, MatrixBlock ret, int rl, int ru)  {
 			_groups = groups;
@@ -1446,25 +1581,18 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		}
 		
 		@Override
-		public Object call() throws DMLRuntimeException 
-		{
-			// delegate vector-matrix operation to each column group
-			for( ColGroup grp : _groups )
-				if( grp instanceof ColGroupUncompressed ) //overwrites output
-					grp.rightMultByVector(_vect, _ret, _rl, _ru);
-			for( ColGroup grp : _groups )
-				if( !(grp instanceof ColGroupUncompressed) ) //adds to output
-					grp.rightMultByVector(_vect, _ret, _rl, _ru);
-			return null;
+		public Long call() throws DMLRuntimeException {
+			rightMultByVector(_groups, _vect, _ret, false, _rl, _ru);
+			return _ret.recomputeNonZeros(_rl, _ru-1, 0, 0);
 		}
 	}
 	
 	private static class MatrixMultTransposeTask implements Callable<Object> 
 	{
-		private ArrayList<ColGroup> _groups = null;
-		private MatrixBlock _ret = null;
-		private int _gl = -1;
-		private int _gu = -1;
+		private final ArrayList<ColGroup> _groups;
+		private final MatrixBlock _ret;
+		private final int _gl;
+		private final int _gu;
 		
 		protected MatrixMultTransposeTask(ArrayList<ColGroup> groups, MatrixBlock ret, int gl, int gu)  {
 			_groups = groups;
@@ -1482,11 +1610,11 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 	
 	private static class UnaryAggregateTask implements Callable<MatrixBlock> 
 	{
-		private ArrayList<ColGroup> _groups = null;
-		private int _rl = -1;
-		private int _ru = -1;
-		private MatrixBlock _ret = null;
-		private AggregateUnaryOperator _op = null;
+		private final ArrayList<ColGroup> _groups;
+		private final int _rl;
+		private final int _ru;
+		private final MatrixBlock _ret;
+		private final AggregateUnaryOperator _op;
 		
 		protected UnaryAggregateTask( ArrayList<ColGroup> groups, MatrixBlock ret, int rl, int ru, AggregateUnaryOperator op)  {
 			_groups = groups;
@@ -1507,18 +1635,15 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		
 		@Override
 		public MatrixBlock call() throws DMLRuntimeException {
-			// delegate unary aggregate operation to each column group
-			// (uncompressed column group handles separately)
-			for( ColGroup grp : _groups )
-				((ColGroupBitmap)grp).unaryAggregateOperations(_op, _ret, _rl, _ru);
+			aggregateUnaryOperations(_op, _groups, _ret, _rl, _ru);
 			return _ret;
 		}
 	}
 
 	private static class SizeEstimTask implements Callable<CompressedSizeInfo> 
 	{
-		private CompressedSizeEstimator _estim = null;
-		private int _col = -1;
+		private final CompressedSizeEstimator _estim;
+		private final int _col;
 		
 		protected SizeEstimTask( CompressedSizeEstimator estim, int col )  {
 			_estim = estim;
@@ -1533,34 +1658,34 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 
 	private static class CompressTask implements Callable<ColGroup> 
 	{
-		private MatrixBlock _in = null;
-		private CompressedSizeEstimator _estim = null;
-		private HashMap<Integer, Double> _compRatios = null;
-		private int _rlen = -1;
-		private double _sp = -1;
-		private int[] _colIndexes = null;
-		
-		protected CompressTask( MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, double sp, int[] colIndexes )  {
+		private final MatrixBlock _in;
+		private final CompressedSizeEstimator _estim;
+		private final HashMap<Integer, Double> _compRatios;
+		private final int _rlen;
+		private final int[] _colIndexes;
+		private final boolean _denseEst;
+		
+		protected CompressTask( MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, int[] colIndexes, boolean denseEst )  {
 			_in = in;
 			_estim = estim;
 			_compRatios = compRatios;
 			_rlen = rlen;
-			_sp = sp;
 			_colIndexes = colIndexes;
+			_denseEst = denseEst;
 		}
 		
 		@Override
 		public ColGroup call() throws DMLRuntimeException {
-			return compressColGroup(_in, _estim, _compRatios, _rlen, _sp, _colIndexes);
+			return compressColGroup(_in, _estim, _compRatios, _rlen, _colIndexes, _denseEst);
 		}
 	}
 	
 	private static class DecompressTask implements Callable<Object> 
 	{
-		private List<ColGroup> _colGroups = null;
-		private MatrixBlock _ret = null;
-		private int _rl = -1;
-		private int _ru = -1;
+		private final List<ColGroup> _colGroups;
+		private final MatrixBlock _ret;
+		private final int _rl;
+		private final int _ru;
 		
 		protected DecompressTask( List<ColGroup> colGroups, MatrixBlock ret, int rl, int ru )  {
 			_colGroups = colGroups;
@@ -1735,15 +1860,6 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		MatrixBlock tmp = isCompressed() ? decompress() : this;
 		return tmp.zeroOutOperations(result, range, complementary);
 	}
-	
-	@Override
-	public MatrixValue aggregateUnaryOperations(AggregateUnaryOperator op,
-			MatrixValue result, int blockingFactorRow, int blockingFactorCol,
-			MatrixIndexes indexesIn) throws DMLRuntimeException {
-		printDecompressWarning("aggregateUnaryOperations");
-		MatrixBlock tmp = isCompressed() ? decompress() : this;
-		return tmp.aggregateUnaryOperations(op, result, blockingFactorRow, blockingFactorCol, indexesIn);
-	}
 
 	@Override
 	public CM_COV_Object cmOperations(CMOperator op) throws DMLRuntimeException {
@@ -2000,4 +2116,11 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 			LOG.warn("Operation '"+operation+"' not supported yet - decompressing for ULA operations.");
 		}
 	}
+	
+	private HashSet<Integer> seq(int from, int to, int incr) { //set of from, from+incr, ... not exceeding to
+		HashSet<Integer> ret = new HashSet<Integer>();
+		for (int i = from; i <= to; i+=incr) //upper bound is inclusive; NOTE(review): expects incr > 0
+			ret.add(i);
+		return ret;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/PlanningBinPacker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/PlanningBinPacker.java b/src/main/java/org/apache/sysml/runtime/compress/PlanningBinPacker.java
deleted file mode 100644
index 70308bb..0000000
--- a/src/main/java/org/apache/sysml/runtime/compress/PlanningBinPacker.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.compress;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-/**
- * Used for the finding columns to co-code
- * 
- */
-public class PlanningBinPacker 
-{
-	private final float _binWeight;
-	private final List<Integer> _items;
-	private final List<Float> _itemWeights;
-
-	public PlanningBinPacker(float binWeight, List<Integer> items, List<Float> itemWeights) {
-		_binWeight = binWeight;
-		_items = items;
-		_itemWeights = itemWeights;
-	}
-
-	/**
-	 * NOTE: upper bound is 17/10 OPT
-	 * 
-	 * @return key: available space, value: list of the bins that have that free space
-	 */
-	public TreeMap<Float, List<List<Integer>>> packFirstFit() {
-		return packFirstFit(_items, _itemWeights);
-	}
-
-	private TreeMap<Float, List<List<Integer>>> packFirstFit(List<Integer> items, List<Float> itemWeights) 
-	{
-		// when searching for a bin, the first bin in the list is used
-		TreeMap<Float, List<List<Integer>>> bins = new TreeMap<Float, List<List<Integer>>>();
-		// first bin
-		bins.put(_binWeight, createBinList());
-		int numItems = items.size();
-		for (int i = 0; i < numItems; i++) {
-			float itemWeight = itemWeights.get(i);
-			Map.Entry<Float, List<List<Integer>>> entry = bins
-					.ceilingEntry(itemWeight);
-			if (entry == null) {
-				// new bin
-				float newBinWeight = _binWeight - itemWeight;
-				List<List<Integer>> binList = bins.get(newBinWeight);
-				if (binList == null) {
-					bins.put(newBinWeight, createBinList(items.get(i)));
-				} else {
-					List<Integer> newBin = new ArrayList<Integer>();
-					newBin.add(items.get(i));
-					binList.add(newBin);
-				}
-			} else {
-				// add to the first bin in the list
-				List<Integer> assignedBin = entry.getValue().remove(0);
-				assignedBin.add(items.get(i));
-				if (entry.getValue().size() == 0)
-					bins.remove(entry.getKey());
-				float newBinWeight = entry.getKey() - itemWeight;
-				List<List<Integer>> newBinsList = bins.get(newBinWeight);
-				if (newBinsList == null) {
-					// new bin
-					bins.put(newBinWeight, createBinList(assignedBin));
-				} else {
-					newBinsList.add(assignedBin);
-				}
-			}
-		}
-		return bins;
-	}
-
-	private List<List<Integer>> createBinList() {
-		List<List<Integer>> binList = new ArrayList<List<Integer>>();
-		binList.add(new ArrayList<Integer>());
-		return binList;
-	}
-
-	private List<List<Integer>> createBinList(int item) {
-		List<List<Integer>> binList = new ArrayList<List<Integer>>();
-		List<Integer> bin = new ArrayList<Integer>();
-		binList.add(bin);
-		bin.add(item);
-		return binList;
-	}
-
-	private List<List<Integer>> createBinList(List<Integer> bin) {
-		List<List<Integer>> binList = new ArrayList<List<Integer>>();
-		binList.add(bin);
-		return binList;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCoder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCoder.java b/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCoder.java
deleted file mode 100644
index 9313cd9..0000000
--- a/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCoder.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.compress;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.PriorityQueue;
-import java.util.TreeMap;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.compress.estim.CompressedSizeEstimator;
-
-public class PlanningCoCoder 
-{
-	//constants for weight computation
-	private final static float GROUPABILITY_THRESHOLD = 0.00064f;
-	private final static float PARTITION_WEIGHT = 0.05F; //higher values lead to more grouping
-	private final static float PARTITION_SIZE = PARTITION_WEIGHT * GROUPABILITY_THRESHOLD;
-
-	public static List<int[]> findCocodesByPartitioning(CompressedSizeEstimator sizeEstimator, List<Integer> availCols, 
-			List<Integer> colsCardinalities, List<Long> compressedSize, int numRows, double sparsity, int k) 
-		throws DMLRuntimeException 
-	{
-		List<int[]> retGroups = new ArrayList<int[]>();
-		
-		// filtering out non-groupable columns as singleton groups
-		// weighted of each column is the ratio of its cardinality to the number
-		// of rows scaled by the matrix sparsity
-		int numCols = availCols.size();
-		List<Integer> groupCols = new ArrayList<Integer>();
-		List<Float> groupColWeights = new ArrayList<Float>();
-		HashMap<Integer, GroupableColInfo> groupColsInfo = new HashMap<Integer, GroupableColInfo>();
-		for (int i = 0; i < numCols; i++) {
-			int colIx = availCols.get(i);
-			int cardinality = colsCardinalities.get(i);
-			float weight = ((float) cardinality) / numRows;
-			if (weight <= GROUPABILITY_THRESHOLD) {
-				groupCols.add(colIx);
-				groupColWeights.add(weight);
-				groupColsInfo.put(colIx, new GroupableColInfo(weight,compressedSize.get(i)));
-			} else {
-				retGroups.add(new int[] { colIx });
-			}
-		}
-		
-		// bin packing based on PARTITION_WEIGHT and column weights
-		float weight = computeWeightForCoCoding(numRows, sparsity);
-		TreeMap<Float, List<List<Integer>>> bins = new PlanningBinPacker(
-				weight, groupCols, groupColWeights).packFirstFit();
-
-		// brute force grouping within each partition
-		retGroups.addAll( (k > 1) ?
-				getCocodingGroupsBruteForce(bins, groupColsInfo, sizeEstimator, numRows, k) :
-				getCocodingGroupsBruteForce(bins, groupColsInfo, sizeEstimator, numRows));
-			
-		return retGroups;
-	}
-
-	private static List<int[]> getCocodingGroupsBruteForce(TreeMap<Float, List<List<Integer>>> bins, HashMap<Integer, GroupableColInfo> groupColsInfo, CompressedSizeEstimator estim, int rlen) 
-	{
-		List<int[]> retGroups = new ArrayList<int[]>();		
-		for (List<List<Integer>> binList : bins.values()) {
-			for (List<Integer> bin : binList) {
-				// building an array of singleton CoCodingGroup
-				ArrayList<PlanningCoCodingGroup> sgroups = new ArrayList<PlanningCoCodingGroup>();
-				for (Integer col : bin)
-					sgroups.add(new PlanningCoCodingGroup(col, groupColsInfo.get(col)));
-				// brute force co-coding	
-				PlanningCoCodingGroup[] outputGroups = findCocodesBruteForce(
-						estim, rlen, sgroups.toArray(new PlanningCoCodingGroup[0]));
-				for (PlanningCoCodingGroup grp : outputGroups)
-					retGroups.add(grp.getColIndices());
-			}
-		}
-		
-		return retGroups;
-	}
-
-	private static List<int[]> getCocodingGroupsBruteForce(TreeMap<Float, List<List<Integer>>> bins, HashMap<Integer, GroupableColInfo> groupColsInfo, CompressedSizeEstimator estim, int rlen, int k) 
-		throws DMLRuntimeException 
-	{
-		List<int[]> retGroups = new ArrayList<int[]>();		
-		try {
-			ExecutorService pool = Executors.newFixedThreadPool( k );
-			ArrayList<CocodeTask> tasks = new ArrayList<CocodeTask>();
-			for (List<List<Integer>> binList : bins.values())
-				for (List<Integer> bin : binList) {
-					// building an array of singleton CoCodingGroup
-					ArrayList<PlanningCoCodingGroup> sgroups = new ArrayList<PlanningCoCodingGroup>();
-					for (Integer col : bin)
-						sgroups.add(new PlanningCoCodingGroup(col, groupColsInfo.get(col)));
-					tasks.add(new CocodeTask(estim, sgroups, rlen));
-				}
-			List<Future<PlanningCoCodingGroup[]>> rtask = pool.invokeAll(tasks);	
-			for( Future<PlanningCoCodingGroup[]> lrtask : rtask )
-				for (PlanningCoCodingGroup grp : lrtask.get())
-					retGroups.add(grp.getColIndices());
-			pool.shutdown();
-		}
-		catch(Exception ex) {
-			throw new DMLRuntimeException(ex);
-		}
-		
-		return retGroups;
-	}
-
-	/**
-	 * Identify columns to code together. Uses a greedy approach that merges
-	 * pairs of column groups into larger groups. Each phase of the greedy
-	 * algorithm considers all combinations of pairs to merge.
-	 * 
-	 * @param sizeEstimator compressed size estimator
-	 * @param numRowsWeight number of rows weight
-	 * @param singltonGroups planning co-coding groups
-	 * @return
-	 */
-	private static PlanningCoCodingGroup[] findCocodesBruteForce(
-			CompressedSizeEstimator sizeEstimator, float numRowsWeight,
-			PlanningCoCodingGroup[] singltonGroups) 
-	{
-		// Populate a priority queue with all available 2-column cocodings.
-		PriorityQueue<PlanningGroupMergeAction> q = new PriorityQueue<PlanningGroupMergeAction>();
-		for (int leftIx = 0; leftIx < singltonGroups.length; leftIx++) {
-			PlanningCoCodingGroup leftGrp = singltonGroups[leftIx];
-			for (int rightIx = leftIx + 1; rightIx < singltonGroups.length; rightIx++) {
-				PlanningCoCodingGroup rightGrp = singltonGroups[rightIx];
-				// at least one of the two groups should be low-cardinality
-				float cardRatio = leftGrp.getCardinalityRatio() + rightGrp.getCardinalityRatio(); 
-				if ( cardRatio < GROUPABILITY_THRESHOLD) {
-					PlanningGroupMergeAction potentialMerge = new PlanningGroupMergeAction(
-							sizeEstimator, numRowsWeight, leftGrp, rightGrp);
-					if (potentialMerge.getChangeInSize() < 0) {
-						q.add(potentialMerge);
-					}
-				}
-			}
-		}
-		PlanningCoCodingGroup[] colGroups = singltonGroups;
-		
-		// Greedily merge groups until we can no longer reduce the number of
-		// runs by merging groups
-		while (q.size() > 0) {
-			PlanningGroupMergeAction merge = q.poll();
-
-			// The queue can contain merge actions involving column groups that
-			// have already been merged.
-			// Filter those actions out.
-			int leftIx = findInArray(colGroups, merge.getLeftGrp());
-			int rightIx = findInArray(colGroups, merge.getRightGrp());
-			if (leftIx < 0 || rightIx < 0) {
-				// One or more of the groups to be merged has already been made
-				// part of another group.
-				// Drop the merge action.
-			} else {
-				PlanningCoCodingGroup mergedGrp = merge.getMergedGrp();
-
-				PlanningCoCodingGroup[] newColGroups = new PlanningCoCodingGroup[colGroups.length - 1];
-				int targetIx = 0;
-				for (int i = 0; i < colGroups.length; i++) {
-					if (i != leftIx && i != rightIx) {
-						newColGroups[targetIx] = colGroups[i];
-						targetIx++;
-					}
-				}
-
-				// New group goes at the end to (hopefully) speed up future
-				// linear search operations
-				newColGroups[newColGroups.length - 1] = mergedGrp;
-
-				// Consider merging the new group with all the other
-				// pre-existing groups.
-				for (int i = 0; i < newColGroups.length - 1; i++) {
-					PlanningCoCodingGroup newLeftGrp = newColGroups[i];
-					PlanningCoCodingGroup newRightGrp = mergedGrp;
-					if (newLeftGrp.getCardinalityRatio()
-							+ newRightGrp.getCardinalityRatio() < GROUPABILITY_THRESHOLD) {
-						PlanningGroupMergeAction newPotentialMerge = new PlanningGroupMergeAction(
-								sizeEstimator, numRowsWeight, newLeftGrp,
-								newRightGrp);
-						if (newPotentialMerge.getChangeInSize() < 0) {
-							q.add(newPotentialMerge);
-						}
-					}
-				}
-				colGroups = newColGroups;
-			}
-		}
-		return colGroups;
-	}
-
-	private static float computeWeightForCoCoding(int numRows, double sparsity) {
-		//we use a constant partition size (independent of the number of rows
-		//in order to ensure constant compression speed independent of blocking)
-		return PARTITION_SIZE;
-	}
-
-	private static int findInArray(Object[] arr, Object val) {
-		for (int i = 0; i < arr.length; i++) {
-			if (arr[i].equals(val)) {
-				return i;
-			}
-		}
-		return -1;
-	}
-
-	protected static class GroupableColInfo {
-		float cardRatio;
-		long size;
-
-		public GroupableColInfo(float lcardRatio, long lsize) {
-			cardRatio = lcardRatio;
-			size = lsize;
-		}
-	}
-
-	private static class CocodeTask implements Callable<PlanningCoCodingGroup[]> 
-	{
-		private CompressedSizeEstimator _estim = null;
-		private ArrayList<PlanningCoCodingGroup> _sgroups = null;
-		private int _rlen = -1;
-		
-		protected CocodeTask( CompressedSizeEstimator estim, ArrayList<PlanningCoCodingGroup> sgroups, int rlen )  {
-			_estim = estim;
-			_sgroups = sgroups;
-			_rlen = rlen;
-		}
-		
-		@Override
-		public PlanningCoCodingGroup[] call() throws DMLRuntimeException {
-			// brute force co-coding	
-			return findCocodesBruteForce(_estim, _rlen, 
-					_sgroups.toArray(new PlanningCoCodingGroup[0]));
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCodingGroup.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCodingGroup.java b/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCodingGroup.java
deleted file mode 100644
index 9ee0d7e..0000000
--- a/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCodingGroup.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.compress;
-
-import java.util.Arrays;
-
-import org.apache.sysml.runtime.compress.PlanningCoCoder.GroupableColInfo;
-import org.apache.sysml.runtime.compress.estim.CompressedSizeEstimator;
-import org.apache.sysml.runtime.compress.estim.CompressedSizeInfo;
-
-/** 
- * Class to represent information about co-coding a group of columns. 
- * 
- */
-public class PlanningCoCodingGroup 
-{
-	private int[] _colIndexes;
-	private long _estSize;
-	private float _cardRatio;
-
-	/**
-	 * Constructor for a one-column group; i.e. do not co-code a given column.
-	 * 
-	 * @param col column
-	 * @param info groupable column info
-	 */
-	public PlanningCoCodingGroup(int col, GroupableColInfo info) {
-		_colIndexes = new int[]{col};
-		_estSize = info.size;
-		_cardRatio = info.cardRatio;
-	}
-
-	/**
-	 * Constructor for merging two disjoint groups of columns
-	 * 
-	 * @param grp1   first group of columns to merge
-	 * @param grp2   second group to merge
-	 * @param bitmapSizeEstimator bitmap size estimator
-	 * @param numRowsWeight numRows x sparsity
-	 */
-	public PlanningCoCodingGroup(PlanningCoCodingGroup grp1, PlanningCoCodingGroup grp2,
-			CompressedSizeEstimator bitmapSizeEstimator, float numRowsWeight) 
-	{
-		// merge sorted non-empty arrays
-		_colIndexes = new int[grp1._colIndexes.length + grp2._colIndexes.length];		
-		int grp1Ptr = 0, grp2Ptr = 0;
-		for (int mergedIx = 0; mergedIx < _colIndexes.length; mergedIx++) {
-			if (grp1._colIndexes[grp1Ptr] < grp2._colIndexes[grp2Ptr]) {
-				_colIndexes[mergedIx] = grp1._colIndexes[grp1Ptr++];
-				if (grp1Ptr == grp1._colIndexes.length) {
-					System.arraycopy(grp2._colIndexes, grp2Ptr, _colIndexes,
-							mergedIx + 1, grp2._colIndexes.length - grp2Ptr);
-					break;
-				}
-			} else {
-				_colIndexes[mergedIx] = grp2._colIndexes[grp2Ptr++];
-				if (grp2Ptr == grp2._colIndexes.length) {
-					System.arraycopy(grp1._colIndexes, grp1Ptr, _colIndexes,
-							mergedIx + 1, grp1._colIndexes.length - grp1Ptr);
-					break;
-				}
-			}
-		}
-		
-		// estimating size info
-		CompressedSizeInfo groupSizeInfo = bitmapSizeEstimator
-				.estimateCompressedColGroupSize(_colIndexes);
-		_estSize = groupSizeInfo.getMinSize();
-		_cardRatio = groupSizeInfo.getEstCarinality() / numRowsWeight;
-	}
-
-	public int[] getColIndices() {
-		return _colIndexes;
-	}
-
-	/**
-	 * Obtain estimated compressed size of the grouped columns.
-	 * 
-	 * @return estimated compressed size of the grouped columns
-	 */
-	public long getEstSize() {
-		return _estSize;
-	}
-
-	public float getCardinalityRatio() {
-		return _cardRatio;
-	}
-
-	@Override
-	public String toString() {
-		return Arrays.toString(_colIndexes);
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/PlanningGroupMergeAction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/PlanningGroupMergeAction.java b/src/main/java/org/apache/sysml/runtime/compress/PlanningGroupMergeAction.java
deleted file mode 100644
index 47d46d5..0000000
--- a/src/main/java/org/apache/sysml/runtime/compress/PlanningGroupMergeAction.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.compress;
-
-import org.apache.sysml.runtime.compress.estim.CompressedSizeEstimator;
-
-/**
- * Internal data structure for tracking potential merges of column groups in
- * co-coding calculations.
- * 
- */
-class PlanningGroupMergeAction implements Comparable<PlanningGroupMergeAction> 
-{
-	private PlanningCoCodingGroup _leftGrp;   //left input
-	private PlanningCoCodingGroup _rightGrp;  //right input
-	private PlanningCoCodingGroup _mergedGrp; //output
-	private long _changeInSize;
-
-	
-	public PlanningGroupMergeAction(CompressedSizeEstimator sizeEstimator,
-			float numRowsWeight, PlanningCoCodingGroup leftGrp, PlanningCoCodingGroup rightGrp) {
-		_leftGrp = leftGrp;
-		_rightGrp = rightGrp;
-		_mergedGrp = new PlanningCoCodingGroup(leftGrp, rightGrp, sizeEstimator, numRowsWeight);
-
-		// Negative size change ==> Decrease in size
-		_changeInSize = _mergedGrp.getEstSize() 
-				- leftGrp.getEstSize() - rightGrp.getEstSize();
-	}
-
-	public int compareTo(PlanningGroupMergeAction o) {
-		// We only sort by the change in size
-		return (int) Math.signum(_changeInSize - o._changeInSize);
-	}
-
-	@Override
-	public String toString() {
-		return String.format("Merge %s and %s", _leftGrp, _rightGrp);
-	}
-
-	public PlanningCoCodingGroup getLeftGrp() {
-		return _leftGrp;
-	}
-
-	public PlanningCoCodingGroup getRightGrp() {
-		return _rightGrp;
-	}
-
-	public PlanningCoCodingGroup getMergedGrp() {
-		return _mergedGrp;
-	}
-
-	public long getChangeInSize() {
-		return _changeInSize;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/ReaderColumnSelectionSparse.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ReaderColumnSelectionSparse.java b/src/main/java/org/apache/sysml/runtime/compress/ReaderColumnSelectionSparse.java
index 63c0467..60d0532 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ReaderColumnSelectionSparse.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ReaderColumnSelectionSparse.java
@@ -63,7 +63,6 @@ public class ReaderColumnSelectionSparse extends ReaderColumnSelection
 		if( data.getSparseBlock()!=null )
 		for( int i=0; i<colIndexes.length; i++ )
 			sparseCols[i] = data.getSparseBlock().get(colIndexes[i]);
-		Arrays.fill(sparsePos, 0);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/UncompressedBitmap.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/UncompressedBitmap.java b/src/main/java/org/apache/sysml/runtime/compress/UncompressedBitmap.java
index d62bae9..2f68edf 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/UncompressedBitmap.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/UncompressedBitmap.java
@@ -21,10 +21,13 @@ package org.apache.sysml.runtime.compress;
 
 import java.util.Arrays;
 
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.sysml.runtime.compress.utils.DblArrayIntListHashMap;
 import org.apache.sysml.runtime.compress.utils.DoubleIntListHashMap;
 import org.apache.sysml.runtime.compress.utils.DblArrayIntListHashMap.DArrayIListEntry;
 import org.apache.sysml.runtime.compress.utils.DoubleIntListHashMap.DIListEntry;
+import org.apache.sysml.runtime.compress.utils.IntArrayList;
+import org.apache.sysml.runtime.util.SortUtils;
 
 /** 
  * Uncompressed representation of one or more columns in bitmap format. 
@@ -32,13 +35,13 @@ import org.apache.sysml.runtime.compress.utils.DoubleIntListHashMap.DIListEntry;
  */
 public final class UncompressedBitmap 
 {
-	private int _numCols;
+	private final int _numCols;
 
 	/** Distinct values that appear in the column. Linearized as value groups <v11 v12> <v21 v22>.*/
 	private double[] _values;
 
 	/** Bitmaps (as lists of offsets) for each of the values. */
-	private int[][] _offsetsLists;
+	private IntArrayList[] _offsetsLists;
 
 	public UncompressedBitmap( DblArrayIntListHashMap distinctVals, int numColumns ) 
 	{
@@ -46,11 +49,11 @@ public final class UncompressedBitmap
 		// Convert inputs to arrays
 		int numVals = distinctVals.size();
 		_values = new double[numVals*numColumns];
-		_offsetsLists = new int[numVals][];
+		_offsetsLists = new IntArrayList[numVals];
 		int bitmapIx = 0;
 		for( DArrayIListEntry val : distinctVals.extractValues()) {
 			System.arraycopy(val.key.getData(), 0, _values, bitmapIx*numColumns, numColumns);
-			_offsetsLists[bitmapIx++] = val.value.extractValues();
+			_offsetsLists[bitmapIx++] = val.value;
 		}
 		_numCols = numColumns;
 	}
@@ -61,11 +64,11 @@ public final class UncompressedBitmap
 		// Convert inputs to arrays
 		int numVals = distinctVals.size();
 		_values = new double[numVals];
-		_offsetsLists = new int[numVals][];
+		_offsetsLists = new IntArrayList[numVals];
 		int bitmapIx = 0;
 		for(DIListEntry val : distinctVals.extractValues()) {
 			_values[bitmapIx] = val.key;
-			_offsetsLists[bitmapIx++] = val.value.extractValues();
+			_offsetsLists[bitmapIx++] = val.value;
 		}
 		_numCols = 1;
 	}
@@ -74,6 +77,15 @@ public final class UncompressedBitmap
 		return _numCols;
 	}
 	
+	/** 
+	 * Get all values without unnecessary allocations and copies, i.e.,
+	 * the internal linearized array of value tuples itself (not a copy).
+	 * @return dictionary of value tuples
+	 */
+	public double[] getValues() {
+		return _values;
+	}
+	
 	/**
 	 * Obtain tuple of column values associated with index.
 	 * 
@@ -94,21 +106,46 @@ public final class UncompressedBitmap
 		return _values.length / _numCols;
 	}
 
-	/**
-	 * Obtain array of offsets of the rows containing index value
-	 * 
-	 * @param ix   index of a particular distinct value
-	 * @return IMMUTABLE array of the offsets of the rows containing the value
-	 *         with the indicated index
-	 */
-	public int[] getOffsetsList(int ix) {
+	public IntArrayList getOffsetsList(int ix) {
 		return _offsetsLists[ix];
 	}
 
-	public int getNumOffsets() {
-		int ret = 0;
-		for( int[] offlist : _offsetsLists )
-			ret += offlist.length;
+	public long getNumOffsets() {
+		long ret = 0;
+		for( IntArrayList offlist : _offsetsLists )
+			ret += offlist.size();
 		return ret;
 	}
+	/** Obtain the number of offsets in the offset list of the distinct value with index ix. */
+	public int getNumOffsets(int ix) {
+		return _offsetsLists[ix].size();
+	}
+	
+	/** Reorders values and their offset lists by descending number of offsets per value. */
+	public void sortValuesByFrequency() {
+		final int nVals = getNumValues();
+		final int nCols = getNumColumns();
+		//temporary arrays: frequency and original position per value
+		double[] counts = new double[nVals];
+		int[] order = new int[nVals];
+		for(int k=0; k<nVals; k++) {
+			order[k] = k;
+			counts[k] = getNumOffsets(k);
+		}
+		
+		//sort positions by ascending frequency, then reverse (descending)
+		SortUtils.sortByValue(0, nVals, counts, order);
+		ArrayUtils.reverse(order);
+		
+		//build reordered value and offset list arrays
+		double[] sortedValues = new double[nVals*nCols];
+		IntArrayList[] sortedOffsets = new IntArrayList[nVals];
+		for(int k=0; k<nVals; k++) {
+			int src = order[k];
+			System.arraycopy(_values, src*nCols, sortedValues, k*nCols, nCols);
+			sortedOffsets[k] = _offsetsLists[src];
+		}
+		_values = sortedValues;
+		_offsetsLists = sortedOffsets;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/cocode/ColumnGroupPartitioner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/cocode/ColumnGroupPartitioner.java b/src/main/java/org/apache/sysml/runtime/compress/cocode/ColumnGroupPartitioner.java
new file mode 100644
index 0000000..05af19d
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/cocode/ColumnGroupPartitioner.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress.cocode;
+
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.sysml.runtime.compress.cocode.PlanningCoCoder.GroupableColInfo;
+
+/**
+ * Abstract base class of column group partitioners, which split a list of columns into disjoint partitions.
+ */
+public abstract class ColumnGroupPartitioner 
+{
+	/**
+	 * Partitions a list of columns into a list of partitions, each holding a subset of the columns.
+	 * Note that implementations must compute a complete and disjoint partitioning.
+	 * 
+	 * @param groupCols list of column indexes to partition
+	 * @param groupColsInfo map from column index to the info of that column
+	 * @return list of partitions (where each partition is a list of columns)
+	 */
+	public abstract List<List<Integer>> partitionColumns(List<Integer> groupCols, HashMap<Integer, GroupableColInfo> groupColsInfo);
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/cocode/ColumnGroupPartitionerBinPacking.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/cocode/ColumnGroupPartitionerBinPacking.java b/src/main/java/org/apache/sysml/runtime/compress/cocode/ColumnGroupPartitionerBinPacking.java
new file mode 100644
index 0000000..0fb6abe
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/cocode/ColumnGroupPartitionerBinPacking.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress.cocode;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.sysml.runtime.compress.cocode.PlanningCoCoder.GroupableColInfo;
+import org.apache.sysml.runtime.util.SortUtils;
+
+/**
+ * Column group partitioning with bin packing heuristic.
+ * Columns are items weighted by their cardinality ratio; each bin has capacity BIN_CAPACITY.
+ */
+public class ColumnGroupPartitionerBinPacking extends ColumnGroupPartitioner
+{
+	private static final boolean FIRST_FIT_DEC = true; //pack items in order of decreasing weight
+	private static final int MAX_COL_PER_GROUP = Integer.MAX_VALUE; //max number of columns per bin
+
+	//we use a constant partition size (independent of the number of rows)
+	//in order to ensure constant compression speed independent of blocking
+	public static double BIN_CAPACITY = 0.000032; //higher values, more grouping
+	
+	@Override
+	public List<List<Integer>> partitionColumns(List<Integer> groupCols, HashMap<Integer, GroupableColInfo> groupColsInfo) 
+	{
+		//obtain column weights (cardinality ratio of each column)
+		int[] items = new int[groupCols.size()];
+		double[] itemWeights = new double[groupCols.size()];
+		for( int i=0; i<groupCols.size(); i++ ) {
+			int col = groupCols.get(i);
+			items[i] = col;
+			itemWeights[i] = groupColsInfo.get(col).cardRatio;
+		}
+		
+		//sort items by decreasing weight (first fit decreasing)
+		if( FIRST_FIT_DEC ) {
+			SortUtils.sortByValue(0, items.length, itemWeights, items);
+			ArrayUtils.reverse(items);
+			ArrayUtils.reverse(itemWeights);
+		}
+		
+		//partition columns via bin packing
+		return packFirstFit(items, itemWeights);
+	}
+
+	/**
+	 * First-fit bin packing of the items in the given order. NOTE: upper bound is 17/10 OPT
+	 * 
+	 * @param items the items in terms of columns
+	 * @param itemWeights the weights of the items
+	 * @return list of bins, where each bin is the list of items (columns) assigned to it
+	 */
+	private List<List<Integer>> packFirstFit(int[] items, double[] itemWeights) 
+	{
+		List<List<Integer>> bins = new ArrayList<List<Integer>>();
+		List<Double> binWeights = new ArrayList<Double>(); //remaining capacity per bin
+		
+		for( int i = 0; i < items.length; i++ ) {
+			//add to first existing bin with enough capacity and fewer than MAX_COL_PER_GROUP columns
+			boolean assigned = false;
+			for( int j = 0; j < bins.size(); j++ ) {
+				double newBinWeight = binWeights.get(j)-itemWeights[i];
+				if( newBinWeight >= 0 && bins.get(j).size() < MAX_COL_PER_GROUP ){
+					bins.get(j).add(items[i]);
+					binWeights.set(j, newBinWeight);
+					assigned = true; break;
+				}
+			}
+				
+			//create new bin at end of list
+			if( !assigned ) {
+				bins.add(new ArrayList<Integer>(Arrays.asList(items[i])));
+				binWeights.add(BIN_CAPACITY-itemWeights[i]);
+			}
+		}
+		
+		return bins;
+	}
+}