You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/07/28 18:13:17 UTC

[1/3] incubator-systemml git commit: [SYSTEMML-562] Fix serialization/partitioning of partitioned broadcasts

Repository: incubator-systemml
Updated Branches:
  refs/heads/master 97b136601 -> 873bae76b


[SYSTEMML-562] Fix serialization/partitioning of partitioned broadcasts

This patch fixes an issue with incorrect class references on
deserialization as well as various smaller issues related to
incompatible serialization/deserialization (long vs int rlen/clen) and
partitioning (destroyed block references).

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/42906ba1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/42906ba1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/42906ba1

Branch: refs/heads/master
Commit: 42906ba126ed25b3dfee3ab254c9afbb6caf421e
Parents: 97b1366
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Wed Jul 27 20:59:15 2016 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Wed Jul 27 20:59:15 2016 -0700

----------------------------------------------------------------------
 .../caching/CacheBlockFactory.java              |  57 ++++++
 .../context/SparkExecutionContext.java          |   2 +-
 .../spark/data/PartitionedBlock.java            | 180 +++++++++----------
 3 files changed, 140 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/42906ba1/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheBlockFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheBlockFactory.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheBlockFactory.java
new file mode 100644
index 0000000..3bf86b5
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheBlockFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.controlprogram.caching;
+
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+
+/**
+ * Factory to create instances of matrix/frame blocks given
+ * internal codes (0 = MatrixBlock, 1 = FrameBlock), which allows
+ * serializing the block type, e.g., as done by PartitionedBlock.
+ */
+public class CacheBlockFactory 
+{
+	/**
+	 * Creates a block via the default constructor of the type identified by the given code.
+	 * @param code internal type code: 0 for MatrixBlock, 1 for FrameBlock
+	 * @return new block instance; a RuntimeException is thrown for unsupported codes
+	 */
+	public static CacheBlock newInstance(int code) {
+		switch( code ) {
+			case 0: return new MatrixBlock();
+			case 1: return new FrameBlock();
+		}
+		throw new RuntimeException("Unsupported cache block type: "+code);
+	}
+	
+	/**
+	 * Determines the internal type code of the given block (inverse of newInstance).
+	 * @param block matrix or frame block; null and all other types are rejected
+	 * @return 0 for MatrixBlock, 1 for FrameBlock; a RuntimeException is thrown otherwise
+	 */
+	public static int getCode(CacheBlock block) {
+		if( block instanceof MatrixBlock )
+			return 0;
+		else if( block instanceof FrameBlock )
+			return 1;
+		throw new RuntimeException("Unsupported cache block type: "+((block != null) ? block.getClass().getName() : "null"));
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/42906ba1/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index 99614f2..0eea221 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -910,7 +910,7 @@ public class SparkExecutionContext extends ExecutionContext
 		
 		long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
 
-		PartitionedBlock<MatrixBlock> out = new PartitionedBlock<MatrixBlock>(rlen, clen, brlen, bclen, new MatrixBlock());
+		PartitionedBlock<MatrixBlock> out = new PartitionedBlock<MatrixBlock>(rlen, clen, brlen, bclen);
 		List<Tuple2<MatrixIndexes,MatrixBlock>> list = rdd.collect();
 		
 		//copy blocks one-at-a-time into output matrix block

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/42906ba1/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
index 20fcd0b..465fdd5 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
@@ -27,28 +27,27 @@ import java.io.ObjectInput;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutput;
 import java.io.ObjectOutputStream;
-import java.lang.reflect.Array;
 import java.lang.reflect.Constructor;
 import java.util.ArrayList;
 
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.caching.CacheBlock;
+import org.apache.sysml.runtime.controlprogram.caching.CacheBlockFactory;
 import org.apache.sysml.runtime.matrix.data.Pair;
 import org.apache.sysml.runtime.util.FastBufferedDataInputStream;
 import org.apache.sysml.runtime.util.FastBufferedDataOutputStream;
 import org.apache.sysml.runtime.util.IndexRange;
 
 /**
- * This class is for partitioned matrix/frame blocks, to be used
- * as broadcasts. Distributed tasks require block-partitioned broadcasts but a lazy partitioning per
- * task would create instance-local copies and hence replicate broadcast variables which are shared
- * by all tasks within an executor.  
+ * This class is for partitioned matrix/frame blocks, to be used as broadcasts. 
+ * Distributed tasks require block-partitioned broadcasts but a lazy partitioning 
+ * per task would create instance-local copies and hence replicate broadcast 
+ * variables which are shared by all tasks within an executor.  
  * 
  */
 public class PartitionedBlock<T extends CacheBlock> implements Externalizable
 {
-
-	protected T[] _partBlocks = null; 
+	protected CacheBlock[] _partBlocks = null; 
 	protected long _rlen = -1;
 	protected long _clen = -1;
 	protected int _brlen = -1;
@@ -60,40 +59,6 @@ public class PartitionedBlock<T extends CacheBlock> implements Externalizable
 	}
 	
 	
-	public long getNumRows() {
-		return _rlen;
-	}
-	
-	public long getNumCols() {
-		return _clen;
-	}
-	
-	public long getNumRowsPerBlock() {
-		return _brlen;
-	}
-	
-	public long getNumColumnsPerBlock() {
-		return _bclen;
-	}
-	
-	/**
-	 * 
-	 * @return
-	 */
-	public int getNumRowBlocks() 
-	{
-		return (int)Math.ceil((double)_rlen/_brlen);
-	}
-	
-	/**
-	 * 
-	 * @return
-	 */
-	public int getNumColumnBlocks() 
-	{
-		return (int)Math.ceil((double)_clen/_bclen);
-	}
-	
 	@SuppressWarnings("unchecked")
 	public PartitionedBlock(T block, int brlen, int bclen) 
 	{
@@ -106,17 +71,16 @@ public class PartitionedBlock<T extends CacheBlock> implements Externalizable
 		_clen = clen;
 		_brlen = brlen;
 		_bclen = bclen;
-
 		int nrblks = getNumRowBlocks();
 		int ncblks = getNumColumnBlocks();
+		int code = CacheBlockFactory.getCode(block);
 		
 		try
 		{
-			_partBlocks = (T[])Array.newInstance((block.getClass()), nrblks * ncblks);
+			_partBlocks = new CacheBlock[nrblks * ncblks];
 			for( int i=0, ix=0; i<nrblks; i++ )
-				for( int j=0; j<ncblks; j++, ix++ )
-				{
-					T tmp = (T) block.getClass().newInstance();
+				for( int j=0; j<ncblks; j++, ix++ ) {
+					T tmp = (T) CacheBlockFactory.newInstance(code);
 					block.sliceOperations(i*_brlen, Math.min((i+1)*_brlen, rlen)-1, 
 							           j*_bclen, Math.min((j+1)*_bclen, clen)-1, tmp);
 					_partBlocks[ix] = tmp;
@@ -129,8 +93,7 @@ public class PartitionedBlock<T extends CacheBlock> implements Externalizable
 		_offset = 0;
 	}
 
-	@SuppressWarnings("unchecked")
-	public PartitionedBlock(int rlen, int clen, int brlen, int bclen, T block) 
+	public PartitionedBlock(int rlen, int clen, int brlen, int bclen) 
 	{
 		//partitioning input broadcast
 		_rlen = rlen;
@@ -140,8 +103,60 @@ public class PartitionedBlock<T extends CacheBlock> implements Externalizable
 		
 		int nrblks = getNumRowBlocks();
 		int ncblks = getNumColumnBlocks();
-		_partBlocks = (T[])Array.newInstance((block.getClass()), nrblks * ncblks);
-
+		_partBlocks = new CacheBlock[nrblks * ncblks];
+	}
+	
+	
+	/**
+	 * Creates a partition that shares references to numBlks blocks starting at offset (block is unused).
+	 * @param offset linear index of the first block; NOTE(review): used as-is, assumes this._offset == 0
+	 * @param numBlks number of consecutive blocks to include in the partition
+	 * @return new PartitionedBlock with the same dimensions, the given offset, and the selected blocks
+	 */
+	public PartitionedBlock<T> createPartition( int offset, int numBlks, T block )
+	{
+		PartitionedBlock<T> ret = new PartitionedBlock<T>();
+		ret._rlen = _rlen;
+		ret._clen = _clen;
+		ret._brlen = _brlen;
+		ret._bclen = _bclen;
+		ret._partBlocks = new CacheBlock[numBlks];
+		ret._offset = offset;
+		System.arraycopy(_partBlocks, offset, ret._partBlocks, 0, numBlks);
+		
+		return ret;
+	}
+	
+	public long getNumRows() {
+		return _rlen;
+	}
+	
+	public long getNumCols() {
+		return _clen;
+	}
+	
+	public long getNumRowsPerBlock() {
+		return _brlen;
+	}
+	
+	public long getNumColumnsPerBlock() {
+		return _bclen;
+	}
+	
+	/**
+	 * Computes the number of row blocks as ceil(rlen / brlen).
+	 * @return number of row blocks
+	 */
+	public int getNumRowBlocks() {
+		return (int)Math.ceil((double)_rlen/_brlen);
+	}
+	
+	/**
+	 * Computes the number of column blocks as ceil(clen / bclen).
+	 * @return number of column blocks
+	 */
+	public int getNumColumnBlocks() {
+		return (int)Math.ceil((double)_clen/_bclen);
 	}
 	
 	/**
@@ -151,6 +166,7 @@ public class PartitionedBlock<T extends CacheBlock> implements Externalizable
 	 * @return
 	 * @throws DMLRuntimeException 
 	 */
+	@SuppressWarnings("unchecked")
 	public T getBlock(int rowIndex, int colIndex) 
 		throws DMLRuntimeException 
 	{
@@ -165,7 +181,7 @@ public class PartitionedBlock<T extends CacheBlock> implements Externalizable
 		int rix = rowIndex - 1;
 		int cix = colIndex - 1;
 		int ix = rix*ncblks+cix - _offset;
-		return _partBlocks[ix];
+		return (T)_partBlocks[ix];
 	}
 	
 	/**
@@ -189,43 +205,19 @@ public class PartitionedBlock<T extends CacheBlock> implements Externalizable
 		int rix = rowIndex - 1;
 		int cix = colIndex - 1;
 		int ix = rix*ncblks+cix - _offset;
-		_partBlocks[ ix ] = block;
-		
-	}
-	
-	/**
-	 * 
-	 * @param offset
-	 * @param numBlks
-	 * @return
-	 */
-	@SuppressWarnings("unchecked")
-	public PartitionedBlock<T> createPartition( int offset, int numBlks, T block )
-	{
-		PartitionedBlock<T> ret = new PartitionedBlock<T>();
-		ret._rlen = _rlen;
-		ret._clen = _clen;
-		ret._brlen = _brlen;
-		ret._bclen = _bclen;
-
-		_partBlocks = (T[])Array.newInstance(block.getClass(), numBlks);
-		ret._offset = offset;
-		System.arraycopy(_partBlocks, offset, ret._partBlocks, 0, numBlks);
-		
-		return ret;
+		_partBlocks[ ix ] = block;	
 	}
 
 	/**
 	 * 
 	 * @return
 	 */	
-	public long getInMemorySize()
-	{
+	public long getInMemorySize() {
 		long ret = 24; //header
 		ret += 32;    //block array
 		
 		if( _partBlocks != null )
-			for( T block : _partBlocks )
+			for( CacheBlock block : _partBlocks )
 				ret += block.getInMemorySize();
 		
 		return ret;
@@ -236,12 +228,11 @@ public class PartitionedBlock<T extends CacheBlock> implements Externalizable
 	 * @return
 	 */
 	
-	public long getExactSerializedSize()
-	{
+	public long getExactSerializedSize() {
 		long ret = 24; //header
 		
 		if( _partBlocks != null )
-			for( T block :  _partBlocks )
+			for( CacheBlock block : _partBlocks )
 				ret += block.getExactSerializedSize();
 		
 		return ret;
@@ -361,8 +352,9 @@ public class PartitionedBlock<T extends CacheBlock> implements Externalizable
 		dos.writeInt(_bclen);
 		dos.writeInt(_offset);
 		dos.writeInt(_partBlocks.length);
+		dos.writeByte(CacheBlockFactory.getCode(_partBlocks[0]));
 		
-		for( T block : _partBlocks )
+		for( CacheBlock block : _partBlocks )
 			block.write(dos);
 	}
 
@@ -371,29 +363,21 @@ public class PartitionedBlock<T extends CacheBlock> implements Externalizable
 	 * @param din
 	 * @throws IOException 
 	 */
-	@SuppressWarnings("unchecked")
 	private void readHeaderAndPayload(DataInput dis) 
 		throws IOException
 	{
-		_rlen = dis.readInt();
-		_clen = dis.readInt();
+		_rlen = dis.readLong();
+		_clen = dis.readLong();
 		_brlen = dis.readInt();
 		_bclen = dis.readInt();
-		_offset = dis.readInt();
-		
+		_offset = dis.readInt();		
 		int len = dis.readInt();
+		int code = dis.readByte();
 		
-		try
-		{
-			_partBlocks = (T[])Array.newInstance(getClass(), len);
-			for( int i=0; i<len; i++ ) {
-				_partBlocks[i].readFields(dis);
-			}
+		_partBlocks = new CacheBlock[len];
+		for( int i=0; i<len; i++ ) {
+			_partBlocks[i] = CacheBlockFactory.newInstance(code);
+			_partBlocks[i].readFields(dis);
 		}
-		catch(Exception ex) {
-			throw new RuntimeException("Failed partitioning of broadcast variable input.", ex);
-		}
-		
 	}
-	
 }


[3/3] incubator-systemml git commit: [SYSTEMML-821] Multi-threaded matrix block compression, tests

Posted by mb...@apache.org.
[SYSTEMML-821] Multi-threaded matrix block compression, tests

This patch enables multi-threaded matrix block compression in the CP
compression instruction. In detail, this is realized via (1)
multi-threaded transpose, (2) multi-threaded column classification, and
(3) multi-threaded column compression. Down the road we will also add
multi-threaded column co-coding but this requires some major refactoring
first. On the sparse ImageNet dataset (1262102 x 900), the individual
compression phases (1-4) improved as follows, leading to an overall
improvement of 4.5x:

(0) baseline: 22.6s, 4.6s, 26.9s, 0.3s,
(1) transpose: 7.2s, 4.6s, 26.9s, 0.3s,
(2) classify: 4.0s, 4.6s, 26.9s, 0.3s,
(3) grouping: 3.8s, 4.7s, 3.4s, 0.3s.

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/873bae76
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/873bae76
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/873bae76

Branch: refs/heads/master
Commit: 873bae76bad9267b2c710404f7a08707ca76ca18
Parents: 44c8f9d
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Thu Jul 28 02:01:07 2016 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Thu Jul 28 02:01:07 2016 -0700

----------------------------------------------------------------------
 .../runtime/compress/CompressedMatrixBlock.java | 350 ++++++++++++++-----
 .../cp/CompressionCPInstruction.java            |   3 +-
 .../runtime/matrix/data/LibMatrixReorg.java     |   2 +-
 .../functions/compress/ParCompressionTest.java  | 169 +++++++++
 .../functions/compress/ZPackageSuite.java       |   1 +
 5 files changed, 428 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/873bae76/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
index 81d933d..f2ccb43 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
@@ -193,8 +193,22 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 	 * which should be fixed if we move ahead with this compression strategy.
 	 * 
 	 * +per column sparsity
+	 * 
+	 * @throws DMLRuntimeException
 	 */
 	public void compress() 
+		throws DMLRuntimeException
+	{
+		//default sequential execution
+		compress(1);
+	}
+	
+	/**
+	 * 
+	 * @param k  number of threads
+	 * @throws DMLRuntimeException
+	 */
+	public void compress(int k) 
 		throws DMLRuntimeException 
 	{
 		//check for redundant compression
@@ -216,11 +230,8 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		final int numRows = getNumRows();
 		final int numCols = getNumColumns();
 		final boolean sparse = isInSparseFormat();
-		MatrixBlock rawblock = this;
-		if( TRANSPOSE_INPUT )
-			rawblock = LibMatrixReorg.transpose(rawblock, new MatrixBlock(numCols, numRows, sparse));
-		else
-			rawblock = new MatrixBlock(this);
+		MatrixBlock rawblock = !TRANSPOSE_INPUT ? new MatrixBlock(this) :
+			LibMatrixReorg.transpose(this, new MatrixBlock(numCols, numRows, sparse), k);
 		
 		//construct sample-based size estimator
 		CompressedSizeEstimator bitmapSizeEstimator = 
@@ -234,18 +245,12 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 
 		// We start with a full set of columns.
 		HashSet<Integer> remainingCols = new HashSet<Integer>();
-		for (int i = 0; i < numCols; i++) {
+		for (int i = 0; i < numCols; i++)
 			remainingCols.add(i);
-		}
 
 		// PHASE 1: Classify columns by compression type
-		// We start by determining which columns are amenable to bitmap
-		// compression
-
-		// It is correct to use the dense size as the uncompressed size
-		// FIXME not numRows but nnz / col otherwise too aggressive overestimation
-		// of uncompressed size and hence overestimation of compression potential
-		double uncompressedColumnSize = 8 * numRows;
+		// We start by determining which columns are amenable to bitmap compression
+		double uncompressedColumnSize = getUncompressedSize(numRows, 1);
 
 		// information about the bitmap amenable columns
 		List<Integer> bitmapCols = new ArrayList<Integer>();
@@ -256,11 +261,12 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		
 		// Minimum ratio (size of uncompressed / size of compressed) that we
 		// will accept when encoding a field with a bitmap.
+		CompressedSizeInfo[] sizeInfos = (k > 1) ?
+				computeCompressedSizeInfos(bitmapSizeEstimator, numCols, k) : 
+				computeCompressedSizeInfos(bitmapSizeEstimator, numCols);		
 		for (int col = 0; col < numCols; col++) 
-		{
-			CompressedSizeInfo compressedSizeInfo = bitmapSizeEstimator
-					.estimateCompressedColGroupSize(new int[] { col });
-			long compressedSize = compressedSizeInfo.getMinSize();
+		{	
+			long compressedSize = sizeInfos[col].getMinSize();
 			double compRatio = uncompressedColumnSize / compressedSize;
 			
 			//FIXME: compression ratio should be checked against 1 instead of min compression
@@ -269,7 +275,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 			if (compRatio >= MIN_COMPRESSION_RATIO) {
 				bitmapCols.add(col);
 				compressionRatios.put(col, compRatio);
-				colsCardinalities.add(compressedSizeInfo.getEstCarinality());
+				colsCardinalities.add(sizeInfos[col].getEstCarinality());
 				compressedSizes.add(compressedSize);
 			}
 			else
@@ -313,77 +319,15 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		}
 		
 		// PHASE 3: Compress and correct sample-based decisions
-		
-		for (int[] groupIndices : bitmapColGrps) 
-		{
-			int[] allGroupIndices = null;
-			int allColsCount = groupIndices.length;
-			CompressedSizeInfo bitmapSizeInfo;
-			// The compression type is decided based on a full bitmap since it
-			// will be reused for the actual compression step.
-			UncompressedBitmap ubm;
-			PriorityQueue<CompressedColumn> compRatioPQ = null;
-			boolean skipGroup = false;
-			while (true) 
-			{
-				ubm = BitmapEncoder.extractBitmap(groupIndices, rawblock); 
-				bitmapSizeInfo = bitmapSizeEstimator
-						.estimateCompressedColGroupSize(ubm);
-				double compRatio = uncompressedColumnSize * groupIndices.length
-						/ bitmapSizeInfo.getMinSize();
-				if (compRatio >= MIN_COMPRESSION_RATIO) {
-					// we have a good group
-					for( Integer col : groupIndices )
-						remainingCols.remove(col);
-					break;
-				} else {
-					// modify the group
-					if (compRatioPQ == null) {
-						// first modification
-						allGroupIndices = Arrays.copyOf(groupIndices, groupIndices.length);
-						compRatioPQ = new PriorityQueue<CompressedMatrixBlock.CompressedColumn>();
-						for (int i = 0; i < groupIndices.length; i++)
-							compRatioPQ.add(new CompressedColumn(i,
-									compressionRatios.get(groupIndices[i])));
-					}
-
-					// index in allGroupIndices
-					int removeIx = compRatioPQ.poll().colIx;
-					allGroupIndices[removeIx] = -1;
-					allColsCount--;
-					if (allColsCount == 0) {
-						skipGroup = true;
-						break;
-					}
-					groupIndices = new int[allColsCount];
-					// copying the values that do not equal -1
-					int ix = 0;
-					for (int col : allGroupIndices) {
-						if (col != -1) {
-							groupIndices[ix++] = col;
-						}
-					}
-
-				}
+		ColGroup[] colGroups = (k > 1) ?
+				compressColGroups(rawblock, bitmapSizeEstimator, compressionRatios, numRows, bitmapColGrps, k) : 
+				compressColGroups(rawblock, bitmapSizeEstimator, compressionRatios, numRows, bitmapColGrps); 	
+		for( int j=0; j<colGroups.length; j++ ) {
+			if( colGroups[j] != null ) {
+				for( int col : colGroups[j].getColIndices() )
+					remainingCols.remove(col);
+				_colGroups.add(colGroups[j]);
 			}
-
-			if (skipGroup)
-				continue;
-			long rleNumBytes = bitmapSizeInfo.getRLESize();
-			long offsetNumBytes = bitmapSizeInfo.getOLESize();
-			double rleRatio = (double) offsetNumBytes / (double) rleNumBytes;
-
-			if (rleRatio > MIN_RLE_RATIO) {
-				ColGroupRLE compressedGroup = new ColGroupRLE(groupIndices,
-						numRows, ubm);
-				_colGroups.add(compressedGroup);
-			} 
-			else {
-				ColGroupOLE compressedGroup = new ColGroupOLE(
-						groupIndices, numRows, ubm);
-				_colGroups.add(compressedGroup);
-			}
-
 		}
 		
 		_stats.timePhase3 = time.stop();
@@ -407,6 +351,182 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 			LOG.debug("compression phase 4: "+_stats.timePhase4);
 	}
 
+	public CompressionStatistics getCompressionStatistics() {
+		return _stats; //statistics (e.g., per-phase timings) recorded by compress()
+	}
+
+	/**
+	 * Sequentially estimates the compressed size of each individual column.
+	 * @param estim sample-based size estimator
+	 * @param clen number of columns
+	 * @return size infos indexed by column
+	 */
+	private static CompressedSizeInfo[] computeCompressedSizeInfos(CompressedSizeEstimator estim, int clen) {
+		CompressedSizeInfo[] ret = new CompressedSizeInfo[clen];
+		for( int col=0; col<clen; col++ )
+			ret[col] = estim.estimateCompressedColGroupSize(new int[] { col });
+		return ret;
+	}
+	
+	/**
+	 * Estimates the per-column compressed sizes in parallel; the pool is always shut down.
+	 * @param estim sample-based size estimator, shared by all tasks
+	 * @param clen number of columns
+	 * @param k number of threads
+	 * @return size infos indexed by column
+	 * @throws DMLRuntimeException if any estimation task fails or the wait is interrupted
+	 */
+	private static CompressedSizeInfo[] computeCompressedSizeInfos(CompressedSizeEstimator estim, int clen, int k) 
+		throws DMLRuntimeException 
+	{	
+		ExecutorService pool = Executors.newFixedThreadPool( k );
+		try {
+			ArrayList<SizeEstimTask> tasks = new ArrayList<SizeEstimTask>();
+			for( int col=0; col<clen; col++ )
+				tasks.add(new SizeEstimTask(estim, col));
+			List<Future<CompressedSizeInfo>> rtask = pool.invokeAll(tasks);	
+			CompressedSizeInfo[] ret = new CompressedSizeInfo[clen];
+			for( int col=0; col<clen; col++ )
+				ret[col] = rtask.get(col).get();
+			return ret;
+		}
+		catch(Exception ex) {
+			throw new DMLRuntimeException(ex);
+		}
+		finally { pool.shutdown(); }
+	}
+
+	/**
+	 * Sequentially compresses the given column groups, one compressColGroup call per group.
+	 * @param in input block from which the column bitmaps are extracted
+	 * @param estim size estimator applied to the extracted bitmaps
+	 * @param compRatios per-column compression ratios, used to choose columns to drop
+	 * @param rlen number of rows
+	 * @param groups column groups (column indexes) to compress
+	 * @return one entry per group; null where no acceptable column subset was found
+	 */
+	private static ColGroup[] compressColGroups(MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, List<int[]> groups)
+	{
+		ColGroup[] ret = new ColGroup[groups.size()];
+		for( int i=0; i<groups.size(); i++ )
+			ret[i] = compressColGroup(in, estim, compRatios, rlen, groups.get(i));
+		
+		return ret;
+	}
+	
+	/**
+	 * Compresses the given column groups in parallel; the pool is always shut down.
+	 * @param in input block from which the column bitmaps are extracted (read by all tasks)
+	 * @param estim size estimator applied to the extracted bitmaps
+	 * @param compRatios per-column compression ratios (read-only, shared by all tasks)
+	 * @param rlen number of rows
+	 * @param groups column groups (column indexes) to compress
+	 * @param k number of threads
+	 * @return one entry per group, in input order; null where no acceptable subset was found
+	 * @throws DMLRuntimeException if any compression task fails or the wait is interrupted
+	 */
+	private static ColGroup[] compressColGroups(MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, List<int[]> groups, int k) 
+		throws DMLRuntimeException
+	{
+		ExecutorService pool = Executors.newFixedThreadPool( k );
+		try {
+			ArrayList<CompressTask> tasks = new ArrayList<CompressTask>();
+			for( int[] colIndexes : groups )
+				tasks.add(new CompressTask(in, estim, compRatios, rlen, colIndexes));
+			List<Future<ColGroup>> rtask = pool.invokeAll(tasks);	
+			ColGroup[] ret = new ColGroup[rtask.size()];
+			for( int i=0; i<ret.length; i++ )
+				ret[i] = rtask.get(i).get();
+			return ret;
+		}
+		catch(Exception ex) {
+			throw new DMLRuntimeException(ex);
+		}
+		finally { pool.shutdown(); }
+	}
+	
+	/**
+	 * Compresses one column group, dropping columns (in CompressedColumn priority order) until its ratio is acceptable.
+	 * @param in input block from which the column bitmaps are extracted
+	 * @param estim size estimator applied to the extracted bitmap
+	 * @param compRatios per-column compression ratios, used to order column removal (NOTE(review): presumably lowest first)
+	 * @param rlen number of rows, used for the uncompressed size and the created column group
+	 * @param colIndexes column indexes of the candidate group
+	 * @return RLE or OLE column group, or null if all columns were removed from the group
+	 */
+	private static ColGroup compressColGroup(MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, int[] colIndexes) 
+	{
+		int[] allGroupIndices = null;
+		int allColsCount = colIndexes.length;
+		CompressedSizeInfo sizeInfo;
+		// The compression type is decided based on a full bitmap since it
+		// will be reused for the actual compression step.
+		UncompressedBitmap ubm = null;
+		PriorityQueue<CompressedColumn> compRatioPQ = null;
+		boolean skipGroup = false;
+		while (true) 
+		{
+			//extract bitmap and observe compression ratio
+			ubm = BitmapEncoder.extractBitmap(colIndexes, in); 
+			sizeInfo = estim.estimateCompressedColGroupSize(ubm);		
+			double compRatio = getUncompressedSize(rlen, colIndexes.length) / sizeInfo.getMinSize();
+			
+			if (compRatio >= MIN_COMPRESSION_RATIO) {
+				break; // we have a good group
+			} 
+			
+			// modify the group
+			if (compRatioPQ == null) {
+				// first modification
+				allGroupIndices = Arrays.copyOf(colIndexes, colIndexes.length);
+				compRatioPQ = new PriorityQueue<CompressedMatrixBlock.CompressedColumn>();
+				for (int i = 0; i < colIndexes.length; i++)
+					compRatioPQ.add(new CompressedColumn(i, compRatios.get(colIndexes[i])));
+			}
+
+			// index in allGroupIndices
+			int removeIx = compRatioPQ.poll().colIx;
+			allGroupIndices[removeIx] = -1;
+			allColsCount--;
+			if (allColsCount == 0) {
+				skipGroup = true;
+				break;
+			}
+			colIndexes = new int[allColsCount];
+			// copying the values that do not equal -1
+			int ix = 0;
+			for (int col : allGroupIndices)
+				if (col != -1)
+					colIndexes[ix++] = col;
+		}
+
+		//no acceptable subset: return null, columns remain for the uncompressed fallback
+		if( skipGroup )
+			return null;
+
+		//create compressed column group
+		long rleNumBytes = sizeInfo.getRLESize();
+		long offsetNumBytes = sizeInfo.getOLESize();
+		double rleRatio = (double) offsetNumBytes / (double) rleNumBytes;
+		if (rleRatio > MIN_RLE_RATIO)
+			return new ColGroupRLE(colIndexes, rlen, ubm);
+		else
+			return new ColGroupOLE(colIndexes, rlen, ubm);
+	}
+	
+	/**
+	 * Computes the dense size in bytes (8 bytes per cell) of an rlen x clen block.
+	 * @param rlen number of rows
+	 * @param clen number of columns
+	 * @return dense size in bytes, computed in double precision to avoid int overflow
+	 */
+	private static double getUncompressedSize(int rlen, int clen) {
+		// It is correct to use the dense size as the uncompressed size
+		// FIXME not numRows but nnz / col otherwise too aggressive overestimation
+		// of uncompressed size and hence overestimation of compression potential
+		return 8d * rlen * clen; //double arithmetic: 8*rlen*clen overflows int for large inputs
+	}
+
 	/**
 	 * @return a new uncompressed matrix block containing the contents of this
 	 *         block
@@ -439,11 +559,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 
 		return ret;
 	}
-
-	public CompressionStatistics getCompressionStatistics(){
-		return _stats;
-	}
-
+	
 	/**
 	 * 
 	 * @return an upper bound on the memory used to store this compressed block
@@ -463,7 +579,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		return total;
 	}
 
-	private class CompressedColumn implements Comparable<CompressedColumn> {
+	private static class CompressedColumn implements Comparable<CompressedColumn> {
 		int colIx;
 		double compRatio;
 
@@ -1372,6 +1488,50 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
 		}
 	}
 	
+	/**
+	 * Task that estimates the compressed size of a single column.
+	 */
+	private static class SizeEstimTask implements Callable<CompressedSizeInfo> 
+	{
+		private CompressedSizeEstimator _estim = null;
+		private int _col = -1;
+		
+		protected SizeEstimTask( CompressedSizeEstimator estim, int col )  {
+			_estim = estim;
+			_col = col;
+		}
+		
+		@Override
+		public CompressedSizeInfo call() throws DMLRuntimeException {
+			return _estim.estimateCompressedColGroupSize(new int[] { _col });
+		}
+	}
+	
+	/**
+	 * Task that compresses a single column group via compressColGroup (null if rejected).
+	 */
+	private static class CompressTask implements Callable<ColGroup> 
+	{
+		private MatrixBlock _in = null;
+		private CompressedSizeEstimator _estim = null;
+		private HashMap<Integer, Double> _compRatios = null;
+		private int _rlen = -1;
+		private int[] _colIndexes = null;
+		
+		protected CompressTask( MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, int[] colIndexes )  {
+			_in = in;
+			_estim = estim;
+			_compRatios = compRatios;
+			_rlen = rlen;
+			_colIndexes = colIndexes;
+		}
+		
+		@Override
+		public ColGroup call() throws DMLRuntimeException {
+			return compressColGroup(_in, _estim, _compRatios, _rlen, _colIndexes);
+		}
+	}
+	
 	//////////////////////////////////////////
 	// Graceful fallback to uncompressed linear algebra
 	

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/873bae76/src/main/java/org/apache/sysml/runtime/instructions/cp/CompressionCPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/CompressionCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/CompressionCPInstruction.java
index 333bfbb..5230945 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/cp/CompressionCPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/CompressionCPInstruction.java
@@ -19,6 +19,7 @@
 
 package org.apache.sysml.runtime.instructions.cp;
 
+import org.apache.sysml.hops.OptimizerUtils;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.compress.CompressedMatrixBlock;
 import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
@@ -54,7 +55,7 @@ public class CompressionCPInstruction extends UnaryCPInstruction
 		
 		//compress the matrix block
 		CompressedMatrixBlock cmb = new CompressedMatrixBlock(in);
-		cmb.compress();
+		cmb.compress(OptimizerUtils.getConstrainedNumThreads(-1));
 		
 		//set output and release input
 		ec.releaseMatrixInput(input1.getName());

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/873bae76/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
index 59663b5..a2e1252 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
@@ -185,7 +185,7 @@ public class LibMatrixReorg
 		throws DMLRuntimeException
 	{
 		//redirect small or special cases to sequential execution
-		if( in.isEmptyBlock(false) || (in.rlen * in.clen < PAR_NUMCELL_THRESHOLD)
+		if( in.isEmptyBlock(false) || (in.rlen * in.clen < PAR_NUMCELL_THRESHOLD) || k == 1
 			|| (SHALLOW_DENSE_VECTOR_TRANSPOSE && !in.sparse && !out.sparse && (in.rlen==1 || in.clen==1) )
 			|| (in.sparse && !out.sparse && in.rlen==1) || (!in.sparse && out.sparse && in.rlen==1) 
 			|| (!in.sparse && out.sparse) || !out.isThreadSafe())

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/873bae76/src/test/java/org/apache/sysml/test/integration/functions/compress/ParCompressionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/compress/ParCompressionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/compress/ParCompressionTest.java
new file mode 100644
index 0000000..e0fe847
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/compress/ParCompressionTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.compress;
+
+import org.apache.sysml.runtime.compress.CompressedMatrixBlock;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.util.DataConverter;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.utils.TestUtils;
+import org.junit.Test;
+
+/**
+ * Round-trip (compress/decompress) tests for CompressedMatrixBlock, compressing with the local degree of parallelism.
+ */
+public class ParCompressionTest extends AutomatedTestBase
+{	
+	private static final int rows = 1023;
+	private static final int cols = 20;
+	private static final double sparsity1 = 0.9; //used for SparsityType.DENSE
+	private static final double sparsity2 = 0.1; //used for SparsityType.SPARSE
+	private static final double sparsity3 = 0.0; //used for SparsityType.EMPTY
+	
+	public enum SparsityType {
+		DENSE,
+		SPARSE,
+		EMPTY,
+	}
+	
+	public enum ValueType {
+		RAND,
+		RAND_ROUND,
+		CONST,
+	}
+	
+	@Override
+	public void setUp() {
+		//nothing to register: the tests call the block API directly
+	}
+	
+	@Test
+	public void testDenseRandDataCompression() {
+		runCompressionTest(SparsityType.DENSE, ValueType.RAND, true);
+	}
+	
+	@Test
+	public void testSparseRandDataCompression() {
+		runCompressionTest(SparsityType.SPARSE, ValueType.RAND, true);
+	}
+	
+	@Test
+	public void testEmptyCompression() {
+		runCompressionTest(SparsityType.EMPTY, ValueType.RAND, true);
+	}
+	
+	@Test
+	public void testDenseRoundRandDataCompression() {
+		runCompressionTest(SparsityType.DENSE, ValueType.RAND_ROUND, true);
+	}
+	
+	@Test
+	public void testSparseRoundRandDataCompression() {
+		runCompressionTest(SparsityType.SPARSE, ValueType.RAND_ROUND, true);
+	}
+	
+	@Test
+	public void testDenseConstantDataCompression() {
+		runCompressionTest(SparsityType.DENSE, ValueType.CONST, true);
+	}
+	
+	@Test
+	public void testSparseConstDataCompression() {
+		runCompressionTest(SparsityType.SPARSE, ValueType.CONST, true);
+	}
+	
+	@Test
+	public void testDenseRandDataNoCompression() {
+		runCompressionTest(SparsityType.DENSE, ValueType.RAND, false);
+	}
+	
+	@Test
+	public void testSparseRandDataNoCompression() {
+		runCompressionTest(SparsityType.SPARSE, ValueType.RAND, false);
+	}
+	
+	@Test
+	public void testEmptyNoCompression() {
+		runCompressionTest(SparsityType.EMPTY, ValueType.RAND, false);
+	}
+	
+	@Test
+	public void testDenseRoundRandDataNoCompression() {
+		runCompressionTest(SparsityType.DENSE, ValueType.RAND_ROUND, false);
+	}
+	
+	@Test
+	public void testSparseRoundRandDataNoCompression() {
+		runCompressionTest(SparsityType.SPARSE, ValueType.RAND_ROUND, false);
+	}
+	
+	@Test
+	public void testDenseConstDataNoCompression() {
+		runCompressionTest(SparsityType.DENSE, ValueType.CONST, false);
+	}
+	
+	@Test
+	public void testSparseConstDataNoCompression() {
+		runCompressionTest(SparsityType.SPARSE, ValueType.CONST, false);
+	}
+	
+
+	/**
+	 * Builds a rows x cols input from sptype/vtype, compresses it only if compress is set,
+	 * and checks that decompress() reproduces the input exactly (tolerance 0).
+	 */
+	private void runCompressionTest(SparsityType sptype, ValueType vtype, boolean compress)
+	{
+		try
+		{
+			//prepare sparsity for input data
+			double sparsity = -1;
+			switch( sptype ){
+				case DENSE: sparsity = sparsity1; break;
+				case SPARSE: sparsity = sparsity2; break;
+				case EMPTY: sparsity = sparsity3; break;
+			}
+			
+			//generate input data
+			double min = (vtype==ValueType.CONST)? 10 : -10; //CONST: min==max==10, presumably constant non-zeros (verify TestUtils)
+			double[][] input = TestUtils.generateTestMatrix(rows, cols, min, 10, sparsity, 7);
+			if( vtype==ValueType.RAND_ROUND )
+				input = TestUtils.round(input);
+			MatrixBlock mb = DataConverter.convertToMatrixBlock(input);
+			
+			//compress given matrix block
+			CompressedMatrixBlock cmb = new CompressedMatrixBlock(mb);
+			if( compress )
+				cmb.compress(InfrastructureAnalyzer.getLocalParallelism()); //k = local parallelism
+			
+			//decompress the compressed matrix block
+			MatrixBlock tmp = cmb.decompress();
+			
+			//compare result with input
+			double[][] d1 = DataConverter.convertToDoubleMatrix(mb);
+			double[][] d2 = DataConverter.convertToDoubleMatrix(tmp);
+			TestUtils.compareMatrices(d1, d2, rows, cols, 0); //exact match required (tolerance 0)
+		}
+		catch(Exception ex) {
+			throw new RuntimeException(ex);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/873bae76/src/test_suites/java/org/apache/sysml/test/integration/functions/compress/ZPackageSuite.java
----------------------------------------------------------------------
diff --git a/src/test_suites/java/org/apache/sysml/test/integration/functions/compress/ZPackageSuite.java b/src/test_suites/java/org/apache/sysml/test/integration/functions/compress/ZPackageSuite.java
index c8dc906..7d19ae9 100644
--- a/src/test_suites/java/org/apache/sysml/test/integration/functions/compress/ZPackageSuite.java
+++ b/src/test_suites/java/org/apache/sysml/test/integration/functions/compress/ZPackageSuite.java
@@ -43,6 +43,7 @@ import org.junit.runners.Suite;
 	LargeMatrixVectorMultTest.class,
 	LargeParMatrixVectorMultTest.class,
 	LargeVectorMatrixMultTest.class,
+	ParCompressionTest.class,
 	ParMatrixMultChainTest.class,
 	ParMatrixVectorMultTest.class,
 	ParTransposeSelfLeftMatrixMultTest.class,


[2/3] incubator-systemml git commit: [SYSTEMML-562][SYSTEMML-413] Basic cleanup of cache block abstraction

Posted by mb...@apache.org.
[SYSTEMML-562][SYSTEMML-413] Basic cleanup of cache block abstraction

Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/44c8f9d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/44c8f9d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/44c8f9d4

Branch: refs/heads/master
Commit: 44c8f9d4b1120901f5bf9c14e686189cb17c948b
Parents: 42906ba
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Wed Jul 27 22:52:22 2016 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Wed Jul 27 22:52:22 2016 -0700

----------------------------------------------------------------------
 .../controlprogram/caching/CacheBlock.java      | 49 ++++++++++++++------
 .../caching/CacheBlockFactory.java              | 18 +++++++
 .../spark/data/PartitionedBlock.java            | 11 ++---
 .../sysml/runtime/matrix/data/FrameBlock.java   | 13 ------
 .../sysml/runtime/matrix/data/MatrixBlock.java  | 11 -----
 .../matrix/data/OperationsOnMatrixValues.java   | 24 ++++++++++
 6 files changed, 81 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/44c8f9d4/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheBlock.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheBlock.java
index f58a7c2..b73584c 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheBlock.java
@@ -19,12 +19,8 @@
 
 package org.apache.sysml.runtime.controlprogram.caching;
 
-import java.util.ArrayList;
-
 import org.apache.hadoop.io.Writable;
 import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.matrix.data.Pair;
-import org.apache.sysml.runtime.util.IndexRange;
 
 
 /**
@@ -35,6 +31,18 @@ import org.apache.sysml.runtime.util.IndexRange;
 public interface CacheBlock extends Writable 
 {
 	/**
+	 * 
+	 * @return
+	 */
+	public int getNumRows();
+	
+	/**
+	 * 
+	 * @return
+	 */
+	public int getNumColumns();
+	
+	/**
 	 * Get the in-memory size in bytes of the cache block.
 	 * @return
 	 */
@@ -60,18 +68,29 @@ public interface CacheBlock extends Writable
 	 */
 	public void compactEmptyBlock();
 	
-	public int getNumRows();
-	public int getNumColumns();
-	
+	/**
+	 * Slice a sub block out of the current block and write into the given output block.
+	 * This method returns the passed instance if not null.
+	 * 
+	 * @param rl
+	 * @param ru
+	 * @param cl
+	 * @param cu
+	 * @param block
+	 * @return
+	 * @throws DMLRuntimeException
+	 */
 	public CacheBlock sliceOperations(int rl, int ru, int cl, int cu, CacheBlock block) 
-			throws DMLRuntimeException;
+		throws DMLRuntimeException;
 	
+	/**
+	 * Merge the given block into the current block. Both blocks needs to be of equal 
+	 * dimensions and contain disjoint non-zero cells.
+	 * 
+	 * @param that
+	 * @param appendOnly
+	 * @throws DMLRuntimeException
+	 */
 	public void merge(CacheBlock that, boolean appendOnly) 
-			throws DMLRuntimeException;
-	
-	@SuppressWarnings("rawtypes")
-	public 	ArrayList getPairList();
-
-	ArrayList<Pair<?, ?>> performSlice(IndexRange ixrange, int brlen, int bclen, int iix, int jix, CacheBlock in) 
-			throws DMLRuntimeException;
+		throws DMLRuntimeException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/44c8f9d4/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheBlockFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheBlockFactory.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheBlockFactory.java
index 3bf86b5..9595441 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheBlockFactory.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheBlockFactory.java
@@ -19,8 +19,12 @@
 
 package org.apache.sysml.runtime.controlprogram.caching;
 
+import java.util.ArrayList;
+
 import org.apache.sysml.runtime.matrix.data.FrameBlock;
 import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysml.runtime.matrix.data.Pair;
 
 /**
  * Factory to create instances of matrix/frame blocks given
@@ -54,4 +58,18 @@ public class CacheBlockFactory
 			return 1;
 		throw new RuntimeException("Unsupported cache block type: "+block.getClass().getName());
 	}
+	
+	/**
+	 * Creates a new, empty pair list whose element type matches the given block's type code.
+	 * @param block cache block whose type code (from getCode) selects the pair type
+	 * @return {@code ArrayList<Pair<MatrixIndexes,MatrixBlock>>} for code 0, {@code ArrayList<Pair<Long,FrameBlock>>} for code 1
+	 */
+	public static ArrayList<?> getPairList(CacheBlock block) {
+		int code = getCode(block);
+		switch( code ) {
+			case 0: return new ArrayList<Pair<MatrixIndexes,MatrixBlock>>();
+			case 1: return new ArrayList<Pair<Long,FrameBlock>>();
+		}
+		throw new RuntimeException("Unsupported cache block type: "+code); //defensive: getCode already rejects unsupported types
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/44c8f9d4/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
index 465fdd5..fbd7a25 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
@@ -33,6 +33,7 @@ import java.util.ArrayList;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.controlprogram.caching.CacheBlock;
 import org.apache.sysml.runtime.controlprogram.caching.CacheBlockFactory;
+import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues;
 import org.apache.sysml.runtime.matrix.data.Pair;
 import org.apache.sysml.runtime.util.FastBufferedDataInputStream;
 import org.apache.sysml.runtime.util.FastBufferedDataOutputStream;
@@ -260,19 +261,17 @@ public class PartitionedBlock<T extends CacheBlock> implements Externalizable
 		int lcl = (int) cl;
 		int lcu = (int) cu;
 		
-		ArrayList<Pair<?, ?>> allBlks = block.getPairList();
+		ArrayList<Pair<?, ?>> allBlks = (ArrayList<Pair<?, ?>>) CacheBlockFactory.getPairList(block);
 		int start_iix = (lrl-1)/_brlen+1;
 		int end_iix = (lru-1)/_brlen+1;
 		int start_jix = (lcl-1)/_bclen+1;
 		int end_jix = (lcu-1)/_bclen+1;
 				
 		for( int iix = start_iix; iix <= end_iix; iix++ )
-			for(int jix = start_jix; jix <= end_jix; jix++)		
-			{
-				T in = getBlock(iix, jix);
+			for(int jix = start_jix; jix <= end_jix; jix++) {
 				IndexRange ixrange = new IndexRange(rl, ru, cl, cu);
-				ArrayList<Pair<?, ?>> outlist = block.performSlice(ixrange, _brlen, _bclen, iix, jix, in);
-				allBlks.addAll(outlist);
+				allBlks.addAll(OperationsOnMatrixValues.performSlice(
+						ixrange, _brlen, _bclen, iix, jix, getBlock(iix, jix)));
 			}
 		
 		if(allBlks.size() == 1) {

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/44c8f9d4/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
index 78fa165..1c8ffd0 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
@@ -1305,17 +1305,4 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
 			_mvValue = mvVal;
 		}
 	}
-
-	//TODO generalize these methods and remove from frame block
-	
-	@Override
-	public ArrayList getPairList() {
-		return new ArrayList<Pair<Long, FrameBlock>>();
-	}
-
-	public ArrayList<Pair<?, ?>> performSlice(IndexRange ixrange, int brlen, int bclen, int iix, int jix, CacheBlock in) throws DMLRuntimeException
-	{
-		return OperationsOnMatrixValues.performSlice(ixrange, brlen, bclen, iix, jix, (FrameBlock)in);
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/44c8f9d4/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index 842982d..452eddb 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -6240,15 +6240,4 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
 		}
 		public SparsityEstimate(){}
 	}
-
-	public ArrayList<Pair<MatrixIndexes, MatrixBlock>> getPairList()
-	{
-		return new ArrayList<Pair<MatrixIndexes,MatrixBlock>>();
-	}
-	
-	@SuppressWarnings("unchecked")
-	public ArrayList<Pair<?, ?>> performSlice(IndexRange ixrange, int brlen, int bclen, int iix, int jix, CacheBlock in) throws DMLRuntimeException
-	{
-		return OperationsOnMatrixValues.performSlice(ixrange, brlen, bclen, iix, jix, (MatrixBlock)in);
-	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/44c8f9d4/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java b/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java
index 219a1e7..a105e6d 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java
@@ -27,6 +27,7 @@ import java.util.List;
 import org.apache.sysml.parser.Expression.ValueType;
 import org.apache.sysml.runtime.DMLRuntimeException;
 import org.apache.sysml.runtime.compress.CompressedMatrixBlock;
+import org.apache.sysml.runtime.controlprogram.caching.CacheBlock;
 import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType;
 import org.apache.sysml.runtime.functionobjects.Builtin;
 import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
@@ -300,6 +301,29 @@ public class OperationsOnMatrixValues
 		value1.aggregateBinaryOperations(value1, value2, valueOut, op);
 	}
 	
+	/**
+	 * Slices the input block at block position (iix, jix) by dispatching to the MatrixBlock or FrameBlock overload.
+	 * @param ixrange cell index range (rl, ru, cl, cu) to slice out
+	 * @param brlen number of rows per block
+	 * @param bclen number of columns per block
+	 * @param iix row block index of the input block
+	 * @param jix column block index of the input block
+	 * @param in input block; must be a MatrixBlock or FrameBlock
+	 * @return raw list of output pairs produced by the type-specific overload
+	 * @throws DMLRuntimeException if the block type is unsupported, or as propagated from the overload
+	 */
+	@SuppressWarnings("rawtypes")
+	public static ArrayList performSlice(IndexRange ixrange, int brlen, int bclen, int iix, int jix, CacheBlock in) 
+		throws DMLRuntimeException
+	{
+		if( in instanceof MatrixBlock )
+			return performSlice(ixrange, brlen, bclen, iix, jix, (MatrixBlock)in);
+		else if( in instanceof FrameBlock )
+			return performSlice(ixrange, brlen, bclen, iix, jix, (FrameBlock)in);
+		throw new DMLRuntimeException("Unsupported cache block type: "+in.getClass().getName());
+	}
+	
+	
 	@SuppressWarnings("rawtypes")
 	public static ArrayList performSlice(IndexRange ixrange, int brlen, int bclen, int iix, int jix, MatrixBlock in) 
 			throws DMLRuntimeException