You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/07/28 18:13:19 UTC

[3/3] incubator-systemml git commit: [SYSTEMML-821] Multi-threaded matrix block compression, tests

[SYSTEMML-821] Multi-threaded matrix block compression, tests

This patch enables multi-threaded matrix block compression in the CP
compression instruction. In detail, this is realized via (1)
multi-threaded transpose, (2) multi-threaded column classification, and
(3) multi-threaded column compression. Down the road we will also add
multi-threaded column co-coding but this requires some major refactoring
first. On the sparse Imagenet dataset (1262102x900) the individual
performance improvements were as follows (compression phases 1-4)
leading to an overall improvement of 4.5x:

(0) baseline: 22.6s, 4.6s, 26.9s, 0.3s,
(1) transpose: 7.2s, 4.6s, 26.9s, 0.3s,
(2) classify: 4.0s, 4.6s, 26.9s, 0.3s,
(3) grouping: 3.8s, 4.7s, 3.4s, 0.3s.

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/873bae76
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/873bae76
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/873bae76

Branch: refs/heads/master
Commit: 873bae76bad9267b2c710404f7a08707ca76ca18
Parents: 44c8f9d
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Thu Jul 28 02:01:07 2016 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Thu Jul 28 02:01:07 2016 -0700

----------------------------------------------------------------------
 .../runtime/compress/CompressedMatrixBlock.java | 350 ++++++++++++++-----
 .../cp/CompressionCPInstruction.java            |   3 +-
 .../runtime/matrix/data/LibMatrixReorg.java     |   2 +-
 .../functions/compress/ParCompressionTest.java  | 169 +++++++++
 .../functions/compress/ZPackageSuite.java       |   1 +
 5 files changed, 428 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/873bae76/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
index 81d933d..f2ccb43 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
@@ -193,8 +193,22 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 	 * which should be fixed if we move ahead with this compression strategy.
 	 * 
 	 * +per column sparsity
+	 * 
+	 * @throws DMLRuntimeException
 	 */
 	public void compress() 
+		throws DMLRuntimeException
+	{
+		//default sequential execution
+		compress(1);
+	}
+	
+	/**
+	 * 
+	 * @param k  number of threads
+	 * @throws DMLRuntimeException
+	 */
+	public void compress(int k) 
 		throws DMLRuntimeException 
 	{
 		//check for redundant compression
@@ -216,11 +230,8 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		final int numRows = getNumRows();
 		final int numCols = getNumColumns();
 		final boolean sparse = isInSparseFormat();
-		MatrixBlock rawblock = this;
-		if( TRANSPOSE_INPUT )
-			rawblock = LibMatrixReorg.transpose(rawblock, new MatrixBlock(numCols, numRows, sparse));
-		else
-			rawblock = new MatrixBlock(this);
+		MatrixBlock rawblock = !TRANSPOSE_INPUT ? new MatrixBlock(this) :
+			LibMatrixReorg.transpose(this, new MatrixBlock(numCols, numRows, sparse), k);
 		
 		//construct sample-based size estimator
 		CompressedSizeEstimator bitmapSizeEstimator = 
@@ -234,18 +245,12 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 
 		// We start with a full set of columns.
 		HashSet<Integer> remainingCols = new HashSet<Integer>();
-		for (int i = 0; i < numCols; i++) {
+		for (int i = 0; i < numCols; i++)
 			remainingCols.add(i);
-		}
 
 		// PHASE 1: Classify columns by compression type
-		// We start by determining which columns are amenable to bitmap
-		// compression
-
-		// It is correct to use the dense size as the uncompressed size
-		// FIXME not numRows but nnz / col otherwise too aggressive overestimation
-		// of uncompressed size and hence overestimation of compression potential
-		double uncompressedColumnSize = 8 * numRows;
+		// We start by determining which columns are amenable to bitmap compression
+		double uncompressedColumnSize = getUncompressedSize(numRows, 1);
 
 		// information about the bitmap amenable columns
 		List<Integer> bitmapCols = new ArrayList<Integer>();
@@ -256,11 +261,12 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		
 		// Minimum ratio (size of uncompressed / size of compressed) that we
 		// will accept when encoding a field with a bitmap.
+		CompressedSizeInfo[] sizeInfos = (k > 1) ?
+				computeCompressedSizeInfos(bitmapSizeEstimator, numCols, k) : 
+				computeCompressedSizeInfos(bitmapSizeEstimator, numCols);		
 		for (int col = 0; col < numCols; col++) 
-		{
-			CompressedSizeInfo compressedSizeInfo = bitmapSizeEstimator
-					.estimateCompressedColGroupSize(new int[] { col });
-			long compressedSize = compressedSizeInfo.getMinSize();
+		{	
+			long compressedSize = sizeInfos[col].getMinSize();
 			double compRatio = uncompressedColumnSize / compressedSize;
 			
 			//FIXME: compression ratio should be checked against 1 instead of min compression
@@ -269,7 +275,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 			if (compRatio >= MIN_COMPRESSION_RATIO) {
 				bitmapCols.add(col);
 				compressionRatios.put(col, compRatio);
-				colsCardinalities.add(compressedSizeInfo.getEstCarinality());
+				colsCardinalities.add(sizeInfos[col].getEstCarinality());
 				compressedSizes.add(compressedSize);
 			}
 			else
@@ -313,77 +319,15 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		}
 		
 		// PHASE 3: Compress and correct sample-based decisions
-		
-		for (int[] groupIndices : bitmapColGrps) 
-		{
-			int[] allGroupIndices = null;
-			int allColsCount = groupIndices.length;
-			CompressedSizeInfo bitmapSizeInfo;
-			// The compression type is decided based on a full bitmap since it
-			// will be reused for the actual compression step.
-			UncompressedBitmap ubm;
-			PriorityQueue<CompressedColumn> compRatioPQ = null;
-			boolean skipGroup = false;
-			while (true) 
-			{
-				ubm = BitmapEncoder.extractBitmap(groupIndices, rawblock); 
-				bitmapSizeInfo = bitmapSizeEstimator
-						.estimateCompressedColGroupSize(ubm);
-				double compRatio = uncompressedColumnSize * groupIndices.length
-						/ bitmapSizeInfo.getMinSize();
-				if (compRatio >= MIN_COMPRESSION_RATIO) {
-					// we have a good group
-					for( Integer col : groupIndices )
-						remainingCols.remove(col);
-					break;
-				} else {
-					// modify the group
-					if (compRatioPQ == null) {
-						// first modification
-						allGroupIndices = Arrays.copyOf(groupIndices, groupIndices.length);
-						compRatioPQ = new PriorityQueue<CompressedMatrixBlock.CompressedColumn>();
-						for (int i = 0; i < groupIndices.length; i++)
-							compRatioPQ.add(new CompressedColumn(i,
-									compressionRatios.get(groupIndices[i])));
-					}
-
-					// index in allGroupIndices
-					int removeIx = compRatioPQ.poll().colIx;
-					allGroupIndices[removeIx] = -1;
-					allColsCount--;
-					if (allColsCount == 0) {
-						skipGroup = true;
-						break;
-					}
-					groupIndices = new int[allColsCount];
-					// copying the values that do not equal -1
-					int ix = 0;
-					for (int col : allGroupIndices) {
-						if (col != -1) {
-							groupIndices[ix++] = col;
-						}
-					}
-
-				}
+		ColGroup[] colGroups = (k > 1) ?
+				compressColGroups(rawblock, bitmapSizeEstimator, compressionRatios, numRows, bitmapColGrps, k) : 
+				compressColGroups(rawblock, bitmapSizeEstimator, compressionRatios, numRows, bitmapColGrps); 	
+		for( int j=0; j<colGroups.length; j++ ) {
+			if( colGroups[j] != null ) {
+				for( int col : colGroups[j].getColIndices() )
+					remainingCols.remove(col);
+				_colGroups.add(colGroups[j]);
 			}
-
-			if (skipGroup)
-				continue;
-			long rleNumBytes = bitmapSizeInfo.getRLESize();
-			long offsetNumBytes = bitmapSizeInfo.getOLESize();
-			double rleRatio = (double) offsetNumBytes / (double) rleNumBytes;
-
-			if (rleRatio > MIN_RLE_RATIO) {
-				ColGroupRLE compressedGroup = new ColGroupRLE(groupIndices,
-						numRows, ubm);
-				_colGroups.add(compressedGroup);
-			} 
-			else {
-				ColGroupOLE compressedGroup = new ColGroupOLE(
-						groupIndices, numRows, ubm);
-				_colGroups.add(compressedGroup);
-			}
-
 		}
 		
 		_stats.timePhase3 = time.stop();
@@ -407,6 +351,182 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 			LOG.debug("compression phase 4: "+_stats.timePhase4);
 	}
 
+	public CompressionStatistics getCompressionStatistics() {
+		return _stats; //statistics object in which compress(int) records, e.g., its phase timings
+	}
+
+	/**
+	 * Sequentially estimates the compressed size of every single column.
+	 * @param estim  size estimator, invoked once per column via estimateCompressedColGroupSize
+	 * @param clen  number of columns; columns 0 to clen-1 are estimated
+	 * @return array of length clen where entry i holds the size info of column i
+	 */
+	private static CompressedSizeInfo[] computeCompressedSizeInfos(CompressedSizeEstimator estim, int clen) {
+		CompressedSizeInfo[] ret = new CompressedSizeInfo[clen];
+		for( int col=0; col<clen; col++ )
+			ret[col] = estim.estimateCompressedColGroupSize(new int[] { col });
+		return ret;
+	}
+	
+	/**
+	 * Estimates the compressed size of every single column with a fixed pool of k threads,
+	 * using one task per column.
+	 * 
+	 * @param estim  size estimator, invoked concurrently by the tasks
+	 * @param clen  number of columns; columns 0 to clen-1 are estimated
+	 * @param k  number of threads of the thread pool
+	 * @return array where entry i holds the size info of column i
+	 * @throws DMLRuntimeException if the pool cannot be created, a task fails, or the caller is interrupted
+	 */
+	private static CompressedSizeInfo[] computeCompressedSizeInfos(CompressedSizeEstimator estim, int clen, int k) 
+		throws DMLRuntimeException 
+	{	
+		ExecutorService pool = null;
+		try {
+			pool = Executors.newFixedThreadPool( k );
+			ArrayList<SizeEstimTask> tasks = new ArrayList<SizeEstimTask>();
+			for( int col=0; col<clen; col++ )
+				tasks.add(new SizeEstimTask(estim, col));
+			//invokeAll returns the futures in task order, i.e., in column order
+			List<Future<CompressedSizeInfo>> rtask = pool.invokeAll(tasks);	
+			ArrayList<CompressedSizeInfo> ret = new ArrayList<CompressedSizeInfo>();
+			for( Future<CompressedSizeInfo> lrtask : rtask )
+				ret.add(lrtask.get());
+			return ret.toArray(new CompressedSizeInfo[0]);
+		}
+		catch(Exception ex) {
+			throw new DMLRuntimeException(ex);
+		}
+		finally {
+			//always release the worker threads, even if a task failed
+			if( pool != null )
+				pool.shutdown();
+		}
+	}
+
+	/**
+	 * Sequentially compresses all column groups, one call to compressColGroup per group.
+	 * @param in  input block handed to compressColGroup
+	 * @param estim  size estimator handed to compressColGroup
+	 * @param compRatios  per-column compression ratios, looked up by column index
+	 * @param rlen  number of rows
+	 * @param groups  candidate column groups, each given as an array of column indexes
+	 * @return array aligned with groups; an entry is null if compressColGroup dropped the group
+	 */
+	private static ColGroup[] compressColGroups(MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, List<int[]> groups)
+	{
+		ColGroup[] ret = new ColGroup[groups.size()];
+		for( int i=0; i<groups.size(); i++ )
+			ret[i] = compressColGroup(in, estim, compRatios, rlen, groups.get(i));
+		
+		return ret;
+	}
+	
+	/**
+	 * Compresses all column groups with a fixed pool of k threads, using one task per group
+	 * that delegates to compressColGroup.
+	 * 
+	 * @param in  input block handed to compressColGroup
+	 * @param estim  size estimator handed to compressColGroup
+	 * @param compRatios  per-column compression ratios, looked up by column index
+	 * @param rlen  number of rows
+	 * @param groups  candidate column groups, each given as an array of column indexes
+	 * @param k  number of threads of the thread pool
+	 * @return array aligned with groups; an entry is null if compressColGroup dropped the group
+	 * @throws DMLRuntimeException if the pool cannot be created, a task fails, or the caller is interrupted
+	 */
+	private static ColGroup[] compressColGroups(MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, List<int[]> groups, int k) 
+		throws DMLRuntimeException
+	{
+		ExecutorService pool = null;
+		try {
+			pool = Executors.newFixedThreadPool( k );
+			ArrayList<CompressTask> tasks = new ArrayList<CompressTask>();
+			for( int[] colIndexes : groups )
+				tasks.add(new CompressTask(in, estim, compRatios, rlen, colIndexes));
+			//invokeAll returns the futures in task order, i.e., in group order
+			List<Future<ColGroup>> rtask = pool.invokeAll(tasks);	
+			ArrayList<ColGroup> ret = new ArrayList<ColGroup>();
+			for( Future<ColGroup> lrtask : rtask )
+				ret.add(lrtask.get());
+			return ret.toArray(new ColGroup[0]);
+		}
+		catch(Exception ex) {
+			throw new DMLRuntimeException(ex);
+		}
+		finally {
+			//always release the worker threads, even if a task failed
+			if( pool != null )
+				pool.shutdown();
+		}
+	}
+	
+	/**
+	 * Compresses a single column group, shrinking the group until it compresses well enough.
+	 * @param in  input block from which the bitmap of the group is extracted
+	 * @param estim  size estimator applied to the extracted bitmap
+	 * @param compRatios  per-column compression ratios, looked up by column index
+	 * @param rlen  number of rows, used for the uncompressed size and the created group
+	 * @param colIndexes  column indexes of the candidate group
+	 * @return an RLE or OLE column group, or null if all columns were removed from the group
+	 */
+	private static ColGroup compressColGroup(MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, int[] colIndexes) 
+	{
+		int[] allGroupIndices = null;
+		int allColsCount = colIndexes.length;
+		CompressedSizeInfo sizeInfo;
+		// The compression type is decided based on a full bitmap since it
+		// will be reused for the actual compression step.
+		UncompressedBitmap ubm = null;
+		PriorityQueue<CompressedColumn> compRatioPQ = null;
+		boolean skipGroup = false;
+		while (true) 
+		{
+			//extract bitmap of the current group and observe its compression ratio
+			ubm = BitmapEncoder.extractBitmap(colIndexes, in); 
+			sizeInfo = estim.estimateCompressedColGroupSize(ubm);		
+			double compRatio = getUncompressedSize(rlen, colIndexes.length) / sizeInfo.getMinSize();
+			
+			if (compRatio >= MIN_COMPRESSION_RATIO) {
+				break; // we have a good group
+			} 
+			
+			// modify the group
+			if (compRatioPQ == null) {
+				// first modification: remember the original group and queue its columns by ratio
+				allGroupIndices = Arrays.copyOf(colIndexes, colIndexes.length);
+				compRatioPQ = new PriorityQueue<CompressedMatrixBlock.CompressedColumn>();
+				for (int i = 0; i < colIndexes.length; i++)
+					compRatioPQ.add(new CompressedColumn(i, compRatios.get(colIndexes[i])));
+			}
+
+			// index in allGroupIndices of the queue head (order given by CompressedColumn.compareTo)
+			int removeIx = compRatioPQ.poll().colIx;
+			allGroupIndices[removeIx] = -1;
+			allColsCount--;
+			if (allColsCount == 0) {
+				skipGroup = true;
+				break;
+			}
+			colIndexes = new int[allColsCount];
+			// copying the values that do not equal -1
+			int ix = 0;
+			for (int col : allGroupIndices)
+				if (col != -1)
+					colIndexes[ix++] = col;
+		}
+
+		//no column left: signal via null that this group is not compressed
+		if( skipGroup )
+			return null;
+
+		//create compressed column group: RLE if OLE size / RLE size exceeds MIN_RLE_RATIO, else OLE
+		long rleNumBytes = sizeInfo.getRLESize();
+		long offsetNumBytes = sizeInfo.getOLESize();
+		double rleRatio = (double) offsetNumBytes / (double) rleNumBytes;
+		if (rleRatio > MIN_RLE_RATIO)
+			return new ColGroupRLE(colIndexes, rlen, ubm);
+		else
+			return new ColGroupOLE(colIndexes, rlen, ubm);
+	}
+	
+	/**
+	 * Computes the dense size in bytes (8 bytes per cell) of rlen x clen values.
+	 * @param rlen  number of rows
+	 * @param clen  number of columns
+	 * @return uncompressed size in bytes; computed in double arithmetic to avoid int overflow
+	 */
+	private static double getUncompressedSize(int rlen, int clen) {
+		// It is correct to use the dense size as the uncompressed size
+		// FIXME not numRows but nnz / col otherwise too aggressive overestimation
+		// of uncompressed size and hence overestimation of compression potential
+		return 8d * rlen * clen;
+	}
+
 	/**
 	 * @return a new uncompressed matrix block containing the contents of this
 	 *         block
@@ -439,11 +559,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 
 		return ret;
 	}
-
-	public CompressionStatistics getCompressionStatistics(){
-		return _stats;
-	}
-
+	
 	/**
 	 * 
 	 * @return an upper bound on the memory used to store this compressed block
@@ -463,7 +579,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		return total;
 	}
 
-	private class CompressedColumn implements Comparable<CompressedColumn> {
+	private static class CompressedColumn implements Comparable<CompressedColumn> {
 		int colIx;
 		double compRatio;
 
@@ -1372,6 +1488,50 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		}
 	}
 	
+	/**
+	 * Task that estimates the compressed size of the single column passed at construction.
+	 */
+	private static class SizeEstimTask implements Callable<CompressedSizeInfo> 
+	{
+		private CompressedSizeEstimator _estim = null;
+		private int _col = -1;
+		
+		protected SizeEstimTask( CompressedSizeEstimator estim, int col )  {
+			_estim = estim;
+			_col = col;
+		}
+		
+		@Override
+		public CompressedSizeInfo call() throws DMLRuntimeException {
+			return _estim.estimateCompressedColGroupSize(new int[] { _col });
+		}
+	}
+	
+	/**
+	 * Task that compresses one column group by delegating to compressColGroup.
+	 */
+	private static class CompressTask implements Callable<ColGroup> 
+	{
+		private MatrixBlock _in = null;
+		private CompressedSizeEstimator _estim = null;
+		private HashMap<Integer, Double> _compRatios = null;
+		private int _rlen = -1;
+		private int[] _colIndexes = null;
+		
+		protected CompressTask( MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, int[] colIndexes )  {
+			_in = in;
+			_estim = estim;
+			_compRatios = compRatios;
+			_rlen = rlen;
+			_colIndexes = colIndexes;
+		}
+		
+		@Override
+		public ColGroup call() throws DMLRuntimeException {
+			return compressColGroup(_in, _estim, _compRatios, _rlen, _colIndexes);
+		}
+	}
+	
 	//////////////////////////////////////////
 	// Graceful fallback to uncompressed linear algebra
 	

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/873bae76/src/main/java/org/apache/sysml/runtime/instructions/cp/CompressionCPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/CompressionCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/CompressionCPInstruction.java
index 333bfbb..5230945 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/cp/CompressionCPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/CompressionCPInstruction.java
@@ -19,6 +19,7 @@
 
 package org.apache.sysml.runtime.instructions.cp;
 
+import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.compress.CompressedMatrixBlock;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
@@ -54,7 +55,7 @@ public class CompressionCPInstruction extends UnaryCPInstruction
 		
 		//compress the matrix block
 		CompressedMatrixBlock cmb = new CompressedMatrixBlock(in);
-		cmb.compress();
+		cmb.compress(OptimizerUtils.getConstrainedNumThreads(-1));
 		
 		//set output and release input
 		ec.releaseMatrixInput(input1.getName());

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/873bae76/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
index 59663b5..a2e1252 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
@@ -185,7 +185,7 @@ public class LibMatrixReorg
 		throws DMLRuntimeException
 	{
 		//redirect small or special cases to sequential execution
-		if( in.isEmptyBlock(false) || (in.rlen * in.clen < PAR_NUMCELL_THRESHOLD)
+		if( in.isEmptyBlock(false) || (in.rlen * in.clen < PAR_NUMCELL_THRESHOLD) || k == 1
 			|| (SHALLOW_DENSE_VECTOR_TRANSPOSE && !in.sparse && !out.sparse && (in.rlen==1 || in.clen==1) )
 			|| (in.sparse && !out.sparse && in.rlen==1) || (!in.sparse && out.sparse && in.rlen==1) 
 			|| (!in.sparse && out.sparse) || !out.isThreadSafe())

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/873bae76/src/test/java/org/apache/sysml/test/integration/functions/compress/ParCompressionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/compress/ParCompressionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/compress/ParCompressionTest.java
new file mode 100644
index 0000000..e0fe847
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/compress/ParCompressionTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.compress;
+
+import org.apache.sysml.runtime.compress.CompressedMatrixBlock;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.util.DataConverter;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.utils.TestUtils;
+import org.junit.Test;
+
+/**
+ * Tests compress(k)/decompress round trips of CompressedMatrixBlock on generated data.
+ */
+public class ParCompressionTest extends AutomatedTestBase
+{	
+	private static final int rows = 1023;
+	private static final int cols = 20;
+	private static final double sparsity1 = 0.9;
+	private static final double sparsity2 = 0.1;
+	private static final double sparsity3 = 0.0;
+	
+	public enum SparsityType {
+		DENSE,
+		SPARSE,
+		EMPTY,
+	}
+	
+	public enum ValueType {
+		RAND,
+		RAND_ROUND,
+		CONST,
+	}
+	
+	@Override
+	public void setUp() {
+		
+	}
+	
+	@Test
+	public void testDenseRandDataCompression() {
+		runCompressionTest(SparsityType.DENSE, ValueType.RAND, true);
+	}
+	
+	@Test
+	public void testSparseRandDataCompression() {
+		runCompressionTest(SparsityType.SPARSE, ValueType.RAND, true);
+	}
+	
+	@Test
+	public void testEmptyCompression() {
+		runCompressionTest(SparsityType.EMPTY, ValueType.RAND, true);
+	}
+	
+	@Test
+	public void testDenseRoundRandDataCompression() {
+		runCompressionTest(SparsityType.DENSE, ValueType.RAND_ROUND, true);
+	}
+	
+	@Test
+	public void testSparseRoundRandDataCompression() {
+		runCompressionTest(SparsityType.SPARSE, ValueType.RAND_ROUND, true);
+	}
+	
+	@Test
+	public void testDenseConstantDataCompression() {
+		runCompressionTest(SparsityType.DENSE, ValueType.CONST, true);
+	}
+	
+	@Test
+	public void testSparseConstDataCompression() {
+		runCompressionTest(SparsityType.SPARSE, ValueType.CONST, true);
+	}
+	
+	@Test
+	public void testDenseRandDataNoCompression() {
+		runCompressionTest(SparsityType.DENSE, ValueType.RAND, false);
+	}
+	
+	@Test
+	public void testSparseRandDataNoCompression() {
+		runCompressionTest(SparsityType.SPARSE, ValueType.RAND, false);
+	}
+	
+	@Test
+	public void testEmptyNoCompression() {
+		runCompressionTest(SparsityType.EMPTY, ValueType.RAND, false);
+	}
+	
+	@Test
+	public void testDenseRoundRandDataNoCompression() {
+		runCompressionTest(SparsityType.DENSE, ValueType.RAND_ROUND, false);
+	}
+	
+	@Test
+	public void testSparseRoundRandDataNoCompression() {
+		runCompressionTest(SparsityType.SPARSE, ValueType.RAND_ROUND, false);
+	}
+	
+	@Test
+	public void testDenseConstDataNoCompression() {
+		runCompressionTest(SparsityType.DENSE, ValueType.CONST, false);
+	}
+	
+	@Test
+	public void testSparseConstDataNoCompression() {
+		runCompressionTest(SparsityType.SPARSE, ValueType.CONST, false);
+	}
+	
+
+	/**
+	 * Generates a rows x cols matrix for the given sparsity and value type, compresses it with
+	 * the local parallelism if requested, decompresses it, and compares the result to the input.
+	 */
+	private void runCompressionTest(SparsityType sptype, ValueType vtype, boolean compress)
+	{
+		try
+		{
+			//prepare sparsity for input data
+			double sparsity = -1;
+			switch( sptype ){
+				case DENSE: sparsity = sparsity1; break;
+				case SPARSE: sparsity = sparsity2; break;
+				case EMPTY: sparsity = sparsity3; break;
+			}
+			
+			//generate input data (CONST passes 10 for both value bounds; presumably min == max - TODO confirm)
+			double min = (vtype==ValueType.CONST)? 10 : -10;
+			double[][] input = TestUtils.generateTestMatrix(rows, cols, min, 10, sparsity, 7);
+			if( vtype==ValueType.RAND_ROUND )
+				input = TestUtils.round(input);
+			MatrixBlock mb = DataConverter.convertToMatrixBlock(input);
+			
+			//compress given matrix block with as many threads as the local parallelism
+			CompressedMatrixBlock cmb = new CompressedMatrixBlock(mb);
+			if( compress )
+				cmb.compress(InfrastructureAnalyzer.getLocalParallelism());
+			
+			//decompress the compressed matrix block
+			MatrixBlock tmp = cmb.decompress();
+			
+			//compare result with input (last argument 0 is presumably the tolerance - verify against TestUtils)
+			double[][] d1 = DataConverter.convertToDoubleMatrix(mb);
+			double[][] d2 = DataConverter.convertToDoubleMatrix(tmp);
+			TestUtils.compareMatrices(d1, d2, rows, cols, 0);
+		}
+		catch(Exception ex) {
+			throw new RuntimeException(ex);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/873bae76/src/test_suites/java/org/apache/sysml/test/integration/functions/compress/ZPackageSuite.java
----------------------------------------------------------------------
diff --git a/src/test_suites/java/org/apache/sysml/test/integration/functions/compress/ZPackageSuite.java b/src/test_suites/java/org/apache/sysml/test/integration/functions/compress/ZPackageSuite.java
index c8dc906..7d19ae9 100644
--- a/src/test_suites/java/org/apache/sysml/test/integration/functions/compress/ZPackageSuite.java
+++ b/src/test_suites/java/org/apache/sysml/test/integration/functions/compress/ZPackageSuite.java
@@ -43,6 +43,7 @@ import org.junit.runners.Suite;
 	LargeMatrixVectorMultTest.class,
 	LargeParMatrixVectorMultTest.class,
 	LargeVectorMatrixMultTest.class,
+	ParCompressionTest.class,
 	ParMatrixMultChainTest.class,
 	ParMatrixVectorMultTest.class,
 	ParTransposeSelfLeftMatrixMultTest.class,