You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by ba...@apache.org on 2022/09/16 17:35:46 UTC

[systemds] branch main updated (652b4077be -> a3ea6c13e8)

This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


    from 652b4077be [SYSTEMDS-3429] Federated Multithreaded transformencode
     new 06b6712e2b [SYSTEMDS-3437] CLA Invalid Unique estimate DDC
     new a3ea6c13e8 [SYSTEMDS-2699] CLA IO Compressed Matrices

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../sysds/runtime/compress/colgroup/AColGroup.java |  27 +-
 .../compress/colgroup/AColGroupCompressed.java     |  24 +-
 .../runtime/compress/colgroup/AColGroupOffset.java |  34 +--
 .../runtime/compress/colgroup/AColGroupValue.java  |   4 -
 .../compress/colgroup/ADictBasedColGroup.java      |  39 +--
 .../compress/colgroup/AMorphingMMColGroup.java     |   7 -
 .../sysds/runtime/compress/colgroup/APreAgg.java   |   8 -
 .../sysds/runtime/compress/colgroup/ASDC.java      |  19 +-
 .../sysds/runtime/compress/colgroup/ASDCZero.java  |   5 -
 .../runtime/compress/colgroup/ColGroupConst.java   |  15 +-
 .../runtime/compress/colgroup/ColGroupDDC.java     |  19 +-
 .../runtime/compress/colgroup/ColGroupDDCFOR.java  |  23 +-
 .../compress/colgroup/ColGroupDeltaDDC.java        | 101 +++----
 .../runtime/compress/colgroup/ColGroupEmpty.java   |  12 +-
 .../runtime/compress/colgroup/ColGroupFactory.java | 218 ++++++--------
 .../runtime/compress/colgroup/ColGroupIO.java      |  57 ++--
 .../colgroup/ColGroupLinearFunctional.java         |  20 +-
 .../runtime/compress/colgroup/ColGroupOLE.java     |  21 +-
 .../runtime/compress/colgroup/ColGroupRLE.java     |  22 +-
 .../runtime/compress/colgroup/ColGroupSDC.java     |  30 +-
 .../runtime/compress/colgroup/ColGroupSDCFOR.java  |  27 +-
 .../compress/colgroup/ColGroupSDCSingle.java       |  23 +-
 .../compress/colgroup/ColGroupSDCSingleZeros.java  |  21 +-
 .../compress/colgroup/ColGroupSDCZeros.java        |  21 +-
 .../compress/colgroup/ColGroupUncompressed.java    |  23 +-
 .../runtime/compress/colgroup/offset/AOffset.java  |   4 +
 .../runtime/compress/estim/EstimationFactors.java  |   6 +-
 .../component/compress/CompressedTestBase.java     |   2 +-
 .../component/compress/colgroup/ColGroupBase.java  |  31 ++
 .../compress/colgroup/ColGroupFactoryTest.java     | 314 +++++++++++++++++++++
 .../ColGroupMorphingPerformanceCompare.java        |   5 -
 .../component/compress/colgroup/ColGroupTest.java  | 105 ++++++-
 ...genOffsetOLETest.java => GenOffsetOLETest.java} |   2 +-
 33 files changed, 784 insertions(+), 505 deletions(-)
 create mode 100644 src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupFactoryTest.java
 rename src/test/java/org/apache/sysds/test/component/compress/colgroup/{genOffsetOLETest.java => GenOffsetOLETest.java} (98%)


[systemds] 02/02: [SYSTEMDS-2699] CLA IO Compressed Matrices

Posted by ba...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit a3ea6c13e89a26e319038294e585cef27365606c
Author: baunsgaard <ba...@tugraz.at>
AuthorDate: Fri Sep 16 13:13:35 2022 +0200

    [SYSTEMDS-2699] CLA IO Compressed Matrices
    
    This commit cleans up the IO interface of the column groups by removing
    the option of creating column groups through the serialization
    constructors, which made it possible to construct column groups
    incorrectly. With this change, exploiting the non-empty structure of
    all column groups comes closer, and the IO and serialization of column
    groups are brought one step closer together.
---
 .../sysds/runtime/compress/colgroup/AColGroup.java |  27 ++----
 .../compress/colgroup/AColGroupCompressed.java     |   4 -
 .../runtime/compress/colgroup/AColGroupOffset.java |  34 +++----
 .../runtime/compress/colgroup/AColGroupValue.java  |   4 -
 .../compress/colgroup/ADictBasedColGroup.java      |  12 ---
 .../compress/colgroup/AMorphingMMColGroup.java     |   7 --
 .../sysds/runtime/compress/colgroup/APreAgg.java   |   8 --
 .../sysds/runtime/compress/colgroup/ASDC.java      |  19 +---
 .../sysds/runtime/compress/colgroup/ASDCZero.java  |   5 -
 .../runtime/compress/colgroup/ColGroupConst.java   |  15 ++-
 .../runtime/compress/colgroup/ColGroupDDC.java     |  15 ++-
 .../runtime/compress/colgroup/ColGroupDDCFOR.java  |  23 ++---
 .../compress/colgroup/ColGroupDeltaDDC.java        | 101 ++++++++++-----------
 .../runtime/compress/colgroup/ColGroupEmpty.java   |  12 ++-
 .../runtime/compress/colgroup/ColGroupIO.java      |  57 ++++++------
 .../colgroup/ColGroupLinearFunctional.java         |  20 ++--
 .../runtime/compress/colgroup/ColGroupOLE.java     |  21 +++--
 .../runtime/compress/colgroup/ColGroupRLE.java     |  20 ++--
 .../runtime/compress/colgroup/ColGroupSDC.java     |  30 ++----
 .../runtime/compress/colgroup/ColGroupSDCFOR.java  |  27 ++----
 .../compress/colgroup/ColGroupSDCSingle.java       |  23 ++---
 .../compress/colgroup/ColGroupSDCSingleZeros.java  |  21 ++---
 .../compress/colgroup/ColGroupSDCZeros.java        |  21 ++---
 .../compress/colgroup/ColGroupUncompressed.java    |  15 +--
 .../ColGroupMorphingPerformanceCompare.java        |   5 -
 25 files changed, 208 insertions(+), 338 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java
index b9ba6a5df4..ffee7563b7 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroup.java
@@ -68,10 +68,6 @@ public abstract class AColGroup implements Serializable {
 	/** The ColGroup Indexes contained in the ColGroup */
 	protected int[] _colIndexes;
 
-	/** Empty constructor, used for serializing into an empty new object of ColGroup. */
-	protected AColGroup() {
-		// empty
-	}
 
 	/**
 	 * Main constructor.
@@ -174,18 +170,12 @@ public abstract class AColGroup implements Serializable {
 			out.writeInt(_colIndexes[i]);
 	}
 
-	/**
-	 * Deserialize column group from data input.
-	 * 
-	 * @param in data input
-	 * @throws IOException if IOException occurs
-	 */
-	protected void readFields(DataInput in) throws IOException {
-		// column group type is read in ColGroupIO
+	protected static int[] readCols(DataInput in) throws IOException {
 		final int numCols = in.readInt();
-		_colIndexes = new int[numCols];
+		int[] cols = new int[numCols];
 		for(int i = 0; i < numCols; i++)
-			_colIndexes[i] = in.readInt();
+			cols[i] = in.readInt();
+		return cols;
 	}
 
 	/**
@@ -358,7 +348,7 @@ public abstract class AColGroup implements Serializable {
 	 * @param right The MatrixBlock on the right of this matrix multiplication
 	 * @return The new Column Group or null that is the result of the matrix multiplication.
 	 */
-	public final AColGroup rightMultByMatrix(MatrixBlock right){
+	public final AColGroup rightMultByMatrix(MatrixBlock right) {
 		return rightMultByMatrix(right, null);
 	}
 
@@ -367,8 +357,9 @@ public abstract class AColGroup implements Serializable {
 	 * 
 	 * This method can return null, meaning that the output overlapping group would have been empty.
 	 * 
-	 * @param right The MatrixBlock on the right of this matrix multiplication
-	 * @param allCols A pre-materialized list of all col indexes, that can be shared across all column groups if use full, can be set to null.
+	 * @param right   The MatrixBlock on the right of this matrix multiplication
+	 * @param allCols A pre-materialized list of all col indexes that can be shared across all column groups if
+	 *                useful; can be set to null.
 	 * @return The new Column Group or null that is the result of the matrix multiplication.
 	 */
 	public abstract AColGroup rightMultByMatrix(MatrixBlock right, int[] allCols);
@@ -406,7 +397,7 @@ public abstract class AColGroup implements Serializable {
 	 * @param lhs    The left hand side Column group to multiply with, the left hand side should be considered
 	 *               transposed. Also it should be guaranteed that this column group is not empty.
 	 * @param result The result matrix to insert the result of the multiplication into
-	 * @param nRows   Number of rows in the lhs colGroup
+	 * @param nRows  Number of rows in the lhs colGroup
 	 */
 	public abstract void leftMultByAColGroup(AColGroup lhs, MatrixBlock result, int nRows);
 
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroupCompressed.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroupCompressed.java
index 1af529c08a..75319e08eb 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroupCompressed.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroupCompressed.java
@@ -45,10 +45,6 @@ public abstract class AColGroupCompressed extends AColGroup {
 
 	private static final long serialVersionUID = 6219835795420081223L;
 
-	protected AColGroupCompressed() {
-		super();
-	}
-
 	protected AColGroupCompressed(int[] colIndices) {
 		super(colIndices);
 	}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroupOffset.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroupOffset.java
index 2e263df164..516caecab6 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroupOffset.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroupOffset.java
@@ -45,16 +45,6 @@ public abstract class AColGroupOffset extends APreAgg {
 
 	protected boolean _zeros;
 
-	/**
-	 * Constructor for serialization
-	 * 
-	 * @param numRows Number of rows contained
-	 */
-	protected AColGroupOffset(int numRows) {
-		super();
-		_numRows = numRows;
-	}
-
 	protected AColGroupOffset(int[] colIndices, int numRows, boolean zeros, ADictionary dict, int[] cachedCounts) {
 		super(colIndices, dict, cachedCounts);
 		_numRows = numRows;
@@ -104,20 +94,18 @@ public abstract class AColGroupOffset extends APreAgg {
 		return ret;
 	}
 
-	@Override
-	public void readFields(DataInput in) throws IOException {
-		super.readFields(in);
-
-		// read bitmaps
-		_ptr = new int[in.readInt()];
-		for(int i = 0; i < _ptr.length; i++)
-			_ptr[i] = in.readInt();
-
-		_data = new char[in.readInt()];
-		for(int i = 0; i < _data.length; i++)
-			_data[i] = in.readChar();
+	public static int[] readPointers(DataInput in) throws IOException {
+		int[] ptr = new int[in.readInt()];
+		for(int i = 0; i < ptr.length; i++)
+			ptr[i] = in.readInt();
+		return ptr;
+	}
 
-		_zeros = in.readBoolean();
+	public static char[] readData(DataInput in) throws IOException {
+		char[] data = new char[in.readInt()];
+		for(int i = 0; i < data.length; i++)
+			data[i] = in.readChar();
+		return data;
 	}
 
 	@Override
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroupValue.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroupValue.java
index 615f0cc5e6..85b64783dc 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroupValue.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroupValue.java
@@ -37,10 +37,6 @@ public abstract class AColGroupValue extends ADictBasedColGroup implements Clone
 	/** The count of each distinct value contained in the dictionary */
 	private SoftReference<int[]> counts = null;
 
-	protected AColGroupValue() {
-		super();
-	}
-
 	/**
 	 * A abstract class for column groups that contain ADictionary for values.
 	 * 
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ADictBasedColGroup.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ADictBasedColGroup.java
index 467d6dac5d..8ce8b6731e 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ADictBasedColGroup.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ADictBasedColGroup.java
@@ -18,7 +18,6 @@
  */
 package org.apache.sysds.runtime.compress.colgroup;
 
-import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Arrays;
@@ -27,7 +26,6 @@ import java.util.Set;
 
 import org.apache.sysds.runtime.compress.colgroup.dictionary.ADictionary;
 import org.apache.sysds.runtime.compress.colgroup.dictionary.Dictionary;
-import org.apache.sysds.runtime.compress.colgroup.dictionary.DictionaryFactory;
 import org.apache.sysds.runtime.compress.colgroup.dictionary.MatrixBlockDictionary;
 import org.apache.sysds.runtime.compress.utils.Util;
 import org.apache.sysds.runtime.data.DenseBlock;
@@ -39,10 +37,6 @@ public abstract class ADictBasedColGroup extends AColGroupCompressed {
 	/** Distinct value tuples associated with individual bitmaps. */
 	protected ADictionary _dict;
 
-	protected ADictBasedColGroup() {
-		super();
-	}
-
 	/**
 	 * A Abstract class for column groups that contain ADictionary for values.
 	 * 
@@ -143,12 +137,6 @@ public abstract class ADictBasedColGroup extends AColGroupCompressed {
 	protected abstract void decompressToSparseBlockDenseDictionary(SparseBlock ret, int rl, int ru, int offR, int offC,
 		double[] values);
 
-	@Override
-	public void readFields(DataInput in) throws IOException {
-		super.readFields(in);
-		_dict = DictionaryFactory.read(in);
-	}
-
 	@Override
 	public void write(DataOutput out) throws IOException {
 		super.write(out);
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/AMorphingMMColGroup.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/AMorphingMMColGroup.java
index ff609fefce..6e4acd0466 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/AMorphingMMColGroup.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/AMorphingMMColGroup.java
@@ -35,13 +35,6 @@ import org.apache.sysds.runtime.matrix.data.MatrixBlock;
 public abstract class AMorphingMMColGroup extends AColGroupValue {
 	private static final long serialVersionUID = -4265713396790607199L;
 
-	/**
-	 * Constructor for serialization
-	 */
-	protected AMorphingMMColGroup() {
-		super();
-	}
-
 	/**
 	 * A Abstract class for column groups that contain ADictionary for values.
 	 * 
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/APreAgg.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/APreAgg.java
index 41f0963e1e..ffe58023f8 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/APreAgg.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/APreAgg.java
@@ -39,14 +39,6 @@ public abstract class APreAgg extends AColGroupValue {
 
 	private static boolean loggedWarningForDirect = false;
 
-	/**
-	 * Constructor for serialization
-	 * 
-	 */
-	protected APreAgg() {
-		super();
-	}
-
 	/**
 	 * A Abstract class for column groups that contain ADictionary for values.
 	 * 
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ASDC.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ASDC.java
index e598553b3e..91c4eb6f92 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ASDC.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ASDC.java
@@ -34,34 +34,23 @@ public abstract class ASDC extends AMorphingMMColGroup {
 
 	/** Sparse row indexes for the data */
 	protected AOffset _indexes;
-	
-	final protected int _numRows;
 
-	/**
-	 * Constructor for serialization
-	 * 
-	 * @param numRows Number of rows contained
-	 */
-	protected ASDC(int numRows) {
-		super();
-		_numRows = numRows;
-	}
+	final protected int _numRows;
 
-	protected ASDC(int[] colIndices, int numRows, ADictionary dict,  AOffset offsets,
-		int[] cachedCounts) {
+	protected ASDC(int[] colIndices, int numRows, ADictionary dict, AOffset offsets, int[] cachedCounts) {
 		super(colIndices, dict, cachedCounts);
 
 		_indexes = offsets;
 		_numRows = numRows;
 	}
 
-	public int getNumRows(){
+	public int getNumRows() {
 		return _numRows;
 	}
 
 	public abstract double[] getDefaultTuple();
 
-	public AOffset getOffsets(){
+	public AOffset getOffsets() {
 		return _indexes;
 	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ASDCZero.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ASDCZero.java
index a0a1fd58e1..59786ddfcb 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ASDCZero.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ASDCZero.java
@@ -34,11 +34,6 @@ public abstract class ASDCZero extends APreAgg {
 	protected AOffset _indexes;
 	final protected int _numRows;
 
-	protected ASDCZero(int numRows) {
-		super();
-		_numRows = numRows;
-	}
-
 	protected ASDCZero(int[] colIndices, int numRows, ADictionary dict, AOffset offsets, int[] cachedCounts) {
 		super(colIndices, dict, cachedCounts);
 		_indexes = offsets;
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupConst.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupConst.java
index 57db52f158..c87b8ed4fe 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupConst.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupConst.java
@@ -19,9 +19,13 @@
 
 package org.apache.sysds.runtime.compress.colgroup;
 
+import java.io.DataInput;
+import java.io.IOException;
+
 import org.apache.sysds.runtime.compress.DMLCompressionException;
 import org.apache.sysds.runtime.compress.colgroup.dictionary.ADictionary;
 import org.apache.sysds.runtime.compress.colgroup.dictionary.Dictionary;
+import org.apache.sysds.runtime.compress.colgroup.dictionary.DictionaryFactory;
 import org.apache.sysds.runtime.compress.colgroup.dictionary.MatrixBlockDictionary;
 import org.apache.sysds.runtime.compress.cost.ComputationCostEstimator;
 import org.apache.sysds.runtime.compress.lib.CLALibLeftMultBy;
@@ -40,11 +44,6 @@ public class ColGroupConst extends ADictBasedColGroup {
 
 	private static final long serialVersionUID = -7387793538322386611L;
 
-	/** Constructor for serialization */
-	protected ColGroupConst() {
-		super();
-	}
-
 	/**
 	 * Constructs an Constant Colum Group, that contains only one tuple, with the given value.
 	 * 
@@ -514,6 +513,12 @@ public class ColGroupConst extends ADictBasedColGroup {
 			return null;
 	}
 
+	public static ColGroupConst read(DataInput in) throws IOException {
+		int[] cols = readCols(in);
+		ADictionary dict = DictionaryFactory.read(in);
+		return new ColGroupConst(cols, dict);
+	}
+
 	@Override
 	public String toString() {
 		StringBuilder sb = new StringBuilder();
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java
index 1321bafc0e..10dcc58eef 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java
@@ -26,6 +26,7 @@ import java.io.IOException;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.sysds.runtime.compress.colgroup.dictionary.ADictionary;
 import org.apache.sysds.runtime.compress.colgroup.dictionary.Dictionary;
+import org.apache.sysds.runtime.compress.colgroup.dictionary.DictionaryFactory;
 import org.apache.sysds.runtime.compress.colgroup.dictionary.MatrixBlockDictionary;
 import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
 import org.apache.sysds.runtime.compress.colgroup.mapping.MapToFactory;
@@ -49,11 +50,6 @@ public class ColGroupDDC extends APreAgg {
 
 	protected AMapToData _data;
 
-	/** Constructor for serialization */
-	protected ColGroupDDC() {
-		super();
-	}
-
 	private ColGroupDDC(int[] colIndexes, ADictionary dict, AMapToData data, int[] cachedCounts) {
 		super(colIndexes, dict, cachedCounts);
 		_data = data;
@@ -427,10 +423,11 @@ public class ColGroupDDC extends APreAgg {
 		_data.write(out);
 	}
 
-	@Override
-	public void readFields(DataInput in) throws IOException {
-		super.readFields(in);
-		_data = MapToFactory.readIn(in);
+	public static ColGroupDDC read(DataInput in) throws IOException {
+		int[] cols = AColGroup.readCols(in);
+		ADictionary dict = DictionaryFactory.read(in);
+		AMapToData data = MapToFactory.readIn(in);
+		return new ColGroupDDC(cols, dict, data, null);
 	}
 
 	@Override
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDCFOR.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDCFOR.java
index 9def9b075d..3d89d7626a 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDCFOR.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDCFOR.java
@@ -28,6 +28,7 @@ import org.apache.commons.lang.NotImplementedException;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.compress.DMLCompressionException;
 import org.apache.sysds.runtime.compress.colgroup.dictionary.ADictionary;
+import org.apache.sysds.runtime.compress.colgroup.dictionary.DictionaryFactory;
 import org.apache.sysds.runtime.compress.colgroup.dictionary.MatrixBlockDictionary;
 import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
 import org.apache.sysds.runtime.compress.colgroup.mapping.MapToFactory;
@@ -57,11 +58,6 @@ public class ColGroupDDCFOR extends AMorphingMMColGroup {
 	/** Reference values in this column group */
 	protected double[] _reference;
 
-	/** Constructor for serialization */
-	protected ColGroupDDCFOR() {
-		super();
-	}
-
 	private ColGroupDDCFOR(int[] colIndexes, ADictionary dict, double[] reference, AMapToData data, int[] cachedCounts) {
 		super(colIndexes, dict, cachedCounts);
 		if(data.getUnique() != dict.getNumberOfValues(colIndexes.length))
@@ -91,10 +87,10 @@ public class ColGroupDDCFOR extends AMorphingMMColGroup {
 		// It is assumed whoever call this does not use an empty Dictionary in g.
 		final int nCol = g.getColIndices().length;
 		final MatrixBlockDictionary mbd = g._dict.getMBDict(nCol);
-		if(mbd != null){
+		if(mbd != null) {
 
 			final MatrixBlock mb = mbd.getMatrixBlock();
-			
+
 			final double[] ref = ColGroupUtils.extractMostCommonValueInColumns(mb);
 			if(ref != null) {
 				MatrixBlockDictionary mDict = mbd.binOpRight(new BinaryOperator(Minus.getMinusFnObject()), ref);
@@ -217,13 +213,12 @@ public class ColGroupDDCFOR extends AMorphingMMColGroup {
 			out.writeDouble(d);
 	}
 
-	@Override
-	public void readFields(DataInput in) throws IOException {
-		super.readFields(in);
-		_data = MapToFactory.readIn(in);
-		_reference = new double[_colIndexes.length];
-		for(int i = 0; i < _colIndexes.length; i++)
-			_reference[i] = in.readDouble();
+	public static ColGroupDDCFOR read(DataInput in) throws IOException {
+		int[] cols = AColGroup.readCols(in);
+		ADictionary dict = DictionaryFactory.read(in);
+		AMapToData data = MapToFactory.readIn(in);
+		double[] ref = ColGroupIO.readDoubleArray(cols.length, in);
+		return new ColGroupDDCFOR(cols, dict, ref, data, null);
 	}
 
 	@Override
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDeltaDDC.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDeltaDDC.java
index c26504ecfe..2ec72f7cca 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDeltaDDC.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDeltaDDC.java
@@ -19,69 +19,62 @@
 
 package org.apache.sysds.runtime.compress.colgroup;
 
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.sysds.runtime.compress.colgroup.dictionary.ADictionary;
-import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
-import org.apache.sysds.runtime.data.DenseBlock;
-import org.apache.sysds.runtime.data.SparseBlock;
-import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
-
 /**
  * Class to encapsulate information about a column group that is first delta encoded then encoded with dense dictionary
  * encoding (DeltaDDC).
  */
-public class ColGroupDeltaDDC extends ColGroupDDC {
+// public class ColGroupDeltaDDC extends ColGroupDDC {
 
-	private static final long serialVersionUID = -1045556313148564147L;
+// 	private static final long serialVersionUID = -1045556313148564147L;
 
-	/** Constructor for serialization */
-	protected ColGroupDeltaDDC() {
-	}
+// 	/** Constructor for serialization */
+// 	protected ColGroupDeltaDDC() {
+// 	}
 
-	private ColGroupDeltaDDC(int[] colIndexes, ADictionary dict, AMapToData data, int[] cachedCounts) {
-		super();
-		LOG.info("Carefully use of DeltaDDC since implementation is not finished.");
-		_colIndexes = colIndexes;
-		_dict = dict;
-		_data = data;
-	}
+// 	private ColGroupDeltaDDC(int[] colIndexes, ADictionary dict, AMapToData data, int[] cachedCounts) {
+// 		super();
+// 		LOG.info("Carefully use of DeltaDDC since implementation is not finished.");
+// 		_colIndexes = colIndexes;
+// 		_dict = dict;
+// 		_data = data;
+// 	}
 
-	public static AColGroup create(int[] colIndices, ADictionary dict, AMapToData data, int[] cachedCounts) {
-		if(dict == null)
-			throw new NotImplementedException("Not implemented constant delta group");
-		else
-			return new ColGroupDeltaDDC(colIndices, dict, data, cachedCounts);
-	}
+// 	public static AColGroup create(int[] colIndices, ADictionary dict, AMapToData data, int[] cachedCounts) {
+// 		if(dict == null)
+// 			throw new NotImplementedException("Not implemented constant delta group");
+// 		else
+// 			return new ColGroupDeltaDDC(colIndices, dict, data, cachedCounts);
+// 	}
 
-	public CompressionType getCompType() {
-		return CompressionType.DeltaDDC;
-	}
+// 	public CompressionType getCompType() {
+// 		return CompressionType.DeltaDDC;
+// 	}
 
-	@Override
-	protected void decompressToDenseBlockDenseDictionary(DenseBlock db, int rl, int ru, int offR, int offC,
-		double[] values) {
-		final int nCol = _colIndexes.length;
-		for(int i = rl, offT = rl + offR; i < ru; i++, offT++) {
-			final double[] c = db.values(offT);
-			final int off = db.pos(offT) + offC;
-			final int rowIndex = _data.getIndex(i) * nCol;
-			final int prevOff = (off == 0) ? off : off - nCol;
-			for(int j = 0; j < nCol; j++) {
-				// Here we use the values in the previous row to compute current values along with the delta
-				double newValue = c[prevOff + j] + values[rowIndex + j];
-				c[off + _colIndexes[j]] += newValue;
-			}
-		}
-	}
+// 	@Override
+// 	protected void decompressToDenseBlockDenseDictionary(DenseBlock db, int rl, int ru, int offR, int offC,
+// 		double[] values) {
+// 		final int nCol = _colIndexes.length;
+// 		for(int i = rl, offT = rl + offR; i < ru; i++, offT++) {
+// 			final double[] c = db.values(offT);
+// 			final int off = db.pos(offT) + offC;
+// 			final int rowIndex = _data.getIndex(i) * nCol;
+// 			final int prevOff = (off == 0) ? off : off - nCol;
+// 			for(int j = 0; j < nCol; j++) {
+// 				// Here we use the values in the previous row to compute current values along with the delta
+// 				double newValue = c[prevOff + j] + values[rowIndex + j];
+// 				c[off + _colIndexes[j]] += newValue;
+// 			}
+// 		}
+// 	}
 
-	@Override
-	protected void decompressToSparseBlockDenseDictionary(SparseBlock ret, int rl, int ru, int offR, int offC,
-		double[] values) {
-		throw new NotImplementedException();
-	}
+// 	@Override
+// 	protected void decompressToSparseBlockDenseDictionary(SparseBlock ret, int rl, int ru, int offR, int offC,
+// 		double[] values) {
+// 		throw new NotImplementedException();
+// 	}
 
-	@Override
-	public AColGroup scalarOperation(ScalarOperator op) {
-		return new ColGroupDeltaDDC(_colIndexes, _dict.applyScalarOp(op), _data, getCachedCounts());
-	}
-}
+// 	@Override
+// 	public AColGroup scalarOperation(ScalarOperator op) {
+// 		return new ColGroupDeltaDDC(_colIndexes, _dict.applyScalarOp(op), _data, getCachedCounts());
+// 	}
+// }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupEmpty.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupEmpty.java
index 2a427caea9..ce1c61dde4 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupEmpty.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupEmpty.java
@@ -19,6 +19,8 @@
 
 package org.apache.sysds.runtime.compress.colgroup;
 
+import java.io.DataInput;
+import java.io.IOException;
 import java.util.Arrays;
 
 import org.apache.sysds.runtime.DMLRuntimeException;
@@ -39,11 +41,6 @@ import org.apache.sysds.runtime.matrix.operators.UnaryOperator;
 public class ColGroupEmpty extends AColGroupCompressed {
 	private static final long serialVersionUID = -2307677253622099958L;
 
-	/** Constructor for serialization */
-	protected ColGroupEmpty() {
-		super();
-	}
-
 	/**
 	 * Constructs an Constant Colum Group, that contains only one tuple, with the given value.
 	 * 
@@ -304,4 +301,9 @@ public class ColGroupEmpty extends AColGroupCompressed {
 	public boolean isEmpty() {
 		return true;
 	}
+
+	public static ColGroupEmpty read(DataInput in) throws IOException {
+		int[] cols = readCols(in);
+		return new ColGroupEmpty(cols);
+	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupIO.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupIO.java
index a15e36d2ee..f5a1fb2d80 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupIO.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupIO.java
@@ -48,20 +48,13 @@ public interface ColGroupIO {
 
 		// Read in how many colGroups there are
 		final int nColGroups = in.readInt();
-		final boolean trace = LOG.isTraceEnabled();
 
 		// Allocate that amount into an ArrayList
 		final List<AColGroup> _colGroups = new ArrayList<>(nColGroups);
 
 		// Read each ColGroup one at a time.
-		for(int i = 0; i < nColGroups; i++) {
-			ColGroupType ctype = ColGroupType.values()[in.readByte()];
-			if(trace)
-				LOG.trace("Reading in : " + ctype);
-			final AColGroup grp = constructColGroup(ctype, nRows);
-			grp.readFields(in);
-			_colGroups.add(grp);
-		}
+		for(int i = 0; i < nColGroups; i++)
+			_colGroups.add(readColGroup(in, nRows));
 
 		return _colGroups;
 	}
@@ -93,36 +86,44 @@ public interface ColGroupIO {
 		return ret;
 	}
 
-	private static AColGroup constructColGroup(ColGroupType ctype, int nRows) {
+	public static AColGroup readColGroup(DataInput in, int nRows) throws IOException {
+		final ColGroupType ctype = ColGroupType.values()[in.readByte()];
 		switch(ctype) {
-			case UNCOMPRESSED:
-				return new ColGroupUncompressed();
+			case DDC:
+				return ColGroupDDC.read(in);
+			case DDCFOR:
+				return ColGroupDDCFOR.read(in);
 			case OLE:
-				return new ColGroupOLE(nRows);
+				return ColGroupOLE.read(in, nRows);
 			case RLE:
-				return new ColGroupRLE(nRows);
-			case DDC:
-				return new ColGroupDDC();
-			case DeltaDDC:
-				return new ColGroupDeltaDDC();
+				return ColGroupRLE.read(in, nRows);
 			case CONST:
-				return new ColGroupConst();
+				return ColGroupConst.read(in);
 			case EMPTY:
-				return new ColGroupEmpty();
+				return ColGroupEmpty.read(in);
+			case UNCOMPRESSED:
+				return ColGroupUncompressed.read(in);
 			case SDC:
-				return new ColGroupSDC(nRows);
+				return ColGroupSDC.read(in, nRows);
 			case SDCSingle:
-				return new ColGroupSDCSingle(nRows);
+				return ColGroupSDCSingle.read(in, nRows);
 			case SDCSingleZeros:
-				return new ColGroupSDCSingleZeros(nRows);
+				return ColGroupSDCSingleZeros.read(in, nRows);
 			case SDCZeros:
-				return new ColGroupSDCZeros(nRows);
+				return ColGroupSDCZeros.read(in, nRows);
 			case SDCFOR:
-				return new ColGroupSDCFOR(nRows);
-			case DDCFOR:
-				return new ColGroupDDCFOR();
+				return ColGroupSDCFOR.read(in, nRows);
+			case LinearFunctional:
+				return ColGroupLinearFunctional.read(in, nRows);
 			default:
-				throw new DMLRuntimeException("Unsupported ColGroup Type used:  " + ctype);
+				throw new DMLRuntimeException("Unsupported ColGroup Type used: " + ctype);
 		}
 	}
+
+	public static double[] readDoubleArray(int length, DataInput in) throws IOException {
+		double[] ret = new double[length];
+		for(int i = 0; i < length; i++)
+			ret[i] = in.readDouble();
+		return ret;
+	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupLinearFunctional.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupLinearFunctional.java
index 832207b94b..bffb7a24cd 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupLinearFunctional.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupLinearFunctional.java
@@ -57,11 +57,6 @@ public class ColGroupLinearFunctional extends AColGroupCompressed {
 
 	protected int _numRows;
 
-	/** Constructor for serialization */
-	protected ColGroupLinearFunctional() {
-		super();
-	}
-
 	/**
 	 * Constructs a Linear Functional Column Group that compresses its content using a linear functional.
 	 *
@@ -417,12 +412,10 @@ public class ColGroupLinearFunctional extends AColGroupCompressed {
 		throw new DMLCompressionException("This method should never be called");
 	}
 
-
 	@Override
 	public void leftMultByAColGroup(AColGroup lhs, MatrixBlock result, int nRows) {
-		if(lhs instanceof ColGroupEmpty) 
+		if(lhs instanceof ColGroupEmpty)
 			return;
-		
 
 		MatrixBlock tmpRet = new MatrixBlock(lhs.getNumCols(), _colIndexes.length, 0);
 
@@ -526,14 +519,17 @@ public class ColGroupLinearFunctional extends AColGroupCompressed {
 		throw new NotImplementedException();
 	}
 
-	@Override
-	public void readFields(DataInput in) throws IOException {
-		throw new NotImplementedException();
+	public static ColGroupLinearFunctional read(DataInput in, int nRows) throws IOException {
+		int[] cols = readCols(in);
+		double[] coefficients = ColGroupIO.readDoubleArray(2 * cols.length, in);
+		return new ColGroupLinearFunctional(cols, coefficients, nRows);
 	}
 
 	@Override
 	public void write(DataOutput out) throws IOException {
-		throw new NotImplementedException();
+		super.write(out);
+		for(double d : _coefficents)
+			out.writeDouble(d);
 	}
 
 	@Override
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOLE.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOLE.java
index 19ad6ddaf6..1e28e9805a 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOLE.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOLE.java
@@ -19,6 +19,8 @@
 
 package org.apache.sysds.runtime.compress.colgroup;
 
+import java.io.DataInput;
+import java.io.IOException;
 import java.util.Arrays;
 
 import org.apache.commons.lang.NotImplementedException;
@@ -43,15 +45,6 @@ import org.apache.sysds.runtime.matrix.operators.UnaryOperator;
 public class ColGroupOLE extends AColGroupOffset {
 	private static final long serialVersionUID = 5723227906925121066L;
 
-	/**
-	 * Constructor for serialization
-	 * 
-	 * @param numRows Number of rows contained
-	 */
-	protected ColGroupOLE(int numRows) {
-		super(numRows);
-	}
-
 	private ColGroupOLE(int[] colIndices, int numRows, boolean zero, ADictionary dict, char[] bitmaps, int[] bitmapOffs,
 		int[] counts) {
 		super(colIndices, numRows, zero, dict, counts);
@@ -644,4 +637,14 @@ public class ColGroupOLE extends AColGroupOffset {
 	public double getCost(ComputationCostEstimator e, int nRows) {
 		throw new NotImplementedException();
 	}
+
+	public static ColGroupOLE read(DataInput in, int nRows) throws IOException {
+		int[] cols = readCols(in);
+		ADictionary dict = DictionaryFactory.read(in);
+		int[] ptr = readPointers(in);
+		char[] data = readData(in);
+		boolean zeros = in.readBoolean();
+		return new ColGroupOLE(cols, nRows, zeros, dict, data, ptr, null);
+	}
+
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupRLE.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupRLE.java
index ee7cd280ae..995e88c84e 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupRLE.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupRLE.java
@@ -19,6 +19,8 @@
 
 package org.apache.sysds.runtime.compress.colgroup;
 
+import java.io.DataInput;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -43,15 +45,6 @@ import org.apache.sysds.runtime.matrix.operators.UnaryOperator;
 public class ColGroupRLE extends AColGroupOffset {
 	private static final long serialVersionUID = -1560710477952862791L;
 
-	/**
-	 * Constructor for serialization
-	 * 
-	 * @param numRows Number of rows contained
-	 */
-	protected ColGroupRLE(int numRows) {
-		super(numRows);
-	}
-
 	private ColGroupRLE(int[] colIndexes, int numRows, boolean zeros, ADictionary dict, char[] bitmaps, int[] bitmapOffs,
 		int[] cachedCounts) {
 		super(colIndexes, numRows, zeros, dict, cachedCounts);
@@ -976,6 +969,15 @@ public class ColGroupRLE extends AColGroupOffset {
 		return e.getCost(_numRows, _data.length, nCols, nVals, _dict.getSparsity());
 	}
 
+	public static ColGroupRLE read(DataInput in, int nRows) throws IOException {
+		int[] cols = readCols(in);
+		ADictionary dict = DictionaryFactory.read(in);
+		int[] ptr = readPointers(in);
+		char[] data = readData(in);
+		boolean zeros = in.readBoolean();
+		return new ColGroupRLE(cols, nRows, zeros, dict, data, ptr,null);
+	}
+
 	/**
 	 * Encodes the bitmap as a series of run lengths and offsets.
 	 * 
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDC.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDC.java
index eec893df05..b3c83cb9df 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDC.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDC.java
@@ -28,6 +28,7 @@ import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.compress.DMLCompressionException;
 import org.apache.sysds.runtime.compress.colgroup.dictionary.ADictionary;
 import org.apache.sysds.runtime.compress.colgroup.dictionary.Dictionary;
+import org.apache.sysds.runtime.compress.colgroup.dictionary.DictionaryFactory;
 import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
 import org.apache.sysds.runtime.compress.colgroup.mapping.MapToFactory;
 import org.apache.sysds.runtime.compress.colgroup.offset.AIterator;
@@ -57,15 +58,6 @@ public class ColGroupSDC extends ASDC {
 	/** The default value stored in this column group */
 	protected double[] _defaultTuple;
 
-	/**
-	 * Constructor for serialization
-	 * 
-	 * @param numRows Number of rows contained
-	 */
-	protected ColGroupSDC(int numRows) {
-		super(numRows);
-	}
-
 	protected ColGroupSDC(int[] colIndices, int numRows, ADictionary dict, double[] defaultTuple, AOffset offsets,
 		AMapToData data, int[] cachedCounts) {
 		super(colIndices, numRows, dict, offsets, cachedCounts);
@@ -81,7 +73,6 @@ public class ColGroupSDC extends ASDC {
 
 		_data = data;
 		_defaultTuple = defaultTuple;
-
 	}
 
 	public static AColGroup create(int[] colIndices, int numRows, ADictionary dict, double[] defaultTuple,
@@ -112,11 +103,11 @@ public class ColGroupSDC extends ASDC {
 	}
 
 	@Override
-	public  double[] getDefaultTuple(){
+	public double[] getDefaultTuple() {
 		return _defaultTuple;
 	}
 
-	public AMapToData getMapping(){
+	public AMapToData getMapping() {
 		return _data;
 	}
 
@@ -431,14 +422,13 @@ public class ColGroupSDC extends ASDC {
 			out.writeDouble(d);
 	}
 
-	@Override
-	public void readFields(DataInput in) throws IOException {
-		super.readFields(in);
-		_indexes = OffsetFactory.readIn(in);
-		_data = MapToFactory.readIn(in);
-		_defaultTuple = new double[_colIndexes.length];
-		for(int i = 0; i < _colIndexes.length; i++)
-			_defaultTuple[i] = in.readDouble();
+	public static ColGroupSDC read(DataInput in, int nRows) throws IOException {
+		int[] cols = readCols(in);
+		ADictionary dict = DictionaryFactory.read(in);
+		AOffset indexes = OffsetFactory.readIn(in);
+		AMapToData data = MapToFactory.readIn(in);
+		double[] defaultTuple = ColGroupIO.readDoubleArray(cols.length, in);
+		return new ColGroupSDC(cols, nRows, dict, defaultTuple, indexes, data, null);
 	}
 
 	@Override
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCFOR.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCFOR.java
index ab5005f3c6..9ea41cb6d7 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCFOR.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCFOR.java
@@ -26,6 +26,7 @@ import java.util.Arrays;
 
 import org.apache.sysds.runtime.compress.DMLCompressionException;
 import org.apache.sysds.runtime.compress.colgroup.dictionary.ADictionary;
+import org.apache.sysds.runtime.compress.colgroup.dictionary.DictionaryFactory;
 import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
 import org.apache.sysds.runtime.compress.colgroup.mapping.MapToFactory;
 import org.apache.sysds.runtime.compress.colgroup.offset.AIterator;
@@ -63,15 +64,6 @@ public class ColGroupSDCFOR extends ASDC {
 	/** Reference values in this column group */
 	protected double[] _reference;
 
-	/**
-	 * Constructor for serialization
-	 * 
-	 * @param numRows Number of rows contained
-	 */
-	protected ColGroupSDCFOR(int numRows) {
-		super(numRows);
-	}
-
 	private ColGroupSDCFOR(int[] colIndices, int numRows, ADictionary dict, AOffset indexes, AMapToData data,
 		int[] cachedCounts, double[] reference) {
 		super(colIndices, numRows, dict, indexes, cachedCounts);
@@ -113,7 +105,7 @@ public class ColGroupSDCFOR extends ASDC {
 	}
 
 	@Override
-	public  double[] getDefaultTuple(){
+	public double[] getDefaultTuple() {
 		return _reference;
 	}
 
@@ -211,14 +203,13 @@ public class ColGroupSDCFOR extends ASDC {
 			out.writeDouble(d);
 	}
 
-	@Override
-	public void readFields(DataInput in) throws IOException {
-		super.readFields(in);
-		_indexes = OffsetFactory.readIn(in);
-		_data = MapToFactory.readIn(in);
-		_reference = new double[_colIndexes.length];
-		for(int i = 0; i < _colIndexes.length; i++)
-			_reference[i] = in.readDouble();
+	public static ColGroupSDCFOR read(DataInput in, int nRows) throws IOException {
+		int[] cols = readCols(in);
+		ADictionary dict = DictionaryFactory.read(in);
+		AOffset indexes = OffsetFactory.readIn(in);
+		AMapToData data = MapToFactory.readIn(in);
+		double[] reference = ColGroupIO.readDoubleArray(cols.length, in);
+		return new ColGroupSDCFOR(cols, nRows, dict, indexes, data, null, reference);
 	}
 
 	@Override
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCSingle.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCSingle.java
index e83401a957..e1e8a58864 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCSingle.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCSingle.java
@@ -28,6 +28,7 @@ import org.apache.commons.lang.NotImplementedException;
 import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.compress.colgroup.dictionary.ADictionary;
 import org.apache.sysds.runtime.compress.colgroup.dictionary.Dictionary;
+import org.apache.sysds.runtime.compress.colgroup.dictionary.DictionaryFactory;
 import org.apache.sysds.runtime.compress.colgroup.offset.AIterator;
 import org.apache.sysds.runtime.compress.colgroup.offset.AOffset;
 import org.apache.sysds.runtime.compress.colgroup.offset.AOffsetIterator;
@@ -55,15 +56,6 @@ public class ColGroupSDCSingle extends ASDC {
 	/** The default value stored in this column group */
 	protected double[] _defaultTuple;
 
-	/**
-	 * Constructor for serialization
-	 * 
-	 * @param numRows Number of rows contained
-	 */
-	protected ColGroupSDCSingle(int numRows) {
-		super(numRows);
-	}
-
 	private ColGroupSDCSingle(int[] colIndices, int numRows, ADictionary dict, double[] defaultTuple, AOffset offsets,
 		int[] cachedCounts) {
 		super(colIndices, numRows, dict == null ? Dictionary.createNoCheck(new double[colIndices.length]) : dict, offsets,
@@ -434,13 +426,12 @@ public class ColGroupSDCSingle extends ASDC {
 			out.writeDouble(d);
 	}
 
-	@Override
-	public void readFields(DataInput in) throws IOException {
-		super.readFields(in);
-		_indexes = OffsetFactory.readIn(in);
-		_defaultTuple = new double[_colIndexes.length];
-		for(int i = 0; i < _colIndexes.length; i++)
-			_defaultTuple[i] = in.readDouble();
+	public static ColGroupSDCSingle read(DataInput in, int nRows) throws IOException {
+		int[] cols = readCols(in);
+		ADictionary dict = DictionaryFactory.read(in);
+		AOffset indexes = OffsetFactory.readIn(in);
+		double[] defaultTuple = ColGroupIO.readDoubleArray(cols.length, in);
+		return new ColGroupSDCSingle(cols, nRows, dict, defaultTuple, indexes, null);
 	}
 
 	@Override
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCSingleZeros.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCSingleZeros.java
index 139b433c0c..7b1c92cad8 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCSingleZeros.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCSingleZeros.java
@@ -28,6 +28,7 @@ import org.apache.commons.lang.NotImplementedException;
 import org.apache.sysds.runtime.compress.DMLCompressionException;
 import org.apache.sysds.runtime.compress.colgroup.dictionary.ADictionary;
 import org.apache.sysds.runtime.compress.colgroup.dictionary.Dictionary;
+import org.apache.sysds.runtime.compress.colgroup.dictionary.DictionaryFactory;
 import org.apache.sysds.runtime.compress.colgroup.offset.AIterator;
 import org.apache.sysds.runtime.compress.colgroup.offset.AOffset;
 import org.apache.sysds.runtime.compress.colgroup.offset.AOffsetIterator;
@@ -52,15 +53,6 @@ import org.apache.sysds.runtime.matrix.operators.UnaryOperator;
 public class ColGroupSDCSingleZeros extends ASDCZero {
 	private static final long serialVersionUID = 8033235615964315078L;
 
-	/**
-	 * Constructor for serialization
-	 * 
-	 * @param numRows Number of rows contained
-	 */
-	protected ColGroupSDCSingleZeros(int numRows) {
-		super(numRows);
-	}
-
 	private ColGroupSDCSingleZeros(int[] colIndices, int numRows, ADictionary dict, AOffset offsets,
 		int[] cachedCounts) {
 		super(colIndices, numRows, dict, offsets, cachedCounts);
@@ -95,7 +87,7 @@ public class ColGroupSDCSingleZeros extends ASDCZero {
 			return;
 		else if(it.value() >= ru)
 			_indexes.cacheIterator(it, ru);
-		else{
+		else {
 			decompressToDenseBlockDenseDictionaryWithProvidedIterator(db, rl, ru, offR, offC, values, it);
 			_indexes.cacheIterator(it, ru);
 		}
@@ -563,10 +555,11 @@ public class ColGroupSDCSingleZeros extends ASDCZero {
 		_indexes.write(out);
 	}
 
-	@Override
-	public void readFields(DataInput in) throws IOException {
-		super.readFields(in);
-		_indexes = OffsetFactory.readIn(in);
+	public static ColGroupSDCSingleZeros read(DataInput in, int nRows) throws IOException {
+		int[] cols = readCols(in);
+		ADictionary dict = DictionaryFactory.read(in);
+		AOffset indexes = OffsetFactory.readIn(in);
+		return new ColGroupSDCSingleZeros(cols, nRows, dict, indexes, null);
 	}
 
 	@Override
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCZeros.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCZeros.java
index 6e3b399ccb..91d16da037 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCZeros.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupSDCZeros.java
@@ -27,6 +27,7 @@ import java.util.Arrays;
 import org.apache.sysds.runtime.compress.DMLCompressionException;
 import org.apache.sysds.runtime.compress.colgroup.dictionary.ADictionary;
 import org.apache.sysds.runtime.compress.colgroup.dictionary.Dictionary;
+import org.apache.sysds.runtime.compress.colgroup.dictionary.DictionaryFactory;
 import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
 import org.apache.sysds.runtime.compress.colgroup.mapping.MapToFactory;
 import org.apache.sysds.runtime.compress.colgroup.offset.AIterator;
@@ -59,15 +60,6 @@ public class ColGroupSDCZeros extends ASDCZero {
 	/** Pointers to row indexes in the dictionary. Note the dictionary has one extra entry. */
 	protected AMapToData _data;
 
-	/**
-	 * Constructor for serialization
-	 * 
-	 * @param numRows Number of rows contained
-	 */
-	protected ColGroupSDCZeros(int numRows) {
-		super(numRows);
-	}
-
 	private ColGroupSDCZeros(int[] colIndices, int numRows, ADictionary dict, AOffset indexes, AMapToData data,
 		int[] cachedCounts) {
 		super(colIndices, numRows, dict, indexes, cachedCounts);
@@ -549,11 +541,12 @@ public class ColGroupSDCZeros extends ASDCZero {
 		_data.write(out);
 	}
 
-	@Override
-	public void readFields(DataInput in) throws IOException {
-		super.readFields(in);
-		_indexes = OffsetFactory.readIn(in);
-		_data = MapToFactory.readIn(in);
+	public static ColGroupSDCZeros read(DataInput in, int nRows) throws IOException {
+		int[] cols = readCols(in);
+		ADictionary dict = DictionaryFactory.read(in);
+		AOffset indexes = OffsetFactory.readIn(in);
+		AMapToData data = MapToFactory.readIn(in);
+		return new ColGroupSDCZeros(cols, nRows, dict, indexes, data, null);
 	}
 
 	@Override
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java
index 6a72136ab6..96a4b2b447 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java
@@ -65,11 +65,6 @@ public class ColGroupUncompressed extends AColGroup {
 	 */
 	private MatrixBlock _data;
 
-	/** Constructor for serialization */
-	protected ColGroupUncompressed() {
-		super();
-	}
-
 	private ColGroupUncompressed(MatrixBlock mb, int[] colIndexes) {
 		super(colIndexes);
 		_data = mb;
@@ -454,11 +449,11 @@ public class ColGroupUncompressed extends AColGroup {
 		}
 	}
 
-	@Override
-	public void readFields(DataInput in) throws IOException {
-		super.readFields(in);
-		_data = new MatrixBlock();
-		_data.readFields(in);
+	public static ColGroupUncompressed read(DataInput in) throws IOException {
+		int[] cols = readCols(in);
+		MatrixBlock data = new MatrixBlock();
+		data.readFields(in);
+		return new ColGroupUncompressed(data, cols);
 	}
 
 	@Override
diff --git a/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupMorphingPerformanceCompare.java b/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupMorphingPerformanceCompare.java
index 14962b038a..8b67f9ae19 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupMorphingPerformanceCompare.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupMorphingPerformanceCompare.java
@@ -153,11 +153,6 @@ public class ColGroupMorphingPerformanceCompare {
 		private static final long serialVersionUID = -7157464508602251065L;
 		private final MatrixBlock mbDict;
 
-		protected SDCNoMorph(int numRows) {
-			super(numRows);
-			mbDict = null;
-		}
-
 		public SDCNoMorph(ColGroupSDC g) {
 			this(g.getColIndices(), g.getNumRows(), g.getDictionary(), g.getDefaultTuple(), g.getOffsets(), g.getMapping(),
 				null);


[systemds] 01/02: [SYSTEMDS-3437] CLA Invalid Unique estimate DDC

Posted by ba...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit 06b6712e2b463d0a4f9f986bff9f15ed4060e832
Author: baunsgaard <ba...@tugraz.at>
AuthorDate: Thu Sep 15 19:24:40 2022 +0200

    [SYSTEMDS-3437] CLA Invalid Unique estimate DDC
    
    This commit fixes another edge case in DDC compression.
    If a row of a sparse matrix was full (contained no zero values),
    a zero entry was still added to the DDC dictionary. DDC no longer
    adds this unnecessary dictionary entry.
    
    Also added is an extensive test case for the ColGroupFactory to
    cover the above-mentioned bug and other edge cases that were found.
    
    Closes #1696
---
 .../compress/colgroup/AColGroupCompressed.java     |  20 +-
 .../compress/colgroup/ADictBasedColGroup.java      |  27 +-
 .../runtime/compress/colgroup/ColGroupDDC.java     |   4 -
 .../runtime/compress/colgroup/ColGroupFactory.java | 218 ++++++--------
 .../runtime/compress/colgroup/ColGroupRLE.java     |   2 -
 .../compress/colgroup/ColGroupUncompressed.java    |   8 +-
 .../runtime/compress/colgroup/offset/AOffset.java  |   4 +
 .../runtime/compress/estim/EstimationFactors.java  |   6 +-
 .../component/compress/CompressedTestBase.java     |   2 +-
 .../component/compress/colgroup/ColGroupBase.java  |  31 ++
 .../compress/colgroup/ColGroupFactoryTest.java     | 314 +++++++++++++++++++++
 .../component/compress/colgroup/ColGroupTest.java  | 105 ++++++-
 ...genOffsetOLETest.java => GenOffsetOLETest.java} |   2 +-
 13 files changed, 576 insertions(+), 167 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroupCompressed.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroupCompressed.java
index 24c95f2de5..1af529c08a 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroupCompressed.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/AColGroupCompressed.java
@@ -201,19 +201,13 @@ public abstract class AColGroupCompressed extends AColGroup {
 
 	protected static void tsmm(double[] result, int numColumns, int[] counts, ADictionary dict, int[] colIndexes) {
 		dict = dict.getMBDict(colIndexes.length);
-		if(dict != null) {
-			if(dict instanceof MatrixBlockDictionary) {
-				MatrixBlockDictionary mbd = (MatrixBlockDictionary) dict;
-				MatrixBlock mb = mbd.getMatrixBlock();
-				// Guaranteed not to be empty
-				if(mb.isInSparseFormat())
-					tsmmSparse(result, numColumns, mb.getSparseBlock(), counts, colIndexes);
-				else
-					tsmmDense(result, numColumns, mb.getDenseBlockValues(), counts, colIndexes);
-			}
-			else
-				tsmmDense(result, numColumns, dict.getValues(), counts, colIndexes);
-		}
+		if(dict == null) // null if empty
+			return;
+		MatrixBlock mb = ((MatrixBlockDictionary) dict).getMatrixBlock();
+		if(mb.isInSparseFormat())
+			tsmmSparse(result, numColumns, mb.getSparseBlock(), counts, colIndexes);
+		else
+			tsmmDense(result, numColumns, mb.getDenseBlockValues(), counts, colIndexes);
 
 	}
 
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ADictBasedColGroup.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ADictBasedColGroup.java
index dc09fe9877..467d6dac5d 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ADictBasedColGroup.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ADictBasedColGroup.java
@@ -57,7 +57,7 @@ public abstract class ADictBasedColGroup extends AColGroupCompressed {
 
 	}
 
-	public ADictionary getDictionary(){
+	public ADictionary getDictionary() {
 		return _dict;
 	}
 
@@ -174,6 +174,7 @@ public abstract class ADictBasedColGroup extends AColGroupCompressed {
 	public final AColGroup rightMultByMatrix(MatrixBlock right, int[] allCols) {
 		if(right.isEmpty())
 			return null;
+
 		final int nCol = right.getNumColumns();
 		// make sure allCols is allocated
 		allCols = allCols == null ? Util.genColsIndices(nCol) : allCols;
@@ -182,9 +183,10 @@ public abstract class ADictBasedColGroup extends AColGroupCompressed {
 			rightMMGetColsSparse(right.getSparseBlock(), nCol, allCols) : // sparse
 			rightMMGetColsDense(right.getDenseBlockValues(), nCol, allCols, right.getNonZeros()); // dense
 
-		final int nVals = getNumValues();
-		if(agCols == null || nVals == 0)
+		if(agCols == null)
 			return null;
+			
+		final int nVals = getNumValues();
 		final ADictionary preAgg = (right.isInSparseFormat()) ? // Chose Sparse or Dense
 			rightMMPreAggSparse(nVals, right.getSparseBlock(), agCols, 0, nCol) : // sparse
 			_dict.preaggValuesFromDense(nVals, _colIndexes, agCols, right.getDenseBlockValues(), nCol); // dense
@@ -199,7 +201,7 @@ public abstract class ADictBasedColGroup extends AColGroupCompressed {
 	 * @param b       The dense values in the right matrix
 	 * @param nCols   The max number of columns in the right matrix
 	 * @param allCols The all columns int list
-	 * @param nnz The number of non zero values in b
+	 * @param nnz     The number of non zero values in b
 	 * @return a list of the column indexes effected in the output column group
 	 */
 	protected int[] rightMMGetColsDense(double[] b, final int nCols, int[] allCols, long nnz) {
@@ -216,9 +218,9 @@ public abstract class ADictBasedColGroup extends AColGroupCompressed {
 						continue;
 					}
 
-				if(aggregateColumnsSet.size() == nCols)
-					return allCols;
 			}
+			if(aggregateColumnsSet.size() == nCols)
+				return allCols;
 			if(aggregateColumnsSet.size() == 0)
 				return null;
 
@@ -269,12 +271,13 @@ public abstract class ADictBasedColGroup extends AColGroupCompressed {
 			for(int i = b.pos(colIdx); i < b.size(colIdx) + b.pos(colIdx); i++) {
 				while(aggregateColumns[retIdx] < sIndexes[i])
 					retIdx++;
-				if(sIndexes[i] == aggregateColumns[retIdx])
-					for(int j = 0, offOrg = h;
-						j < numVals * aggregateColumns.length;
-						j += aggregateColumns.length, offOrg += _colIndexes.length) {
-						ret[j + retIdx] += _dict.getValue(offOrg) * sValues[i];
-					}
+				// It is known in this case that the sIndex always correspond to the aggregateColumns.
+				// if(sIndexes[i] == aggregateColumns[retIdx])
+				for(int j = 0, offOrg = h;
+					j < numVals * aggregateColumns.length;
+					j += aggregateColumns.length, offOrg += _colIndexes.length) {
+					ret[j + retIdx] += _dict.getValue(offOrg) * sValues[i];
+				}
 			}
 
 		}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java
index f5cf44e53f..1321bafc0e 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java
@@ -24,7 +24,6 @@ import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.commons.lang.NotImplementedException;
-import org.apache.sysds.runtime.compress.DMLCompressionException;
 import org.apache.sysds.runtime.compress.colgroup.dictionary.ADictionary;
 import org.apache.sysds.runtime.compress.colgroup.dictionary.Dictionary;
 import org.apache.sysds.runtime.compress.colgroup.dictionary.MatrixBlockDictionary;
@@ -57,9 +56,6 @@ public class ColGroupDDC extends APreAgg {
 
 	private ColGroupDDC(int[] colIndexes, ADictionary dict, AMapToData data, int[] cachedCounts) {
 		super(colIndexes, dict, cachedCounts);
-		if(data.getUnique() != dict.getNumberOfValues(colIndexes.length))
-			throw new DMLCompressionException("Invalid construction of DDC group " + data.getUnique() + " vs. "
-				+ dict.getNumberOfValues(colIndexes.length));
 		_data = data;
 	}
 
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java
index 7257cb1002..1f94655521 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java
@@ -26,14 +26,12 @@ import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.sysds.runtime.DMLRuntimeException;
 import org.apache.sysds.runtime.compress.CompressionSettings;
 import org.apache.sysds.runtime.compress.DMLCompressionException;
 import org.apache.sysds.runtime.compress.bitmap.ABitmap;
@@ -160,7 +158,7 @@ public class ColGroupFactory {
 		}
 	}
 
-	private List<AColGroup> compressExecute() {
+	private List<AColGroup> compressExecute() throws Exception {
 		if(in.isEmpty()) {
 			AColGroup empty = ColGroupEmpty.create(cs.transposed ? in.getNumRows() : in.getNumColumns());
 			return Collections.singletonList(empty);
@@ -171,7 +169,7 @@ public class ColGroupFactory {
 			return compressColGroupsParallel();
 	}
 
-	private List<AColGroup> compressColGroupsSingleThreaded() {
+	private List<AColGroup> compressColGroupsSingleThreaded() throws Exception {
 		List<AColGroup> ret = new ArrayList<>(csi.getNumberColGroups());
 
 		for(CompressedSizeInfoColGroup g : csi.getInfo())
@@ -180,33 +178,30 @@ public class ColGroupFactory {
 		return ret;
 	}
 
-	private List<AColGroup> compressColGroupsParallel() {
-		try {
-			final List<CompressedSizeInfoColGroup> groups = csi.getInfo();
-			final int nGroups = groups.size();
-			// final int blkz = nGroups * 10 / k;
-			final int skip = Math.min(k * 10, nGroups);
-			final List<CompressTask> tasks = new ArrayList<>(skip);
+	private List<AColGroup> compressColGroupsParallel() throws Exception {
 
-			// sort to make the "assumed" big jobs first.
-			Collections.sort(groups, Comparator.comparing(g -> -g.getNumVals()));
+		final List<CompressedSizeInfoColGroup> groups = csi.getInfo();
+		final int nGroups = groups.size();
+		// final int blkz = nGroups * 10 / k;
+		final int skip = Math.min(k * 10, nGroups);
+		final List<CompressTask> tasks = new ArrayList<>(skip);
 
-			final AColGroup[] ret = new AColGroup[nGroups];
+		// sort to make the "assumed" big jobs first.
+		Collections.sort(groups, Comparator.comparing(g -> -g.getNumVals()));
 
-			for(int i = 0; i < skip; i++)
-				tasks.add(new CompressTask(groups, ret, i, skip));
+		final AColGroup[] ret = new AColGroup[nGroups];
 
-			for(Future<Object> t : pool.invokeAll(tasks))
-				t.get();
+		for(int i = 0; i < skip; i++)
+			tasks.add(new CompressTask(groups, ret, i, skip));
+
+		for(Future<Object> t : pool.invokeAll(tasks))
+			t.get();
+
+		return Arrays.asList(ret);
 
-			return Arrays.asList(ret);
-		}
-		catch(InterruptedException | ExecutionException e) {
-			throw new DMLRuntimeException("Failed compression ", e);
-		}
 	}
 
-	protected AColGroup compressColGroup(CompressedSizeInfoColGroup cg) {
+	protected AColGroup compressColGroup(CompressedSizeInfoColGroup cg) throws Exception {
 		if(LOG.isDebugEnabled() && nCol < 1000 && ce != null) {
 			final Timing time = new Timing(true);
 			final AColGroup ret = compressColGroupAllSteps(cg);
@@ -223,26 +218,19 @@ public class ColGroupFactory {
 		final String cols = Arrays.toString(est.getColumns());
 		final String wanted = est.getBestCompressionType().toString();
 		if(estC < actC * 0.75) {
-			// StringBuilder sb = new StringBuilder();
-			// sb.append("The estimate cost is significantly off : " + est);
-			// sb.append(est.getNumVals());
-			// sb.append(" ");
-			// sb.append(act.getNumValues());
-			// sb.append(" estimate offsets:");
-			// sb.append(est.getNumOffs());
 			String warning = "The estimate cost is significantly off : " + est;
-
-			LOG.debug(String.format("time[ms]: %10.2f %25s est %10.0f -- act %10.0f distinct:%5d cols:%s wanted:%s\n\t\t%s", time, retType,
-				estC, actC, act.getNumValues(), cols, wanted, warning));
+			LOG.debug(
+				String.format("time[ms]: %10.2f %25s est %10.0f -- act %10.0f distinct:%5d cols:%s wanted:%s\n\t\t%s", time,
+					retType, estC, actC, act.getNumValues(), cols, wanted, warning));
 		}
 		else {
-			LOG.debug(String.format("time[ms]: %10.2f %25s est %10.0f -- act %10.0f distinct:%5d cols:%s wanted:%s", time, retType,
-				estC, actC, act.getNumValues(), cols, wanted));
+			LOG.debug(String.format("time[ms]: %10.2f %25s est %10.0f -- act %10.0f distinct:%5d cols:%s wanted:%s", time,
+				retType, estC, actC, act.getNumValues(), cols, wanted));
 		}
 
 	}
 
-	private AColGroup compressColGroupAllSteps(CompressedSizeInfoColGroup cg) {
+	private AColGroup compressColGroupAllSteps(CompressedSizeInfoColGroup cg) throws Exception {
 		AColGroup g = compress(cg);
 		if(ce != null && ce.shouldSparsify() && nCol >= 4)
 			g = sparsifyFOR(g);
@@ -258,7 +246,7 @@ public class ColGroupFactory {
 			return g;
 	}
 
-	private AColGroup compress(CompressedSizeInfoColGroup cg) {
+	private AColGroup compress(CompressedSizeInfoColGroup cg) throws Exception {
 		final int[] colIndexes = cg.getColumns();
 		final CompressionType ct = cg.getBestCompressionType();
 		final boolean t = cs.transposed;
@@ -268,13 +256,22 @@ public class ColGroupFactory {
 			return new ColGroupEmpty(colIndexes);
 		else if(ct == CompressionType.UNCOMPRESSED) // don't construct mapping if uncompressed
 			return ColGroupUncompressed.create(colIndexes, in, t);
-		else if((ct == CompressionType.SDC || ct == CompressionType.CONST) && in.isInSparseFormat() && t &&
-			((colIndexes.length > 1 && cg.getNumOffs() < 0.3 * nRow) || colIndexes.length == 1))
+		else if((ct == CompressionType.SDC || ct == CompressionType.CONST) //
+			&& in.isInSparseFormat() //
+			&& t && (//
+			(colIndexes.length > 1 && cg.getNumOffs() < 0.3 * nRow) //
+				|| colIndexes.length == 1))
 			return compressSDCFromSparseTransposedBlock(colIndexes, cg.getNumVals(), cg.getTupleSparsity());
 		else if(ct == CompressionType.DDC)
 			return directCompressDDC(colIndexes, cg);
 		else if(ct == CompressionType.LinearFunctional)
 			return compressLinearFunctional(colIndexes, in, cs);
+		else if(ct == CompressionType.DDCFOR) {
+			AColGroup g = directCompressDDC(colIndexes, cg);
+			if(g instanceof ColGroupDDC)
+				return ColGroupDDCFOR.sparsifyFOR((ColGroupDDC) g);
+			return g;
+		}
 
 		final ABitmap ubm = BitmapEncoder.extractBitmap(colIndexes, in, cg.getNumVals(), cs);
 		if(ubm == null) // no values ... therefore empty
@@ -291,18 +288,22 @@ public class ColGroupFactory {
 				return ColGroupRLE.compressRLE(colIndexes, ubm, nRow, tupleSparsity);
 			case OLE:
 				return ColGroupOLE.compressOLE(colIndexes, ubm, nRow, tupleSparsity);
-			case CONST: // in case somehow one requested const, but it was not const fall back to SDC.
+			case CONST: // in case somehow one requested const, but it was not const column fall back to SDC.
 			case EMPTY:
 				LOG.warn("Requested " + ct + " on non constant column, fallback to SDC");
 			case SDC:
 				return compressSDC(colIndexes, nRow, ubm, cs, tupleSparsity);
-			case DDC: // DDC have direct/fast path compression without use of ABitmap.
+			case SDCFOR:
+				AColGroup g = compressSDC(colIndexes, nRow, ubm, cs, tupleSparsity);
+				if(g instanceof ColGroupSDC)
+					return ColGroupSDCFOR.sparsifyFOR((ColGroupSDC) g);
+				return g;
 			default:
 				throw new DMLCompressionException("Not implemented compression of " + ct + " in factory.");
 		}
 	}
 
-	private AColGroup directCompressDDC(int[] colIndexes, CompressedSizeInfoColGroup cg) {
+	private AColGroup directCompressDDC(int[] colIndexes, CompressedSizeInfoColGroup cg) throws Exception {
 		if(colIndexes.length > 1)
 			return directCompressDDCMultiCol(colIndexes, cg);
 		else
@@ -326,7 +327,7 @@ public class ColGroupFactory {
 		return ColGroupDDC.create(colIndexes, dict, resData, null);
 	}
 
-	private AColGroup directCompressDDCMultiCol(int[] colIndexes, CompressedSizeInfoColGroup cg) {
+	private AColGroup directCompressDDCMultiCol(int[] colIndexes, CompressedSizeInfoColGroup cg) throws Exception {
 		final AMapToData d = MapToFactory.create(nRow, Math.max(Math.min(cg.getNumOffs() + 1, nRow), 126));
 		final int fill = d.getUpperBoundValue();
 		d.fill(fill);
@@ -345,23 +346,12 @@ public class ColGroupFactory {
 			return new ColGroupEmpty(colIndexes);
 
 		ADictionary dict = DictionaryFactory.create(map, colIndexes.length, extra, cg.getTupleSparsity());
-		if(dict == null)
-			// Again highly unlikely but possible.
-			return new ColGroupEmpty(colIndexes);
-		try {
-			if(extra)
-				d.replace(fill, map.size());
-
-			final int nUnique = map.size() + (extra ? 1 : 0);
 
-			final AMapToData resData = MapToFactory.resize(d, nUnique);
-			return ColGroupDDC.create(colIndexes, dict, resData, null);
-		}
-		catch(Exception e) {
-			ReaderColumnSelection reader = ReaderColumnSelection.createReader(in, colIndexes, cs.transposed, 0, nRow);
-			throw new DMLCompressionException("direct compress DDC Multi col failed extra:" + extra + " with reader type:"
-				+ reader.getClass().getSimpleName(), e);
-		}
+		if(extra)
+			d.replace(fill, map.size());
+		final int nUnique = map.size() + (extra ? 1 : 0);
+		final AMapToData resData = MapToFactory.resize(d, nUnique);
+		return ColGroupDDC.create(colIndexes, dict, resData, null);
 	}
 
 	private boolean readToMapDDC(int[] colIndexes, DblArrayCountHashMap map, AMapToData data, int rl, int ru, int fill) {
@@ -424,14 +414,17 @@ public class ColGroupFactory {
 	private void readToMapDDCTransposed(int col, DoubleCountHashMap map, AMapToData data) {
 		if(in.isInSparseFormat()) {
 			final SparseBlock sb = in.getSparseBlock();
-			// It should never be empty here.
+			if(sb.isEmpty(col))
+				// Defensive: an empty column has no non-zero values to count or map.
+				return;
 
 			final int apos = sb.pos(col);
 			final int alen = sb.size(col) + apos;
 			final int[] aix = sb.indexes(col);
 			final double[] aval = sb.values(col);
 			// count zeros
-			map.increment(0, nRow - apos - alen);
+			if(nRow - (alen - apos) > 0) // zeros = rows minus this column's non-zero count (alen - apos)
+				map.increment(0, nRow - (alen - apos));
 			// insert all other counts
 			for(int j = apos; j < alen; j++) {
 				final int id = map.increment(aval[j]);
@@ -448,25 +441,21 @@ public class ColGroupFactory {
 	}
 
 	private boolean parallelReadToMapDDC(int[] colIndexes, DblArrayCountHashMap map, AMapToData data, int rlen, int fill,
-		int k) {
+		int k) throws Exception { // task failures now propagate (invokeAll / Future.get) instead of being rewrapped
 
-		try {
-			final int blk = Math.max(rlen / colIndexes.length / k, 64000 / colIndexes.length);
+		final int blk = Math.max(rlen / colIndexes.length / k, 64000 / colIndexes.length); // rows per task: rlen split over k, but at least ~64000 cells per task
 
-			List<readToMapDDCTask> tasks = new ArrayList<>();
-			for(int i = 0; i < rlen; i += blk) {
-				int end = Math.min(rlen, i + blk);
-				tasks.add(new readToMapDDCTask(colIndexes, map, data, i, end, fill));
-			}
-			boolean extra = false;
-			for(Future<Boolean> t : pool.invokeAll(tasks))
-				extra |= t.get();
-
-			return extra;
-		}
-		catch(Exception e) {
-			throw new DMLRuntimeException("Failed to parallelize DDC compression");
+		List<readToMapDDCTask> tasks = new ArrayList<>();
+		for(int i = 0; i < rlen; i += blk) {
+			int end = Math.min(rlen, i + blk);
+			tasks.add(new readToMapDDCTask(colIndexes, map, data, i, end, fill));
 		}
+		boolean extra = false;
+		for(Future<Boolean> t : pool.invokeAll(tasks)) // wait for every row range and OR-combine their flags
+			extra |= t.get();
+
+		return extra;
+
 	}
 
 	private static AColGroup compressSDC(int[] colIndexes, int rlen, ABitmap ubm, CompressionSettings cs,
@@ -486,17 +475,20 @@ public class ColGroupFactory {
 				index++;
 			}
 		}
+		final int nVal = ubm.getNumValues();
 
 		// Currently not effecient allocation of the dictionary.
-		if(ubm.getNumValues() == 1 && numZeros >= largestOffset) {
+		if(nVal == 1 && numZeros >= largestOffset) {
 			ADictionary dict = DictionaryFactory.create(ubm, tupleSparsity);
 			final AOffset off = OffsetFactory.createOffset(ubm.getOffsetList()[0].extractValues(true));
 			return ColGroupSDCSingleZeros.create(colIndexes, rlen, dict, off, null);
 		}
-		else if((ubm.getNumValues() == 2 && numZeros == 0) || (ubm.getNumValues() == 1 && numZeros < largestOffset)) {
+		else if((nVal == 2 && numZeros == 0) // case 1 : two distinct non zero values
+			|| (nVal == 1 && numZeros < largestOffset) // case 2: 1 non zero value more frequent than zero.
+		) {
 			double[] defaultTuple = new double[colIndexes.length];
 			ADictionary dict = DictionaryFactory.create(ubm, largestIndex, defaultTuple, tupleSparsity, numZeros > 0);
-			return compressSDCSingle(colIndexes, rlen, ubm,largestIndex, dict, defaultTuple);
+			return compressSDCSingle(colIndexes, rlen, ubm, largestIndex, dict, defaultTuple);
 		}
 		else if(numZeros >= largestOffset) {
 			ADictionary dict = DictionaryFactory.create(ubm, tupleSparsity);
@@ -529,14 +521,14 @@ public class ColGroupFactory {
 		return ColGroupSDC.create(colIndexes, rlen, dict, defaultTuple, indexes, _data, null);
 	}
 
-	private static AColGroup compressSDCSingle(int[] colIndexes, int rlen, ABitmap ubm, int largestIndex, ADictionary dict,
-		double[] defaultTuple) {
-		if(ubm.getOffsetList().length > 1){
+	private static AColGroup compressSDCSingle(int[] colIndexes, int rlen, ABitmap ubm, int largestIndex,
+		ADictionary dict, double[] defaultTuple) {
+		if(ubm.getOffsetList().length > 1) {
 			// flipping first bit is same as saying index 1 if zero else index 0 if one or !
 			AOffset off = OffsetFactory.createOffset(ubm.getOffsetsList(largestIndex ^ 1));
 			return ColGroupSDCSingle.create(colIndexes, rlen, dict, defaultTuple, off, null);
 		}
-		else{
+		else {
 			IntArrayList inv = ubm.getOffsetsList(0);
 			int[] indexes = new int[rlen - inv.size()];
 			int p = 0;
@@ -547,11 +539,11 @@ public class ColGroupFactory {
 					indexes[p++] = v++;
 				v++;
 			}
-	
+
 			while(v < rlen)
 				indexes[p++] = v++;
 			AOffset off = OffsetFactory.createOffset(indexes);
-	
+
 			return ColGroupSDCSingle.create(colIndexes, rlen, dict, defaultTuple, off, null);
 		}
 	}
@@ -562,35 +554,6 @@ public class ColGroupFactory {
 		return ColGroupLinearFunctional.create(colIndexes, coefficients, numRows);
 	}
 
-	// private static AColGroup compressDDC(int[] colIndexes, int rlen, ABitmap ubm, CompressionSettings cs,
-	// double tupleSparsity) {
-	// boolean zeros = ubm.getNumOffsets() < rlen;
-	// ADictionary dict = DictionaryFactory.create(ubm, tupleSparsity, zeros);
-	// AMapToData data = MapToFactory.create(rlen, zeros, ubm.getOffsetList());
-	// return ColGroupDDC.create(colIndexes, rlen, dict, data, null);
-	// }
-
-	// private static AColGroup compressOLE(int[] colIndexes, ABitmap ubm, double tupleSparsity) {
-
-	// ADictionary dict = DictionaryFactory.create(ubm, tupleSparsity);
-	// ColGroupOLE ole = new ColGroupOLE(nRow);
-
-	// final int numVals = ubm.getNumValues();
-	// char[][] lBitMaps = new char[numVals][];
-	// int totalLen = 0;
-	// for(int i = 0; i < numVals; i++) {
-	// lBitMaps[i] = ColGroupOLE.genOffsetBitmap(ubm.getOffsetsList(i).extractValues(), ubm.getNumOffsets(i));
-	// totalLen += lBitMaps[i].length;
-	// }
-
-	// ColGroupOffset.createCompressedBitmaps()
-	// // compact bitmaps to linearized representation
-	// ole.createCompressedBitmaps(numVals, totalLen, lBitMaps);
-	// ole._dict = dict;
-	// ole._colIndexes = colIndexes;
-	// return ole;
-	// }
-
 	private AColGroup compressSDCFromSparseTransposedBlock(int[] cols, int nrUniqueEstimate, double tupleSparsity) {
 		if(cols.length > 1)
 			return compressMultiColSDCFromSparseTransposedBlock(cols, nrUniqueEstimate, tupleSparsity);
@@ -607,7 +570,7 @@ public class ColGroupFactory {
 
 		for(int i = 0; i < cols.length; i++) {
 			if(sb.isEmpty(cols[i]))
-				throw new DMLCompressionException("Empty columns should not be entering here");
+				continue;
 
 			int apos = sb.pos(cols[i]);
 			int alen = sb.size(cols[i]) + apos;
@@ -616,6 +579,9 @@ public class ColGroupFactory {
 				offsetsSet.add(aix[j]);
 		}
 
+		if(offsetsSet.isEmpty())
+			return new ColGroupEmpty(cols);
+
 		int[] offsetsInt = offsetsSet.stream().mapToInt(Number::intValue).toArray();
 		Arrays.sort(offsetsInt);
 
@@ -625,6 +591,8 @@ public class ColGroupFactory {
 		double[] subV = sub.getDenseBlockValues();
 
 		for(int i = 0; i < cols.length; i++) {
+			if(sb.isEmpty(cols[i]))
+				continue;
 			int apos = sb.pos(cols[i]);
 			int alen = sb.size(cols[i]) + apos;
 			int[] aix = sb.indexes(cols[i]);
@@ -664,6 +632,9 @@ public class ColGroupFactory {
 
 		// This method should only be called if the cols argument is length 1.
 		final SparseBlock sb = in.getSparseBlock();
+		if(sb.isEmpty(cols[0]))
+			return new ColGroupEmpty(cols);
+
 		final int sbRow = cols[0];
 		final int apos = sb.pos(sbRow);
 		final int alen = sb.size(sbRow) + apos;
@@ -745,15 +716,10 @@ public class ColGroupFactory {
 		}
 
 		@Override
-		public Object call() {
-			try {
-				for(int i = _off; i < _groups.size(); i += _step)
-					_ret[i] = compressColGroup(_groups.get(i));
-				return null;
-			}
-			catch(Exception e) {
-				throw e;
-			}
+		public Object call() throws Exception { // exceptions from compressColGroup propagate to the caller of call()
+			for(int i = _off; i < _groups.size(); i += _step) // strided: handles groups _off, _off + _step, _off + 2 * _step, ...
+				_ret[i] = compressColGroup(_groups.get(i));
+			return null;
 		}
 	}
 
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupRLE.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupRLE.java
index 9da00bdc26..ee7cd280ae 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupRLE.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupRLE.java
@@ -941,8 +941,6 @@ public class ColGroupRLE extends AColGroupOffset {
 			for(int apos = _ptr[k], rs = 0, re = 0; apos < blen; apos += 2) {
 				rs = re + _data[apos];
 				re = rs + _data[apos + 1];
-				if(rs == re)// empty run
-					continue;
 				for(int tk = 0; tk < tnv; tk++) {
 					final int tblen = that._ptr[tk + 1];
 					int tapos = skip[tk];
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java
index 097ee43183..6a72136ab6 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java
@@ -528,17 +528,17 @@ public class ColGroupUncompressed extends AColGroup {
 	}
 
 	private void leftMultByAPreAggColGroup(APreAgg paCG, MatrixBlock result) {
+		final int nCols = paCG.getNumCols();
+		final MatrixBlock dictM = paCG._dict.getMBDict(nCols).getMatrixBlock();
+		if(dictM == null)
+			return;
 		LOG.warn("\nInefficient transpose of uncompressed to fit to"
 			+ " t(AColGroup) %*% UncompressedColGroup mult by colGroup uncompressed column"
 			+ "\nCurrently solved by t(t(Uncompressed) %*% AColGroup)");
 		final int k = InfrastructureAnalyzer.getLocalParallelism();
 		final MatrixBlock ucCGT = LibMatrixReorg.transpose(getData(), k);
-		final int nCols = paCG.getNumCols();
 		final MatrixBlock preAgg = new MatrixBlock(1, paCG.getNumValues(), false);
 		final MatrixBlock tmpRes = new MatrixBlock(1, nCols, false);
-		final MatrixBlock dictM = paCG._dict.getMBDict(nCols).getMatrixBlock();
-		if(dictM == null)
-			return;
 		preAgg.allocateDenseBlock();
 		tmpRes.allocateDenseBlock();
 		final int nRowsTransposed = ucCGT.getNumRows();
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/AOffset.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/AOffset.java
index 9905ca272e..d1b9f974d4 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/AOffset.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/offset/AOffset.java
@@ -347,6 +347,8 @@ public abstract class AOffset implements Serializable {
 
 		while(i < last) { // while we are not done iterating
 			for(int r = rl; r < ru; r++) {
+				if(sb.isEmpty(r))
+					continue;
 				final int off = r - rl;
 				int apos = aOffs[off]; // current offset
 				final int alen = sb.size(r) + sb.pos(r);
@@ -363,6 +365,8 @@ public abstract class AOffset implements Serializable {
 
 		// process final element
 		for(int r = rl; r < ru; r++) {
+			if(sb.isEmpty(r))
+				continue;
 			final int off = r - rl;
 			int apos = aOffs[off];
 			final int alen = sb.size(r) + sb.pos(r);
diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/EstimationFactors.java b/src/main/java/org/apache/sysds/runtime/compress/estim/EstimationFactors.java
index 005d79cd74..8e2203099b 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/estim/EstimationFactors.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/estim/EstimationFactors.java
@@ -60,6 +60,10 @@ public class EstimationFactors {
 		this(numVals, numRows, -1, null, -1, numRows, false, false, 1.0, tupleSparsity);
 	}
 
+	public EstimationFactors(int numVals, int numRows, int numOffs, double tupleSparsity) {
+		this(numVals, numOffs, -1, null, -1, numRows, false, false, 1.0, tupleSparsity); // NOTE: the delegate takes numOffs 2nd and numRows 6th, unlike this ctor's order
+	}
+
 	public EstimationFactors(int numVals, int numOffs, int largestOff, int[] frequencies, int numSingle, int numRows,
 		boolean lossy, boolean zeroIsMostFrequent, double overAllSparsity, double tupleSparsity) {
 		this(numVals, numOffs, largestOff, frequencies, numSingle, numRows, numOffs, lossy, zeroIsMostFrequent,
@@ -90,7 +94,7 @@ public class EstimationFactors {
 				"Invalid number of instance of most common element should be lower than number of rows. " + largestOff
 					+ " > numRows: " + numRows);
 		else if(numVals > numOffs)
-			throw new DMLCompressionException("Num vals cannot be greater than num offs");
+			throw new DMLCompressionException("Num vals cannot be greater than num offs: vals: "+ numVals + " offs: " + numOffs);
 	}
 
 	public int[] getFrequencies(){
diff --git a/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java b/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java
index eb99429740..e9414826b9 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java
@@ -92,7 +92,7 @@ public abstract class CompressedTestBase extends TestBase {
 
 	protected static SparsityType[] usedSparsityTypes = new SparsityType[] { //
 		SparsityType.FULL, //
-		SparsityType.SPARSE, //
+		// SparsityType.SPARSE, // NOTE(review): SPARSE inputs are disabled by this change — confirm this is intentional
 		SparsityType.ULTRA_SPARSE //
 	};
 
diff --git a/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupBase.java b/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupBase.java
index 8f8bdd24a5..d046a3d6da 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupBase.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupBase.java
@@ -49,6 +49,7 @@ import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo;
 import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup;
 import org.apache.sysds.runtime.compress.estim.EstimationFactors;
 import org.apache.sysds.runtime.compress.utils.Util;
+import org.apache.sysds.runtime.data.DenseBlockFP64;
 import org.apache.sysds.runtime.functionobjects.Modulus;
 import org.apache.sysds.runtime.functionobjects.Plus;
 import org.apache.sysds.runtime.matrix.data.LibMatrixReorg;
@@ -106,6 +107,7 @@ public abstract class ColGroupBase {
 		this.maxCol = Arrays.stream(base.getColIndices()).max().getAsInt() + 1;
 	}
 
+
 	protected AColGroup serializeAndBack(AColGroup g) {
 		try {
 			ByteArrayOutputStream bos = new ByteArrayOutputStream();
@@ -143,6 +145,35 @@ public abstract class ColGroupBase {
 		return t;
 	}
 
+	protected MatrixBlock multiBlockDenseMB(int nCol) {
+		// nRow x nCol block backed by DenseBlockFP64Mock, which reports a non-contiguous two-block layout.
+		MatrixBlock t = new MatrixBlock(nRow, nCol, false);
+		t.allocateDenseBlock();
+
+		double[] values = t.getDenseBlockValues();
+		DenseBlockFP64Mock m = new DenseBlockFP64Mock(new int[] {nRow, nCol}, values);
+
+		return new MatrixBlock(nRow, nCol, m);
+	}
+
+	private static class DenseBlockFP64Mock extends DenseBlockFP64 { // static: must not capture/serialize the enclosing test
+		private static final long serialVersionUID = -3601232958390554672L;
+
+		public DenseBlockFP64Mock(int[] dims, double[] data) {
+			super(dims, data);
+		}
+
+		@Override
+		public boolean isContiguous() {
+			return false;
+		}
+
+		@Override
+		public int numBlocks() {
+			return 2;
+		}
+	}
+
 	protected static void compare(MatrixBlock m1, MatrixBlock m2) {
 		if(m1.isEmpty())
 			m1.recomputeNonZeros();
diff --git a/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupFactoryTest.java b/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupFactoryTest.java
new file mode 100644
index 0000000000..e38cd2081d
--- /dev/null
+++ b/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupFactoryTest.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.test.component.compress.colgroup;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.sysds.runtime.compress.CompressionSettings;
+import org.apache.sysds.runtime.compress.CompressionSettingsBuilder;
+import org.apache.sysds.runtime.compress.colgroup.AColGroup;
+import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType;
+import org.apache.sysds.runtime.compress.colgroup.ColGroupFactory;
+import org.apache.sysds.runtime.compress.cost.ACostEstimate;
+import org.apache.sysds.runtime.compress.cost.ComputationCostEstimator;
+import org.apache.sysds.runtime.compress.cost.DistinctCostEstimator;
+import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo;
+import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup;
+import org.apache.sysds.runtime.compress.estim.EstimationFactors;
+import org.apache.sysds.runtime.compress.utils.Util;
+import org.apache.sysds.runtime.data.DenseBlockFP64;
+import org.apache.sysds.runtime.matrix.data.LibMatrixReorg;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.test.TestUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Compresses one column selection with {@link ColGroupFactory} in several ways (single or multi-threaded, transposed
+ * or not) and checks that every variant yields column groups over the same columns as the single-threaded,
+ * non-transposed reference.
+ */
+@RunWith(value = Parameterized.class)
+public class ColGroupFactoryTest {
+
+	/** Arbitrary size estimate attached to every synthetic {@link CompressedSizeInfoColGroup}. */
+	private static final int ESTIMATED_SIZE = 314152;
+
+	private final MatrixBlock mb;
+	private final MatrixBlock mbt;
+	private final ACostEstimate ce;
+	private final CompressionSettingsBuilder csb;
+	private final int nRow;
+	private final int nCol;
+	private final CompressionType ct;
+	/** Reference result: single-threaded compression of the non-transposed input. */
+	private final List<AColGroup> g;
+
+	private final int[] cols;
+
+	@Parameters
+	public static Collection<Object[]> data() {
+		ArrayList<Object[]> tests = new ArrayList<>();
+		// add(tests, 40, 5, 2, 5, 0.7, 234);
+		// add(tests, 40, 5, 1, 1, 0.7, 234);
+		add(tests, 40, 1, 2, 5, 0.7, 234);
+		add(tests, 40, 1, 1, 1, 0.7, 234);
+		add(tests, 40, 5, 2, 5, 0.2, 234);
+		add(tests, 40, 5, 2, 5, 0.1, 234);
+		add(tests, 40, 1, 2, 5, 0.1, 234);
+		add(tests, 40, 1, 1, 1, 0.8, 234);
+		add(tests, 40, 1, 1, 1, 0.1, 234);
+		add(tests, 40, 5, 2, 5, 0.0, 234);
+		add(tests, 40, 1, 1, 3, 1.0, 234);
+
+		addWithEmpty(tests, 40, 1, 2, 5, 0.1, 234);
+		addWithEmpty(tests, 40, 1, 1, 1, 0.1, 234);
+		addWithEmpty(tests, 40, 1, 1, 1, 0.8, 234);
+		addWithEmpty(tests, 40, 3, 2, 5, 0.1, 234);
+
+		addWithEmptyReverse(tests, 40, 1, 2, 5, 0.1, 234);
+		addWithEmptyReverse(tests, 40, 1, 2, 5, 0.7, 234);
+		addWithEmptyReverse(tests, 40, 1, 1, 1, 0.7, 234);
+		addWithEmptyReverse(tests, 40, 3, 2, 5, 0.1, 234);
+
+		addDenseMultiBlock(tests, 40, 3, 2, 5, 0.7, 234);
+		addDenseMultiBlock(tests, 40, 1, 2, 5, 0.7, 234);
+
+		return tests;
+	}
+
+	/** Dense input wrapped in {@link DenseBlockFP64Mock}, which reports a non-contiguous two-block layout. */
+	private static void addDenseMultiBlock(ArrayList<Object[]> tests, int nRows, int nCols, int min, int max,
+		double sparsity, int seed) {
+		MatrixBlock mb = TestUtils.generateTestMatrixBlock(nRows, nCols, min, max, sparsity, seed);
+		mb = TestUtils.ceil(mb);
+
+		MatrixBlock mbt = LibMatrixReorg.transpose(mb);
+
+		mb = new MatrixBlock(mb.getNumRows(), mb.getNumColumns(),
+			new DenseBlockFP64Mock(new int[] {mb.getNumRows(), mb.getNumColumns()}, mb.getDenseBlockValues()));
+		mbt = new MatrixBlock(mbt.getNumRows(), mbt.getNumColumns(),
+			new DenseBlockFP64Mock(new int[] {mbt.getNumRows(), mbt.getNumColumns()}, mbt.getDenseBlockValues()));
+
+		// No empty columns are appended here, so select exactly the nCols generated columns.
+		add(tests, nCols, mb, mbt);
+	}
+
+	/** Sparse input with three empty columns after the generated ones. */
+	private static void addWithEmpty(ArrayList<Object[]> tests, int nRows, int nCols, int min, int max, double sparsity,
+		int seed) {
+		MatrixBlock mb = TestUtils.generateTestMatrixBlock(nRows, nCols, min, max, sparsity, seed);
+		mb = TestUtils.ceil(mb);
+
+		mb = mb.append(new MatrixBlock(nRows, 3, false), null);
+
+		MatrixBlock mbt = LibMatrixReorg.transpose(mb);
+
+		mb.denseToSparse(true);
+		mbt.denseToSparse(true);
+
+		add(tests, nCols + 3, mb, mbt);
+	}
+
+	/** Sparse input with three empty columns before the generated ones. */
+	private static void addWithEmptyReverse(ArrayList<Object[]> tests, int nRows, int nCols, int min, int max,
+		double sparsity, int seed) {
+		MatrixBlock mb = TestUtils.generateTestMatrixBlock(nRows, nCols, min, max, sparsity, seed);
+		mb = TestUtils.ceil(mb);
+
+		mb = new MatrixBlock(nRows, 3, false).append(mb, null);
+
+		MatrixBlock mbt = LibMatrixReorg.transpose(mb);
+
+		mb.denseToSparse(true);
+		mbt.denseToSparse(true);
+
+		add(tests, nCols + 3, mb, mbt);
+	}
+
+	/** Random integer-valued input; stored sparse when {@code sparsity < 0.4}. */
+	private static void add(ArrayList<Object[]> tests, int nRows, int nCols, int min, int max, double sparsity,
+		int seed) {
+
+		MatrixBlock mb = TestUtils.generateTestMatrixBlock(nRows, nCols, min, max, sparsity, seed);
+		mb = TestUtils.ceil(mb);
+		MatrixBlock mbt = LibMatrixReorg.transpose(mb);
+
+		if(sparsity < 0.4) {
+			mb.denseToSparse(true);
+			mbt.denseToSparse(true);
+		}
+		add(tests, nCols, mb, mbt);
+	}
+
+	/**
+	 * Adds one parameter set per compression type (DeltaDDC excluded) and cost estimator (none, computation, distinct)
+	 * for the column selections {@code genColsIndices(nCols)}, {@code genColsIndices(1, nCols)} and the last column.
+	 */
+	private static void add(ArrayList<Object[]> tests, int nCols, MatrixBlock mb, MatrixBlock mbt) {
+		final CompressionSettingsBuilder csb = new CompressionSettingsBuilder();
+
+		ACostEstimate cce = new ComputationCostEstimator(2, 2, 2, 2, 2, 2, 2, 2, true);
+		ACostEstimate dce = new DistinctCostEstimator(mb.getNumRows(), csb.create(), mb.getSparsity());
+
+		int[] cols = Util.genColsIndices(nCols);
+		int[] cols2 = Util.genColsIndices(1, nCols);
+		int[] cols3 = new int[] {nCols - 1};
+		try {
+			for(CompressionType ct : CompressionType.values()) {
+				if(ct == CompressionType.DeltaDDC)
+					continue;
+				for(ACostEstimate ce : new ACostEstimate[] {null, cce, dce}) {
+					tests.add(new Object[] {mb, mbt, ce, csb, ct, cols});
+					if(nCols > 1) {
+						tests.add(new Object[] {mb, mbt, ce, csb, ct, cols2});
+						tests.add(new Object[] {mb, mbt, ce, csb, ct, cols3});
+					}
+				}
+			}
+		}
+		catch(Exception e) {
+			e.printStackTrace();
+			fail("failed constructing tests");
+		}
+	}
+
+	public ColGroupFactoryTest(MatrixBlock mb, MatrixBlock mbt, ACostEstimate ce, CompressionSettingsBuilder csb,
+		CompressionType ct, int[] cols) {
+		this.mb = mb;
+		this.nRow = mb.getNumRows();
+		this.nCol = mb.getNumColumns();
+		this.ce = ce;
+		this.csb = csb;
+		this.mbt = mbt;
+		this.ct = ct;
+		this.cols = cols;
+		g = compST();
+	}
+
+	@Test
+	public void testCompressTransposedSingleThread() {
+		compare(compTransposedST());
+	}
+
+	@Test
+	public void testCompressTransposedMultiThread() {
+		compare(compTransposedMT());
+	}
+
+	@Test
+	public void testCompressMultiThread() {
+		compare(compMT());
+	}
+
+	@Test
+	public void testCompressMultiThreadDDC() {
+		if(ct != CompressionType.DDC)
+			return;
+		// Force the parallel DDC path. The threshold is global state, so restore the previous value even if a
+		// comparison fails; otherwise every later test would silently run with threshold 1.
+		final int previousThreshold = CompressionSettings.PAR_DDC_THRESHOLD;
+		CompressionSettings.PAR_DDC_THRESHOLD = 1;
+		try {
+			compare(compMT());
+			compare(compST());
+		}
+		finally {
+			CompressionSettings.PAR_DDC_THRESHOLD = previousThreshold;
+		}
+	}
+
+	@Test
+	public void testCompressMultipleTimes() {
+		// The same column selection listed twice must compress both times, over the reference columns.
+		final CompressedSizeInfo csi = createInfo(2);
+		final CompressionSettings cs = csb.create();
+		cs.transposed = true;
+		final List<AColGroup> res;
+		if(ce != null)
+			res = ColGroupFactory.compressColGroups(mbt, csi, cs, ce, 4);
+		else
+			res = ColGroupFactory.compressColGroups(mbt, csi, cs, 4);
+		for(AColGroup r : res)
+			compare(r, g.get(0));
+	}
+
+	/** Asserts that {@code gt} has as many groups as the reference and that their column indices match. */
+	private void compare(List<AColGroup> gt) {
+		assertEquals(g.size(), gt.size());
+		for(int i = 0; i < g.size(); i++)
+			compare(gt.get(i), g.get(i));
+	}
+
+	private void compare(AColGroup gtt, AColGroup gg) {
+		assertArrayEquals(gtt.getColIndices(), gg.getColIndices());
+	}
+
+	private List<AColGroup> compST() {
+		return comp(1, false);
+	}
+
+	private List<AColGroup> compMT() {
+		return comp(4, false);
+	}
+
+	private List<AColGroup> compTransposedST() {
+		return comp(1, true);
+	}
+
+	private List<AColGroup> compTransposedMT() {
+		return comp(4, true);
+	}
+
+	/**
+	 * Compresses {@link #cols} as one estimated group of type {@link #ct}.
+	 *
+	 * @param k          degree of parallelism handed to the factory
+	 * @param transposed compress the transposed input {@link #mbt} if true, otherwise {@link #mb}
+	 * @return the compressed column groups
+	 */
+	private List<AColGroup> comp(int k, boolean transposed) {
+		try {
+			final CompressedSizeInfo csi = createInfo(1);
+			final CompressionSettings cs = csb.create();
+
+			if(transposed) {
+				cs.transposed = true;
+				// Pass k so that compTransposedMT really exercises the multi-threaded transposed path.
+				if(ce != null)
+					return ColGroupFactory.compressColGroups(mbt, csi, cs, ce, k);
+				else
+					return ColGroupFactory.compressColGroups(mbt, csi, cs, k);
+			}
+			else {
+				if(ce != null)
+					return ColGroupFactory.compressColGroups(mb, csi, cs, ce, k);
+				else if(k == 1)
+					return ColGroupFactory.compressColGroups(mb, csi, cs);
+				else
+					return ColGroupFactory.compressColGroups(mb, csi, cs, k);
+			}
+		}
+		catch(Exception e) {
+			e.printStackTrace();
+			fail("Failed to compress");
+			return null;
+		}
+	}
+
+	/** Builds size info holding {@code copies} identical estimates for {@link #cols} of type {@link #ct}. */
+	private CompressedSizeInfo createInfo(int copies) {
+		final int offs = Math.min((int) (mbt.getSparsity() * nRow * nCol), nRow);
+		final EstimationFactors f = new EstimationFactors(Math.min(nRow, offs), nRow, offs, mbt.getSparsity());
+		final List<CompressedSizeInfoColGroup> es = new ArrayList<>(copies);
+		for(int i = 0; i < copies; i++)
+			es.add(new CompressedSizeInfoColGroup(cols, f, ESTIMATED_SIZE, ct));
+		return new CompressedSizeInfo(es);
+	}
+
+	/** Dense block that reports a non-contiguous two-block layout while keeping one backing array. */
+	private static class DenseBlockFP64Mock extends DenseBlockFP64 {
+		private static final long serialVersionUID = -3601232958390554672L;
+
+		public DenseBlockFP64Mock(int[] dims, double[] data) {
+			super(dims, data);
+		}
+
+		@Override
+		public boolean isContiguous() {
+			return false;
+		}
+
+		@Override
+		public int numBlocks() {
+			return 2;
+		}
+	}
+}
diff --git a/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupTest.java b/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupTest.java
index 51709eea5a..458e04bd46 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupTest.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/colgroup/ColGroupTest.java
@@ -20,6 +20,7 @@
 package org.apache.sysds.test.component.compress.colgroup;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -287,6 +288,13 @@ public class ColGroupTest extends ColGroupBase {
 		decompressToDenseBlock(ot, bt, nRow - 5, nRow - 1);
 	}
 
+	@Test
+	public void decompressToMultiBlockDenseBlock() {
+		MatrixBlock ot = multiBlockDenseMB(maxCol); // both targets use the non-contiguous two-block mock from ColGroupBase
+		MatrixBlock bt = multiBlockDenseMB(maxCol);
+		decompressToDenseBlock(ot, bt, nRow - 5, nRow - 1);
+	}
+
 	private void decompressToDenseBlock(MatrixBlock ot, MatrixBlock bt, int rl, int ru) {
 		decompressToDenseBlock(ot, bt, other, base, rl, ru);
 	}
@@ -1225,10 +1233,28 @@ public class ColGroupTest extends ColGroupBase {
 	}
 
 	public void leftMultNoPreAgg(int nRowLeft, int rl, int ru, int cl, int cu, double sparsity) {
+		// Build a rounded random left-hand matrix, then delegate to the MatrixBlock overload.
+		final MatrixBlock raw = TestUtils.generateTestMatrixBlock(nRowLeft, nRow, -10, 10, sparsity, 1342);
+		final MatrixBlock rounded = TestUtils.round(raw);
+
+		leftMultNoPreAgg(nRowLeft, rl, ru, cl, cu, rounded);
+	}
+
+	@Test(expected = NotImplementedException.class)
+	public void leftMultNoPreAggWithEmptyRows() {
+		// Three random rounded rows, followed by three empty sparse rows; multiplied with rl = 2, ru = 5.
+		MatrixBlock left = TestUtils.round(TestUtils.generateTestMatrixBlock(3, nRow, -10, 10, 0.2, 222));
+		// append(..., cbind = false) presumably stacks the empty rows below; then go sparse and recount non-zeros.
+		left = left.append(new MatrixBlock(3, nRow, true), null, false);
+		left.denseToSparse(true);
+		left.recomputeNonZeros();
+		leftMultNoPreAgg(6, 2, 5, 0, nRow, left);
+		throw new NotImplementedException("Make test pass since the check actually says it is correct");
+	}
+
+	public void leftMultNoPreAgg(int nRowLeft, int rl, int ru, int cl, int cu, MatrixBlock left) {
 		try {
 
-			final MatrixBlock left = TestUtils
-				.round(TestUtils.generateTestMatrixBlock(nRowLeft, nRow, -10, 10, sparsity, 1342));
 			final MatrixBlock bt = new MatrixBlock(nRowLeft, maxCol, false);
 			bt.allocateDenseBlock();
 
@@ -1255,16 +1281,31 @@ public class ColGroupTest extends ColGroupBase {
 		preAggLeftMult(new MatrixBlock(1, nRow, 1.0), 0, 1);
 	}
 
+	@Test
+	public void preAggLeftMulRand() { // random-valued single-row input, ceil'd to whole numbers
+		preAggLeftMult(TestUtils.ceil(TestUtils.generateTestMatrixBlock(1, nRow, -10, 10, 1.0, 32)), 0, 1);
+	}
+
 	@Test
 	public void preAggLeftMultTwoRows() {
 		preAggLeftMult(new MatrixBlock(2, nRow, 1.0), 0, 2);
 	}
 
+	@Test
+	public void preAggLeftMultTwoRowsRand() { // random-valued (ceil'd) variant of preAggLeftMultTwoRows, rows 0..2
+		preAggLeftMult(TestUtils.ceil(TestUtils.generateTestMatrixBlock(2, nRow, -10, 10, 1.0, 324)), 0, 2);
+	}
+
 	@Test
 	public void preAggLeftMultSecondRow() {
 		preAggLeftMult(new MatrixBlock(2, nRow, 1.0), 1, 2);
 	}
 
+	@Test
+	public void preAggLeftMultSecondRowRand() { // start at row 1 (rl = 1), matching preAggLeftMultSecondRow
+		preAggLeftMult(TestUtils.ceil(TestUtils.generateTestMatrixBlock(2, nRow, -10, 10, 1.0, 241)), 1, 2);
+	}
+
 	@Test
 	public void preAggLeftMultSparse() {
 		MatrixBlock mb = TestUtils.ceil(TestUtils.generateTestMatrixBlock(1, nRow, -1, 10, 0.2, 1342));
@@ -1282,6 +1323,7 @@ public class ColGroupTest extends ColGroupBase {
 	@Test
 	public void preAggLeftMultSparseFiveRows() {
 		MatrixBlock mb = TestUtils.ceil(TestUtils.generateTestMatrixBlock(5, nRow, -1, 10, 0.2, 1342));
+		mb.sparseToDense(); // go dense first, presumably so denseToSparse(true) below really rebuilds the sparse block
 		mb.denseToSparse(true);
 		preAggLeftMult(mb, 0, 5);
 	}
@@ -1289,6 +1331,7 @@ public class ColGroupTest extends ColGroupBase {
 	@Test
 	public void preAggLeftMultSparseFiveRowsMCSR() {
 		MatrixBlock mb = TestUtils.ceil(TestUtils.generateTestMatrixBlock(5, nRow, -1, 10, 0.2, 1342));
+		mb.sparseToDense(); // go dense first, presumably so denseToSparse(false) below really rebuilds the sparse block
 		mb.denseToSparse(false);
 		preAggLeftMult(mb, 0, 5);
 	}
@@ -1305,9 +1348,19 @@ public class ColGroupTest extends ColGroupBase {
 		MatrixBlock mb = TestUtils.ceil(TestUtils.generateTestMatrixBlock(2, nRow, -1, 10, 0.2, 1342));
 		mb = mb.append(new MatrixBlock(2, nRow, false), null, false);
 		mb.denseToSparse(true);
+		mb.recomputeNonZeros();
 		preAggLeftMult(mb, 3, 4);
 	}
 
+	@Test
+	public void preAggLeftMultSparseSomeEmptyRows() {
+		final MatrixBlock top = TestUtils.ceil(TestUtils.generateTestMatrixBlock(3, nRow, -1, 10, 0.2, 1342));
+		final MatrixBlock stacked = top.append(new MatrixBlock(3, nRow, false), null, false);
+		stacked.denseToSparse(true);
+		stacked.recomputeNonZeros();
+		preAggLeftMult(stacked, 2, 4);
+	}
+
 	public void preAggLeftMult(MatrixBlock mb, int rl, int ru) {
 		try {
 
@@ -1415,7 +1468,7 @@ public class ColGroupTest extends ColGroupBase {
 		// if we get here throw the exception
 	}
 
-	private class DenseBlockFP64Mock extends DenseBlockFP64 {
+	private static class DenseBlockFP64Mock extends DenseBlockFP64 {
 		private static final long serialVersionUID = -3601232958390554672L;
 
 		public DenseBlockFP64Mock(int[] dims, double[] data) {
@@ -1676,6 +1729,16 @@ public class ColGroupTest extends ColGroupBase {
 		rightMult(new MatrixBlock(maxCol, 10, 1.0));
 	}
 
+	@Test
+	public void rightMultDenseMatrixSomewhatSparse() { // constant-valued (1, 1) generator at sparsity 0.6, 10 columns
+		rightMultWithAllCols(TestUtils.generateTestMatrixBlock(maxCol, 10, 1, 1, 0.6, 1342));
+	}
+
+	@Test
+	public void rightMultDenseMatrixSomewhatSparseManyColumns() { // same (1, 1) generator at sparsity 0.6, 201 columns
+		rightMultWithAllCols(TestUtils.generateTestMatrixBlock(maxCol, 201, 1, 1, 0.6, 1342));
+	}
+
 	@Test
 	public void rightMultEmptyMatrix() {
 		rightMult(new MatrixBlock(maxCol, 10, false));
@@ -1726,6 +1789,31 @@ public class ColGroupTest extends ColGroupBase {
 		rightMult(mb);
 	}
 
+	@Test
+	public void rightMultMatrixDiagonalSparseWithCols() {
+		final MatrixBlock right = new MatrixBlock(maxCol, 10, false);
+		right.allocateDenseBlock();
+		// row r holds the value r in column r % 10 (row 0 therefore stays zero)
+		for(int r = 0; r < maxCol; r++)
+			right.setValue(r, r % 10, r);
+		right.denseToSparse(true);
+		rightMultWithAllCols(right);
+	}
+
+	public void rightMultWithAllCols(MatrixBlock right) {
+		try {
+			final int[] allCols = Util.genColsIndices(right.getNumColumns());
+			final AColGroup baseResult = base.rightMultByMatrix(right, allCols);
+			final AColGroup otherResult = other.rightMultByMatrix(right, allCols);
+			if(baseResult != null || otherResult != null) // comparison is skipped only when both are null
+				compare(baseResult, otherResult);
+		}
+		catch(Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
 	public void rightMult(MatrixBlock right) {
 		try {
 			AColGroup b = base.rightMultByMatrix(right);
@@ -2014,4 +2102,15 @@ public class ColGroupTest extends ColGroupBase {
 		assertTrue(co < eo);
 	}
 
+	@Test
+	public void copyMaintainPointers() {
+		final AColGroup baseCopy = base.copy();
+		final AColGroup otherCopy = other.copy();
+		// copy() must return a new group object that still shares the original column index object
+		assertTrue("base copy must share column indices", baseCopy.getColIndices() == base.getColIndices());
+		assertTrue("other copy must share column indices", otherCopy.getColIndices() == other.getColIndices());
+		// NOTE(review): no cross check against other's indices; presumably base and other may share them
+		assertFalse("base copy must be a new object", baseCopy == base);
+		assertFalse("other copy must be a new object", otherCopy == other);
+	}
 }
diff --git a/src/test/java/org/apache/sysds/test/component/compress/colgroup/genOffsetOLETest.java b/src/test/java/org/apache/sysds/test/component/compress/colgroup/GenOffsetOLETest.java
similarity index 98%
rename from src/test/java/org/apache/sysds/test/component/compress/colgroup/genOffsetOLETest.java
rename to src/test/java/org/apache/sysds/test/component/compress/colgroup/GenOffsetOLETest.java
index 838d89cd3a..d5b9a386e6 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/colgroup/genOffsetOLETest.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/colgroup/GenOffsetOLETest.java
@@ -26,7 +26,7 @@ import org.apache.sysds.runtime.compress.CompressionSettings;
 import org.apache.sysds.runtime.compress.colgroup.ColGroupOLE;
 import org.junit.Test;
 
-public class genOffsetOLETest {
+public class GenOffsetOLETest {
 
 	@Test
 	public void testEmpty() {