Posted to commits@systemml.apache.org by mb...@apache.org on 2017/02/08 02:22:28 UTC

[3/5] incubator-systemml git commit: [SYSTEMML-449] Compressed linear algebra v2

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/cocode/ColumnGroupPartitionerStatic.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/cocode/ColumnGroupPartitionerStatic.java b/src/main/java/org/apache/sysml/runtime/compress/cocode/ColumnGroupPartitionerStatic.java
new file mode 100644
index 0000000..e2f00f3
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/cocode/ColumnGroupPartitionerStatic.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress.cocode;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.sysml.runtime.compress.cocode.PlanningCoCoder.GroupableColInfo;
+
+/**
+ * Column group partitioning with a static distribution heuristic.
+ * 
+ */
+public class ColumnGroupPartitionerStatic extends ColumnGroupPartitioner
+{
+	private static final int MAX_COL_PER_GROUP = 20;
+
+	@Override
+	public List<List<Integer>> partitionColumns(List<Integer> groupCols, HashMap<Integer, GroupableColInfo> groupColsInfo) 
+	{
+		List<List<Integer>> ret = new ArrayList<List<Integer>>();
+		int numParts = (int)Math.ceil((double)groupCols.size()/MAX_COL_PER_GROUP);
+		int partSize = (int)Math.ceil((double)groupCols.size()/numParts);
+		
+		for( int i=0, pos=0; i<numParts; i++, pos+=partSize ) {
+			List<Integer> tmp = new ArrayList<Integer>();
+			for( int j=0; j<partSize && pos+j<groupCols.size(); j++ )
+				tmp.add(groupCols.get(pos+j));
+			ret.add(tmp);
+		}
+		
+		return ret;
+	}
+}
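
Note on the static partitioner: the two ceilings above first fix the number of partitions and then rebalance the partition size, so the last bin is never left nearly empty. A standalone sketch of that arithmetic with hypothetical column counts (not part of the commit):

    public class StaticPartitionSketch {
        public static void main(String[] args) {
            // 45 candidate columns, at most 20 columns per group
            int numCols = 45, maxColPerGroup = 20;
            int numParts = (int) Math.ceil((double) numCols / maxColPerGroup); // ceil(45/20) = 3
            int partSize = (int) Math.ceil((double) numCols / numParts);       // ceil(45/3)  = 15
            // yields balanced bins of 15/15/15 instead of a greedy 20/20/5
            System.out.println(numParts + " partitions of up to " + partSize + " columns");
        }
    }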

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/cocode/PlanningCoCoder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/cocode/PlanningCoCoder.java b/src/main/java/org/apache/sysml/runtime/compress/cocode/PlanningCoCoder.java
new file mode 100644
index 0000000..778f221
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/cocode/PlanningCoCoder.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress.cocode;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.compress.estim.CompressedSizeEstimator;
+import org.apache.sysml.runtime.compress.estim.CompressedSizeInfo;
+
+public class PlanningCoCoder 
+{
+	//internal configurations 
+	private final static PartitionerType COLUMN_PARTITIONER = PartitionerType.BIN_PACKING;
+	
+	private static final Log LOG = LogFactory.getLog(PlanningCoCoder.class.getName());
+	
+	public enum PartitionerType {
+		BIN_PACKING,
+		STATIC,
+	}
+	
+	public static List<int[]> findCocodesByPartitioning(CompressedSizeEstimator sizeEstimator, List<Integer> cols, 
+			CompressedSizeInfo[] colInfos, int numRows, int k) 
+		throws DMLRuntimeException 
+	{
+		// collect all candidate columns for grouping, where the weight
+		// of a column is the ratio of its cardinality to the number of rows 
+		int numCols = cols.size();
+		List<Integer> groupCols = new ArrayList<Integer>();
+		HashMap<Integer, GroupableColInfo> groupColsInfo = new HashMap<Integer, GroupableColInfo>();
+		for (int i = 0; i < numCols; i++) {
+			int colIx = cols.get(i);
+			double cardinality = colInfos[colIx].getEstCard();
+			double weight = cardinality / numRows;
+			groupCols.add(colIx);
+			groupColsInfo.put(colIx, new GroupableColInfo(weight,colInfos[colIx].getMinSize()));
+		}
+		
+		// use column group partitioner to create partitions of columns
+		List<List<Integer>> bins = createColumnGroupPartitioner(COLUMN_PARTITIONER)
+				.partitionColumns(groupCols, groupColsInfo);
+
+		// brute force grouping within each partition
+		return (k > 1) ?
+				getCocodingGroupsBruteForce(bins, groupColsInfo, sizeEstimator, numRows, k) :
+				getCocodingGroupsBruteForce(bins, groupColsInfo, sizeEstimator, numRows);
+	}
+
+	private static List<int[]> getCocodingGroupsBruteForce(List<List<Integer>> bins, HashMap<Integer, GroupableColInfo> groupColsInfo, CompressedSizeEstimator estim, int rlen) 
+	{
+		List<int[]> retGroups = new ArrayList<int[]>();		
+		for (List<Integer> bin : bins) {
+			// build an array of singleton CoCodingGroups
+			ArrayList<PlanningCoCodingGroup> sgroups = new ArrayList<PlanningCoCodingGroup>();
+			for (Integer col : bin)
+				sgroups.add(new PlanningCoCodingGroup(col, groupColsInfo.get(col)));
+			// brute force co-coding	
+			PlanningCoCodingGroup[] outputGroups = findCocodesBruteForce(
+					estim, rlen, sgroups.toArray(new PlanningCoCodingGroup[0]));
+			for (PlanningCoCodingGroup grp : outputGroups)
+				retGroups.add(grp.getColIndices());
+		}
+		
+		return retGroups;
+	}
+
+	private static List<int[]> getCocodingGroupsBruteForce(List<List<Integer>> bins, HashMap<Integer, GroupableColInfo> groupColsInfo, CompressedSizeEstimator estim, int rlen, int k) 
+		throws DMLRuntimeException 
+	{
+		List<int[]> retGroups = new ArrayList<int[]>();		
+		try {
+			ExecutorService pool = Executors.newFixedThreadPool( k );
+			ArrayList<CocodeTask> tasks = new ArrayList<CocodeTask>();
+			for (List<Integer> bin : bins) {
+				// build an array of singleton CoCodingGroups
+				ArrayList<PlanningCoCodingGroup> sgroups = new ArrayList<PlanningCoCodingGroup>();
+				for (Integer col : bin)
+					sgroups.add(new PlanningCoCodingGroup(col, groupColsInfo.get(col)));
+				tasks.add(new CocodeTask(estim, sgroups, rlen));
+			}
+			List<Future<PlanningCoCodingGroup[]>> rtask = pool.invokeAll(tasks);	
+			for( Future<PlanningCoCodingGroup[]> lrtask : rtask )
+				for (PlanningCoCodingGroup grp : lrtask.get())
+					retGroups.add(grp.getColIndices());
+			pool.shutdown();
+		}
+		catch(Exception ex) {
+			throw new DMLRuntimeException(ex);
+		}
+		
+		return retGroups;
+	}
+
+	/**
+	 * Identify columns to code together. Uses a greedy approach that merges
+	 * pairs of column groups into larger groups. Each phase of the greedy
+	 * algorithm considers all combinations of pairs to merge.
+	 * 
+	 * @param estim compressed size estimator
+	 * @param numRows number of rows
+	 * @param singletonGroups initial singleton planning co-coding groups
+	 * @return co-coding groups after greedy merging
+	 */
+	private static PlanningCoCodingGroup[] findCocodesBruteForce(
+			CompressedSizeEstimator estim, int numRows,
+			PlanningCoCodingGroup[] singletonGroups) 
+	{
+		if( LOG.isTraceEnabled() )
+			LOG.trace("Cocoding: process "+singletonGroups.length);
+		
+		List<PlanningCoCodingGroup> workset = 
+				new ArrayList<PlanningCoCodingGroup>(Arrays.asList(singletonGroups));
+		
+		//establish memo table for extracted column groups
+		PlanningMemoTable memo = new PlanningMemoTable();
+		
+		//process merging iterations until no more change
+		boolean changed = true;
+		while( changed && workset.size()>1 ) {
+			//find best merge, incl memoization
+			PlanningCoCodingGroup tmp = null;
+			for( int i=0; i<workset.size(); i++ ) {
+				for( int j=i+1; j<workset.size(); j++ ) {
+					PlanningCoCodingGroup c1 = workset.get(i);
+					PlanningCoCodingGroup c2 = workset.get(j);
+					memo.incrStats(1, 0, 0);
+					
+					//pruning filter: skip dominated candidates
+					if( -Math.min(c1.getEstSize(), c2.getEstSize()) > memo.getOptChangeInSize() )
+						continue;
+					
+					//memoization or newly created group (incl bitmap extraction)
+					PlanningCoCodingGroup c1c2 = memo.getOrCreate(c1, c2, estim, numRows);
+		
+					//keep best merged group only
+					if( tmp == null || c1c2.getChangeInSize() < tmp.getChangeInSize()
+						|| (c1c2.getChangeInSize() == tmp.getChangeInSize() 
+							&& c1c2.getColIndices().length < tmp.getColIndices().length))
+						tmp = c1c2;
+				}
+			}
+			
+			//modify working set
+			if( tmp != null && tmp.getChangeInSize() < 0 ) {
+				workset.remove(tmp.getLeftGroup());
+				workset.remove(tmp.getRightGroup());
+				workset.add(tmp);
+				memo.remove(tmp);
+				
+				if( LOG.isTraceEnabled() ) {
+					LOG.trace("--merge groups: "+Arrays.toString(tmp.getLeftGroup().getColIndices())+" and "
+							+Arrays.toString(tmp.getRightGroup().getColIndices()));
+				}
+			}
+			else {
+				changed = false;
+			}
+		}
+		
+		if( LOG.isTraceEnabled() )
+			LOG.trace("--stats: "+Arrays.toString(memo.getStats()));
+		
+		return workset.toArray(new PlanningCoCodingGroup[0]);
+	}
+
+	private static ColumnGroupPartitioner createColumnGroupPartitioner(PartitionerType type) {
+		switch( type ) {
+			case BIN_PACKING: 
+				return new ColumnGroupPartitionerBinPacking();
+				
+			case STATIC:
+				return new ColumnGroupPartitionerStatic();
+				
+			default:
+				throw new RuntimeException(
+					"Unsupported column group partitioner: "+type.toString());
+		}
+	}
+	
+	public static class GroupableColInfo {
+		public final double cardRatio;
+		public final long size;
+
+		public GroupableColInfo(double lcardRatio, long lsize) {
+			cardRatio = lcardRatio;
+			size = lsize;
+		}
+	}
+
+	private static class CocodeTask implements Callable<PlanningCoCodingGroup[]> 
+	{
+		private CompressedSizeEstimator _estim = null;
+		private ArrayList<PlanningCoCodingGroup> _sgroups = null;
+		private int _rlen = -1;
+		
+		protected CocodeTask( CompressedSizeEstimator estim, ArrayList<PlanningCoCodingGroup> sgroups, int rlen )  {
+			_estim = estim;
+			_sgroups = sgroups;
+			_rlen = rlen;
+		}
+		
+		@Override
+		public PlanningCoCodingGroup[] call() throws DMLRuntimeException {
+			// brute force co-coding	
+			return findCocodesBruteForce(_estim, _rlen, 
+					_sgroups.toArray(new PlanningCoCodingGroup[0]));
+		}
+	}
+}
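
Note on the greedy co-coding loop: each iteration evaluates all pairs, keeps the single best merge, and applies it only if it strictly reduces the estimated size. A minimal sketch of that acceptance rule with hypothetical size estimates (the real sizes come from CompressedSizeEstimator):

    public class GreedyMergeSketch {
        public static void main(String[] args) {
            // hypothetical estimated sizes for groups {1}, {2}, and merged {1,2}
            long estSizeG1 = 1000, estSizeG2 = 1200, estSizeMerged = 1900;
            // mirrors PlanningCoCodingGroup.getChangeInSize()
            long changeInSize = estSizeMerged - estSizeG1 - estSizeG2; // -300
            if (changeInSize < 0) // merge only on strict size reduction
                System.out.println("merge accepted, saves " + (-changeInSize) + " bytes");
        }
    }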

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/cocode/PlanningCoCodingGroup.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/cocode/PlanningCoCodingGroup.java b/src/main/java/org/apache/sysml/runtime/compress/cocode/PlanningCoCodingGroup.java
new file mode 100644
index 0000000..caaa271
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/cocode/PlanningCoCodingGroup.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress.cocode;
+
+import java.util.Arrays;
+
+import org.apache.sysml.runtime.compress.cocode.PlanningCoCoder.GroupableColInfo;
+import org.apache.sysml.runtime.compress.estim.CompressedSizeEstimator;
+import org.apache.sysml.runtime.compress.estim.CompressedSizeInfo;
+
+/** 
+ * Class to represent information about co-coding a group of columns. 
+ * 
+ */
+public class PlanningCoCodingGroup 
+{
+	private int[] _colIndexes;
+	private PlanningCoCodingGroup _leftGrp;
+	private PlanningCoCodingGroup _rightGrp;
+	
+	private long _estSize;
+	private double _cardRatio;
+	
+	
+	/**
+	 * Constructor for a one-column group, i.e., a single column that is not co-coded.
+	 * 
+	 * @param col column
+	 * @param info groupable column info
+	 */
+	public PlanningCoCodingGroup(int col, GroupableColInfo info) {
+		_colIndexes = new int[]{col};
+		_estSize = info.size;
+		_cardRatio = info.cardRatio;
+	}
+
+	/**
+	 * Constructor for merging two disjoint groups of columns
+	 * 
+	 * @param grp1   first group of columns to merge
+	 * @param grp2   second group to merge
+	 * @param estim compressed size estimator
+	 * @param numRows number of rows
+	 */
+	public PlanningCoCodingGroup(PlanningCoCodingGroup grp1, PlanningCoCodingGroup grp2,
+			CompressedSizeEstimator estim, int numRows) 
+	{
+		_colIndexes = getMergedIndexes(grp1._colIndexes, grp2._colIndexes);
+		
+		// estimating size info
+		CompressedSizeInfo groupSizeInfo = estim
+				.estimateCompressedColGroupSize(_colIndexes);
+		_estSize = groupSizeInfo.getMinSize();
+		_cardRatio = groupSizeInfo.getEstCard() / numRows;
+		
+		_leftGrp = grp1;
+		_rightGrp = grp2;
+	}
+
+	public int[] getColIndices() {
+		return _colIndexes;
+	}
+
+	/**
+	 * Obtain estimated compressed size of the grouped columns.
+	 * 
+	 * @return estimated compressed size of the grouped columns
+	 */
+	public long getEstSize() {
+		return _estSize;
+	}
+	
+	public double getChangeInSize() {
+		if( _leftGrp == null || _rightGrp == null )
+			return 0;
+		
+		return getEstSize() 
+			- _leftGrp.getEstSize() 
+			- _rightGrp.getEstSize();
+	}
+
+	public double getCardinalityRatio() {
+		return _cardRatio;
+	}
+	
+	public PlanningCoCodingGroup getLeftGroup() {
+		return _leftGrp;
+	}
+	
+	public PlanningCoCodingGroup getRightGroup() {
+		return _rightGrp;
+	}
+	
+	@Override 
+	public int hashCode() {
+		return Arrays.hashCode(_colIndexes);
+	}
+	
+	@Override 
+	public boolean equals(Object that) {
+		if( !(that instanceof PlanningCoCodingGroup) )
+			return false;
+		
+		PlanningCoCodingGroup thatgrp = (PlanningCoCodingGroup) that;
+		return Arrays.equals(_colIndexes, thatgrp._colIndexes);
+	}
+
+	@Override
+	public String toString() {
+		return Arrays.toString(_colIndexes);
+	}
+	
+	public static int[] getMergedIndexes(int[] indexes1, int[] indexes2) {
+		// merge sorted non-empty arrays
+		int[] ret = new int[indexes1.length + indexes2.length];		
+		int grp1Ptr = 0, grp2Ptr = 0;
+		for (int mergedIx = 0; mergedIx < ret.length; mergedIx++) {
+			if (indexes1[grp1Ptr] < indexes2[grp2Ptr]) {
+				ret[mergedIx] = indexes1[grp1Ptr++];
+				if (grp1Ptr == indexes1.length) {
+					System.arraycopy(indexes2, grp2Ptr, ret, mergedIx + 1, indexes2.length - grp2Ptr);
+					break;
+				}
+			} 
+			else {
+				ret[mergedIx] = indexes2[grp2Ptr++];
+				if (grp2Ptr == indexes2.length) {
+					System.arraycopy(indexes1, grp1Ptr, ret, mergedIx + 1, indexes1.length - grp1Ptr);
+					break;
+				}
+			}
+		}
+		
+		return ret;
+	}
+	
+	public static class ColIndexes {
+		final int[] _colIndexes;
+		
+		public ColIndexes(int[] colIndexes) {
+			_colIndexes = colIndexes;
+		}
+	
+		@Override 
+		public int hashCode() {
+			return Arrays.hashCode(_colIndexes);
+		}
+		
+		@Override 
+		public boolean equals(Object that) {
+			if( !(that instanceof ColIndexes) )
+				return false;
+			
+			ColIndexes thatgrp = (ColIndexes) that;
+			return Arrays.equals(_colIndexes, thatgrp._colIndexes);
+		}
+	}
+}
\ No newline at end of file
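
Note on getMergedIndexes: this is a standard two-pointer merge of sorted, disjoint index arrays with an arraycopy of the exhausted side's remainder. A small usage sketch (assuming the class above is on the classpath):

    import java.util.Arrays;
    import org.apache.sysml.runtime.compress.cocode.PlanningCoCodingGroup;

    public class MergeIndexesSketch {
        public static void main(String[] args) {
            int[] a = {1, 4, 7};
            int[] b = {2, 5};
            int[] merged = PlanningCoCodingGroup.getMergedIndexes(a, b);
            System.out.println(Arrays.toString(merged)); // [1, 2, 4, 5, 7]
        }
    }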

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/cocode/PlanningMemoTable.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/cocode/PlanningMemoTable.java b/src/main/java/org/apache/sysml/runtime/compress/cocode/PlanningMemoTable.java
new file mode 100644
index 0000000..3f683c2
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/cocode/PlanningMemoTable.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress.cocode;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.sysml.runtime.compress.cocode.PlanningCoCodingGroup.ColIndexes;
+import org.apache.sysml.runtime.compress.estim.CompressedSizeEstimator;
+
+public class PlanningMemoTable 
+{
+	private HashMap<ColIndexes,PlanningCoCodingGroup> _memo = new HashMap<ColIndexes,PlanningCoCodingGroup>();
+	private double _optChangeInSize = 0; 
+	private int[] _stats = new int[3];
+	
+	public PlanningCoCodingGroup getOrCreate(PlanningCoCodingGroup c1, PlanningCoCodingGroup c2, CompressedSizeEstimator estim, int numRows) 
+	{
+		ColIndexes c1c2Indexes = new ColIndexes(PlanningCoCodingGroup
+				.getMergedIndexes(c1.getColIndices(), c2.getColIndices()));	
+		
+		//probe memo table for existing column group (avoid extraction)
+		PlanningCoCodingGroup c1c2 = _memo.get(c1c2Indexes);
+		
+		//create non-existing group and maintain global stats
+		incrStats(0, 1, 0); //probed plans
+		if( c1c2 == null ) { 
+			c1c2 = new PlanningCoCodingGroup(c1, c2, estim, numRows);
+			_memo.put(c1c2Indexes, c1c2);
+			_optChangeInSize = Math.min(_optChangeInSize, c1c2.getChangeInSize());
+			incrStats(0, 0, 1); //created plans
+		}
+		
+		return c1c2;
+	}
+	
+	public void remove(PlanningCoCodingGroup grp) {
+		//remove atomic groups
+		_memo.remove(new ColIndexes(grp.getColIndices()));
+		_memo.remove(new ColIndexes(grp.getLeftGroup().getColIndices()));
+		_memo.remove(new ColIndexes(grp.getRightGroup().getColIndices()));
+		
+		_optChangeInSize = 0;
+		
+		//remove overlapping groups and recompute min size
+		Iterator<Entry<ColIndexes,PlanningCoCodingGroup>> iter 
+			= _memo.entrySet().iterator();
+		while( iter.hasNext() ) {
+			PlanningCoCodingGroup tmp = iter.next().getValue();
+			if( Arrays.equals(tmp.getLeftGroup().getColIndices(), grp.getLeftGroup().getColIndices())
+				|| Arrays.equals(tmp.getLeftGroup().getColIndices(), grp.getRightGroup().getColIndices())
+				|| Arrays.equals(tmp.getRightGroup().getColIndices(), grp.getLeftGroup().getColIndices())
+				|| Arrays.equals(tmp.getRightGroup().getColIndices(), grp.getRightGroup().getColIndices()))
+			{
+				iter.remove();
+			}
+			else
+				_optChangeInSize = Math.min(_optChangeInSize, tmp.getChangeInSize());
+		}
+	}
+	
+	public void incrStats(int v1, int v2, int v3) {
+		_stats[0] += v1;
+		_stats[1] += v2;
+		_stats[2] += v3;
+	}
+	
+	public double getOptChangeInSize() {
+		return _optChangeInSize;
+	}
+
+	public int[] getStats() {
+		return _stats;
+	}
+}
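
Note on _optChangeInSize: the pruning filter in findCocodesBruteForce compares -min(size1, size2) against this value. Assuming a merged group is at least as large as its larger constituent, the change in size of any merge is bounded below by -min(size1, size2), so a pair whose bound cannot beat the best change seen so far is skipped without a size estimation. A worked numeric sketch under that assumption:

    public class PruningBoundSketch {
        public static void main(String[] args) {
            long s1 = 400, s2 = 900;          // hypothetical group sizes
            double optChangeInSize = -500;    // best (most negative) change so far
            // lower bound on the achievable change for merging s1 and s2
            double bound = -Math.min(s1, s2); // -400
            if (bound > optChangeInSize)
                System.out.println("skip pair: cannot beat " + optChangeInSize);
        }
    }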

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeEstimator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeEstimator.java b/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeEstimator.java
index 2b49403..4c470e2 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeEstimator.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeEstimator.java
@@ -20,6 +20,7 @@
 package org.apache.sysml.runtime.compress.estim;
 
 import org.apache.sysml.runtime.compress.BitmapEncoder;
+import org.apache.sysml.runtime.compress.CompressedMatrixBlock;
 import org.apache.sysml.runtime.compress.UncompressedBitmap;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 
@@ -29,9 +30,16 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 public abstract class CompressedSizeEstimator 
 {
 	protected MatrixBlock _data;
+	protected final int _numRows;
 
 	public CompressedSizeEstimator(MatrixBlock data) {
 		_data = data;
+		_numRows = CompressedMatrixBlock.TRANSPOSE_INPUT ? 
+				_data.getNumColumns() : _data.getNumRows();
+	}
+	
+	public int getNumRows() {
+		return _numRows;
 	}
 
 	public abstract CompressedSizeInfo estimateCompressedColGroupSize(int[] colIndexes);
@@ -47,15 +55,19 @@ public abstract class CompressedSizeEstimator
 		
 		//compute size estimation factors
 		for (int i = 0; i < numVals; i++) {
-			int[] list = ubm.getOffsetsList(i);
-			numOffs += list.length;
-			numSegs += list[list.length - 1] / BitmapEncoder.BITMAP_BLOCK_SZ + 1;
-			numSingle += (list.length==1) ? 1 : 0;
+			int[] list = ubm.getOffsetsList(i).extractValues();
+			int listSize = ubm.getNumOffsets(i);
+			numOffs += listSize;
+			numSegs += list[listSize - 1] / BitmapEncoder.BITMAP_BLOCK_SZ + 1;
+			numSingle += (listSize==1) ? 1 : 0;
 			if( inclRLE ) {
 				int lastOff = -2;
-				for (int j = 0; j < list.length; j++) {
-					if (list[j] != lastOff + 1)
-						numRuns++;
+				for (int j = 0; j < listSize; j++) {
+					if( list[j] != lastOff + 1 ) {
+						numRuns++; //new run
+						numRuns += (list[j]-lastOff) / //empty runs
+								BitmapEncoder.BITMAP_BLOCK_SZ;
+					}
 					lastOff = list[j];
 				}
 			}
@@ -107,6 +119,27 @@ public abstract class CompressedSizeEstimator
 		ret += 2 * numSeqs;
 		return ret;
 	}
+	
+	/**
+	 * Estimates the number of bytes needed to encode this column group 
+	 * in DDC1 or DDC2 format.
+	 * 
+	 * @param numVals number of value tuples
+	 * @param numRows number of rows
+	 * @param numCols number of columns
+	 * @return number of bytes to encode column group in DDC format
+	 */
+	protected static long getDDCSize(int numVals, int numRows, int numCols) {
+		if( numVals > Character.MAX_VALUE-1 )
+			return Long.MAX_VALUE;
+		
+		int ret = 0;
+		//distinct value tuples [double per col]
+		ret += 8 * numVals * numCols;
+		//data [byte or char per row]
+		ret += ((numVals>255) ? 2 : 1) * numRows;
+		return ret;
+	}
 
 	protected static class SizeEstimationFactors {
  		protected int numVals;   //num value tuples
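
Note on getDDCSize: dense dictionary coding stores the dictionary of distinct value tuples (8 bytes per double, per column) plus one code per row, 1 byte for DDC1 (up to 255 distinct tuples) or 2 bytes for DDC2. A worked example with hypothetical dimensions:

    public class DdcSizeSketch {
        public static void main(String[] args) {
            // hypothetical group: 1,000 distinct tuples, 2 columns, 1M rows
            int numVals = 1000, numRows = 1000000, numCols = 2;
            long size = 8L * numVals * numCols;          // dictionary: 16,000 bytes
            size += (numVals > 255 ? 2L : 1L) * numRows; // codes: 2,000,000 bytes
            System.out.println(size + " bytes");         // 2,016,000
        }
    }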

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeEstimatorExact.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeEstimatorExact.java b/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeEstimatorExact.java
index d24255d..3677c23 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeEstimatorExact.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeEstimatorExact.java
@@ -46,8 +46,9 @@ public class CompressedSizeEstimatorExact extends CompressedSizeEstimator
 		SizeEstimationFactors fact = computeSizeEstimationFactors(ubm, true);
 		
 		//construct new size info summary
-		return new CompressedSizeInfo(fact.numVals,
+		return new CompressedSizeInfo(fact.numVals, fact.numOffs,
 				getRLESize(fact.numVals, fact.numRuns, ubm.getNumColumns()),
-				getOLESize(fact.numVals, fact.numOffs, fact.numSegs, ubm.getNumColumns()));
+				getOLESize(fact.numVals, fact.numOffs, fact.numSegs, ubm.getNumColumns()),
+				getDDCSize(fact.numVals, _numRows, ubm.getNumColumns()));
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeEstimatorSample.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeEstimatorSample.java b/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeEstimatorSample.java
index eb0040f..a59893d 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeEstimatorSample.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeEstimatorSample.java
@@ -21,103 +21,106 @@ package org.apache.sysml.runtime.compress.estim;
 
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.HashSet;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.math3.analysis.UnivariateFunction;
+import org.apache.commons.math3.analysis.solvers.UnivariateSolverUtils;
 import org.apache.commons.math3.distribution.ChiSquaredDistribution;
 import org.apache.commons.math3.random.RandomDataGenerator;
-import org.apache.sysml.hops.OptimizerUtils;
+import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.compress.BitmapEncoder;
 import org.apache.sysml.runtime.compress.ReaderColumnSelection;
 import org.apache.sysml.runtime.compress.CompressedMatrixBlock;
-import org.apache.sysml.runtime.compress.ReaderColumnSelectionDense;
-import org.apache.sysml.runtime.compress.ReaderColumnSelectionDenseSample;
-import org.apache.sysml.runtime.compress.ReaderColumnSelectionSparse;
 import org.apache.sysml.runtime.compress.UncompressedBitmap;
 import org.apache.sysml.runtime.compress.utils.DblArray;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 
 public class CompressedSizeEstimatorSample extends CompressedSizeEstimator 
 {
-	private static final boolean CORRECT_NONZERO_ESTIMATE = false; //TODO enable for production
 	private final static double SHLOSSER_JACKKNIFE_ALPHA = 0.975;
-	public static final float HAAS_AND_STOKES_ALPHA1 = 0.9F; //0.9 recommended in paper
-	public static final float HAAS_AND_STOKES_ALPHA2 = 30F; //30 recommended in paper
-	public static final float HAAS_AND_STOKES_UJ2A_C = 50; //50 recommend in paper
-
-	private int[] _sampleRows = null;
-	private RandomDataGenerator _rng = null;
-	private int _numRows = -1;
-
-	public CompressedSizeEstimatorSample(MatrixBlock data, int[] sampleRows) {
+	public static final double HAAS_AND_STOKES_ALPHA1 = 0.9; //0.9 recommended in paper
+	public static final double HAAS_AND_STOKES_ALPHA2 = 30; //30 recommended in paper
+	public static final int HAAS_AND_STOKES_UJ2A_C = 50; //50 recommended in paper
+	public static final boolean HAAS_AND_STOKES_UJ2A_CUT2 = true; //cut frequency in half
+	public static final boolean HAAS_AND_STOKES_UJ2A_SOLVE = true; //true recommended
+	public static final int MAX_SOLVE_CACHE_SIZE = 64*1024; //global 2MB cache
+	//note: we use a relatively high ALPHA2 and the cut-in-half approach because it
+	//leads to moderate overestimation (rather than systematic underestimation),
+	//which is the more conservative choice
+	
+	private static final Log LOG = LogFactory.getLog(CompressedSizeEstimatorSample.class.getName());
+
+	private static ThreadLocal<RandomDataGenerator> _rng = new ThreadLocal<RandomDataGenerator>() {
+		protected RandomDataGenerator initialValue() { return new RandomDataGenerator(); }
+	};
+	
+	private int[] _sampleRows = null;
+	private HashMap<Integer, Double> _solveCache = null;
+	
+	
+	public CompressedSizeEstimatorSample(MatrixBlock data, int sampleSize) 
+		throws DMLRuntimeException 
+	{
 		super(data);
-		_sampleRows = sampleRows;
-		_rng = new RandomDataGenerator();
-		_numRows = CompressedMatrixBlock.TRANSPOSE_INPUT ? 
-				_data.getNumColumns() : _data.getNumRows();
-	}
-
-	public CompressedSizeEstimatorSample(MatrixBlock mb, int sampleSize) {
-		this(mb, null);
+		
+		//get sample of rows, incl eager extraction 
 		_sampleRows = getSortedUniformSample(_numRows, sampleSize);
-	}
-
-	/**
-	 * set the sample rows (assumed to be sorted)
-	 * 
-	 * @param sampleRows sample rows, assumed to be sorted
-	 */
-	public void setSampleRows(int[] sampleRows) {
-		_sampleRows = sampleRows;
+		if( SizeEstimatorFactory.EXTRACT_SAMPLE_ONCE ) {
+			MatrixBlock select = new MatrixBlock(_numRows, 1, false);
+			for( int i=0; i<sampleSize; i++ )
+				select.quickSetValue(_sampleRows[i], 0, 1);
+			_data = _data.removeEmptyOperations(new MatrixBlock(), 
+					!CompressedMatrixBlock.TRANSPOSE_INPUT, select);
+		}
+		
+		//establish estimator-local cache for numeric solve
+		_solveCache = new HashMap<Integer, Double>();
 	}
 
 	@Override
 	public CompressedSizeInfo estimateCompressedColGroupSize(int[] colIndexes) 
 	{
+		int sampleSize = _sampleRows.length;
+		int numCols = colIndexes.length;
+		int[] sampleRows = _sampleRows;
+		
 		//extract statistics from sample
-		UncompressedBitmap ubm = BitmapEncoder.extractBitmapFromSample(
-				colIndexes, _data, _sampleRows);
+		UncompressedBitmap ubm = SizeEstimatorFactory.EXTRACT_SAMPLE_ONCE ?
+				BitmapEncoder.extractBitmap(colIndexes, _data) :
+				BitmapEncoder.extractBitmapFromSample(colIndexes, _data, sampleRows);
 		SizeEstimationFactors fact = computeSizeEstimationFactors(ubm, false);
-
-		//estimate number of distinct values 
-		int totalCardinality = getNumDistinctValues(colIndexes);
-		totalCardinality = Math.max(totalCardinality, fact.numVals); //fix anomalies w/ large sample fraction
-		totalCardinality = Math.min(totalCardinality, _numRows); //fix anomalies w/ large sample fraction
 		
-		//estimate unseen values
-		// each unseen is assumed to occur only once (it did not show up in the sample because it is rare)
-		int unseen = Math.max(0, totalCardinality - fact.numVals);
-		int sampleSize = _sampleRows.length;
-		
-		//estimate number of offsets
-		double sparsity = OptimizerUtils.getSparsity(
-				_data.getNumRows(), _data.getNumColumns(), _data.getNonZeros());
+		//estimate number of distinct values (incl fixes for anomalies w/ large sample fraction)
+		int totalCardinality = getNumDistinctValues(ubm, _numRows, sampleRows, _solveCache);
+		totalCardinality = Math.max(totalCardinality, fact.numVals);
+		totalCardinality = Math.min(totalCardinality, _numRows); 
 		
-		// expected value given that we don't store the zero values
-		float totalNumOffs = (float) (_numRows * (1 - Math.pow(1 - sparsity,colIndexes.length)));		
-		if( CORRECT_NONZERO_ESTIMATE ) {
-			long numZeros = sampleSize - fact.numOffs;
-			float C = Math.max(1-(float)fact.numSingle/sampleSize, (float)sampleSize/_numRows); 
-			totalNumOffs = _numRows - ((numZeros>0)? (float)_numRows/sampleSize*C*numZeros : 0);
-		}
+		//estimate unseen values
+		int unseenVals = totalCardinality - fact.numVals;
 		
-		// For a single offset, the number of blocks depends on the value of
-		// that offset. small offsets (first group of rows in the matrix)
-		// require a small number of blocks and large offsets (last group of
-		// rows) require a large number of blocks. The unseen offsets are
-		// distributed over the entire offset range. A reasonable and fast
-		// estimate for the number of blocks is to use the arithmetic mean of
-		// the number of blocks used for the first index (=1) and that of the
-		// last index.
-		int numUnseenSeg = Math.round(unseen
-				* (2.0f * BitmapEncoder.BITMAP_BLOCK_SZ + _numRows) / 2
-				/ BitmapEncoder.BITMAP_BLOCK_SZ);
+		//estimate number of non-zeros (conservatively round up)
+		double C = Math.max(1 - (double)fact.numSingle/sampleSize, (double)sampleSize/_numRows); 
+		int numZeros = sampleSize - fact.numOffs; //>=0
+		int numNonZeros = (int)Math.ceil(_numRows - (double)_numRows/sampleSize * C * numZeros);
+		numNonZeros = Math.max(numNonZeros, totalCardinality); //handle anomaly of zi=0
+
+		if( totalCardinality<=0 || unseenVals<0 || numZeros<0 || numNonZeros<=0 )
+			LOG.warn("Invalid estimates detected for "+Arrays.toString(colIndexes)+": "
+					+totalCardinality+" "+unseenVals+" "+numZeros+" "+numNonZeros);
+			
+		// estimate number of segments and number of runs incl correction for
+		// empty segments and empty runs (via expected mean of offset value)
+		int numUnseenSeg = (int) (unseenVals * 
+			Math.ceil((double)_numRows/BitmapEncoder.BITMAP_BLOCK_SZ/2));
 		int totalNumSeg = fact.numSegs + numUnseenSeg;
-		int totalNumRuns = getNumRuns(ubm, sampleSize, _numRows) + unseen;
+		int totalNumRuns = getNumRuns(ubm, sampleSize, _numRows, sampleRows) + numUnseenSeg;
 
 		//construct new size info summary
-		return new CompressedSizeInfo(totalCardinality,
-				getRLESize(totalCardinality, totalNumRuns, colIndexes.length),
-				getOLESize(totalCardinality, totalNumOffs, totalNumSeg, colIndexes.length));
+		return new CompressedSizeInfo(totalCardinality, numNonZeros,
+				getRLESize(totalCardinality, totalNumRuns, numCols),
+				getOLESize(totalCardinality, numNonZeros, totalNumSeg, numCols),
+				getDDCSize(totalCardinality, _numRows, numCols));
 	}
 
 	@Override
@@ -127,47 +130,50 @@ public class CompressedSizeEstimatorSample extends CompressedSizeEstimator
 		SizeEstimationFactors fact = computeSizeEstimationFactors(ubm, true);
 		
 		//construct new size info summary
-		return new CompressedSizeInfo(fact.numVals,
+		return new CompressedSizeInfo(fact.numVals, fact.numOffs,
 				getRLESize(fact.numVals, fact.numRuns, ubm.getNumColumns()),
-				getOLESize(fact.numVals, fact.numOffs, fact.numSegs, ubm.getNumColumns()));
+				getOLESize(fact.numVals, fact.numOffs, fact.numSegs, ubm.getNumColumns()),
+				getDDCSize(fact.numVals, _numRows, ubm.getNumColumns()));
 	}
 
-	private int getNumDistinctValues(int[] colIndexes) {
-		return haasAndStokes(colIndexes);
+	private static int getNumDistinctValues(UncompressedBitmap ubm, int numRows, int[] sampleRows, 
+			HashMap<Integer, Double> solveCache) {
+		return haasAndStokes(ubm, numRows, sampleRows.length, solveCache);
 	}
 
-	private int getNumRuns(UncompressedBitmap sampleUncompressedBitmap,
-			int sampleSize, int totalNumRows) {
-		int numVals = sampleUncompressedBitmap.getNumValues();
+	private static int getNumRuns(UncompressedBitmap ubm,
+			int sampleSize, int totalNumRows, int[] sampleRows) {
+		int numVals = ubm.getNumValues();
 		// all values in the sample are zeros
 		if (numVals == 0)
 			return 0;
-		float numRuns = 0;
+		double numRuns = 0;
 		for (int vi = 0; vi < numVals; vi++) {
-			int[] offsets = sampleUncompressedBitmap.getOffsetsList(vi);
-			float offsetsRatio = ((float) offsets.length) / sampleSize;
-			float avgAdditionalOffsets = offsetsRatio * totalNumRows
+			int[] offsets = ubm.getOffsetsList(vi).extractValues();
+			int offsetsSize = ubm.getNumOffsets(vi);
+			double offsetsRatio = ((double) offsetsSize) / sampleSize;
+			double avgAdditionalOffsets = offsetsRatio * totalNumRows
 					/ sampleSize;
 			if (avgAdditionalOffsets < 1) {
 				// Ising-Stevens does not hold
 				// fall-back to using the expected number of offsets as an upper
 				// bound on the number of runs
-				numRuns += ((float) offsets.length) * totalNumRows / sampleSize;
+				numRuns += ((double) offsetsSize) * totalNumRows / sampleSize;
 				continue;
 			}
 			int intervalEnd, intervalSize;
-			float additionalOffsets;
+			double additionalOffsets;
 			// probability of an index being non-offset in current and previous
 			// interval respectively
-			float nonOffsetProb, prevNonOffsetProb = 1;
+			double nonOffsetProb, prevNonOffsetProb = 1;
 			boolean reachedSampleEnd = false;
 			// handling the first interval separately for simplicity
 			int intervalStart = -1;
-			if (_sampleRows[0] == 0) {
+			if (sampleRows[0] == 0) {
 				// empty interval
 				intervalStart = 0;
 			} else {
-				intervalEnd = _sampleRows[0];
+				intervalEnd = sampleRows[0];
 				intervalSize = intervalEnd - intervalStart - 1;
 				// expected value of a multivariate hypergeometric distribution
 				additionalOffsets = offsetsRatio * intervalSize;
@@ -188,7 +194,7 @@ public class CompressedSizeEstimatorSample extends CompressedSizeEstimator
 				// intervalStart will always be pointing at the current value
 				// in the separator block
 
-				if (offsetsPtrs < offsets.length
+				if (offsetsPtrs < offsetsSize
 						&& offsets[offsetsPtrs] == intervalStart) {
 					startedWithOffset = true;
 					offsetsPtrs++;
@@ -197,10 +203,10 @@ public class CompressedSizeEstimatorSample extends CompressedSizeEstimator
 					seenNonOffset = true;
 					endedWithOffset = false;
 				}
-				while (intervalStart + 1 == _sampleRows[ix]) {
-					intervalStart = _sampleRows[ix];
+				while (intervalStart + 1 == sampleRows[ix]) {
+					intervalStart = sampleRows[ix];
 					if (seenNonOffset) {
-						if (offsetsPtrs < offsets.length
+						if (offsetsPtrs < offsetsSize
 								&& offsets[offsetsPtrs] == intervalStart) {
 							withinSepRun = 1;
 							offsetsPtrs++;
@@ -210,7 +216,7 @@ public class CompressedSizeEstimatorSample extends CompressedSizeEstimator
 							withinSepRun = 0;
 							endedWithOffset = false;
 						}
-					} else if (offsetsPtrs < offsets.length
+					} else if (offsetsPtrs < offsetsSize
 							&& offsets[offsetsPtrs] == intervalStart) {
 						offsetsPtrs++;
 						endedWithOffset = true;
@@ -230,7 +236,7 @@ public class CompressedSizeEstimatorSample extends CompressedSizeEstimator
 				// runs within an interval of unknowns
 				if (reachedSampleEnd)
 					break;
-				intervalEnd = _sampleRows[ix];
+				intervalEnd = sampleRows[ix];
 				intervalSize = intervalEnd - intervalStart - 1;
 				// expected value of a multivariate hypergeometric distribution
 				additionalOffsets = offsetsRatio * intervalSize;
@@ -280,7 +286,7 @@ public class CompressedSizeEstimatorSample extends CompressedSizeEstimator
 			}
 			// additional runs resulting from x's on the boundaries of the
 			// separators
-			endedWithOffset = intervalStart == offsets[offsets.length - 1];
+			endedWithOffset = intervalStart == offsets[offsetsSize - 1];
 			if (seenNonOffset) {
 				if (startedWithOffset) {
 					numRuns += prevNonOffsetProb;
@@ -296,31 +302,7 @@ public class CompressedSizeEstimatorSample extends CompressedSizeEstimator
 					numRuns += prevNonOffsetProb * nonOffsetProb;
 			}
 		}
-		return Math.round(numRuns);
-	}
-
-	private int haasAndStokes(int[] colIndexes) {
-		ReaderColumnSelection reader =  new ReaderColumnSelectionDenseSample(_data, 
-				colIndexes, _sampleRows, !CompressedMatrixBlock.MATERIALIZE_ZEROS);
-		return haasAndStokes(_numRows, _sampleRows.length, reader);
-	}
-
-	/**
-	 * TODO remove, just for local debugging.
-	 * 
-	 * @param colIndexes column indexes
-	 * @return exact number of district values
-	 */
-	@SuppressWarnings("unused")
-	private int getExactNumDistinctValues(int[] colIndexes) {
-		HashSet<DblArray> distinctVals = new HashSet<DblArray>();
-		ReaderColumnSelection reader = (_data.isInSparseFormat() && CompressedMatrixBlock.TRANSPOSE_INPUT) ? 
-				new ReaderColumnSelectionSparse(_data, colIndexes, !CompressedMatrixBlock.MATERIALIZE_ZEROS) : 
-				new ReaderColumnSelectionDense(_data, colIndexes, !CompressedMatrixBlock.MATERIALIZE_ZEROS);
-		DblArray val = null;
-		while (null != (val = reader.nextRow()))
-			distinctVals.add(val);
-		return distinctVals.size();
+		return (int)Math.min(Math.round(numRuns), Integer.MAX_VALUE);
 	}
 
 	/**
@@ -330,10 +312,11 @@ public class CompressedSizeEstimatorSample extends CompressedSizeEstimator
 	 * @param smplSize sample size
 	 * @return sorted array of integers
 	 */
-	private int[] getSortedUniformSample(int range, int smplSize) {
+	private static int[] getSortedUniformSample(int range, int smplSize) {
 		if (smplSize == 0)
 			return new int[] {};
-		int[] sample = _rng.nextPermutation(range, smplSize);
+		RandomDataGenerator rng = _rng.get();
+		int[] sample = rng.nextPermutation(range, smplSize);
 		Arrays.sort(sample);
 		return sample;
 	}
@@ -380,22 +363,13 @@ public class CompressedSizeEstimatorSample extends CompressedSizeEstimator
 	 * @param sampleRowsReader reader
 	 * @return estimator
 	 */
-	@SuppressWarnings("unused")
-	private static int shlosserEstimator(int nRows, int sampleSize,
-			ReaderColumnSelection sampleRowsReader) 
-	{
-		return shlosserEstimator(nRows, sampleSize, sampleRowsReader,
-				getValCounts(sampleRowsReader));
-	}
-
-	private static int shlosserEstimator(int nRows, int sampleSize,
-			ReaderColumnSelection sampleRowsReader,
-			HashMap<DblArray, Integer> valsCount) 
+	private static int shlosserEstimator(UncompressedBitmap ubm, int nRows, int sampleSize) 
 	{
 		double q = ((double) sampleSize) / nRows;
 		double oneMinusQ = 1 - q;
 
-		int[] freqCounts = getFreqCounts(valsCount);
+		int numVals = ubm.getNumValues();
+		int[] freqCounts = getFreqCounts(ubm);
 
 		double numerSum = 0, denomSum = 0;
 		int iPlusOne = 1;
@@ -403,7 +377,7 @@ public class CompressedSizeEstimatorSample extends CompressedSizeEstimator
 			numerSum += Math.pow(oneMinusQ, iPlusOne) * freqCounts[i];
 			denomSum += iPlusOne * q * Math.pow(oneMinusQ, i) * freqCounts[i];
 		}
-		int estimate = (int) Math.round(valsCount.size() + freqCounts[0]
+		int estimate = (int) Math.round(numVals + freqCounts[0]
 				* numerSum / denomSum);
 		return estimate < 1 ? 1 : estimate;
 	}
@@ -418,25 +392,16 @@ public class CompressedSizeEstimatorSample extends CompressedSizeEstimator
 	 * @param sampleRowsReader row reader
 	 * @return estimator
 	 */
-	@SuppressWarnings("unused")
-	private static int smoothedJackknifeEstimator(int nRows, int sampleSize,
-			ReaderColumnSelection sampleRowsReader) 
-	{
-		return smoothedJackknifeEstimator(nRows, sampleSize, sampleRowsReader,
-				getValCounts(sampleRowsReader));
-	}
-
-	private static int smoothedJackknifeEstimator(int nRows, int sampleSize,
-			ReaderColumnSelection sampleRowsReader,
-			HashMap<DblArray, Integer> valsCount) 
+	private static int smoothedJackknifeEstimator(UncompressedBitmap ubm, int nRows, int sampleSize) 
 	{
-		int[] freqCounts = getFreqCounts(valsCount);
+		int numVals = ubm.getNumValues();
+		int[] freqCounts = getFreqCounts(ubm);
 		// all values in the sample are zeros
 		if (freqCounts.length == 0)
 			return 0;
 		// nRows is N and sampleSize is n
 
-		int d = valsCount.size();
+		int d = numVals;
 		double f1 = freqCounts[0];
 		int Nn = nRows * sampleSize;
 		double D0 = (d - f1 / sampleSize)
@@ -515,43 +480,31 @@ public class CompressedSizeEstimatorSample extends CompressedSizeEstimator
 	 * @return estimator
 	 */
 	@SuppressWarnings("unused")
-	private static int shlosserJackknifeEstimator(int nRows, int sampleSize,
-			ReaderColumnSelection sampleRowsReader) {
-		HashMap<DblArray, Integer> valsCount = getValCounts(sampleRowsReader);
-
+	private static int shlosserJackknifeEstimator(UncompressedBitmap ubm, int nRows, int sampleSize) 
+	{
+		int numVals = ubm.getNumValues();
+		CriticalValue cv = computeCriticalValue(sampleSize);
+		
 		// uniformity chi-square test
-		double nBar = ((double) sampleSize) / valsCount.size();
+		double nBar = ((double) sampleSize) / numVals;
 		// test-statistic
 		double u = 0;
-		for (int cnt : valsCount.values()) {
-			u += Math.pow(cnt - nBar, 2);
+		for( int i=0; i<numVals; i++ ) {
+			u += Math.pow(ubm.getNumOffsets(i) - nBar, 2);
 		}
 		u /= nBar;
-		if (sampleSize != usedSampleSize)
+		if (sampleSize != cv.usedSampleSize)
 			computeCriticalValue(sampleSize);
-		if (u < uniformityCriticalValue) {
-			// uniform
-			return smoothedJackknifeEstimator(nRows, sampleSize,
-					sampleRowsReader, valsCount);
-		} else {
-			return shlosserEstimator(nRows, sampleSize, sampleRowsReader,
-					valsCount);
-		}
+		if (u < cv.uniformityCriticalValue) // uniform
+			return smoothedJackknifeEstimator(ubm, nRows, sampleSize);
+		else 
+			return shlosserEstimator(ubm, nRows, sampleSize);
 	}
 
-	/*
-	 * In the shlosserSmoothedJackknifeEstimator as long as the sample size did
-	 * not change, we will have the same critical value each time the estimator
-	 * is used (given that alpha is the same). We cache the critical value to
-	 * avoid recomputing it in each call.
-	 */
-	private static double uniformityCriticalValue;
-	private static int usedSampleSize;
-	
-	private static void computeCriticalValue(int sampleSize) {
+	private static CriticalValue computeCriticalValue(int sampleSize) {
 		ChiSquaredDistribution chiSqr = new ChiSquaredDistribution(sampleSize - 1);
-		uniformityCriticalValue = chiSqr.inverseCumulativeProbability(SHLOSSER_JACKKNIFE_ALPHA);
-		usedSampleSize = sampleSize;
+		return new CriticalValue(
+			chiSqr.inverseCumulativeProbability(SHLOSSER_JACKKNIFE_ALPHA), sampleSize);
 	}
 
 	/**
@@ -563,115 +516,43 @@ public class CompressedSizeEstimatorSample extends CompressedSizeEstimator
 	 * 
 	 * @param nRows number of rows
 	 * @param sampleSize sample size
+	 * @param solveCache cache for repeated method-of-moments solves
 	 * @param sampleRowsReader row reader
 	 * @return estimator
 	 */
-	private static int haasAndStokes(int nRows, int sampleSize,
-			ReaderColumnSelection sampleRowsReader) 
+	private static int haasAndStokes(UncompressedBitmap ubm, int nRows, int sampleSize, HashMap<Integer, Double> solveCache)
 	{
-		HashMap<DblArray, Integer> valsCount = getValCounts(sampleRowsReader);
+		//obtain value and frequency histograms
+		int numVals = ubm.getNumValues();
+		int[] freqCounts = getFreqCounts(ubm);
+	
 		// all values in the sample are zeros.
-		if (valsCount.size() == 0)
+		if( numVals == 0 )
 			return 1;
-		int[] freqCounts = getFreqCounts(valsCount);
-		float q = ((float) sampleSize) / nRows;
-		float _1MinusQ = 1 - q;
-		// Eq. 11
-		float duj1Fraction = ((float) sampleSize)
-				/ (sampleSize - _1MinusQ * freqCounts[0]);
-		float duj1 = duj1Fraction * valsCount.size();
-		// Eq. 16
-		float gamma = 0;
-		for (int i = 1; i <= freqCounts.length; i++) {
-			gamma += i * (i - 1) * freqCounts[i - 1];
-		}
-		gamma *= duj1 / sampleSize / sampleSize;
-		gamma += duj1 / nRows - 1;
-		gamma = Math.max(gamma, 0);
-		int estimate;
 		
-		if (gamma < HAAS_AND_STOKES_ALPHA1) {
-			// UJ2 - begining of page 1479
-		//	System.out.println("uj2");
-			estimate = (int) (duj1Fraction * (valsCount.size() - freqCounts[0]
-					* _1MinusQ * Math.log(_1MinusQ) * gamma / q));
-		} else if (gamma < HAAS_AND_STOKES_ALPHA2) {
-			// UJ2a - end of page 1998
-			//System.out.println("uj2a");
-			int numRemovedClasses = 0;
-			float updatedNumRows = nRows;
-			int updatedSampleSize = sampleSize;
-
-			for (Integer cnt : valsCount.values()) {
-				if (cnt > HAAS_AND_STOKES_UJ2A_C) {
-					numRemovedClasses++;
-					freqCounts[cnt - 1]--;
-					updatedSampleSize -= cnt;
-					/*
-					 * To avoid solving Eq. 20 numerically for the class size in
-					 * the full population (N_j), the current implementation
-					 * just scales cnt (n_j) by the sampling ratio (q).
-					 * Intuitively, the scaling should be fine since cnt is
-					 * large enough. Also, N_j in Eq. 20 is lower-bounded by cnt
-					 * which is already large enough to make the denominator in
-					 * Eq. 20 very close to 1.
-					 */
-					updatedNumRows -= ((float) cnt) / q;
-				}
-			}
-			if (updatedSampleSize == 0) {
-				// use uJ2a
-				
-				estimate = (int) (duj1Fraction * (valsCount.size() - freqCounts[0]
-						* (_1MinusQ) * Math.log(_1MinusQ) * gamma / q));
-			} else {
-				float updatedQ = ((float) updatedSampleSize) / updatedNumRows;
-				int updatedSampleCardinality = valsCount.size()
-						- numRemovedClasses;
-				float updatedDuj1Fraction = ((float) updatedSampleSize)
-						/ (updatedSampleSize - (1 - updatedQ) * freqCounts[0]);
-				float updatedDuj1 = updatedDuj1Fraction
-						* updatedSampleCardinality;
-				float updatedGamma = 0;
-				for (int i = 1; i <= freqCounts.length; i++) {
-					updatedGamma += i * (i - 1) * freqCounts[i - 1];
-				}
-				updatedGamma *= updatedDuj1 / updatedSampleSize
-						/ updatedSampleSize;
-				updatedGamma += updatedDuj1 / updatedNumRows - 1;
-				updatedGamma = Math.max(updatedGamma, 0);
-
-				estimate = (int) (updatedDuj1Fraction * (updatedSampleCardinality - freqCounts[0]
-						* (1 - updatedQ)
-						* Math.log(1 - updatedQ)
-						* updatedGamma / updatedQ))
-						+ numRemovedClasses;
-			}
-
-		} else {
-			// Sh3 - end of section 3
-			float fraq1Numer = 0;
-			float fraq1Denom = 0;
-			float fraq2Numer = 0;
-			float fraq2Denom = 0;
-			for (int i = 1; i <= freqCounts.length; i++) {
-				fraq1Numer += i * q * q * Math.pow(1 - q * q, i - 1)
-						* freqCounts[i - 1];
-				fraq1Denom += Math.pow(_1MinusQ, i) * (Math.pow(1 + q, i) - 1)
-						* freqCounts[i - 1];
-				fraq2Numer += Math.pow(_1MinusQ, i) * freqCounts[i - 1];
-				fraq2Denom += i * q * Math.pow(_1MinusQ, i - 1)
-						* freqCounts[i - 1];
-			}
-			estimate = (int) (valsCount.size() + freqCounts[0] * fraq1Numer
-					/ fraq1Denom * fraq2Numer * fraq2Numer / fraq2Denom
-					/ fraq2Denom);
-		}
-		return estimate < 1 ? 1 : estimate;
+		double q = ((double) sampleSize) / nRows;
+		double f1 = freqCounts[0];
+		
+		//compute basic Duj1 estimate
+		double duj1 = getDuj1Estimate(q, f1, sampleSize, numVals);
+		
+		//compute gamma based on Duj1
+		double gamma = getGammaSquared(duj1, freqCounts, sampleSize, nRows);
+		double d = -1;
+		
+		//core hybrid estimator based on gamma
+		if (gamma < HAAS_AND_STOKES_ALPHA1)
+			d = getDuj2Estimate(q, f1, sampleSize, numVals, gamma);
+		else if (gamma < HAAS_AND_STOKES_ALPHA2)
+			d = getDuj2aEstimate(q, freqCounts, sampleSize, numVals, gamma, nRows, solveCache);
+		else
+			d = getSh3Estimate(q, freqCounts, numVals);
+		
+		//round and ensure min value 1
+		return Math.max(1, (int)Math.round(d));
 	}
 
-	private static HashMap<DblArray, Integer> getValCounts(
-			ReaderColumnSelection sampleRowsReader) 
+	private static HashMap<DblArray, Integer> getValCounts(ReaderColumnSelection sampleRowsReader) 
 	{
 		HashMap<DblArray, Integer> valsCount = new HashMap<DblArray, Integer>();
 		DblArray val = null;
@@ -681,27 +562,179 @@ public class CompressedSizeEstimatorSample extends CompressedSizeEstimator
 			if (cnt == null)
 				cnt = 0;
 			cnt++;
-			valsCount.put(val, cnt);
+			valsCount.put(new DblArray(val), cnt);
 		}
 		return valsCount;
 	}
 
-	private static int[] getFreqCounts(HashMap<DblArray, Integer> valsCount) 
+	/**
+	 * Creates an inverted histogram, where freqCounts[i-1] indicates 
+	 * how many values occurred with a frequency i. Note that freqCounts[0]
+	 * represents the number of singletons (values occurring exactly once). 
+	 * 
+	 * @param ubm uncompressed bitmap
+	 * @return frequency counts
+	 */
+	private static int[] getFreqCounts(UncompressedBitmap ubm) 
 	{
+		//determine max frequency
+		int numVals = ubm.getNumValues();
 		int maxCount = 0;
-		for (Integer c : valsCount.values()) {
-			if (c > maxCount)
-				maxCount = c;
-		}
-		
-		/*
-		 * freqCounts[i-1] = how many values occured with a frequecy i
-		 */
+		for( int i=0; i<numVals; i++ )
+			maxCount = Math.max(maxCount, ubm.getNumOffsets(i));
+			
+		//create frequency histogram
 		int[] freqCounts = new int[maxCount];
-		for (Integer c : valsCount.values()) {
-			freqCounts[c - 1]++;
-		}
+		for( int i=0; i<numVals; i++ )
+			freqCounts[ubm.getNumOffsets(i)-1] ++;
+
 		return freqCounts;
 
 	}
+
+	/**
+	 * Computes the "unsmoothed first-order jackknife estimator" (Eq 11).
+	 * 
+	 */
+	private static double getDuj1Estimate(double q, double f1, int n, int dn) {
+		return dn / (1 - ((1-q) * f1)/n);
+	}
+	
+	/**
+	 * Computes the "unsmoothed second-order jackknife estimator" (Eq 18b).
+	 * 
+	 */
+	private static double getDuj2Estimate(double q, double f1, int n, int dn, double gammaDuj1) {
+		return (dn - (1-q) * f1 * Math.log(1-q) * gammaDuj1 / q) / (1 - ((1-q) * f1)/n);
+	}
+	
+	/**
+	 * Computes the "unsmoothed second-order jackknife estimator" with additional  
+	 * stabilization procedure, which removes the classes whose frequencies exceed c,
+	 * computes Duj2 over the reduced sample, and finally adds back the removed classes.
+	 * 
+	 */
+	private static double getDuj2aEstimate(double q, int f[], int n, int dn, double gammaDuj1, int N, 
+			HashMap<Integer, Double> solveCache) {
+		int c = HAAS_AND_STOKES_UJ2A_CUT2 ? 
+			f.length/2+1 : HAAS_AND_STOKES_UJ2A_C+1;
+		
+		//compute adjusted sample size after removing classes that
+		//exceed a fixed frequency  c
+		int nB = 0, cardB = 0;
+		for( int i=c; i<=f.length; i++ ) 
+			if( f[i-1] != 0 ) {
+				nB += f[i-1] * i; //numVals times frequency 
+				cardB += f[i-1];
+			}
+		
+		//fallback to Duj2 over full sample if only high frequency columns
+		if( n - nB == 0 )
+			return getDuj2Estimate(q, f[0], n, dn, gammaDuj1);
+
+		//compute reduced population size via numeric solve
+		int updatedN = N; 
+		for( int i=c; i<=f.length; i++ )
+			if( f[i-1] != 0 )
+				updatedN -= f[i-1] * (!HAAS_AND_STOKES_UJ2A_SOLVE ? i/q :
+					getMethodOfMomentsEstimate(i, q, 1, N, solveCache));
+		
+		//remove classes that exceed a fixed frequency c
+		for( int i=c; i<=f.length; i++ )
+			f[i-1] = 0; 
+		
+		//compute duj2a over reduced sample
+		double updatedDuj1 = getDuj1Estimate(q, f[0], n-nB, dn-cardB);
+		double updatedGammaDuj1 = getGammaSquared(updatedDuj1, f, n-nB, updatedN);
+		double duj2 = getDuj2Estimate(q, f[0], n-nB, dn-cardB, updatedGammaDuj1);
+		return duj2 + cardB;		
+	}
+	
+	/**
+	 * Computed the "shlosser third-order estimator". (Eq 30b)
+	 * 
+	 * Note that this estimator can show anomalies with NaN as the results
+	 * due to terms such as Math.pow(1+q, i) which exceed Double.MAX_VALUE
+	 * even for moderately large i, e.g., q=0.05 at around 14K.
+	 * 
+	 */
+	private static double getSh3Estimate(double q, int[] f, double dn) {
+		double fraq11 = 0, fraq12 = 0, fraq21 = 0, fraq22 = 0;
+		for( int i=1; i<=f.length; i++ ) 
+			if( f[i-1] != 0 ) {
+				fraq11 += i * q*q * Math.pow(1 - q*q, i-1) * f[i-1];
+				//NOTE: numerically unstable due to Math.pow(1+q, i) overflows
+				//fraq12 += Math.pow(1 - q, i) * (Math.pow(1+q, i)-1) * f[i-1];
+				fraq12 += (Math.pow(1 - q*q, i) - Math.pow(1 - q, i)) * f[i-1];
+				fraq21 += Math.pow(1 - q, i) * f[i-1];
+				fraq22 += i * q * Math.pow(1 - q, i-1) * f[i-1];
+			}
+		return dn + f[0] * fraq11/fraq12 * Math.pow(fraq21/fraq22, 2); 
+	}
+	
+	/**
+	 * Computes the "squared coefficient of variation" based on a given 
+	 * initial estimate D (Eq 16).
+	 * 
+	 */
+	private static double getGammaSquared(double D, int[] f, int n, int N) {
+		double gamma = 0;
+		for( int i=1; i<=f.length; i++) 
+			if( f[i-1] != 0 )
+				gamma += i * (i-1) * f[i-1];
+		gamma *= D / n / n;
+		gamma += D / N - 1;
+		return Math.max(0, gamma);
+	}
+	
+	/**
+	 * Solves the method-of-moments estimate numerically. We cache results
+	 * for repeated frequencies observed in the sample, as q is constant
+	 * and min/max are chosen conservatively.
+	 * 
+	 */
+	private static double getMethodOfMomentsEstimate(int nj, double q, double min, double max, 
+		HashMap<Integer, Double> solveCache) {
+		if( solveCache.containsKey(nj) )
+			return solveCache.get(nj);
+		
+		double est = UnivariateSolverUtils
+			.solve(new MethodOfMomentsFunction(nj, q), min, max, 1e-9);
+		
+		if( solveCache.size()<MAX_SOLVE_CACHE_SIZE )
+			solveCache.put(nj, est);
+		
+		return est;
+	}
+	
+	/*
+	 * In the shlosserSmoothedJackknifeEstimator as long as the sample size did
+	 * not change, we will have the same critical value each time the estimator
+	 * is used (given that alpha is the same). We cache the critical value to
+	 * avoid recomputing it in each call.
+	 */
+	private static class CriticalValue {
+		public final double uniformityCriticalValue;
+		public final int usedSampleSize;
+		
+		public CriticalValue(double cv, int size) {
+			uniformityCriticalValue = cv;
+			usedSampleSize = size;
+		} 
+	}
+	
+	private static class MethodOfMomentsFunction implements UnivariateFunction {
+		private final int _nj;
+		private final double _q;
+		
+		public MethodOfMomentsFunction(int nj, double q) {
+			_nj = nj;
+			_q = q;
+		}
+		
+		@Override
+		public double value(double x) {
+			return _q*x / (1-Math.pow(1-_q, x)) - _nj;
+		}
+	}
 }
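
(Illustrative sketch, not part of this patch.) The root of q*x/(1-(1-q)^x) - nj = 0 is the real-valued class size whose expected sample frequency under sampling rate q equals the observed nj. Since the function is monotonically increasing in x, a plain bisection reproduces what the commons-math solver computes above:

	// Illustrative only: recovers x with expected frequency nj=3 at q=0.05.
	static double g(double x, double q, int nj) {
		return q*x / (1 - Math.pow(1-q, x)) - nj;
	}
	public static void main(String[] args) {
		double q = 0.05; int nj = 3;
		double lo = 1, hi = 1e6; // conservative bracket: g(lo) < 0 < g(hi)
		for (int it = 0; it < 200; it++) {
			double mid = (lo + hi) / 2;
			if (g(mid, q, nj) < 0) lo = mid; else hi = mid;
		}
		System.out.println(lo); // ~56.8, vs. the naive i/q fallback of 60
	}
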

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeInfo.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeInfo.java b/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeInfo.java
index 430783d..60acdeb 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeInfo.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/estim/CompressedSizeInfo.java
@@ -19,51 +19,53 @@
 
 package org.apache.sysml.runtime.compress.estim;
 
+import org.apache.sysml.runtime.compress.CompressedMatrixBlock;
+
 /**
  * 
  * A reusable helper object for maintaining bitmap sizes
  */
 public class CompressedSizeInfo 
 {
-	private int _estCard = -1;
-	private long _rleSize = -1; 
-	private long _oleSize = -1;
-
-	public CompressedSizeInfo() {
-		
-	}
+	private final int _estCard;
+	private final int _estNnz;
+	private final long _rleSize; 
+	private final long _oleSize;
+	private final long _ddcSize;
 
-	public CompressedSizeInfo(int estCard, long rleSize, long oleSize) {
+	public CompressedSizeInfo(int estCard, int estNnz, long rleSize, long oleSize, long ddcSize) {
 		_estCard = estCard;
+		_estNnz = estNnz;
 		_rleSize = rleSize;
 		_oleSize = oleSize;
+		_ddcSize = ddcSize;
 	}
 
-	public void setRLESize(long rleSize) {
-		_rleSize = rleSize;
-	}
-	
 	public long getRLESize() {
 		return _rleSize;
 	}
-	
-	public void setOLESize(long oleSize) {
-		_oleSize = oleSize;
-	}
 
 	public long getOLESize() {
 		return _oleSize;
 	}
-
-	public long getMinSize() {
-		return Math.min(_rleSize, _oleSize);
+	
+	public long getDDCSize() {
+		return CompressedMatrixBlock.ALLOW_DDC_ENCODING ? 
+			_ddcSize : Long.MAX_VALUE; 
 	}
 
-	public void setEstCardinality(int estCard) {
-		_estCard = estCard;
+	public long getMinSize() {
+		return Math.min(Math.min(
+			getRLESize(), 
+			getOLESize()),
+			getDDCSize());
 	}
 
-	public int getEstCarinality() {
+	public int getEstCard() {
 		return _estCard;
 	}
+	
+	public int getEstNnz() {
+		return _estNnz;
+	}
 }
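
(Illustrative usage, hypothetical values.) Format selection reads directly off this object; the Long.MAX_VALUE sentinel in getDDCSize() makes DDC lose every min() comparison whenever DDC encoding is globally disabled:

	CompressedSizeInfo info = new CompressedSizeInfo(
		/*estCard*/ 100, /*estNnz*/ 5000,
		/*rleSize*/ 40000L, /*oleSize*/ 32000L, /*ddcSize*/ 28000L);
	long best = info.getMinSize(); // 28000 with ALLOW_DDC_ENCODING, else 32000 (OLE)
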

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/estim/SizeEstimatorFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/estim/SizeEstimatorFactory.java b/src/main/java/org/apache/sysml/runtime/compress/estim/SizeEstimatorFactory.java
index c142103..63a092c 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/estim/SizeEstimatorFactory.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/estim/SizeEstimatorFactory.java
@@ -19,14 +19,16 @@
 
 package org.apache.sysml.runtime.compress.estim;
 
+import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
 
 public class SizeEstimatorFactory 
 {
-	public static final float SAMPLING_RATIO = 0.01f; //conservative default
+	public static final double SAMPLING_RATIO = 0.05; //conservative default
+	public static final boolean EXTRACT_SAMPLE_ONCE = true;
 
 	@SuppressWarnings("unused")
-	public static CompressedSizeEstimator getSizeEstimator(MatrixBlock data, int numRows) {
+	public static CompressedSizeEstimator getSizeEstimator(MatrixBlock data, int numRows) throws DMLRuntimeException {
 		return (SAMPLING_RATIO == 1.0) ?
 				new CompressedSizeEstimatorExact(data):
 				new CompressedSizeEstimatorSample(data, (int) (numRows*SAMPLING_RATIO));

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/utils/ConverterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/utils/ConverterUtils.java b/src/main/java/org/apache/sysml/runtime/compress/utils/ConverterUtils.java
index 37b2984..f4d9f1c 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/utils/ConverterUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/utils/ConverterUtils.java
@@ -19,6 +19,7 @@
 
 package org.apache.sysml.runtime.compress.utils;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 
 import org.apache.sysml.runtime.compress.ColGroup;
@@ -70,4 +71,19 @@ public class ConverterUtils
 		else 
 			return vector.getDenseBlock();
 	}
+
+	public static MatrixBlock getUncompressedColBlock( ColGroup group )
+	{
+		MatrixBlock ret = null;
+		if( group instanceof ColGroupUncompressed ) {
+			ret = ((ColGroupUncompressed) group).getData();
+		}
+		else {
+			ArrayList<ColGroup> tmpGroup = new ArrayList<ColGroup>(Arrays.asList(group));
+			ColGroupUncompressed decompressedCols = new ColGroupUncompressed(tmpGroup);
+			ret = decompressedCols.getData();
+		}
+		
+		return ret;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/utils/IntArrayList.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/utils/IntArrayList.java b/src/main/java/org/apache/sysml/runtime/compress/utils/IntArrayList.java
index ef4d476..0f2f091 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/utils/IntArrayList.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/utils/IntArrayList.java
@@ -66,11 +66,18 @@ public class IntArrayList
 		_size++;
 	}
 
+	/**
+	 * Returns the underlying array of offsets. Note that this array might be
+	 * physically larger than the actual length of the offset list; use size()
+	 * to obtain the actual length.
+	 * 
+	 * @return the underlying array of offsets
+	 */
 	public int[] extractValues() {
 		if( _size == 1 )
 			return new int[] { _val0 };
 		else
-			return Arrays.copyOfRange(_data, 0, _size);
+			return _data;
 	}
 
 	private void resize() {
@@ -80,8 +87,6 @@ public class IntArrayList
 					"IntArrayList resize leads to integer overflow: size=" + _size);
 
 		// resize data array and copy existing contents
-		int[] newdata = new int[_data.length * RESIZE_FACTOR];
-		System.arraycopy(_data, 0, newdata, 0, _size);
-		_data = newdata;
+		_data = Arrays.copyOf(_data, _data.length * RESIZE_FACTOR);
 	}
 }
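
(Illustrative usage, not part of this patch; assumes the list's append method is appendValue(int).) Because extractValues() may now return the raw backing array to avoid a copy, callers must bound iteration by size(), not by array length:

	IntArrayList offsets = new IntArrayList();
	offsets.appendValue(3);
	offsets.appendValue(7);
	offsets.appendValue(42);
	int[] vals = offsets.extractValues();     // may be physically longer than 3
	for (int k = 0; k < offsets.size(); k++)  // correct bound: size()
		System.out.println(vals[k]);
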

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/utils/LinearAlgebraUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/utils/LinearAlgebraUtils.java b/src/main/java/org/apache/sysml/runtime/compress/utils/LinearAlgebraUtils.java
index 3bf0ad4..7a4a013 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/utils/LinearAlgebraUtils.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/utils/LinearAlgebraUtils.java
@@ -28,6 +28,86 @@ import org.apache.sysml.runtime.matrix.data.MatrixBlock;
  */
 public class LinearAlgebraUtils {
 
+	public static double dotProduct(double[] a, double[] b, final int len) 
+	{
+		double val = 0;
+		final int bn = len % 8;
+
+		// compute rest
+		for (int i = 0; i < bn; i++)
+			val += a[i] * b[i];
+
+		// unrolled 8-block (for better instruction-level parallelism)
+		for (int i = bn; i < len; i += 8) {
+			// read 64B cachelines of a and b
+			// compute cval' = sum(a * b) + cval
+			val += a[i + 0] * b[i + 0] 
+				 + a[i + 1] * b[i + 1] 
+				 + a[i + 2] * b[i + 2] 
+				 + a[i + 3] * b[i + 3] 
+				 + a[i + 4] * b[i + 4]
+				 + a[i + 5] * b[i + 5] 
+				 + a[i + 6] * b[i + 6] 
+				 + a[i + 7] * b[i + 7];
+		}
+
+		// scalar result
+		return val;
+	}
+
+	public static double dotProduct( double[] a, double[] b, int ai, int bi, final int len )
+	{
+		double val = 0;
+		final int bn = len%8;
+				
+		//compute rest
+		for( int i = 0; i < bn; i++, ai++, bi++ )
+			val += a[ ai ] * b[ bi ];
+		
+		//unrolled 8-block (for better instruction-level parallelism)
+		for( int i = bn; i < len; i+=8, ai+=8, bi+=8 )
+		{
+			//read 64B cachelines of a and b
+			//compute cval' = sum(a * b) + cval
+			val += a[ ai+0 ] * b[ bi+0 ]
+			     + a[ ai+1 ] * b[ bi+1 ]
+			     + a[ ai+2 ] * b[ bi+2 ]
+			     + a[ ai+3 ] * b[ bi+3 ]
+			     + a[ ai+4 ] * b[ bi+4 ]
+			     + a[ ai+5 ] * b[ bi+5 ]
+			     + a[ ai+6 ] * b[ bi+6 ]
+			     + a[ ai+7 ] * b[ bi+7 ];
+		}
+		
+		//scalar result
+		return val; 
+	}
+
+	public static void vectAdd( double[] a, double[] c, int ai, int ci, final int len )
+	{
+		final int bn = len%8;
+		
+		//rest, not aligned to 8-blocks
+		for( int j = 0; j < bn; j++, ai++, ci++)
+			c[ ci ] += a[ ai ];
+		
+		//unrolled 8-block  (for better instruction-level parallelism)
+		for( int j = bn; j < len; j+=8, ai+=8, ci+=8) 
+		{
+			//read 64B cachelines of a and c
+			//compute c' = c + a
+			//write back 64B cacheline of c = c'
+			c[ ci+0 ] += a[ ai+0 ];
+			c[ ci+1 ] += a[ ai+1 ];
+			c[ ci+2 ] += a[ ai+2 ];
+			c[ ci+3 ] += a[ ai+3 ];
+			c[ ci+4 ] += a[ ai+4 ];
+			c[ ci+5 ] += a[ ai+5 ];
+			c[ ci+6 ] += a[ ai+6 ];
+			c[ ci+7 ] += a[ ai+7 ];
+		}
+	}
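
(Illustrative sketch, not part of this patch.) These kernels, and the vectMultiplyAdd variants below, share one skeleton: peel the len%8 remainder first, then walk full 8-element blocks so the JIT can keep eight independent operations in flight. The same pattern in its minimal form:

	static double sumUnrolled(double[] a, final int len) {
		double val = 0;
		final int bn = len % 8;
		for (int i = 0; i < bn; i++)      // remainder, not aligned to 8-blocks
			val += a[i];
		for (int i = bn; i < len; i += 8) // unrolled 8-block
			val += a[i] + a[i+1] + a[i+2] + a[i+3]
			     + a[i+4] + a[i+5] + a[i+6] + a[i+7];
		return val;
	}
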
+
 	public static void vectAdd( final double aval, double[] c, char[] bix, final int bi, final int ci, final int len )
 	{
 		final int bn = len%8;
@@ -72,6 +152,53 @@ public class LinearAlgebraUtils {
 		}
 	}
 
+	public static void vectMultiplyAdd( final double aval, double[] b, double[] c, int[] bix, final int bi, final int ci, final int len )
+	{
+		final int bn = (len-bi)%8;
+		
+		//rest, not aligned to 8-blocks
+		for( int j = bi; j < bi+bn; j++ )
+			c[ ci + bix[j] ] += aval * b[ j ];
+		
+		//unrolled 8-block (for better instruction-level parallelism)
+		for( int j = bi+bn; j < len; j+=8 )
+		{
+			c[ ci+bix[j+0] ] += aval * b[ j+0 ];
+			c[ ci+bix[j+1] ] += aval * b[ j+1 ];
+			c[ ci+bix[j+2] ] += aval * b[ j+2 ];
+			c[ ci+bix[j+3] ] += aval * b[ j+3 ];
+			c[ ci+bix[j+4] ] += aval * b[ j+4 ];
+			c[ ci+bix[j+5] ] += aval * b[ j+5 ];
+			c[ ci+bix[j+6] ] += aval * b[ j+6 ];
+			c[ ci+bix[j+7] ] += aval * b[ j+7 ];
+		}
+	}
+
+	public static void vectMultiplyAdd( final double aval, double[] b, double[] c, int bi, int ci, final int len )
+	{
+		final int bn = len%8;
+		
+		//rest, not aligned to 8-blocks
+		for( int j = 0; j < bn; j++, bi++, ci++)
+			c[ ci ] += aval * b[ bi ];
+		
+		//unrolled 8-block  (for better instruction-level parallelism)
+		for( int j = bn; j < len; j+=8, bi+=8, ci+=8) 
+		{
+			//read 64B cachelines of b and c
+			//compute c' = aval * b + c
+			//write back 64B cacheline of c = c'
+			c[ ci+0 ] += aval * b[ bi+0 ];
+			c[ ci+1 ] += aval * b[ bi+1 ];
+			c[ ci+2 ] += aval * b[ bi+2 ];
+			c[ ci+3 ] += aval * b[ bi+3 ];
+			c[ ci+4 ] += aval * b[ bi+4 ];
+			c[ ci+5 ] += aval * b[ bi+5 ];
+			c[ ci+6 ] += aval * b[ bi+6 ];
+			c[ ci+7 ] += aval * b[ bi+7 ];
+		}
+	}
+
 	public static double vectSum( double[] a, char[] bix, final int ai, final int bi, final int len )
 	{
 		double val = 0;
@@ -122,6 +249,18 @@ public class LinearAlgebraUtils {
 		return val;
 	}
 
+	public static void copyUpperToLowerTriangle( MatrixBlock ret )
+	{
+		double[] c = ret.getDenseBlock();
+		final int m = ret.getNumRows();
+		final int n = ret.getNumColumns();
+		
+		//copy symmetric values
+		for( int i=0, uix=0; i<m; i++, uix+=n )
+			for( int j=i+1, lix=j*n+i; j<n; j++, lix+=n )
+				c[ lix ] = c[ uix+j ];
+	}
+
 	public static void copyNonZerosToRowCol( MatrixBlock ret, MatrixBlock tmp, int ix )
 	{
 		for(int i=0; i<tmp.getNumColumns(); i++) {
@@ -132,4 +271,29 @@ public class LinearAlgebraUtils {
 			}
 		}
 	}
+	
+	/**
+	 * Obtain the index of the closest element in a to the value x.
+	 * 
+	 * @param a array of ints
+	 * @param x value
+	 * @return the index of the closest element in a to the value x
+	 */
+	public static int getClosestK(int[] a, int x) {
+
+		int low = 0;
+		int high = a.length - 1;
+
+		while (low < high) {
+			int mid = low + (high - low) / 2; //overflow-safe midpoint
+			int d1 = Math.abs(a[mid] - x);
+			int d2 = Math.abs(a[mid + 1] - x);
+			if (d2 <= d1) {
+				low = mid + 1;
+			} else {
+				high = mid;
+			}
+		}
+		return high;
+	}
 }
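
(Illustrative usage, not part of this patch.) A quick worked example of getClosestK on a sorted offset array; ties between two neighbors resolve to the right one because d2 <= d1 advances low past mid:

	int[] a = new int[]{2, 10, 20, 35};
	LinearAlgebraUtils.getClosestK(a, 14); // -> 1 (10 is closest)
	LinearAlgebraUtils.getClosestK(a, 15); // -> 2 (10 and 20 tie, picks 20)
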

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicCompressionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicCompressionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicCompressionTest.java
index 2ec2f61..2d1b592 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicCompressionTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicCompressionTest.java
@@ -44,9 +44,10 @@ public class BasicCompressionTest extends AutomatedTestBase
 	}
 	
 	public enum ValueType {
-		RAND,
-		RAND_ROUND,
-		CONST,
+		RAND, //UC
+		CONST, //RLE
+		RAND_ROUND_OLE, //OLE
+		RAND_ROUND_DDC, //DDC
 	}
 	
 	@Override
@@ -70,13 +71,23 @@ public class BasicCompressionTest extends AutomatedTestBase
 	}
 	
 	@Test
-	public void testDenseRoundRandDataCompression() {
-		runCompressionTest(SparsityType.DENSE, ValueType.RAND_ROUND, true);
+	public void testDenseRoundRandDataOLECompression() {
+		runCompressionTest(SparsityType.DENSE, ValueType.RAND_ROUND_OLE, true);
 	}
 	
 	@Test
-	public void testSparseRoundRandDataCompression() {
-		runCompressionTest(SparsityType.SPARSE, ValueType.RAND_ROUND, true);
+	public void testSparseRoundRandDataOLECompression() {
+		runCompressionTest(SparsityType.SPARSE, ValueType.RAND_ROUND_OLE, true);
+	}
+	
+	@Test
+	public void testDenseRoundRandDataDDCCompression() {
+		runCompressionTest(SparsityType.DENSE, ValueType.RAND_ROUND_DDC, true);
+	}
+	
+	@Test
+	public void testSparseRoundRandDataDDCCompression() {
+		runCompressionTest(SparsityType.SPARSE, ValueType.RAND_ROUND_DDC, true);
 	}
 	
 	@Test
@@ -105,13 +116,13 @@ public class BasicCompressionTest extends AutomatedTestBase
 	}
 	
 	@Test
-	public void testDenseRoundRandDataNoCompression() {
-		runCompressionTest(SparsityType.DENSE, ValueType.RAND_ROUND, false);
+	public void testDenseRoundRandDataOLENoCompression() {
+		runCompressionTest(SparsityType.DENSE, ValueType.RAND_ROUND_OLE, false);
 	}
 	
 	@Test
-	public void testSparseRoundRandDataNoCompression() {
-		runCompressionTest(SparsityType.SPARSE, ValueType.RAND_ROUND, false);
+	public void testSparseRoundRandDataOLENoCompression() {
+		runCompressionTest(SparsityType.SPARSE, ValueType.RAND_ROUND_OLE, false);
 	}
 	
 	@Test
@@ -144,8 +155,10 @@ public class BasicCompressionTest extends AutomatedTestBase
 			//generate input data
 			double min = (vtype==ValueType.CONST)? 10 : -10;
 			double[][] input = TestUtils.generateTestMatrix(rows, cols, min, 10, sparsity, 7);
-			if( vtype==ValueType.RAND_ROUND )
+			if( vtype==ValueType.RAND_ROUND_OLE || vtype==ValueType.RAND_ROUND_DDC ) {
+				CompressedMatrixBlock.ALLOW_DDC_ENCODING = (vtype==ValueType.RAND_ROUND_DDC);
 				input = TestUtils.round(input);
+			}
 			MatrixBlock mb = DataConverter.convertToMatrixBlock(input);
 			
 			//compress given matrix block
@@ -164,5 +177,8 @@ public class BasicCompressionTest extends AutomatedTestBase
 		catch(Exception ex) {
 			throw new RuntimeException(ex);
 		}
+		finally {
+			CompressedMatrixBlock.ALLOW_DDC_ENCODING = true;
+		}
 	}
 }
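
(Illustrative variant, not part of this patch.) The harness above flips the global ALLOW_DDC_ENCODING flag per test and restores it in finally so that test order cannot leak state; a slightly more defensive variant saves and restores the previous value instead of hard-coding true:

	boolean oldFlag = CompressedMatrixBlock.ALLOW_DDC_ENCODING;
	try {
		CompressedMatrixBlock.ALLOW_DDC_ENCODING = (vtype==ValueType.RAND_ROUND_DDC);
		//... generate data, compress, and compare as above ...
	}
	finally {
		CompressedMatrixBlock.ALLOW_DDC_ENCODING = oldFlag;
	}
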

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicGetValueTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicGetValueTest.java b/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicGetValueTest.java
index 0515acb..47c9fcc 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicGetValueTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicGetValueTest.java
@@ -46,9 +46,10 @@ public class BasicGetValueTest extends AutomatedTestBase
 	}
 	
 	public enum ValueType {
-		RAND,
-		RAND_ROUND,
-		CONST,
+		RAND, //UC
+		CONST, //RLE
+		RAND_ROUND_OLE, //OLE
+		RAND_ROUND_DDC, //DDC
 	}
 	
 	@Override
@@ -72,13 +73,23 @@ public class BasicGetValueTest extends AutomatedTestBase
 	}
 	
 	@Test
-	public void testDenseRoundRandDataCompression() {
-		runGetValueTest(SparsityType.DENSE, ValueType.RAND_ROUND, true);
+	public void testDenseRoundRandDataOLECompression() {
+		runGetValueTest(SparsityType.DENSE, ValueType.RAND_ROUND_OLE, true);
 	}
 	
 	@Test
-	public void testSparseRoundRandDataCompression() {
-		runGetValueTest(SparsityType.SPARSE, ValueType.RAND_ROUND, true);
+	public void testSparseRoundRandDataOLECompression() {
+		runGetValueTest(SparsityType.SPARSE, ValueType.RAND_ROUND_OLE, true);
+	}
+	
+	@Test
+	public void testDenseRoundRandDataDDCCompression() {
+		runGetValueTest(SparsityType.DENSE, ValueType.RAND_ROUND_DDC, true);
+	}
+	
+	@Test
+	public void testSparseRoundRandDataDDCCompression() {
+		runGetValueTest(SparsityType.SPARSE, ValueType.RAND_ROUND_DDC, true);
 	}
 	
 	@Test
@@ -107,13 +118,13 @@ public class BasicGetValueTest extends AutomatedTestBase
 	}
 	
 	@Test
-	public void testDenseRoundRandDataNoCompression() {
-		runGetValueTest(SparsityType.DENSE, ValueType.RAND_ROUND, false);
+	public void testDenseRoundRandDataOLENoCompression() {
+		runGetValueTest(SparsityType.DENSE, ValueType.RAND_ROUND_OLE, false);
 	}
 	
 	@Test
-	public void testSparseRoundRandDataNoCompression() {
-		runGetValueTest(SparsityType.SPARSE, ValueType.RAND_ROUND, false);
+	public void testSparseRoundRandDataOLENoCompression() {
+		runGetValueTest(SparsityType.SPARSE, ValueType.RAND_ROUND_OLE, false);
 	}
 	
 	@Test
@@ -146,8 +157,10 @@ public class BasicGetValueTest extends AutomatedTestBase
 			//generate input data
 			double min = (vtype==ValueType.CONST)? 10 : -10;
 			double[][] input = TestUtils.generateTestMatrix(rows, cols, min, 10, sparsity, 7);
-			if( vtype==ValueType.RAND_ROUND )
+			if( vtype==ValueType.RAND_ROUND_OLE || vtype==ValueType.RAND_ROUND_DDC ) {
+				CompressedMatrixBlock.ALLOW_DDC_ENCODING = (vtype==ValueType.RAND_ROUND_DDC);
 				input = TestUtils.round(input);
+			}
 			MatrixBlock mb = DataConverter.convertToMatrixBlock(input);
 			
 			//compress given matrix block
@@ -166,5 +179,8 @@ public class BasicGetValueTest extends AutomatedTestBase
 		catch(Exception ex) {
 			throw new RuntimeException(ex);
 		}
+		finally {
+			CompressedMatrixBlock.ALLOW_DDC_ENCODING = true;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicMatrixAppendTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicMatrixAppendTest.java b/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicMatrixAppendTest.java
index 93324b3..3bd6f0c 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicMatrixAppendTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicMatrixAppendTest.java
@@ -45,9 +45,10 @@ public class BasicMatrixAppendTest extends AutomatedTestBase
 	}
 	
 	public enum ValueType {
-		RAND,
-		RAND_ROUND,
-		CONST,
+		RAND, //UC
+		CONST, //RLE
+		RAND_ROUND_OLE, //OLE
+		RAND_ROUND_DDC, //DDC
 	}
 	
 	@Override
@@ -71,13 +72,23 @@ public class BasicMatrixAppendTest extends AutomatedTestBase
 	}
 	
 	@Test
-	public void testDenseRoundRandDataCompression() {
-		runMatrixAppendTest(SparsityType.DENSE, ValueType.RAND_ROUND, true);
+	public void testDenseRoundRandDataOLECompression() {
+		runMatrixAppendTest(SparsityType.DENSE, ValueType.RAND_ROUND_OLE, true);
 	}
 	
 	@Test
-	public void testSparseRoundRandDataCompression() {
-		runMatrixAppendTest(SparsityType.SPARSE, ValueType.RAND_ROUND, true);
+	public void testSparseRoundRandDataOLECompression() {
+		runMatrixAppendTest(SparsityType.SPARSE, ValueType.RAND_ROUND_OLE, true);
+	}
+	
+	@Test
+	public void testDenseRoundRandDataDDCCompression() {
+		runMatrixAppendTest(SparsityType.DENSE, ValueType.RAND_ROUND_DDC, true);
+	}
+	
+	@Test
+	public void testSparseRoundRandDataDDCCompression() {
+		runMatrixAppendTest(SparsityType.SPARSE, ValueType.RAND_ROUND_DDC, true);
 	}
 	
 	@Test
@@ -106,13 +117,13 @@ public class BasicMatrixAppendTest extends AutomatedTestBase
 	}
 	
 	@Test
-	public void testDenseRoundRandDataNoCompression() {
-		runMatrixAppendTest(SparsityType.DENSE, ValueType.RAND_ROUND, false);
+	public void testDenseRoundRandDataOLENoCompression() {
+		runMatrixAppendTest(SparsityType.DENSE, ValueType.RAND_ROUND_OLE, false);
 	}
 	
 	@Test
-	public void testSparseRoundRandDataNoCompression() {
-		runMatrixAppendTest(SparsityType.SPARSE, ValueType.RAND_ROUND, false);
+	public void testSparseRoundRandDataOLENoCompression() {
+		runMatrixAppendTest(SparsityType.SPARSE, ValueType.RAND_ROUND_OLE, false);
 	}
 	
 	@Test
@@ -145,8 +156,10 @@ public class BasicMatrixAppendTest extends AutomatedTestBase
 			//generate input data
 			double min = (vtype==ValueType.CONST)? 10 : -10;
 			double[][] input = TestUtils.generateTestMatrix(rows, cols1, min, 10, sparsity, 7);
-			if( vtype==ValueType.RAND_ROUND )
+			if( vtype==ValueType.RAND_ROUND_OLE || vtype==ValueType.RAND_ROUND_DDC ) {
+				CompressedMatrixBlock.ALLOW_DDC_ENCODING = (vtype==ValueType.RAND_ROUND_DDC);
 				input = TestUtils.round(input);
+			}
 			MatrixBlock mb = DataConverter.convertToMatrixBlock(input);
 			MatrixBlock vector = DataConverter.convertToMatrixBlock(
 					TestUtils.generateTestMatrix(rows, cols2, 1, 1, 1.0, 3));
@@ -172,5 +185,8 @@ public class BasicMatrixAppendTest extends AutomatedTestBase
 		catch(Exception ex) {
 			throw new RuntimeException(ex);
 		}
+		finally {
+			CompressedMatrixBlock.ALLOW_DDC_ENCODING = true;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicMatrixMultChainTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicMatrixMultChainTest.java b/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicMatrixMultChainTest.java
index 8f17f91..fe46107 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicMatrixMultChainTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicMatrixMultChainTest.java
@@ -45,9 +45,10 @@ public class BasicMatrixMultChainTest extends AutomatedTestBase
 	}
 	
 	public enum ValueType {
-		RAND,
-		RAND_ROUND,
-		CONST,
+		RAND, //UC
+		CONST, //RLE
+		RAND_ROUND_OLE, //OLE
+		RAND_ROUND_DDC, //DDC
 	}
 	
 	@Override
@@ -71,13 +72,23 @@ public class BasicMatrixMultChainTest extends AutomatedTestBase
 	}
 	
 	@Test
-	public void testDenseRoundRandDataNoWeightsCompression() {
-		runMatrixMultChainTest(SparsityType.DENSE, ValueType.RAND_ROUND, ChainType.XtXv, true);
+	public void testDenseRoundRandDataOLENoWeightsCompression() {
+		runMatrixMultChainTest(SparsityType.DENSE, ValueType.RAND_ROUND_OLE, ChainType.XtXv, true);
 	}
 	
 	@Test
-	public void testSparseRoundRandDataNoWeightsCompression() {
-		runMatrixMultChainTest(SparsityType.SPARSE, ValueType.RAND_ROUND, ChainType.XtXv, true);
+	public void testSparseRoundRandDataOLENoWeightsCompression() {
+		runMatrixMultChainTest(SparsityType.SPARSE, ValueType.RAND_ROUND_OLE, ChainType.XtXv, true);
+	}
+	
+	@Test
+	public void testDenseRoundRandDataDDCNoWeightsCompression() {
+		runMatrixMultChainTest(SparsityType.DENSE, ValueType.RAND_ROUND_DDC, ChainType.XtXv, true);
+	}
+	
+	@Test
+	public void testSparseRoundRandDataDDCNoWeightsCompression() {
+		runMatrixMultChainTest(SparsityType.SPARSE, ValueType.RAND_ROUND_DDC, ChainType.XtXv, true);
 	}
 	
 	@Test
@@ -106,13 +117,23 @@ public class BasicMatrixMultChainTest extends AutomatedTestBase
 	}
 	
 	@Test
-	public void testDenseRoundRandDataNoWeightsNoCompression() {
-		runMatrixMultChainTest(SparsityType.DENSE, ValueType.RAND_ROUND, ChainType.XtXv, false);
+	public void testDenseRoundRandDataOLENoWeightsNoCompression() {
+		runMatrixMultChainTest(SparsityType.DENSE, ValueType.RAND_ROUND_OLE, ChainType.XtXv, false);
+	}
+	
+	@Test
+	public void testSparseRoundRandDataOLENoWeightsNoCompression() {
+		runMatrixMultChainTest(SparsityType.SPARSE, ValueType.RAND_ROUND_OLE, ChainType.XtXv, false);
+	}
+	
+	@Test
+	public void testDenseRoundRandDataDDCNoWeightsNoCompression() {
+		runMatrixMultChainTest(SparsityType.DENSE, ValueType.RAND_ROUND_DDC, ChainType.XtXv, false);
 	}
 	
 	@Test
-	public void testSparseRoundRandDataNoWeightsNoCompression() {
-		runMatrixMultChainTest(SparsityType.SPARSE, ValueType.RAND_ROUND, ChainType.XtXv, false);
+	public void testSparseRoundRandDataDDCNoWeightsNoCompression() {
+		runMatrixMultChainTest(SparsityType.SPARSE, ValueType.RAND_ROUND_DDC, ChainType.XtXv, false);
 	}
 	
 	@Test
@@ -141,13 +162,23 @@ public class BasicMatrixMultChainTest extends AutomatedTestBase
 	}
 	
 	@Test
-	public void testDenseRoundRandDataWeightsCompression() {
-		runMatrixMultChainTest(SparsityType.DENSE, ValueType.RAND_ROUND, ChainType.XtwXv, true);
+	public void testDenseRoundRandDataOLEWeightsCompression() {
+		runMatrixMultChainTest(SparsityType.DENSE, ValueType.RAND_ROUND_OLE, ChainType.XtwXv, true);
+	}
+	
+	@Test
+	public void testSparseRoundRandDataOLEWeightsCompression() {
+		runMatrixMultChainTest(SparsityType.SPARSE, ValueType.RAND_ROUND_OLE, ChainType.XtwXv, true);
 	}
 	
 	@Test
-	public void testSparseRoundRandDataWeightsCompression() {
-		runMatrixMultChainTest(SparsityType.SPARSE, ValueType.RAND_ROUND, ChainType.XtwXv, true);
+	public void testDenseRoundRandDataDDCWeightsCompression() {
+		runMatrixMultChainTest(SparsityType.DENSE, ValueType.RAND_ROUND_DDC, ChainType.XtwXv, true);
+	}
+	
+	@Test
+	public void testSparseRoundRandDataDDCWeightsCompression() {
+		runMatrixMultChainTest(SparsityType.SPARSE, ValueType.RAND_ROUND_DDC, ChainType.XtwXv, true);
 	}
 	
 	@Test
@@ -176,13 +207,13 @@ public class BasicMatrixMultChainTest extends AutomatedTestBase
 	}
 	
 	@Test
-	public void testDenseRoundRandDataWeightsNoCompression() {
-		runMatrixMultChainTest(SparsityType.DENSE, ValueType.RAND_ROUND, ChainType.XtwXv, false);
+	public void testDenseRoundRandDataOLEWeightsNoCompression() {
+		runMatrixMultChainTest(SparsityType.DENSE, ValueType.RAND_ROUND_OLE, ChainType.XtwXv, false);
 	}
 	
 	@Test
-	public void testSparseRoundRandDataWeightsNoCompression() {
-		runMatrixMultChainTest(SparsityType.SPARSE, ValueType.RAND_ROUND, ChainType.XtwXv, false);
+	public void testSparseRoundRandDataOLEWeightsNoCompression() {
+		runMatrixMultChainTest(SparsityType.SPARSE, ValueType.RAND_ROUND_OLE, ChainType.XtwXv, false);
 	}
 	
 	@Test
@@ -214,8 +245,10 @@ public class BasicMatrixMultChainTest extends AutomatedTestBase
 			//generate input data
 			double min = (vtype==ValueType.CONST)? 10 : -10;
 			double[][] input = TestUtils.generateTestMatrix(rows, cols, min, 10, sparsity, 7);
-			if( vtype==ValueType.RAND_ROUND )
+			if( vtype==ValueType.RAND_ROUND_OLE || vtype==ValueType.RAND_ROUND_DDC ) {
+				CompressedMatrixBlock.ALLOW_DDC_ENCODING = (vtype==ValueType.RAND_ROUND_DDC);
 				input = TestUtils.round(input);
+			}
 			MatrixBlock mb = DataConverter.convertToMatrixBlock(input);
 			MatrixBlock vector1 = DataConverter.convertToMatrixBlock(
 					TestUtils.generateTestMatrix(cols, 1, 0, 1, 1.0, 3));
@@ -241,5 +274,8 @@ public class BasicMatrixMultChainTest extends AutomatedTestBase
 		catch(Exception ex) {
 			throw new RuntimeException(ex);
 		}
+		finally {
+			CompressedMatrixBlock.ALLOW_DDC_ENCODING = true;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicMatrixTransposeSelfMultTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicMatrixTransposeSelfMultTest.java b/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicMatrixTransposeSelfMultTest.java
index ff2a103..c00f25f 100644
--- a/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicMatrixTransposeSelfMultTest.java
+++ b/src/test/java/org/apache/sysml/test/integration/functions/compress/BasicMatrixTransposeSelfMultTest.java
@@ -45,9 +45,10 @@ public class BasicMatrixTransposeSelfMultTest extends AutomatedTestBase
 	}
 	
 	public enum ValueType {
-		RAND,
-		RAND_ROUND,
-		CONST,
+		RAND, //UC
+		CONST, //RLE
+		RAND_ROUND_OLE, //OLE
+		RAND_ROUND_DDC, //DDC
 	}
 	
 	@Override
@@ -71,13 +72,23 @@ public class BasicMatrixTransposeSelfMultTest extends AutomatedTestBase
 	}
 	
 	@Test
-	public void testDenseRoundRandDataCompression() {
-		runTransposeSelfMatrixMultTest(SparsityType.DENSE, ValueType.RAND_ROUND, true);
+	public void testDenseRoundRandDataOLECompression() {
+		runTransposeSelfMatrixMultTest(SparsityType.DENSE, ValueType.RAND_ROUND_OLE, true);
 	}
 	
 	@Test
-	public void testSparseRoundRandDataCompression() {
-		runTransposeSelfMatrixMultTest(SparsityType.SPARSE, ValueType.RAND_ROUND, true);
+	public void testSparseRoundRandDataOLECompression() {
+		runTransposeSelfMatrixMultTest(SparsityType.SPARSE, ValueType.RAND_ROUND_OLE, true);
+	}
+	
+	@Test
+	public void testDenseRoundRandDataDDCCompression() {
+		runTransposeSelfMatrixMultTest(SparsityType.DENSE, ValueType.RAND_ROUND_DDC, true);
+	}
+	
+	@Test
+	public void testSparseRoundRandDataDDCCompression() {
+		runTransposeSelfMatrixMultTest(SparsityType.SPARSE, ValueType.RAND_ROUND_DDC, true);
 	}
 	
 	@Test
@@ -106,13 +117,13 @@ public class BasicMatrixTransposeSelfMultTest extends AutomatedTestBase
 	}
 	
 	@Test
-	public void testDenseRoundRandDataNoCompression() {
-		runTransposeSelfMatrixMultTest(SparsityType.DENSE, ValueType.RAND_ROUND, false);
+	public void testDenseRoundRandDataOLENoCompression() {
+		runTransposeSelfMatrixMultTest(SparsityType.DENSE, ValueType.RAND_ROUND_OLE, false);
 	}
 	
 	@Test
-	public void testSparseRoundRandDataNoCompression() {
-		runTransposeSelfMatrixMultTest(SparsityType.SPARSE, ValueType.RAND_ROUND, false);
+	public void testSparseRoundRandDataOLENoCompression() {
+		runTransposeSelfMatrixMultTest(SparsityType.SPARSE, ValueType.RAND_ROUND_OLE, false);
 	}
 	
 	@Test
@@ -145,8 +156,10 @@ public class BasicMatrixTransposeSelfMultTest extends AutomatedTestBase
 			//generate input data
 			double min = (vtype==ValueType.CONST)? 10 : -10;
 			double[][] input = TestUtils.generateTestMatrix(rows, cols, min, 10, sparsity, 7);
-			if( vtype==ValueType.RAND_ROUND )
+			if( vtype==ValueType.RAND_ROUND_OLE || vtype==ValueType.RAND_ROUND_DDC ) {
+				CompressedMatrixBlock.ALLOW_DDC_ENCODING = (vtype==ValueType.RAND_ROUND_DDC);
 				input = TestUtils.round(input);
+			}
 			MatrixBlock mb = DataConverter.convertToMatrixBlock(input);
 			
 			//compress given matrix block
@@ -168,5 +181,8 @@ public class BasicMatrixTransposeSelfMultTest extends AutomatedTestBase
 		catch(Exception ex) {
 			throw new RuntimeException(ex);
 		}
+		finally {
+			CompressedMatrixBlock.ALLOW_DDC_ENCODING = true;
+		}
 	}
 }