You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/07/17 00:23:30 UTC

[5/6] incubator-systemml git commit: [SYSTEMML-810] New compressed matrix blocks and operations, tests

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/16e7b1c8/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java
new file mode 100644
index 0000000..39bc162
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java
@@ -0,0 +1,617 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress;
+
+import java.util.Arrays;
+import java.util.Iterator;
+
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.compress.utils.ConverterUtils;
+import org.apache.sysml.runtime.compress.utils.LinearAlgebraUtils;
+import org.apache.sysml.runtime.functionobjects.KahanFunction;
+import org.apache.sysml.runtime.functionobjects.KahanPlus;
+import org.apache.sysml.runtime.functionobjects.KahanPlusSq;
+import org.apache.sysml.runtime.functionobjects.ReduceAll;
+import org.apache.sysml.runtime.functionobjects.ReduceCol;
+import org.apache.sysml.runtime.functionobjects.ReduceRow;
+import org.apache.sysml.runtime.instructions.cp.KahanObject;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
+import org.apache.sysml.runtime.matrix.operators.ScalarOperator;
+
+
+/** A group of columns compressed with a single run-length encoded bitmap. */
+public class ColGroupRLE extends ColGroupBitmap 
+{
+	private static final long serialVersionUID = 7450232907594748177L;
+
+	public ColGroupRLE() {
+		super(CompressionType.RLE_BITMAP);
+	}
+	
+	/**
+	 * Main constructor. Constructs and stores the necessary bitmaps.
+	 * 
+	 * @param colIndices
+	 *            indices (within the block) of the columns included in this
+	 *            column
+	 * @param numRows
+	 *            total number of rows in the parent block
+	 * @param ubm
+	 *            Uncompressed bitmap representation of the block
+	 */
+	public ColGroupRLE(int[] colIndices, int numRows, UncompressedBitmap ubm) 
+	{
+		super(CompressionType.RLE_BITMAP, colIndices, numRows, ubm);
+		
+		// compress the bitmaps
+		final int numVals = ubm.getNumValues();
+		char[][] lbitmaps = new char[numVals][];
+		int totalLen = 0;
+		for( int i=0; i<numVals; i++ ) {
+			lbitmaps[i] = BitmapEncoder.genRLEBitmap(ubm.getOffsetsList(i));
+			totalLen += lbitmaps[i].length;
+		}
+		
+		// compact bitmaps to linearized representation
+		createCompressedBitmaps(numVals, totalLen, lbitmaps);
+	}
+
+	/**
+	 * Constructor for internal use.
+	 */
+	public ColGroupRLE(int[] colIndices, int numRows, double[] values, char[] bitmaps, int[] bitmapOffs) {
+		super(CompressionType.RLE_BITMAP, colIndices, numRows, values);
+		_data = bitmaps;
+		_ptr = bitmapOffs;
+	}
+
+	@Override
+	public Iterator<Integer> getDecodeIterator(int bmpIx) {
+		return new BitmapDecoderRLE(_data, _ptr[bmpIx], len(bmpIx)); 
+	}
+	
+	@Override
+	public void decompressToBlock(MatrixBlock target) 
+	{
+		if( LOW_LEVEL_OPT && getNumValues() > 1 )
+		{
+			final int blksz = 128 * 1024;
+			final int numCols = getNumCols();
+			final int numVals = getNumValues();
+			final int n = getNumRows();
+			
+			//position and start offset arrays
+			int[] apos = new int[numVals];
+			int[] astart = new int[numVals];
+			
+			//cache conscious append via horizontal scans 
+			for( int bi=0; bi<n; bi+=blksz ) {
+				int bimax = Math.min(bi+blksz, n);					
+				for (int k=0, off=0; k < numVals; k++, off+=numCols) {
+					int bitmapOff = _ptr[k];
+					int bitmapLen = len(k);
+					int bufIx = apos[k];
+					int start = astart[k];
+					for( ; bufIx<bitmapLen & start<bimax; bufIx+=2) {
+						start += _data[bitmapOff + bufIx];
+						int len = _data[bitmapOff + bufIx+1];
+						for( int i=start; i<start+len; i++ )
+							for( int j=0; j<numCols; j++ )
+								if( _values[off+j]!=0 )
+									target.appendValue(i, _colIndexes[j], _values[off+j]);
+						start += len;
+					}
+					apos[k] = bufIx;	
+					astart[k] = start;
+				}
+			}
+		}
+		else
+		{
+			//call generic decompression with decoder
+			super.decompressToBlock(target);
+		}
+	}
+
+	@Override
+	public void decompressToBlock(MatrixBlock target, int[] colixTargets) 
+	{
+		if( LOW_LEVEL_OPT && getNumValues() > 1 )
+		{
+			final int blksz = 128 * 1024;
+			final int numCols = getNumCols();
+			final int numVals = getNumValues();
+			final int n = getNumRows();
+			
+			//position and start offset arrays
+			int[] apos = new int[numVals];
+			int[] astart = new int[numVals];
+			int[] cix = new int[numCols];
+			
+			//prepare target col indexes
+			for( int j=0; j<numCols; j++ )
+				cix[j] = colixTargets[_colIndexes[j]];
+			
+			//cache conscious append via horizontal scans 
+			for( int bi=0; bi<n; bi+=blksz ) {
+				int bimax = Math.min(bi+blksz, n);					
+				for (int k=0, off=0; k < numVals; k++, off+=numCols) {
+					int bitmapOff = _ptr[k];
+					int bitmapLen = len(k);
+					int bufIx = apos[k];
+					if( bufIx >= bitmapLen ) 
+						continue;
+					int start = astart[k];
+					for( ; bufIx<bitmapLen & start<bimax; bufIx+=2) {
+						start += _data[bitmapOff + bufIx];
+						int len = _data[bitmapOff + bufIx+1];
+						for( int i=start; i<start+len; i++ )
+							for( int j=0; j<numCols; j++ )
+								if( _values[off+j]!=0 )
+									target.appendValue(i, cix[j], _values[off+j]);
+						start += len;
+					}
+					apos[k] = bufIx;	
+					astart[k] = start;
+				}
+			}
+		}
+		else
+		{
+			//call generic decompression with decoder
+			super.decompressToBlock(target, colixTargets);
+		}
+	}
+
+	@Override
+	public void decompressToBlock(MatrixBlock target, int colpos) 
+	{
+		if( LOW_LEVEL_OPT && getNumValues() > 1 )
+		{
+			final int blksz = 128 * 1024;
+			final int numCols = getNumCols();
+			final int numVals = getNumValues();
+			final int n = getNumRows();
+			double[] c = target.getDenseBlock();
+			
+			//position and start offset arrays
+			int[] apos = new int[numVals];
+			int[] astart = new int[numVals];
+			
+			//cache conscious append via horizontal scans 
+			for( int bi=0; bi<n; bi+=blksz ) {
+				int bimax = Math.min(bi+blksz, n);					
+				for (int k=0, off=0; k < numVals; k++, off+=numCols) {
+					int bitmapOff = _ptr[k];
+					int bitmapLen = len(k);
+					int bufIx = apos[k];
+					if( bufIx >= bitmapLen ) 
+						continue;
+					int start = astart[k];
+					for( ; bufIx<bitmapLen & start<bimax; bufIx+=2) {
+						start += _data[bitmapOff + bufIx];
+						int len = _data[bitmapOff + bufIx+1];
+						for( int i=start; i<start+len; i++ )
+							c[i] = _values[off+colpos];
+						start += len;
+					}
+					apos[k] = bufIx;	
+					astart[k] = start;
+				}
+			}
+			
+			target.recomputeNonZeros();
+		}
+		else
+		{
+			//call generic decompression with decoder
+			super.decompressToBlock(target, colpos);
+		}
+	}
+	
+	@Override
+	public void rightMultByVector(MatrixBlock vector, MatrixBlock result, int rl, int ru)
+			throws DMLRuntimeException 
+	{
+		double[] b = ConverterUtils.getDenseVector(vector);
+		double[] c = result.getDenseBlock();
+		final int numCols = getNumCols();
+		final int numVals = getNumValues();
+		
+		//prepare reduced rhs w/ relevant values
+		double[] sb = new double[numCols];
+		for (int j = 0; j < numCols; j++) {
+			sb[j] = b[_colIndexes[j]];
+		}
+		
+		if( LOW_LEVEL_OPT && numVals > 1 
+			&& _numRows > BitmapEncoder.BITMAP_BLOCK_SZ )
+		{
+			//L3 cache alignment, see comment rightMultByVector OLE column group 
+			//core difference of RLE to OLE is that runs are not segment alignment,
+			//which requires care of handling runs crossing cache-buckets
+			final int blksz = ColGroupBitmap.WRITE_CACHE_BLKSZ; 
+			
+			//step 1: prepare position and value arrays
+			
+			//current pos / values per RLE list
+			int[] apos = new int[numVals];
+			int[] astart = new int[numVals];
+			double[] aval = new double[numVals];
+			
+			//skip-scan to beginning for all OLs 
+			if( rl > 0 ) { //rl aligned with blksz	
+				for (int k = 0; k < numVals; k++) {
+					int boff = _ptr[k];
+					int blen = len(k);
+					int bix = 0;
+					int start = 0;
+					while( bix<blen ) {	
+						int lstart = _data[boff + bix]; //start
+						int llen = _data[boff + bix + 1]; //len
+						if( start+lstart+llen >= rl )
+							break;
+						start += lstart + llen;
+						bix += 2;
+					}
+					apos[k] = bix;
+					astart[k] = start;
+				}
+			}
+			
+			//pre-aggregate values per OLs
+			for( int k = 0; k < numVals; k++ )
+				aval[k] = sumValues(k, sb);
+					
+			//step 2: cache conscious matrix-vector via horizontal scans 
+			for( int bi=rl; bi<ru; bi+=blksz ) 
+			{
+				int bimax = Math.min(bi+blksz, ru);
+					
+				//horizontal segment scan, incl pos maintenance
+				for (int k = 0; k < numVals; k++) {
+					int boff = _ptr[k];
+					int blen = len(k);
+					double val = aval[k];
+					int bix = apos[k];
+					int start = astart[k];
+					
+					//compute partial results, not aligned
+					while( bix<blen ) {
+						int lstart = _data[boff + bix];
+						int llen = _data[boff + bix + 1];
+						LinearAlgebraUtils.vectAdd(val, c, Math.max(bi, start+lstart), 
+								Math.min(start+lstart+llen,bimax) - Math.max(bi, start+lstart));
+						if(start+lstart+llen >= bimax)
+							break;
+						start += lstart + llen;
+						bix += 2;
+					}
+					
+					apos[k] = bix;	
+					astart[k] = start;
+				}
+			}
+		}
+		else
+		{
+			for (int k = 0; k < numVals; k++) {
+				int boff = _ptr[k];
+				int blen = len(k);
+				double val = sumValues(k, sb);
+				int bix = 0;
+				int start = 0;
+				
+				//scan to beginning offset if necessary 
+				if( rl > 0 ) { //rl aligned with blksz	
+					while( bix<blen ) {	
+						int lstart = _data[boff + bix]; //start
+						int llen = _data[boff + bix + 1]; //len
+						if( start+lstart+llen >= rl )
+							break;
+						start += lstart + llen;
+						bix += 2;
+					}
+				}
+				
+				//compute partial results, not aligned
+				while( bix<blen ) {
+					int lstart = _data[boff + bix];
+					int llen = _data[boff + bix + 1];
+					LinearAlgebraUtils.vectAdd(val, c, Math.max(rl, start+lstart), 
+							Math.min(start+lstart+llen,ru) - Math.max(rl, start+lstart));
+					if(start+lstart+llen >= ru)
+						break;
+					start += lstart + llen;
+					bix += 2;
+				}
+			}
+		}
+	}
+
+	@Override
+	public void leftMultByRowVector(MatrixBlock vector, MatrixBlock result)
+			throws DMLRuntimeException 
+	{		
+		double[] a = ConverterUtils.getDenseVector(vector);
+		double[] c = result.getDenseBlock();
+		final int numCols = getNumCols();
+		final int numVals = getNumValues();
+		final int n = getNumRows();
+		
+		if( LOW_LEVEL_OPT && numVals > 1 
+			&& _numRows > BitmapEncoder.BITMAP_BLOCK_SZ ) 
+		{
+			final int blksz = ColGroupBitmap.READ_CACHE_BLKSZ; 
+			
+			//step 1: prepare position and value arrays
+			
+			//current pos per OLs / output values
+			int[] apos = new int[numVals];
+			int[] astart = new int[numVals];
+			double[] cvals = new double[numVals];
+			
+			//step 2: cache conscious matrix-vector via horizontal scans 
+			for( int ai=0; ai<n; ai+=blksz ) 
+			{
+				int aimax = Math.min(ai+blksz, n);
+				
+				//horizontal scan, incl pos maintenance
+				for (int k = 0; k < numVals; k++) {
+					int bitmapOff = _ptr[k];
+					int bitmapLen = len(k);						
+					int bufIx = apos[k];
+					int start = astart[k];
+					
+					//compute partial results, not aligned
+					while( bufIx<bitmapLen & start<aimax ) {
+						start += _data[bitmapOff + bufIx];
+						int len = _data[bitmapOff + bufIx+1];
+						cvals[k] += LinearAlgebraUtils.vectSum(a, start, len);
+						start += len;
+						bufIx += 2;
+					}
+					
+					apos[k] = bufIx;	
+					astart[k] = start;
+				}
+			}
+			
+			//step 3: scale partial results by values and write to global output
+			for (int k = 0, valOff=0; k < numVals; k++, valOff+=numCols)
+				for( int j = 0; j < numCols; j++ )
+					c[ _colIndexes[j] ] += cvals[k] * _values[valOff+j];
+			
+		}
+		else
+		{
+			//iterate over all values and their bitmaps
+			for (int bitmapIx=0, valOff=0; bitmapIx<numVals; bitmapIx++, valOff+=numCols) 
+			{	
+				int bitmapOff = _ptr[bitmapIx];
+				int bitmapLen = len(bitmapIx);
+				
+				double vsum = 0;
+				int curRunEnd = 0;
+				for (int bufIx = 0; bufIx < bitmapLen; bufIx += 2) {
+					int curRunStartOff = curRunEnd + _data[bitmapOff+bufIx];
+					int curRunLen = _data[bitmapOff+bufIx + 1];
+					vsum += LinearAlgebraUtils.vectSum(a, curRunStartOff, curRunLen);
+					curRunEnd = curRunStartOff + curRunLen;
+				}
+				
+				//scale partial results by values and write results
+				for( int j = 0; j < numCols; j++ )
+					c[ _colIndexes[j] ] += vsum * _values[ valOff+j ];
+			}
+		}
+	}
+
+	@Override
+	public ColGroup scalarOperation(ScalarOperator op)
+			throws DMLRuntimeException 
+	{
+		double val0 = op.executeScalar(0);
+		
+		//fast path: sparse-safe operations
+		// Note that bitmaps don't change and are shallow-copied
+		if( op.sparseSafe || val0==0 ) {
+			return new ColGroupRLE(_colIndexes, _numRows, 
+					applyScalarOp(op), _data, _ptr);
+		}
+		
+		//slow path: sparse-unsafe operations (potentially create new bitmap)
+		//note: for efficiency, we currently don't drop values that become 0
+		boolean[] lind = computeZeroIndicatorVector();
+		int[] loff = computeOffsets(lind);
+		if( loff.length==0 ) { //empty offset list: go back to fast path
+			return new ColGroupRLE(_colIndexes, _numRows, 
+					applyScalarOp(op), _data, _ptr);
+		}
+		
+		double[] rvalues = applyScalarOp(op, val0, getNumCols());		
+		char[] lbitmap = BitmapEncoder.genRLEBitmap(loff);
+		char[] rbitmaps = Arrays.copyOf(_data, _data.length+lbitmap.length);
+		System.arraycopy(lbitmap, 0, rbitmaps, _data.length, lbitmap.length);
+		int[] rbitmapOffs = Arrays.copyOf(_ptr, _ptr.length+1);
+		rbitmapOffs[rbitmapOffs.length-1] = rbitmaps.length; 
+		
+		return new ColGroupRLE(_colIndexes, _numRows, 
+				rvalues, rbitmaps, rbitmapOffs);
+	}
+	
+	@Override
+	public void unaryAggregateOperations(AggregateUnaryOperator op, MatrixBlock result) 
+		throws DMLRuntimeException 
+	{
+		KahanFunction kplus = (op.aggOp.increOp.fn instanceof KahanPlus) ?
+				KahanPlus.getKahanPlusFnObject() : KahanPlusSq.getKahanPlusSqFnObject();
+		
+		if( op.indexFn instanceof ReduceAll )
+			computeSum(result, kplus);
+		else if( op.indexFn instanceof ReduceCol )
+			computeRowSums(result, kplus);
+		else if( op.indexFn instanceof ReduceRow )
+			computeColSums(result, kplus);
+	}
+	
+	/**
+	 * 
+	 * @param result
+	 */
+	private void computeSum(MatrixBlock result, KahanFunction kplus)
+	{
+		KahanObject kbuff = new KahanObject(result.quickGetValue(0, 0), result.quickGetValue(0, 1));
+		
+		final int numCols = getNumCols();
+		final int numVals = getNumValues();
+		
+		for (int bitmapIx = 0; bitmapIx < numVals; bitmapIx++) {
+			int bitmapOff = _ptr[bitmapIx];
+			int bitmapLen = len(bitmapIx);
+			int valOff = bitmapIx * numCols;
+			int curRunEnd = 0;
+			int count = 0;
+			for (int bufIx = 0; bufIx < bitmapLen; bufIx += 2) {
+				int curRunStartOff = curRunEnd + _data[bitmapOff+bufIx];
+				curRunEnd = curRunStartOff + _data[bitmapOff+bufIx + 1];
+				count += curRunEnd-curRunStartOff;
+			}
+			
+			//scale counts by all values
+			for( int j = 0; j < numCols; j++ )
+				kplus.execute3(kbuff, _values[ valOff+j ], count);
+		}
+		
+		result.quickSetValue(0, 0, kbuff._sum);
+		result.quickSetValue(0, 1, kbuff._correction);
+	}
+	
+	/**
+	 * 
+	 * @param result
+	 */
+	private void computeRowSums(MatrixBlock result, KahanFunction kplus)
+	{
+		KahanObject kbuff = new KahanObject(0, 0);
+		final int numVals = getNumValues();
+		
+		for (int bitmapIx = 0; bitmapIx < numVals; bitmapIx++) {
+			int bitmapOff = _ptr[bitmapIx];
+			int bitmapLen = len(bitmapIx);
+			double val = sumValues(bitmapIx);
+					
+			if (val != 0.0) {
+				int curRunStartOff = 0;
+				int curRunEnd = 0;
+				for (int bufIx = 0; bufIx < bitmapLen; bufIx += 2) {
+					curRunStartOff = curRunEnd + _data[bitmapOff+bufIx];
+					curRunEnd = curRunStartOff + _data[bitmapOff+bufIx + 1];
+					for (int rix = curRunStartOff; rix < curRunEnd; rix++) {
+						kbuff.set(result.quickGetValue(rix, 0), result.quickGetValue(rix, 1));
+						kplus.execute2(kbuff, val);
+						result.quickSetValue(rix, 0, kbuff._sum);
+						result.quickSetValue(rix, 1, kbuff._correction);
+					}
+				}
+			}
+		}
+	}
+	
+	/**
+	 * 
+	 * @param result
+	 */
+	private void computeColSums(MatrixBlock result, KahanFunction kplus)
+	{
+		KahanObject kbuff = new KahanObject(0, 0);
+		
+		final int numCols = getNumCols();
+		final int numVals = getNumValues();
+		
+		for (int bitmapIx = 0; bitmapIx < numVals; bitmapIx++) {
+			int bitmapOff = _ptr[bitmapIx];
+			int bitmapLen = len(bitmapIx);
+			int valOff = bitmapIx * numCols;
+			int curRunEnd = 0;
+			int count = 0;
+			for (int bufIx = 0; bufIx < bitmapLen; bufIx += 2) {
+				int curRunStartOff = curRunEnd + _data[bitmapOff+bufIx];
+				curRunEnd = curRunStartOff + _data[bitmapOff+bufIx + 1];
+				count += curRunEnd-curRunStartOff;
+			}
+			
+			//scale counts by all values
+			for( int j = 0; j < numCols; j++ ) {
+				kbuff.set(result.quickGetValue(0, _colIndexes[j]),result.quickGetValue(1, _colIndexes[j]));
+				kplus.execute3(kbuff, _values[ valOff+j ], count);
+				result.quickSetValue(0, _colIndexes[j], kbuff._sum);
+				result.quickSetValue(1, _colIndexes[j], kbuff._correction);
+			}
+		}
+	}
+	
+	public boolean[] computeZeroIndicatorVector()
+		throws DMLRuntimeException 
+	{	
+		boolean[] ret = new boolean[_numRows];
+		final int numVals = getNumValues();
+
+		//initialize everything with zero
+		Arrays.fill(ret, true);
+		
+		for (int bitmapIx = 0; bitmapIx < numVals; bitmapIx++) {
+			int bitmapOff = _ptr[bitmapIx];
+			int bitmapLen = len(bitmapIx);
+			
+			int curRunStartOff = 0;
+			int curRunEnd = 0;
+			for (int bufIx = 0; bufIx < bitmapLen; bufIx += 2) {
+				curRunStartOff = curRunEnd + _data[bitmapOff+bufIx];
+				curRunEnd = curRunStartOff + _data[bitmapOff+bufIx + 1];
+				Arrays.fill(ret, curRunStartOff, curRunEnd, false);
+			}
+		}
+		
+		return ret;
+	}
+	
+	@Override
+	protected void countNonZerosPerRow(int[] rnnz)
+	{
+		final int numVals = getNumValues();
+		final int numCols = getNumCols();
+		
+		for (int k = 0; k < numVals; k++) {
+			int bitmapOff = _ptr[k];
+			int bitmapLen = len(k);
+			
+			int curRunStartOff = 0;
+			int curRunEnd = 0;
+			for (int bufIx = 0; bufIx < bitmapLen; bufIx += 2) {
+				curRunStartOff = curRunEnd + _data[bitmapOff+bufIx];
+				curRunEnd = curRunStartOff + _data[bitmapOff+bufIx + 1];
+				for( int i=curRunStartOff; i<curRunEnd; i++ )
+					rnnz[i] += numCols;
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/16e7b1c8/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java
new file mode 100644
index 0000000..d9b75a1
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress;
+
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.matrix.data.LibMatrixAgg;
+import org.apache.sysml.runtime.matrix.data.LibMatrixMult;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
+import org.apache.sysml.runtime.matrix.operators.ScalarOperator;
+import org.apache.sysml.runtime.util.SortUtils;
+
+
+/**
+ * Column group type for columns that are stored as dense arrays of doubles.
+ * Uses a MatrixBlock internally to store the column contents.
+ * 
+ */
+public class ColGroupUncompressed extends ColGroup 
+{
+	private static final long serialVersionUID = 4870546053280378891L;
+
+	/**
+	 * We store the contents of the columns as a MatrixBlock to take advantage
+	 * of high-performance routines available for this data structure.
+	 */
+	private MatrixBlock _data;
+
+	public ColGroupUncompressed() {
+		super(CompressionType.UNCOMPRESSED, (int[])null, -1);
+	}
+	
+	/**
+	 * Main constructor.
+	 * 
+	 * @param colIndicesList
+	 *            indices (relative to the current block) of the columns that
+	 *            this column group represents.
+	 * @param rawblock
+	 *            the uncompressed block; uncompressed data must be present at
+	 *            the time that the constructor is called
+	 * @throws DMLRuntimeException
+	 */
+	@SuppressWarnings("unused")
+	public ColGroupUncompressed(List<Integer> colIndicesList, MatrixBlock rawblock) 
+		throws DMLRuntimeException 
+	{
+		super(CompressionType.UNCOMPRESSED, colIndicesList, 
+				CompressedMatrixBlock.TRANSPOSE_INPUT ? 
+				rawblock.getNumColumns() : rawblock.getNumRows());
+
+		//prepare meta data
+		int numRows = CompressedMatrixBlock.TRANSPOSE_INPUT ? 
+				rawblock.getNumColumns() : rawblock.getNumRows();
+		
+		// Create a matrix with just the requested rows of the original block
+		_data = new MatrixBlock(numRows,
+				_colIndexes.length, rawblock.isInSparseFormat());
+
+		//ensure sorted col indices
+		if(!SortUtils.isSorted(0, _colIndexes.length, _colIndexes))
+			Arrays.sort(_colIndexes);
+		
+		//special cases empty blocks
+		if (rawblock.isEmptyBlock(false))
+			return;
+		//special cases full block
+		if( !CompressedMatrixBlock.TRANSPOSE_INPUT &&
+			_data.getNumColumns() == rawblock.getNumColumns() ){
+			_data.copy(rawblock);
+			return;
+		}
+		
+		// dense implementation for dense and sparse matrices to avoid linear search
+		int m = numRows;
+		int n = _colIndexes.length;
+		for( int i = 0; i < m; i++) {
+			for( int j = 0; j < n; j++ ) {
+				double val = CompressedMatrixBlock.TRANSPOSE_INPUT ?
+						rawblock.quickGetValue(_colIndexes[j], i) :
+						rawblock.quickGetValue(i, _colIndexes[j]);	
+				_data.appendValue(i, j, val);
+			}
+		}
+		_data.examSparsity();
+	}
+
+	/**
+	 * Constructor for creating temporary decompressed versions of one or more
+	 * compressed column groups.
+	 * 
+	 * @param groupsToDecompress
+	 *            compressed columns to subsume. Must contain at least one
+	 *            element.
+	 */
+	public ColGroupUncompressed(ArrayList<ColGroup> groupsToDecompress) 
+	{
+		super(CompressionType.UNCOMPRESSED, 
+				mergeColIndices(groupsToDecompress),
+				groupsToDecompress.get(0)._numRows);
+
+		// Invert the list of column indices
+		int maxColIndex = _colIndexes[_colIndexes.length - 1];
+		int[] colIndicesInverted = new int[maxColIndex + 1];
+		for (int i = 0; i < _colIndexes.length; i++) {
+			colIndicesInverted[_colIndexes[i]] = i;
+		}
+
+		// Create the buffer that holds the uncompressed data, packed together
+		_data = new MatrixBlock(_numRows, _colIndexes.length, false);
+
+		for (ColGroup colGroup : groupsToDecompress) {
+			colGroup.decompressToBlock(_data, colIndicesInverted);
+		}
+	}
+
+	/**
+	 * Constructor for internal use. Used when a method needs to build an
+	 * instance of this class from scratch.
+	 * 
+	 * @param colIndices
+	 *            column mapping for this column group
+	 * @param numRows
+	 *            number of rows in the column, for passing to the superclass
+	 * @param colContents
+	 *            uncompressed cell values
+	 */
+	public ColGroupUncompressed(int[] colIndices, int numRows, MatrixBlock data) 
+	{
+		super(CompressionType.UNCOMPRESSED, colIndices, numRows);
+		_data = data;
+	}
+
+
+	/**
+	 * Access for superclass
+	 * 
+	 * @return direct pointer to the internal representation of the columns
+	 */
+	public MatrixBlock getData() {
+		return _data;
+	}
+	
+	/**
+	 * Subroutine of constructor.
+	 * 
+	 * @param groupsToDecompress
+	 *            input to the constructor that decompresses into a temporary
+	 *            UncompressedColGroup
+	 * @return a merged set of column indices across all those groups
+	 */
+	private static int[] mergeColIndices(ArrayList<ColGroup> groupsToDecompress) 
+	{
+		// Pass 1: Determine number of columns
+		int sz = 0;
+		for (ColGroup colGroup : groupsToDecompress) {
+			sz += colGroup.getNumCols();
+		}
+
+		// Pass 2: Copy column offsets out
+		int[] ret = new int[sz];
+		int pos = 0;
+		for (ColGroup colGroup : groupsToDecompress) {
+			int[] tmp = colGroup.getColIndices();
+			System.arraycopy(tmp, 0, ret, pos, tmp.length);
+			pos += tmp.length;
+		}
+
+		// Pass 3: Sort and return the list of columns
+		Arrays.sort(ret);
+		return ret;
+	}
+
+	@Override
+	public long estimateInMemorySize() {
+		long size = super.estimateInMemorySize();
+		// adding the size of colContents
+		return size + 8 + _data.estimateSizeInMemory();
+	}
+
+	@Override
+	public void decompressToBlock(MatrixBlock target) {
+		//empty block, nothing to add to output
+		if( _data.isEmptyBlock(false) )
+			return;		
+		for (int row = 0; row < _data.getNumRows(); row++) {
+			for (int colIx = 0; colIx < _colIndexes.length; colIx++) {
+				int col = _colIndexes[colIx];
+				double cellVal = _data.quickGetValue(row, colIx);
+				target.quickSetValue(row, col, cellVal);
+			}
+		}
+	}
+
+	@Override
+	public void decompressToBlock(MatrixBlock target, int[] colIndexTargets) {
+		//empty block, nothing to add to output
+		if( _data.isEmptyBlock(false) ) {
+			return;		
+		}
+		// Run through the rows, putting values into the appropriate locations
+		for (int row = 0; row < _data.getNumRows(); row++) {
+			for (int colIx = 0; colIx < _data.getNumColumns(); colIx++) {
+				int origMatrixColIx = getColIndex(colIx);
+				int col = colIndexTargets[origMatrixColIx];
+				double cellVal = _data.quickGetValue(row, colIx);
+				target.quickSetValue(row, col, cellVal);
+			}
+		}	
+	}
+	
+	@Override
+	public void decompressToBlock(MatrixBlock target, int colpos) {
+		//empty block, nothing to add to output
+		if( _data.isEmptyBlock(false) ) {
+			return;		
+		}
+		// Run through the rows, putting values into the appropriate locations
+		for (int row = 0; row < _data.getNumRows(); row++) {
+			double cellVal = _data.quickGetValue(row, colpos);
+			target.quickSetValue(row, 0, cellVal);
+		}	
+	}
+
+	@Override
+	public void rightMultByVector(MatrixBlock vector, MatrixBlock result, int rl, int ru)
+			throws DMLRuntimeException 
+	{
+		// Pull out the relevant rows of the vector
+		int clen = _colIndexes.length;
+		
+		MatrixBlock shortVector = new MatrixBlock(clen, 1, false);
+		shortVector.allocateDenseBlock();
+		double[] b = shortVector.getDenseBlock();
+		for (int colIx = 0; colIx < clen; colIx++)
+			b[colIx] = vector.quickGetValue(_colIndexes[colIx], 0);
+		shortVector.recomputeNonZeros();
+		
+		// Multiply the selected columns by the appropriate parts of the vector
+		LibMatrixMult.matrixMult(_data, shortVector, result, rl, ru);	
+	}
+	
+	@Override
+	public void leftMultByRowVector(MatrixBlock vector, MatrixBlock result)
+			throws DMLRuntimeException 
+	{
+		MatrixBlock pret = new MatrixBlock(1, _colIndexes.length, false);
+		LibMatrixMult.matrixMult(vector, _data, pret);
+		
+		// copying partialResult to the proper indices of the result
+		if( !pret.isEmptyBlock(false) ) {
+			double[] rsltArr = result.getDenseBlock();
+			for (int colIx = 0; colIx < _colIndexes.length; colIx++)
+				rsltArr[_colIndexes[colIx]] = pret.quickGetValue(0, colIx);
+			result.recomputeNonZeros();
+		}
+	}
+	
+	public void leftMultByRowVector(MatrixBlock vector, MatrixBlock result, int k)
+			throws DMLRuntimeException 
+	{
+		MatrixBlock pret = new MatrixBlock(1, _colIndexes.length, false);
+		LibMatrixMult.matrixMult(vector, _data, pret, k);
+		
+		// copying partialResult to the proper indices of the result
+		if( !pret.isEmptyBlock(false) ) {
+			double[] rsltArr = result.getDenseBlock();
+			for (int colIx = 0; colIx < _colIndexes.length; colIx++)
+				rsltArr[_colIndexes[colIx]] = pret.quickGetValue(0, colIx);
+			result.recomputeNonZeros();
+		}
+	}
+
+	@Override
+	public ColGroup scalarOperation(ScalarOperator op)
+			throws DMLRuntimeException 
+	{
+		//execute scalar operations
+		MatrixBlock retContent = (MatrixBlock) _data
+				.scalarOperations(op, new MatrixBlock());
+
+		//construct new uncompressed column group
+		return new ColGroupUncompressed(getColIndices(), _data.getNumRows(), retContent);
+	}
+	
+	@Override
+	public void unaryAggregateOperations(AggregateUnaryOperator op, MatrixBlock ret)
+		throws DMLRuntimeException 
+	{
+		//execute unary aggregate operations
+		LibMatrixAgg.aggregateUnaryMatrix(_data, ret, op);
+	}
+
+	@Override
+	public void readFields(DataInput in)
+		throws IOException 
+	{
+		//read col contents (w/ meta data)
+		_data = new MatrixBlock();
+		_data.readFields(in);		
+		_numRows = _data.getNumRows();
+		
+		//read col indices
+		int numCols = _data.getNumColumns();
+		_colIndexes = new int[ numCols ];
+		for( int i=0; i<numCols; i++ )
+			_colIndexes[i] = in.readInt();
+	}
+
+	@Override
+	public void write(DataOutput out) 
+		throws IOException 
+	{
+		//write col contents first (w/ meta data)
+		_data.write(out);
+		
+		//write col indices
+		int len = _data.getNumColumns();
+		for( int i=0; i<len; i++ )
+			out.writeInt( _colIndexes[i] );
+	}
+
+	@Override
+	public long getExactSizeOnDisk() {
+		return _data.getExactSizeOnDisk()
+			   + 4 * _data.getNumColumns();
+	}
+	
+	@Override
+	protected void countNonZerosPerRow(int[] rnnz)
+	{
+		for( int i=0; i<_data.getNumRows(); i++ )
+			rnnz[i] += _data.recomputeNonZeros(i, i, 0, _data.getNumColumns()-1);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/16e7b1c8/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
new file mode 100644
index 0000000..6ac26c6
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
@@ -0,0 +1,1342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.sysml.hops.OptimizerUtils;
+import org.apache.sysml.lops.MMTSJ.MMTSJType;
+import org.apache.sysml.lops.MapMultChain.ChainType;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.compress.ColGroup.CompressionType;
+import org.apache.sysml.runtime.compress.estim.CompressedSizeEstimator;
+import org.apache.sysml.runtime.compress.estim.CompressedSizeInfo;
+import org.apache.sysml.runtime.compress.estim.SizeEstimatorFactory;
+import org.apache.sysml.runtime.compress.utils.ConverterUtils;
+import org.apache.sysml.runtime.compress.utils.LinearAlgebraUtils;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.Timing;
+import org.apache.sysml.runtime.functionobjects.KahanPlus;
+import org.apache.sysml.runtime.functionobjects.KahanPlusSq;
+import org.apache.sysml.runtime.functionobjects.Multiply;
+import org.apache.sysml.runtime.functionobjects.ReduceRow;
+import org.apache.sysml.runtime.instructions.cp.KahanObject;
+import org.apache.sysml.runtime.matrix.data.LibMatrixBincell;
+import org.apache.sysml.runtime.matrix.data.LibMatrixReorg;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysml.runtime.matrix.data.MatrixValue;
+import org.apache.sysml.runtime.matrix.data.SparseBlock;
+import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator;
+import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
+import org.apache.sysml.runtime.matrix.operators.BinaryOperator;
+import org.apache.sysml.runtime.matrix.operators.ScalarOperator;
+
+/**
+ * Experimental version of MatrixBlock that allows a compressed internal
+ * representation.
+ */
+public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
+{
+	private static final long serialVersionUID = 7319972089143154057L;
+	
+	//internal configuration
+	public static final int MAX_NUMBER_COCODING_COLUMNS = 1000;
+	public static final double MIN_COMPRESSION_RATIO = 2.0;
+	public static final double MIN_RLE_RATIO = 1.0; // Minimum additional compression (non-RLE size / RLE size) before we switch to run-length encoding.
+	public static final boolean TRANSPOSE_INPUT = true;
+	public static final boolean MATERIALIZE_ZEROS = false;
+	public static final long MIN_PAR_AGG_THRESHOLD = 16*1024*1024; //16MB
+	public static final boolean INVESTIGATE_ESTIMATES = false;
+	private static final boolean LDEBUG = false; //local debug flag
+	
+	private static final Log LOG = LogFactory.getLog(CompressedMatrixBlock.class.getName());
+	
+	static{
+		// for internal debugging only
+		if( LDEBUG ) {
+			Logger.getLogger("org.apache.sysml.runtime.compress")
+				  .setLevel((Level) Level.DEBUG);
+		}	
+	}
+	
+	protected ArrayList<ColGroup> _colGroups = null;
+	protected CompressionStatistics _stats = null;
+	
+	public CompressedMatrixBlock() {
+		super(-1, -1, true);
+	}
+	
+	/**
+	 * Main constructor for building a block from scratch.
+	 * 
+	 * @param rl
+	 *            number of rows in the block
+	 * @param cl
+	 *            number of columns
+	 * @param sparse
+	 *            true if the UNCOMPRESSED representation of the block should be
+	 *            sparse
+	 */
+	public CompressedMatrixBlock(int rl, int cl, boolean sparse) {
+		super(rl, cl, sparse);
+	}
+
+	/**
+	 * "Copy" constructor to populate this compressed block with the
+	 * uncompressed contents of a conventional block. Does <b>not</b> compress
+	 * the block.
+	 */
+	public CompressedMatrixBlock(MatrixBlock mb) {
+		super(mb.getNumRows(), mb.getNumColumns(), mb.isInSparseFormat());
+		
+		//shallow copy (deep copy on compression, prevents unnecessary copy) 
+		if( isInSparseFormat() )
+			sparseBlock = mb.getSparseBlock();
+		else
+			denseBlock = mb.getDenseBlock();
+		nonZeros = mb.getNonZeros();
+	}
+
+	/**
+	 * 
+	 * @return the column groups constructed by the compression process.
+	 * 
+	 */
+	public ArrayList<ColGroup> getColGroups() {
+		return _colGroups;
+	}
+
+	/**
+	 * @return true if this block is in compressed form; false if the block has
+	 *         not yet been compressed
+	 */
+	public boolean isCompressed() {
+		return (_colGroups != null);
+	}
+	
+	/**
+	 * 
+	 * @return
+	 */
+	public boolean isSingleUncompressedGroup(){
+		return (_colGroups!=null && _colGroups.size()==1 
+				&& _colGroups.get(0) instanceof ColGroupUncompressed);
+	}
+
+	private void allocateColGroupList() {
+		_colGroups = new ArrayList<ColGroup>();
+	}
+	
+	@Override
+	public boolean isEmptyBlock(boolean safe)  {
+		if( !isCompressed() )
+			return super.isEmptyBlock(safe);		
+		return (_colGroups == null || getNonZeros()==0);
+	}
+	
+	/**
+	 * Compress the contents of this matrix block. After compression, the
+	 * uncompressed data is discarded. Attempts to update this block after
+	 * calling this method currently result in INCORRECT BEHAVIOR, something
+	 * which should be fixed if we move ahead with this compression strategy.
+	 * 
+	 * +per column sparsity
+	 */
+	public void compress() 
+		throws DMLRuntimeException 
+	{
+		//check for redundant compression
+		if( isCompressed() ){
+			throw new DMLRuntimeException("Redundant compression, block already compressed.");
+		}
+
+		Timing time = new Timing(true);
+		_stats = new CompressionStatistics();
+		
+		// SAMPLE-BASED DECISIONS:
+		// Decisions such as testing if a column is amenable to bitmap
+		// compression or evaluating co-coding potentionls are made based on a
+		// subset of the rows. For large datasets, sampling might take a
+		// significant amount of time. So, we generate only one sample and use
+		// it for the entire compression process.
+
+		//prepare basic meta data and deep copy / transpose input
+		final int numRows = getNumRows();
+		final int numCols = getNumColumns();
+		final boolean sparse = isInSparseFormat();
+		MatrixBlock rawblock = this;
+		if( TRANSPOSE_INPUT )
+			rawblock = LibMatrixReorg.transpose(rawblock, new MatrixBlock(numCols, numRows, sparse));
+		else
+			rawblock = new MatrixBlock(this);
+		
+		//construct sample-based size estimator
+		CompressedSizeEstimator bitmapSizeEstimator = 
+				SizeEstimatorFactory.getSizeEstimator(rawblock, numRows);
+
+		//allocate list of column groups
+		allocateColGroupList();
+
+		// The current implementation of this method is written for correctness,
+		// not for performance or for minimal use of temporary space.
+
+		// We start with a full set of columns.
+		HashSet<Integer> remainingCols = new HashSet<Integer>();
+		for (int i = 0; i < numCols; i++) {
+			remainingCols.add(i);
+		}
+
+		// PHASE 1: Classify columns by compression type
+		// We start by determining which columns are amenable to bitmap
+		// compression
+
+		// It is correct to use the dense size as the uncompressed size
+		// FIXME not numRows but nnz / col otherwise too aggressive overestimation
+		// of uncompressed size and hence overestimation of compression potential
+		double uncompressedColumnSize = 8 * numRows;
+
+		// information about the bitmap amenable columns
+		List<Integer> bitmapCols = new ArrayList<Integer>();
+		List<Integer> uncompressedCols = new ArrayList<Integer>();
+		List<Integer> colsCardinalities = new ArrayList<Integer>();
+		List<Long> compressedSizes = new ArrayList<Long>();
+		HashMap<Integer, Double> compressionRatios = new HashMap<Integer, Double>();
+		
+		// Minimum ratio (size of uncompressed / size of compressed) that we
+		// will accept when encoding a field with a bitmap.
+		for (int col = 0; col < numCols; col++) 
+		{
+			CompressedSizeInfo compressedSizeInfo = bitmapSizeEstimator
+					.estimateCompressedColGroupSize(new int[] { col });
+			long compressedSize = compressedSizeInfo.getMinSize();
+			double compRatio = uncompressedColumnSize / compressedSize;
+			
+			//FIXME: compression ratio should be checked against 1 instead of min compression
+			//ratio; I think this threshold was only required because we overestimated the 
+			//the uncompressed column size with n\alpha instead of z\alpha
+			if (compRatio >= MIN_COMPRESSION_RATIO) {
+				bitmapCols.add(col);
+				compressionRatios.put(col, compRatio);
+				colsCardinalities.add(compressedSizeInfo.getEstCarinality());
+				compressedSizes.add(compressedSize);
+			}
+			else
+				uncompressedCols.add(col);
+		}
+
+		_stats.timePhase1 = time.stop();
+		if( LOG.isDebugEnabled() )
+			LOG.debug("compression phase 1: "+_stats.timePhase1);
+		
+		// Filters for additional types of compression should be inserted here.
+
+		// PHASE 2: Grouping columns
+		// Divide the bitmap columns into column groups.
+		List<int[]> bitmapColGrps = null;
+		if (bitmapCols.size() > MAX_NUMBER_COCODING_COLUMNS) {
+			// Too many columns to compute co-coding groups with current methods.
+			// Generate singleton groups.
+			bitmapColGrps = new ArrayList<int[]>(bitmapCols.size());
+			for (int col : bitmapCols) {
+				bitmapColGrps.add(new int[] { col });
+			}
+		} 
+		else {
+			bitmapColGrps = PlanningCoCoder.findCocodesByPartitioning(
+					bitmapSizeEstimator, bitmapCols, colsCardinalities,
+					compressedSizes, numRows, isInSparseFormat() ? 
+					OptimizerUtils.getSparsity(numRows, numCols, getNonZeros()): 1);
+		}
+
+		_stats.timePhase2 = time.stop();
+		if( LOG.isDebugEnabled() )
+			LOG.debug("compression phase 2: "+_stats.timePhase2);
+		
+		if( INVESTIGATE_ESTIMATES ) {
+			double est = 0;
+			for( int[] groupIndices : bitmapColGrps )
+				est += bitmapSizeEstimator.estimateCompressedColGroupSize(groupIndices).getMinSize();
+			est += uncompressedCols.size() * uncompressedColumnSize;
+			_stats.estSize = est;
+		}
+		
+		// PHASE 3: Compress and correct sample-based decisions
+		
+		for (int[] groupIndices : bitmapColGrps) 
+		{
+			int[] allGroupIndices = null;
+			int allColsCount = groupIndices.length;
+			CompressedSizeInfo bitmapSizeInfo;
+			// The compression type is decided based on a full bitmap since it
+			// will be reused for the actual compression step.
+			UncompressedBitmap ubm;
+			PriorityQueue<CompressedColumn> compRatioPQ = null;
+			boolean skipGroup = false;
+			while (true) 
+			{
+				ubm = BitmapEncoder.extractBitmap(groupIndices, rawblock); 
+				bitmapSizeInfo = bitmapSizeEstimator
+						.estimateCompressedColGroupSize(ubm);
+				double compRatio = uncompressedColumnSize * groupIndices.length
+						/ bitmapSizeInfo.getMinSize();
+				if (compRatio >= MIN_COMPRESSION_RATIO) {
+					// we have a good group
+					for( Integer col : groupIndices )
+						remainingCols.remove(col);
+					break;
+				} else {
+					// modify the group
+					if (compRatioPQ == null) {
+						// first modification
+						allGroupIndices = Arrays.copyOf(groupIndices, groupIndices.length);
+						compRatioPQ = new PriorityQueue<CompressedMatrixBlock.CompressedColumn>();
+						for (int i = 0; i < groupIndices.length; i++)
+							compRatioPQ.add(new CompressedColumn(i,
+									compressionRatios.get(groupIndices[i])));
+					}
+
+					// index in allGroupIndices
+					int removeIx = compRatioPQ.poll().colIx;
+					allGroupIndices[removeIx] = -1;
+					allColsCount--;
+					if (allColsCount == 0) {
+						skipGroup = true;
+						break;
+					}
+					groupIndices = new int[allColsCount];
+					// copying the values that do not equal -1
+					int ix = 0;
+					for (int col : allGroupIndices) {
+						if (col != -1) {
+							groupIndices[ix++] = col;
+						}
+					}
+
+				}
+			}
+
+			if (skipGroup)
+				continue;
+			long rleNumBytes = bitmapSizeInfo.getRLESize();
+			long offsetNumBytes = bitmapSizeInfo.getOLESize();
+			double rleRatio = (double) offsetNumBytes / (double) rleNumBytes;
+
+			if (rleRatio > MIN_RLE_RATIO) {
+				ColGroupRLE compressedGroup = new ColGroupRLE(groupIndices,
+						numRows, ubm);
+				_colGroups.add(compressedGroup);
+			} 
+			else {
+				ColGroupOLE compressedGroup = new ColGroupOLE(
+						groupIndices, numRows, ubm);
+				_colGroups.add(compressedGroup);
+			}
+
+		}
+		
+		_stats.timePhase3 = time.stop();
+		if( LOG.isDebugEnabled() )
+			LOG.debug("compression phase 3: "+_stats.timePhase3);
+		
+		// Phase 4: Cleanup
+		// The remaining columns are stored uncompressed as one big column group
+		if (remainingCols.size() > 0) {
+			ArrayList<Integer> list = new ArrayList<Integer>(remainingCols);
+			ColGroupUncompressed ucgroup = new ColGroupUncompressed(list, rawblock);
+			_colGroups.add(ucgroup);
+		}
+
+		//final cleanup (discard uncompressed block)
+		rawblock.cleanupBlock(true, true);
+		this.cleanupBlock(true, true);
+		
+		_stats.timePhase4 = time.stop();
+		if( LOG.isDebugEnabled() )
+			LOG.debug("compression phase 4: "+_stats.timePhase4);
+	}
+
+	/**
+	 * @return a new uncompressed matrix block containing the contents of this
+	 *         block
+	 */
+	public MatrixBlock decompress() throws DMLRuntimeException 
+	{
+		//early abort for not yet compressed blocks
+		if( !isCompressed() )
+			return new MatrixBlock(this); 
+		
+		//preallocation sparse rows to avoid repeated reallocations		
+		MatrixBlock ret = new MatrixBlock(getNumRows(), getNumColumns(), isInSparseFormat(), getNonZeros());
+		if( ret.isInSparseFormat() ) {
+			int[] rnnz = new int[rlen];
+			for (ColGroup grp : _colGroups)
+				grp.countNonZerosPerRow(rnnz);
+			ret.allocateSparseRowsBlock();
+			SparseBlock rows = ret.getSparseBlock();
+			for( int i=0; i<rlen; i++ )
+				rows.allocate(i, rnnz[i]);
+		}
+		
+		//core decompression (append if sparse)
+		for (ColGroup grp : _colGroups)
+			grp.decompressToBlock(ret);
+		
+		//post-processing (for append in decompress)
+		if( isInSparseFormat() )
+			ret.sortSparseRows();
+
+		return ret;
+	}
+
+	public CompressionStatistics getCompressionStatistics(){
+		return _stats;
+	}
+
+	/**
+	 * 
+	 * @return an upper bound on the memory used to store this compressed block
+	 *         considering class overhead.
+	 */
+	public long estimateCompressedSizeInMemory() {
+		if (!isCompressed())
+			return 0;
+		// basic data inherited from MatrixBlock
+		long total = MatrixBlock.estimateSizeInMemory(0, 0, 0);
+		// adding the size of colGroups ArrayList overhead
+		// object overhead (32B) + int size (4B) + int modCount (4B) + Object[]
+		// elementData overhead + reference (32+8)B +reference ofr each Object (8B)
+		total += 80 + 8 * _colGroups.size();
+		for (ColGroup grp : _colGroups)
+			total += grp.estimateInMemorySize();
+		return total;
+	}
+
+	private class CompressedColumn implements Comparable<CompressedColumn> {
+		int colIx;
+		double compRatio;
+
+		public CompressedColumn(int colIx, double compRatio) {
+			this.colIx = colIx;
+			this.compRatio = compRatio;
+		}
+
+		@Override
+		public int compareTo(CompressedColumn o) {
+			return (int) Math.signum(compRatio - o.compRatio);
+		}
+	}
+	
+	public static class CompressionStatistics {
+		public double timePhase1 = -1;
+		public double timePhase2 = -1;
+		public double timePhase3 = -1;
+		public double timePhase4 = -1;
+		public double estSize = -1;
+		
+		public CompressionStatistics() {
+			//do nothing
+		}
+		
+		public CompressionStatistics(double t1, double t2, double t3, double t4){
+			timePhase1 = t1;
+			timePhase2 = t2;
+			timePhase3 = t3;
+			timePhase4 = t4;
+		}
+	} 
+
+	
+	//////////////////////////////////////////
+	// Serialization / Deserialization
+
+	@Override
+	public long getExactSizeOnDisk() 
+	{
+		//header information
+		long ret = 12;
+		
+		for( ColGroup grp : _colGroups ) {
+			ret += 1; //type info
+			ret += grp.getExactSizeOnDisk();
+		}
+		
+		return ret;
+	}
+	
+	@Override
+	public void readFields(DataInput in) 
+		throws IOException 
+	{
+		boolean compressed = in.readBoolean();
+		
+		//deserialize uncompressed block
+		if( !compressed ) {
+			super.readFields(in);
+			return;
+		}
+		
+		//deserialize compressed block
+		rlen = in.readInt();
+		clen = in.readInt();
+		nonZeros = in.readLong();
+		int ncolGroups = in.readInt();
+		
+		_colGroups = new ArrayList<ColGroup>(ncolGroups);
+		for( int i=0; i<ncolGroups; i++ ) 
+		{
+			CompressionType ctype = CompressionType.values()[in.readByte()];
+			ColGroup grp = null;
+			
+			//create instance of column group
+			switch( ctype ) {
+				case UNCOMPRESSED:
+					grp = new ColGroupUncompressed(); break;
+				case OLE_BITMAP:
+					grp = new ColGroupOLE(); break;
+				case RLE_BITMAP:
+					grp = new ColGroupRLE(); break;
+			}
+			
+			//deserialize and add column group
+			grp.readFields(in);
+			_colGroups.add(grp);
+		}
+	}
+	
+	@Override
+	public void write(DataOutput out) 
+		throws IOException 
+	{
+		out.writeBoolean( isCompressed() );
+		
+		//serialize uncompressed block
+		if( !isCompressed() ) {
+			super.write(out);
+			return;
+		}
+		
+		//serialize compressed matrix block
+		out.writeInt(rlen);
+		out.writeInt(clen);
+		out.writeLong(nonZeros);
+		out.writeInt(_colGroups.size());
+		
+		for( ColGroup grp : _colGroups ) {
+			out.writeByte( grp.getCompType().ordinal() );
+			grp.write(out); //delegate serialization
+		}
+	}
+	
+	
+	/**
+	 * Redirects the default java serialization via externalizable to our default 
+	 * hadoop writable serialization for efficient broadcast/rdd deserialization. 
+	 * 
+	 * @param is
+	 * @throws IOException
+	 */
+	@Override
+	public void readExternal(ObjectInput is) 
+		throws IOException
+	{
+		readFields(is);
+	}
+	
+	/**
+	 * Redirects the default java serialization via externalizable to our default 
+	 * hadoop writable serialization for efficient broadcast/rdd serialization. 
+	 * 
+	 * @param is
+	 * @throws IOException
+	 */
+	@Override
+	public void writeExternal(ObjectOutput os) 
+		throws IOException
+	{
+		write(os);	
+	}
+	
+	
+	//////////////////////////////////////////
+	// Operations (overwrite existing ops for seamless integration)
+
+	@Override
+	public MatrixValue scalarOperations(ScalarOperator sop, MatrixValue result) 
+		throws DMLRuntimeException
+	{
+		//call uncompressed matrix scalar if necessary
+		if( !isCompressed() ) {
+			return super.scalarOperations(sop, result);
+		}
+		
+		//allocate the output matrix block
+		CompressedMatrixBlock ret = null;
+		if( result==null || !(result instanceof CompressedMatrixBlock) )
+			ret = new CompressedMatrixBlock(getNumRows(), getNumColumns(), sparse);
+		else {
+			ret = (CompressedMatrixBlock) result;
+			ret.reset(rlen, clen);
+		}
+		
+		// Apply the operation recursively to each of the column groups.
+		// Most implementations will only modify metadata.
+		ArrayList<ColGroup> newColGroups = new ArrayList<ColGroup>();
+		for (ColGroup grp : _colGroups) {
+			newColGroups.add(grp.scalarOperation(sop));
+		}
+		ret._colGroups = newColGroups;
+		ret.setNonZeros(rlen*clen);
+		
+		return ret;
+	}
+
+	@Override
+	public MatrixBlock appendOperations(MatrixBlock that, MatrixBlock ret) 
+		throws DMLRuntimeException
+	{
+		//call uncompressed matrix append if necessary
+		if( !isCompressed() ) {
+			if( that instanceof CompressedMatrixBlock )
+				that = ((CompressedMatrixBlock) that).decompress();
+			return super.appendOperations(that, ret);
+		}
+		
+		final int m = rlen;
+		final int n = clen+that.getNumColumns();
+		final long nnz = nonZeros+that.getNonZeros();		
+		
+		//init result matrix 
+		CompressedMatrixBlock ret2 = null;
+		if( ret == null || !(ret instanceof CompressedMatrixBlock) ) {
+			ret2 = new CompressedMatrixBlock(m, n, isInSparseFormat());
+		}
+		else {
+			ret2 = (CompressedMatrixBlock) ret;
+			ret2.reset(m, n);
+		}
+			
+		//shallow copy of lhs column groups
+		ret2.allocateColGroupList();
+		ret2._colGroups.addAll(_colGroups);
+		
+		//copy of rhs column groups w/ col index shifting
+		if( !(that instanceof CompressedMatrixBlock) ) {
+			that = new CompressedMatrixBlock(that);
+			((CompressedMatrixBlock)that).compress();
+		}
+		ArrayList<ColGroup> inColGroups = ((CompressedMatrixBlock) that)._colGroups;
+		for( ColGroup group : inColGroups ) {
+			ColGroup tmp = ConverterUtils.copyColGroup(group);
+			tmp.shiftColIndices(clen);
+			ret2._colGroups.add(tmp);
+		}
+		
+		//meta data maintenance
+		ret2.setNonZeros(nnz);		
+		return ret2;
+	}
+	
+	@Override
+	public MatrixBlock chainMatrixMultOperations(MatrixBlock v, MatrixBlock w, MatrixBlock out, ChainType ctype) 
+		throws DMLRuntimeException 
+	{
+		//call uncompressed matrix mult if necessary
+		if( !isCompressed() ) {
+			return super.chainMatrixMultOperations(v, w, out, ctype);
+		}
+		
+		//single-threaded mmchain of single uncompressed colgroup
+		if( isSingleUncompressedGroup() ){
+			return ((ColGroupUncompressed)_colGroups.get(0))
+				.getData().chainMatrixMultOperations(v, w, out, ctype);
+		}
+		
+		//Timing time = new Timing(true);
+		
+		//prepare result
+		if( out != null )
+			out.reset(clen, 1, false);
+		else 
+			out = new MatrixBlock(clen, 1, false);
+		
+		//empty block handling
+		if( isEmptyBlock(false) ) 
+			return out;
+			
+		//compute matrix mult
+		MatrixBlock tmp = new MatrixBlock(rlen, 1, false);
+		rightMultByVector(v, tmp);
+		if( ctype == ChainType.XtwXv ) {
+			BinaryOperator bop = new BinaryOperator(Multiply.getMultiplyFnObject());
+			LibMatrixBincell.bincellOpInPlace(tmp, w, bop);
+		}
+		leftMultByVectorTranspose(_colGroups, tmp, out, true);
+		
+		//System.out.println("Compressed MMChain in "+time.stop());
+		
+		return out;
+	}
+
+	@Override
+	public MatrixBlock chainMatrixMultOperations(MatrixBlock v, MatrixBlock w, MatrixBlock out, ChainType ctype, int k) 
+		throws DMLRuntimeException 
+	{
+		//call uncompressed matrix mult if necessary
+		if( !isCompressed() ){
+			return super.chainMatrixMultOperations(v, w, out, ctype, k);
+		}
+		
+		//multi-threaded mmchain of single uncompressed colgroup
+		if( isSingleUncompressedGroup() ){
+			return ((ColGroupUncompressed)_colGroups.get(0))
+				.getData().chainMatrixMultOperations(v, w, out, ctype, k);
+		}
+
+		Timing time = LOG.isDebugEnabled() ? new Timing(true) : null;
+		
+		//prepare result
+		if( out != null )
+			out.reset(clen, 1, false);
+		else 
+			out = new MatrixBlock(clen, 1, false);
+		
+		//empty block handling
+		if( isEmptyBlock(false) ) 
+			return out;
+		
+		//compute matrix mult
+		MatrixBlock tmp = new MatrixBlock(rlen, 1, false);
+		rightMultByVector(v, tmp, k);
+		if( ctype == ChainType.XtwXv ) {
+			BinaryOperator bop = new BinaryOperator(Multiply.getMultiplyFnObject());
+			LibMatrixBincell.bincellOpInPlace(tmp, w, bop);
+		}
+		leftMultByVectorTranspose(_colGroups, tmp, out, true, k);
+		
+		if( LOG.isDebugEnabled() )
+			LOG.debug("Compressed MMChain k="+k+" in "+time.stop());
+		
+		return out;
+	}
+	
+	@Override
+	public MatrixValue aggregateBinaryOperations(MatrixValue mv1, MatrixValue mv2, MatrixValue result, AggregateBinaryOperator op)
+			throws DMLRuntimeException 
+	{
+		//call uncompressed matrix mult if necessary
+		if( !isCompressed() ) {
+			return super.aggregateBinaryOperations(mv1, mv2, result, op);
+		}
+	
+		//multi-threaded mm of single uncompressed colgroup
+		if( isSingleUncompressedGroup() ){
+			MatrixBlock tmp = ((ColGroupUncompressed)_colGroups.get(0)).getData();
+			return tmp.aggregateBinaryOperations(this==mv1?tmp:mv1, this==mv2?tmp:mv2, result, op);
+		}
+		
+		Timing time = LOG.isDebugEnabled() ? new Timing(true) : null;
+		
+		//setup meta data (dimensions, sparsity)
+		int rl = mv1.getNumRows();
+		int cl = mv2.getNumColumns();
+		
+		//create output matrix block
+		MatrixBlock ret = (MatrixBlock) result;
+		if( ret==null )
+			ret = new MatrixBlock(rl, cl, false, rl*cl);
+		else
+			ret.reset(rl, cl, false, rl*cl);
+		
+		//compute matrix mult
+		if( mv1.getNumRows()>1 && mv2.getNumColumns()==1 ) { //MV right
+			CompressedMatrixBlock cmb = (CompressedMatrixBlock)mv1;
+			MatrixBlock mb = (MatrixBlock) mv2;
+			if( op.getNumThreads()>1 )
+				cmb.rightMultByVector(mb, ret, op.getNumThreads());
+			else
+				cmb.rightMultByVector(mb, ret);
+		}
+		else if( mv1.getNumRows()==1 && mv2.getNumColumns()>1 ) { //MV left
+			MatrixBlock mb = (MatrixBlock) mv1;
+			if( op.getNumThreads()>1 )
+				leftMultByVectorTranspose(_colGroups, mb, ret, false, op.getNumThreads());
+			else
+				leftMultByVectorTranspose(_colGroups, mb, ret, false);
+		}
+		else {
+			//NOTE: we could decompress and invoke super.aggregateBinary but for now
+			//we want to have an eager fail if this happens
+			throw new DMLRuntimeException("Unsupported matrix-matrix multiplication over compressed matrix block.");
+		}
+		
+		if( LOG.isDebugEnabled() )
+			LOG.debug("Compressed MM in "+time.stop());
+		
+		return ret;
+	}
+	
+	@Override
+	public MatrixValue aggregateUnaryOperations(AggregateUnaryOperator op, MatrixValue result, 
+			int blockingFactorRow, int blockingFactorCol, MatrixIndexes indexesIn, boolean inCP) 
+		throws DMLRuntimeException
+	{
+		//call uncompressed matrix mult if necessary
+		if( !isCompressed() ) {
+			return super.aggregateUnaryOperations(op, result, blockingFactorRow, blockingFactorCol, indexesIn, inCP);
+		}
+		
+		//check for supported operations
+		if( !(op.aggOp.increOp.fn instanceof KahanPlus || op.aggOp.increOp.fn instanceof KahanPlusSq) ){
+			throw new DMLRuntimeException("Unary aggregates other than sums not supported yet.");
+		}
+		
+		//prepare output dimensions
+		CellIndex tempCellIndex = new CellIndex(-1,-1);
+		op.indexFn.computeDimension(rlen, clen, tempCellIndex);
+		if(op.aggOp.correctionExists) {
+			switch(op.aggOp.correctionLocation)
+			{
+				case LASTROW: tempCellIndex.row++;  break;
+				case LASTCOLUMN: tempCellIndex.column++; break;
+				case LASTTWOROWS: tempCellIndex.row+=2; break;
+				case LASTTWOCOLUMNS: tempCellIndex.column+=2; break;
+				default:
+					throw new DMLRuntimeException("unrecognized correctionLocation: "+op.aggOp.correctionLocation);	
+			}
+		}
+		
+		//prepare output
+		if(result==null)
+			result=new MatrixBlock(tempCellIndex.row, tempCellIndex.column, false);
+		else
+			result.reset(tempCellIndex.row, tempCellIndex.column, false);
+		
+		MatrixBlock ret = (MatrixBlock) result;
+		
+		//core unary aggregate
+		if(    op.getNumThreads() > 1 
+			&& getExactSizeOnDisk() > MIN_PAR_AGG_THRESHOLD ) 
+		{
+			// initialize and allocate the result
+			ret.allocateDenseBlock();
+			
+			//multi-threaded execution of all groups 
+			ArrayList<ColGroup>[] grpParts = createStaticTaskPartitioning(op.getNumThreads(), false);
+			ColGroupUncompressed uc = getUncompressedColGroup();
+			try {
+				//compute all compressed column groups
+				ExecutorService pool = Executors.newFixedThreadPool( op.getNumThreads() );
+				ArrayList<UnaryAggregateTask> tasks = new ArrayList<UnaryAggregateTask>();
+				for( ArrayList<ColGroup> grp : grpParts )
+					tasks.add(new UnaryAggregateTask(grp, ret, op));
+				pool.invokeAll(tasks);	
+				pool.shutdown();
+				//compute uncompressed column group in parallel (otherwise bottleneck)
+				if( uc != null )
+					 ret = (MatrixBlock)uc.getData().aggregateUnaryOperations(op, ret, blockingFactorRow, blockingFactorCol, indexesIn, false);					
+				//aggregate partial results
+				if( !(op.indexFn instanceof ReduceRow) ){
+					KahanObject kbuff = new KahanObject(0,0);
+					KahanPlus kplus = KahanPlus.getKahanPlusFnObject();
+					for( int i=0; i<ret.getNumRows(); i++ ) {
+						kbuff.set(ret.quickGetValue(i, 0), ret.quickGetValue(i, 0));
+						for( UnaryAggregateTask task : tasks )
+							kplus.execute2(kbuff, task.getResult().quickGetValue(i, 0));
+						ret.quickSetValue(i, 0, kbuff._sum);
+						ret.quickSetValue(i, 1, kbuff._correction);
+					}
+				}		
+			}
+			catch(Exception ex) {
+				throw new DMLRuntimeException(ex);
+			}
+		}
+		else {
+			for (ColGroup grp : _colGroups) {
+				grp.unaryAggregateOperations(op, ret);
+			}
+		}
+		
+		//drop correction if necessary
+		if(op.aggOp.correctionExists && inCP)
+			ret.dropLastRowsOrColums(op.aggOp.correctionLocation);
+	
+		//post-processing
+		ret.recomputeNonZeros();
+		
+		return ret;
+	}
+	
+	@Override
+	public MatrixBlock transposeSelfMatrixMultOperations(MatrixBlock out, MMTSJType tstype) 
+		throws DMLRuntimeException 
+	{
+		//call uncompressed matrix mult if necessary
+		if( !isCompressed() ) {
+			return super.transposeSelfMatrixMultOperations(out, tstype);
+		}
+				
+		//single-threaded tsmm of single uncompressed colgroup
+		if( isSingleUncompressedGroup() ){
+			return ((ColGroupUncompressed)_colGroups.get(0))
+				.getData().transposeSelfMatrixMultOperations(out, tstype);
+		}
+
+		Timing time = LOG.isDebugEnabled() ? new Timing(true) : null;
+
+		//check for transpose type
+		if( tstype != MMTSJType.LEFT ) //right not supported yet
+			throw new DMLRuntimeException("Invalid MMTSJ type '"+tstype.toString()+"'.");
+		
+		//create output matrix block
+		if( out == null )
+			out = new MatrixBlock(clen, clen, false);
+		else
+			out.reset(clen, clen, false);
+		out.allocateDenseBlock();
+		
+		if( !isEmptyBlock(false) ) {
+			//compute matrix mult
+			leftMultByTransposeSelf(_colGroups, out, 0, _colGroups.size());
+			
+			// post-processing
+			out.recomputeNonZeros();
+		}
+		
+		if( LOG.isDebugEnabled() )
+			LOG.debug("Compressed TSMM in "+time.stop());
+		
+		return out;
+	}
+
+	
+	@Override
+	public MatrixBlock transposeSelfMatrixMultOperations(MatrixBlock out, MMTSJType tstype, int k) 
+		throws DMLRuntimeException 
+	{
+		//call uncompressed matrix mult if necessary
+		if( !isCompressed() ){
+			return super.transposeSelfMatrixMultOperations(out, tstype, k);
+		}
+		
+		//multi-threaded tsmm of single uncompressed colgroup
+		if( isSingleUncompressedGroup() ){
+			return ((ColGroupUncompressed)_colGroups.get(0))
+				.getData().transposeSelfMatrixMultOperations(out, tstype, k);
+		}
+		
+		Timing time = LOG.isDebugEnabled() ? new Timing(true) : null;
+		
+		//check for transpose type
+		if( tstype != MMTSJType.LEFT ) //right not supported yet
+			throw new DMLRuntimeException("Invalid MMTSJ type '"+tstype.toString()+"'.");
+		
+		//create output matrix block
+		if( out == null )
+			out = new MatrixBlock(clen, clen, false);
+		else
+			out.reset(clen, clen, false);
+		out.allocateDenseBlock();
+		
+		if( !isEmptyBlock(false) ) {
+			//compute matrix mult
+			try {
+				ExecutorService pool = Executors.newFixedThreadPool( k );
+				ArrayList<MatrixMultTransposeTask> tasks = new ArrayList<MatrixMultTransposeTask>();
+				int blklen = (int)(Math.ceil((double)clen/(2*k)));
+				for( int i=0; i<2*k & i*blklen<clen; i++ )
+					tasks.add(new MatrixMultTransposeTask(_colGroups, out, i*blklen, Math.min((i+1)*blklen, clen)));
+				List<Future<Object>> ret = pool.invokeAll(tasks);
+				for( Future<Object> tret : ret )
+					tret.get(); //check for errors
+				pool.shutdown();
+			}
+			catch(Exception ex) {
+				throw new DMLRuntimeException(ex);
+			}
+			
+			// post-processing
+			out.recomputeNonZeros();
+		}
+		
+		if( LOG.isDebugEnabled() )
+			LOG.debug("Compressed TSMM k="+k+" in "+time.stop());
+		
+		return out;
+	}
+
+	
+	/**
+	 * Multiply this matrix block by a column vector on the right.
+	 * 
+	 * @param vector
+	 *            right-hand operand of the multiplication
+	 * @param result
+	 *            buffer to hold the result; must have the appropriate size
+	 *            already
+	 */
+	private void rightMultByVector(MatrixBlock vector, MatrixBlock result)
+		throws DMLRuntimeException 
+	{
+		// initialize and allocate the result
+		result.allocateDenseBlock();
+
+		// delegate matrix-vector operation to each column group
+		for( ColGroup grp : _colGroups )
+			if( grp instanceof ColGroupUncompressed ) //overwrites output
+				grp.rightMultByVector(vector, result, 0, result.getNumRows());
+		for( ColGroup grp : _colGroups )
+			if( !(grp instanceof ColGroupUncompressed) ) //adds to output
+				grp.rightMultByVector(vector, result, 0, result.getNumRows());
+		
+		// post-processing
+		result.recomputeNonZeros();
+	}
+
+	/**
+	 * Multi-threaded version of rightMultByVector.
+	 * 
+	 * @param vector
+	 * @param result
+	 * @param k
+	 * @throws DMLRuntimeException
+	 */
+	private void rightMultByVector(MatrixBlock vector, MatrixBlock result, int k)
+		throws DMLRuntimeException 
+	{
+		// initialize and allocate the result
+		result.allocateDenseBlock();
+
+		//multi-threaded execution of all groups
+		try {
+			ExecutorService pool = Executors.newFixedThreadPool( k );
+			int rlen = getNumRows();
+			int seqsz = BitmapEncoder.BITMAP_BLOCK_SZ;
+			int blklen = (int)(Math.ceil((double)rlen/k));
+			blklen += (blklen%seqsz != 0)?seqsz-blklen%seqsz:0;
+			ArrayList<RightMatrixMultTask> tasks = new ArrayList<RightMatrixMultTask>();
+			for( int i=0; i<k & i*blklen<getNumRows(); i++ )
+				tasks.add(new RightMatrixMultTask(_colGroups, vector, result, i*blklen, Math.min((i+1)*blklen,rlen)));
+			pool.invokeAll(tasks);	
+			pool.shutdown();
+		}
+		catch(Exception ex) {
+			throw new DMLRuntimeException(ex);
+		}
+		
+		// post-processing
+		result.recomputeNonZeros();
+	}
+	
+	/**
+	 * Multiply this matrix block by the transpose of a column vector (i.e.
+	 * t(v)%*%X)
+	 * 
+	 * @param vector
+	 *            left-hand operand of the multiplication
+	 * @param result
+	 *            buffer to hold the result; must have the appropriate size
+	 *            already
+	 */
+	private static void leftMultByVectorTranspose(List<ColGroup> colGroups, MatrixBlock vector, MatrixBlock result, boolean doTranspose) 
+		throws DMLRuntimeException 
+	{
+		//transpose vector if required
+		MatrixBlock rowVector = vector;
+		if (doTranspose) {
+			rowVector = new MatrixBlock(1, vector.getNumRows(), false);
+			LibMatrixReorg.transpose(vector, rowVector);
+		}
+		
+		// initialize and allocate the result
+		result.reset();
+		result.allocateDenseBlock();
+		
+		// delegate matrix-vector operation to each column group
+		for (ColGroup grp : colGroups) {			
+			grp.leftMultByRowVector(rowVector, result);
+		}
+
+		// post-processing
+		result.recomputeNonZeros();
+	}
+	
+	/**
+	 * Multi-thread version of leftMultByVectorTranspose.
+	 * 
+	 * @param vector
+	 * @param result
+	 * @param doTranspose
+	 * @param k
+	 * @throws DMLRuntimeException
+	 */
+	private static void leftMultByVectorTranspose(List<ColGroup> colGroups,MatrixBlock vector, MatrixBlock result, boolean doTranspose, int k) 
+		throws DMLRuntimeException 
+	{
+		int kuc = Math.max(1, k - colGroups.size() + 1);
+		
+		//transpose vector if required
+		MatrixBlock rowVector = vector;
+		if (doTranspose) {
+			rowVector = new MatrixBlock(1, vector.getNumRows(), false);
+			LibMatrixReorg.transpose(vector, rowVector);
+		}
+		
+		// initialize and allocate the result
+		result.reset();
+		result.allocateDenseBlock();
+
+		//multi-threaded execution
+		try {
+			ExecutorService pool = Executors.newFixedThreadPool( Math.min(colGroups.size(), k) );
+			ArrayList<LeftMatrixMultTask> tasks = new ArrayList<LeftMatrixMultTask>();
+			for( ColGroup grp : colGroups )
+				tasks.add(new LeftMatrixMultTask(grp, rowVector, result, kuc));
+			pool.invokeAll(tasks);	
+			pool.shutdown();
+		}
+		catch(Exception ex) {
+			throw new DMLRuntimeException(ex);
+		}
+
+		// post-processing
+		result.recomputeNonZeros();
+	}
+
+	/**
+	 * 
+	 * @param result
+	 * @throws DMLRuntimeException
+	 */
+	private static void leftMultByTransposeSelf(ArrayList<ColGroup> groups, MatrixBlock result, int cl, int cu)
+		throws DMLRuntimeException 
+	{
+		final int numRows = groups.get(0).getNumRows();
+		final int numGroups = groups.size();		
+		
+		//preallocated dense matrix block
+		MatrixBlock lhs = new MatrixBlock(numRows, 1, false);
+		lhs.allocateDenseBlock();
+		
+		//approach: for each colgroup, extract uncompressed columns one at-a-time
+		//vector-matrix multiplies against remaining col groups
+		for( int i=cl; i<cu; i++ ) 
+		{
+			//get current group and relevant col groups
+			ColGroup group = groups.get(i);	
+			int[] ixgroup = group.getColIndices();
+			List<ColGroup> tmpList = groups.subList(i, numGroups);
+			
+			//for all uncompressed lhs columns vectors
+			for( int j=0; j<ixgroup.length; j++ ) {
+				//decompress single column
+				lhs.reset(numRows, 1, false);				
+				group.decompressToBlock(lhs, j);
+				
+				if( !lhs.isEmptyBlock(false) ) {
+					//compute vector-matrix partial result
+					MatrixBlock tmpret = new MatrixBlock(1,result.getNumColumns(),false);
+					leftMultByVectorTranspose(tmpList, lhs, tmpret, true);								
+					
+					//write partial results (disjoint non-zeros)
+					LinearAlgebraUtils.copyNonZerosToRowCol(result, tmpret, ixgroup[j]);	
+				}
+			}
+		}
+	}
+	
+	/**
+	 * 
+	 * @param k
+	 * @return
+	 */
+	@SuppressWarnings("unchecked")
+	private ArrayList<ColGroup>[] createStaticTaskPartitioning(int k, boolean inclUncompressed)
+	{
+		// special case: single uncompressed col group
+		if( _colGroups.size()==1 && _colGroups.get(0) instanceof ColGroupUncompressed ){
+			return new ArrayList[0];
+		}
+		
+		// initialize round robin col group distribution
+		// (static task partitioning to reduce mem requirements/final agg)
+		int numTasks = Math.min(k, _colGroups.size());
+		ArrayList<ColGroup>[] grpParts = new ArrayList[numTasks];
+		int pos = 0;
+		for( ColGroup grp : _colGroups ){
+			if( grpParts[pos]==null )
+				grpParts[pos] = new ArrayList<ColGroup>();
+			if( inclUncompressed || !(grp instanceof ColGroupUncompressed) ) {
+				grpParts[pos].add(grp);
+				pos = (pos==numTasks-1) ? 0 : pos+1;
+			}
+		}
+		
+		return grpParts;
+	}
+	
+	/**
+	 * 
+	 * @return
+	 */
+	private ColGroupUncompressed getUncompressedColGroup()
+	{
+		for( ColGroup grp : _colGroups )
+			if( grp instanceof ColGroupUncompressed ) 
+				return (ColGroupUncompressed)grp;
+		
+		return null;
+	}
+	
+	/**
+	 * 
+	 */
+	private static class LeftMatrixMultTask implements Callable<Object> 
+	{
+		private ColGroup _group = null;
+		private MatrixBlock _vect = null;
+		private MatrixBlock _ret = null;
+		private int _kuc = 1;
+		
+		protected LeftMatrixMultTask( ColGroup group, MatrixBlock vect, MatrixBlock ret, int kuc)  {
+			_group = group;
+			_vect = vect;
+			_ret = ret;
+			_kuc = kuc;
+		}
+		
+		@Override
+		public Object call() throws DMLRuntimeException 
+		{
+			// delegate matrix-vector operation to each column group
+			if( _group instanceof ColGroupUncompressed && _kuc >1 && ColGroupBitmap.LOW_LEVEL_OPT )
+				((ColGroupUncompressed)_group).leftMultByRowVector(_vect, _ret, _kuc);
+			else
+				_group.leftMultByRowVector(_vect, _ret);
+			return null;
+		}
+	}
+	
+	/**
+	 * 
+	 */
+	private static class RightMatrixMultTask implements Callable<Object> 
+	{
+		private ArrayList<ColGroup> _groups = null;
+		private MatrixBlock _vect = null;
+		private MatrixBlock _ret = null;
+		private int _rl = -1;
+		private int _ru = -1;
+		
+		protected RightMatrixMultTask( ArrayList<ColGroup> groups, MatrixBlock vect, MatrixBlock ret, int rl, int ru)  {
+			_groups = groups;
+			_vect = vect;
+			_ret = ret;
+			_rl = rl;
+			_ru = ru;
+		}
+		
+		@Override
+		public Object call() throws DMLRuntimeException 
+		{
+			// delegate vector-matrix operation to each column group
+			for( ColGroup grp : _groups )
+				if( grp instanceof ColGroupUncompressed ) //overwrites output
+					grp.rightMultByVector(_vect, _ret, _rl, _ru);
+			for( ColGroup grp : _groups )
+				if( !(grp instanceof ColGroupUncompressed) ) //adds to output
+					grp.rightMultByVector(_vect, _ret, _rl, _ru);
+			return null;
+		}
+	}
+	
+	private static class MatrixMultTransposeTask implements Callable<Object> 
+	{
+		private ArrayList<ColGroup> _groups = null;
+		private MatrixBlock _ret = null;
+		private int _cl = -1;
+		private int _cu = -1;
+		
+		protected MatrixMultTransposeTask(ArrayList<ColGroup> groups, MatrixBlock ret, int cl, int cu)  {
+			_groups = groups;
+			_ret = ret;
+			_cl = cl;
+			_cu = cu;
+		}
+		
+		@Override
+		public Object call() throws DMLRuntimeException {
+			leftMultByTransposeSelf(_groups, _ret, _cl, _cu);
+			return null;
+		}
+	}
+	
+	private static class UnaryAggregateTask implements Callable<Object> 
+	{
+		private ArrayList<ColGroup> _groups = null;
+		private MatrixBlock _ret = null;
+		private AggregateUnaryOperator _op = null;
+		
+		protected UnaryAggregateTask( ArrayList<ColGroup> groups, MatrixBlock ret, AggregateUnaryOperator op)  {
+			_groups = groups;
+			_op = op;
+			
+			if( !(_op.indexFn instanceof ReduceRow) ) { //sum/rowSums
+				_ret = new MatrixBlock(ret.getNumRows(), ret.getNumColumns(), false);
+				_ret.allocateDenseBlock();
+			}
+			else { //colSums
+				_ret = ret;
+			}
+		}
+		
+		@Override
+		public Object call() throws DMLRuntimeException 
+		{
+			// delegate vector-matrix operation to each column group
+			for( ColGroup grp : _groups )
+				grp.unaryAggregateOperations(_op, _ret);
+			return null;
+		}
+		
+		public MatrixBlock getResult(){
+			return _ret;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/16e7b1c8/src/main/java/org/apache/sysml/runtime/compress/PlanningBinPacker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/PlanningBinPacker.java b/src/main/java/org/apache/sysml/runtime/compress/PlanningBinPacker.java
new file mode 100644
index 0000000..a76223c
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/PlanningBinPacker.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.commons.math3.random.RandomDataGenerator;
+
+/**
+ * Used for the finding columns to co-code
+ * 
+ */
+public class PlanningBinPacker 
+{
+	private final float _binWeight;
+	private final List<Integer> _items;
+	private final List<Float> _itemWeights;
+
+	public PlanningBinPacker(float binWeight, List<Integer> items, List<Float> itemWeights) {
+		_binWeight = binWeight;
+		_items = items;
+		_itemWeights = itemWeights;
+	}
+
+	/**
+	 * NOTE: upper bound is 17/10 OPT
+	 * 
+	 * @return key: available space, value: list of the bins that have that free space
+	 */
+	public TreeMap<Float, List<List<Integer>>> packFirstFit() {
+		return packFirstFit(_items, _itemWeights);
+	}
+
+	/**
+	 * shuffling the items to make some potential for having bins of different
+	 * sizes when consecutive columns are of close cardinalities
+	 * 
+	 * @return key: available space, value: list of the bins that have that free
+	 *         space
+	 */
+	public TreeMap<Float, List<List<Integer>>> packFirstFitShuffled() {
+		RandomDataGenerator rnd = new RandomDataGenerator();
+		int[] permutation = rnd.nextPermutation(_items.size(), _items.size());
+		List<Integer> shuffledItems = new ArrayList<Integer>(_items.size());
+		List<Float> shuffledWeights = new ArrayList<Float>(_items.size());
+		for (int ix : permutation) {
+			shuffledItems.add(_items.get(ix));
+			shuffledWeights.add(_itemWeights.get(ix));
+		}
+
+		return packFirstFit(shuffledItems, shuffledWeights);
+	}
+
+	/**
+	 * 
+	 * @param items
+	 * @param itemWeights
+	 * @return
+	 */
+	private TreeMap<Float, List<List<Integer>>> packFirstFit(List<Integer> items, List<Float> itemWeights) 
+	{
+		// when searching for a bin, the first bin in the list is used
+		TreeMap<Float, List<List<Integer>>> bins = new TreeMap<Float, List<List<Integer>>>();
+		// first bin
+		bins.put(_binWeight, createBinList());
+		int numItems = items.size();
+		for (int i = 0; i < numItems; i++) {
+			float itemWeight = itemWeights.get(i);
+			Map.Entry<Float, List<List<Integer>>> entry = bins
+					.ceilingEntry(itemWeight);
+			if (entry == null) {
+				// new bin
+				float newBinWeight = _binWeight - itemWeight;
+				List<List<Integer>> binList = bins.get(newBinWeight);
+				if (binList == null) {
+					bins.put(newBinWeight, createBinList(items.get(i)));
+				} else {
+					List<Integer> newBin = new ArrayList<Integer>();
+					newBin.add(items.get(i));
+					binList.add(newBin);
+				}
+			} else {
+				// add to the first bin in the list
+				List<Integer> assignedBin = entry.getValue().remove(0);
+				assignedBin.add(items.get(i));
+				if (entry.getValue().size() == 0)
+					bins.remove(entry.getKey());
+				float newBinWeight = entry.getKey() - itemWeight;
+				List<List<Integer>> newBinsList = bins.get(newBinWeight);
+				if (newBinsList == null) {
+					// new bin
+					bins.put(newBinWeight, createBinList(assignedBin));
+				} else {
+					newBinsList.add(assignedBin);
+				}
+			}
+		}
+		return bins;
+	}
+
+	/**
+	 * NOTE: upper bound is 11/9 OPT + 6/9 (~1.22 OPT)
+	 * 
+	 * @return
+	 */
+	public TreeMap<Float, List<List<Integer>>> packFirstFitDescending() {
+		// sort items descending based on their weights
+		Integer[] indexes = new Integer[_items.size()];
+		for (int i = 0; i < indexes.length; i++)
+			indexes[i] = i;
+		Arrays.sort(indexes, new Comparator<Integer>() {
+
+			@Override
+			public int compare(Integer o1, Integer o2) {
+				return _itemWeights.get(o1).compareTo(_itemWeights.get(o2));
+			}
+		});
+		List<Integer> sortedItems = new ArrayList<Integer>();
+		List<Float> sortedItemWeights = new ArrayList<Float>();
+		for (int i = indexes.length - 1; i >= 0; i--) {
+			sortedItems.add(_items.get(i));
+			sortedItemWeights.add(_itemWeights.get(i));
+		}
+		return packFirstFit(sortedItems, sortedItemWeights);
+	}
+
+	/**
+	 * NOTE: upper bound is 71/60 OPT + 6/9 (~1.18 OPT)
+	 * 
+	 * @return
+	 */
+	public TreeMap<Float, List<List<Integer>>> packModifiedFirstFitDescending() {
+		throw new UnsupportedOperationException("Not implemented yet!");
+	}
+
+	/**
+	 * 
+	 * @return
+	 */
+	private List<List<Integer>> createBinList() {
+		List<List<Integer>> binList = new ArrayList<List<Integer>>();
+		binList.add(new ArrayList<Integer>());
+		return binList;
+	}
+
+	/**
+	 * 
+	 * @param item
+	 * @return
+	 */
+	private List<List<Integer>> createBinList(int item) {
+		List<List<Integer>> binList = new ArrayList<List<Integer>>();
+		List<Integer> bin = new ArrayList<Integer>();
+		binList.add(bin);
+		bin.add(item);
+		return binList;
+	}
+
+	/**
+	 * 
+	 * @param bin
+	 * @return
+	 */
+	private List<List<Integer>> createBinList(List<Integer> bin) {
+		List<List<Integer>> binList = new ArrayList<List<Integer>>();
+		binList.add(bin);
+		return binList;
+	}
+}