You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/07/17 21:31:54 UTC
[4/7] incubator-systemml git commit: [SYSTEMML-766][SYSTEMML-810] Fix
missing licenses / unix line delimiters
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/da318739/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
index 6ac26c6..f5b2058 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
@@ -1,1342 +1,1342 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.compress;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.PriorityQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.sysml.hops.OptimizerUtils;
-import org.apache.sysml.lops.MMTSJ.MMTSJType;
-import org.apache.sysml.lops.MapMultChain.ChainType;
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.compress.ColGroup.CompressionType;
-import org.apache.sysml.runtime.compress.estim.CompressedSizeEstimator;
-import org.apache.sysml.runtime.compress.estim.CompressedSizeInfo;
-import org.apache.sysml.runtime.compress.estim.SizeEstimatorFactory;
-import org.apache.sysml.runtime.compress.utils.ConverterUtils;
-import org.apache.sysml.runtime.compress.utils.LinearAlgebraUtils;
-import org.apache.sysml.runtime.controlprogram.parfor.stat.Timing;
-import org.apache.sysml.runtime.functionobjects.KahanPlus;
-import org.apache.sysml.runtime.functionobjects.KahanPlusSq;
-import org.apache.sysml.runtime.functionobjects.Multiply;
-import org.apache.sysml.runtime.functionobjects.ReduceRow;
-import org.apache.sysml.runtime.instructions.cp.KahanObject;
-import org.apache.sysml.runtime.matrix.data.LibMatrixBincell;
-import org.apache.sysml.runtime.matrix.data.LibMatrixReorg;
-import org.apache.sysml.runtime.matrix.data.MatrixBlock;
-import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
-import org.apache.sysml.runtime.matrix.data.MatrixValue;
-import org.apache.sysml.runtime.matrix.data.SparseBlock;
-import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator;
-import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
-import org.apache.sysml.runtime.matrix.operators.BinaryOperator;
-import org.apache.sysml.runtime.matrix.operators.ScalarOperator;
-
-/**
- * Experimental version of MatrixBlock that allows a compressed internal
- * representation.
- */
-public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
-{
- private static final long serialVersionUID = 7319972089143154057L;
-
- //internal configuration
- public static final int MAX_NUMBER_COCODING_COLUMNS = 1000;
- public static final double MIN_COMPRESSION_RATIO = 2.0;
- public static final double MIN_RLE_RATIO = 1.0; // Minimum additional compression (non-RLE size / RLE size) before we switch to run-length encoding.
- public static final boolean TRANSPOSE_INPUT = true;
- public static final boolean MATERIALIZE_ZEROS = false;
- public static final long MIN_PAR_AGG_THRESHOLD = 16*1024*1024; //16MB
- public static final boolean INVESTIGATE_ESTIMATES = false;
- private static final boolean LDEBUG = false; //local debug flag
-
- private static final Log LOG = LogFactory.getLog(CompressedMatrixBlock.class.getName());
-
- static{
- // for internal debugging only
- if( LDEBUG ) {
- Logger.getLogger("org.apache.sysml.runtime.compress")
- .setLevel((Level) Level.DEBUG);
- }
- }
-
- protected ArrayList<ColGroup> _colGroups = null;
- protected CompressionStatistics _stats = null;
-
- public CompressedMatrixBlock() {
- super(-1, -1, true);
- }
-
- /**
- * Main constructor for building a block from scratch.
- *
- * @param rl
- * number of rows in the block
- * @param cl
- * number of columns
- * @param sparse
- * true if the UNCOMPRESSED representation of the block should be
- * sparse
- */
- public CompressedMatrixBlock(int rl, int cl, boolean sparse) {
- super(rl, cl, sparse);
- }
-
- /**
- * "Copy" constructor to populate this compressed block with the
- * uncompressed contents of a conventional block. Does <b>not</b> compress
- * the block.
- */
- public CompressedMatrixBlock(MatrixBlock mb) {
- super(mb.getNumRows(), mb.getNumColumns(), mb.isInSparseFormat());
-
- //shallow copy (deep copy on compression, prevents unnecessary copy)
- if( isInSparseFormat() )
- sparseBlock = mb.getSparseBlock();
- else
- denseBlock = mb.getDenseBlock();
- nonZeros = mb.getNonZeros();
- }
-
- /**
- *
- * @return the column groups constructed by the compression process.
- *
- */
- public ArrayList<ColGroup> getColGroups() {
- return _colGroups;
- }
-
- /**
- * @return true if this block is in compressed form; false if the block has
- * not yet been compressed
- */
- public boolean isCompressed() {
- return (_colGroups != null);
- }
-
- /**
- *
- * @return
- */
- public boolean isSingleUncompressedGroup(){
- return (_colGroups!=null && _colGroups.size()==1
- && _colGroups.get(0) instanceof ColGroupUncompressed);
- }
-
- private void allocateColGroupList() {
- _colGroups = new ArrayList<ColGroup>();
- }
-
- @Override
- public boolean isEmptyBlock(boolean safe) {
- if( !isCompressed() )
- return super.isEmptyBlock(safe);
- return (_colGroups == null || getNonZeros()==0);
- }
-
- /**
- * Compress the contents of this matrix block. After compression, the
- * uncompressed data is discarded. Attempts to update this block after
- * calling this method currently result in INCORRECT BEHAVIOR, something
- * which should be fixed if we move ahead with this compression strategy.
- *
- * +per column sparsity
- */
- public void compress()
- throws DMLRuntimeException
- {
- //check for redundant compression
- if( isCompressed() ){
- throw new DMLRuntimeException("Redundant compression, block already compressed.");
- }
-
- Timing time = new Timing(true);
- _stats = new CompressionStatistics();
-
- // SAMPLE-BASED DECISIONS:
- // Decisions such as testing if a column is amenable to bitmap
- // compression or evaluating co-coding potentionls are made based on a
- // subset of the rows. For large datasets, sampling might take a
- // significant amount of time. So, we generate only one sample and use
- // it for the entire compression process.
-
- //prepare basic meta data and deep copy / transpose input
- final int numRows = getNumRows();
- final int numCols = getNumColumns();
- final boolean sparse = isInSparseFormat();
- MatrixBlock rawblock = this;
- if( TRANSPOSE_INPUT )
- rawblock = LibMatrixReorg.transpose(rawblock, new MatrixBlock(numCols, numRows, sparse));
- else
- rawblock = new MatrixBlock(this);
-
- //construct sample-based size estimator
- CompressedSizeEstimator bitmapSizeEstimator =
- SizeEstimatorFactory.getSizeEstimator(rawblock, numRows);
-
- //allocate list of column groups
- allocateColGroupList();
-
- // The current implementation of this method is written for correctness,
- // not for performance or for minimal use of temporary space.
-
- // We start with a full set of columns.
- HashSet<Integer> remainingCols = new HashSet<Integer>();
- for (int i = 0; i < numCols; i++) {
- remainingCols.add(i);
- }
-
- // PHASE 1: Classify columns by compression type
- // We start by determining which columns are amenable to bitmap
- // compression
-
- // It is correct to use the dense size as the uncompressed size
- // FIXME not numRows but nnz / col otherwise too aggressive overestimation
- // of uncompressed size and hence overestimation of compression potential
- double uncompressedColumnSize = 8 * numRows;
-
- // information about the bitmap amenable columns
- List<Integer> bitmapCols = new ArrayList<Integer>();
- List<Integer> uncompressedCols = new ArrayList<Integer>();
- List<Integer> colsCardinalities = new ArrayList<Integer>();
- List<Long> compressedSizes = new ArrayList<Long>();
- HashMap<Integer, Double> compressionRatios = new HashMap<Integer, Double>();
-
- // Minimum ratio (size of uncompressed / size of compressed) that we
- // will accept when encoding a field with a bitmap.
- for (int col = 0; col < numCols; col++)
- {
- CompressedSizeInfo compressedSizeInfo = bitmapSizeEstimator
- .estimateCompressedColGroupSize(new int[] { col });
- long compressedSize = compressedSizeInfo.getMinSize();
- double compRatio = uncompressedColumnSize / compressedSize;
-
- //FIXME: compression ratio should be checked against 1 instead of min compression
- //ratio; I think this threshold was only required because we overestimated the
- //the uncompressed column size with n\alpha instead of z\alpha
- if (compRatio >= MIN_COMPRESSION_RATIO) {
- bitmapCols.add(col);
- compressionRatios.put(col, compRatio);
- colsCardinalities.add(compressedSizeInfo.getEstCarinality());
- compressedSizes.add(compressedSize);
- }
- else
- uncompressedCols.add(col);
- }
-
- _stats.timePhase1 = time.stop();
- if( LOG.isDebugEnabled() )
- LOG.debug("compression phase 1: "+_stats.timePhase1);
-
- // Filters for additional types of compression should be inserted here.
-
- // PHASE 2: Grouping columns
- // Divide the bitmap columns into column groups.
- List<int[]> bitmapColGrps = null;
- if (bitmapCols.size() > MAX_NUMBER_COCODING_COLUMNS) {
- // Too many columns to compute co-coding groups with current methods.
- // Generate singleton groups.
- bitmapColGrps = new ArrayList<int[]>(bitmapCols.size());
- for (int col : bitmapCols) {
- bitmapColGrps.add(new int[] { col });
- }
- }
- else {
- bitmapColGrps = PlanningCoCoder.findCocodesByPartitioning(
- bitmapSizeEstimator, bitmapCols, colsCardinalities,
- compressedSizes, numRows, isInSparseFormat() ?
- OptimizerUtils.getSparsity(numRows, numCols, getNonZeros()): 1);
- }
-
- _stats.timePhase2 = time.stop();
- if( LOG.isDebugEnabled() )
- LOG.debug("compression phase 2: "+_stats.timePhase2);
-
- if( INVESTIGATE_ESTIMATES ) {
- double est = 0;
- for( int[] groupIndices : bitmapColGrps )
- est += bitmapSizeEstimator.estimateCompressedColGroupSize(groupIndices).getMinSize();
- est += uncompressedCols.size() * uncompressedColumnSize;
- _stats.estSize = est;
- }
-
- // PHASE 3: Compress and correct sample-based decisions
-
- for (int[] groupIndices : bitmapColGrps)
- {
- int[] allGroupIndices = null;
- int allColsCount = groupIndices.length;
- CompressedSizeInfo bitmapSizeInfo;
- // The compression type is decided based on a full bitmap since it
- // will be reused for the actual compression step.
- UncompressedBitmap ubm;
- PriorityQueue<CompressedColumn> compRatioPQ = null;
- boolean skipGroup = false;
- while (true)
- {
- ubm = BitmapEncoder.extractBitmap(groupIndices, rawblock);
- bitmapSizeInfo = bitmapSizeEstimator
- .estimateCompressedColGroupSize(ubm);
- double compRatio = uncompressedColumnSize * groupIndices.length
- / bitmapSizeInfo.getMinSize();
- if (compRatio >= MIN_COMPRESSION_RATIO) {
- // we have a good group
- for( Integer col : groupIndices )
- remainingCols.remove(col);
- break;
- } else {
- // modify the group
- if (compRatioPQ == null) {
- // first modification
- allGroupIndices = Arrays.copyOf(groupIndices, groupIndices.length);
- compRatioPQ = new PriorityQueue<CompressedMatrixBlock.CompressedColumn>();
- for (int i = 0; i < groupIndices.length; i++)
- compRatioPQ.add(new CompressedColumn(i,
- compressionRatios.get(groupIndices[i])));
- }
-
- // index in allGroupIndices
- int removeIx = compRatioPQ.poll().colIx;
- allGroupIndices[removeIx] = -1;
- allColsCount--;
- if (allColsCount == 0) {
- skipGroup = true;
- break;
- }
- groupIndices = new int[allColsCount];
- // copying the values that do not equal -1
- int ix = 0;
- for (int col : allGroupIndices) {
- if (col != -1) {
- groupIndices[ix++] = col;
- }
- }
-
- }
- }
-
- if (skipGroup)
- continue;
- long rleNumBytes = bitmapSizeInfo.getRLESize();
- long offsetNumBytes = bitmapSizeInfo.getOLESize();
- double rleRatio = (double) offsetNumBytes / (double) rleNumBytes;
-
- if (rleRatio > MIN_RLE_RATIO) {
- ColGroupRLE compressedGroup = new ColGroupRLE(groupIndices,
- numRows, ubm);
- _colGroups.add(compressedGroup);
- }
- else {
- ColGroupOLE compressedGroup = new ColGroupOLE(
- groupIndices, numRows, ubm);
- _colGroups.add(compressedGroup);
- }
-
- }
-
- _stats.timePhase3 = time.stop();
- if( LOG.isDebugEnabled() )
- LOG.debug("compression phase 3: "+_stats.timePhase3);
-
- // Phase 4: Cleanup
- // The remaining columns are stored uncompressed as one big column group
- if (remainingCols.size() > 0) {
- ArrayList<Integer> list = new ArrayList<Integer>(remainingCols);
- ColGroupUncompressed ucgroup = new ColGroupUncompressed(list, rawblock);
- _colGroups.add(ucgroup);
- }
-
- //final cleanup (discard uncompressed block)
- rawblock.cleanupBlock(true, true);
- this.cleanupBlock(true, true);
-
- _stats.timePhase4 = time.stop();
- if( LOG.isDebugEnabled() )
- LOG.debug("compression phase 4: "+_stats.timePhase4);
- }
-
- /**
- * @return a new uncompressed matrix block containing the contents of this
- * block
- */
- public MatrixBlock decompress() throws DMLRuntimeException
- {
- //early abort for not yet compressed blocks
- if( !isCompressed() )
- return new MatrixBlock(this);
-
- //preallocation sparse rows to avoid repeated reallocations
- MatrixBlock ret = new MatrixBlock(getNumRows(), getNumColumns(), isInSparseFormat(), getNonZeros());
- if( ret.isInSparseFormat() ) {
- int[] rnnz = new int[rlen];
- for (ColGroup grp : _colGroups)
- grp.countNonZerosPerRow(rnnz);
- ret.allocateSparseRowsBlock();
- SparseBlock rows = ret.getSparseBlock();
- for( int i=0; i<rlen; i++ )
- rows.allocate(i, rnnz[i]);
- }
-
- //core decompression (append if sparse)
- for (ColGroup grp : _colGroups)
- grp.decompressToBlock(ret);
-
- //post-processing (for append in decompress)
- if( isInSparseFormat() )
- ret.sortSparseRows();
-
- return ret;
- }
-
- public CompressionStatistics getCompressionStatistics(){
- return _stats;
- }
-
- /**
- *
- * @return an upper bound on the memory used to store this compressed block
- * considering class overhead.
- */
- public long estimateCompressedSizeInMemory() {
- if (!isCompressed())
- return 0;
- // basic data inherited from MatrixBlock
- long total = MatrixBlock.estimateSizeInMemory(0, 0, 0);
- // adding the size of colGroups ArrayList overhead
- // object overhead (32B) + int size (4B) + int modCount (4B) + Object[]
- // elementData overhead + reference (32+8)B +reference ofr each Object (8B)
- total += 80 + 8 * _colGroups.size();
- for (ColGroup grp : _colGroups)
- total += grp.estimateInMemorySize();
- return total;
- }
-
- private class CompressedColumn implements Comparable<CompressedColumn> {
- int colIx;
- double compRatio;
-
- public CompressedColumn(int colIx, double compRatio) {
- this.colIx = colIx;
- this.compRatio = compRatio;
- }
-
- @Override
- public int compareTo(CompressedColumn o) {
- return (int) Math.signum(compRatio - o.compRatio);
- }
- }
-
- public static class CompressionStatistics {
- public double timePhase1 = -1;
- public double timePhase2 = -1;
- public double timePhase3 = -1;
- public double timePhase4 = -1;
- public double estSize = -1;
-
- public CompressionStatistics() {
- //do nothing
- }
-
- public CompressionStatistics(double t1, double t2, double t3, double t4){
- timePhase1 = t1;
- timePhase2 = t2;
- timePhase3 = t3;
- timePhase4 = t4;
- }
- }
-
-
- //////////////////////////////////////////
- // Serialization / Deserialization
-
- @Override
- public long getExactSizeOnDisk()
- {
- //header information
- long ret = 12;
-
- for( ColGroup grp : _colGroups ) {
- ret += 1; //type info
- ret += grp.getExactSizeOnDisk();
- }
-
- return ret;
- }
-
- @Override
- public void readFields(DataInput in)
- throws IOException
- {
- boolean compressed = in.readBoolean();
-
- //deserialize uncompressed block
- if( !compressed ) {
- super.readFields(in);
- return;
- }
-
- //deserialize compressed block
- rlen = in.readInt();
- clen = in.readInt();
- nonZeros = in.readLong();
- int ncolGroups = in.readInt();
-
- _colGroups = new ArrayList<ColGroup>(ncolGroups);
- for( int i=0; i<ncolGroups; i++ )
- {
- CompressionType ctype = CompressionType.values()[in.readByte()];
- ColGroup grp = null;
-
- //create instance of column group
- switch( ctype ) {
- case UNCOMPRESSED:
- grp = new ColGroupUncompressed(); break;
- case OLE_BITMAP:
- grp = new ColGroupOLE(); break;
- case RLE_BITMAP:
- grp = new ColGroupRLE(); break;
- }
-
- //deserialize and add column group
- grp.readFields(in);
- _colGroups.add(grp);
- }
- }
-
- @Override
- public void write(DataOutput out)
- throws IOException
- {
- out.writeBoolean( isCompressed() );
-
- //serialize uncompressed block
- if( !isCompressed() ) {
- super.write(out);
- return;
- }
-
- //serialize compressed matrix block
- out.writeInt(rlen);
- out.writeInt(clen);
- out.writeLong(nonZeros);
- out.writeInt(_colGroups.size());
-
- for( ColGroup grp : _colGroups ) {
- out.writeByte( grp.getCompType().ordinal() );
- grp.write(out); //delegate serialization
- }
- }
-
-
- /**
- * Redirects the default java serialization via externalizable to our default
- * hadoop writable serialization for efficient broadcast/rdd deserialization.
- *
- * @param is
- * @throws IOException
- */
- @Override
- public void readExternal(ObjectInput is)
- throws IOException
- {
- readFields(is);
- }
-
- /**
- * Redirects the default java serialization via externalizable to our default
- * hadoop writable serialization for efficient broadcast/rdd serialization.
- *
- * @param is
- * @throws IOException
- */
- @Override
- public void writeExternal(ObjectOutput os)
- throws IOException
- {
- write(os);
- }
-
-
- //////////////////////////////////////////
- // Operations (overwrite existing ops for seamless integration)
-
- @Override
- public MatrixValue scalarOperations(ScalarOperator sop, MatrixValue result)
- throws DMLRuntimeException
- {
- //call uncompressed matrix scalar if necessary
- if( !isCompressed() ) {
- return super.scalarOperations(sop, result);
- }
-
- //allocate the output matrix block
- CompressedMatrixBlock ret = null;
- if( result==null || !(result instanceof CompressedMatrixBlock) )
- ret = new CompressedMatrixBlock(getNumRows(), getNumColumns(), sparse);
- else {
- ret = (CompressedMatrixBlock) result;
- ret.reset(rlen, clen);
- }
-
- // Apply the operation recursively to each of the column groups.
- // Most implementations will only modify metadata.
- ArrayList<ColGroup> newColGroups = new ArrayList<ColGroup>();
- for (ColGroup grp : _colGroups) {
- newColGroups.add(grp.scalarOperation(sop));
- }
- ret._colGroups = newColGroups;
- ret.setNonZeros(rlen*clen);
-
- return ret;
- }
-
- @Override
- public MatrixBlock appendOperations(MatrixBlock that, MatrixBlock ret)
- throws DMLRuntimeException
- {
- //call uncompressed matrix append if necessary
- if( !isCompressed() ) {
- if( that instanceof CompressedMatrixBlock )
- that = ((CompressedMatrixBlock) that).decompress();
- return super.appendOperations(that, ret);
- }
-
- final int m = rlen;
- final int n = clen+that.getNumColumns();
- final long nnz = nonZeros+that.getNonZeros();
-
- //init result matrix
- CompressedMatrixBlock ret2 = null;
- if( ret == null || !(ret instanceof CompressedMatrixBlock) ) {
- ret2 = new CompressedMatrixBlock(m, n, isInSparseFormat());
- }
- else {
- ret2 = (CompressedMatrixBlock) ret;
- ret2.reset(m, n);
- }
-
- //shallow copy of lhs column groups
- ret2.allocateColGroupList();
- ret2._colGroups.addAll(_colGroups);
-
- //copy of rhs column groups w/ col index shifting
- if( !(that instanceof CompressedMatrixBlock) ) {
- that = new CompressedMatrixBlock(that);
- ((CompressedMatrixBlock)that).compress();
- }
- ArrayList<ColGroup> inColGroups = ((CompressedMatrixBlock) that)._colGroups;
- for( ColGroup group : inColGroups ) {
- ColGroup tmp = ConverterUtils.copyColGroup(group);
- tmp.shiftColIndices(clen);
- ret2._colGroups.add(tmp);
- }
-
- //meta data maintenance
- ret2.setNonZeros(nnz);
- return ret2;
- }
-
- @Override
- public MatrixBlock chainMatrixMultOperations(MatrixBlock v, MatrixBlock w, MatrixBlock out, ChainType ctype)
- throws DMLRuntimeException
- {
- //call uncompressed matrix mult if necessary
- if( !isCompressed() ) {
- return super.chainMatrixMultOperations(v, w, out, ctype);
- }
-
- //single-threaded mmchain of single uncompressed colgroup
- if( isSingleUncompressedGroup() ){
- return ((ColGroupUncompressed)_colGroups.get(0))
- .getData().chainMatrixMultOperations(v, w, out, ctype);
- }
-
- //Timing time = new Timing(true);
-
- //prepare result
- if( out != null )
- out.reset(clen, 1, false);
- else
- out = new MatrixBlock(clen, 1, false);
-
- //empty block handling
- if( isEmptyBlock(false) )
- return out;
-
- //compute matrix mult
- MatrixBlock tmp = new MatrixBlock(rlen, 1, false);
- rightMultByVector(v, tmp);
- if( ctype == ChainType.XtwXv ) {
- BinaryOperator bop = new BinaryOperator(Multiply.getMultiplyFnObject());
- LibMatrixBincell.bincellOpInPlace(tmp, w, bop);
- }
- leftMultByVectorTranspose(_colGroups, tmp, out, true);
-
- //System.out.println("Compressed MMChain in "+time.stop());
-
- return out;
- }
-
- @Override
- public MatrixBlock chainMatrixMultOperations(MatrixBlock v, MatrixBlock w, MatrixBlock out, ChainType ctype, int k)
- throws DMLRuntimeException
- {
- //call uncompressed matrix mult if necessary
- if( !isCompressed() ){
- return super.chainMatrixMultOperations(v, w, out, ctype, k);
- }
-
- //multi-threaded mmchain of single uncompressed colgroup
- if( isSingleUncompressedGroup() ){
- return ((ColGroupUncompressed)_colGroups.get(0))
- .getData().chainMatrixMultOperations(v, w, out, ctype, k);
- }
-
- Timing time = LOG.isDebugEnabled() ? new Timing(true) : null;
-
- //prepare result
- if( out != null )
- out.reset(clen, 1, false);
- else
- out = new MatrixBlock(clen, 1, false);
-
- //empty block handling
- if( isEmptyBlock(false) )
- return out;
-
- //compute matrix mult
- MatrixBlock tmp = new MatrixBlock(rlen, 1, false);
- rightMultByVector(v, tmp, k);
- if( ctype == ChainType.XtwXv ) {
- BinaryOperator bop = new BinaryOperator(Multiply.getMultiplyFnObject());
- LibMatrixBincell.bincellOpInPlace(tmp, w, bop);
- }
- leftMultByVectorTranspose(_colGroups, tmp, out, true, k);
-
- if( LOG.isDebugEnabled() )
- LOG.debug("Compressed MMChain k="+k+" in "+time.stop());
-
- return out;
- }
-
- @Override
- public MatrixValue aggregateBinaryOperations(MatrixValue mv1, MatrixValue mv2, MatrixValue result, AggregateBinaryOperator op)
- throws DMLRuntimeException
- {
- //call uncompressed matrix mult if necessary
- if( !isCompressed() ) {
- return super.aggregateBinaryOperations(mv1, mv2, result, op);
- }
-
- //multi-threaded mm of single uncompressed colgroup
- if( isSingleUncompressedGroup() ){
- MatrixBlock tmp = ((ColGroupUncompressed)_colGroups.get(0)).getData();
- return tmp.aggregateBinaryOperations(this==mv1?tmp:mv1, this==mv2?tmp:mv2, result, op);
- }
-
- Timing time = LOG.isDebugEnabled() ? new Timing(true) : null;
-
- //setup meta data (dimensions, sparsity)
- int rl = mv1.getNumRows();
- int cl = mv2.getNumColumns();
-
- //create output matrix block
- MatrixBlock ret = (MatrixBlock) result;
- if( ret==null )
- ret = new MatrixBlock(rl, cl, false, rl*cl);
- else
- ret.reset(rl, cl, false, rl*cl);
-
- //compute matrix mult
- if( mv1.getNumRows()>1 && mv2.getNumColumns()==1 ) { //MV right
- CompressedMatrixBlock cmb = (CompressedMatrixBlock)mv1;
- MatrixBlock mb = (MatrixBlock) mv2;
- if( op.getNumThreads()>1 )
- cmb.rightMultByVector(mb, ret, op.getNumThreads());
- else
- cmb.rightMultByVector(mb, ret);
- }
- else if( mv1.getNumRows()==1 && mv2.getNumColumns()>1 ) { //MV left
- MatrixBlock mb = (MatrixBlock) mv1;
- if( op.getNumThreads()>1 )
- leftMultByVectorTranspose(_colGroups, mb, ret, false, op.getNumThreads());
- else
- leftMultByVectorTranspose(_colGroups, mb, ret, false);
- }
- else {
- //NOTE: we could decompress and invoke super.aggregateBinary but for now
- //we want to have an eager fail if this happens
- throw new DMLRuntimeException("Unsupported matrix-matrix multiplication over compressed matrix block.");
- }
-
- if( LOG.isDebugEnabled() )
- LOG.debug("Compressed MM in "+time.stop());
-
- return ret;
- }
-
- @Override
- public MatrixValue aggregateUnaryOperations(AggregateUnaryOperator op, MatrixValue result,
- int blockingFactorRow, int blockingFactorCol, MatrixIndexes indexesIn, boolean inCP)
- throws DMLRuntimeException
- {
- //call uncompressed matrix mult if necessary
- if( !isCompressed() ) {
- return super.aggregateUnaryOperations(op, result, blockingFactorRow, blockingFactorCol, indexesIn, inCP);
- }
-
- //check for supported operations
- if( !(op.aggOp.increOp.fn instanceof KahanPlus || op.aggOp.increOp.fn instanceof KahanPlusSq) ){
- throw new DMLRuntimeException("Unary aggregates other than sums not supported yet.");
- }
-
- //prepare output dimensions
- CellIndex tempCellIndex = new CellIndex(-1,-1);
- op.indexFn.computeDimension(rlen, clen, tempCellIndex);
- if(op.aggOp.correctionExists) {
- switch(op.aggOp.correctionLocation)
- {
- case LASTROW: tempCellIndex.row++; break;
- case LASTCOLUMN: tempCellIndex.column++; break;
- case LASTTWOROWS: tempCellIndex.row+=2; break;
- case LASTTWOCOLUMNS: tempCellIndex.column+=2; break;
- default:
- throw new DMLRuntimeException("unrecognized correctionLocation: "+op.aggOp.correctionLocation);
- }
- }
-
- //prepare output
- if(result==null)
- result=new MatrixBlock(tempCellIndex.row, tempCellIndex.column, false);
- else
- result.reset(tempCellIndex.row, tempCellIndex.column, false);
-
- MatrixBlock ret = (MatrixBlock) result;
-
- //core unary aggregate
- if( op.getNumThreads() > 1
- && getExactSizeOnDisk() > MIN_PAR_AGG_THRESHOLD )
- {
- // initialize and allocate the result
- ret.allocateDenseBlock();
-
- //multi-threaded execution of all groups
- ArrayList<ColGroup>[] grpParts = createStaticTaskPartitioning(op.getNumThreads(), false);
- ColGroupUncompressed uc = getUncompressedColGroup();
- try {
- //compute all compressed column groups
- ExecutorService pool = Executors.newFixedThreadPool( op.getNumThreads() );
- ArrayList<UnaryAggregateTask> tasks = new ArrayList<UnaryAggregateTask>();
- for( ArrayList<ColGroup> grp : grpParts )
- tasks.add(new UnaryAggregateTask(grp, ret, op));
- pool.invokeAll(tasks);
- pool.shutdown();
- //compute uncompressed column group in parallel (otherwise bottleneck)
- if( uc != null )
- ret = (MatrixBlock)uc.getData().aggregateUnaryOperations(op, ret, blockingFactorRow, blockingFactorCol, indexesIn, false);
- //aggregate partial results
- if( !(op.indexFn instanceof ReduceRow) ){
- KahanObject kbuff = new KahanObject(0,0);
- KahanPlus kplus = KahanPlus.getKahanPlusFnObject();
- for( int i=0; i<ret.getNumRows(); i++ ) {
- kbuff.set(ret.quickGetValue(i, 0), ret.quickGetValue(i, 0));
- for( UnaryAggregateTask task : tasks )
- kplus.execute2(kbuff, task.getResult().quickGetValue(i, 0));
- ret.quickSetValue(i, 0, kbuff._sum);
- ret.quickSetValue(i, 1, kbuff._correction);
- }
- }
- }
- catch(Exception ex) {
- throw new DMLRuntimeException(ex);
- }
- }
- else {
- for (ColGroup grp : _colGroups) {
- grp.unaryAggregateOperations(op, ret);
- }
- }
-
- //drop correction if necessary
- if(op.aggOp.correctionExists && inCP)
- ret.dropLastRowsOrColums(op.aggOp.correctionLocation);
-
- //post-processing
- ret.recomputeNonZeros();
-
- return ret;
- }
-
- @Override
- public MatrixBlock transposeSelfMatrixMultOperations(MatrixBlock out, MMTSJType tstype)
- throws DMLRuntimeException
- {
- //call uncompressed matrix mult if necessary
- if( !isCompressed() ) {
- return super.transposeSelfMatrixMultOperations(out, tstype);
- }
-
- //single-threaded tsmm of single uncompressed colgroup
- if( isSingleUncompressedGroup() ){
- return ((ColGroupUncompressed)_colGroups.get(0))
- .getData().transposeSelfMatrixMultOperations(out, tstype);
- }
-
- Timing time = LOG.isDebugEnabled() ? new Timing(true) : null;
-
- //check for transpose type
- if( tstype != MMTSJType.LEFT ) //right not supported yet
- throw new DMLRuntimeException("Invalid MMTSJ type '"+tstype.toString()+"'.");
-
- //create output matrix block
- if( out == null )
- out = new MatrixBlock(clen, clen, false);
- else
- out.reset(clen, clen, false);
- out.allocateDenseBlock();
-
- if( !isEmptyBlock(false) ) {
- //compute matrix mult
- leftMultByTransposeSelf(_colGroups, out, 0, _colGroups.size());
-
- // post-processing
- out.recomputeNonZeros();
- }
-
- if( LOG.isDebugEnabled() )
- LOG.debug("Compressed TSMM in "+time.stop());
-
- return out;
- }
-
-
- @Override
- public MatrixBlock transposeSelfMatrixMultOperations(MatrixBlock out, MMTSJType tstype, int k)
- throws DMLRuntimeException
- {
- //call uncompressed matrix mult if necessary
- if( !isCompressed() ){
- return super.transposeSelfMatrixMultOperations(out, tstype, k);
- }
-
- //multi-threaded tsmm of single uncompressed colgroup
- if( isSingleUncompressedGroup() ){
- return ((ColGroupUncompressed)_colGroups.get(0))
- .getData().transposeSelfMatrixMultOperations(out, tstype, k);
- }
-
- Timing time = LOG.isDebugEnabled() ? new Timing(true) : null;
-
- //check for transpose type
- if( tstype != MMTSJType.LEFT ) //right not supported yet
- throw new DMLRuntimeException("Invalid MMTSJ type '"+tstype.toString()+"'.");
-
- //create output matrix block
- if( out == null )
- out = new MatrixBlock(clen, clen, false);
- else
- out.reset(clen, clen, false);
- out.allocateDenseBlock();
-
- if( !isEmptyBlock(false) ) {
- //compute matrix mult
- try {
- ExecutorService pool = Executors.newFixedThreadPool( k );
- ArrayList<MatrixMultTransposeTask> tasks = new ArrayList<MatrixMultTransposeTask>();
- int blklen = (int)(Math.ceil((double)clen/(2*k)));
- for( int i=0; i<2*k & i*blklen<clen; i++ )
- tasks.add(new MatrixMultTransposeTask(_colGroups, out, i*blklen, Math.min((i+1)*blklen, clen)));
- List<Future<Object>> ret = pool.invokeAll(tasks);
- for( Future<Object> tret : ret )
- tret.get(); //check for errors
- pool.shutdown();
- }
- catch(Exception ex) {
- throw new DMLRuntimeException(ex);
- }
-
- // post-processing
- out.recomputeNonZeros();
- }
-
- if( LOG.isDebugEnabled() )
- LOG.debug("Compressed TSMM k="+k+" in "+time.stop());
-
- return out;
- }
-
-
- /**
- * Multiply this matrix block by a column vector on the right.
- *
- * @param vector
- * right-hand operand of the multiplication
- * @param result
- * buffer to hold the result; must have the appropriate size
- * already
- */
- private void rightMultByVector(MatrixBlock vector, MatrixBlock result)
- throws DMLRuntimeException
- {
- // initialize and allocate the result
- result.allocateDenseBlock();
-
- // delegate matrix-vector operation to each column group
- for( ColGroup grp : _colGroups )
- if( grp instanceof ColGroupUncompressed ) //overwrites output
- grp.rightMultByVector(vector, result, 0, result.getNumRows());
- for( ColGroup grp : _colGroups )
- if( !(grp instanceof ColGroupUncompressed) ) //adds to output
- grp.rightMultByVector(vector, result, 0, result.getNumRows());
-
- // post-processing
- result.recomputeNonZeros();
- }
-
- /**
- * Multi-threaded version of rightMultByVector.
- *
- * @param vector
- * @param result
- * @param k
- * @throws DMLRuntimeException
- */
- private void rightMultByVector(MatrixBlock vector, MatrixBlock result, int k)
- throws DMLRuntimeException
- {
- // initialize and allocate the result
- result.allocateDenseBlock();
-
- //multi-threaded execution of all groups
- try {
- ExecutorService pool = Executors.newFixedThreadPool( k );
- int rlen = getNumRows();
- int seqsz = BitmapEncoder.BITMAP_BLOCK_SZ;
- int blklen = (int)(Math.ceil((double)rlen/k));
- blklen += (blklen%seqsz != 0)?seqsz-blklen%seqsz:0;
- ArrayList<RightMatrixMultTask> tasks = new ArrayList<RightMatrixMultTask>();
- for( int i=0; i<k & i*blklen<getNumRows(); i++ )
- tasks.add(new RightMatrixMultTask(_colGroups, vector, result, i*blklen, Math.min((i+1)*blklen,rlen)));
- pool.invokeAll(tasks);
- pool.shutdown();
- }
- catch(Exception ex) {
- throw new DMLRuntimeException(ex);
- }
-
- // post-processing
- result.recomputeNonZeros();
- }
-
- /**
- * Multiply this matrix block by the transpose of a column vector (i.e.
- * t(v)%*%X)
- *
- * @param vector
- * left-hand operand of the multiplication
- * @param result
- * buffer to hold the result; must have the appropriate size
- * already
- */
- private static void leftMultByVectorTranspose(List<ColGroup> colGroups, MatrixBlock vector, MatrixBlock result, boolean doTranspose)
- throws DMLRuntimeException
- {
- //transpose vector if required
- MatrixBlock rowVector = vector;
- if (doTranspose) {
- rowVector = new MatrixBlock(1, vector.getNumRows(), false);
- LibMatrixReorg.transpose(vector, rowVector);
- }
-
- // initialize and allocate the result
- result.reset();
- result.allocateDenseBlock();
-
- // delegate matrix-vector operation to each column group
- for (ColGroup grp : colGroups) {
- grp.leftMultByRowVector(rowVector, result);
- }
-
- // post-processing
- result.recomputeNonZeros();
- }
-
- /**
- * Multi-thread version of leftMultByVectorTranspose.
- *
- * @param vector
- * @param result
- * @param doTranspose
- * @param k
- * @throws DMLRuntimeException
- */
- private static void leftMultByVectorTranspose(List<ColGroup> colGroups,MatrixBlock vector, MatrixBlock result, boolean doTranspose, int k)
- throws DMLRuntimeException
- {
- int kuc = Math.max(1, k - colGroups.size() + 1);
-
- //transpose vector if required
- MatrixBlock rowVector = vector;
- if (doTranspose) {
- rowVector = new MatrixBlock(1, vector.getNumRows(), false);
- LibMatrixReorg.transpose(vector, rowVector);
- }
-
- // initialize and allocate the result
- result.reset();
- result.allocateDenseBlock();
-
- //multi-threaded execution
- try {
- ExecutorService pool = Executors.newFixedThreadPool( Math.min(colGroups.size(), k) );
- ArrayList<LeftMatrixMultTask> tasks = new ArrayList<LeftMatrixMultTask>();
- for( ColGroup grp : colGroups )
- tasks.add(new LeftMatrixMultTask(grp, rowVector, result, kuc));
- pool.invokeAll(tasks);
- pool.shutdown();
- }
- catch(Exception ex) {
- throw new DMLRuntimeException(ex);
- }
-
- // post-processing
- result.recomputeNonZeros();
- }
-
- /**
- *
- * @param result
- * @throws DMLRuntimeException
- */
- private static void leftMultByTransposeSelf(ArrayList<ColGroup> groups, MatrixBlock result, int cl, int cu)
- throws DMLRuntimeException
- {
- final int numRows = groups.get(0).getNumRows();
- final int numGroups = groups.size();
-
- //preallocated dense matrix block
- MatrixBlock lhs = new MatrixBlock(numRows, 1, false);
- lhs.allocateDenseBlock();
-
- //approach: for each colgroup, extract uncompressed columns one at-a-time
- //vector-matrix multiplies against remaining col groups
- for( int i=cl; i<cu; i++ )
- {
- //get current group and relevant col groups
- ColGroup group = groups.get(i);
- int[] ixgroup = group.getColIndices();
- List<ColGroup> tmpList = groups.subList(i, numGroups);
-
- //for all uncompressed lhs columns vectors
- for( int j=0; j<ixgroup.length; j++ ) {
- //decompress single column
- lhs.reset(numRows, 1, false);
- group.decompressToBlock(lhs, j);
-
- if( !lhs.isEmptyBlock(false) ) {
- //compute vector-matrix partial result
- MatrixBlock tmpret = new MatrixBlock(1,result.getNumColumns(),false);
- leftMultByVectorTranspose(tmpList, lhs, tmpret, true);
-
- //write partial results (disjoint non-zeros)
- LinearAlgebraUtils.copyNonZerosToRowCol(result, tmpret, ixgroup[j]);
- }
- }
- }
- }
-
- /**
- *
- * @param k
- * @return
- */
- @SuppressWarnings("unchecked")
- private ArrayList<ColGroup>[] createStaticTaskPartitioning(int k, boolean inclUncompressed)
- {
- // special case: single uncompressed col group
- if( _colGroups.size()==1 && _colGroups.get(0) instanceof ColGroupUncompressed ){
- return new ArrayList[0];
- }
-
- // initialize round robin col group distribution
- // (static task partitioning to reduce mem requirements/final agg)
- int numTasks = Math.min(k, _colGroups.size());
- ArrayList<ColGroup>[] grpParts = new ArrayList[numTasks];
- int pos = 0;
- for( ColGroup grp : _colGroups ){
- if( grpParts[pos]==null )
- grpParts[pos] = new ArrayList<ColGroup>();
- if( inclUncompressed || !(grp instanceof ColGroupUncompressed) ) {
- grpParts[pos].add(grp);
- pos = (pos==numTasks-1) ? 0 : pos+1;
- }
- }
-
- return grpParts;
- }
-
- /**
- *
- * @return
- */
- private ColGroupUncompressed getUncompressedColGroup()
- {
- for( ColGroup grp : _colGroups )
- if( grp instanceof ColGroupUncompressed )
- return (ColGroupUncompressed)grp;
-
- return null;
- }
-
- /**
- *
- */
- private static class LeftMatrixMultTask implements Callable<Object>
- {
- private ColGroup _group = null;
- private MatrixBlock _vect = null;
- private MatrixBlock _ret = null;
- private int _kuc = 1;
-
- protected LeftMatrixMultTask( ColGroup group, MatrixBlock vect, MatrixBlock ret, int kuc) {
- _group = group;
- _vect = vect;
- _ret = ret;
- _kuc = kuc;
- }
-
- @Override
- public Object call() throws DMLRuntimeException
- {
- // delegate matrix-vector operation to each column group
- if( _group instanceof ColGroupUncompressed && _kuc >1 && ColGroupBitmap.LOW_LEVEL_OPT )
- ((ColGroupUncompressed)_group).leftMultByRowVector(_vect, _ret, _kuc);
- else
- _group.leftMultByRowVector(_vect, _ret);
- return null;
- }
- }
-
- /**
- *
- */
- private static class RightMatrixMultTask implements Callable<Object>
- {
- private ArrayList<ColGroup> _groups = null;
- private MatrixBlock _vect = null;
- private MatrixBlock _ret = null;
- private int _rl = -1;
- private int _ru = -1;
-
- protected RightMatrixMultTask( ArrayList<ColGroup> groups, MatrixBlock vect, MatrixBlock ret, int rl, int ru) {
- _groups = groups;
- _vect = vect;
- _ret = ret;
- _rl = rl;
- _ru = ru;
- }
-
- @Override
- public Object call() throws DMLRuntimeException
- {
- // delegate vector-matrix operation to each column group
- for( ColGroup grp : _groups )
- if( grp instanceof ColGroupUncompressed ) //overwrites output
- grp.rightMultByVector(_vect, _ret, _rl, _ru);
- for( ColGroup grp : _groups )
- if( !(grp instanceof ColGroupUncompressed) ) //adds to output
- grp.rightMultByVector(_vect, _ret, _rl, _ru);
- return null;
- }
- }
-
- private static class MatrixMultTransposeTask implements Callable<Object>
- {
- private ArrayList<ColGroup> _groups = null;
- private MatrixBlock _ret = null;
- private int _cl = -1;
- private int _cu = -1;
-
- protected MatrixMultTransposeTask(ArrayList<ColGroup> groups, MatrixBlock ret, int cl, int cu) {
- _groups = groups;
- _ret = ret;
- _cl = cl;
- _cu = cu;
- }
-
- @Override
- public Object call() throws DMLRuntimeException {
- leftMultByTransposeSelf(_groups, _ret, _cl, _cu);
- return null;
- }
- }
-
- private static class UnaryAggregateTask implements Callable<Object>
- {
- private ArrayList<ColGroup> _groups = null;
- private MatrixBlock _ret = null;
- private AggregateUnaryOperator _op = null;
-
- protected UnaryAggregateTask( ArrayList<ColGroup> groups, MatrixBlock ret, AggregateUnaryOperator op) {
- _groups = groups;
- _op = op;
-
- if( !(_op.indexFn instanceof ReduceRow) ) { //sum/rowSums
- _ret = new MatrixBlock(ret.getNumRows(), ret.getNumColumns(), false);
- _ret.allocateDenseBlock();
- }
- else { //colSums
- _ret = ret;
- }
- }
-
- @Override
- public Object call() throws DMLRuntimeException
- {
- // delegate vector-matrix operation to each column group
- for( ColGroup grp : _groups )
- grp.unaryAggregateOperations(_op, _ret);
- return null;
- }
-
- public MatrixBlock getResult(){
- return _ret;
- }
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.sysml.hops.OptimizerUtils;
+import org.apache.sysml.lops.MMTSJ.MMTSJType;
+import org.apache.sysml.lops.MapMultChain.ChainType;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.compress.ColGroup.CompressionType;
+import org.apache.sysml.runtime.compress.estim.CompressedSizeEstimator;
+import org.apache.sysml.runtime.compress.estim.CompressedSizeInfo;
+import org.apache.sysml.runtime.compress.estim.SizeEstimatorFactory;
+import org.apache.sysml.runtime.compress.utils.ConverterUtils;
+import org.apache.sysml.runtime.compress.utils.LinearAlgebraUtils;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.Timing;
+import org.apache.sysml.runtime.functionobjects.KahanPlus;
+import org.apache.sysml.runtime.functionobjects.KahanPlusSq;
+import org.apache.sysml.runtime.functionobjects.Multiply;
+import org.apache.sysml.runtime.functionobjects.ReduceRow;
+import org.apache.sysml.runtime.instructions.cp.KahanObject;
+import org.apache.sysml.runtime.matrix.data.LibMatrixBincell;
+import org.apache.sysml.runtime.matrix.data.LibMatrixReorg;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysml.runtime.matrix.data.MatrixValue;
+import org.apache.sysml.runtime.matrix.data.SparseBlock;
+import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator;
+import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
+import org.apache.sysml.runtime.matrix.operators.BinaryOperator;
+import org.apache.sysml.runtime.matrix.operators.ScalarOperator;
+
+/**
+ * Experimental version of MatrixBlock that allows a compressed internal
+ * representation.
+ */
+public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
+{
+ private static final long serialVersionUID = 7319972089143154057L;
+
+ //internal configuration
+ public static final int MAX_NUMBER_COCODING_COLUMNS = 1000;
+ public static final double MIN_COMPRESSION_RATIO = 2.0;
+ public static final double MIN_RLE_RATIO = 1.0; // Minimum additional compression (non-RLE size / RLE size) before we switch to run-length encoding.
+ public static final boolean TRANSPOSE_INPUT = true;
+ public static final boolean MATERIALIZE_ZEROS = false;
+ public static final long MIN_PAR_AGG_THRESHOLD = 16*1024*1024; //16MB
+ public static final boolean INVESTIGATE_ESTIMATES = false;
+ private static final boolean LDEBUG = false; //local debug flag
+
+ private static final Log LOG = LogFactory.getLog(CompressedMatrixBlock.class.getName());
+
+ static{
+ // for internal debugging only
+ if( LDEBUG ) {
+ Logger.getLogger("org.apache.sysml.runtime.compress")
+ .setLevel((Level) Level.DEBUG);
+ }
+ }
+
+ protected ArrayList<ColGroup> _colGroups = null;
+ protected CompressionStatistics _stats = null;
+
+ public CompressedMatrixBlock() {
+ super(-1, -1, true);
+ }
+
+ /**
+ * Main constructor for building a block from scratch.
+ *
+ * @param rl
+ * number of rows in the block
+ * @param cl
+ * number of columns
+ * @param sparse
+ * true if the UNCOMPRESSED representation of the block should be
+ * sparse
+ */
+ public CompressedMatrixBlock(int rl, int cl, boolean sparse) {
+ super(rl, cl, sparse);
+ }
+
+ /**
+ * "Copy" constructor to populate this compressed block with the
+ * uncompressed contents of a conventional block. Does <b>not</b> compress
+ * the block.
+ */
+ public CompressedMatrixBlock(MatrixBlock mb) {
+ super(mb.getNumRows(), mb.getNumColumns(), mb.isInSparseFormat());
+
+ //shallow copy (deep copy on compression, prevents unnecessary copy)
+ if( isInSparseFormat() )
+ sparseBlock = mb.getSparseBlock();
+ else
+ denseBlock = mb.getDenseBlock();
+ nonZeros = mb.getNonZeros();
+ }
+
+ /**
+ *
+ * @return the column groups constructed by the compression process.
+ *
+ */
+ public ArrayList<ColGroup> getColGroups() {
+ return _colGroups;
+ }
+
+ /**
+ * @return true if this block is in compressed form; false if the block has
+ * not yet been compressed
+ */
+ public boolean isCompressed() {
+ return (_colGroups != null);
+ }
+
+ /**
+ *
+ * @return
+ */
+ public boolean isSingleUncompressedGroup(){
+ return (_colGroups!=null && _colGroups.size()==1
+ && _colGroups.get(0) instanceof ColGroupUncompressed);
+ }
+
+ private void allocateColGroupList() {
+ _colGroups = new ArrayList<ColGroup>();
+ }
+
+ @Override
+ public boolean isEmptyBlock(boolean safe) {
+ if( !isCompressed() )
+ return super.isEmptyBlock(safe);
+ return (_colGroups == null || getNonZeros()==0);
+ }
+
+ /**
+ * Compress the contents of this matrix block. After compression, the
+ * uncompressed data is discarded. Attempts to update this block after
+ * calling this method currently result in INCORRECT BEHAVIOR, something
+ * which should be fixed if we move ahead with this compression strategy.
+ *
+ * +per column sparsity
+ */
+ public void compress()
+ throws DMLRuntimeException
+ {
+ //check for redundant compression
+ if( isCompressed() ){
+ throw new DMLRuntimeException("Redundant compression, block already compressed.");
+ }
+
+ Timing time = new Timing(true);
+ _stats = new CompressionStatistics();
+
+ // SAMPLE-BASED DECISIONS:
+ // Decisions such as testing if a column is amenable to bitmap
+ // compression or evaluating co-coding potentionls are made based on a
+ // subset of the rows. For large datasets, sampling might take a
+ // significant amount of time. So, we generate only one sample and use
+ // it for the entire compression process.
+
+ //prepare basic meta data and deep copy / transpose input
+ final int numRows = getNumRows();
+ final int numCols = getNumColumns();
+ final boolean sparse = isInSparseFormat();
+ MatrixBlock rawblock = this;
+ if( TRANSPOSE_INPUT )
+ rawblock = LibMatrixReorg.transpose(rawblock, new MatrixBlock(numCols, numRows, sparse));
+ else
+ rawblock = new MatrixBlock(this);
+
+ //construct sample-based size estimator
+ CompressedSizeEstimator bitmapSizeEstimator =
+ SizeEstimatorFactory.getSizeEstimator(rawblock, numRows);
+
+ //allocate list of column groups
+ allocateColGroupList();
+
+ // The current implementation of this method is written for correctness,
+ // not for performance or for minimal use of temporary space.
+
+ // We start with a full set of columns.
+ HashSet<Integer> remainingCols = new HashSet<Integer>();
+ for (int i = 0; i < numCols; i++) {
+ remainingCols.add(i);
+ }
+
+ // PHASE 1: Classify columns by compression type
+ // We start by determining which columns are amenable to bitmap
+ // compression
+
+ // It is correct to use the dense size as the uncompressed size
+ // FIXME not numRows but nnz / col otherwise too aggressive overestimation
+ // of uncompressed size and hence overestimation of compression potential
+ double uncompressedColumnSize = 8 * numRows;
+
+ // information about the bitmap amenable columns
+ List<Integer> bitmapCols = new ArrayList<Integer>();
+ List<Integer> uncompressedCols = new ArrayList<Integer>();
+ List<Integer> colsCardinalities = new ArrayList<Integer>();
+ List<Long> compressedSizes = new ArrayList<Long>();
+ HashMap<Integer, Double> compressionRatios = new HashMap<Integer, Double>();
+
+ // Minimum ratio (size of uncompressed / size of compressed) that we
+ // will accept when encoding a field with a bitmap.
+ for (int col = 0; col < numCols; col++)
+ {
+ CompressedSizeInfo compressedSizeInfo = bitmapSizeEstimator
+ .estimateCompressedColGroupSize(new int[] { col });
+ long compressedSize = compressedSizeInfo.getMinSize();
+ double compRatio = uncompressedColumnSize / compressedSize;
+
+ //FIXME: compression ratio should be checked against 1 instead of min compression
+ //ratio; I think this threshold was only required because we overestimated the
+ //the uncompressed column size with n\alpha instead of z\alpha
+ if (compRatio >= MIN_COMPRESSION_RATIO) {
+ bitmapCols.add(col);
+ compressionRatios.put(col, compRatio);
+ colsCardinalities.add(compressedSizeInfo.getEstCarinality());
+ compressedSizes.add(compressedSize);
+ }
+ else
+ uncompressedCols.add(col);
+ }
+
+ _stats.timePhase1 = time.stop();
+ if( LOG.isDebugEnabled() )
+ LOG.debug("compression phase 1: "+_stats.timePhase1);
+
+ // Filters for additional types of compression should be inserted here.
+
+ // PHASE 2: Grouping columns
+ // Divide the bitmap columns into column groups.
+ List<int[]> bitmapColGrps = null;
+ if (bitmapCols.size() > MAX_NUMBER_COCODING_COLUMNS) {
+ // Too many columns to compute co-coding groups with current methods.
+ // Generate singleton groups.
+ bitmapColGrps = new ArrayList<int[]>(bitmapCols.size());
+ for (int col : bitmapCols) {
+ bitmapColGrps.add(new int[] { col });
+ }
+ }
+ else {
+ bitmapColGrps = PlanningCoCoder.findCocodesByPartitioning(
+ bitmapSizeEstimator, bitmapCols, colsCardinalities,
+ compressedSizes, numRows, isInSparseFormat() ?
+ OptimizerUtils.getSparsity(numRows, numCols, getNonZeros()): 1);
+ }
+
+ _stats.timePhase2 = time.stop();
+ if( LOG.isDebugEnabled() )
+ LOG.debug("compression phase 2: "+_stats.timePhase2);
+
+ if( INVESTIGATE_ESTIMATES ) {
+ double est = 0;
+ for( int[] groupIndices : bitmapColGrps )
+ est += bitmapSizeEstimator.estimateCompressedColGroupSize(groupIndices).getMinSize();
+ est += uncompressedCols.size() * uncompressedColumnSize;
+ _stats.estSize = est;
+ }
+
+ // PHASE 3: Compress and correct sample-based decisions
+
+ for (int[] groupIndices : bitmapColGrps)
+ {
+ int[] allGroupIndices = null;
+ int allColsCount = groupIndices.length;
+ CompressedSizeInfo bitmapSizeInfo;
+ // The compression type is decided based on a full bitmap since it
+ // will be reused for the actual compression step.
+ UncompressedBitmap ubm;
+ PriorityQueue<CompressedColumn> compRatioPQ = null;
+ boolean skipGroup = false;
+ while (true)
+ {
+ ubm = BitmapEncoder.extractBitmap(groupIndices, rawblock);
+ bitmapSizeInfo = bitmapSizeEstimator
+ .estimateCompressedColGroupSize(ubm);
+ double compRatio = uncompressedColumnSize * groupIndices.length
+ / bitmapSizeInfo.getMinSize();
+ if (compRatio >= MIN_COMPRESSION_RATIO) {
+ // we have a good group
+ for( Integer col : groupIndices )
+ remainingCols.remove(col);
+ break;
+ } else {
+ // modify the group
+ if (compRatioPQ == null) {
+ // first modification
+ allGroupIndices = Arrays.copyOf(groupIndices, groupIndices.length);
+ compRatioPQ = new PriorityQueue<CompressedMatrixBlock.CompressedColumn>();
+ for (int i = 0; i < groupIndices.length; i++)
+ compRatioPQ.add(new CompressedColumn(i,
+ compressionRatios.get(groupIndices[i])));
+ }
+
+ // index in allGroupIndices
+ int removeIx = compRatioPQ.poll().colIx;
+ allGroupIndices[removeIx] = -1;
+ allColsCount--;
+ if (allColsCount == 0) {
+ skipGroup = true;
+ break;
+ }
+ groupIndices = new int[allColsCount];
+ // copying the values that do not equal -1
+ int ix = 0;
+ for (int col : allGroupIndices) {
+ if (col != -1) {
+ groupIndices[ix++] = col;
+ }
+ }
+
+ }
+ }
+
+ if (skipGroup)
+ continue;
+ long rleNumBytes = bitmapSizeInfo.getRLESize();
+ long offsetNumBytes = bitmapSizeInfo.getOLESize();
+ double rleRatio = (double) offsetNumBytes / (double) rleNumBytes;
+
+ if (rleRatio > MIN_RLE_RATIO) {
+ ColGroupRLE compressedGroup = new ColGroupRLE(groupIndices,
+ numRows, ubm);
+ _colGroups.add(compressedGroup);
+ }
+ else {
+ ColGroupOLE compressedGroup = new ColGroupOLE(
+ groupIndices, numRows, ubm);
+ _colGroups.add(compressedGroup);
+ }
+
+ }
+
+ _stats.timePhase3 = time.stop();
+ if( LOG.isDebugEnabled() )
+ LOG.debug("compression phase 3: "+_stats.timePhase3);
+
+ // Phase 4: Cleanup
+ // The remaining columns are stored uncompressed as one big column group
+ if (remainingCols.size() > 0) {
+ ArrayList<Integer> list = new ArrayList<Integer>(remainingCols);
+ ColGroupUncompressed ucgroup = new ColGroupUncompressed(list, rawblock);
+ _colGroups.add(ucgroup);
+ }
+
+ //final cleanup (discard uncompressed block)
+ rawblock.cleanupBlock(true, true);
+ this.cleanupBlock(true, true);
+
+ _stats.timePhase4 = time.stop();
+ if( LOG.isDebugEnabled() )
+ LOG.debug("compression phase 4: "+_stats.timePhase4);
+ }
+
+ /**
+ * @return a new uncompressed matrix block containing the contents of this
+ * block
+ */
+ public MatrixBlock decompress() throws DMLRuntimeException
+ {
+ //early abort for not yet compressed blocks
+ if( !isCompressed() )
+ return new MatrixBlock(this);
+
+ //preallocation sparse rows to avoid repeated reallocations
+ MatrixBlock ret = new MatrixBlock(getNumRows(), getNumColumns(), isInSparseFormat(), getNonZeros());
+ if( ret.isInSparseFormat() ) {
+ int[] rnnz = new int[rlen];
+ for (ColGroup grp : _colGroups)
+ grp.countNonZerosPerRow(rnnz);
+ ret.allocateSparseRowsBlock();
+ SparseBlock rows = ret.getSparseBlock();
+ for( int i=0; i<rlen; i++ )
+ rows.allocate(i, rnnz[i]);
+ }
+
+ //core decompression (append if sparse)
+ for (ColGroup grp : _colGroups)
+ grp.decompressToBlock(ret);
+
+ //post-processing (for append in decompress)
+ if( isInSparseFormat() )
+ ret.sortSparseRows();
+
+ return ret;
+ }
+
+ public CompressionStatistics getCompressionStatistics(){
+ return _stats;
+ }
+
+ /**
+ *
+ * @return an upper bound on the memory used to store this compressed block
+ * considering class overhead.
+ */
+ public long estimateCompressedSizeInMemory() {
+ if (!isCompressed())
+ return 0;
+ // basic data inherited from MatrixBlock
+ long total = MatrixBlock.estimateSizeInMemory(0, 0, 0);
+ // adding the size of colGroups ArrayList overhead
+ // object overhead (32B) + int size (4B) + int modCount (4B) + Object[]
+ // elementData overhead + reference (32+8)B +reference ofr each Object (8B)
+ total += 80 + 8 * _colGroups.size();
+ for (ColGroup grp : _colGroups)
+ total += grp.estimateInMemorySize();
+ return total;
+ }
+
+ private class CompressedColumn implements Comparable<CompressedColumn> {
+ int colIx;
+ double compRatio;
+
+ public CompressedColumn(int colIx, double compRatio) {
+ this.colIx = colIx;
+ this.compRatio = compRatio;
+ }
+
+ @Override
+ public int compareTo(CompressedColumn o) {
+ return (int) Math.signum(compRatio - o.compRatio);
+ }
+ }
+
+ public static class CompressionStatistics {
+ public double timePhase1 = -1;
+ public double timePhase2 = -1;
+ public double timePhase3 = -1;
+ public double timePhase4 = -1;
+ public double estSize = -1;
+
+ public CompressionStatistics() {
+ //do nothing
+ }
+
+ public CompressionStatistics(double t1, double t2, double t3, double t4){
+ timePhase1 = t1;
+ timePhase2 = t2;
+ timePhase3 = t3;
+ timePhase4 = t4;
+ }
+ }
+
+
+ //////////////////////////////////////////
+ // Serialization / Deserialization
+
+ @Override
+ public long getExactSizeOnDisk()
+ {
+ //header information
+ long ret = 12;
+
+ for( ColGroup grp : _colGroups ) {
+ ret += 1; //type info
+ ret += grp.getExactSizeOnDisk();
+ }
+
+ return ret;
+ }
+
+ @Override
+ public void readFields(DataInput in)
+ throws IOException
+ {
+ boolean compressed = in.readBoolean();
+
+ //deserialize uncompressed block
+ if( !compressed ) {
+ super.readFields(in);
+ return;
+ }
+
+ //deserialize compressed block
+ rlen = in.readInt();
+ clen = in.readInt();
+ nonZeros = in.readLong();
+ int ncolGroups = in.readInt();
+
+ _colGroups = new ArrayList<ColGroup>(ncolGroups);
+ for( int i=0; i<ncolGroups; i++ )
+ {
+ CompressionType ctype = CompressionType.values()[in.readByte()];
+ ColGroup grp = null;
+
+ //create instance of column group
+ switch( ctype ) {
+ case UNCOMPRESSED:
+ grp = new ColGroupUncompressed(); break;
+ case OLE_BITMAP:
+ grp = new ColGroupOLE(); break;
+ case RLE_BITMAP:
+ grp = new ColGroupRLE(); break;
+ }
+
+ //deserialize and add column group
+ grp.readFields(in);
+ _colGroups.add(grp);
+ }
+ }
+
+ @Override
+ public void write(DataOutput out)
+ throws IOException
+ {
+ out.writeBoolean( isCompressed() );
+
+ //serialize uncompressed block
+ if( !isCompressed() ) {
+ super.write(out);
+ return;
+ }
+
+ //serialize compressed matrix block
+ out.writeInt(rlen);
+ out.writeInt(clen);
+ out.writeLong(nonZeros);
+ out.writeInt(_colGroups.size());
+
+ for( ColGroup grp : _colGroups ) {
+ out.writeByte( grp.getCompType().ordinal() );
+ grp.write(out); //delegate serialization
+ }
+ }
+
+
+ /**
+ * Redirects the default java serialization via externalizable to our default
+ * hadoop writable serialization for efficient broadcast/rdd deserialization.
+ *
+ * @param is
+ * @throws IOException
+ */
+ @Override
+ public void readExternal(ObjectInput is)
+ throws IOException
+ {
+ readFields(is);
+ }
+
+ /**
+ * Redirects the default java serialization via externalizable to our default
+ * hadoop writable serialization for efficient broadcast/rdd serialization.
+ *
+ * @param is
+ * @throws IOException
+ */
+ @Override
+ public void writeExternal(ObjectOutput os)
+ throws IOException
+ {
+ write(os);
+ }
+
+
+ //////////////////////////////////////////
+ // Operations (overwrite existing ops for seamless integration)
+
+ @Override
+ public MatrixValue scalarOperations(ScalarOperator sop, MatrixValue result)
+ throws DMLRuntimeException
+ {
+ //call uncompressed matrix scalar if necessary
+ if( !isCompressed() ) {
+ return super.scalarOperations(sop, result);
+ }
+
+ //allocate the output matrix block
+ CompressedMatrixBlock ret = null;
+ if( result==null || !(result instanceof CompressedMatrixBlock) )
+ ret = new CompressedMatrixBlock(getNumRows(), getNumColumns(), sparse);
+ else {
+ ret = (CompressedMatrixBlock) result;
+ ret.reset(rlen, clen);
+ }
+
+ // Apply the operation recursively to each of the column groups.
+ // Most implementations will only modify metadata.
+ ArrayList<ColGroup> newColGroups = new ArrayList<ColGroup>();
+ for (ColGroup grp : _colGroups) {
+ newColGroups.add(grp.scalarOperation(sop));
+ }
+ ret._colGroups = newColGroups;
+ ret.setNonZeros(rlen*clen);
+
+ return ret;
+ }
+
+ @Override
+ public MatrixBlock appendOperations(MatrixBlock that, MatrixBlock ret)
+ throws DMLRuntimeException
+ {
+ //call uncompressed matrix append if necessary
+ if( !isCompressed() ) {
+ if( that instanceof CompressedMatrixBlock )
+ that = ((CompressedMatrixBlock) that).decompress();
+ return super.appendOperations(that, ret);
+ }
+
+ final int m = rlen;
+ final int n = clen+that.getNumColumns();
+ final long nnz = nonZeros+that.getNonZeros();
+
+ //init result matrix
+ CompressedMatrixBlock ret2 = null;
+ if( ret == null || !(ret instanceof CompressedMatrixBlock) ) {
+ ret2 = new CompressedMatrixBlock(m, n, isInSparseFormat());
+ }
+ else {
+ ret2 = (CompressedMatrixBlock) ret;
+ ret2.reset(m, n);
+ }
+
+ //shallow copy of lhs column groups
+ ret2.allocateColGroupList();
+ ret2._colGroups.addAll(_colGroups);
+
+ //copy of rhs column groups w/ col index shifting
+ if( !(that instanceof CompressedMatrixBlock) ) {
+ that = new CompressedMatrixBlock(that);
+ ((CompressedMatrixBlock)that).compress();
+ }
+ ArrayList<ColGroup> inColGroups = ((CompressedMatrixBlock) that)._colGroups;
+ for( ColGroup group : inColGroups ) {
+ ColGroup tmp = ConverterUtils.copyColGroup(group);
+ tmp.shiftColIndices(clen);
+ ret2._colGroups.add(tmp);
+ }
+
+ //meta data maintenance
+ ret2.setNonZeros(nnz);
+ return ret2;
+ }
+
+ @Override
+ public MatrixBlock chainMatrixMultOperations(MatrixBlock v, MatrixBlock w, MatrixBlock out, ChainType ctype)
+ throws DMLRuntimeException
+ {
+ //call uncompressed matrix mult if necessary
+ if( !isCompressed() ) {
+ return super.chainMatrixMultOperations(v, w, out, ctype);
+ }
+
+ //single-threaded mmchain of single uncompressed colgroup
+ if( isSingleUncompressedGroup() ){
+ return ((ColGroupUncompressed)_colGroups.get(0))
+ .getData().chainMatrixMultOperations(v, w, out, ctype);
+ }
+
+ //Timing time = new Timing(true);
+
+ //prepare result
+ if( out != null )
+ out.reset(clen, 1, false);
+ else
+ out = new MatrixBlock(clen, 1, false);
+
+ //empty block handling
+ if( isEmptyBlock(false) )
+ return out;
+
+ //compute matrix mult
+ MatrixBlock tmp = new MatrixBlock(rlen, 1, false);
+ rightMultByVector(v, tmp);
+ if( ctype == ChainType.XtwXv ) {
+ BinaryOperator bop = new BinaryOperator(Multiply.getMultiplyFnObject());
+ LibMatrixBincell.bincellOpInPlace(tmp, w, bop);
+ }
+ leftMultByVectorTranspose(_colGroups, tmp, out, true);
+
+ //System.out.println("Compressed MMChain in "+time.stop());
+
+ return out;
+ }
+
+ @Override
+ public MatrixBlock chainMatrixMultOperations(MatrixBlock v, MatrixBlock w, MatrixBlock out, ChainType ctype, int k)
+ throws DMLRuntimeException
+ {
+ //call uncompressed matrix mult if necessary
+ if( !isCompressed() ){
+ return super.chainMatrixMultOperations(v, w, out, ctype, k);
+ }
+
+ //multi-threaded mmchain of single uncompressed colgroup
+ if( isSingleUncompressedGroup() ){
+ return ((ColGroupUncompressed)_colGroups.get(0))
+ .getData().chainMatrixMultOperations(v, w, out, ctype, k);
+ }
+
+ Timing time = LOG.isDebugEnabled() ? new Timing(true) : null;
+
+ //prepare result
+ if( out != null )
+ out.reset(clen, 1, false);
+ else
+ out = new MatrixBlock(clen, 1, false);
+
+ //empty block handling
+ if( isEmptyBlock(false) )
+ return out;
+
+ //compute matrix mult
+ MatrixBlock tmp = new MatrixBlock(rlen, 1, false);
+ rightMultByVector(v, tmp, k);
+ if( ctype == ChainType.XtwXv ) {
+ BinaryOperator bop = new BinaryOperator(Multiply.getMultiplyFnObject());
+ LibMatrixBincell.bincellOpInPlace(tmp, w, bop);
+ }
+ leftMultByVectorTranspose(_colGroups, tmp, out, true, k);
+
+ if( LOG.isDebugEnabled() )
+ LOG.debug("Compressed MMChain k="+k+" in "+time.stop());
+
+ return out;
+ }
+
+ @Override
+ public MatrixValue aggregateBinaryOperations(MatrixValue mv1, MatrixValue mv2, MatrixValue result, AggregateBinaryOperator op)
+ throws DMLRuntimeException
+ {
+ //call uncompressed matrix mult if necessary
+ if( !isCompressed() ) {
+ return super.aggregateBinaryOperations(mv1, mv2, result, op);
+ }
+
+ //multi-threaded mm of single uncompressed colgroup
+ if( isSingleUncompressedGroup() ){
+ MatrixBlock tmp = ((ColGroupUncompressed)_colGroups.get(0)).getData();
+ return tmp.aggregateBinaryOperations(this==mv1?tmp:mv1, this==mv2?tmp:mv2, result, op);
+ }
+
+ Timing time = LOG.isDebugEnabled() ? new Timing(true) : null;
+
+ //setup meta data (dimensions, sparsity)
+ int rl = mv1.getNumRows();
+ int cl = mv2.getNumColumns();
+
+ //create output matrix block
+ MatrixBlock ret = (MatrixBlock) result;
+ if( ret==null )
+ ret = new MatrixBlock(rl, cl, false, rl*cl);
+ else
+ ret.reset(rl, cl, false, rl*cl);
+
+ //compute matrix mult
+ if( mv1.getNumRows()>1 && mv2.getNumColumns()==1 ) { //MV right
+ CompressedMatrixBlock cmb = (CompressedMatrixBlock)mv1;
+ MatrixBlock mb = (MatrixBlock) mv2;
+ if( op.getNumThreads()>1 )
+ cmb.rightMultByVector(mb, ret, op.getNumThreads());
+ else
+ cmb.rightMultByVector(mb, ret);
+ }
+ else if( mv1.getNumRows()==1 && mv2.getNumColumns()>1 ) { //MV left
+ MatrixBlock mb = (MatrixBlock) mv1;
+ if( op.getNumThreads()>1 )
+ leftMultByVectorTranspose(_colGroups, mb, ret, false, op.getNumThreads());
+ else
+ leftMultByVectorTranspose(_colGroups, mb, ret, false);
+ }
+ else {
+ //NOTE: we could decompress and invoke super.aggregateBinary but for now
+ //we want to have an eager fail if this happens
+ throw new DMLRuntimeException("Unsupported matrix-matrix multiplication over compressed matrix block.");
+ }
+
+ if( LOG.isDebugEnabled() )
+ LOG.debug("Compressed MM in "+time.stop());
+
+ return ret;
+ }
+
+ @Override
+ public MatrixValue aggregateUnaryOperations(AggregateUnaryOperator op, MatrixValue result,
+ int blockingFactorRow, int blockingFactorCol, MatrixIndexes indexesIn, boolean inCP)
+ throws DMLRuntimeException
+ {
+ //call uncompressed matrix mult if necessary
+ if( !isCompressed() ) {
+ return super.aggregateUnaryOperations(op, result, blockingFactorRow, blockingFactorCol, indexesIn, inCP);
+ }
+
+ //check for supported operations
+ if( !(op.aggOp.increOp.fn instanceof KahanPlus || op.aggOp.increOp.fn instanceof KahanPlusSq) ){
+ throw new DMLRuntimeException("Unary aggregates other than sums not supported yet.");
+ }
+
+ //prepare output dimensions
+ CellIndex tempCellIndex = new CellIndex(-1,-1);
+ op.indexFn.computeDimension(rlen, clen, tempCellIndex);
+ if(op.aggOp.correctionExists) {
+ switch(op.aggOp.correctionLocation)
+ {
+ case LASTROW: tempCellIndex.row++; break;
+ case LASTCOLUMN: tempCellIndex.column++; break;
+ case LASTTWOROWS: tempCellIndex.row+=2; break;
+ case LASTTWOCOLUMNS: tempCellIndex.column+=2; break;
+ default:
+ throw new DMLRuntimeException("unrecognized correctionLocation: "+op.aggOp.correctionLocation);
+ }
+ }
+
+ //prepare output
+ if(result==null)
+ result=new MatrixBlock(tempCellIndex.row, tempCellIndex.column, false);
+ else
+ result.reset(tempCellIndex.row, tempCellIndex.column, false);
+
+ MatrixBlock ret = (MatrixBlock) result;
+
+ //core unary aggregate
+ if( op.getNumThreads() > 1
+ && getExactSizeOnDisk() > MIN_PAR_AGG_THRESHOLD )
+ {
+ // initialize and allocate the result
+ ret.allocateDenseBlock();
+
+ //multi-threaded execution of all groups
+ ArrayList<ColGroup>[] grpParts = createStaticTaskPartitioning(op.getNumThreads(), false);
+ ColGroupUncompressed uc = getUncompressedColGroup();
+ try {
+ //compute all compressed column groups
+ ExecutorService pool = Executors.newFixedThreadPool( op.getNumThreads() );
+ ArrayList<UnaryAggregateTask> tasks = new ArrayList<UnaryAggregateTask>();
+ for( ArrayList<ColGroup> grp : grpParts )
+ tasks.add(new UnaryAggregateTask(grp, ret, op));
+ pool.invokeAll(tasks);
+ pool.shutdown();
+ //compute uncompressed column group in parallel (otherwise bottleneck)
+ if( uc != null )
+ ret = (MatrixBlock)uc.getData().aggregateUnaryOperations(op, ret, blockingFactorRow, blockingFactorCol, indexesIn, false);
+ //aggregate partial results
+ if( !(op.indexFn instanceof ReduceRow) ){
+ KahanObject kbuff = new KahanObject(0,0);
+ KahanPlus kplus = KahanPlus.getKahanPlusFnObject();
+ for( int i=0; i<ret.getNumRows(); i++ ) {
+ kbuff.set(ret.quickGetValue(i, 0), ret.quickGetValue(i, 0));
+ for( UnaryAggregateTask task : tasks )
+ kplus.execute2(kbuff, task.getResult().quickGetValue(i, 0));
+ ret.quickSetValue(i, 0, kbuff._sum);
+ ret.quickSetValue(i, 1, kbuff._correction);
+ }
+ }
+ }
+ catch(Exception ex) {
+ throw new DMLRuntimeException(ex);
+ }
+ }
+ else {
+ for (ColGroup grp : _colGroups) {
+ grp.unaryAggregateOperations(op, ret);
+ }
+ }
+
+ //drop correction if necessary
+ if(op.aggOp.correctionExists && inCP)
+ ret.dropLastRowsOrColums(op.aggOp.correctionLocation);
+
+ //post-processing
+ ret.recomputeNonZeros();
+
+ return ret;
+ }
+
+ @Override
+ public MatrixBlock transposeSelfMatrixMultOperations(MatrixBlock out, MMTSJType tstype)
+ throws DMLRuntimeException
+ {
+ //call uncompressed matrix mult if necessary
+ if( !isCompressed() ) {
+ return super.transposeSelfMatrixMultOperations(out, tstype);
+ }
+
+ //single-threaded tsmm of single uncompressed colgroup
+ if( isSingleUncompressedGroup() ){
+ return ((ColGroupUncompressed)_colGroups.get(0))
+ .getData().transposeSelfMatrixMultOperations(out, tstype);
+ }
+
+ Timing time = LOG.isDebugEnabled() ? new Timing(true) : null;
+
+ //check for transpose type
+ if( tstype != MMTSJType.LEFT ) //right not supported yet
+ throw new DMLRuntimeException("Invalid MMTSJ type '"+tstype.toString()+"'.");
+
+ //create output matrix block
+ if( out == null )
+ out = new MatrixBlock(clen, clen, false);
+ else
+ out.reset(clen, clen, false);
+ out.allocateDenseBlock();
+
+ if( !isEmptyBlock(false) ) {
+ //compute matrix mult
+ leftMultByTransposeSelf(_colGroups, out, 0, _colGroups.size());
+
+ // post-processing
+ out.recomputeNonZeros();
+ }
+
+ if( LOG.isDebugEnabled() )
+ LOG.debug("Compressed TSMM in "+time.stop());
+
+ return out;
+ }
+
+
+ @Override
+ public MatrixBlock transposeSelfMatrixMultOperations(MatrixBlock out, MMTSJType tstype, int k)
+ throws DMLRuntimeException
+ {
+ //call uncompressed matrix mult if necessary
+ if( !isCompressed() ){
+ return super.transposeSelfMatrixMultOperations(out, tstype, k);
+ }
+
+ //multi-threaded tsmm of single uncompressed colgroup
+ if( isSingleUncompressedGroup() ){
+ return ((ColGroupUncompressed)_colGroups.get(0))
+ .getData().transposeSelfMatrixMultOperations(out, tstype, k);
+ }
+
+ Timing time = LOG.isDebugEnabled() ? new Timing(true) : null;
+
+ //check for transpose type
+ if( tstype != MMTSJType.LEFT ) //right not supported yet
+ throw new DMLRuntimeException("Invalid MMTSJ type '"+tstype.toString()+"'.");
+
+ //create output matrix block
+ if( out == null )
+ out = new MatrixBlock(clen, clen, false);
+ else
+ out.reset(clen, clen, false);
+ out.allocateDenseBlock();
+
+ if( !isEmptyBlock(false) ) {
+ //compute matrix mult
+ try {
+ ExecutorService pool = Executors.newFixedThreadPool( k );
+ ArrayList<MatrixMultTransposeTask> tasks = new ArrayList<MatrixMultTransposeTask>();
+ int blklen = (int)(Math.ceil((double)clen/(2*k)));
+ for( int i=0; i<2*k & i*blklen<clen; i++ )
+ tasks.add(new MatrixMultTransposeTask(_colGroups, out, i*blklen, Math.min((i+1)*blklen, clen)));
+ List<Future<Object>> ret = pool.invokeAll(tasks);
+ for( Future<Object> tret : ret )
+ tret.get(); //check for errors
+ pool.shutdown();
+ }
+ catch(Exception ex) {
+ throw new DMLRuntimeException(ex);
+ }
+
+ // post-processing
+ out.recomputeNonZeros();
+ }
+
+ if( LOG.isDebugEnabled() )
+ LOG.debug("Compressed TSMM k="+k+" in "+time.stop());
+
+ return out;
+ }
+
+
+ /**
+ * Multiply this matrix block by a column vector on the right.
+ *
+ * @param vector
+ * right-hand operand of the multiplication
+ * @param result
+ * buffer to hold the result; must have the appropriate size
+ * already
+ */
+ private void rightMultByVector(MatrixBlock vector, MatrixBlock result)
+ throws DMLRuntimeException
+ {
+ // initialize and allocate the result
+ result.allocateDenseBlock();
+
+ // delegate matrix-vector operation to each column group
+ for( ColGroup grp : _colGroups )
+ if( grp instanceof ColGroupUncompressed ) //overwrites output
+ grp.rightMultByVector(vector, result, 0, result.getNumRows());
+ for( ColGroup grp : _colGroups )
+ if( !(grp instanceof ColGroupUncompressed) ) //adds to output
+ grp.rightMultByVector(vector, result, 0, result.getNumRows());
+
+ // post-processing
+ result.recomputeNonZeros();
+ }
+
+ /**
+ * Multi-threaded version of rightMultByVector.
+ *
+ * @param vector
+ * @param result
+ * @param k
+ * @throws DMLRuntimeException
+ */
+ private void rightMultByVector(MatrixBlock vector, MatrixBlock result, int k)
+ throws DMLRuntimeException
+ {
+ // initialize and allocate the result
+ result.allocateDenseBlock();
+
+ //multi-threaded execution of all groups
+ try {
+ ExecutorService pool = Executors.newFixedThreadPool( k );
+ int rlen = getNumRows();
+ int seqsz = BitmapEncoder.BITMAP_BLOCK_SZ;
+ int blklen = (int)(Math.ceil((double)rlen/k));
+ blklen += (blklen%seqsz != 0)?seqsz-blklen%seqsz:0;
+ ArrayList<RightMatrixMultTask> tasks = new ArrayList<RightMatrixMultTask>();
+ for( int i=0; i<k & i*blklen<getNumRows(); i++ )
+ tasks.add(new RightMatrixMultTask(_colGroups, vector, result, i*blklen, Math.min((i+1)*blklen,rlen)));
+ pool.invokeAll(tasks);
+ pool.shutdown();
+ }
+ catch(Exception ex) {
+ throw new DMLRuntimeException(ex);
+ }
+
+ // post-processing
+ result.recomputeNonZeros();
+ }
+
+ /**
+ * Multiply this matrix block by the transpose of a column vector (i.e.
+ * t(v)%*%X)
+ *
+ * @param vector
+ * left-hand operand of the multiplication
+ * @param result
+ * buffer to hold the result; must have the appropriate size
+ * already
+ */
+ private static void leftMultByVectorTranspose(List<ColGroup> colGroups, MatrixBlock vector, MatrixBlock result, boolean doTranspose)
+ throws DMLRuntimeException
+ {
+ //transpose vector if required
+ MatrixBlock rowVector = vector;
+ if (doTranspose) {
+ rowVector = new MatrixBlock(1, vector.getNumRows(), false);
+ LibMatrixReorg.transpose(vector, rowVector);
+ }
+
+ // initialize and allocate the result
+ result.reset();
+ result.allocateDenseBlock();
+
+ // delegate matrix-vector operation to each column group
+ for (ColGroup grp : colGroups) {
+ grp.leftMultByRowVector(rowVector, result);
+ }
+
+ // post-processing
+ result.recomputeNonZeros();
+ }
+
+ /**
+ * Multi-thread version of leftMultByVectorTranspose.
+ *
+ * @param vector
+ * @param result
+ * @param doTranspose
+ * @param k
+ * @throws DMLRuntimeException
+ */
+ private static void leftMultByVectorTranspose(List<ColGroup> colGroups,MatrixBlock vector, MatrixBlock result, boolean doTranspose, int k)
+ throws DMLRuntimeException
+ {
+ int kuc = Math.max(1, k - colGroups.size() + 1);
+
+ //transpose vector if required
+ MatrixBlock rowVector = vector;
+ if (doTranspose) {
+ rowVector = new MatrixBlock(1, vector.getNumRows(), false);
+ LibMatrixReorg.transpose(vector, rowVector);
+ }
+
+ // initialize and allocate the result
+ result.reset();
+ result.allocateDenseBlock();
+
+ //multi-threaded execution
+ try {
+ ExecutorService pool = Executors.newFixedThreadPool( Math.min(colGroups.size(), k) );
+ ArrayList<LeftMatrixMultTask> tasks = new ArrayList<LeftMatrixMultTask>();
+ for( ColGroup grp : colGroups )
+ tasks.add(new LeftMatrixMultTask(grp, rowVector, result, kuc));
+ pool.invokeAll(tasks);
+ pool.shutdown();
+ }
+ catch(Exception ex) {
+ throw new DMLRuntimeException(ex);
+ }
+
+ // post-processing
+ result.recomputeNonZeros();
+ }
+
+ /**
+ *
+ * @param result
+ * @throws DMLRuntimeException
+ */
+ private static void leftMultByTransposeSelf(ArrayList<ColGroup> groups, MatrixBlock result, int cl, int cu)
+ throws DMLRuntimeException
+ {
+ final int numRows = groups.get(0).getNumRows();
+ final int numGroups = groups.size();
+
+ //preallocated dense matrix block
+ MatrixBlock lhs = new MatrixBlock(numRows, 1, false);
+ lhs.allocateDenseBlock();
+
+ //approach: for each colgroup, extract uncompressed columns one at-a-time
+ //vector-matrix multiplies against remaining col groups
+ for( int i=cl; i<cu; i++ )
+ {
+ //get current group and relevant col groups
+ ColGroup group = groups.get(i);
+ int[] ixgroup = group.getColIndices();
+ List<ColGroup> tmpList = groups.subList(i, numGroups);
+
+ //for all uncompressed lhs columns vectors
+ for( int j=0; j<ixgroup.length; j++ ) {
+ //decompress single column
+ lhs.reset(numRows, 1, false);
+ group.decompressToBlock(lhs, j);
+
+ if( !lhs.isEmptyBlock(false) ) {
+ //compute vector-matrix partial result
+ MatrixBlock tmpret = new MatrixBlock(1,result.getNumColumns(),false);
+ leftMultByVectorTranspose(tmpList, lhs, tmpret, true);
+
+ //write partial results (disjoint non-zeros)
+ LinearAlgebraUtils.copyNonZerosToRowCol(result, tmpret, ixgroup[j]);
+ }
+ }
+ }
+ }
+
+ /**
+ *
+ * @param k
+ * @return
+ */
+ @SuppressWarnings("unchecked")
+ private ArrayList<ColGroup>[] createStaticTaskPartitioning(int k, boolean inclUncompressed)
+ {
+ // special case: single uncompressed col group
+ if( _colGroups.size()==1 && _colGroups.get(0) instanceof ColGroupUncompressed ){
+ return new ArrayList[0];
+ }
+
+ // initialize round robin col group distribution
+ // (static task partitioning to reduce mem requirements/final agg)
+ int numTasks = Math.min(k, _colGroups.size());
+ ArrayList<ColGroup>[] grpParts = new ArrayList[numTasks];
+ int pos = 0;
+ for( ColGroup grp : _colGroups ){
+ if( grpParts[pos]==null )
+ grpParts[pos] = new ArrayList<ColGroup>();
+ if( inclUncompressed || !(grp instanceof ColGroupUncompressed) ) {
+ grpParts[pos].add(grp);
+ pos = (pos==numTasks-1) ? 0 : pos+1;
+ }
+ }
+
+ return grpParts;
+ }
+
+ /**
+ *
+ * @return
+ */
+ private ColGroupUncompressed getUncompressedColGroup()
+ {
+ for( ColGroup grp : _colGroups )
+ if( grp instanceof ColGroupUncompressed )
+ return (ColGroupUncompressed)grp;
+
+ return null;
+ }
+
+ /**
+ *
+ */
+ private static class LeftMatrixMultTask implements Callable<Object>
+ {
+ private ColGroup _group = null;
+ private MatrixBlock _vect = null;
+ private MatrixBlock _ret = null;
+ private int _kuc = 1;
+
+ protected LeftMatrixMultTask( ColGroup group, MatrixBlock vect, MatrixBlock ret, int kuc) {
+ _group = group;
+ _vect = vect;
+ _ret = ret;
+ _kuc = kuc;
+ }
+
+ @Override
+ public Object call() throws DMLRuntimeException
+ {
+ // delegate matrix-vector operation to each column group
+ if( _group instanceof ColGroupUncompressed && _kuc >1 && ColGroupBitmap.LOW_LEVEL_OPT )
+ ((ColGroupUncompressed)_group).leftMultByRowVector(_vect, _ret, _kuc);
+ else
+ _group.leftMultByRowVector(_vect, _ret);
+ return null;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class RightMatrixMultTask implements Callable<Object>
+ {
+ private ArrayList<ColGroup> _groups = null;
+ private MatrixBlock _vect = null;
+ private MatrixBlock _ret = null;
+ private int _rl = -1;
+ private int _ru = -1;
+
+ protected RightMatrixMultTask( ArrayList<ColGroup> groups, MatrixBlock vect, MatrixBlock ret, int rl, int ru) {
+ _groups = groups;
+ _vect = vect;
+ _ret = ret;
+ _rl = rl;
+ _ru = ru;
+ }
+
+ @Override
+ public Object call() throws DMLRuntimeException
+ {
+ // delegate vector-matrix operation to each column group
+ for( ColGroup grp : _groups )
+ if( grp instanceof ColGroupUncompressed ) //overwrites output
+ grp.rightMultByVector(_vect, _ret, _rl, _ru);
+ for( ColGroup grp : _groups )
+ if( !(grp instanceof ColGroupUncompressed) ) //adds to output
+ grp.rightMultByVector(_vect, _ret, _rl, _ru);
+ return null;
+ }
+ }
+
+ private static class MatrixMultTransposeTask implements Callable<Object>
+ {
+ private ArrayList<ColGroup> _groups = null;
+ private MatrixBlock _ret = null;
+ private int _cl = -1;
+ private int _cu = -1;
+
+ protected MatrixMultTransposeTask(ArrayList<ColGroup> groups, MatrixBlock ret, int cl, int cu) {
+ _groups = groups;
+ _ret = ret;
+ _cl = cl;
+ _cu = cu;
+ }
+
+ @Override
+ public Object call() throws DMLRuntimeException {
+ leftMultByTransposeSelf(_groups, _ret, _cl, _cu);
+ return null;
+ }
+ }
+
+ private static class UnaryAggregateTask implements Callable<Object>
+ {
+ private ArrayList<ColGroup> _groups = null;
+ private MatrixBlock _ret = null;
+ private AggregateUnaryOperator _op = null;
+
+ protected UnaryAggregateTask( ArrayList<ColGroup> groups, MatrixBlock ret, AggregateUnaryOperator op) {
+ _groups = groups;
+ _op = op;
+
+ if( !(_op.indexFn instanceof ReduceRow) ) { //sum/rowSums
+ _ret = new MatrixBlock(ret.getNumRows(), ret.getNumColumns(), false);
+ _ret.allocateDenseBlock();
+ }
+ else { //colSums
+ _ret = ret;
+ }
+ }
+
+ @Override
+ public Object call() throws DMLRuntimeException
+ {
+ // delegate vector-matrix operation to each column group
+ for( ColGroup grp : _groups )
+ grp.unaryAggregateOperations(_op, _ret);
+ return null;
+ }
+
+ public MatrixBlock getResult(){
+ return _ret;
+ }
+ }
+}