You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/09/17 23:48:48 UTC

[2/2] systemml git commit: [SYSTEMML-1919] Shared dictionary for all DDC1 single-column groups

[SYSTEMML-1919] Shared dictionary for all DDC1 single-column groups

This patch extends the CLA (compressed linear algebra) framework with an
additional compression phase that creates, in a best-effort manner, a
shared dictionary for all DDC1 single-column groups if the total number
of distinct values is <= 255 (or <= 256 if the distinct values include
zero). This constraint ensures that all DDC1 column groups remain in
DDC1 format and simply need to be recoded with respect to the shared
dictionary. Also, with <= 256 values (at 8 bytes each, at most 2KB), the
shared dictionary easily fits into the L1 cache (32KB). Since DDC1
column groups are very common, this approach greatly improves
compression ratios, especially for distributed matrix representations,
where the header can dominate the total matrix size.

In an example scenario with Mnist240m (540GB in uncompressed format),
this patch reduces the RDD storage size as follows:

block size = 1,024:  219.3GB -> 136.1GB
block size = 16,384: 97.5GB -> 90.3GB

Thus, this patch significantly increases the compression potential,
especially for small block sizes such as our default block size of 1K,
which matters because compression is now applied by default.


Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/f86879bd
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/f86879bd
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/f86879bd

Branch: refs/heads/master
Commit: f86879bd0af5eb046c6fe00a444ff04c603a2e91
Parents: 119893f
Author: Matthias Boehm <mb...@gmail.com>
Authored: Sun Sep 17 16:48:28 2017 -0700
Committer: Matthias Boehm <mb...@gmail.com>
Committed: Sun Sep 17 16:48:54 2017 -0700

----------------------------------------------------------------------
 .../sysml/runtime/compress/ColGroupDDC1.java    |  13 ++
 .../sysml/runtime/compress/ColGroupValue.java   |  15 +-
 .../runtime/compress/CompressedMatrixBlock.java | 139 +++++++++++++++----
 3 files changed, 133 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/systemml/blob/f86879bd/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java
index 89ca931..63ab3e7 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupDDC1.java
@@ -23,6 +23,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.HashMap;
 
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.compress.utils.ConverterUtils;
@@ -102,6 +103,18 @@ public class ColGroupDDC1 extends ColGroupDDC
 		return (_data[r]&0xFF);
 	}
 	
+	public void recodeData(HashMap<Double,Integer> map) {
+		//prepare translation table
+		final int numVals = getNumValues();
+		byte[] lookup = new byte[numVals];
+		for( int k=0; k<numVals; k++ )
+			lookup[k] = map.get(_values[k]).byteValue();
+		
+		//recode the data
+		for( int i=0; i<_numRows; i++ )
+			_data[i] = lookup[_data[i]&0xFF];
+	}
+	
 	@Override
 	public void write(DataOutput out) throws IOException {
 		int numCols = getNumCols();

http://git-wip-us.apache.org/repos/asf/systemml/blob/f86879bd/src/main/java/org/apache/sysml/runtime/compress/ColGroupValue.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupValue.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupValue.java
index febac4c..63185f2 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupValue.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupValue.java
@@ -107,12 +107,15 @@ public abstract class ColGroupValue extends ColGroup
 		
 		// adding the size of values
 		size += 8; //array reference
-		if (_values != null) {
-			size += 32 + _values.length * 8; //values
-		}
+		size += getValuesSize(); // values
 	
 		return size;
 	}
+	
+	public long getValuesSize() {
+		return ( _values != null ) ? 
+			32 + _values.length * 8 : 0;
+	}
 
 	/**
 	 * Obtain number of distinct sets of values associated with the bitmaps in this column group.
@@ -123,11 +126,15 @@ public abstract class ColGroupValue extends ColGroup
 	public int getNumValues() {
 		return _values.length / _colIndexes.length;
 	}
-
+	
 	public double[] getValues() {
 		return _values;
 	}
 	
+	public void setValues(double[] values) {
+		_values = values;
+	}
+	
 	public double getValue(int k, int col) {
 		return _values[k*getNumCols()+col];
 	}

http://git-wip-us.apache.org/repos/asf/systemml/blob/f86879bd/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
index b95b3af..98529c8 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
@@ -108,8 +108,9 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 	public static final boolean TRANSPOSE_INPUT = true;
 	public static final boolean MATERIALIZE_ZEROS = false;
 	public static final long MIN_PAR_AGG_THRESHOLD = 16*1024*1024; //16MB
-	public static boolean INVESTIGATE_ESTIMATES = false;
+	public static final boolean INVESTIGATE_ESTIMATES = false;
 	public static boolean ALLOW_DDC_ENCODING = true;
+	public static final boolean ALLOW_SHARED_DDC1_DICTIONARY = true;
 	private static final boolean LDEBUG = true; //local debug flag
 	private static final Level LDEBUG_LEVEL = Level.INFO; //DEBUG/TRACE for details
 	
@@ -119,12 +120,13 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		// for internal debugging only
 		if( LDEBUG ) {
 			Logger.getLogger("org.apache.sysml.runtime.compress")
-				  .setLevel((Level) LDEBUG_LEVEL);
+				.setLevel((Level) LDEBUG_LEVEL);
 		}	
 	}
 	
 	protected ArrayList<ColGroup> _colGroups = null;
 	protected CompressionStatistics _stats = null;
+	protected boolean _sharedDDC1Dict = false;
 	
 	public CompressedMatrixBlock() {
 		super(-1, -1, true);
@@ -199,7 +201,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 	@Override
 	public boolean isEmptyBlock(boolean safe)  {
 		if( !isCompressed() )
-			return super.isEmptyBlock(safe);		
+			return super.isEmptyBlock(safe);
 		return (_colGroups == null || getNonZeros()==0);
 	}
 	
@@ -265,12 +267,12 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		// where a column is compressible if ratio > 1.
 		CompressedSizeInfo[] sizeInfos = (k > 1) ?
 				computeCompressedSizeInfos(bitmapSizeEstimator, numCols, k) : 
-				computeCompressedSizeInfos(bitmapSizeEstimator, numCols);	
+				computeCompressedSizeInfos(bitmapSizeEstimator, numCols);
 		long nnzUC = 0;		
-		for (int col = 0; col < numCols; col++)  {	
+		for (int col = 0; col < numCols; col++)  {
 			double uncompSize = getUncompressedSize(numRows, 1, 
 				OptimizerUtils.getSparsity(numRows, 1, sizeInfos[col].getEstNnz()));
-			double compRatio = uncompSize / sizeInfos[col].getMinSize();			
+			double compRatio = uncompSize / sizeInfos[col].getMinSize();
 			if( compRatio > 1 ) {
 				colsC.add(col);
 				compRatios.put(col, compRatio);
@@ -287,7 +289,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 			for( int i=0; i<colsUC.size(); i++ ) {
 				int col = colsUC.get(i);
 				double uncompSize = getUncompressedSize(numRows, 1, 1.0);
-				double compRatio = uncompSize / sizeInfos[col].getMinSize();			
+				double compRatio = uncompSize / sizeInfos[col].getMinSize();
 				if( compRatio > 1 ) {
 					colsC.add(col);
 					colsUC.remove(i); i--;
@@ -325,7 +327,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		// PHASE 3: Compress and correct sample-based decisions
 		ColGroup[] colGroups = (k > 1) ?
 				compressColGroups(rawblock, bitmapSizeEstimator, compRatios, numRows, bitmapColGrps, colsUC.isEmpty(), k) : 
-				compressColGroups(rawblock, bitmapSizeEstimator, compRatios, numRows, bitmapColGrps, colsUC.isEmpty()); 	
+				compressColGroups(rawblock, bitmapSizeEstimator, compRatios, numRows, bitmapColGrps, colsUC.isEmpty()); 
 		allocateColGroupList();
 		HashSet<Integer> remainingCols = seq(0, numCols-1, 1);
 		for( int j=0; j<colGroups.length; j++ ) {
@@ -340,8 +342,20 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 			_stats.timePhase3 = time.stop();
 			LOG.debug("--compression phase 3: "+_stats.timePhase3);
 		}
-			
-		// Phase 4: Cleanup
+		
+		// PHASE 4: Best-effort dictionary sharing for DDC1 single-col groups
+		double[] dict = createSharedDDC1Dictionary(_colGroups);
+		if( dict != null ) {
+			applySharedDDC1Dictionary(_colGroups, dict);
+			_sharedDDC1Dict = true;
+		}
+		
+		if( LOG.isDebugEnabled() ) {
+			_stats.timePhase4 = time.stop();
+			LOG.debug("--compression phase 4: "+_stats.timePhase4);
+		}
+		
+		// Phase 5: Cleanup
 		// The remaining columns are stored uncompressed as one big column group
 		if( !remainingCols.isEmpty() ) {
 			ArrayList<Integer> list = new ArrayList<Integer>(remainingCols);
@@ -357,9 +371,9 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		this.cleanupBlock(true, true);
 		
 		if( LOG.isDebugEnabled() ) {
-			_stats.timePhase4 = time.stop();
+			_stats.timePhase5 = time.stop();
 			int[] counts = getColGroupCounts(_colGroups);
-			LOG.debug("--compression phase 4: "+_stats.timePhase4);
+			LOG.debug("--compression phase 5: "+_stats.timePhase5);
 			LOG.debug("--num col groups: "+_colGroups.size());
 			LOG.debug("--col groups types (OLE,RLE,DDC1,DDC2,UC): "
 					+counts[2]+","+counts[1]+","+counts[3]+","+counts[4]+","+counts[0]);
@@ -460,7 +474,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		{
 			//exact big list and observe compression ratio
 			ubm = BitmapEncoder.extractBitmap(colIndexes, in); 
-			sizeInfo = estim.estimateCompressedColGroupSize(ubm);	
+			sizeInfo = estim.estimateCompressedColGroupSize(ubm);
 			double sp2 = denseEst ? 1.0 : OptimizerUtils.getSparsity(rlen, 1, ubm.getNumOffsets());
 			double compRatio = getUncompressedSize(rlen, colIndexes.length, sp2) / sizeInfo.getMinSize();
 		
@@ -529,6 +543,47 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		//eventually represented in dense
 		return Math.min(8d * rlen * clen, 4d * rlen + 12d * rlen * clen * sparsity);
 	}
+	
+	private static double[] createSharedDDC1Dictionary(ArrayList<ColGroup> colGroups) {
+		if( !ALLOW_DDC_ENCODING || !ALLOW_SHARED_DDC1_DICTIONARY )
+			return null;
+		
+		//create joint dictionary
+		HashSet<Double> tmp = new HashSet<Double>();
+		int numQual = 0;
+		for( ColGroup grp : colGroups )
+			if( grp.getNumCols()==1 && grp instanceof ColGroupDDC1 ) {
+				ColGroupDDC1 grpDDC1 = (ColGroupDDC1) grp;
+				for( double val : grpDDC1.getValues() )
+					tmp.add(val);
+				numQual ++;
+			}
+		
+		//abort shared dictionary creation if empty or too large
+		int maxSize = tmp.contains(0d) ? 256 : 255;
+		if( tmp.isEmpty() || tmp.size() > maxSize || numQual < 2 )
+			return null;
+		LOG.debug("Created shared directionary for "
+			+ numQual+" DDC1 single column groups.");
+		
+		//build consolidated dictionary
+		return tmp.stream().mapToDouble(Double::doubleValue).toArray();
+	}
+	
+	private static void applySharedDDC1Dictionary(ArrayList<ColGroup> colGroups, double[] dict) {
+		//create joint mapping table
+		HashMap<Double, Integer> map = new HashMap<>();
+		for(int i=0; i<dict.length; i++)
+			map.put(dict[i], i);
+		
+		//recode data of all relevant DDC1 groups
+		for( ColGroup grp : colGroups )
+			if( grp.getNumCols()==1 && grp instanceof ColGroupDDC1 ) {
+				ColGroupDDC1 grpDDC1 = (ColGroupDDC1) grp;
+				grpDDC1.recodeData(map);
+				grpDDC1.setValues(dict);
+			}
+	}
 
 	/**
 	 * Decompress block.
@@ -545,7 +600,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		
 		Timing time = new Timing(true);
 		
-		//preallocation sparse rows to avoid repeated reallocations		
+		//preallocation sparse rows to avoid repeated reallocations
 		MatrixBlock ret = new MatrixBlock(getNumRows(), getNumColumns(), isInSparseFormat(), getNonZeros());
 		if( ret.isInSparseFormat() ) {
 			int[] rnnz = new int[rlen];
@@ -588,7 +643,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		if( k <= 1 )
 			return decompress();
 		
-		Timing time = new Timing(true);
+		Timing time = LOG.isDebugEnabled() ? new Timing(true) : null;
 		
 		MatrixBlock ret = new MatrixBlock(rlen, clen, sparse, nonZeros);
 		ret.allocateDenseOrSparseBlock();
@@ -602,7 +657,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 			ArrayList<DecompressTask> tasks = new ArrayList<DecompressTask>();
 			for( int i=0; i<k & i*blklen<getNumRows(); i++ )
 				tasks.add(new DecompressTask(_colGroups, ret, i*blklen, Math.min((i+1)*blklen,rlen)));
-			List<Future<Object>> rtasks = pool.invokeAll(tasks);	
+			List<Future<Object>> rtasks = pool.invokeAll(tasks);
 			pool.shutdown();
 			for( Future<Object> rt : rtasks )
 				rt.get(); //error handling
@@ -637,6 +692,16 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		total += 80 + 8 * _colGroups.size();
 		for (ColGroup grp : _colGroups)
 			total += grp.estimateInMemorySize();
+		//correction for shared DDC1 dictionary
+		if( _sharedDDC1Dict ) {
+			boolean seenDDC1 = false;
+			for (ColGroup grp : _colGroups)
+				if( grp.getNumCols()==1 && grp instanceof ColGroupDDC1 ) {
+					if( seenDDC1 ) 
+						total -= ((ColGroupDDC1)grp).getValuesSize();
+					seenDDC1 = true;
+				}
+		}
 		return total;
 	}
 
@@ -660,6 +725,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		public double timePhase2 = -1;
 		public double timePhase3 = -1;
 		public double timePhase4 = -1;
+		public double timePhase5 = -1;
 		public double estSize = -1;
 		public double size = -1;
 		public double ratio = -1;
@@ -668,11 +734,12 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 			//do nothing
 		}
 		
-		public CompressionStatistics(double t1, double t2, double t3, double t4){
+		public CompressionStatistics(double t1, double t2, double t3, double t4, double t5){
 			timePhase1 = t1;
 			timePhase2 = t2;
 			timePhase3 = t3;
 			timePhase4 = t4;
+			timePhase5 = t5;
 		}
 	} 
 
@@ -726,9 +793,11 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		rlen = in.readInt();
 		clen = in.readInt();
 		nonZeros = in.readLong();
+		_sharedDDC1Dict = in.readBoolean();
 		int ncolGroups = in.readInt();
 		
 		_colGroups = new ArrayList<ColGroup>(ncolGroups);
+		double[] sharedDict = null;
 		for( int i=0; i<ncolGroups; i++ ) 
 		{
 			CompressionType ctype = CompressionType.values()[in.readByte()];
@@ -745,11 +814,20 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 				case DDC1:
 					grp = new ColGroupDDC1(); break;
 				case DDC2:
-					grp = new ColGroupDDC2(); break;	
+					grp = new ColGroupDDC2(); break;
 			}
 			
 			//deserialize and add column group
 			grp.readFields(in);
+			
+			//use shared DDC1 dictionary if applicable
+			if( _sharedDDC1Dict && grp.getNumCols()==1 ) {
+				if( sharedDict == null )
+					sharedDict = ((ColGroupDDC1)grp).getValues();
+				else
+					((ColGroupDDC1)grp).setValues(sharedDict);
+			}
+			
 			_colGroups.add(grp);
 		}
 	}
@@ -770,6 +848,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		out.writeInt(rlen);
 		out.writeInt(clen);
 		out.writeLong(nonZeros);
+		out.writeBoolean(_sharedDDC1Dict);
 		out.writeInt(_colGroups.size());
 		
 		for( ColGroup grp : _colGroups ) {
@@ -876,7 +955,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		
 		final int m = rlen;
 		final int n = clen+that.getNumColumns();
-		final long nnz = nonZeros+that.getNonZeros();		
+		final long nnz = nonZeros+that.getNonZeros();
 		
 		//init result matrix 
 		CompressedMatrixBlock ret2 = null;
@@ -905,7 +984,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		}
 		
 		//meta data maintenance
-		ret2.setNonZeros(nnz);		
+		ret2.setNonZeros(nnz);
 		return ret2;
 	}
 	
@@ -1121,7 +1200,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 				else
 					for( ArrayList<ColGroup> grp : grpParts )
 						tasks.add(new UnaryAggregateTask(grp, ret, 0, rlen, op));
-				List<Future<MatrixBlock>> rtasks = pool.invokeAll(tasks);	
+				List<Future<MatrixBlock>> rtasks = pool.invokeAll(tasks);
 				pool.shutdown();
 				
 				//aggregate partial results
@@ -1362,7 +1441,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 			
 			//compute uncompressed column group in parallel 
 			if( uc != null )
-				uc.rightMultByVector(vector, result, k);					
+				uc.rightMultByVector(vector, result, k);
 			
 			//compute remaining compressed column groups in parallel
 			ExecutorService pool = Executors.newFixedThreadPool( k );
@@ -1452,7 +1531,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 			ColGroupValue.setupThreadLocalMemory(getMaxNumValues(colGroups));
 		
 		// delegate matrix-vector operation to each column group
-		for (ColGroup grp : colGroups) {			
+		for (ColGroup grp : colGroups) {
 			grp.leftMultByRowVector(rowVector, result);
 		}
 		
@@ -1469,7 +1548,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		result.reset();
 		
 		// delegate matrix-vector operation to each column group
-		for( ColGroup grp : colGroups ) {			
+		for( ColGroup grp : colGroups ) {
 			((ColGroupValue)grp).leftMultByRowVector(vector, result);
 		}
 		
@@ -1509,7 +1588,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 			//compute uncompressed column group in parallel 
 			ColGroupUncompressed uc = getUncompressedColGroup();
 			if( uc != null )
-				uc.leftMultByRowVector(vector, result, k);					
+				uc.leftMultByRowVector(vector, result, k);
 			
 			//compute remaining compressed column groups in parallel
 			ExecutorService pool = Executors.newFixedThreadPool( Math.min(colGroups.size()-((uc!=null)?1:0), k) );
@@ -1534,7 +1613,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		throws DMLRuntimeException 
 	{
 		final int numRows = groups.get(0).getNumRows();
-		final int numGroups = groups.size();		
+		final int numGroups = groups.size();
 		final boolean containsUC = containsUncompressedColGroup(groups);
 		
 		//preallocated dense tmp matrix blocks
@@ -1551,7 +1630,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		for( int i=gl; i<gu; i++ ) 
 		{
 			//get current group and relevant col groups
-			ColGroup group = groups.get(i);	
+			ColGroup group = groups.get(i);
 			int[] ixgroup = group.getColIndices();
 			List<ColGroup> tmpList = groups.subList(i, numGroups);
 			
@@ -1559,7 +1638,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 				&& ixgroup.length==1 && !containsUC && numRows<BitmapEncoder.BITMAP_BLOCK_SZ ) 
 			{
 				//compute vector-matrix partial result
-				leftMultByVectorTranspose(tmpList, (ColGroupDDC)group, tmpret);								
+				leftMultByVectorTranspose(tmpList, (ColGroupDDC)group, tmpret);
 				
 				//write partial results (disjoint non-zeros)
 				LinearAlgebraUtils.copyNonZerosToUpperTriangle(result, tmpret, ixgroup[0]);	
@@ -1571,10 +1650,10 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 					
 					if( !lhs.isEmptyBlock(false) ) {
 						//compute vector-matrix partial result
-						leftMultByVectorTranspose(tmpList, lhs, tmpret, false, false);								
+						leftMultByVectorTranspose(tmpList, lhs, tmpret, false, false);
 						
 						//write partial results (disjoint non-zeros)
-						LinearAlgebraUtils.copyNonZerosToUpperTriangle(result, tmpret, ixgroup[j]);	
+						LinearAlgebraUtils.copyNonZerosToUpperTriangle(result, tmpret, ixgroup[j]);
 					}
 				}	
 			}