You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/07/28 18:13:17 UTC
[1/3] incubator-systemml git commit: [SYSTEMML-562] Fix
serialization/partitioning of partitioned broadcasts
Repository: incubator-systemml
Updated Branches:
refs/heads/master 97b136601 -> 873bae76b
[SYSTEMML-562] Fix serialization/partitioning of partitioned broadcasts
This patch fixes an issue with incorrect class references on
deserialization as well as various smaller issues related to
incompatible serialization/deserialization (long vs int rlen/clen) and
partitioning (destroyed block references).
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/42906ba1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/42906ba1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/42906ba1
Branch: refs/heads/master
Commit: 42906ba126ed25b3dfee3ab254c9afbb6caf421e
Parents: 97b1366
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Wed Jul 27 20:59:15 2016 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Wed Jul 27 20:59:15 2016 -0700
----------------------------------------------------------------------
.../caching/CacheBlockFactory.java | 57 ++++++
.../context/SparkExecutionContext.java | 2 +-
.../spark/data/PartitionedBlock.java | 180 +++++++++----------
3 files changed, 140 insertions(+), 99 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/42906ba1/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheBlockFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheBlockFactory.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheBlockFactory.java
new file mode 100644
index 0000000..3bf86b5
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheBlockFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.controlprogram.caching;
+
+import org.apache.sysml.runtime.matrix.data.FrameBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+
+/**
+ * Factory to create instances of matrix/frame blocks given
+ * internal codes.
+ * Codes: 0 = MatrixBlock, 1 = FrameBlock; getCode(b) yields the code newInstance maps back to b's block type.
+ */
+public class CacheBlockFactory
+{
+ /**
+ * Creates a new, empty cache block of the type identified by the given code.
+ * @param code block type code (0: MatrixBlock, 1: FrameBlock)
+ * @return new empty block instance (throws RuntimeException for unknown codes)
+ */
+ public static CacheBlock newInstance(int code) {
+ switch( code ) {
+ case 0: return new MatrixBlock();
+ case 1: return new FrameBlock();
+ }
+ throw new RuntimeException("Unsupported cache block type: "+code);
+ }
+
+ /**
+ * Returns the type code of the given block; via instanceof, subclasses map to their base type's code.
+ * @param block cache block (must be a MatrixBlock or FrameBlock, otherwise RuntimeException)
+ * @return type code (0: MatrixBlock, 1: FrameBlock); NOTE(review): newInstance(0) recreates only a plain MatrixBlock
+ */
+ public static int getCode(CacheBlock block) {
+ if( block instanceof MatrixBlock )
+ return 0;
+ else if( block instanceof FrameBlock )
+ return 1;
+ throw new RuntimeException("Unsupported cache block type: "+block.getClass().getName());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/42906ba1/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index 99614f2..0eea221 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -910,7 +910,7 @@ public class SparkExecutionContext extends ExecutionContext
long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
- PartitionedBlock<MatrixBlock> out = new PartitionedBlock<MatrixBlock>(rlen, clen, brlen, bclen, new MatrixBlock());
+ PartitionedBlock<MatrixBlock> out = new PartitionedBlock<MatrixBlock>(rlen, clen, brlen, bclen);
List<Tuple2<MatrixIndexes,MatrixBlock>> list = rdd.collect();
//copy blocks one-at-a-time into output matrix block
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/42906ba1/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
index 20fcd0b..465fdd5 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
@@ -27,28 +27,27 @@ import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
-import java.lang.reflect.Array;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.controlprogram.caching.CacheBlock;
+import org.apache.sysml.runtime.controlprogram.caching.CacheBlockFactory;
import org.apache.sysml.runtime.matrix.data.Pair;
import org.apache.sysml.runtime.util.FastBufferedDataInputStream;
import org.apache.sysml.runtime.util.FastBufferedDataOutputStream;
import org.apache.sysml.runtime.util.IndexRange;
/**
- * This class is for partitioned matrix/frame blocks, to be used
- * as broadcasts. Distributed tasks require block-partitioned broadcasts but a lazy partitioning per
- * task would create instance-local copies and hence replicate broadcast variables which are shared
- * by all tasks within an executor.
+ * This class is for partitioned matrix/frame blocks, to be used as broadcasts.
+ * Distributed tasks require block-partitioned broadcasts but a lazy partitioning
+ * per task would create instance-local copies and hence replicate broadcast
+ * variables which are shared by all tasks within an executor.
*
*/
public class PartitionedBlock<T extends CacheBlock> implements Externalizable
{
-
- protected T[] _partBlocks = null;
+ protected CacheBlock[] _partBlocks = null;
protected long _rlen = -1;
protected long _clen = -1;
protected int _brlen = -1;
@@ -60,40 +59,6 @@ public class PartitionedBlock<T extends CacheBlock> implements Externalizable
}
- public long getNumRows() {
- return _rlen;
- }
-
- public long getNumCols() {
- return _clen;
- }
-
- public long getNumRowsPerBlock() {
- return _brlen;
- }
-
- public long getNumColumnsPerBlock() {
- return _bclen;
- }
-
- /**
- *
- * @return
- */
- public int getNumRowBlocks()
- {
- return (int)Math.ceil((double)_rlen/_brlen);
- }
-
- /**
- *
- * @return
- */
- public int getNumColumnBlocks()
- {
- return (int)Math.ceil((double)_clen/_bclen);
- }
-
@SuppressWarnings("unchecked")
public PartitionedBlock(T block, int brlen, int bclen)
{
@@ -106,17 +71,16 @@ public class PartitionedBlock<T extends CacheBlock> implements Externalizable
_clen = clen;
_brlen = brlen;
_bclen = bclen;
-
int nrblks = getNumRowBlocks();
int ncblks = getNumColumnBlocks();
+ int code = CacheBlockFactory.getCode(block);
try
{
- _partBlocks = (T[])Array.newInstance((block.getClass()), nrblks * ncblks);
+ _partBlocks = new CacheBlock[nrblks * ncblks];
for( int i=0, ix=0; i<nrblks; i++ )
- for( int j=0; j<ncblks; j++, ix++ )
- {
- T tmp = (T) block.getClass().newInstance();
+ for( int j=0; j<ncblks; j++, ix++ ) {
+ T tmp = (T) CacheBlockFactory.newInstance(code);
block.sliceOperations(i*_brlen, Math.min((i+1)*_brlen, rlen)-1,
j*_bclen, Math.min((j+1)*_bclen, clen)-1, tmp);
_partBlocks[ix] = tmp;
@@ -129,8 +93,7 @@ public class PartitionedBlock<T extends CacheBlock> implements Externalizable
_offset = 0;
}
- @SuppressWarnings("unchecked")
- public PartitionedBlock(int rlen, int clen, int brlen, int bclen, T block)
+ public PartitionedBlock(int rlen, int clen, int brlen, int bclen)
{
//partitioning input broadcast
_rlen = rlen;
@@ -140,8 +103,60 @@ public class PartitionedBlock<T extends CacheBlock> implements Externalizable
int nrblks = getNumRowBlocks();
int ncblks = getNumColumnBlocks();
- _partBlocks = (T[])Array.newInstance((block.getClass()), nrblks * ncblks);
-
+ _partBlocks = new CacheBlock[nrblks * ncblks];
+ }
+
+
+ /**
+ * Creates a partition that references numBlks consecutive blocks of this object (block objects are shared, not copied; the block argument is unused).
+ * @param offset index of the first block in this object's block array, also stored as the partition's _offset (NOTE(review): assumes this._offset == 0)
+ * @param numBlks number of consecutive blocks to include
+ * @return new partitioned block with the same dimensions and block sizes, and the given offset
+ */
+ public PartitionedBlock<T> createPartition( int offset, int numBlks, T block )
+ {
+ PartitionedBlock<T> ret = new PartitionedBlock<T>();
+ ret._rlen = _rlen;
+ ret._clen = _clen;
+ ret._brlen = _brlen;
+ ret._bclen = _bclen;
+ ret._partBlocks = new CacheBlock[numBlks];
+ ret._offset = offset;
+ System.arraycopy(_partBlocks, offset, ret._partBlocks, 0, numBlks);
+
+ return ret;
+ }
+
+ public long getNumRows() {
+ return _rlen;
+ }
+
+ public long getNumCols() {
+ return _clen;
+ }
+
+ public long getNumRowsPerBlock() {
+ return _brlen;
+ }
+
+ public long getNumColumnsPerBlock() {
+ return _bclen;
+ }
+
+ /**
+ * Returns the number of row blocks, i.e., ceil(rlen / brlen).
+ * @return number of row blocks
+ */
+ public int getNumRowBlocks() {
+ return (int)Math.ceil((double)_rlen/_brlen);
+ }
+
+ /**
+ * Returns the number of column blocks, i.e., ceil(clen / bclen).
+ * @return number of column blocks
+ */
+ public int getNumColumnBlocks() {
+ return (int)Math.ceil((double)_clen/_bclen);
}
/**
@@ -151,6 +166,7 @@ public class PartitionedBlock<T extends CacheBlock> implements Externalizable
* @return
* @throws DMLRuntimeException
*/
+ @SuppressWarnings("unchecked")
public T getBlock(int rowIndex, int colIndex)
throws DMLRuntimeException
{
@@ -165,7 +181,7 @@ public class PartitionedBlock<T extends CacheBlock> implements Externalizable
int rix = rowIndex - 1;
int cix = colIndex - 1;
int ix = rix*ncblks+cix - _offset;
- return _partBlocks[ix];
+ return (T)_partBlocks[ix];
}
/**
@@ -189,43 +205,19 @@ public class PartitionedBlock<T extends CacheBlock> implements Externalizable
int rix = rowIndex - 1;
int cix = colIndex - 1;
int ix = rix*ncblks+cix - _offset;
- _partBlocks[ ix ] = block;
-
- }
-
- /**
- *
- * @param offset
- * @param numBlks
- * @return
- */
- @SuppressWarnings("unchecked")
- public PartitionedBlock<T> createPartition( int offset, int numBlks, T block )
- {
- PartitionedBlock<T> ret = new PartitionedBlock<T>();
- ret._rlen = _rlen;
- ret._clen = _clen;
- ret._brlen = _brlen;
- ret._bclen = _bclen;
-
- _partBlocks = (T[])Array.newInstance(block.getClass(), numBlks);
- ret._offset = offset;
- System.arraycopy(_partBlocks, offset, ret._partBlocks, 0, numBlks);
-
- return ret;
+ _partBlocks[ ix ] = block;
}
/**
*
* @return
*/
- public long getInMemorySize()
- {
+ public long getInMemorySize() {
long ret = 24; //header
ret += 32; //block array
if( _partBlocks != null )
- for( T block : _partBlocks )
+ for( CacheBlock block : _partBlocks )
ret += block.getInMemorySize();
return ret;
@@ -236,12 +228,11 @@ public class PartitionedBlock<T extends CacheBlock> implements Externalizable
* @return
*/
- public long getExactSerializedSize()
- {
+ public long getExactSerializedSize() {
long ret = 24; //header
if( _partBlocks != null )
- for( T block : _partBlocks )
+ for( CacheBlock block : _partBlocks )
ret += block.getExactSerializedSize();
return ret;
@@ -361,8 +352,9 @@ public class PartitionedBlock<T extends CacheBlock> implements Externalizable
dos.writeInt(_bclen);
dos.writeInt(_offset);
dos.writeInt(_partBlocks.length);
+ dos.writeByte(CacheBlockFactory.getCode(_partBlocks[0]));
- for( T block : _partBlocks )
+ for( CacheBlock block : _partBlocks )
block.write(dos);
}
@@ -371,29 +363,21 @@ public class PartitionedBlock<T extends CacheBlock> implements Externalizable
* @param din
* @throws IOException
*/
- @SuppressWarnings("unchecked")
private void readHeaderAndPayload(DataInput dis)
throws IOException
{
- _rlen = dis.readInt();
- _clen = dis.readInt();
+ _rlen = dis.readLong();
+ _clen = dis.readLong();
_brlen = dis.readInt();
_bclen = dis.readInt();
- _offset = dis.readInt();
-
+ _offset = dis.readInt();
int len = dis.readInt();
+ int code = dis.readByte();
- try
- {
- _partBlocks = (T[])Array.newInstance(getClass(), len);
- for( int i=0; i<len; i++ ) {
- _partBlocks[i].readFields(dis);
- }
+ _partBlocks = new CacheBlock[len];
+ for( int i=0; i<len; i++ ) {
+ _partBlocks[i] = CacheBlockFactory.newInstance(code);
+ _partBlocks[i].readFields(dis);
}
- catch(Exception ex) {
- throw new RuntimeException("Failed partitioning of broadcast variable input.", ex);
- }
-
}
-
}
[3/3] incubator-systemml git commit: [SYSTEMML-821] Multi-threaded
matrix block compression, tests
Posted by mb...@apache.org.
[SYSTEMML-821] Multi-threaded matrix block compression, tests
This patch enables multi-threaded matrix block compression in the CP
compression instruction. In detail, this is realized via (1)
multi-threaded transpose, (2) multi-threaded column classification, and
(3) multi-threaded column compression. Down the road we will also add
multi-threaded column co-coding but this requires some major refactoring
first. On the sparse ImageNet dataset (1262102x900), the individual
performance improvements were as follows (times for compression phases 1-4),
leading to an overall improvement of 4.5x:
(0) baseline: 22.6s, 4.6s, 26.9s, 0.3s,
(1) transpose: 7.2s, 4.6s, 26.9s, 0.3s,
(2) classify: 4.0s, 4.6s, 26.9s, 0.3s,
(3) compress: 3.8s, 4.7s, 3.4s, 0.3s.
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/873bae76
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/873bae76
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/873bae76
Branch: refs/heads/master
Commit: 873bae76bad9267b2c710404f7a08707ca76ca18
Parents: 44c8f9d
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Thu Jul 28 02:01:07 2016 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Thu Jul 28 02:01:07 2016 -0700
----------------------------------------------------------------------
.../runtime/compress/CompressedMatrixBlock.java | 350 ++++++++++++++-----
.../cp/CompressionCPInstruction.java | 3 +-
.../runtime/matrix/data/LibMatrixReorg.java | 2 +-
.../functions/compress/ParCompressionTest.java | 169 +++++++++
.../functions/compress/ZPackageSuite.java | 1 +
5 files changed, 428 insertions(+), 97 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/873bae76/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
index 81d933d..f2ccb43 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
@@ -193,8 +193,22 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
* which should be fixed if we move ahead with this compression strategy.
*
* +per column sparsity
+ *
+ * @throws DMLRuntimeException
*/
public void compress()
+ throws DMLRuntimeException
+ {
+ //default sequential execution
+ compress(1);
+ }
+
+ /**
+ *
+ * @param k number of threads
+ * @throws DMLRuntimeException
+ */
+ public void compress(int k)
throws DMLRuntimeException
{
//check for redundant compression
@@ -216,11 +230,8 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
final int numRows = getNumRows();
final int numCols = getNumColumns();
final boolean sparse = isInSparseFormat();
- MatrixBlock rawblock = this;
- if( TRANSPOSE_INPUT )
- rawblock = LibMatrixReorg.transpose(rawblock, new MatrixBlock(numCols, numRows, sparse));
- else
- rawblock = new MatrixBlock(this);
+ MatrixBlock rawblock = !TRANSPOSE_INPUT ? new MatrixBlock(this) :
+ LibMatrixReorg.transpose(this, new MatrixBlock(numCols, numRows, sparse), k);
//construct sample-based size estimator
CompressedSizeEstimator bitmapSizeEstimator =
@@ -234,18 +245,12 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
// We start with a full set of columns.
HashSet<Integer> remainingCols = new HashSet<Integer>();
- for (int i = 0; i < numCols; i++) {
+ for (int i = 0; i < numCols; i++)
remainingCols.add(i);
- }
// PHASE 1: Classify columns by compression type
- // We start by determining which columns are amenable to bitmap
- // compression
-
- // It is correct to use the dense size as the uncompressed size
- // FIXME not numRows but nnz / col otherwise too aggressive overestimation
- // of uncompressed size and hence overestimation of compression potential
- double uncompressedColumnSize = 8 * numRows;
+ // We start by determining which columns are amenable to bitmap compression
+ double uncompressedColumnSize = getUncompressedSize(numRows, 1);
// information about the bitmap amenable columns
List<Integer> bitmapCols = new ArrayList<Integer>();
@@ -256,11 +261,12 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
// Minimum ratio (size of uncompressed / size of compressed) that we
// will accept when encoding a field with a bitmap.
+ CompressedSizeInfo[] sizeInfos = (k > 1) ?
+ computeCompressedSizeInfos(bitmapSizeEstimator, numCols, k) :
+ computeCompressedSizeInfos(bitmapSizeEstimator, numCols);
for (int col = 0; col < numCols; col++)
- {
- CompressedSizeInfo compressedSizeInfo = bitmapSizeEstimator
- .estimateCompressedColGroupSize(new int[] { col });
- long compressedSize = compressedSizeInfo.getMinSize();
+ {
+ long compressedSize = sizeInfos[col].getMinSize();
double compRatio = uncompressedColumnSize / compressedSize;
//FIXME: compression ratio should be checked against 1 instead of min compression
@@ -269,7 +275,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
if (compRatio >= MIN_COMPRESSION_RATIO) {
bitmapCols.add(col);
compressionRatios.put(col, compRatio);
- colsCardinalities.add(compressedSizeInfo.getEstCarinality());
+ colsCardinalities.add(sizeInfos[col].getEstCarinality());
compressedSizes.add(compressedSize);
}
else
@@ -313,77 +319,15 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
}
// PHASE 3: Compress and correct sample-based decisions
-
- for (int[] groupIndices : bitmapColGrps)
- {
- int[] allGroupIndices = null;
- int allColsCount = groupIndices.length;
- CompressedSizeInfo bitmapSizeInfo;
- // The compression type is decided based on a full bitmap since it
- // will be reused for the actual compression step.
- UncompressedBitmap ubm;
- PriorityQueue<CompressedColumn> compRatioPQ = null;
- boolean skipGroup = false;
- while (true)
- {
- ubm = BitmapEncoder.extractBitmap(groupIndices, rawblock);
- bitmapSizeInfo = bitmapSizeEstimator
- .estimateCompressedColGroupSize(ubm);
- double compRatio = uncompressedColumnSize * groupIndices.length
- / bitmapSizeInfo.getMinSize();
- if (compRatio >= MIN_COMPRESSION_RATIO) {
- // we have a good group
- for( Integer col : groupIndices )
- remainingCols.remove(col);
- break;
- } else {
- // modify the group
- if (compRatioPQ == null) {
- // first modification
- allGroupIndices = Arrays.copyOf(groupIndices, groupIndices.length);
- compRatioPQ = new PriorityQueue<CompressedMatrixBlock.CompressedColumn>();
- for (int i = 0; i < groupIndices.length; i++)
- compRatioPQ.add(new CompressedColumn(i,
- compressionRatios.get(groupIndices[i])));
- }
-
- // index in allGroupIndices
- int removeIx = compRatioPQ.poll().colIx;
- allGroupIndices[removeIx] = -1;
- allColsCount--;
- if (allColsCount == 0) {
- skipGroup = true;
- break;
- }
- groupIndices = new int[allColsCount];
- // copying the values that do not equal -1
- int ix = 0;
- for (int col : allGroupIndices) {
- if (col != -1) {
- groupIndices[ix++] = col;
- }
- }
-
- }
+ ColGroup[] colGroups = (k > 1) ?
+ compressColGroups(rawblock, bitmapSizeEstimator, compressionRatios, numRows, bitmapColGrps, k) :
+ compressColGroups(rawblock, bitmapSizeEstimator, compressionRatios, numRows, bitmapColGrps);
+ for( int j=0; j<colGroups.length; j++ ) {
+ if( colGroups[j] != null ) {
+ for( int col : colGroups[j].getColIndices() )
+ remainingCols.remove(col);
+ _colGroups.add(colGroups[j]);
}
-
- if (skipGroup)
- continue;
- long rleNumBytes = bitmapSizeInfo.getRLESize();
- long offsetNumBytes = bitmapSizeInfo.getOLESize();
- double rleRatio = (double) offsetNumBytes / (double) rleNumBytes;
-
- if (rleRatio > MIN_RLE_RATIO) {
- ColGroupRLE compressedGroup = new ColGroupRLE(groupIndices,
- numRows, ubm);
- _colGroups.add(compressedGroup);
- }
- else {
- ColGroupOLE compressedGroup = new ColGroupOLE(
- groupIndices, numRows, ubm);
- _colGroups.add(compressedGroup);
- }
-
}
_stats.timePhase3 = time.stop();
@@ -407,6 +351,182 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
LOG.debug("compression phase 4: "+_stats.timePhase4);
}
+ public CompressionStatistics getCompressionStatistics() {
+ return _stats;
+ }
+
+ /**
+ * Sequentially estimates the compressed size of each single-column group.
+ * @param estim compressed size estimator
+ * @param clen number of columns
+ * @return size infos, indexed by column
+ */
+ private static CompressedSizeInfo[] computeCompressedSizeInfos(CompressedSizeEstimator estim, int clen) {
+ CompressedSizeInfo[] ret = new CompressedSizeInfo[clen];
+ for( int col=0; col<clen; col++ )
+ ret[col] = estim.estimateCompressedColGroupSize(new int[] { col });
+ return ret;
+ }
+
+ /**
+ * Estimates per-column compressed sizes in parallel, one task per column on a fixed pool of k threads. NOTE(review): pool.shutdown() is skipped if a task fails; consider a finally block.
+ * @param estim compressed size estimator, shared by all tasks (confirm it is thread-safe)
+ * @param clen number of columns
+ * @param k number of threads
+ * @return size infos, indexed by column (futures are collected in task submission order)
+ * @throws DMLRuntimeException wrapping any exception raised by a task or by the pool
+ */
+ private static CompressedSizeInfo[] computeCompressedSizeInfos(CompressedSizeEstimator estim, int clen, int k)
+ throws DMLRuntimeException
+ {
+ try {
+ ExecutorService pool = Executors.newFixedThreadPool( k );
+ ArrayList<SizeEstimTask> tasks = new ArrayList<SizeEstimTask>();
+ for( int col=0; col<clen; col++ )
+ tasks.add(new SizeEstimTask(estim, col));
+ List<Future<CompressedSizeInfo>> rtask = pool.invokeAll(tasks);
+ ArrayList<CompressedSizeInfo> ret = new ArrayList<CompressedSizeInfo>();
+ for( Future<CompressedSizeInfo> lrtask : rtask )
+ ret.add(lrtask.get());
+ pool.shutdown();
+ return ret.toArray(new CompressedSizeInfo[0]);
+ }
+ catch(Exception ex) {
+ throw new DMLRuntimeException(ex);
+ }
+ }
+
+ /**
+ * Sequentially compresses each candidate column group via compressColGroup.
+ * @param in raw input block the column bitmaps are extracted from
+ * @param estim compressed size estimator
+ * @param compRatios per-column compression ratios, used to pick columns to drop from a group
+ * @param rlen number of rows
+ * @param groups candidate column groups (arrays of column indexes)
+ * @return compressed groups in input order; null entries mark groups whose columns were all dropped
+ */
+ private static ColGroup[] compressColGroups(MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, List<int[]> groups)
+ {
+ ColGroup[] ret = new ColGroup[groups.size()];
+ for( int i=0; i<groups.size(); i++ )
+ ret[i] = compressColGroup(in, estim, compRatios, rlen, groups.get(i));
+
+ return ret;
+ }
+
+ /**
+ * Compresses the candidate column groups in parallel, one task per group on a fixed pool of k threads. NOTE(review): pool.shutdown() is skipped if a task fails.
+ * @param in raw input block the column bitmaps are extracted from (shared by all tasks)
+ * @param estim compressed size estimator, shared by all tasks (confirm it is thread-safe)
+ * @param compRatios per-column compression ratios (shared; only read via get)
+ * @param rlen number of rows
+ * @param groups candidate column groups (arrays of column indexes)
+ * @param k number of threads
+ * @return compressed groups in input order; null entries mark groups whose columns were all dropped
+ * @throws DMLRuntimeException wrapping any exception raised by a task or by the pool
+ */
+ private static ColGroup[] compressColGroups(MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, List<int[]> groups, int k)
+ throws DMLRuntimeException
+ {
+ try {
+ ExecutorService pool = Executors.newFixedThreadPool( k );
+ ArrayList<CompressTask> tasks = new ArrayList<CompressTask>();
+ for( int[] colIndexes : groups )
+ tasks.add(new CompressTask(in, estim, compRatios, rlen, colIndexes));
+ List<Future<ColGroup>> rtask = pool.invokeAll(tasks);
+ ArrayList<ColGroup> ret = new ArrayList<ColGroup>();
+ for( Future<ColGroup> lrtask : rtask )
+ ret.add(lrtask.get());
+ pool.shutdown();
+ return ret.toArray(new ColGroup[0]);
+ }
+ catch(Exception ex) {
+ throw new DMLRuntimeException(ex);
+ }
+ }
+
+ /**
+ * Compresses one column group: extracts its exact bitmap and, while the ratio is below MIN_COMPRESSION_RATIO, drops the column the priority queue yields first and retries.
+ * @param in raw input block the bitmap is extracted from
+ * @param estim estimator used to size the extracted bitmap
+ * @param compRatios per-column compression ratios (expected to contain every index in colIndexes)
+ * @param rlen number of rows
+ * @param colIndexes initial column indexes of the group
+ * @return RLE group if OLE/RLE size ratio exceeds MIN_RLE_RATIO, else OLE group; null if all columns were dropped
+ */
+ private static ColGroup compressColGroup(MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, int[] colIndexes)
+ {
+ int[] allGroupIndices = null;
+ int allColsCount = colIndexes.length;
+ CompressedSizeInfo sizeInfo;
+ // The compression type is decided based on a full bitmap since it
+ // will be reused for the actual compression step.
+ UncompressedBitmap ubm = null;
+ PriorityQueue<CompressedColumn> compRatioPQ = null;
+ boolean skipGroup = false;
+ while (true)
+ {
+ //extract exact bitmap and observe compression ratio
+ ubm = BitmapEncoder.extractBitmap(colIndexes, in);
+ sizeInfo = estim.estimateCompressedColGroupSize(ubm);
+ double compRatio = getUncompressedSize(rlen, colIndexes.length) / sizeInfo.getMinSize();
+
+ if (compRatio >= MIN_COMPRESSION_RATIO) {
+ break; // we have a good group
+ }
+
+ // modify the group
+ if (compRatioPQ == null) {
+ // first modification
+ allGroupIndices = Arrays.copyOf(colIndexes, colIndexes.length);
+ compRatioPQ = new PriorityQueue<CompressedMatrixBlock.CompressedColumn>();
+ for (int i = 0; i < colIndexes.length; i++)
+ compRatioPQ.add(new CompressedColumn(i, compRatios.get(colIndexes[i])));
+ }
+
+ // position (in allGroupIndices) of the column the queue yields first; mark it dropped
+ int removeIx = compRatioPQ.poll().colIx;
+ allGroupIndices[removeIx] = -1;
+ allColsCount--;
+ if (allColsCount == 0) {
+ skipGroup = true;
+ break;
+ }
+ colIndexes = new int[allColsCount];
+ // copying the values that do not equal -1
+ int ix = 0;
+ for (int col : allGroupIndices)
+ if (col != -1)
+ colIndexes[ix++] = col;
+ }
+
+ //all columns dropped: return null (no compressed group for these columns)
+ if( skipGroup )
+ return null;
+
+ //create compressed column group
+ long rleNumBytes = sizeInfo.getRLESize();
+ long offsetNumBytes = sizeInfo.getOLESize();
+ double rleRatio = (double) offsetNumBytes / (double) rleNumBytes;
+ if (rleRatio > MIN_RLE_RATIO)
+ return new ColGroupRLE(colIndexes, rlen, ubm);
+ else
+ return new ColGroupOLE(colIndexes, rlen, ubm);
+ }
+
+ /**
+ * Dense uncompressed size estimate in bytes (8 bytes per cell). NOTE(review): 8 * rlen * clen is evaluated in int arithmetic and can overflow before widening to double.
+ * @param rlen number of rows
+ * @param clen number of columns
+ * @return estimated uncompressed size in bytes
+ */
+ private static double getUncompressedSize(int rlen, int clen) {
+ // It is correct to use the dense size as the uncompressed size
+ // FIXME not numRows but nnz / col otherwise too aggressive overestimation
+ // of uncompressed size and hence overestimation of compression potential
+ return 8 * rlen * clen;
+ }
+
/**
* @return a new uncompressed matrix block containing the contents of this
* block
@@ -439,11 +559,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
return ret;
}
-
- public CompressionStatistics getCompressionStatistics(){
- return _stats;
- }
-
+
/**
*
* @return an upper bound on the memory used to store this compressed block
@@ -463,7 +579,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
return total;
}
- private class CompressedColumn implements Comparable<CompressedColumn> {
+ private static class CompressedColumn implements Comparable<CompressedColumn> {
int colIx;
double compRatio;
@@ -1372,6 +1488,50 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
}
}
+ /**
+ * Callable that estimates the compressed size of a single-column group with the shared estimator.
+ */
+ private static class SizeEstimTask implements Callable<CompressedSizeInfo>
+ {
+ private CompressedSizeEstimator _estim = null;
+ private int _col = -1;
+
+ protected SizeEstimTask( CompressedSizeEstimator estim, int col ) {
+ _estim = estim;
+ _col = col;
+ }
+
+ @Override
+ public CompressedSizeInfo call() throws DMLRuntimeException {
+ return _estim.estimateCompressedColGroupSize(new int[] { _col });
+ }
+ }
+
+ /**
+ * Callable that compresses a single column group by delegating to compressColGroup.
+ */
+ private static class CompressTask implements Callable<ColGroup>
+ {
+ private MatrixBlock _in = null;
+ private CompressedSizeEstimator _estim = null;
+ private HashMap<Integer, Double> _compRatios = null;
+ private int _rlen = -1;
+ private int[] _colIndexes = null;
+
+ protected CompressTask( MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, int[] colIndexes ) {
+ _in = in;
+ _estim = estim;
+ _compRatios = compRatios;
+ _rlen = rlen;
+ _colIndexes = colIndexes;
+ }
+
+ @Override
+ public ColGroup call() throws DMLRuntimeException {
+ return compressColGroup(_in, _estim, _compRatios, _rlen, _colIndexes);
+ }
+ }
+
//////////////////////////////////////////
// Graceful fallback to uncompressed linear algebra
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/873bae76/src/main/java/org/apache/sysml/runtime/instructions/cp/CompressionCPInstruction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/cp/CompressionCPInstruction.java b/src/main/java/org/apache/sysml/runtime/instructions/cp/CompressionCPInstruction.java
index 333bfbb..5230945 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/cp/CompressionCPInstruction.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/cp/CompressionCPInstruction.java
@@ -19,6 +19,7 @@
package org.apache.sysml.runtime.instructions.cp;
+import org.apache.sysml.hops.OptimizerUtils;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.compress.CompressedMatrixBlock;
import org.apache.sysml.runtime.controlprogram.context.ExecutionContext;
@@ -54,7 +55,7 @@ public class CompressionCPInstruction extends UnaryCPInstruction
//compress the matrix block
CompressedMatrixBlock cmb = new CompressedMatrixBlock(in);
- cmb.compress();
+ cmb.compress(OptimizerUtils.getConstrainedNumThreads(-1));
//set output and release input
ec.releaseMatrixInput(input1.getName());
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/873bae76/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
index 59663b5..a2e1252 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/LibMatrixReorg.java
@@ -185,7 +185,7 @@ public class LibMatrixReorg
throws DMLRuntimeException
{
//redirect small or special cases to sequential execution
- if( in.isEmptyBlock(false) || (in.rlen * in.clen < PAR_NUMCELL_THRESHOLD)
+ if( in.isEmptyBlock(false) || (in.rlen * in.clen < PAR_NUMCELL_THRESHOLD) || k == 1
|| (SHALLOW_DENSE_VECTOR_TRANSPOSE && !in.sparse && !out.sparse && (in.rlen==1 || in.clen==1) )
|| (in.sparse && !out.sparse && in.rlen==1) || (!in.sparse && out.sparse && in.rlen==1)
|| (!in.sparse && out.sparse) || !out.isThreadSafe())
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/873bae76/src/test/java/org/apache/sysml/test/integration/functions/compress/ParCompressionTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/sysml/test/integration/functions/compress/ParCompressionTest.java b/src/test/java/org/apache/sysml/test/integration/functions/compress/ParCompressionTest.java
new file mode 100644
index 0000000..e0fe847
--- /dev/null
+++ b/src/test/java/org/apache/sysml/test/integration/functions/compress/ParCompressionTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.test.integration.functions.compress;
+
+import org.apache.sysml.runtime.compress.CompressedMatrixBlock;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.util.DataConverter;
+import org.apache.sysml.test.integration.AutomatedTestBase;
+import org.apache.sysml.test.utils.TestUtils;
+import org.junit.Test;
+
+/** Round-trip tests for matrix block compression using the local degree of parallelism. */
+public class ParCompressionTest extends AutomatedTestBase
+{
+ private static final int rows = 1023;
+ private static final int cols = 20;
+ private static final double sparsity1 = 0.9;
+ private static final double sparsity2 = 0.1;
+ private static final double sparsity3 = 0.0;
+
+ public enum SparsityType {
+ DENSE,
+ SPARSE,
+ EMPTY,
+ }
+
+ public enum ValueType {
+ RAND,
+ RAND_ROUND,
+ CONST,
+ }
+
+ @Override
+ public void setUp() {
+
+ }
+
+ @Test
+ public void testDenseRandDataCompression() {
+ runCompressionTest(SparsityType.DENSE, ValueType.RAND, true);
+ }
+
+ @Test
+ public void testSparseRandDataCompression() {
+ runCompressionTest(SparsityType.SPARSE, ValueType.RAND, true);
+ }
+
+ @Test
+ public void testEmptyCompression() {
+ runCompressionTest(SparsityType.EMPTY, ValueType.RAND, true);
+ }
+
+ @Test
+ public void testDenseRoundRandDataCompression() {
+ runCompressionTest(SparsityType.DENSE, ValueType.RAND_ROUND, true);
+ }
+
+ @Test
+ public void testSparseRoundRandDataCompression() {
+ runCompressionTest(SparsityType.SPARSE, ValueType.RAND_ROUND, true);
+ }
+
+ @Test
+ public void testDenseConstantDataCompression() {
+ runCompressionTest(SparsityType.DENSE, ValueType.CONST, true);
+ }
+
+ @Test
+ public void testSparseConstDataCompression() {
+ runCompressionTest(SparsityType.SPARSE, ValueType.CONST, true);
+ }
+
+ @Test
+ public void testDenseRandDataNoCompression() {
+ runCompressionTest(SparsityType.DENSE, ValueType.RAND, false);
+ }
+
+ @Test
+ public void testSparseRandDataNoCompression() {
+ runCompressionTest(SparsityType.SPARSE, ValueType.RAND, false);
+ }
+
+ @Test
+ public void testEmptyNoCompression() {
+ runCompressionTest(SparsityType.EMPTY, ValueType.RAND, false);
+ }
+
+ @Test
+ public void testDenseRoundRandDataNoCompression() {
+ runCompressionTest(SparsityType.DENSE, ValueType.RAND_ROUND, false);
+ }
+
+ @Test
+ public void testSparseRoundRandDataNoCompression() {
+ runCompressionTest(SparsityType.SPARSE, ValueType.RAND_ROUND, false);
+ }
+
+ @Test
+ public void testDenseConstDataNoCompression() {
+ runCompressionTest(SparsityType.DENSE, ValueType.CONST, false);
+ }
+
+ @Test
+ public void testSparseConstDataNoCompression() {
+ runCompressionTest(SparsityType.SPARSE, ValueType.CONST, false);
+ }
+
+
+ /**
+ * Generates an input matrix, optionally compresses it, decompresses it, and checks exact equality with the input.
+ * @param sptype sparsity of the generated input (DENSE, SPARSE, or EMPTY)
+ * @param vtype value distribution of the generated input (RAND, RAND_ROUND, or CONST)
+ * @param compress whether to compress the block before decompressing it
+ */
+ private void runCompressionTest(SparsityType sptype, ValueType vtype, boolean compress)
+ {
+ try
+ {
+ //prepare sparsity for input data
+ double sparsity = -1;
+ switch( sptype ){
+ case DENSE: sparsity = sparsity1; break;
+ case SPARSE: sparsity = sparsity2; break;
+ case EMPTY: sparsity = sparsity3; break;
+ }
+
+ //generate input data (for CONST, min=max=10 yields a single non-zero value)
+ double min = (vtype==ValueType.CONST)? 10 : -10;
+ double[][] input = TestUtils.generateTestMatrix(rows, cols, min, 10, sparsity, 7);
+ if( vtype==ValueType.RAND_ROUND )
+ input = TestUtils.round(input);
+ MatrixBlock mb = DataConverter.convertToMatrixBlock(input);
+
+ //compress given matrix block
+ CompressedMatrixBlock cmb = new CompressedMatrixBlock(mb);
+ if( compress )
+ cmb.compress(InfrastructureAnalyzer.getLocalParallelism());
+
+ //decompress the compressed matrix block
+ MatrixBlock tmp = cmb.decompress();
+
+ //compare result with input
+ double[][] d1 = DataConverter.convertToDoubleMatrix(mb);
+ double[][] d2 = DataConverter.convertToDoubleMatrix(tmp);
+ TestUtils.compareMatrices(d1, d2, rows, cols, 0);
+ }
+ catch(Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/873bae76/src/test_suites/java/org/apache/sysml/test/integration/functions/compress/ZPackageSuite.java
----------------------------------------------------------------------
diff --git a/src/test_suites/java/org/apache/sysml/test/integration/functions/compress/ZPackageSuite.java b/src/test_suites/java/org/apache/sysml/test/integration/functions/compress/ZPackageSuite.java
index c8dc906..7d19ae9 100644
--- a/src/test_suites/java/org/apache/sysml/test/integration/functions/compress/ZPackageSuite.java
+++ b/src/test_suites/java/org/apache/sysml/test/integration/functions/compress/ZPackageSuite.java
@@ -43,6 +43,7 @@ import org.junit.runners.Suite;
LargeMatrixVectorMultTest.class,
LargeParMatrixVectorMultTest.class,
LargeVectorMatrixMultTest.class,
+ ParCompressionTest.class,
ParMatrixMultChainTest.class,
ParMatrixVectorMultTest.class,
ParTransposeSelfLeftMatrixMultTest.class,
[2/3] incubator-systemml git commit: [SYSTEMML-562][SYSTEMML-413]
Basic cleanup of cache block abstraction
Posted by mb...@apache.org.
[SYSTEMML-562][SYSTEMML-413] Basic cleanup of cache block abstraction
Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/44c8f9d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/44c8f9d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/44c8f9d4
Branch: refs/heads/master
Commit: 44c8f9d4b1120901f5bf9c14e686189cb17c948b
Parents: 42906ba
Author: Matthias Boehm <mb...@us.ibm.com>
Authored: Wed Jul 27 22:52:22 2016 -0700
Committer: Matthias Boehm <mb...@us.ibm.com>
Committed: Wed Jul 27 22:52:22 2016 -0700
----------------------------------------------------------------------
.../controlprogram/caching/CacheBlock.java | 49 ++++++++++++++------
.../caching/CacheBlockFactory.java | 18 +++++++
.../spark/data/PartitionedBlock.java | 11 ++---
.../sysml/runtime/matrix/data/FrameBlock.java | 13 ------
.../sysml/runtime/matrix/data/MatrixBlock.java | 11 -----
.../matrix/data/OperationsOnMatrixValues.java | 24 ++++++++++
6 files changed, 81 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/44c8f9d4/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheBlock.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheBlock.java
index f58a7c2..b73584c 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheBlock.java
@@ -19,12 +19,8 @@
package org.apache.sysml.runtime.controlprogram.caching;
-import java.util.ArrayList;
-
import org.apache.hadoop.io.Writable;
import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.matrix.data.Pair;
-import org.apache.sysml.runtime.util.IndexRange;
/**
@@ -35,6 +31,18 @@ import org.apache.sysml.runtime.util.IndexRange;
public interface CacheBlock extends Writable
{
/**
+ * Get the number of rows of the cache block.
+ * @return number of rows
+ */
+ public int getNumRows();
+
+ /**
+ * Get the number of columns of the cache block.
+ * @return number of columns
+ */
+ public int getNumColumns();
+
+ /**
* Get the in-memory size in bytes of the cache block.
* @return
*/
@@ -60,18 +68,29 @@ public interface CacheBlock extends Writable
*/
public void compactEmptyBlock();
- public int getNumRows();
- public int getNumColumns();
-
+ /**
+ * Slice a sub block out of the current block and write into the given output block.
+ * This method returns the passed instance if not null.
+ *
+ * @param rl
+ * @param ru
+ * @param cl
+ * @param cu
+ * @param block output block to write into, or null
+ * @return the sliced block (the passed output block if it was not null)
+ * @throws DMLRuntimeException
+ */
public CacheBlock sliceOperations(int rl, int ru, int cl, int cu, CacheBlock block)
- throws DMLRuntimeException;
+ throws DMLRuntimeException;
+ /**
+ * Merge the given block into the current block. Both blocks need to be of equal
+ * dimensions and contain disjoint non-zero cells.
+ *
+ * @param that block to merge into the current block
+ * @param appendOnly
+ * @throws DMLRuntimeException
+ */
public void merge(CacheBlock that, boolean appendOnly)
- throws DMLRuntimeException;
-
- @SuppressWarnings("rawtypes")
- public ArrayList getPairList();
-
- ArrayList<Pair<?, ?>> performSlice(IndexRange ixrange, int brlen, int bclen, int iix, int jix, CacheBlock in)
- throws DMLRuntimeException;
+ throws DMLRuntimeException;
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/44c8f9d4/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheBlockFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheBlockFactory.java b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheBlockFactory.java
index 3bf86b5..9595441 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheBlockFactory.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/caching/CacheBlockFactory.java
@@ -19,8 +19,12 @@
package org.apache.sysml.runtime.controlprogram.caching;
+import java.util.ArrayList;
+
import org.apache.sysml.runtime.matrix.data.FrameBlock;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysml.runtime.matrix.data.Pair;
/**
* Factory to create instances of matrix/frame blocks given
@@ -54,4 +58,18 @@ public class CacheBlockFactory
return 1;
throw new RuntimeException("Unsupported cache block type: "+block.getClass().getName());
}
+
+ /**
+ * Creates a new, empty pair list matching the type of the given block; unsupported types raise a RuntimeException.
+ * @param block cache block whose type code (0: matrix, 1: frame) selects the pair type
+ * @return empty list of {@code Pair<MatrixIndexes,MatrixBlock>} or {@code Pair<Long,FrameBlock>}
+ */
+ public static ArrayList<?> getPairList(CacheBlock block) {
+ int code = getCode(block);
+ switch( code ) {
+ case 0: return new ArrayList<Pair<MatrixIndexes,MatrixBlock>>();
+ case 1: return new ArrayList<Pair<Long,FrameBlock>>();
+ }
+ throw new RuntimeException("Unsupported cache block type: "+code);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/44c8f9d4/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
index 465fdd5..fbd7a25 100644
--- a/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/instructions/spark/data/PartitionedBlock.java
@@ -33,6 +33,7 @@ import java.util.ArrayList;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.controlprogram.caching.CacheBlock;
import org.apache.sysml.runtime.controlprogram.caching.CacheBlockFactory;
+import org.apache.sysml.runtime.matrix.data.OperationsOnMatrixValues;
import org.apache.sysml.runtime.matrix.data.Pair;
import org.apache.sysml.runtime.util.FastBufferedDataInputStream;
import org.apache.sysml.runtime.util.FastBufferedDataOutputStream;
@@ -260,19 +261,17 @@ public class PartitionedBlock<T extends CacheBlock> implements Externalizable
int lcl = (int) cl;
int lcu = (int) cu;
- ArrayList<Pair<?, ?>> allBlks = block.getPairList();
+ ArrayList<Pair<?, ?>> allBlks = (ArrayList<Pair<?, ?>>) CacheBlockFactory.getPairList(block);
int start_iix = (lrl-1)/_brlen+1;
int end_iix = (lru-1)/_brlen+1;
int start_jix = (lcl-1)/_bclen+1;
int end_jix = (lcu-1)/_bclen+1;
for( int iix = start_iix; iix <= end_iix; iix++ )
- for(int jix = start_jix; jix <= end_jix; jix++)
- {
- T in = getBlock(iix, jix);
+ for(int jix = start_jix; jix <= end_jix; jix++) {
IndexRange ixrange = new IndexRange(rl, ru, cl, cu);
- ArrayList<Pair<?, ?>> outlist = block.performSlice(ixrange, _brlen, _bclen, iix, jix, in);
- allBlks.addAll(outlist);
+ allBlks.addAll(OperationsOnMatrixValues.performSlice(
+ ixrange, _brlen, _bclen, iix, jix, getBlock(iix, jix)));
}
if(allBlks.size() == 1) {
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/44c8f9d4/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
index 78fa165..1c8ffd0 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/FrameBlock.java
@@ -1305,17 +1305,4 @@ public class FrameBlock implements Writable, CacheBlock, Externalizable
_mvValue = mvVal;
}
}
-
- //TODO generalize these methods and remove from frame block
-
- @Override
- public ArrayList getPairList() {
- return new ArrayList<Pair<Long, FrameBlock>>();
- }
-
- public ArrayList<Pair<?, ?>> performSlice(IndexRange ixrange, int brlen, int bclen, int iix, int jix, CacheBlock in) throws DMLRuntimeException
- {
- return OperationsOnMatrixValues.performSlice(ixrange, brlen, bclen, iix, jix, (FrameBlock)in);
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/44c8f9d4/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
index 842982d..452eddb 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/MatrixBlock.java
@@ -6240,15 +6240,4 @@ public class MatrixBlock extends MatrixValue implements CacheBlock, Externalizab
}
public SparsityEstimate(){}
}
-
- public ArrayList<Pair<MatrixIndexes, MatrixBlock>> getPairList()
- {
- return new ArrayList<Pair<MatrixIndexes,MatrixBlock>>();
- }
-
- @SuppressWarnings("unchecked")
- public ArrayList<Pair<?, ?>> performSlice(IndexRange ixrange, int brlen, int bclen, int iix, int jix, CacheBlock in) throws DMLRuntimeException
- {
- return OperationsOnMatrixValues.performSlice(ixrange, brlen, bclen, iix, jix, (MatrixBlock)in);
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/44c8f9d4/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java b/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java
index 219a1e7..a105e6d 100644
--- a/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java
+++ b/src/main/java/org/apache/sysml/runtime/matrix/data/OperationsOnMatrixValues.java
@@ -27,6 +27,7 @@ import java.util.List;
import org.apache.sysml.parser.Expression.ValueType;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.compress.CompressedMatrixBlock;
+import org.apache.sysml.runtime.controlprogram.caching.CacheBlock;
import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType;
import org.apache.sysml.runtime.functionobjects.Builtin;
import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
@@ -300,6 +301,29 @@ public class OperationsOnMatrixValues
value1.aggregateBinaryOperations(value1, value2, valueOut, op);
}
+ /**
+ * Slices block {@code in} by the given index range, dispatching to the MatrixBlock or FrameBlock overload.
+ * @param ixrange index range to slice
+ * @param brlen number of rows per block
+ * @param bclen number of columns per block
+ * @param iix block row index of {@code in}
+ * @param jix block column index of {@code in}
+ * @param in matrix or frame block to slice
+ * @return list of index/block pairs produced by the type-specific overload
+ * @throws DMLRuntimeException if {@code in} is neither a MatrixBlock nor a FrameBlock
+ */
+ @SuppressWarnings("rawtypes")
+ public static ArrayList performSlice(IndexRange ixrange, int brlen, int bclen, int iix, int jix, CacheBlock in)
+ throws DMLRuntimeException
+ {
+ if( in instanceof MatrixBlock )
+ return performSlice(ixrange, brlen, bclen, iix, jix, (MatrixBlock)in);
+ else if( in instanceof FrameBlock )
+ return performSlice(ixrange, brlen, bclen, iix, jix, (FrameBlock)in);
+ throw new DMLRuntimeException("Unsupported cache block type: "+in.getClass().getName());
+ }
+
+
@SuppressWarnings("rawtypes")
public static ArrayList performSlice(IndexRange ixrange, int brlen, int bclen, int iix, int jix, MatrixBlock in)
throws DMLRuntimeException