You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by mb...@apache.org on 2016/07/17 00:23:30 UTC
[5/6] incubator-systemml git commit: [SYSTEMML-810] New compressed
matrix blocks and operations, tests
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/16e7b1c8/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java
new file mode 100644
index 0000000..39bc162
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java
@@ -0,0 +1,617 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress;
+
+import java.util.Arrays;
+import java.util.Iterator;
+
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.compress.utils.ConverterUtils;
+import org.apache.sysml.runtime.compress.utils.LinearAlgebraUtils;
+import org.apache.sysml.runtime.functionobjects.KahanFunction;
+import org.apache.sysml.runtime.functionobjects.KahanPlus;
+import org.apache.sysml.runtime.functionobjects.KahanPlusSq;
+import org.apache.sysml.runtime.functionobjects.ReduceAll;
+import org.apache.sysml.runtime.functionobjects.ReduceCol;
+import org.apache.sysml.runtime.functionobjects.ReduceRow;
+import org.apache.sysml.runtime.instructions.cp.KahanObject;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
+import org.apache.sysml.runtime.matrix.operators.ScalarOperator;
+
+
+/** A group of columns compressed with a single run-length encoded bitmap. */
+public class ColGroupRLE extends ColGroupBitmap
+{
+ private static final long serialVersionUID = 7450232907594748177L;
+
+ public ColGroupRLE() {
+ super(CompressionType.RLE_BITMAP);
+ }
+
+ /**
+ * Main constructor. Constructs and stores the necessary bitmaps.
+ *
+ * @param colIndices
+ * indices (within the block) of the columns included in this
+ * column
+ * @param numRows
+ * total number of rows in the parent block
+ * @param ubm
+ * Uncompressed bitmap representation of the block
+ */
+ public ColGroupRLE(int[] colIndices, int numRows, UncompressedBitmap ubm)
+ {
+ super(CompressionType.RLE_BITMAP, colIndices, numRows, ubm);
+
+ // compress the bitmaps
+ final int numVals = ubm.getNumValues();
+ char[][] lbitmaps = new char[numVals][];
+ int totalLen = 0;
+ for( int i=0; i<numVals; i++ ) {
+ lbitmaps[i] = BitmapEncoder.genRLEBitmap(ubm.getOffsetsList(i));
+ totalLen += lbitmaps[i].length;
+ }
+
+ // compact bitmaps to linearized representation
+ createCompressedBitmaps(numVals, totalLen, lbitmaps);
+ }
+
+ /**
+ * Constructor for internal use.
+ */
+ public ColGroupRLE(int[] colIndices, int numRows, double[] values, char[] bitmaps, int[] bitmapOffs) {
+ super(CompressionType.RLE_BITMAP, colIndices, numRows, values);
+ _data = bitmaps;
+ _ptr = bitmapOffs;
+ }
+
+ @Override
+ public Iterator<Integer> getDecodeIterator(int bmpIx) {
+ return new BitmapDecoderRLE(_data, _ptr[bmpIx], len(bmpIx));
+ }
+
+ @Override
+ public void decompressToBlock(MatrixBlock target)
+ {
+ if( LOW_LEVEL_OPT && getNumValues() > 1 )
+ {
+ final int blksz = 128 * 1024;
+ final int numCols = getNumCols();
+ final int numVals = getNumValues();
+ final int n = getNumRows();
+
+ //position and start offset arrays
+ int[] apos = new int[numVals];
+ int[] astart = new int[numVals];
+
+ //cache conscious append via horizontal scans
+ for( int bi=0; bi<n; bi+=blksz ) {
+ int bimax = Math.min(bi+blksz, n);
+ for (int k=0, off=0; k < numVals; k++, off+=numCols) {
+ int bitmapOff = _ptr[k];
+ int bitmapLen = len(k);
+ int bufIx = apos[k];
+ int start = astart[k];
+ for( ; bufIx<bitmapLen & start<bimax; bufIx+=2) {
+ start += _data[bitmapOff + bufIx];
+ int len = _data[bitmapOff + bufIx+1];
+ for( int i=start; i<start+len; i++ )
+ for( int j=0; j<numCols; j++ )
+ if( _values[off+j]!=0 )
+ target.appendValue(i, _colIndexes[j], _values[off+j]);
+ start += len;
+ }
+ apos[k] = bufIx;
+ astart[k] = start;
+ }
+ }
+ }
+ else
+ {
+ //call generic decompression with decoder
+ super.decompressToBlock(target);
+ }
+ }
+
+ @Override
+ public void decompressToBlock(MatrixBlock target, int[] colixTargets)
+ {
+ if( LOW_LEVEL_OPT && getNumValues() > 1 )
+ {
+ final int blksz = 128 * 1024;
+ final int numCols = getNumCols();
+ final int numVals = getNumValues();
+ final int n = getNumRows();
+
+ //position and start offset arrays
+ int[] apos = new int[numVals];
+ int[] astart = new int[numVals];
+ int[] cix = new int[numCols];
+
+ //prepare target col indexes
+ for( int j=0; j<numCols; j++ )
+ cix[j] = colixTargets[_colIndexes[j]];
+
+ //cache conscious append via horizontal scans
+ for( int bi=0; bi<n; bi+=blksz ) {
+ int bimax = Math.min(bi+blksz, n);
+ for (int k=0, off=0; k < numVals; k++, off+=numCols) {
+ int bitmapOff = _ptr[k];
+ int bitmapLen = len(k);
+ int bufIx = apos[k];
+ if( bufIx >= bitmapLen )
+ continue;
+ int start = astart[k];
+ for( ; bufIx<bitmapLen & start<bimax; bufIx+=2) {
+ start += _data[bitmapOff + bufIx];
+ int len = _data[bitmapOff + bufIx+1];
+ for( int i=start; i<start+len; i++ )
+ for( int j=0; j<numCols; j++ )
+ if( _values[off+j]!=0 )
+ target.appendValue(i, cix[j], _values[off+j]);
+ start += len;
+ }
+ apos[k] = bufIx;
+ astart[k] = start;
+ }
+ }
+ }
+ else
+ {
+ //call generic decompression with decoder
+ super.decompressToBlock(target, colixTargets);
+ }
+ }
+
+ @Override
+ public void decompressToBlock(MatrixBlock target, int colpos)
+ {
+ if( LOW_LEVEL_OPT && getNumValues() > 1 )
+ {
+ final int blksz = 128 * 1024;
+ final int numCols = getNumCols();
+ final int numVals = getNumValues();
+ final int n = getNumRows();
+ double[] c = target.getDenseBlock();
+
+ //position and start offset arrays
+ int[] apos = new int[numVals];
+ int[] astart = new int[numVals];
+
+ //cache conscious append via horizontal scans
+ for( int bi=0; bi<n; bi+=blksz ) {
+ int bimax = Math.min(bi+blksz, n);
+ for (int k=0, off=0; k < numVals; k++, off+=numCols) {
+ int bitmapOff = _ptr[k];
+ int bitmapLen = len(k);
+ int bufIx = apos[k];
+ if( bufIx >= bitmapLen )
+ continue;
+ int start = astart[k];
+ for( ; bufIx<bitmapLen & start<bimax; bufIx+=2) {
+ start += _data[bitmapOff + bufIx];
+ int len = _data[bitmapOff + bufIx+1];
+ for( int i=start; i<start+len; i++ )
+ c[i] = _values[off+colpos];
+ start += len;
+ }
+ apos[k] = bufIx;
+ astart[k] = start;
+ }
+ }
+
+ target.recomputeNonZeros();
+ }
+ else
+ {
+ //call generic decompression with decoder
+ super.decompressToBlock(target, colpos);
+ }
+ }
+
+ @Override
+ public void rightMultByVector(MatrixBlock vector, MatrixBlock result, int rl, int ru)
+ throws DMLRuntimeException
+ {
+ double[] b = ConverterUtils.getDenseVector(vector);
+ double[] c = result.getDenseBlock();
+ final int numCols = getNumCols();
+ final int numVals = getNumValues();
+
+ //prepare reduced rhs w/ relevant values
+ double[] sb = new double[numCols];
+ for (int j = 0; j < numCols; j++) {
+ sb[j] = b[_colIndexes[j]];
+ }
+
+ if( LOW_LEVEL_OPT && numVals > 1
+ && _numRows > BitmapEncoder.BITMAP_BLOCK_SZ )
+ {
+ //L3 cache alignment, see comment rightMultByVector OLE column group
+ //core difference of RLE to OLE is that runs are not segment alignment,
+ //which requires care of handling runs crossing cache-buckets
+ final int blksz = ColGroupBitmap.WRITE_CACHE_BLKSZ;
+
+ //step 1: prepare position and value arrays
+
+ //current pos / values per RLE list
+ int[] apos = new int[numVals];
+ int[] astart = new int[numVals];
+ double[] aval = new double[numVals];
+
+ //skip-scan to beginning for all OLs
+ if( rl > 0 ) { //rl aligned with blksz
+ for (int k = 0; k < numVals; k++) {
+ int boff = _ptr[k];
+ int blen = len(k);
+ int bix = 0;
+ int start = 0;
+ while( bix<blen ) {
+ int lstart = _data[boff + bix]; //start
+ int llen = _data[boff + bix + 1]; //len
+ if( start+lstart+llen >= rl )
+ break;
+ start += lstart + llen;
+ bix += 2;
+ }
+ apos[k] = bix;
+ astart[k] = start;
+ }
+ }
+
+ //pre-aggregate values per OLs
+ for( int k = 0; k < numVals; k++ )
+ aval[k] = sumValues(k, sb);
+
+ //step 2: cache conscious matrix-vector via horizontal scans
+ for( int bi=rl; bi<ru; bi+=blksz )
+ {
+ int bimax = Math.min(bi+blksz, ru);
+
+ //horizontal segment scan, incl pos maintenance
+ for (int k = 0; k < numVals; k++) {
+ int boff = _ptr[k];
+ int blen = len(k);
+ double val = aval[k];
+ int bix = apos[k];
+ int start = astart[k];
+
+ //compute partial results, not aligned
+ while( bix<blen ) {
+ int lstart = _data[boff + bix];
+ int llen = _data[boff + bix + 1];
+ LinearAlgebraUtils.vectAdd(val, c, Math.max(bi, start+lstart),
+ Math.min(start+lstart+llen,bimax) - Math.max(bi, start+lstart));
+ if(start+lstart+llen >= bimax)
+ break;
+ start += lstart + llen;
+ bix += 2;
+ }
+
+ apos[k] = bix;
+ astart[k] = start;
+ }
+ }
+ }
+ else
+ {
+ for (int k = 0; k < numVals; k++) {
+ int boff = _ptr[k];
+ int blen = len(k);
+ double val = sumValues(k, sb);
+ int bix = 0;
+ int start = 0;
+
+ //scan to beginning offset if necessary
+ if( rl > 0 ) { //rl aligned with blksz
+ while( bix<blen ) {
+ int lstart = _data[boff + bix]; //start
+ int llen = _data[boff + bix + 1]; //len
+ if( start+lstart+llen >= rl )
+ break;
+ start += lstart + llen;
+ bix += 2;
+ }
+ }
+
+ //compute partial results, not aligned
+ while( bix<blen ) {
+ int lstart = _data[boff + bix];
+ int llen = _data[boff + bix + 1];
+ LinearAlgebraUtils.vectAdd(val, c, Math.max(rl, start+lstart),
+ Math.min(start+lstart+llen,ru) - Math.max(rl, start+lstart));
+ if(start+lstart+llen >= ru)
+ break;
+ start += lstart + llen;
+ bix += 2;
+ }
+ }
+ }
+ }
+
+ @Override
+ public void leftMultByRowVector(MatrixBlock vector, MatrixBlock result)
+ throws DMLRuntimeException
+ {
+ double[] a = ConverterUtils.getDenseVector(vector);
+ double[] c = result.getDenseBlock();
+ final int numCols = getNumCols();
+ final int numVals = getNumValues();
+ final int n = getNumRows();
+
+ if( LOW_LEVEL_OPT && numVals > 1
+ && _numRows > BitmapEncoder.BITMAP_BLOCK_SZ )
+ {
+ final int blksz = ColGroupBitmap.READ_CACHE_BLKSZ;
+
+ //step 1: prepare position and value arrays
+
+ //current pos per OLs / output values
+ int[] apos = new int[numVals];
+ int[] astart = new int[numVals];
+ double[] cvals = new double[numVals];
+
+ //step 2: cache conscious matrix-vector via horizontal scans
+ for( int ai=0; ai<n; ai+=blksz )
+ {
+ int aimax = Math.min(ai+blksz, n);
+
+ //horizontal scan, incl pos maintenance
+ for (int k = 0; k < numVals; k++) {
+ int bitmapOff = _ptr[k];
+ int bitmapLen = len(k);
+ int bufIx = apos[k];
+ int start = astart[k];
+
+ //compute partial results, not aligned
+ while( bufIx<bitmapLen & start<aimax ) {
+ start += _data[bitmapOff + bufIx];
+ int len = _data[bitmapOff + bufIx+1];
+ cvals[k] += LinearAlgebraUtils.vectSum(a, start, len);
+ start += len;
+ bufIx += 2;
+ }
+
+ apos[k] = bufIx;
+ astart[k] = start;
+ }
+ }
+
+ //step 3: scale partial results by values and write to global output
+ for (int k = 0, valOff=0; k < numVals; k++, valOff+=numCols)
+ for( int j = 0; j < numCols; j++ )
+ c[ _colIndexes[j] ] += cvals[k] * _values[valOff+j];
+
+ }
+ else
+ {
+ //iterate over all values and their bitmaps
+ for (int bitmapIx=0, valOff=0; bitmapIx<numVals; bitmapIx++, valOff+=numCols)
+ {
+ int bitmapOff = _ptr[bitmapIx];
+ int bitmapLen = len(bitmapIx);
+
+ double vsum = 0;
+ int curRunEnd = 0;
+ for (int bufIx = 0; bufIx < bitmapLen; bufIx += 2) {
+ int curRunStartOff = curRunEnd + _data[bitmapOff+bufIx];
+ int curRunLen = _data[bitmapOff+bufIx + 1];
+ vsum += LinearAlgebraUtils.vectSum(a, curRunStartOff, curRunLen);
+ curRunEnd = curRunStartOff + curRunLen;
+ }
+
+ //scale partial results by values and write results
+ for( int j = 0; j < numCols; j++ )
+ c[ _colIndexes[j] ] += vsum * _values[ valOff+j ];
+ }
+ }
+ }
+
+ @Override
+ public ColGroup scalarOperation(ScalarOperator op)
+ throws DMLRuntimeException
+ {
+ double val0 = op.executeScalar(0);
+
+ //fast path: sparse-safe operations
+ // Note that bitmaps don't change and are shallow-copied
+ if( op.sparseSafe || val0==0 ) {
+ return new ColGroupRLE(_colIndexes, _numRows,
+ applyScalarOp(op), _data, _ptr);
+ }
+
+ //slow path: sparse-unsafe operations (potentially create new bitmap)
+ //note: for efficiency, we currently don't drop values that become 0
+ boolean[] lind = computeZeroIndicatorVector();
+ int[] loff = computeOffsets(lind);
+ if( loff.length==0 ) { //empty offset list: go back to fast path
+ return new ColGroupRLE(_colIndexes, _numRows,
+ applyScalarOp(op), _data, _ptr);
+ }
+
+ double[] rvalues = applyScalarOp(op, val0, getNumCols());
+ char[] lbitmap = BitmapEncoder.genRLEBitmap(loff);
+ char[] rbitmaps = Arrays.copyOf(_data, _data.length+lbitmap.length);
+ System.arraycopy(lbitmap, 0, rbitmaps, _data.length, lbitmap.length);
+ int[] rbitmapOffs = Arrays.copyOf(_ptr, _ptr.length+1);
+ rbitmapOffs[rbitmapOffs.length-1] = rbitmaps.length;
+
+ return new ColGroupRLE(_colIndexes, _numRows,
+ rvalues, rbitmaps, rbitmapOffs);
+ }
+
+ @Override
+ public void unaryAggregateOperations(AggregateUnaryOperator op, MatrixBlock result)
+ throws DMLRuntimeException
+ {
+ KahanFunction kplus = (op.aggOp.increOp.fn instanceof KahanPlus) ?
+ KahanPlus.getKahanPlusFnObject() : KahanPlusSq.getKahanPlusSqFnObject();
+
+ if( op.indexFn instanceof ReduceAll )
+ computeSum(result, kplus);
+ else if( op.indexFn instanceof ReduceCol )
+ computeRowSums(result, kplus);
+ else if( op.indexFn instanceof ReduceRow )
+ computeColSums(result, kplus);
+ }
+
+ /**
+ *
+ * @param result
+ */
+ private void computeSum(MatrixBlock result, KahanFunction kplus)
+ {
+ KahanObject kbuff = new KahanObject(result.quickGetValue(0, 0), result.quickGetValue(0, 1));
+
+ final int numCols = getNumCols();
+ final int numVals = getNumValues();
+
+ for (int bitmapIx = 0; bitmapIx < numVals; bitmapIx++) {
+ int bitmapOff = _ptr[bitmapIx];
+ int bitmapLen = len(bitmapIx);
+ int valOff = bitmapIx * numCols;
+ int curRunEnd = 0;
+ int count = 0;
+ for (int bufIx = 0; bufIx < bitmapLen; bufIx += 2) {
+ int curRunStartOff = curRunEnd + _data[bitmapOff+bufIx];
+ curRunEnd = curRunStartOff + _data[bitmapOff+bufIx + 1];
+ count += curRunEnd-curRunStartOff;
+ }
+
+ //scale counts by all values
+ for( int j = 0; j < numCols; j++ )
+ kplus.execute3(kbuff, _values[ valOff+j ], count);
+ }
+
+ result.quickSetValue(0, 0, kbuff._sum);
+ result.quickSetValue(0, 1, kbuff._correction);
+ }
+
+ /**
+ *
+ * @param result
+ */
+ private void computeRowSums(MatrixBlock result, KahanFunction kplus)
+ {
+ KahanObject kbuff = new KahanObject(0, 0);
+ final int numVals = getNumValues();
+
+ for (int bitmapIx = 0; bitmapIx < numVals; bitmapIx++) {
+ int bitmapOff = _ptr[bitmapIx];
+ int bitmapLen = len(bitmapIx);
+ double val = sumValues(bitmapIx);
+
+ if (val != 0.0) {
+ int curRunStartOff = 0;
+ int curRunEnd = 0;
+ for (int bufIx = 0; bufIx < bitmapLen; bufIx += 2) {
+ curRunStartOff = curRunEnd + _data[bitmapOff+bufIx];
+ curRunEnd = curRunStartOff + _data[bitmapOff+bufIx + 1];
+ for (int rix = curRunStartOff; rix < curRunEnd; rix++) {
+ kbuff.set(result.quickGetValue(rix, 0), result.quickGetValue(rix, 1));
+ kplus.execute2(kbuff, val);
+ result.quickSetValue(rix, 0, kbuff._sum);
+ result.quickSetValue(rix, 1, kbuff._correction);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ *
+ * @param result
+ */
+ private void computeColSums(MatrixBlock result, KahanFunction kplus)
+ {
+ KahanObject kbuff = new KahanObject(0, 0);
+
+ final int numCols = getNumCols();
+ final int numVals = getNumValues();
+
+ for (int bitmapIx = 0; bitmapIx < numVals; bitmapIx++) {
+ int bitmapOff = _ptr[bitmapIx];
+ int bitmapLen = len(bitmapIx);
+ int valOff = bitmapIx * numCols;
+ int curRunEnd = 0;
+ int count = 0;
+ for (int bufIx = 0; bufIx < bitmapLen; bufIx += 2) {
+ int curRunStartOff = curRunEnd + _data[bitmapOff+bufIx];
+ curRunEnd = curRunStartOff + _data[bitmapOff+bufIx + 1];
+ count += curRunEnd-curRunStartOff;
+ }
+
+ //scale counts by all values
+ for( int j = 0; j < numCols; j++ ) {
+ kbuff.set(result.quickGetValue(0, _colIndexes[j]),result.quickGetValue(1, _colIndexes[j]));
+ kplus.execute3(kbuff, _values[ valOff+j ], count);
+ result.quickSetValue(0, _colIndexes[j], kbuff._sum);
+ result.quickSetValue(1, _colIndexes[j], kbuff._correction);
+ }
+ }
+ }
+
+ public boolean[] computeZeroIndicatorVector()
+ throws DMLRuntimeException
+ {
+ boolean[] ret = new boolean[_numRows];
+ final int numVals = getNumValues();
+
+ //initialize everything with zero
+ Arrays.fill(ret, true);
+
+ for (int bitmapIx = 0; bitmapIx < numVals; bitmapIx++) {
+ int bitmapOff = _ptr[bitmapIx];
+ int bitmapLen = len(bitmapIx);
+
+ int curRunStartOff = 0;
+ int curRunEnd = 0;
+ for (int bufIx = 0; bufIx < bitmapLen; bufIx += 2) {
+ curRunStartOff = curRunEnd + _data[bitmapOff+bufIx];
+ curRunEnd = curRunStartOff + _data[bitmapOff+bufIx + 1];
+ Arrays.fill(ret, curRunStartOff, curRunEnd, false);
+ }
+ }
+
+ return ret;
+ }
+
+ @Override
+ protected void countNonZerosPerRow(int[] rnnz)
+ {
+ final int numVals = getNumValues();
+ final int numCols = getNumCols();
+
+ for (int k = 0; k < numVals; k++) {
+ int bitmapOff = _ptr[k];
+ int bitmapLen = len(k);
+
+ int curRunStartOff = 0;
+ int curRunEnd = 0;
+ for (int bufIx = 0; bufIx < bitmapLen; bufIx += 2) {
+ curRunStartOff = curRunEnd + _data[bitmapOff+bufIx];
+ curRunEnd = curRunStartOff + _data[bitmapOff+bufIx + 1];
+ for( int i=curRunStartOff; i<curRunEnd; i++ )
+ rnnz[i] += numCols;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/16e7b1c8/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java
new file mode 100644
index 0000000..d9b75a1
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress;
+
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.matrix.data.LibMatrixAgg;
+import org.apache.sysml.runtime.matrix.data.LibMatrixMult;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
+import org.apache.sysml.runtime.matrix.operators.ScalarOperator;
+import org.apache.sysml.runtime.util.SortUtils;
+
+
+/**
+ * Column group type for columns that are stored as dense arrays of doubles.
+ * Uses a MatrixBlock internally to store the column contents.
+ *
+ */
+public class ColGroupUncompressed extends ColGroup
+{
+ private static final long serialVersionUID = 4870546053280378891L;
+
+ /**
+ * We store the contents of the columns as a MatrixBlock to take advantage
+ * of high-performance routines available for this data structure.
+ */
+ private MatrixBlock _data;
+
+ public ColGroupUncompressed() {
+ super(CompressionType.UNCOMPRESSED, (int[])null, -1);
+ }
+
+ /**
+ * Main constructor.
+ *
+ * @param colIndicesList
+ * indices (relative to the current block) of the columns that
+ * this column group represents.
+ * @param rawblock
+ * the uncompressed block; uncompressed data must be present at
+ * the time that the constructor is called
+ * @throws DMLRuntimeException
+ */
+ @SuppressWarnings("unused")
+ public ColGroupUncompressed(List<Integer> colIndicesList, MatrixBlock rawblock)
+ throws DMLRuntimeException
+ {
+ super(CompressionType.UNCOMPRESSED, colIndicesList,
+ CompressedMatrixBlock.TRANSPOSE_INPUT ?
+ rawblock.getNumColumns() : rawblock.getNumRows());
+
+ //prepare meta data
+ int numRows = CompressedMatrixBlock.TRANSPOSE_INPUT ?
+ rawblock.getNumColumns() : rawblock.getNumRows();
+
+ // Create a matrix with just the requested rows of the original block
+ _data = new MatrixBlock(numRows,
+ _colIndexes.length, rawblock.isInSparseFormat());
+
+ //ensure sorted col indices
+ if(!SortUtils.isSorted(0, _colIndexes.length, _colIndexes))
+ Arrays.sort(_colIndexes);
+
+ //special cases empty blocks
+ if (rawblock.isEmptyBlock(false))
+ return;
+ //special cases full block
+ if( !CompressedMatrixBlock.TRANSPOSE_INPUT &&
+ _data.getNumColumns() == rawblock.getNumColumns() ){
+ _data.copy(rawblock);
+ return;
+ }
+
+ // dense implementation for dense and sparse matrices to avoid linear search
+ int m = numRows;
+ int n = _colIndexes.length;
+ for( int i = 0; i < m; i++) {
+ for( int j = 0; j < n; j++ ) {
+ double val = CompressedMatrixBlock.TRANSPOSE_INPUT ?
+ rawblock.quickGetValue(_colIndexes[j], i) :
+ rawblock.quickGetValue(i, _colIndexes[j]);
+ _data.appendValue(i, j, val);
+ }
+ }
+ _data.examSparsity();
+ }
+
+ /**
+ * Constructor for creating temporary decompressed versions of one or more
+ * compressed column groups.
+ *
+ * @param groupsToDecompress
+ * compressed columns to subsume. Must contain at least one
+ * element.
+ */
+ public ColGroupUncompressed(ArrayList<ColGroup> groupsToDecompress)
+ {
+ super(CompressionType.UNCOMPRESSED,
+ mergeColIndices(groupsToDecompress),
+ groupsToDecompress.get(0)._numRows);
+
+ // Invert the list of column indices
+ int maxColIndex = _colIndexes[_colIndexes.length - 1];
+ int[] colIndicesInverted = new int[maxColIndex + 1];
+ for (int i = 0; i < _colIndexes.length; i++) {
+ colIndicesInverted[_colIndexes[i]] = i;
+ }
+
+ // Create the buffer that holds the uncompressed data, packed together
+ _data = new MatrixBlock(_numRows, _colIndexes.length, false);
+
+ for (ColGroup colGroup : groupsToDecompress) {
+ colGroup.decompressToBlock(_data, colIndicesInverted);
+ }
+ }
+
+ /**
+ * Constructor for internal use. Used when a method needs to build an
+ * instance of this class from scratch.
+ *
+ * @param colIndices
+ * column mapping for this column group
+ * @param numRows
+ * number of rows in the column, for passing to the superclass
+ * @param colContents
+ * uncompressed cell values
+ */
+ public ColGroupUncompressed(int[] colIndices, int numRows, MatrixBlock data)
+ {
+ super(CompressionType.UNCOMPRESSED, colIndices, numRows);
+ _data = data;
+ }
+
+
+ /**
+ * Access for superclass
+ *
+ * @return direct pointer to the internal representation of the columns
+ */
+ public MatrixBlock getData() {
+ return _data;
+ }
+
+ /**
+ * Subroutine of constructor.
+ *
+ * @param groupsToDecompress
+ * input to the constructor that decompresses into a temporary
+ * UncompressedColGroup
+ * @return a merged set of column indices across all those groups
+ */
+ private static int[] mergeColIndices(ArrayList<ColGroup> groupsToDecompress)
+ {
+ // Pass 1: Determine number of columns
+ int sz = 0;
+ for (ColGroup colGroup : groupsToDecompress) {
+ sz += colGroup.getNumCols();
+ }
+
+ // Pass 2: Copy column offsets out
+ int[] ret = new int[sz];
+ int pos = 0;
+ for (ColGroup colGroup : groupsToDecompress) {
+ int[] tmp = colGroup.getColIndices();
+ System.arraycopy(tmp, 0, ret, pos, tmp.length);
+ pos += tmp.length;
+ }
+
+ // Pass 3: Sort and return the list of columns
+ Arrays.sort(ret);
+ return ret;
+ }
+
+ @Override
+ public long estimateInMemorySize() {
+ long size = super.estimateInMemorySize();
+ // adding the size of colContents
+ return size + 8 + _data.estimateSizeInMemory();
+ }
+
+ @Override
+ public void decompressToBlock(MatrixBlock target) {
+ //empty block, nothing to add to output
+ if( _data.isEmptyBlock(false) )
+ return;
+ for (int row = 0; row < _data.getNumRows(); row++) {
+ for (int colIx = 0; colIx < _colIndexes.length; colIx++) {
+ int col = _colIndexes[colIx];
+ double cellVal = _data.quickGetValue(row, colIx);
+ target.quickSetValue(row, col, cellVal);
+ }
+ }
+ }
+
+ @Override
+ public void decompressToBlock(MatrixBlock target, int[] colIndexTargets) {
+ //empty block, nothing to add to output
+ if( _data.isEmptyBlock(false) ) {
+ return;
+ }
+ // Run through the rows, putting values into the appropriate locations
+ for (int row = 0; row < _data.getNumRows(); row++) {
+ for (int colIx = 0; colIx < _data.getNumColumns(); colIx++) {
+ int origMatrixColIx = getColIndex(colIx);
+ int col = colIndexTargets[origMatrixColIx];
+ double cellVal = _data.quickGetValue(row, colIx);
+ target.quickSetValue(row, col, cellVal);
+ }
+ }
+ }
+
+ @Override
+ public void decompressToBlock(MatrixBlock target, int colpos) {
+ //empty block, nothing to add to output
+ if( _data.isEmptyBlock(false) ) {
+ return;
+ }
+ // Run through the rows, putting values into the appropriate locations
+ for (int row = 0; row < _data.getNumRows(); row++) {
+ double cellVal = _data.quickGetValue(row, colpos);
+ target.quickSetValue(row, 0, cellVal);
+ }
+ }
+
+ @Override
+ public void rightMultByVector(MatrixBlock vector, MatrixBlock result, int rl, int ru)
+ throws DMLRuntimeException
+ {
+ // Pull out the relevant rows of the vector
+ int clen = _colIndexes.length;
+
+ MatrixBlock shortVector = new MatrixBlock(clen, 1, false);
+ shortVector.allocateDenseBlock();
+ double[] b = shortVector.getDenseBlock();
+ for (int colIx = 0; colIx < clen; colIx++)
+ b[colIx] = vector.quickGetValue(_colIndexes[colIx], 0);
+ shortVector.recomputeNonZeros();
+
+ // Multiply the selected columns by the appropriate parts of the vector
+ LibMatrixMult.matrixMult(_data, shortVector, result, rl, ru);
+ }
+
+ @Override
+ public void leftMultByRowVector(MatrixBlock vector, MatrixBlock result)
+ throws DMLRuntimeException
+ {
+ MatrixBlock pret = new MatrixBlock(1, _colIndexes.length, false);
+ LibMatrixMult.matrixMult(vector, _data, pret);
+
+ // copying partialResult to the proper indices of the result
+ if( !pret.isEmptyBlock(false) ) {
+ double[] rsltArr = result.getDenseBlock();
+ for (int colIx = 0; colIx < _colIndexes.length; colIx++)
+ rsltArr[_colIndexes[colIx]] = pret.quickGetValue(0, colIx);
+ result.recomputeNonZeros();
+ }
+ }
+
+ public void leftMultByRowVector(MatrixBlock vector, MatrixBlock result, int k)
+ throws DMLRuntimeException
+ {
+ MatrixBlock pret = new MatrixBlock(1, _colIndexes.length, false);
+ LibMatrixMult.matrixMult(vector, _data, pret, k);
+
+ // copying partialResult to the proper indices of the result
+ if( !pret.isEmptyBlock(false) ) {
+ double[] rsltArr = result.getDenseBlock();
+ for (int colIx = 0; colIx < _colIndexes.length; colIx++)
+ rsltArr[_colIndexes[colIx]] = pret.quickGetValue(0, colIx);
+ result.recomputeNonZeros();
+ }
+ }
+
+ @Override
+ public ColGroup scalarOperation(ScalarOperator op)
+ throws DMLRuntimeException
+ {
+ //execute scalar operations
+ MatrixBlock retContent = (MatrixBlock) _data
+ .scalarOperations(op, new MatrixBlock());
+
+ //construct new uncompressed column group
+ return new ColGroupUncompressed(getColIndices(), _data.getNumRows(), retContent);
+ }
+
+ @Override
+ public void unaryAggregateOperations(AggregateUnaryOperator op, MatrixBlock ret)
+ throws DMLRuntimeException
+ {
+ //execute unary aggregate operations
+ LibMatrixAgg.aggregateUnaryMatrix(_data, ret, op);
+ }
+
+ @Override
+ public void readFields(DataInput in)
+ throws IOException
+ {
+ //read col contents (w/ meta data)
+ _data = new MatrixBlock();
+ _data.readFields(in);
+ _numRows = _data.getNumRows();
+
+ //read col indices
+ int numCols = _data.getNumColumns();
+ _colIndexes = new int[ numCols ];
+ for( int i=0; i<numCols; i++ )
+ _colIndexes[i] = in.readInt();
+ }
+
+ @Override
+ public void write(DataOutput out)
+ throws IOException
+ {
+ //write col contents first (w/ meta data)
+ _data.write(out);
+
+ //write col indices
+ int len = _data.getNumColumns();
+ for( int i=0; i<len; i++ )
+ out.writeInt( _colIndexes[i] );
+ }
+
+ @Override
+ public long getExactSizeOnDisk() {
+ return _data.getExactSizeOnDisk()
+ + 4 * _data.getNumColumns();
+ }
+
+ @Override
+ protected void countNonZerosPerRow(int[] rnnz)
+ {
+ for( int i=0; i<_data.getNumRows(); i++ )
+ rnnz[i] += _data.recomputeNonZeros(i, i, 0, _data.getNumColumns()-1);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/16e7b1c8/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
new file mode 100644
index 0000000..6ac26c6
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
@@ -0,0 +1,1342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.sysml.hops.OptimizerUtils;
+import org.apache.sysml.lops.MMTSJ.MMTSJType;
+import org.apache.sysml.lops.MapMultChain.ChainType;
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.compress.ColGroup.CompressionType;
+import org.apache.sysml.runtime.compress.estim.CompressedSizeEstimator;
+import org.apache.sysml.runtime.compress.estim.CompressedSizeInfo;
+import org.apache.sysml.runtime.compress.estim.SizeEstimatorFactory;
+import org.apache.sysml.runtime.compress.utils.ConverterUtils;
+import org.apache.sysml.runtime.compress.utils.LinearAlgebraUtils;
+import org.apache.sysml.runtime.controlprogram.parfor.stat.Timing;
+import org.apache.sysml.runtime.functionobjects.KahanPlus;
+import org.apache.sysml.runtime.functionobjects.KahanPlusSq;
+import org.apache.sysml.runtime.functionobjects.Multiply;
+import org.apache.sysml.runtime.functionobjects.ReduceRow;
+import org.apache.sysml.runtime.instructions.cp.KahanObject;
+import org.apache.sysml.runtime.matrix.data.LibMatrixBincell;
+import org.apache.sysml.runtime.matrix.data.LibMatrixReorg;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
+import org.apache.sysml.runtime.matrix.data.MatrixValue;
+import org.apache.sysml.runtime.matrix.data.SparseBlock;
+import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator;
+import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
+import org.apache.sysml.runtime.matrix.operators.BinaryOperator;
+import org.apache.sysml.runtime.matrix.operators.ScalarOperator;
+
+/**
+ * Experimental version of MatrixBlock that allows a compressed internal
+ * representation.
+ */
+public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
+{
+ private static final long serialVersionUID = 7319972089143154057L;
+
+ //internal configuration
+ public static final int MAX_NUMBER_COCODING_COLUMNS = 1000;
+ public static final double MIN_COMPRESSION_RATIO = 2.0;
+ public static final double MIN_RLE_RATIO = 1.0; // Minimum additional compression (non-RLE size / RLE size) before we switch to run-length encoding.
+ public static final boolean TRANSPOSE_INPUT = true;
+ public static final boolean MATERIALIZE_ZEROS = false;
+ public static final long MIN_PAR_AGG_THRESHOLD = 16*1024*1024; //16MB
+ public static final boolean INVESTIGATE_ESTIMATES = false;
+ private static final boolean LDEBUG = false; //local debug flag
+
+ private static final Log LOG = LogFactory.getLog(CompressedMatrixBlock.class.getName());
+
+ static{
+ // for internal debugging only
+ if( LDEBUG ) {
+ Logger.getLogger("org.apache.sysml.runtime.compress")
+ .setLevel((Level) Level.DEBUG);
+ }
+ }
+
+ protected ArrayList<ColGroup> _colGroups = null;
+ protected CompressionStatistics _stats = null;
+
+ public CompressedMatrixBlock() {
+ super(-1, -1, true);
+ }
+
+ /**
+ * Main constructor for building a block from scratch.
+ *
+ * @param rl
+ * number of rows in the block
+ * @param cl
+ * number of columns
+ * @param sparse
+ * true if the UNCOMPRESSED representation of the block should be
+ * sparse
+ */
+ public CompressedMatrixBlock(int rl, int cl, boolean sparse) {
+ super(rl, cl, sparse);
+ }
+
+ /**
+ * "Copy" constructor to populate this compressed block with the
+ * uncompressed contents of a conventional block. Does <b>not</b> compress
+ * the block.
+ */
+ public CompressedMatrixBlock(MatrixBlock mb) {
+ super(mb.getNumRows(), mb.getNumColumns(), mb.isInSparseFormat());
+
+ //shallow copy (deep copy on compression, prevents unnecessary copy)
+ if( isInSparseFormat() )
+ sparseBlock = mb.getSparseBlock();
+ else
+ denseBlock = mb.getDenseBlock();
+ nonZeros = mb.getNonZeros();
+ }
+
+ /**
+ *
+ * @return the column groups constructed by the compression process.
+ *
+ */
+ public ArrayList<ColGroup> getColGroups() {
+ return _colGroups;
+ }
+
+ /**
+ * @return true if this block is in compressed form; false if the block has
+ * not yet been compressed
+ */
+ public boolean isCompressed() {
+ return (_colGroups != null);
+ }
+
+ /**
+ *
+ * @return
+ */
+ public boolean isSingleUncompressedGroup(){
+ return (_colGroups!=null && _colGroups.size()==1
+ && _colGroups.get(0) instanceof ColGroupUncompressed);
+ }
+
+ private void allocateColGroupList() {
+ _colGroups = new ArrayList<ColGroup>();
+ }
+
+ @Override
+ public boolean isEmptyBlock(boolean safe) {
+ if( !isCompressed() )
+ return super.isEmptyBlock(safe);
+ return (_colGroups == null || getNonZeros()==0);
+ }
+
+ /**
+ * Compress the contents of this matrix block. After compression, the
+ * uncompressed data is discarded. Attempts to update this block after
+ * calling this method currently result in INCORRECT BEHAVIOR, something
+ * which should be fixed if we move ahead with this compression strategy.
+ *
+ * +per column sparsity
+ */
+ public void compress()
+ throws DMLRuntimeException
+ {
+ //check for redundant compression
+ if( isCompressed() ){
+ throw new DMLRuntimeException("Redundant compression, block already compressed.");
+ }
+
+ Timing time = new Timing(true);
+ _stats = new CompressionStatistics();
+
+ // SAMPLE-BASED DECISIONS:
+ // Decisions such as testing if a column is amenable to bitmap
+ // compression or evaluating co-coding potentionls are made based on a
+ // subset of the rows. For large datasets, sampling might take a
+ // significant amount of time. So, we generate only one sample and use
+ // it for the entire compression process.
+
+ //prepare basic meta data and deep copy / transpose input
+ final int numRows = getNumRows();
+ final int numCols = getNumColumns();
+ final boolean sparse = isInSparseFormat();
+ MatrixBlock rawblock = this;
+ if( TRANSPOSE_INPUT )
+ rawblock = LibMatrixReorg.transpose(rawblock, new MatrixBlock(numCols, numRows, sparse));
+ else
+ rawblock = new MatrixBlock(this);
+
+ //construct sample-based size estimator
+ CompressedSizeEstimator bitmapSizeEstimator =
+ SizeEstimatorFactory.getSizeEstimator(rawblock, numRows);
+
+ //allocate list of column groups
+ allocateColGroupList();
+
+ // The current implementation of this method is written for correctness,
+ // not for performance or for minimal use of temporary space.
+
+ // We start with a full set of columns.
+ HashSet<Integer> remainingCols = new HashSet<Integer>();
+ for (int i = 0; i < numCols; i++) {
+ remainingCols.add(i);
+ }
+
+ // PHASE 1: Classify columns by compression type
+ // We start by determining which columns are amenable to bitmap
+ // compression
+
+ // It is correct to use the dense size as the uncompressed size
+ // FIXME not numRows but nnz / col otherwise too aggressive overestimation
+ // of uncompressed size and hence overestimation of compression potential
+ double uncompressedColumnSize = 8 * numRows;
+
+ // information about the bitmap amenable columns
+ List<Integer> bitmapCols = new ArrayList<Integer>();
+ List<Integer> uncompressedCols = new ArrayList<Integer>();
+ List<Integer> colsCardinalities = new ArrayList<Integer>();
+ List<Long> compressedSizes = new ArrayList<Long>();
+ HashMap<Integer, Double> compressionRatios = new HashMap<Integer, Double>();
+
+ // Minimum ratio (size of uncompressed / size of compressed) that we
+ // will accept when encoding a field with a bitmap.
+ for (int col = 0; col < numCols; col++)
+ {
+ CompressedSizeInfo compressedSizeInfo = bitmapSizeEstimator
+ .estimateCompressedColGroupSize(new int[] { col });
+ long compressedSize = compressedSizeInfo.getMinSize();
+ double compRatio = uncompressedColumnSize / compressedSize;
+
+ //FIXME: compression ratio should be checked against 1 instead of min compression
+ //ratio; I think this threshold was only required because we overestimated the
+ //the uncompressed column size with n\alpha instead of z\alpha
+ if (compRatio >= MIN_COMPRESSION_RATIO) {
+ bitmapCols.add(col);
+ compressionRatios.put(col, compRatio);
+ colsCardinalities.add(compressedSizeInfo.getEstCarinality());
+ compressedSizes.add(compressedSize);
+ }
+ else
+ uncompressedCols.add(col);
+ }
+
+ _stats.timePhase1 = time.stop();
+ if( LOG.isDebugEnabled() )
+ LOG.debug("compression phase 1: "+_stats.timePhase1);
+
+ // Filters for additional types of compression should be inserted here.
+
+ // PHASE 2: Grouping columns
+ // Divide the bitmap columns into column groups.
+ List<int[]> bitmapColGrps = null;
+ if (bitmapCols.size() > MAX_NUMBER_COCODING_COLUMNS) {
+ // Too many columns to compute co-coding groups with current methods.
+ // Generate singleton groups.
+ bitmapColGrps = new ArrayList<int[]>(bitmapCols.size());
+ for (int col : bitmapCols) {
+ bitmapColGrps.add(new int[] { col });
+ }
+ }
+ else {
+ bitmapColGrps = PlanningCoCoder.findCocodesByPartitioning(
+ bitmapSizeEstimator, bitmapCols, colsCardinalities,
+ compressedSizes, numRows, isInSparseFormat() ?
+ OptimizerUtils.getSparsity(numRows, numCols, getNonZeros()): 1);
+ }
+
+ _stats.timePhase2 = time.stop();
+ if( LOG.isDebugEnabled() )
+ LOG.debug("compression phase 2: "+_stats.timePhase2);
+
+ if( INVESTIGATE_ESTIMATES ) {
+ double est = 0;
+ for( int[] groupIndices : bitmapColGrps )
+ est += bitmapSizeEstimator.estimateCompressedColGroupSize(groupIndices).getMinSize();
+ est += uncompressedCols.size() * uncompressedColumnSize;
+ _stats.estSize = est;
+ }
+
+ // PHASE 3: Compress and correct sample-based decisions
+
+ for (int[] groupIndices : bitmapColGrps)
+ {
+ int[] allGroupIndices = null;
+ int allColsCount = groupIndices.length;
+ CompressedSizeInfo bitmapSizeInfo;
+ // The compression type is decided based on a full bitmap since it
+ // will be reused for the actual compression step.
+ UncompressedBitmap ubm;
+ PriorityQueue<CompressedColumn> compRatioPQ = null;
+ boolean skipGroup = false;
+ while (true)
+ {
+ ubm = BitmapEncoder.extractBitmap(groupIndices, rawblock);
+ bitmapSizeInfo = bitmapSizeEstimator
+ .estimateCompressedColGroupSize(ubm);
+ double compRatio = uncompressedColumnSize * groupIndices.length
+ / bitmapSizeInfo.getMinSize();
+ if (compRatio >= MIN_COMPRESSION_RATIO) {
+ // we have a good group
+ for( Integer col : groupIndices )
+ remainingCols.remove(col);
+ break;
+ } else {
+ // modify the group
+ if (compRatioPQ == null) {
+ // first modification
+ allGroupIndices = Arrays.copyOf(groupIndices, groupIndices.length);
+ compRatioPQ = new PriorityQueue<CompressedMatrixBlock.CompressedColumn>();
+ for (int i = 0; i < groupIndices.length; i++)
+ compRatioPQ.add(new CompressedColumn(i,
+ compressionRatios.get(groupIndices[i])));
+ }
+
+ // index in allGroupIndices
+ int removeIx = compRatioPQ.poll().colIx;
+ allGroupIndices[removeIx] = -1;
+ allColsCount--;
+ if (allColsCount == 0) {
+ skipGroup = true;
+ break;
+ }
+ groupIndices = new int[allColsCount];
+ // copying the values that do not equal -1
+ int ix = 0;
+ for (int col : allGroupIndices) {
+ if (col != -1) {
+ groupIndices[ix++] = col;
+ }
+ }
+
+ }
+ }
+
+ if (skipGroup)
+ continue;
+ long rleNumBytes = bitmapSizeInfo.getRLESize();
+ long offsetNumBytes = bitmapSizeInfo.getOLESize();
+ double rleRatio = (double) offsetNumBytes / (double) rleNumBytes;
+
+ if (rleRatio > MIN_RLE_RATIO) {
+ ColGroupRLE compressedGroup = new ColGroupRLE(groupIndices,
+ numRows, ubm);
+ _colGroups.add(compressedGroup);
+ }
+ else {
+ ColGroupOLE compressedGroup = new ColGroupOLE(
+ groupIndices, numRows, ubm);
+ _colGroups.add(compressedGroup);
+ }
+
+ }
+
+ _stats.timePhase3 = time.stop();
+ if( LOG.isDebugEnabled() )
+ LOG.debug("compression phase 3: "+_stats.timePhase3);
+
+ // Phase 4: Cleanup
+ // The remaining columns are stored uncompressed as one big column group
+ if (remainingCols.size() > 0) {
+ ArrayList<Integer> list = new ArrayList<Integer>(remainingCols);
+ ColGroupUncompressed ucgroup = new ColGroupUncompressed(list, rawblock);
+ _colGroups.add(ucgroup);
+ }
+
+ //final cleanup (discard uncompressed block)
+ rawblock.cleanupBlock(true, true);
+ this.cleanupBlock(true, true);
+
+ _stats.timePhase4 = time.stop();
+ if( LOG.isDebugEnabled() )
+ LOG.debug("compression phase 4: "+_stats.timePhase4);
+ }
+
+ /**
+ * @return a new uncompressed matrix block containing the contents of this
+ * block
+ */
+ public MatrixBlock decompress() throws DMLRuntimeException
+ {
+ //early abort for not yet compressed blocks
+ if( !isCompressed() )
+ return new MatrixBlock(this);
+
+ //preallocation sparse rows to avoid repeated reallocations
+ MatrixBlock ret = new MatrixBlock(getNumRows(), getNumColumns(), isInSparseFormat(), getNonZeros());
+ if( ret.isInSparseFormat() ) {
+ int[] rnnz = new int[rlen];
+ for (ColGroup grp : _colGroups)
+ grp.countNonZerosPerRow(rnnz);
+ ret.allocateSparseRowsBlock();
+ SparseBlock rows = ret.getSparseBlock();
+ for( int i=0; i<rlen; i++ )
+ rows.allocate(i, rnnz[i]);
+ }
+
+ //core decompression (append if sparse)
+ for (ColGroup grp : _colGroups)
+ grp.decompressToBlock(ret);
+
+ //post-processing (for append in decompress)
+ if( isInSparseFormat() )
+ ret.sortSparseRows();
+
+ return ret;
+ }
+
+ public CompressionStatistics getCompressionStatistics(){
+ return _stats;
+ }
+
+ /**
+ *
+ * @return an upper bound on the memory used to store this compressed block
+ * considering class overhead.
+ */
+ public long estimateCompressedSizeInMemory() {
+ if (!isCompressed())
+ return 0;
+ // basic data inherited from MatrixBlock
+ long total = MatrixBlock.estimateSizeInMemory(0, 0, 0);
+ // adding the size of colGroups ArrayList overhead
+ // object overhead (32B) + int size (4B) + int modCount (4B) + Object[]
+ // elementData overhead + reference (32+8)B +reference ofr each Object (8B)
+ total += 80 + 8 * _colGroups.size();
+ for (ColGroup grp : _colGroups)
+ total += grp.estimateInMemorySize();
+ return total;
+ }
+
+ private class CompressedColumn implements Comparable<CompressedColumn> {
+ int colIx;
+ double compRatio;
+
+ public CompressedColumn(int colIx, double compRatio) {
+ this.colIx = colIx;
+ this.compRatio = compRatio;
+ }
+
+ @Override
+ public int compareTo(CompressedColumn o) {
+ return (int) Math.signum(compRatio - o.compRatio);
+ }
+ }
+
+ public static class CompressionStatistics {
+ public double timePhase1 = -1;
+ public double timePhase2 = -1;
+ public double timePhase3 = -1;
+ public double timePhase4 = -1;
+ public double estSize = -1;
+
+ public CompressionStatistics() {
+ //do nothing
+ }
+
+ public CompressionStatistics(double t1, double t2, double t3, double t4){
+ timePhase1 = t1;
+ timePhase2 = t2;
+ timePhase3 = t3;
+ timePhase4 = t4;
+ }
+ }
+
+
+ //////////////////////////////////////////
+ // Serialization / Deserialization
+
+ @Override
+ public long getExactSizeOnDisk()
+ {
+ //header information
+ long ret = 12;
+
+ for( ColGroup grp : _colGroups ) {
+ ret += 1; //type info
+ ret += grp.getExactSizeOnDisk();
+ }
+
+ return ret;
+ }
+
+ @Override
+ public void readFields(DataInput in)
+ throws IOException
+ {
+ boolean compressed = in.readBoolean();
+
+ //deserialize uncompressed block
+ if( !compressed ) {
+ super.readFields(in);
+ return;
+ }
+
+ //deserialize compressed block
+ rlen = in.readInt();
+ clen = in.readInt();
+ nonZeros = in.readLong();
+ int ncolGroups = in.readInt();
+
+ _colGroups = new ArrayList<ColGroup>(ncolGroups);
+ for( int i=0; i<ncolGroups; i++ )
+ {
+ CompressionType ctype = CompressionType.values()[in.readByte()];
+ ColGroup grp = null;
+
+ //create instance of column group
+ switch( ctype ) {
+ case UNCOMPRESSED:
+ grp = new ColGroupUncompressed(); break;
+ case OLE_BITMAP:
+ grp = new ColGroupOLE(); break;
+ case RLE_BITMAP:
+ grp = new ColGroupRLE(); break;
+ }
+
+ //deserialize and add column group
+ grp.readFields(in);
+ _colGroups.add(grp);
+ }
+ }
+
+ @Override
+ public void write(DataOutput out)
+ throws IOException
+ {
+ out.writeBoolean( isCompressed() );
+
+ //serialize uncompressed block
+ if( !isCompressed() ) {
+ super.write(out);
+ return;
+ }
+
+ //serialize compressed matrix block
+ out.writeInt(rlen);
+ out.writeInt(clen);
+ out.writeLong(nonZeros);
+ out.writeInt(_colGroups.size());
+
+ for( ColGroup grp : _colGroups ) {
+ out.writeByte( grp.getCompType().ordinal() );
+ grp.write(out); //delegate serialization
+ }
+ }
+
+
+ /**
+ * Redirects the default java serialization via externalizable to our default
+ * hadoop writable serialization for efficient broadcast/rdd deserialization.
+ *
+ * @param is
+ * @throws IOException
+ */
+ @Override
+ public void readExternal(ObjectInput is)
+ throws IOException
+ {
+ readFields(is);
+ }
+
+ /**
+ * Redirects the default java serialization via externalizable to our default
+ * hadoop writable serialization for efficient broadcast/rdd serialization.
+ *
+ * @param is
+ * @throws IOException
+ */
+ @Override
+ public void writeExternal(ObjectOutput os)
+ throws IOException
+ {
+ write(os);
+ }
+
+
+ //////////////////////////////////////////
+ // Operations (overwrite existing ops for seamless integration)
+
+ @Override
+ public MatrixValue scalarOperations(ScalarOperator sop, MatrixValue result)
+ throws DMLRuntimeException
+ {
+ //call uncompressed matrix scalar if necessary
+ if( !isCompressed() ) {
+ return super.scalarOperations(sop, result);
+ }
+
+ //allocate the output matrix block
+ CompressedMatrixBlock ret = null;
+ if( result==null || !(result instanceof CompressedMatrixBlock) )
+ ret = new CompressedMatrixBlock(getNumRows(), getNumColumns(), sparse);
+ else {
+ ret = (CompressedMatrixBlock) result;
+ ret.reset(rlen, clen);
+ }
+
+ // Apply the operation recursively to each of the column groups.
+ // Most implementations will only modify metadata.
+ ArrayList<ColGroup> newColGroups = new ArrayList<ColGroup>();
+ for (ColGroup grp : _colGroups) {
+ newColGroups.add(grp.scalarOperation(sop));
+ }
+ ret._colGroups = newColGroups;
+ ret.setNonZeros(rlen*clen);
+
+ return ret;
+ }
+
+ @Override
+ public MatrixBlock appendOperations(MatrixBlock that, MatrixBlock ret)
+ throws DMLRuntimeException
+ {
+ //call uncompressed matrix append if necessary
+ if( !isCompressed() ) {
+ if( that instanceof CompressedMatrixBlock )
+ that = ((CompressedMatrixBlock) that).decompress();
+ return super.appendOperations(that, ret);
+ }
+
+ final int m = rlen;
+ final int n = clen+that.getNumColumns();
+ final long nnz = nonZeros+that.getNonZeros();
+
+ //init result matrix
+ CompressedMatrixBlock ret2 = null;
+ if( ret == null || !(ret instanceof CompressedMatrixBlock) ) {
+ ret2 = new CompressedMatrixBlock(m, n, isInSparseFormat());
+ }
+ else {
+ ret2 = (CompressedMatrixBlock) ret;
+ ret2.reset(m, n);
+ }
+
+ //shallow copy of lhs column groups
+ ret2.allocateColGroupList();
+ ret2._colGroups.addAll(_colGroups);
+
+ //copy of rhs column groups w/ col index shifting
+ if( !(that instanceof CompressedMatrixBlock) ) {
+ that = new CompressedMatrixBlock(that);
+ ((CompressedMatrixBlock)that).compress();
+ }
+ ArrayList<ColGroup> inColGroups = ((CompressedMatrixBlock) that)._colGroups;
+ for( ColGroup group : inColGroups ) {
+ ColGroup tmp = ConverterUtils.copyColGroup(group);
+ tmp.shiftColIndices(clen);
+ ret2._colGroups.add(tmp);
+ }
+
+ //meta data maintenance
+ ret2.setNonZeros(nnz);
+ return ret2;
+ }
+
+ @Override
+ public MatrixBlock chainMatrixMultOperations(MatrixBlock v, MatrixBlock w, MatrixBlock out, ChainType ctype)
+ throws DMLRuntimeException
+ {
+ //call uncompressed matrix mult if necessary
+ if( !isCompressed() ) {
+ return super.chainMatrixMultOperations(v, w, out, ctype);
+ }
+
+ //single-threaded mmchain of single uncompressed colgroup
+ if( isSingleUncompressedGroup() ){
+ return ((ColGroupUncompressed)_colGroups.get(0))
+ .getData().chainMatrixMultOperations(v, w, out, ctype);
+ }
+
+ //Timing time = new Timing(true);
+
+ //prepare result
+ if( out != null )
+ out.reset(clen, 1, false);
+ else
+ out = new MatrixBlock(clen, 1, false);
+
+ //empty block handling
+ if( isEmptyBlock(false) )
+ return out;
+
+ //compute matrix mult
+ MatrixBlock tmp = new MatrixBlock(rlen, 1, false);
+ rightMultByVector(v, tmp);
+ if( ctype == ChainType.XtwXv ) {
+ BinaryOperator bop = new BinaryOperator(Multiply.getMultiplyFnObject());
+ LibMatrixBincell.bincellOpInPlace(tmp, w, bop);
+ }
+ leftMultByVectorTranspose(_colGroups, tmp, out, true);
+
+ //System.out.println("Compressed MMChain in "+time.stop());
+
+ return out;
+ }
+
+ @Override
+ public MatrixBlock chainMatrixMultOperations(MatrixBlock v, MatrixBlock w, MatrixBlock out, ChainType ctype, int k)
+ throws DMLRuntimeException
+ {
+ //call uncompressed matrix mult if necessary
+ if( !isCompressed() ){
+ return super.chainMatrixMultOperations(v, w, out, ctype, k);
+ }
+
+ //multi-threaded mmchain of single uncompressed colgroup
+ if( isSingleUncompressedGroup() ){
+ return ((ColGroupUncompressed)_colGroups.get(0))
+ .getData().chainMatrixMultOperations(v, w, out, ctype, k);
+ }
+
+ Timing time = LOG.isDebugEnabled() ? new Timing(true) : null;
+
+ //prepare result
+ if( out != null )
+ out.reset(clen, 1, false);
+ else
+ out = new MatrixBlock(clen, 1, false);
+
+ //empty block handling
+ if( isEmptyBlock(false) )
+ return out;
+
+ //compute matrix mult
+ MatrixBlock tmp = new MatrixBlock(rlen, 1, false);
+ rightMultByVector(v, tmp, k);
+ if( ctype == ChainType.XtwXv ) {
+ BinaryOperator bop = new BinaryOperator(Multiply.getMultiplyFnObject());
+ LibMatrixBincell.bincellOpInPlace(tmp, w, bop);
+ }
+ leftMultByVectorTranspose(_colGroups, tmp, out, true, k);
+
+ if( LOG.isDebugEnabled() )
+ LOG.debug("Compressed MMChain k="+k+" in "+time.stop());
+
+ return out;
+ }
+
+ @Override
+ public MatrixValue aggregateBinaryOperations(MatrixValue mv1, MatrixValue mv2, MatrixValue result, AggregateBinaryOperator op)
+ throws DMLRuntimeException
+ {
+ //call uncompressed matrix mult if necessary
+ if( !isCompressed() ) {
+ return super.aggregateBinaryOperations(mv1, mv2, result, op);
+ }
+
+ //multi-threaded mm of single uncompressed colgroup
+ if( isSingleUncompressedGroup() ){
+ MatrixBlock tmp = ((ColGroupUncompressed)_colGroups.get(0)).getData();
+ return tmp.aggregateBinaryOperations(this==mv1?tmp:mv1, this==mv2?tmp:mv2, result, op);
+ }
+
+ Timing time = LOG.isDebugEnabled() ? new Timing(true) : null;
+
+ //setup meta data (dimensions, sparsity)
+ int rl = mv1.getNumRows();
+ int cl = mv2.getNumColumns();
+
+ //create output matrix block
+ MatrixBlock ret = (MatrixBlock) result;
+ if( ret==null )
+ ret = new MatrixBlock(rl, cl, false, rl*cl);
+ else
+ ret.reset(rl, cl, false, rl*cl);
+
+ //compute matrix mult
+ if( mv1.getNumRows()>1 && mv2.getNumColumns()==1 ) { //MV right
+ CompressedMatrixBlock cmb = (CompressedMatrixBlock)mv1;
+ MatrixBlock mb = (MatrixBlock) mv2;
+ if( op.getNumThreads()>1 )
+ cmb.rightMultByVector(mb, ret, op.getNumThreads());
+ else
+ cmb.rightMultByVector(mb, ret);
+ }
+ else if( mv1.getNumRows()==1 && mv2.getNumColumns()>1 ) { //MV left
+ MatrixBlock mb = (MatrixBlock) mv1;
+ if( op.getNumThreads()>1 )
+ leftMultByVectorTranspose(_colGroups, mb, ret, false, op.getNumThreads());
+ else
+ leftMultByVectorTranspose(_colGroups, mb, ret, false);
+ }
+ else {
+ //NOTE: we could decompress and invoke super.aggregateBinary but for now
+ //we want to have an eager fail if this happens
+ throw new DMLRuntimeException("Unsupported matrix-matrix multiplication over compressed matrix block.");
+ }
+
+ if( LOG.isDebugEnabled() )
+ LOG.debug("Compressed MM in "+time.stop());
+
+ return ret;
+ }
+
+ @Override
+ public MatrixValue aggregateUnaryOperations(AggregateUnaryOperator op, MatrixValue result,
+ int blockingFactorRow, int blockingFactorCol, MatrixIndexes indexesIn, boolean inCP)
+ throws DMLRuntimeException
+ {
+ //call uncompressed matrix mult if necessary
+ if( !isCompressed() ) {
+ return super.aggregateUnaryOperations(op, result, blockingFactorRow, blockingFactorCol, indexesIn, inCP);
+ }
+
+ //check for supported operations
+ if( !(op.aggOp.increOp.fn instanceof KahanPlus || op.aggOp.increOp.fn instanceof KahanPlusSq) ){
+ throw new DMLRuntimeException("Unary aggregates other than sums not supported yet.");
+ }
+
+ //prepare output dimensions
+ CellIndex tempCellIndex = new CellIndex(-1,-1);
+ op.indexFn.computeDimension(rlen, clen, tempCellIndex);
+ if(op.aggOp.correctionExists) {
+ switch(op.aggOp.correctionLocation)
+ {
+ case LASTROW: tempCellIndex.row++; break;
+ case LASTCOLUMN: tempCellIndex.column++; break;
+ case LASTTWOROWS: tempCellIndex.row+=2; break;
+ case LASTTWOCOLUMNS: tempCellIndex.column+=2; break;
+ default:
+ throw new DMLRuntimeException("unrecognized correctionLocation: "+op.aggOp.correctionLocation);
+ }
+ }
+
+ //prepare output
+ if(result==null)
+ result=new MatrixBlock(tempCellIndex.row, tempCellIndex.column, false);
+ else
+ result.reset(tempCellIndex.row, tempCellIndex.column, false);
+
+ MatrixBlock ret = (MatrixBlock) result;
+
+ //core unary aggregate
+ if( op.getNumThreads() > 1
+ && getExactSizeOnDisk() > MIN_PAR_AGG_THRESHOLD )
+ {
+ // initialize and allocate the result
+ ret.allocateDenseBlock();
+
+ //multi-threaded execution of all groups
+ ArrayList<ColGroup>[] grpParts = createStaticTaskPartitioning(op.getNumThreads(), false);
+ ColGroupUncompressed uc = getUncompressedColGroup();
+ try {
+ //compute all compressed column groups
+ ExecutorService pool = Executors.newFixedThreadPool( op.getNumThreads() );
+ ArrayList<UnaryAggregateTask> tasks = new ArrayList<UnaryAggregateTask>();
+ for( ArrayList<ColGroup> grp : grpParts )
+ tasks.add(new UnaryAggregateTask(grp, ret, op));
+ pool.invokeAll(tasks);
+ pool.shutdown();
+ //compute uncompressed column group in parallel (otherwise bottleneck)
+ if( uc != null )
+ ret = (MatrixBlock)uc.getData().aggregateUnaryOperations(op, ret, blockingFactorRow, blockingFactorCol, indexesIn, false);
+ //aggregate partial results
+ if( !(op.indexFn instanceof ReduceRow) ){
+ KahanObject kbuff = new KahanObject(0,0);
+ KahanPlus kplus = KahanPlus.getKahanPlusFnObject();
+ for( int i=0; i<ret.getNumRows(); i++ ) {
+ kbuff.set(ret.quickGetValue(i, 0), ret.quickGetValue(i, 0));
+ for( UnaryAggregateTask task : tasks )
+ kplus.execute2(kbuff, task.getResult().quickGetValue(i, 0));
+ ret.quickSetValue(i, 0, kbuff._sum);
+ ret.quickSetValue(i, 1, kbuff._correction);
+ }
+ }
+ }
+ catch(Exception ex) {
+ throw new DMLRuntimeException(ex);
+ }
+ }
+ else {
+ for (ColGroup grp : _colGroups) {
+ grp.unaryAggregateOperations(op, ret);
+ }
+ }
+
+ //drop correction if necessary
+ if(op.aggOp.correctionExists && inCP)
+ ret.dropLastRowsOrColums(op.aggOp.correctionLocation);
+
+ //post-processing
+ ret.recomputeNonZeros();
+
+ return ret;
+ }
+
+ @Override
+ public MatrixBlock transposeSelfMatrixMultOperations(MatrixBlock out, MMTSJType tstype)
+ throws DMLRuntimeException
+ {
+ //call uncompressed matrix mult if necessary
+ if( !isCompressed() ) {
+ return super.transposeSelfMatrixMultOperations(out, tstype);
+ }
+
+ //single-threaded tsmm of single uncompressed colgroup
+ if( isSingleUncompressedGroup() ){
+ return ((ColGroupUncompressed)_colGroups.get(0))
+ .getData().transposeSelfMatrixMultOperations(out, tstype);
+ }
+
+ Timing time = LOG.isDebugEnabled() ? new Timing(true) : null;
+
+ //check for transpose type
+ if( tstype != MMTSJType.LEFT ) //right not supported yet
+ throw new DMLRuntimeException("Invalid MMTSJ type '"+tstype.toString()+"'.");
+
+ //create output matrix block
+ if( out == null )
+ out = new MatrixBlock(clen, clen, false);
+ else
+ out.reset(clen, clen, false);
+ out.allocateDenseBlock();
+
+ if( !isEmptyBlock(false) ) {
+ //compute matrix mult
+ leftMultByTransposeSelf(_colGroups, out, 0, _colGroups.size());
+
+ // post-processing
+ out.recomputeNonZeros();
+ }
+
+ if( LOG.isDebugEnabled() )
+ LOG.debug("Compressed TSMM in "+time.stop());
+
+ return out;
+ }
+
+
+ @Override
+ public MatrixBlock transposeSelfMatrixMultOperations(MatrixBlock out, MMTSJType tstype, int k)
+ throws DMLRuntimeException
+ {
+ //call uncompressed matrix mult if necessary
+ if( !isCompressed() ){
+ return super.transposeSelfMatrixMultOperations(out, tstype, k);
+ }
+
+ //multi-threaded tsmm of single uncompressed colgroup
+ if( isSingleUncompressedGroup() ){
+ return ((ColGroupUncompressed)_colGroups.get(0))
+ .getData().transposeSelfMatrixMultOperations(out, tstype, k);
+ }
+
+ Timing time = LOG.isDebugEnabled() ? new Timing(true) : null;
+
+ //check for transpose type
+ if( tstype != MMTSJType.LEFT ) //right not supported yet
+ throw new DMLRuntimeException("Invalid MMTSJ type '"+tstype.toString()+"'.");
+
+ //create output matrix block
+ if( out == null )
+ out = new MatrixBlock(clen, clen, false);
+ else
+ out.reset(clen, clen, false);
+ out.allocateDenseBlock();
+
+ if( !isEmptyBlock(false) ) {
+ //compute matrix mult
+ try {
+ ExecutorService pool = Executors.newFixedThreadPool( k );
+ ArrayList<MatrixMultTransposeTask> tasks = new ArrayList<MatrixMultTransposeTask>();
+ int blklen = (int)(Math.ceil((double)clen/(2*k)));
+ for( int i=0; i<2*k & i*blklen<clen; i++ )
+ tasks.add(new MatrixMultTransposeTask(_colGroups, out, i*blklen, Math.min((i+1)*blklen, clen)));
+ List<Future<Object>> ret = pool.invokeAll(tasks);
+ for( Future<Object> tret : ret )
+ tret.get(); //check for errors
+ pool.shutdown();
+ }
+ catch(Exception ex) {
+ throw new DMLRuntimeException(ex);
+ }
+
+ // post-processing
+ out.recomputeNonZeros();
+ }
+
+ if( LOG.isDebugEnabled() )
+ LOG.debug("Compressed TSMM k="+k+" in "+time.stop());
+
+ return out;
+ }
+
+
+ /**
+ * Multiply this matrix block by a column vector on the right.
+ *
+ * @param vector
+ * right-hand operand of the multiplication
+ * @param result
+ * buffer to hold the result; must have the appropriate size
+ * already
+ */
+ private void rightMultByVector(MatrixBlock vector, MatrixBlock result)
+ throws DMLRuntimeException
+ {
+ // initialize and allocate the result
+ result.allocateDenseBlock();
+
+ // delegate matrix-vector operation to each column group
+ for( ColGroup grp : _colGroups )
+ if( grp instanceof ColGroupUncompressed ) //overwrites output
+ grp.rightMultByVector(vector, result, 0, result.getNumRows());
+ for( ColGroup grp : _colGroups )
+ if( !(grp instanceof ColGroupUncompressed) ) //adds to output
+ grp.rightMultByVector(vector, result, 0, result.getNumRows());
+
+ // post-processing
+ result.recomputeNonZeros();
+ }
+
+ /**
+ * Multi-threaded version of rightMultByVector.
+ *
+ * @param vector
+ * @param result
+ * @param k
+ * @throws DMLRuntimeException
+ */
+ private void rightMultByVector(MatrixBlock vector, MatrixBlock result, int k)
+ throws DMLRuntimeException
+ {
+ // initialize and allocate the result
+ result.allocateDenseBlock();
+
+ //multi-threaded execution of all groups
+ try {
+ ExecutorService pool = Executors.newFixedThreadPool( k );
+ int rlen = getNumRows();
+ int seqsz = BitmapEncoder.BITMAP_BLOCK_SZ;
+ int blklen = (int)(Math.ceil((double)rlen/k));
+ blklen += (blklen%seqsz != 0)?seqsz-blklen%seqsz:0;
+ ArrayList<RightMatrixMultTask> tasks = new ArrayList<RightMatrixMultTask>();
+ for( int i=0; i<k & i*blklen<getNumRows(); i++ )
+ tasks.add(new RightMatrixMultTask(_colGroups, vector, result, i*blklen, Math.min((i+1)*blklen,rlen)));
+ pool.invokeAll(tasks);
+ pool.shutdown();
+ }
+ catch(Exception ex) {
+ throw new DMLRuntimeException(ex);
+ }
+
+ // post-processing
+ result.recomputeNonZeros();
+ }
+
+ /**
+ * Multiply this matrix block by the transpose of a column vector (i.e.
+ * t(v)%*%X)
+ *
+ * @param vector
+ * left-hand operand of the multiplication
+ * @param result
+ * buffer to hold the result; must have the appropriate size
+ * already
+ */
+ private static void leftMultByVectorTranspose(List<ColGroup> colGroups, MatrixBlock vector, MatrixBlock result, boolean doTranspose)
+ throws DMLRuntimeException
+ {
+ //transpose vector if required
+ MatrixBlock rowVector = vector;
+ if (doTranspose) {
+ rowVector = new MatrixBlock(1, vector.getNumRows(), false);
+ LibMatrixReorg.transpose(vector, rowVector);
+ }
+
+ // initialize and allocate the result
+ result.reset();
+ result.allocateDenseBlock();
+
+ // delegate matrix-vector operation to each column group
+ for (ColGroup grp : colGroups) {
+ grp.leftMultByRowVector(rowVector, result);
+ }
+
+ // post-processing
+ result.recomputeNonZeros();
+ }
+
+ /**
+ * Multi-thread version of leftMultByVectorTranspose.
+ *
+ * @param vector
+ * @param result
+ * @param doTranspose
+ * @param k
+ * @throws DMLRuntimeException
+ */
+ private static void leftMultByVectorTranspose(List<ColGroup> colGroups,MatrixBlock vector, MatrixBlock result, boolean doTranspose, int k)
+ throws DMLRuntimeException
+ {
+ int kuc = Math.max(1, k - colGroups.size() + 1);
+
+ //transpose vector if required
+ MatrixBlock rowVector = vector;
+ if (doTranspose) {
+ rowVector = new MatrixBlock(1, vector.getNumRows(), false);
+ LibMatrixReorg.transpose(vector, rowVector);
+ }
+
+ // initialize and allocate the result
+ result.reset();
+ result.allocateDenseBlock();
+
+ //multi-threaded execution
+ try {
+ ExecutorService pool = Executors.newFixedThreadPool( Math.min(colGroups.size(), k) );
+ ArrayList<LeftMatrixMultTask> tasks = new ArrayList<LeftMatrixMultTask>();
+ for( ColGroup grp : colGroups )
+ tasks.add(new LeftMatrixMultTask(grp, rowVector, result, kuc));
+ pool.invokeAll(tasks);
+ pool.shutdown();
+ }
+ catch(Exception ex) {
+ throw new DMLRuntimeException(ex);
+ }
+
+ // post-processing
+ result.recomputeNonZeros();
+ }
+
+ /**
+ *
+ * @param result
+ * @throws DMLRuntimeException
+ */
+ private static void leftMultByTransposeSelf(ArrayList<ColGroup> groups, MatrixBlock result, int cl, int cu)
+ throws DMLRuntimeException
+ {
+ final int numRows = groups.get(0).getNumRows();
+ final int numGroups = groups.size();
+
+ //preallocated dense matrix block
+ MatrixBlock lhs = new MatrixBlock(numRows, 1, false);
+ lhs.allocateDenseBlock();
+
+ //approach: for each colgroup, extract uncompressed columns one at-a-time
+ //vector-matrix multiplies against remaining col groups
+ for( int i=cl; i<cu; i++ )
+ {
+ //get current group and relevant col groups
+ ColGroup group = groups.get(i);
+ int[] ixgroup = group.getColIndices();
+ List<ColGroup> tmpList = groups.subList(i, numGroups);
+
+ //for all uncompressed lhs columns vectors
+ for( int j=0; j<ixgroup.length; j++ ) {
+ //decompress single column
+ lhs.reset(numRows, 1, false);
+ group.decompressToBlock(lhs, j);
+
+ if( !lhs.isEmptyBlock(false) ) {
+ //compute vector-matrix partial result
+ MatrixBlock tmpret = new MatrixBlock(1,result.getNumColumns(),false);
+ leftMultByVectorTranspose(tmpList, lhs, tmpret, true);
+
+ //write partial results (disjoint non-zeros)
+ LinearAlgebraUtils.copyNonZerosToRowCol(result, tmpret, ixgroup[j]);
+ }
+ }
+ }
+ }
+
+ /**
+ *
+ * @param k
+ * @return
+ */
+ @SuppressWarnings("unchecked")
+ private ArrayList<ColGroup>[] createStaticTaskPartitioning(int k, boolean inclUncompressed)
+ {
+ // special case: single uncompressed col group
+ if( _colGroups.size()==1 && _colGroups.get(0) instanceof ColGroupUncompressed ){
+ return new ArrayList[0];
+ }
+
+ // initialize round robin col group distribution
+ // (static task partitioning to reduce mem requirements/final agg)
+ int numTasks = Math.min(k, _colGroups.size());
+ ArrayList<ColGroup>[] grpParts = new ArrayList[numTasks];
+ int pos = 0;
+ for( ColGroup grp : _colGroups ){
+ if( grpParts[pos]==null )
+ grpParts[pos] = new ArrayList<ColGroup>();
+ if( inclUncompressed || !(grp instanceof ColGroupUncompressed) ) {
+ grpParts[pos].add(grp);
+ pos = (pos==numTasks-1) ? 0 : pos+1;
+ }
+ }
+
+ return grpParts;
+ }
+
+ /**
+ *
+ * @return
+ */
+ private ColGroupUncompressed getUncompressedColGroup()
+ {
+ for( ColGroup grp : _colGroups )
+ if( grp instanceof ColGroupUncompressed )
+ return (ColGroupUncompressed)grp;
+
+ return null;
+ }
+
+ /**
+ *
+ */
+ private static class LeftMatrixMultTask implements Callable<Object>
+ {
+ private ColGroup _group = null;
+ private MatrixBlock _vect = null;
+ private MatrixBlock _ret = null;
+ private int _kuc = 1;
+
+ protected LeftMatrixMultTask( ColGroup group, MatrixBlock vect, MatrixBlock ret, int kuc) {
+ _group = group;
+ _vect = vect;
+ _ret = ret;
+ _kuc = kuc;
+ }
+
+ @Override
+ public Object call() throws DMLRuntimeException
+ {
+ // delegate matrix-vector operation to each column group
+ if( _group instanceof ColGroupUncompressed && _kuc >1 && ColGroupBitmap.LOW_LEVEL_OPT )
+ ((ColGroupUncompressed)_group).leftMultByRowVector(_vect, _ret, _kuc);
+ else
+ _group.leftMultByRowVector(_vect, _ret);
+ return null;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class RightMatrixMultTask implements Callable<Object>
+ {
+ private ArrayList<ColGroup> _groups = null;
+ private MatrixBlock _vect = null;
+ private MatrixBlock _ret = null;
+ private int _rl = -1;
+ private int _ru = -1;
+
+ protected RightMatrixMultTask( ArrayList<ColGroup> groups, MatrixBlock vect, MatrixBlock ret, int rl, int ru) {
+ _groups = groups;
+ _vect = vect;
+ _ret = ret;
+ _rl = rl;
+ _ru = ru;
+ }
+
+ @Override
+ public Object call() throws DMLRuntimeException
+ {
+ // delegate vector-matrix operation to each column group
+ for( ColGroup grp : _groups )
+ if( grp instanceof ColGroupUncompressed ) //overwrites output
+ grp.rightMultByVector(_vect, _ret, _rl, _ru);
+ for( ColGroup grp : _groups )
+ if( !(grp instanceof ColGroupUncompressed) ) //adds to output
+ grp.rightMultByVector(_vect, _ret, _rl, _ru);
+ return null;
+ }
+ }
+
+ private static class MatrixMultTransposeTask implements Callable<Object>
+ {
+ private ArrayList<ColGroup> _groups = null;
+ private MatrixBlock _ret = null;
+ private int _cl = -1;
+ private int _cu = -1;
+
+ protected MatrixMultTransposeTask(ArrayList<ColGroup> groups, MatrixBlock ret, int cl, int cu) {
+ _groups = groups;
+ _ret = ret;
+ _cl = cl;
+ _cu = cu;
+ }
+
+ @Override
+ public Object call() throws DMLRuntimeException {
+ leftMultByTransposeSelf(_groups, _ret, _cl, _cu);
+ return null;
+ }
+ }
+
+ private static class UnaryAggregateTask implements Callable<Object>
+ {
+ private ArrayList<ColGroup> _groups = null;
+ private MatrixBlock _ret = null;
+ private AggregateUnaryOperator _op = null;
+
+ protected UnaryAggregateTask( ArrayList<ColGroup> groups, MatrixBlock ret, AggregateUnaryOperator op) {
+ _groups = groups;
+ _op = op;
+
+ if( !(_op.indexFn instanceof ReduceRow) ) { //sum/rowSums
+ _ret = new MatrixBlock(ret.getNumRows(), ret.getNumColumns(), false);
+ _ret.allocateDenseBlock();
+ }
+ else { //colSums
+ _ret = ret;
+ }
+ }
+
+ @Override
+ public Object call() throws DMLRuntimeException
+ {
+ // delegate vector-matrix operation to each column group
+ for( ColGroup grp : _groups )
+ grp.unaryAggregateOperations(_op, _ret);
+ return null;
+ }
+
+ public MatrixBlock getResult(){
+ return _ret;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/16e7b1c8/src/main/java/org/apache/sysml/runtime/compress/PlanningBinPacker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/PlanningBinPacker.java b/src/main/java/org/apache/sysml/runtime/compress/PlanningBinPacker.java
new file mode 100644
index 0000000..a76223c
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/PlanningBinPacker.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.commons.math3.random.RandomDataGenerator;
+
+/**
+ * Used for the finding columns to co-code
+ *
+ */
+public class PlanningBinPacker
+{
+ private final float _binWeight;
+ private final List<Integer> _items;
+ private final List<Float> _itemWeights;
+
+ public PlanningBinPacker(float binWeight, List<Integer> items, List<Float> itemWeights) {
+ _binWeight = binWeight;
+ _items = items;
+ _itemWeights = itemWeights;
+ }
+
+ /**
+ * NOTE: upper bound is 17/10 OPT
+ *
+ * @return key: available space, value: list of the bins that have that free space
+ */
+ public TreeMap<Float, List<List<Integer>>> packFirstFit() {
+ return packFirstFit(_items, _itemWeights);
+ }
+
+ /**
+ * shuffling the items to make some potential for having bins of different
+ * sizes when consecutive columns are of close cardinalities
+ *
+ * @return key: available space, value: list of the bins that have that free
+ * space
+ */
+ public TreeMap<Float, List<List<Integer>>> packFirstFitShuffled() {
+ RandomDataGenerator rnd = new RandomDataGenerator();
+ int[] permutation = rnd.nextPermutation(_items.size(), _items.size());
+ List<Integer> shuffledItems = new ArrayList<Integer>(_items.size());
+ List<Float> shuffledWeights = new ArrayList<Float>(_items.size());
+ for (int ix : permutation) {
+ shuffledItems.add(_items.get(ix));
+ shuffledWeights.add(_itemWeights.get(ix));
+ }
+
+ return packFirstFit(shuffledItems, shuffledWeights);
+ }
+
+ /**
+ *
+ * @param items
+ * @param itemWeights
+ * @return
+ */
+ private TreeMap<Float, List<List<Integer>>> packFirstFit(List<Integer> items, List<Float> itemWeights)
+ {
+ // when searching for a bin, the first bin in the list is used
+ TreeMap<Float, List<List<Integer>>> bins = new TreeMap<Float, List<List<Integer>>>();
+ // first bin
+ bins.put(_binWeight, createBinList());
+ int numItems = items.size();
+ for (int i = 0; i < numItems; i++) {
+ float itemWeight = itemWeights.get(i);
+ Map.Entry<Float, List<List<Integer>>> entry = bins
+ .ceilingEntry(itemWeight);
+ if (entry == null) {
+ // new bin
+ float newBinWeight = _binWeight - itemWeight;
+ List<List<Integer>> binList = bins.get(newBinWeight);
+ if (binList == null) {
+ bins.put(newBinWeight, createBinList(items.get(i)));
+ } else {
+ List<Integer> newBin = new ArrayList<Integer>();
+ newBin.add(items.get(i));
+ binList.add(newBin);
+ }
+ } else {
+ // add to the first bin in the list
+ List<Integer> assignedBin = entry.getValue().remove(0);
+ assignedBin.add(items.get(i));
+ if (entry.getValue().size() == 0)
+ bins.remove(entry.getKey());
+ float newBinWeight = entry.getKey() - itemWeight;
+ List<List<Integer>> newBinsList = bins.get(newBinWeight);
+ if (newBinsList == null) {
+ // new bin
+ bins.put(newBinWeight, createBinList(assignedBin));
+ } else {
+ newBinsList.add(assignedBin);
+ }
+ }
+ }
+ return bins;
+ }
+
+ /**
+ * NOTE: upper bound is 11/9 OPT + 6/9 (~1.22 OPT)
+ *
+ * @return
+ */
+ public TreeMap<Float, List<List<Integer>>> packFirstFitDescending() {
+ // sort items descending based on their weights
+ Integer[] indexes = new Integer[_items.size()];
+ for (int i = 0; i < indexes.length; i++)
+ indexes[i] = i;
+ Arrays.sort(indexes, new Comparator<Integer>() {
+
+ @Override
+ public int compare(Integer o1, Integer o2) {
+ return _itemWeights.get(o1).compareTo(_itemWeights.get(o2));
+ }
+ });
+ List<Integer> sortedItems = new ArrayList<Integer>();
+ List<Float> sortedItemWeights = new ArrayList<Float>();
+ for (int i = indexes.length - 1; i >= 0; i--) {
+ sortedItems.add(_items.get(i));
+ sortedItemWeights.add(_itemWeights.get(i));
+ }
+ return packFirstFit(sortedItems, sortedItemWeights);
+ }
+
+ /**
+ * NOTE: upper bound is 71/60 OPT + 6/9 (~1.18 OPT)
+ *
+ * @return
+ */
+ public TreeMap<Float, List<List<Integer>>> packModifiedFirstFitDescending() {
+ throw new UnsupportedOperationException("Not implemented yet!");
+ }
+
+ /**
+ *
+ * @return
+ */
+ private List<List<Integer>> createBinList() {
+ List<List<Integer>> binList = new ArrayList<List<Integer>>();
+ binList.add(new ArrayList<Integer>());
+ return binList;
+ }
+
+ /**
+ *
+ * @param item
+ * @return
+ */
+ private List<List<Integer>> createBinList(int item) {
+ List<List<Integer>> binList = new ArrayList<List<Integer>>();
+ List<Integer> bin = new ArrayList<Integer>();
+ binList.add(bin);
+ bin.add(item);
+ return binList;
+ }
+
+ /**
+ *
+ * @param bin
+ * @return
+ */
+ private List<List<Integer>> createBinList(List<Integer> bin) {
+ List<List<Integer>> binList = new ArrayList<List<Integer>>();
+ binList.add(bin);
+ return binList;
+ }
+}