You are viewing a plain text version of this content. The canonical version is available in the original mailing-list archive (the link was not preserved in this text).
Posted to commits@systemml.apache.org by mb...@apache.org on 2017/02/08 02:22:29 UTC
[4/5] incubator-systemml git commit: [SYSTEMML-449] Compressed linear
algebra v2
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java
index efdcc86..6d2ec43 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupRLE.java
@@ -22,31 +22,29 @@ package org.apache.sysml.runtime.compress;
import java.util.Arrays;
import java.util.Iterator;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.compress.utils.ConverterUtils;
import org.apache.sysml.runtime.compress.utils.LinearAlgebraUtils;
import org.apache.sysml.runtime.functionobjects.Builtin;
import org.apache.sysml.runtime.functionobjects.KahanFunction;
import org.apache.sysml.runtime.functionobjects.KahanPlus;
-import org.apache.sysml.runtime.functionobjects.KahanPlusSq;
-import org.apache.sysml.runtime.functionobjects.ReduceAll;
-import org.apache.sysml.runtime.functionobjects.ReduceCol;
-import org.apache.sysml.runtime.functionobjects.ReduceRow;
-import org.apache.sysml.runtime.functionobjects.Builtin.BuiltinCode;
import org.apache.sysml.runtime.instructions.cp.KahanObject;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
import org.apache.sysml.runtime.matrix.data.Pair;
-import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
import org.apache.sysml.runtime.matrix.operators.ScalarOperator;
/** A group of columns compressed with a single run-length encoded bitmap. */
-public class ColGroupRLE extends ColGroupBitmap
+public class ColGroupRLE extends ColGroupOffset
{
private static final long serialVersionUID = 7450232907594748177L;
+ private static final Log LOG = LogFactory.getLog(ColGroupRLE.class.getName());
+
public ColGroupRLE() {
- super(CompressionType.RLE_BITMAP);
+ super();
}
/**
@@ -62,26 +60,37 @@ public class ColGroupRLE extends ColGroupBitmap
*/
public ColGroupRLE(int[] colIndices, int numRows, UncompressedBitmap ubm)
{
- super(CompressionType.RLE_BITMAP, colIndices, numRows, ubm);
+ super(colIndices, numRows, ubm);
// compress the bitmaps
final int numVals = ubm.getNumValues();
char[][] lbitmaps = new char[numVals][];
int totalLen = 0;
for( int k=0; k<numVals; k++ ) {
- lbitmaps[k] = BitmapEncoder.genRLEBitmap(ubm.getOffsetsList(k));
+ lbitmaps[k] = BitmapEncoder.genRLEBitmap(
+ ubm.getOffsetsList(k).extractValues(), ubm.getNumOffsets(k));
totalLen += lbitmaps[k].length;
}
// compact bitmaps to linearized representation
createCompressedBitmaps(numVals, totalLen, lbitmaps);
+
+ //debug output
+ double ucSize = MatrixBlock.estimateSizeDenseInMemory(numRows, colIndices.length);
+ if( estimateInMemorySize() > ucSize )
+ LOG.warn("RLE group larger than UC dense: "+estimateInMemorySize()+" "+ucSize);
}
public ColGroupRLE(int[] colIndices, int numRows, boolean zeros, double[] values, char[] bitmaps, int[] bitmapOffs) {
- super(CompressionType.RLE_BITMAP, colIndices, numRows, zeros, values);
+ super(colIndices, numRows, zeros, values);
_data = bitmaps;
_ptr = bitmapOffs;
}
+
+ @Override
+ public CompressionType getCompType() {
+ return CompressionType.RLE_BITMAP; //type is fixed per class rather than stored per instance
+ }
@Override
public Iterator<Integer> getDecodeIterator(int k) {
@@ -247,7 +256,7 @@ public class ColGroupRLE extends ColGroupBitmap
//L3 cache alignment, see comment rightMultByVector OLE column group
//core difference of RLE to OLE is that runs are not segment alignment,
//which requires care of handling runs crossing cache-buckets
- final int blksz = ColGroupBitmap.WRITE_CACHE_BLKSZ;
+ final int blksz = ColGroupOffset.WRITE_CACHE_BLKSZ;
//step 1: prepare position and value arrays
@@ -335,7 +344,7 @@ public class ColGroupRLE extends ColGroupBitmap
if( LOW_LEVEL_OPT && numVals > 1
&& _numRows > BitmapEncoder.BITMAP_BLOCK_SZ )
{
- final int blksz = ColGroupBitmap.READ_CACHE_BLKSZ;
+ final int blksz = ColGroupOffset.READ_CACHE_BLKSZ;
//step 1: prepare position and value arrays
@@ -423,7 +432,7 @@ public class ColGroupRLE extends ColGroupBitmap
}
double[] rvalues = applyScalarOp(op, val0, getNumCols());
- char[] lbitmap = BitmapEncoder.genRLEBitmap(loff);
+ char[] lbitmap = BitmapEncoder.genRLEBitmap(loff, loff.length);
char[] rbitmaps = Arrays.copyOf(_data, _data.length+lbitmap.length);
System.arraycopy(lbitmap, 0, rbitmaps, _data.length, lbitmap.length);
int[] rbitmapOffs = Arrays.copyOf(_ptr, _ptr.length+1);
@@ -432,49 +441,9 @@ public class ColGroupRLE extends ColGroupBitmap
return new ColGroupRLE(_colIndexes, _numRows, loff.length<_numRows,
rvalues, rbitmaps, rbitmapOffs);
}
-
- @Override
- public void unaryAggregateOperations(AggregateUnaryOperator op, MatrixBlock result)
- throws DMLRuntimeException
- {
- unaryAggregateOperations(op, result, 0, getNumRows());
- }
-
-
- @Override
- public void unaryAggregateOperations(AggregateUnaryOperator op, MatrixBlock result, int rl, int ru)
- throws DMLRuntimeException
- {
- //sum and sumsq (reduceall/reducerow over tuples and counts)
- if( op.aggOp.increOp.fn instanceof KahanPlus || op.aggOp.increOp.fn instanceof KahanPlusSq )
- {
- KahanFunction kplus = (op.aggOp.increOp.fn instanceof KahanPlus) ?
- KahanPlus.getKahanPlusFnObject() : KahanPlusSq.getKahanPlusSqFnObject();
-
- if( op.indexFn instanceof ReduceAll )
- computeSum(result, kplus);
- else if( op.indexFn instanceof ReduceCol )
- computeRowSums(result, kplus, rl, ru);
- else if( op.indexFn instanceof ReduceRow )
- computeColSums(result, kplus);
- }
- //min and max (reduceall/reducerow over tuples only)
- else if(op.aggOp.increOp.fn instanceof Builtin
- && (((Builtin)op.aggOp.increOp.fn).getBuiltinCode()==BuiltinCode.MAX
- || ((Builtin)op.aggOp.increOp.fn).getBuiltinCode()==BuiltinCode.MIN))
- {
- Builtin builtin = (Builtin) op.aggOp.increOp.fn;
- if( op.indexFn instanceof ReduceAll )
- computeMxx(result, builtin);
- else if( op.indexFn instanceof ReduceCol )
- computeRowMxx(result, builtin, rl, ru);
- else if( op.indexFn instanceof ReduceRow )
- computeColMxx(result, builtin);
- }
- }
-
- private void computeSum(MatrixBlock result, KahanFunction kplus)
+ @Override
+ protected final void computeSum(MatrixBlock result, KahanFunction kplus)
{
KahanObject kbuff = new KahanObject(result.quickGetValue(0, 0), result.quickGetValue(0, 1));
@@ -502,37 +471,93 @@ public class ColGroupRLE extends ColGroupBitmap
result.quickSetValue(0, 1, kbuff._correction);
}
- private void computeRowSums(MatrixBlock result, KahanFunction kplus, int rl, int ru)
+ @Override
+ protected final void computeRowSums(MatrixBlock result, KahanFunction kplus, int rl, int ru)
{
KahanObject kbuff = new KahanObject(0, 0);
+ KahanPlus kplus2 = KahanPlus.getKahanPlusFnObject();
+
final int numVals = getNumValues();
double[] c = result.getDenseBlock();
- for (int k = 0; k < numVals; k++) {
- int boff = _ptr[k];
- int blen = len(k);
- double val = sumValues(k);
+ if( ALLOW_CACHE_CONSCIOUS_ROWSUMS
+ && LOW_LEVEL_OPT && numVals > 1
+ && _numRows > BitmapEncoder.BITMAP_BLOCK_SZ )
+ {
+ final int blksz = ColGroupOffset.WRITE_CACHE_BLKSZ/2;
+
+ //step 1: prepare position and value arrays
+
+ //current pos / values per RLE list
+ int[] astart = new int[numVals];
+ int[] apos = skipScan(numVals, rl, astart);
+ double[] aval = sumAllValues(kplus, kbuff);
+
+ //step 2: cache conscious matrix-vector via horizontal scans
+ for( int bi=rl; bi<ru; bi+=blksz )
+ {
+ int bimax = Math.min(bi+blksz, ru);
+
+ //horizontal segment scan, incl pos maintenance
+ for (int k = 0; k < numVals; k++) {
+ int boff = _ptr[k];
+ int blen = len(k);
+ double val = aval[k];
+ int bix = apos[k];
+ int start = astart[k];
+
+ //compute partial results, not aligned
+ while( bix<blen ) {
+ int lstart = _data[boff + bix];
+ int llen = _data[boff + bix + 1];
+ int from = Math.max(bi, start+lstart);
+ int to = Math.min(start+lstart+llen,bimax);
+ for (int rix=from; rix<to; rix++) {
+ kbuff.set(c[2*rix], c[2*rix+1]);
+ kplus2.execute2(kbuff, val);
+ c[2*rix] = kbuff._sum;
+ c[2*rix+1] = kbuff._correction;
+ }
+ if(start+lstart+llen >= bimax)
+ break;
+ start += lstart + llen;
+ bix += 2;
+ }
- if (val != 0.0) {
- Pair<Integer,Integer> tmp = skipScanVal(k, rl);
- int bix = tmp.getKey();
- int curRunStartOff = tmp.getValue();
- int curRunEnd = tmp.getValue();
- for ( ; bix<blen && curRunEnd<ru; bix+=2) {
- curRunStartOff = curRunEnd + _data[boff+bix];
- curRunEnd = curRunStartOff + _data[boff+bix+1];
- for (int rix=curRunStartOff; rix<curRunEnd && rix<ru; rix++) {
- kbuff.set(c[2*rix], c[2*rix+1]);
- kplus.execute2(kbuff, val);
- c[2*rix] = kbuff._sum;
- c[2*rix+1] = kbuff._correction;
+ apos[k] = bix;
+ astart[k] = start;
+ }
+ }
+ }
+ else
+ {
+ for (int k = 0; k < numVals; k++) {
+ int boff = _ptr[k];
+ int blen = len(k);
+ double val = sumValues(k, kplus, kbuff);
+
+ if (val != 0.0) {
+ Pair<Integer,Integer> tmp = skipScanVal(k, rl);
+ int bix = tmp.getKey();
+ int curRunStartOff = tmp.getValue();
+ int curRunEnd = tmp.getValue();
+ for ( ; bix<blen && curRunEnd<ru; bix+=2) {
+ curRunStartOff = curRunEnd + _data[boff+bix];
+ curRunEnd = curRunStartOff + _data[boff+bix+1];
+ for (int rix=curRunStartOff; rix<curRunEnd && rix<ru; rix++) {
+ kbuff.set(c[2*rix], c[2*rix+1]);
+ kplus2.execute2(kbuff, val);
+ c[2*rix] = kbuff._sum;
+ c[2*rix+1] = kbuff._correction;
+ }
}
}
}
}
}
- private void computeColSums(MatrixBlock result, KahanFunction kplus)
+ @Override
+ protected final void computeColSums(MatrixBlock result, KahanFunction kplus)
{
KahanObject kbuff = new KahanObject(0, 0);
@@ -561,7 +586,8 @@ public class ColGroupRLE extends ColGroupBitmap
}
}
- private void computeRowMxx(MatrixBlock result, Builtin builtin, int rl, int ru)
+ @Override
+ protected final void computeRowMxx(MatrixBlock result, Builtin builtin, int rl, int ru)
{
//NOTE: zeros handled once for all column groups outside
final int numVals = getNumValues();
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java
index 9d06bf8..6445c52 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupUncompressed.java
@@ -32,6 +32,7 @@ import org.apache.sysml.runtime.functionobjects.ReduceRow;
import org.apache.sysml.runtime.matrix.data.LibMatrixAgg;
import org.apache.sysml.runtime.matrix.data.LibMatrixMult;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.data.SparseBlock.Type;
import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
import org.apache.sysml.runtime.matrix.operators.ScalarOperator;
import org.apache.sysml.runtime.util.SortUtils;
@@ -53,7 +54,7 @@ public class ColGroupUncompressed extends ColGroup
private MatrixBlock _data;
public ColGroupUncompressed() {
- super(CompressionType.UNCOMPRESSED, (int[])null, -1);
+ super((int[])null, -1);
}
/**
@@ -71,7 +72,7 @@ public class ColGroupUncompressed extends ColGroup
public ColGroupUncompressed(List<Integer> colIndicesList, MatrixBlock rawblock)
throws DMLRuntimeException
{
- super(CompressionType.UNCOMPRESSED, colIndicesList,
+ super(colIndicesList,
CompressedMatrixBlock.TRANSPOSE_INPUT ?
rawblock.getNumColumns() : rawblock.getNumRows());
@@ -97,7 +98,7 @@ public class ColGroupUncompressed extends ColGroup
return;
}
- // dense implementation for dense and sparse matrices to avoid linear search
+ //dense implementation for dense and sparse matrices to avoid linear search
int m = numRows;
int n = _colIndexes.length;
for( int i = 0; i < m; i++) {
@@ -109,6 +110,11 @@ public class ColGroupUncompressed extends ColGroup
}
}
_data.examSparsity();
+
+ //convert sparse MCSR to read-optimized CSR representation
+ if( _data.isInSparseFormat() ) {
+ _data = new MatrixBlock(_data, Type.CSR, false);
+ }
}
/**
@@ -121,8 +127,7 @@ public class ColGroupUncompressed extends ColGroup
*/
public ColGroupUncompressed(ArrayList<ColGroup> groupsToDecompress)
{
- super(CompressionType.UNCOMPRESSED,
- mergeColIndices(groupsToDecompress),
+ super(mergeColIndices(groupsToDecompress),
groupsToDecompress.get(0)._numRows);
// Invert the list of column indices
@@ -152,10 +157,14 @@ public class ColGroupUncompressed extends ColGroup
*/
public ColGroupUncompressed(int[] colIndices, int numRows, MatrixBlock data)
{
- super(CompressionType.UNCOMPRESSED, colIndices, numRows);
+ super(colIndices, numRows);
_data = data;
}
+ @Override
+ public CompressionType getCompType() {
+ return CompressionType.UNCOMPRESSED; //type is fixed per class rather than stored per instance
+ }
/**
* Access for superclass
@@ -276,6 +285,23 @@ public class ColGroupUncompressed extends ColGroup
LibMatrixMult.matrixMult(_data, shortVector, result, rl, ru);
}
+ public void rightMultByVector(MatrixBlock vector, MatrixBlock result, int k)
+ throws DMLRuntimeException
+ {
+ // Gather the vector entries at this group's column indexes into a dense clen x 1 vector
+ int clen = _colIndexes.length;
+
+ MatrixBlock shortVector = new MatrixBlock(clen, 1, false);
+ shortVector.allocateDenseBlock();
+ double[] b = shortVector.getDenseBlock();
+ for (int colIx = 0; colIx < clen; colIx++)
+ b[colIx] = vector.quickGetValue(_colIndexes[colIx], 0);
+ shortVector.recomputeNonZeros();
+
+ // Multiply all rows of the selected columns by the short vector (k: degree of parallelism, presumably)
+ LibMatrixMult.matrixMult(_data, shortVector, result, k);
+ }
+
@Override
public void leftMultByRowVector(MatrixBlock vector, MatrixBlock result)
throws DMLRuntimeException
@@ -377,8 +403,7 @@ public class ColGroupUncompressed extends ColGroup
}
@Override
- protected void countNonZerosPerRow(int[] rnnz, int rl, int ru)
- {
+ protected void countNonZerosPerRow(int[] rnnz, int rl, int ru) {
for( int i=rl; i<ru; i++ )
rnnz[i-rl] += _data.recomputeNonZeros(i, i, 0, _data.getNumColumns()-1);
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/ColGroupValue.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ColGroupValue.java b/src/main/java/org/apache/sysml/runtime/compress/ColGroupValue.java
new file mode 100644
index 0000000..b3b5e80
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/ColGroupValue.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress;
+
+import java.util.Arrays;
+
+import org.apache.sysml.runtime.DMLRuntimeException;
+import org.apache.sysml.runtime.functionobjects.Builtin;
+import org.apache.sysml.runtime.functionobjects.Builtin.BuiltinCode;
+import org.apache.sysml.runtime.functionobjects.KahanFunction;
+import org.apache.sysml.runtime.functionobjects.KahanPlus;
+import org.apache.sysml.runtime.instructions.cp.KahanObject;
+import org.apache.sysml.runtime.matrix.data.MatrixBlock;
+import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator;
+import org.apache.sysml.runtime.matrix.operators.ScalarOperator;
+
+
+/**
+ * Base class for column groups encoded with a dictionary of distinct value tuples.
+ * The mapping of value tuples to rows (e.g., OLE/RLE bitmaps) is kept by subclasses.
+ */
+public abstract class ColGroupValue extends ColGroup
+{
+ private static final long serialVersionUID = 3786247536054353658L;
+ //global switch for low-level optimizations (e.g., value sorting in the constructor)
+ public static boolean LOW_LEVEL_OPT = true;
+
+ //sorting values by physical length (see sortValuesByFrequency) helps by 10-20%, especially
+ //for serial ops, while it slightly decreases performance for parallel/multi-threaded ops;
+ //hence not applied for distributed operations (also because compression time and GC increase)
+ public static final boolean SORT_VALUES_BY_LENGTH = true;
+
+
+ /** Distinct value tuples associated with individual bitmaps (numCols values per tuple). */
+ protected double[] _values; //linearized <numcol vals> <numcol vals>
+ /** Empty constructor (null column indexes, -1 rows), presumably for deserialization. */
+ public ColGroupValue() {
+ super((int[]) null, -1);
+ }
+
+ /**
+ * Main constructor. Extracts the distinct values of the bitmap, sorted by frequency if enabled.
+ *
+ * @param colIndices
+ * indices (within the block) of the columns included in this
+ * column group
+ * @param numRows
+ * total number of rows in the parent block
+ * @param ubm
+ * Uncompressed bitmap representation of the block (values may be re-sorted by frequency)
+ */
+ public ColGroupValue(int[] colIndices, int numRows, UncompressedBitmap ubm)
+ {
+ super(colIndices, numRows);
+
+ // sort values by frequency, if requested and the input exceeds one bitmap block
+ if( LOW_LEVEL_OPT && SORT_VALUES_BY_LENGTH
+ && numRows > BitmapEncoder.BITMAP_BLOCK_SZ ) {
+ ubm.sortValuesByFrequency();
+ }
+
+ // extract and store distinct values (bitmaps handled by subclasses)
+ _values = ubm.getValues();
+ }
+
+ /**
+ * Constructor for subclass methods that need to create shallow copies (values are not copied).
+ *
+ * @param colIndices
+ * raw column index information
+ * @param numRows
+ * number of rows in the block
+ * @param values
+ * set of distinct values for the block (associated bitmaps are
+ * kept in the subclass)
+ */
+ protected ColGroupValue(int[] colIndices, int numRows, double[] values) {
+ super(colIndices, numRows);
+ _values = values;
+ }
+
+ @Override
+ public long estimateInMemorySize() {
+ long size = super.estimateInMemorySize();
+
+ // adding the size of values
+ size += 8; //array reference
+ if (_values != null) {
+ size += 32 + _values.length * 8; //values (32 bytes array overhead + 8 per double)
+ }
+
+ return size;
+ }
+
+ /**
+ * Obtain number of distinct sets of values associated with the bitmaps in this column group.
+ *
+ * @return the number of distinct sets of values associated with the bitmaps
+ * in this column group
+ */
+ public int getNumValues() {
+ return _values.length / _colIndexes.length;
+ }
+ /** Returns the linearized distinct values (the internal array, not a copy). */
+ public double[] getValues() {
+ return _values;
+ }
+ /** Returns the index of the first all-zero value tuple, or -1 if none exists. */
+ protected int containsAllZeroValue() {
+ int numVals = getNumValues();
+ int numCols = getNumCols();
+ for( int i=0, off=0; i<numVals; i++, off+=numCols ) {
+ boolean allZeros = true;
+ for( int j=0; j<numCols; j++ )
+ allZeros &= (_values[off+j] == 0);
+ if( allZeros )
+ return i;
+ }
+ return -1;
+ }
+ /** Plain (non-Kahan) sum of the values of tuple valIx. */
+ protected final double sumValues(int valIx) {
+ final int numCols = getNumCols();
+ final int valOff = valIx * numCols;
+ double val = 0.0;
+ for( int i = 0; i < numCols; i++ ) {
+ val += _values[valOff+i];
+ }
+
+ return val;
+ }
+ /** Aggregates tuple valIx via kplus (e.g., sum or sum of squares); resets and reuses kbuff. */
+ protected final double sumValues(int valIx, KahanFunction kplus, KahanObject kbuff) {
+ final int numCols = getNumCols();
+ final int valOff = valIx * numCols;
+ kbuff.set(0, 0);
+ for( int i = 0; i < numCols; i++ ) {
+ kplus.execute2(kbuff, _values[valOff+i]);
+ }
+
+ return kbuff._sum;
+ }
+ /** Per-tuple aggregates via kplus; for one column and KahanPlus, returns _values itself. */
+ protected final double[] sumAllValues(KahanFunction kplus, KahanObject kbuff) {
+ //quick path: sum
+ if( getNumCols()==1 && kplus instanceof KahanPlus )
+ return _values; //internal array (not a copy), must not be modified by callers
+
+ //pre-aggregate each value tuple
+ final int numVals = getNumValues();
+ double[] ret = new double[numVals];
+ for( int k=0; k<numVals; k++ )
+ ret[k] = sumValues(k, kplus, kbuff);
+
+ return ret;
+ }
+ /** Dot product of tuple valIx with b, where b[i] belongs to the i-th column of this group. */
+ protected final double sumValues(int valIx, double[] b) {
+ final int numCols = getNumCols();
+ final int valOff = valIx * numCols;
+ double val = 0;
+ for( int i = 0; i < numCols; i++ ) {
+ val += _values[valOff+i] * b[i];
+ }
+
+ return val;
+ }
+ /** Dot products of the first numVals value tuples with b (see sumValues(int, double[])). */
+ protected final double[] preaggValues(int numVals, double[] b) {
+ double[] ret = new double[numVals];
+ for( int k = 0; k < numVals; k++ )
+ ret[k] = sumValues(k, b);
+
+ return ret;
+ }
+
+ /**
+ * Aggregates min/max over all distinct values (and 0, if zeros) into result(0,0).
+ * NOTE: Shared across OLE/RLE/DDC because value-only computation.
+ * @param result output matrix block
+ * @param builtin min or max function object
+ * @param zeros indicator if column group contains zero values
+ */
+ protected void computeMxx(MatrixBlock result, Builtin builtin, boolean zeros)
+ {
+ //init (-MAX_VALUE for max, +MAX_VALUE otherwise) and 0-value handling
+ double val = Double.MAX_VALUE * ((builtin.getBuiltinCode()==BuiltinCode.MAX)?-1:1);
+ if( zeros )
+ val = builtin.execute2(val, 0);
+
+ //iterate over all values only
+ final int numVals = getNumValues();
+ final int numCols = getNumCols();
+ for (int k = 0; k < numVals; k++)
+ for( int j=0, valOff = k*numCols; j<numCols; j++ )
+ val = builtin.execute2(val, _values[ valOff+j ]);
+
+ //merge with the existing partial aggregate in result(0,0)
+ val = builtin.execute2(val, result.quickGetValue(0, 0));
+ result.quickSetValue(0, 0, val);
+ }
+
+ /**
+ * Computes per-column min/max over all distinct values (and 0, if zeros).
+ * NOTE: Shared across OLE/RLE/DDC; overwrites (does not merge) the result entries.
+ * @param result output matrix block
+ * @param builtin min or max function object
+ * @param zeros indicator if column group contains zero values
+ */
+ protected void computeColMxx(MatrixBlock result, Builtin builtin, boolean zeros)
+ {
+ final int numVals = getNumValues();
+ final int numCols = getNumCols();
+
+ //init and 0-value handling
+ double[] vals = new double[numCols];
+ Arrays.fill(vals, Double.MAX_VALUE * ((builtin.getBuiltinCode()==BuiltinCode.MAX)?-1:1));
+ if( zeros ) {
+ for( int j = 0; j < numCols; j++ )
+ vals[j] = builtin.execute2(vals[j], 0);
+ }
+
+ //iterate over all values only
+ for (int k = 0; k < numVals; k++)
+ for( int j=0, valOff=k*numCols; j<numCols; j++ )
+ vals[j] = builtin.execute2(vals[j], _values[ valOff+j ]);
+
+ //copy results to output (at the group's column indexes)
+ for( int j=0; j<numCols; j++ )
+ result.quickSetValue(0, _colIndexes[j], vals[j]);
+ }
+
+ /**
+ * Method for use by subclasses. Applies a scalar operation to the value
+ * metadata stored in the superclass.
+ *
+ * @param op
+ * scalar operation to perform
+ * @return transformed copy of value metadata for this column group
+ * @throws DMLRuntimeException if the scalar operation fails
+ */
+ protected double[] applyScalarOp(ScalarOperator op)
+ throws DMLRuntimeException
+ {
+ //scan over linearized values
+ double[] ret = new double[_values.length];
+ for (int i = 0; i < _values.length; i++) {
+ ret[i] = op.executeScalar(_values[i]);
+ }
+
+ return ret;
+ }
+ /** Like applyScalarOp(op), but appends a tuple of numCols copies of newVal (not passed through op). */
+ protected double[] applyScalarOp(ScalarOperator op, double newVal, int numCols)
+ throws DMLRuntimeException
+ {
+ //scan over linearized values
+ double[] ret = new double[_values.length + numCols];
+ for( int i = 0; i < _values.length; i++ ) {
+ ret[i] = op.executeScalar(_values[i]);
+ }
+
+ //add new value to the end
+ Arrays.fill(ret, _values.length, _values.length+numCols, newVal);
+
+ return ret;
+ }
+
+ @Override
+ public void unaryAggregateOperations(AggregateUnaryOperator op, MatrixBlock result)
+ throws DMLRuntimeException
+ {
+ unaryAggregateOperations(op, result, 0, getNumRows());
+ }
+
+ /**
+ * Computes a unary aggregate over the row range [rl, ru) into result.
+ * @param op aggregation operator
+ * @param result output matrix block
+ * @param rl row lower index, inclusive
+ * @param ru row upper index, exclusive
+ * @throws DMLRuntimeException if the aggregation fails
+ */
+ public abstract void unaryAggregateOperations(AggregateUnaryOperator op, MatrixBlock result, int rl, int ru)
+ throws DMLRuntimeException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
index 48ebcc5..84c4812 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/CompressedMatrixBlock.java
@@ -46,6 +46,7 @@ import org.apache.sysml.lops.MMTSJ.MMTSJType;
import org.apache.sysml.lops.MapMultChain.ChainType;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.compress.ColGroup.CompressionType;
+import org.apache.sysml.runtime.compress.cocode.PlanningCoCoder;
import org.apache.sysml.runtime.compress.estim.CompressedSizeEstimator;
import org.apache.sysml.runtime.compress.estim.CompressedSizeInfo;
import org.apache.sysml.runtime.compress.estim.SizeEstimatorFactory;
@@ -56,12 +57,14 @@ import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType;
import org.apache.sysml.runtime.controlprogram.parfor.stat.Timing;
import org.apache.sysml.runtime.functionobjects.Builtin;
import org.apache.sysml.runtime.functionobjects.Builtin.BuiltinCode;
+import org.apache.sysml.runtime.functionobjects.KahanFunction;
import org.apache.sysml.runtime.functionobjects.KahanPlus;
import org.apache.sysml.runtime.functionobjects.KahanPlusSq;
import org.apache.sysml.runtime.functionobjects.Multiply;
import org.apache.sysml.runtime.functionobjects.ReduceAll;
import org.apache.sysml.runtime.functionobjects.ReduceCol;
import org.apache.sysml.runtime.instructions.cp.CM_COV_Object;
+import org.apache.sysml.runtime.instructions.cp.KahanObject;
import org.apache.sysml.runtime.instructions.cp.ScalarObject;
import org.apache.sysml.runtime.matrix.data.CTableMap;
import org.apache.sysml.runtime.matrix.data.LibMatrixBincell;
@@ -98,7 +101,9 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
public static final boolean MATERIALIZE_ZEROS = false;
public static final long MIN_PAR_AGG_THRESHOLD = 16*1024*1024; //16MB
public static final boolean INVESTIGATE_ESTIMATES = false;
- private static final boolean LDEBUG = false; //local debug flag
+ public static boolean ALLOW_DDC_ENCODING = true;
+ private static final boolean LDEBUG = true; //local debug flag
+ private static final Level LDEBUG_LEVEL = Level.DEBUG; //DEBUG/TRACE for details
private static final Log LOG = LogFactory.getLog(CompressedMatrixBlock.class.getName());
@@ -106,7 +111,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
// for internal debugging only
if( LDEBUG ) {
Logger.getLogger("org.apache.sysml.runtime.compress")
- .setLevel((Level) Level.DEBUG);
+ .setLevel((Level) LDEBUG_LEVEL);
}
}
@@ -231,7 +236,6 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
final int numRows = getNumRows();
final int numCols = getNumColumns();
final boolean sparse = isInSparseFormat();
- final double sp = OptimizerUtils.getSparsity(numRows, numCols, getNonZeros());
MatrixBlock rawblock = !TRANSPOSE_INPUT ? new MatrixBlock(this) :
LibMatrixReorg.transpose(this, new MatrixBlock(numCols, numRows, sparse), k);
@@ -239,45 +243,50 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
CompressedSizeEstimator bitmapSizeEstimator =
SizeEstimatorFactory.getSizeEstimator(rawblock, numRows);
- // The current implementation of this method is written for correctness,
- // not for performance or for minimal use of temporary space.
-
- // We start with a full set of columns.
- HashSet<Integer> remainingCols = new HashSet<Integer>();
- for (int i = 0; i < numCols; i++)
- remainingCols.add(i);
-
// PHASE 1: Classify columns by compression type
- // We start by determining which columns are amenable to bitmap compression
- double uncompressedColumnSize = getUncompressedSize(numRows, 1, sp);
-
- // information about the bitmap amenable columns
- List<Integer> bitmapCols = new ArrayList<Integer>();
- List<Integer> uncompressedCols = new ArrayList<Integer>();
- List<Integer> colsCards = new ArrayList<Integer>();
- List<Long> compressedSizes = new ArrayList<Long>();
- HashMap<Integer, Double> compressionRatios = new HashMap<Integer, Double>();
+ // We start by determining which columns are amenable to compression
+ List<Integer> colsC = new ArrayList<Integer>();
+ List<Integer> colsUC = new ArrayList<Integer>();
+ HashMap<Integer, Double> compRatios = new HashMap<Integer, Double>();
- // Classify columns according to ration (size uncompressed / size compressed),
+ // Classify columns according to ratio (size uncompressed / size compressed),
// where a column is compressible if ratio > 1.
CompressedSizeInfo[] sizeInfos = (k > 1) ?
computeCompressedSizeInfos(bitmapSizeEstimator, numCols, k) :
- computeCompressedSizeInfos(bitmapSizeEstimator, numCols);
+ computeCompressedSizeInfos(bitmapSizeEstimator, numCols);
+ long nnzUC = 0;
for (int col = 0; col < numCols; col++) {
- long compressedSize = sizeInfos[col].getMinSize();
- double compRatio = uncompressedColumnSize / compressedSize;
- if (compRatio > 1) {
- bitmapCols.add(col);
- compressionRatios.put(col, compRatio);
- colsCards.add(sizeInfos[col].getEstCarinality());
- compressedSizes.add(compressedSize);
+ double uncompSize = getUncompressedSize(numRows, 1,
+ OptimizerUtils.getSparsity(numRows, 1, sizeInfos[col].getEstNnz()));
+ double compRatio = uncompSize / sizeInfos[col].getMinSize();
+ if( compRatio > 1 ) {
+ colsC.add(col);
+ compRatios.put(col, compRatio);
+ }
+ else {
+ colsUC.add(col);
+ nnzUC += sizeInfos[col].getEstNnz();
}
- else
- uncompressedCols.add(col);
}
-
- _stats.timePhase1 = time.stop();
+
+ // correction of column classification (reevaluate dense estimates if necessary)
+ boolean sparseUC = MatrixBlock.evalSparseFormatInMemory(numRows, colsUC.size(), nnzUC);
+ if( !sparseUC && !colsUC.isEmpty() ) {
+ for( int i=0; i<colsUC.size(); i++ ) {
+ int col = colsUC.get(i);
+ double uncompSize = getUncompressedSize(numRows, 1, 1.0);
+ double compRatio = uncompSize / sizeInfos[col].getMinSize();
+ if( compRatio > 1 ) {
+ colsC.add(col);
+ colsUC.remove(i); i--;
+ compRatios.put(col, compRatio);
+ nnzUC -= sizeInfos[col].getEstNnz();
+ }
+ }
+ }
+
if( LOG.isDebugEnabled() ) {
+ _stats.timePhase1 = time.stop();
LOG.debug("Compression statistics:");
LOG.debug("--compression phase 1: "+_stats.timePhase1);
}
@@ -285,26 +294,28 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
// PHASE 2: Grouping columns
// Divide the bitmap columns into column groups.
List<int[]> bitmapColGrps = PlanningCoCoder.findCocodesByPartitioning(
- bitmapSizeEstimator, bitmapCols, colsCards, compressedSizes, numRows,
- isInSparseFormat() ? sp : 1, k);
+ bitmapSizeEstimator, colsC, sizeInfos, numRows, k);
- _stats.timePhase2 = time.stop();
- if( LOG.isDebugEnabled() )
+ if( LOG.isDebugEnabled() ) {
+ _stats.timePhase2 = time.stop();
LOG.debug("--compression phase 2: "+_stats.timePhase2);
-
+ }
+
if( INVESTIGATE_ESTIMATES ) {
double est = 0;
for( int[] groupIndices : bitmapColGrps )
est += bitmapSizeEstimator.estimateCompressedColGroupSize(groupIndices).getMinSize();
- est += uncompressedCols.size() * uncompressedColumnSize;
+ est += MatrixBlock.estimateSizeInMemory(numRows, colsUC.size(),
+ OptimizerUtils.getSparsity(numRows, colsUC.size(), nnzUC));
_stats.estSize = est;
}
// PHASE 3: Compress and correct sample-based decisions
ColGroup[] colGroups = (k > 1) ?
- compressColGroups(rawblock, bitmapSizeEstimator, compressionRatios, numRows, sp, bitmapColGrps, k) :
- compressColGroups(rawblock, bitmapSizeEstimator, compressionRatios, numRows, sp, bitmapColGrps);
+ compressColGroups(rawblock, bitmapSizeEstimator, compRatios, numRows, bitmapColGrps, colsUC.isEmpty(), k) :
+ compressColGroups(rawblock, bitmapSizeEstimator, compRatios, numRows, bitmapColGrps, colsUC.isEmpty());
allocateColGroupList();
+ HashSet<Integer> remainingCols = seq(0, numCols-1, 1);
for( int j=0; j<colGroups.length; j++ ) {
if( colGroups[j] != null ) {
for( int col : colGroups[j].getColIndices() )
@@ -313,10 +324,11 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
}
}
- _stats.timePhase3 = time.stop();
- if( LOG.isDebugEnabled() )
+ if( LOG.isDebugEnabled() ) {
+ _stats.timePhase3 = time.stop();
LOG.debug("--compression phase 3: "+_stats.timePhase3);
-
+ }
+
// Phase 4: Cleanup
// The remaining columns are stored uncompressed as one big column group
if( !remainingCols.isEmpty() ) {
@@ -332,10 +344,15 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
rawblock.cleanupBlock(true, true);
this.cleanupBlock(true, true);
- _stats.timePhase4 = time.stop();
if( LOG.isDebugEnabled() ) {
+ _stats.timePhase4 = time.stop();
+ int[] counts = getColGroupCounts(_colGroups);
LOG.debug("--compression phase 4: "+_stats.timePhase4);
LOG.debug("--num col groups: "+_colGroups.size());
+ LOG.debug("--col groups types (OLE,RLE,DDC1,DDC2,UC): "
+ +counts[2]+","+counts[1]+","+counts[3]+","+counts[4]+","+counts[0]);
+ LOG.debug("--col groups sizes (OLE,RLE,DDC1,DDC2,UC): "
+ +counts[7]+","+counts[6]+","+counts[8]+","+counts[9]+","+counts[5]);
LOG.debug("--compressed size: "+_stats.size);
LOG.debug("--compression ratio: "+_stats.ratio);
}
@@ -345,6 +362,22 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
return _stats;
}
+ /**
+ * Counts column groups per compression type (entry t, with t the type's enum
+ * ordinal) and the number of columns those groups cover (entry 5+t).
+ * @param colgroups list of column groups
+ * @return 10 counters: 5 group counts followed by 5 column counts
+ */
+ private static int[] getColGroupCounts(ArrayList<ColGroup> colgroups) {
+ int[] counts = new int[10]; //[0,5): #groups per type, [5,10): #columns per type
+ for( ColGroup grp : colgroups ) {
+ int type = grp.getCompType().ordinal();
+ counts[type] += 1;
+ counts[5 + type] += grp.getNumCols();
+ }
+ return counts;
+ }
+
private static CompressedSizeInfo[] computeCompressedSizeInfos(CompressedSizeEstimator estim, int clen) {
CompressedSizeInfo[] ret = new CompressedSizeInfo[clen];
for( int col=0; col<clen; col++ )
@@ -372,23 +405,23 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
}
}
- private static ColGroup[] compressColGroups(MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, double sp, List<int[]> groups)
+ private static ColGroup[] compressColGroups(MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, List<int[]> groups, boolean denseEst) //single-threaded: compresses the planned groups one by one, in order (denseEst: estimate uncompressed size as dense)
{
ColGroup[] ret = new ColGroup[groups.size()];
for( int i=0; i<groups.size(); i++ )
- ret[i] = compressColGroup(in, estim, compRatios, rlen, sp, groups.get(i));
+ ret[i] = compressColGroup(in, estim, compRatios, rlen, groups.get(i), denseEst); //one output slot per planned group, same order as groups
return ret;
}
- private static ColGroup[] compressColGroups(MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, double sp, List<int[]> groups, int k)
+ private static ColGroup[] compressColGroups(MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, List<int[]> groups, boolean denseEst, int k)
throws DMLRuntimeException
{
try {
ExecutorService pool = Executors.newFixedThreadPool( k );
ArrayList<CompressTask> tasks = new ArrayList<CompressTask>();
for( int[] colIndexes : groups )
- tasks.add(new CompressTask(in, estim, compRatios, rlen, sp, colIndexes));
+ tasks.add(new CompressTask(in, estim, compRatios, rlen, colIndexes, denseEst));
List<Future<ColGroup>> rtask = pool.invokeAll(tasks);
ArrayList<ColGroup> ret = new ArrayList<ColGroup>();
for( Future<ColGroup> lrtask : rtask )
@@ -401,7 +434,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
}
}
- private static ColGroup compressColGroup(MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, double sp, int[] colIndexes)
+ private static ColGroup compressColGroup(MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, int[] colIndexes, boolean denseEst)
{
int[] allGroupIndices = null;
int allColsCount = colIndexes.length;
@@ -416,12 +449,13 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
//exact big list and observe compression ratio
ubm = BitmapEncoder.extractBitmap(colIndexes, in);
sizeInfo = estim.estimateCompressedColGroupSize(ubm);
- double compRatio = getUncompressedSize(rlen, colIndexes.length, sp) / sizeInfo.getMinSize();
-
+ double sp2 = denseEst ? 1.0 : OptimizerUtils.getSparsity(rlen, 1, ubm.getNumOffsets());
+ double compRatio = getUncompressedSize(rlen, colIndexes.length, sp2) / sizeInfo.getMinSize();
+
if( compRatio > 1 ) {
break; // we have a good group
}
-
+
// modify the group
if (compRatioPQ == null) {
// first modification
@@ -454,9 +488,17 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
//create compressed column group
long rleSize = sizeInfo.getRLESize();
long oleSize = sizeInfo.getOLESize();
- if( rleSize < oleSize )
+ long ddcSize = sizeInfo.getDDCSize();
+
+ if( ALLOW_DDC_ENCODING && ddcSize < rleSize && ddcSize < oleSize ) {
+ if( ubm.getNumValues()<=255 )
+ return new ColGroupDDC1(colIndexes, rlen, ubm);
+ else
+ return new ColGroupDDC2(colIndexes, rlen, ubm);
+ }
+ else if( rleSize < oleSize )
return new ColGroupRLE(colIndexes, rlen, ubm);
- else
+ else
return new ColGroupOLE(colIndexes, rlen, ubm);
}
@@ -469,10 +511,11 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
* @return estimate of uncompressed size of column group
*/
private static double getUncompressedSize(int rlen, int clen, double sparsity) {
- //we estimate the uncompressed size as 8 * nnz in order to cover both
- //sparse and dense with moderate underestimation (which is conservative as
- //it is biased towards uncompressed columns)
- return 8 * rlen * clen * sparsity;
+ //estimate the uncompressed size as min(dense, csr): csr moderately overestimates
+ //sparse single columns, but taking the minimum avoids anomalies with sparse
+ //columns that are eventually represented in dense format
+ double denseSize = 8d * rlen * clen, csrSize = 4d * rlen + 12d * rlen * clen * sparsity;
+ return Math.min(denseSize, csrSize);
}
/**
@@ -587,8 +630,8 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
}
private static class CompressedColumn implements Comparable<CompressedColumn> {
- int colIx;
- double compRatio;
+ final int colIx;
+ final double compRatio;
public CompressedColumn(int colIx, double compRatio) {
this.colIx = colIx;
@@ -613,6 +656,13 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
public CompressionStatistics() {
//do nothing
}
+
+ public CompressionStatistics(double t1, double t2, double t3, double t4){
+ this.timePhase1 = t1; //phase 1 time
+ this.timePhase2 = t2; //phase 2 time (column grouping)
+ this.timePhase3 = t3; //phase 3 time (compression)
+ this.timePhase4 = t4; //phase 4 time (cleanup)
+ }
}
@Override
@@ -681,6 +731,10 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
grp = new ColGroupOLE(); break;
case RLE_BITMAP:
grp = new ColGroupRLE(); break;
+ case DDC1:
+ grp = new ColGroupDDC1(); break;
+ case DDC2:
+ grp = new ColGroupDDC2(); break;
}
//deserialize and add column group
@@ -1040,11 +1094,22 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
//aggregate partial results
if( op.indexFn instanceof ReduceAll ) {
- double val = ret.quickGetValue(0, 0);
- for( Future<MatrixBlock> rtask : rtasks )
- val = op.aggOp.increOp.fn.execute(val,
- rtask.get().quickGetValue(0, 0));
- ret.quickSetValue(0, 0, val);
+ if( op.aggOp.increOp.fn instanceof KahanFunction ) {
+ KahanObject kbuff = new KahanObject(ret.quickGetValue(0, 0), 0);
+ for( Future<MatrixBlock> rtask : rtasks ) {
+ double tmp = rtask.get().quickGetValue(0, 0);
+ ((KahanFunction) op.aggOp.increOp.fn).execute2(kbuff, tmp);
+ }
+ ret.quickSetValue(0, 0, kbuff._sum);
+ }
+ else {
+ double val = ret.quickGetValue(0, 0);
+ for( Future<MatrixBlock> rtask : rtasks ) {
+ double tmp = rtask.get().quickGetValue(0, 0);
+ val = op.aggOp.increOp.fn.execute(val, tmp);
+ }
+ ret.quickSetValue(0, 0, val);
+ }
}
}
catch(Exception ex) {
@@ -1058,9 +1123,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
grp.unaryAggregateOperations(op, ret);
//process OLE/RLE column groups
- for (ColGroup grp : _colGroups)
- if( !(grp instanceof ColGroupUncompressed) )
- grp.unaryAggregateOperations(op, ret);
+ aggregateUnaryOperations(op, _colGroups, ret, 0, rlen);
}
//special handling zeros for rowmins/rowmax
@@ -1089,6 +1152,41 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
}
@Override
+ public MatrixValue aggregateUnaryOperations(AggregateUnaryOperator op,
+ MatrixValue result, int blockingFactorRow, int blockingFactorCol,
+ MatrixIndexes indexesIn) throws DMLRuntimeException {
+ //no upfront decompression anymore: forward to the 6-arg overload with its last flag false
+ return aggregateUnaryOperations(op, result, blockingFactorRow, blockingFactorCol, indexesIn, false);
+ }
+
+ private static void aggregateUnaryOperations(AggregateUnaryOperator op, //aggregates all non-UC groups into ret for rows [rl, ru)
+ ArrayList<ColGroup> groups, MatrixBlock ret, int rl, int ru) throws DMLRuntimeException
+ {
+ boolean cacheDDC1 = ColGroupValue.LOW_LEVEL_OPT //joint DDC1 path only for rowSums over sufficiently large row ranges
+ && op.indexFn instanceof ReduceCol && op.aggOp.increOp.fn instanceof KahanPlus //rowSums
+ && ColGroupOffset.ALLOW_CACHE_CONSCIOUS_ROWSUMS
+ && ru-rl > ColGroupOffset.WRITE_CACHE_BLKSZ/2;
+
+ //process cache-conscious DDC1 groups jointly in a single kernel call (adds to output)
+ if( cacheDDC1 ) {
+ ArrayList<ColGroupDDC1> tmp = new ArrayList<ColGroupDDC1>();
+ for( ColGroup grp : groups )
+ if( grp instanceof ColGroupDDC1 )
+ tmp.add((ColGroupDDC1)grp);
+ if( !tmp.isEmpty() )
+ ColGroupDDC1.computeRowSums(tmp.toArray(new ColGroupDDC1[0]), ret,
+ KahanPlus.getKahanPlusFnObject(), rl, ru);
+ }
+
+ //process remaining groups, excluding DDC1 groups already handled above (adds to output)
+ //note: UC groups are skipped here (presumably aggregated separately by the caller)
+ for( ColGroup grp : groups )
+ if( !(grp instanceof ColGroupUncompressed)
+ && !(cacheDDC1 && grp instanceof ColGroupDDC1) )
+ ((ColGroupValue)grp).unaryAggregateOperations(op, ret, rl, ru);
+ }
+
+ @Override
public MatrixBlock transposeSelfMatrixMultOperations(MatrixBlock out, MMTSJType tstype)
throws DMLRuntimeException
{
@@ -1204,12 +1302,7 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
result.allocateDenseBlock();
// delegate matrix-vector operation to each column group
- for( ColGroup grp : _colGroups )
- if( grp instanceof ColGroupUncompressed ) //overwrites output
- grp.rightMultByVector(vector, result, 0, result.getNumRows());
- for( ColGroup grp : _colGroups )
- if( !(grp instanceof ColGroupUncompressed) ) //adds to output
- grp.rightMultByVector(vector, result, 0, result.getNumRows());
+ rightMultByVector(_colGroups, vector, result, true, 0, result.getNumRows());
// post-processing
result.recomputeNonZeros();
@@ -1231,6 +1324,13 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
//multi-threaded execution of all groups
try {
+ ColGroupUncompressed uc = getUncompressedColGroup();
+
+ //compute uncompressed column group in parallel
+ if( uc != null )
+ uc.rightMultByVector(vector, result, k);
+
+ //compute remaining compressed column groups in parallel
ExecutorService pool = Executors.newFixedThreadPool( k );
int rlen = getNumRows();
int seqsz = BitmapEncoder.BITMAP_BLOCK_SZ;
@@ -1239,15 +1339,48 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
ArrayList<RightMatrixMultTask> tasks = new ArrayList<RightMatrixMultTask>();
for( int i=0; i<k & i*blklen<getNumRows(); i++ )
tasks.add(new RightMatrixMultTask(_colGroups, vector, result, i*blklen, Math.min((i+1)*blklen,rlen)));
- pool.invokeAll(tasks);
+ List<Future<Long>> ret = pool.invokeAll(tasks);
pool.shutdown();
+
+ //error handling and nnz aggregation
+ long lnnz = 0;
+ for( Future<Long> tmp : ret )
+ lnnz += tmp.get();
+ result.setNonZeros(lnnz);
}
catch(Exception ex) {
throw new DMLRuntimeException(ex);
}
+ }
+
+ private static void rightMultByVector(ArrayList<ColGroup> groups, MatrixBlock vect, MatrixBlock ret, boolean inclUC, int rl, int ru) //rows [rl, ru) of ret = groups x vect
+ throws DMLRuntimeException
+ {
+ boolean cacheDDC1 = ColGroupValue.LOW_LEVEL_OPT //joint DDC1 path only for row ranges larger than the write-cache block
+ && ru-rl > ColGroupOffset.WRITE_CACHE_BLKSZ;
- // post-processing
- result.recomputeNonZeros();
+ // process uncompressed column group first (overwrites output); multi-threaded callers apply it themselves and pass inclUC=false
+ if( inclUC ) {
+ for( ColGroup grp : groups )
+ if( grp instanceof ColGroupUncompressed )
+ grp.rightMultByVector(vect, ret, rl, ru);
+ }
+
+ //process cache-conscious DDC1 groups jointly in a single kernel call (adds to output)
+ if( cacheDDC1 ) {
+ ArrayList<ColGroupDDC1> tmp = new ArrayList<ColGroupDDC1>();
+ for( ColGroup grp : groups )
+ if( grp instanceof ColGroupDDC1 )
+ tmp.add((ColGroupDDC1)grp);
+ if( !tmp.isEmpty() )
+ ColGroupDDC1.rightMultByVector(tmp.toArray(new ColGroupDDC1[0]), vect, ret, rl, ru);
+ }
+
+ //process remaining groups, excluding UC and already-handled DDC1 groups (adds to output)
+ for( ColGroup grp : groups )
+ if( !(grp instanceof ColGroupUncompressed)
+ && !(cacheDDC1 && grp instanceof ColGroupDDC1) )
+ grp.rightMultByVector(vect, ret, rl, ru);
}
/**
@@ -1299,11 +1432,9 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
* @param k number of threads
* @throws DMLRuntimeException if DMLRuntimeException occurs
*/
- private static void leftMultByVectorTranspose(List<ColGroup> colGroups,MatrixBlock vector, MatrixBlock result, boolean doTranspose, int k)
+ private void leftMultByVectorTranspose(List<ColGroup> colGroups,MatrixBlock vector, MatrixBlock result, boolean doTranspose, int k)
throws DMLRuntimeException
{
- int kuc = Math.max(1, k - colGroups.size() + 1);
-
//transpose vector if required
MatrixBlock rowVector = vector;
if (doTranspose) {
@@ -1317,12 +1448,21 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
//multi-threaded execution
try {
- ExecutorService pool = Executors.newFixedThreadPool( Math.min(colGroups.size(), k) );
+ //compute uncompressed column group in parallel
+ ColGroupUncompressed uc = getUncompressedColGroup();
+ if( uc != null )
+ uc.leftMultByRowVector(vector, result, k);
+
+ //compute remaining compressed column groups in parallel
+ ExecutorService pool = Executors.newFixedThreadPool( Math.min(colGroups.size()-((uc!=null)?1:0), k) );
ArrayList<LeftMatrixMultTask> tasks = new ArrayList<LeftMatrixMultTask>();
for( ColGroup grp : colGroups )
- tasks.add(new LeftMatrixMultTask(grp, rowVector, result, kuc));
- pool.invokeAll(tasks);
+ if( !(grp instanceof ColGroupUncompressed) )
+ tasks.add(new LeftMatrixMultTask(grp, rowVector, result));
+ List<Future<Object>> ret = pool.invokeAll(tasks);
pool.shutdown();
+ for( Future<Object> tmp : ret )
+ tmp.get(); //error handling
}
catch(Exception ex) {
throw new DMLRuntimeException(ex);
@@ -1405,37 +1545,32 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
private static class LeftMatrixMultTask implements Callable<Object>
{
- private ColGroup _group = null;
- private MatrixBlock _vect = null;
- private MatrixBlock _ret = null;
- private int _kuc = 1;
+ private final ColGroup _group; //single compressed (non-UC) column group
+ private final MatrixBlock _vect; //row vector (transposed by the caller if required)
+ private final MatrixBlock _ret; //output block, same instance passed to all tasks
- protected LeftMatrixMultTask( ColGroup group, MatrixBlock vect, MatrixBlock ret, int kuc) {
+ protected LeftMatrixMultTask( ColGroup group, MatrixBlock vect, MatrixBlock ret) { //kuc dropped: the UC group is now multiplied by the caller
_group = group;
_vect = vect;
_ret = ret;
- _kuc = kuc;
}
@Override
public Object call() throws DMLRuntimeException
{
// delegate matrix-vector operation to each column group
- if( _group instanceof ColGroupUncompressed && _kuc >1 && ColGroupBitmap.LOW_LEVEL_OPT )
- ((ColGroupUncompressed)_group).leftMultByRowVector(_vect, _ret, _kuc);
- else
- _group.leftMultByRowVector(_vect, _ret);
+ _group.leftMultByRowVector(_vect, _ret); //UC groups never reach this task (filtered out by the caller)
return null;
}
}
- private static class RightMatrixMultTask implements Callable<Object>
+ private static class RightMatrixMultTask implements Callable<Long>
{
- private ArrayList<ColGroup> _groups = null;
- private MatrixBlock _vect = null;
- private MatrixBlock _ret = null;
- private int _rl = -1;
- private int _ru = -1;
+ private final ArrayList<ColGroup> _groups;
+ private final MatrixBlock _vect;
+ private final MatrixBlock _ret;
+ private final int _rl;
+ private final int _ru;
protected RightMatrixMultTask( ArrayList<ColGroup> groups, MatrixBlock vect, MatrixBlock ret, int rl, int ru) {
_groups = groups;
@@ -1446,25 +1581,18 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
}
@Override
- public Object call() throws DMLRuntimeException
- {
- // delegate vector-matrix operation to each column group
- for( ColGroup grp : _groups )
- if( grp instanceof ColGroupUncompressed ) //overwrites output
- grp.rightMultByVector(_vect, _ret, _rl, _ru);
- for( ColGroup grp : _groups )
- if( !(grp instanceof ColGroupUncompressed) ) //adds to output
- grp.rightMultByVector(_vect, _ret, _rl, _ru);
- return null;
+ public Long call() throws DMLRuntimeException {
+ rightMultByVector(_groups, _vect, _ret, false, _rl, _ru);
+ return _ret.recomputeNonZeros(_rl, _ru-1, 0, 0);
}
}
private static class MatrixMultTransposeTask implements Callable<Object>
{
- private ArrayList<ColGroup> _groups = null;
- private MatrixBlock _ret = null;
- private int _gl = -1;
- private int _gu = -1;
+ private final ArrayList<ColGroup> _groups;
+ private final MatrixBlock _ret;
+ private final int _gl;
+ private final int _gu;
protected MatrixMultTransposeTask(ArrayList<ColGroup> groups, MatrixBlock ret, int gl, int gu) {
_groups = groups;
@@ -1482,11 +1610,11 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
private static class UnaryAggregateTask implements Callable<MatrixBlock>
{
- private ArrayList<ColGroup> _groups = null;
- private int _rl = -1;
- private int _ru = -1;
- private MatrixBlock _ret = null;
- private AggregateUnaryOperator _op = null;
+ private final ArrayList<ColGroup> _groups;
+ private final int _rl;
+ private final int _ru;
+ private final MatrixBlock _ret;
+ private final AggregateUnaryOperator _op;
protected UnaryAggregateTask( ArrayList<ColGroup> groups, MatrixBlock ret, int rl, int ru, AggregateUnaryOperator op) {
_groups = groups;
@@ -1507,18 +1635,15 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
@Override
public MatrixBlock call() throws DMLRuntimeException {
- // delegate unary aggregate operation to each column group
- // (uncompressed column group handles separately)
- for( ColGroup grp : _groups )
- ((ColGroupBitmap)grp).unaryAggregateOperations(_op, _ret, _rl, _ru);
+ aggregateUnaryOperations(_op, _groups, _ret, _rl, _ru);
return _ret;
}
}
private static class SizeEstimTask implements Callable<CompressedSizeInfo>
{
- private CompressedSizeEstimator _estim = null;
- private int _col = -1;
+ private final CompressedSizeEstimator _estim;
+ private final int _col;
protected SizeEstimTask( CompressedSizeEstimator estim, int col ) {
_estim = estim;
@@ -1533,34 +1658,34 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
private static class CompressTask implements Callable<ColGroup>
{
- private MatrixBlock _in = null;
- private CompressedSizeEstimator _estim = null;
- private HashMap<Integer, Double> _compRatios = null;
- private int _rlen = -1;
- private double _sp = -1;
- private int[] _colIndexes = null;
-
- protected CompressTask( MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, double sp, int[] colIndexes ) {
+ private final MatrixBlock _in; //uncompressed input block
+ private final CompressedSizeEstimator _estim;
+ private final HashMap<Integer, Double> _compRatios; //per-column compression ratios estimated in phase 1
+ private final int _rlen;
+ private final int[] _colIndexes; //column indexes of the planned group
+ private final boolean _denseEst; //if true, the uncompressed size is estimated as dense (sparsity 1.0)
+
+ protected CompressTask( MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, int[] colIndexes, boolean denseEst ) {
_in = in;
_estim = estim;
_compRatios = compRatios;
_rlen = rlen;
- _sp = sp;
_colIndexes = colIndexes;
+ _denseEst = denseEst;
}
@Override
public ColGroup call() throws DMLRuntimeException {
- return compressColGroup(_in, _estim, _compRatios, _rlen, _sp, _colIndexes);
+ return compressColGroup(_in, _estim, _compRatios, _rlen, _colIndexes, _denseEst); //same logic as the single-threaded path
}
}
private static class DecompressTask implements Callable<Object>
{
- private List<ColGroup> _colGroups = null;
- private MatrixBlock _ret = null;
- private int _rl = -1;
- private int _ru = -1;
+ private final List<ColGroup> _colGroups;
+ private final MatrixBlock _ret;
+ private final int _rl;
+ private final int _ru;
protected DecompressTask( List<ColGroup> colGroups, MatrixBlock ret, int rl, int ru ) {
_colGroups = colGroups;
@@ -1735,15 +1860,6 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
MatrixBlock tmp = isCompressed() ? decompress() : this;
return tmp.zeroOutOperations(result, range, complementary);
}
-
- @Override
- public MatrixValue aggregateUnaryOperations(AggregateUnaryOperator op,
- MatrixValue result, int blockingFactorRow, int blockingFactorCol,
- MatrixIndexes indexesIn) throws DMLRuntimeException {
- printDecompressWarning("aggregateUnaryOperations");
- MatrixBlock tmp = isCompressed() ? decompress() : this;
- return tmp.aggregateUnaryOperations(op, result, blockingFactorRow, blockingFactorCol, indexesIn);
- }
@Override
public CM_COV_Object cmOperations(CMOperator op) throws DMLRuntimeException {
@@ -2000,4 +2116,11 @@ public class CompressedMatrixBlock extends MatrixBlock implements Externalizable
LOG.warn("Operation '"+operation+"' not supported yet - decompressing for ULA operations.");
}
}
+
+ private static HashSet<Integer> seq(int from, int to, int incr) { //returns {from, from+incr, ...} up to and including to
+ if( incr <= 0 ) throw new IllegalArgumentException("Invalid seq increment (must be positive): "+incr); //guard against endless loop
+ HashSet<Integer> ret = new HashSet<Integer>();
+ for( long i = from; i <= to; i += incr ) ret.add((int) i); //long index avoids int overflow (endless loop) when to == Integer.MAX_VALUE
+ return ret;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/PlanningBinPacker.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/PlanningBinPacker.java b/src/main/java/org/apache/sysml/runtime/compress/PlanningBinPacker.java
deleted file mode 100644
index 70308bb..0000000
--- a/src/main/java/org/apache/sysml/runtime/compress/PlanningBinPacker.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.compress;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-/**
- * Used for the finding columns to co-code
- *
- */
-public class PlanningBinPacker
-{
- private final float _binWeight;
- private final List<Integer> _items;
- private final List<Float> _itemWeights;
-
- public PlanningBinPacker(float binWeight, List<Integer> items, List<Float> itemWeights) {
- _binWeight = binWeight;
- _items = items;
- _itemWeights = itemWeights;
- }
-
- /**
- * NOTE: upper bound is 17/10 OPT
- *
- * @return key: available space, value: list of the bins that have that free space
- */
- public TreeMap<Float, List<List<Integer>>> packFirstFit() {
- return packFirstFit(_items, _itemWeights);
- }
-
- private TreeMap<Float, List<List<Integer>>> packFirstFit(List<Integer> items, List<Float> itemWeights)
- {
- // when searching for a bin, the first bin in the list is used
- TreeMap<Float, List<List<Integer>>> bins = new TreeMap<Float, List<List<Integer>>>();
- // first bin
- bins.put(_binWeight, createBinList());
- int numItems = items.size();
- for (int i = 0; i < numItems; i++) {
- float itemWeight = itemWeights.get(i);
- Map.Entry<Float, List<List<Integer>>> entry = bins
- .ceilingEntry(itemWeight);
- if (entry == null) {
- // new bin
- float newBinWeight = _binWeight - itemWeight;
- List<List<Integer>> binList = bins.get(newBinWeight);
- if (binList == null) {
- bins.put(newBinWeight, createBinList(items.get(i)));
- } else {
- List<Integer> newBin = new ArrayList<Integer>();
- newBin.add(items.get(i));
- binList.add(newBin);
- }
- } else {
- // add to the first bin in the list
- List<Integer> assignedBin = entry.getValue().remove(0);
- assignedBin.add(items.get(i));
- if (entry.getValue().size() == 0)
- bins.remove(entry.getKey());
- float newBinWeight = entry.getKey() - itemWeight;
- List<List<Integer>> newBinsList = bins.get(newBinWeight);
- if (newBinsList == null) {
- // new bin
- bins.put(newBinWeight, createBinList(assignedBin));
- } else {
- newBinsList.add(assignedBin);
- }
- }
- }
- return bins;
- }
-
- private List<List<Integer>> createBinList() {
- List<List<Integer>> binList = new ArrayList<List<Integer>>();
- binList.add(new ArrayList<Integer>());
- return binList;
- }
-
- private List<List<Integer>> createBinList(int item) {
- List<List<Integer>> binList = new ArrayList<List<Integer>>();
- List<Integer> bin = new ArrayList<Integer>();
- binList.add(bin);
- bin.add(item);
- return binList;
- }
-
- private List<List<Integer>> createBinList(List<Integer> bin) {
- List<List<Integer>> binList = new ArrayList<List<Integer>>();
- binList.add(bin);
- return binList;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCoder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCoder.java b/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCoder.java
deleted file mode 100644
index 9313cd9..0000000
--- a/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCoder.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.compress;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.PriorityQueue;
-import java.util.TreeMap;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.sysml.runtime.DMLRuntimeException;
-import org.apache.sysml.runtime.compress.estim.CompressedSizeEstimator;
-
-public class PlanningCoCoder
-{
- //constants for weight computation
- private final static float GROUPABILITY_THRESHOLD = 0.00064f;
- private final static float PARTITION_WEIGHT = 0.05F; //higher values lead to more grouping
- private final static float PARTITION_SIZE = PARTITION_WEIGHT * GROUPABILITY_THRESHOLD;
-
- public static List<int[]> findCocodesByPartitioning(CompressedSizeEstimator sizeEstimator, List<Integer> availCols,
- List<Integer> colsCardinalities, List<Long> compressedSize, int numRows, double sparsity, int k)
- throws DMLRuntimeException
- {
- List<int[]> retGroups = new ArrayList<int[]>();
-
- // filtering out non-groupable columns as singleton groups
- // weighted of each column is the ratio of its cardinality to the number
- // of rows scaled by the matrix sparsity
- int numCols = availCols.size();
- List<Integer> groupCols = new ArrayList<Integer>();
- List<Float> groupColWeights = new ArrayList<Float>();
- HashMap<Integer, GroupableColInfo> groupColsInfo = new HashMap<Integer, GroupableColInfo>();
- for (int i = 0; i < numCols; i++) {
- int colIx = availCols.get(i);
- int cardinality = colsCardinalities.get(i);
- float weight = ((float) cardinality) / numRows;
- if (weight <= GROUPABILITY_THRESHOLD) {
- groupCols.add(colIx);
- groupColWeights.add(weight);
- groupColsInfo.put(colIx, new GroupableColInfo(weight,compressedSize.get(i)));
- } else {
- retGroups.add(new int[] { colIx });
- }
- }
-
- // bin packing based on PARTITION_WEIGHT and column weights
- float weight = computeWeightForCoCoding(numRows, sparsity);
- TreeMap<Float, List<List<Integer>>> bins = new PlanningBinPacker(
- weight, groupCols, groupColWeights).packFirstFit();
-
- // brute force grouping within each partition
- retGroups.addAll( (k > 1) ?
- getCocodingGroupsBruteForce(bins, groupColsInfo, sizeEstimator, numRows, k) :
- getCocodingGroupsBruteForce(bins, groupColsInfo, sizeEstimator, numRows));
-
- return retGroups;
- }
-
- private static List<int[]> getCocodingGroupsBruteForce(TreeMap<Float, List<List<Integer>>> bins, HashMap<Integer, GroupableColInfo> groupColsInfo, CompressedSizeEstimator estim, int rlen)
- {
- List<int[]> retGroups = new ArrayList<int[]>();
- for (List<List<Integer>> binList : bins.values()) {
- for (List<Integer> bin : binList) {
- // building an array of singleton CoCodingGroup
- ArrayList<PlanningCoCodingGroup> sgroups = new ArrayList<PlanningCoCodingGroup>();
- for (Integer col : bin)
- sgroups.add(new PlanningCoCodingGroup(col, groupColsInfo.get(col)));
- // brute force co-coding
- PlanningCoCodingGroup[] outputGroups = findCocodesBruteForce(
- estim, rlen, sgroups.toArray(new PlanningCoCodingGroup[0]));
- for (PlanningCoCodingGroup grp : outputGroups)
- retGroups.add(grp.getColIndices());
- }
- }
-
- return retGroups;
- }
-
- private static List<int[]> getCocodingGroupsBruteForce(TreeMap<Float, List<List<Integer>>> bins, HashMap<Integer, GroupableColInfo> groupColsInfo, CompressedSizeEstimator estim, int rlen, int k)
- throws DMLRuntimeException
- {
- List<int[]> retGroups = new ArrayList<int[]>();
- try {
- ExecutorService pool = Executors.newFixedThreadPool( k );
- ArrayList<CocodeTask> tasks = new ArrayList<CocodeTask>();
- for (List<List<Integer>> binList : bins.values())
- for (List<Integer> bin : binList) {
- // building an array of singleton CoCodingGroup
- ArrayList<PlanningCoCodingGroup> sgroups = new ArrayList<PlanningCoCodingGroup>();
- for (Integer col : bin)
- sgroups.add(new PlanningCoCodingGroup(col, groupColsInfo.get(col)));
- tasks.add(new CocodeTask(estim, sgroups, rlen));
- }
- List<Future<PlanningCoCodingGroup[]>> rtask = pool.invokeAll(tasks);
- for( Future<PlanningCoCodingGroup[]> lrtask : rtask )
- for (PlanningCoCodingGroup grp : lrtask.get())
- retGroups.add(grp.getColIndices());
- pool.shutdown();
- }
- catch(Exception ex) {
- throw new DMLRuntimeException(ex);
- }
-
- return retGroups;
- }
-
- /**
- * Identify columns to code together. Uses a greedy approach that merges
- * pairs of column groups into larger groups. The initial phase of the greedy
- * algorithm considers all pairs; later phases only pair the newly merged group with the remaining groups.
- *
- * @param sizeEstimator compressed size estimator
- * @param numRowsWeight weight used to normalize estimated cardinalities into cardinality ratios
- * @param singltonGroups initial single-column planning co-coding groups
- * @return the resulting column groups once no queued merge reduces the estimated size
- */
- private static PlanningCoCodingGroup[] findCocodesBruteForce(
- CompressedSizeEstimator sizeEstimator, float numRowsWeight,
- PlanningCoCodingGroup[] singltonGroups)
- {
- // Populate a priority queue (smallest size change, i.e. largest reduction, first) with all size-reducing 2-column cocodings.
- PriorityQueue<PlanningGroupMergeAction> q = new PriorityQueue<PlanningGroupMergeAction>();
- for (int leftIx = 0; leftIx < singltonGroups.length; leftIx++) {
- PlanningCoCodingGroup leftGrp = singltonGroups[leftIx];
- for (int rightIx = leftIx + 1; rightIx < singltonGroups.length; rightIx++) {
- PlanningCoCodingGroup rightGrp = singltonGroups[rightIx];
- // only consider pairs whose summed cardinality ratios stay below GROUPABILITY_THRESHOLD
- float cardRatio = leftGrp.getCardinalityRatio() + rightGrp.getCardinalityRatio();
- if ( cardRatio < GROUPABILITY_THRESHOLD) {
- PlanningGroupMergeAction potentialMerge = new PlanningGroupMergeAction(
- sizeEstimator, numRowsWeight, leftGrp, rightGrp);
- if (potentialMerge.getChangeInSize() < 0) {
- q.add(potentialMerge);
- }
- }
- }
- }
- PlanningCoCodingGroup[] colGroups = singltonGroups;
-
- // Greedily merge groups until no queued merge remains that reduces the
- // estimated compressed size
- while (q.size() > 0) {
- PlanningGroupMergeAction merge = q.poll();
-
- // The queue can contain merge actions involving column groups that
- // have already been merged.
- // Filter those actions out.
- int leftIx = findInArray(colGroups, merge.getLeftGrp());
- int rightIx = findInArray(colGroups, merge.getRightGrp());
- if (leftIx < 0 || rightIx < 0) {
- // One or more of the groups to be merged has already been made
- // part of another group.
- // Drop the merge action.
- } else {
- PlanningCoCodingGroup mergedGrp = merge.getMergedGrp();
-
- PlanningCoCodingGroup[] newColGroups = new PlanningCoCodingGroup[colGroups.length - 1];
- int targetIx = 0;
- for (int i = 0; i < colGroups.length; i++) {
- if (i != leftIx && i != rightIx) {
- newColGroups[targetIx] = colGroups[i];
- targetIx++;
- }
- }
-
- // New group goes at the end to (hopefully) speed up future
- // linear search operations
- newColGroups[newColGroups.length - 1] = mergedGrp;
-
- // Consider merging the new group with all the other
- // pre-existing groups.
- for (int i = 0; i < newColGroups.length - 1; i++) {
- PlanningCoCodingGroup newLeftGrp = newColGroups[i];
- PlanningCoCodingGroup newRightGrp = mergedGrp;
- if (newLeftGrp.getCardinalityRatio()
- + newRightGrp.getCardinalityRatio() < GROUPABILITY_THRESHOLD) {
- PlanningGroupMergeAction newPotentialMerge = new PlanningGroupMergeAction(
- sizeEstimator, numRowsWeight, newLeftGrp,
- newRightGrp);
- if (newPotentialMerge.getChangeInSize() < 0) {
- q.add(newPotentialMerge);
- }
- }
- }
- colGroups = newColGroups;
- }
- }
- return colGroups;
- }
-
- private static float computeWeightForCoCoding(int numRows, double sparsity) {
- //we use a constant partition size (independent of the number of rows
- //in order to ensure constant compression speed independent of blocking); numRows and sparsity are unused
- return PARTITION_SIZE;
- }
-
- private static int findInArray(Object[] arr, Object val) { // index of the first element equal to val (via equals), or -1 if absent
- for (int i = 0; i < arr.length; i++) {
- if (arr[i].equals(val)) { // NOTE: assumes arr contains no null elements
- return i;
- }
- }
- return -1;
- }
-
- protected static class GroupableColInfo { // per-column planning info used to seed singleton PlanningCoCodingGroups
- float cardRatio; // cardinality ratio, checked against GROUPABILITY_THRESHOLD during planning
- long size; // estimated compressed size of the single column
-
- public GroupableColInfo(float lcardRatio, long lsize) {
- cardRatio = lcardRatio;
- size = lsize;
- }
- }
-
- private static class CocodeTask implements Callable<PlanningCoCodingGroup[]> // co-codes the singleton groups of one bin
- {
- private CompressedSizeEstimator _estim = null;
- private ArrayList<PlanningCoCodingGroup> _sgroups = null;
- private int _rlen = -1; // passed to findCocodesBruteForce as numRowsWeight
-
- protected CocodeTask( CompressedSizeEstimator estim, ArrayList<PlanningCoCodingGroup> sgroups, int rlen ) {
- _estim = estim;
- _sgroups = sgroups;
- _rlen = rlen;
- }
-
- @Override
- public PlanningCoCodingGroup[] call() throws DMLRuntimeException {
- // greedy pairwise co-coding of this bin's singleton groups
- return findCocodesBruteForce(_estim, _rlen,
- _sgroups.toArray(new PlanningCoCodingGroup[0]));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCodingGroup.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCodingGroup.java b/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCodingGroup.java
deleted file mode 100644
index 9ee0d7e..0000000
--- a/src/main/java/org/apache/sysml/runtime/compress/PlanningCoCodingGroup.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.compress;
-
-import java.util.Arrays;
-
-import org.apache.sysml.runtime.compress.PlanningCoCoder.GroupableColInfo;
-import org.apache.sysml.runtime.compress.estim.CompressedSizeEstimator;
-import org.apache.sysml.runtime.compress.estim.CompressedSizeInfo;
-
-/**
- * Class to represent information about co-coding a group of columns.
- *
- */
-public class PlanningCoCodingGroup
-{
- private int[] _colIndexes; // column indexes; sorted if the merged inputs were sorted
- private long _estSize; // estimated compressed size
- private float _cardRatio; // cardinality ratio (estimated cardinality / numRowsWeight for merged groups)
-
- /**
- * Constructor for a one-column group; i.e. do not co-code a given column.
- *
- * @param col column
- * @param info groupable column info
- */
- public PlanningCoCodingGroup(int col, GroupableColInfo info) {
- _colIndexes = new int[]{col};
- _estSize = info.size;
- _cardRatio = info.cardRatio;
- }
-
- /**
- * Constructor for merging two disjoint groups of columns (each with sorted column indexes)
- *
- * @param grp1 first group of columns to merge
- * @param grp2 second group to merge
- * @param bitmapSizeEstimator bitmap size estimator
- * @param numRowsWeight divisor applied to the estimated cardinality to obtain the cardinality ratio
- */
- public PlanningCoCodingGroup(PlanningCoCodingGroup grp1, PlanningCoCodingGroup grp2,
- CompressedSizeEstimator bitmapSizeEstimator, float numRowsWeight)
- {
- // merge sorted non-empty arrays; once one input is exhausted, copy the rest of the other
- _colIndexes = new int[grp1._colIndexes.length + grp2._colIndexes.length];
- int grp1Ptr = 0, grp2Ptr = 0;
- for (int mergedIx = 0; mergedIx < _colIndexes.length; mergedIx++) {
- if (grp1._colIndexes[grp1Ptr] < grp2._colIndexes[grp2Ptr]) {
- _colIndexes[mergedIx] = grp1._colIndexes[grp1Ptr++];
- if (grp1Ptr == grp1._colIndexes.length) {
- System.arraycopy(grp2._colIndexes, grp2Ptr, _colIndexes,
- mergedIx + 1, grp2._colIndexes.length - grp2Ptr);
- break;
- }
- } else {
- _colIndexes[mergedIx] = grp2._colIndexes[grp2Ptr++];
- if (grp2Ptr == grp2._colIndexes.length) {
- System.arraycopy(grp1._colIndexes, grp1Ptr, _colIndexes,
- mergedIx + 1, grp1._colIndexes.length - grp1Ptr);
- break;
- }
- }
- }
-
- // estimate compressed size and cardinality ratio of the merged group
- CompressedSizeInfo groupSizeInfo = bitmapSizeEstimator
- .estimateCompressedColGroupSize(_colIndexes);
- _estSize = groupSizeInfo.getMinSize();
- _cardRatio = groupSizeInfo.getEstCarinality() / numRowsWeight;
- }
-
- public int[] getColIndices() { // returns the internal array, not a copy
- return _colIndexes;
- }
-
- /**
- * Obtain estimated compressed size of the grouped columns.
- *
- * @return estimated compressed size of the grouped columns
- */
- public long getEstSize() {
- return _estSize;
- }
-
- public float getCardinalityRatio() {
- return _cardRatio;
- }
-
- @Override
- public String toString() {
- return Arrays.toString(_colIndexes);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/PlanningGroupMergeAction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/PlanningGroupMergeAction.java b/src/main/java/org/apache/sysml/runtime/compress/PlanningGroupMergeAction.java
deleted file mode 100644
index 47d46d5..0000000
--- a/src/main/java/org/apache/sysml/runtime/compress/PlanningGroupMergeAction.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.sysml.runtime.compress;
-
-import org.apache.sysml.runtime.compress.estim.CompressedSizeEstimator;
-
-/**
- * Internal data structure for tracking potential merges of column groups in
- * co-coding calculations.
- *
- */
-class PlanningGroupMergeAction implements Comparable<PlanningGroupMergeAction>
-{
- private PlanningCoCodingGroup _leftGrp; //left input
- private PlanningCoCodingGroup _rightGrp; //right input
- private PlanningCoCodingGroup _mergedGrp; //output
- private long _changeInSize; //merged size minus both input sizes; negative means a reduction
-
-
- public PlanningGroupMergeAction(CompressedSizeEstimator sizeEstimator,
- float numRowsWeight, PlanningCoCodingGroup leftGrp, PlanningCoCodingGroup rightGrp) {
- _leftGrp = leftGrp;
- _rightGrp = rightGrp;
- _mergedGrp = new PlanningCoCodingGroup(leftGrp, rightGrp, sizeEstimator, numRowsWeight);
-
- // Negative size change ==> Decrease in size
- _changeInSize = _mergedGrp.getEstSize()
- - leftGrp.getEstSize() - rightGrp.getEstSize();
- }
-
- public int compareTo(PlanningGroupMergeAction o) {
- // We only sort by the change in size (ascending, so a PriorityQueue polls the largest reduction first); NOTE(review): the long subtraction can overflow, Long.compare would be safer
- return (int) Math.signum(_changeInSize - o._changeInSize);
- }
-
- @Override
- public String toString() {
- return String.format("Merge %s and %s", _leftGrp, _rightGrp);
- }
-
- public PlanningCoCodingGroup getLeftGrp() {
- return _leftGrp;
- }
-
- public PlanningCoCodingGroup getRightGrp() {
- return _rightGrp;
- }
-
- public PlanningCoCodingGroup getMergedGrp() {
- return _mergedGrp;
- }
-
- public long getChangeInSize() {
- return _changeInSize;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/ReaderColumnSelectionSparse.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/ReaderColumnSelectionSparse.java b/src/main/java/org/apache/sysml/runtime/compress/ReaderColumnSelectionSparse.java
index 63c0467..60d0532 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/ReaderColumnSelectionSparse.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/ReaderColumnSelectionSparse.java
@@ -63,7 +63,6 @@ public class ReaderColumnSelectionSparse extends ReaderColumnSelection
if( data.getSparseBlock()!=null )
for( int i=0; i<colIndexes.length; i++ )
sparseCols[i] = data.getSparseBlock().get(colIndexes[i]);
- Arrays.fill(sparsePos, 0);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/UncompressedBitmap.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/UncompressedBitmap.java b/src/main/java/org/apache/sysml/runtime/compress/UncompressedBitmap.java
index d62bae9..2f68edf 100644
--- a/src/main/java/org/apache/sysml/runtime/compress/UncompressedBitmap.java
+++ b/src/main/java/org/apache/sysml/runtime/compress/UncompressedBitmap.java
@@ -21,10 +21,13 @@ package org.apache.sysml.runtime.compress;
import java.util.Arrays;
+import org.apache.commons.lang.ArrayUtils;
import org.apache.sysml.runtime.compress.utils.DblArrayIntListHashMap;
import org.apache.sysml.runtime.compress.utils.DoubleIntListHashMap;
import org.apache.sysml.runtime.compress.utils.DblArrayIntListHashMap.DArrayIListEntry;
import org.apache.sysml.runtime.compress.utils.DoubleIntListHashMap.DIListEntry;
+import org.apache.sysml.runtime.compress.utils.IntArrayList;
+import org.apache.sysml.runtime.util.SortUtils;
/**
* Uncompressed representation of one or more columns in bitmap format.
@@ -32,13 +35,13 @@ import org.apache.sysml.runtime.compress.utils.DoubleIntListHashMap.DIListEntry;
*/
public final class UncompressedBitmap
{
- private int _numCols;
+ private final int _numCols;
/** Distinct values that appear in the column. Linearized as value groups <v11 v12> <v21 v22>.*/
private double[] _values;
/** Bitmaps (as lists of offsets) for each of the values. */
- private int[][] _offsetsLists;
+ private IntArrayList[] _offsetsLists;
public UncompressedBitmap( DblArrayIntListHashMap distinctVals, int numColumns )
{
@@ -46,11 +49,11 @@ public final class UncompressedBitmap
// Convert inputs to arrays
int numVals = distinctVals.size();
_values = new double[numVals*numColumns];
- _offsetsLists = new int[numVals][];
+ _offsetsLists = new IntArrayList[numVals];
int bitmapIx = 0;
for( DArrayIListEntry val : distinctVals.extractValues()) {
System.arraycopy(val.key.getData(), 0, _values, bitmapIx*numColumns, numColumns);
- _offsetsLists[bitmapIx++] = val.value.extractValues();
+ _offsetsLists[bitmapIx++] = val.value;
}
_numCols = numColumns;
}
@@ -61,11 +64,11 @@ public final class UncompressedBitmap
// Convert inputs to arrays
int numVals = distinctVals.size();
_values = new double[numVals];
- _offsetsLists = new int[numVals][];
+ _offsetsLists = new IntArrayList[numVals];
int bitmapIx = 0;
for(DIListEntry val : distinctVals.extractValues()) {
_values[bitmapIx] = val.key;
- _offsetsLists[bitmapIx++] = val.value.extractValues();
+ _offsetsLists[bitmapIx++] = val.value;
}
_numCols = 1;
}
@@ -74,6 +77,15 @@ public final class UncompressedBitmap
return _numCols;
}
+ /**
+ * Get all distinct values without unnecessary allocations and copies; the internal array is returned directly (not a copy).
+ *
+ * @return dictionary of value tuples, linearized with getNumColumns() entries per distinct value
+ */
+ public double[] getValues() {
+ return _values;
+ }
+
/**
* Obtain tuple of column values associated with index.
*
@@ -94,21 +106,46 @@ public final class UncompressedBitmap
return _values.length / _numCols;
}
- /**
- * Obtain array of offsets of the rows containing index value
- *
- * @param ix index of a particular distinct value
- * @return IMMUTABLE array of the offsets of the rows containing the value
- * with the indicated index
- */
- public int[] getOffsetsList(int ix) {
+ public IntArrayList getOffsetsList(int ix) { // offsets of the rows containing the value at index ix (internal list, not a copy)
return _offsetsLists[ix];
}
- public int getNumOffsets() {
- int ret = 0;
- for( int[] offlist : _offsetsLists )
- ret += offlist.length;
+ public long getNumOffsets() { // total number of offsets over all distinct values
+ long ret = 0;
+ for( IntArrayList offlist : _offsetsLists )
+ ret += offlist.size();
return ret;
}
+
+ public int getNumOffsets(int ix) { // number of offsets (rows) of the value at index ix
+ return _offsetsLists[ix].size();
+ }
+
+ public void sortValuesByFrequency() { // reorder distinct values (and their offset lists) by descending number of offsets
+ int numVals = getNumValues();
+ int numCols = getNumColumns();
+
+ double[] freq = new double[numVals];
+ int[] pos = new int[numVals];
+
+ //populate the temporary arrays
+ for(int i=0; i<numVals; i++) {
+ freq[i] = getNumOffsets(i);
+ pos[i] = i;
+ }
+
+ //sort positions by frequency ascending, then reverse to obtain descending order
+ SortUtils.sortByValue(0, numVals, freq, pos);
+ ArrayUtils.reverse(pos);
+
+ //create new value and offset list arrays
+ double[] lvalues = new double[numVals*numCols];
+ IntArrayList[] loffsets = new IntArrayList[numVals];
+ for(int i=0; i<numVals; i++) {
+ System.arraycopy(_values, pos[i]*numCols, lvalues, i*numCols, numCols);
+ loffsets[i] = _offsetsLists[pos[i]];
+ }
+ _values = lvalues;
+ _offsetsLists = loffsets;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/cocode/ColumnGroupPartitioner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/cocode/ColumnGroupPartitioner.java b/src/main/java/org/apache/sysml/runtime/compress/cocode/ColumnGroupPartitioner.java
new file mode 100644
index 0000000..05af19d
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/cocode/ColumnGroupPartitioner.java
@@ -0,0 +1,19 @@
+package org.apache.sysml.runtime.compress.cocode;
+
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.sysml.runtime.compress.cocode.PlanningCoCoder.GroupableColInfo;
+
+public abstract class ColumnGroupPartitioner // NOTE(review): this new file lacks the Apache license header used by the other files
+{
+ /**
+ * Partitions a list of columns into a list of partitions, each containing a subset of the columns.
+ * Note that this call must compute a complete and disjoint partitioning.
+ *
+ * @param groupCols list of column indexes to partition
+ * @param groupColsInfo map from column index to its groupable column info
+ * @return list of partitions (where each partition is a list of columns)
+ */
+ public abstract List<List<Integer>> partitionColumns(List<Integer> groupCols, HashMap<Integer, GroupableColInfo> groupColsInfo);
+}
http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/37a215bc/src/main/java/org/apache/sysml/runtime/compress/cocode/ColumnGroupPartitionerBinPacking.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/sysml/runtime/compress/cocode/ColumnGroupPartitionerBinPacking.java b/src/main/java/org/apache/sysml/runtime/compress/cocode/ColumnGroupPartitionerBinPacking.java
new file mode 100644
index 0000000..0fb6abe
--- /dev/null
+++ b/src/main/java/org/apache/sysml/runtime/compress/cocode/ColumnGroupPartitionerBinPacking.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysml.runtime.compress.cocode;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.sysml.runtime.compress.cocode.PlanningCoCoder.GroupableColInfo;
+import org.apache.sysml.runtime.util.SortUtils;
+
+/**
+ * Column group partitioning with bin packing heuristic.
+ *
+ */
+public class ColumnGroupPartitionerBinPacking extends ColumnGroupPartitioner
+{
+ private static final boolean FIRST_FIT_DEC = true;
+ private static final int MAX_COL_PER_GROUP = Integer.MAX_VALUE;
+
+ //we use a constant partition size (independent of the number of rows
+ //in order to ensure constant compression speed independent of blocking)
+ public static double BIN_CAPACITY = 0.000032; //bin capacity in summed column cardinality ratios; higher values, more grouping (NOTE(review): public and non-final)
+
+ @Override
+ public List<List<Integer>> partitionColumns(List<Integer> groupCols, HashMap<Integer, GroupableColInfo> groupColsInfo)
+ {
+ //obtain column weights (cardinality ratios)
+ int[] items = new int[groupCols.size()];
+ double[] itemWeights = new double[groupCols.size()];
+ for( int i=0; i<groupCols.size(); i++ ) {
+ int col = groupCols.get(i);
+ items[i] = col;
+ itemWeights[i] = groupColsInfo.get(col).cardRatio;
+ }
+
+ //sort items by decreasing weight (first fit decreasing)
+ if( FIRST_FIT_DEC ) {
+ SortUtils.sortByValue(0, items.length, itemWeights, items);
+ ArrayUtils.reverse(items);
+ ArrayUtils.reverse(itemWeights);
+ }
+
+ //partition columns via bin packing
+ return packFirstFit(items, itemWeights);
+ }
+
+ /**
+ * First-fit bin packing of the items in the given order. NOTE: upper bound is 17/10 OPT
+ *
+ * @param items column indexes to pack
+ * @param itemWeights the weights of the items
+ * @return list of bins, each a list of column indexes
+ */
+ private List<List<Integer>> packFirstFit(int[] items, double[] itemWeights)
+ {
+ List<List<Integer>> bins = new ArrayList<List<Integer>>();
+ List<Double> binWeights = new ArrayList<Double>();
+
+ for( int i = 0; i < items.length; i++ ) {
+ //add to first existing bin with enough remaining capacity (binWeights holds remaining capacity)
+ boolean assigned = false;
+ for( int j = 0; j < bins.size(); j++ ) {
+ double newBinWeight = binWeights.get(j)-itemWeights[i];
+ if( newBinWeight >= 0 && bins.get(j).size() < MAX_COL_PER_GROUP-1 ){ // NOTE(review): '-1' caps bins at MAX_COL_PER_GROUP-1 columns; confirm intent
+ bins.get(j).add(items[i]);
+ binWeights.set(j, newBinWeight);
+ assigned = true; break;
+ }
+ }
+
+ //create new bin at end of list (remaining capacity is negative if the item exceeds BIN_CAPACITY)
+ if( !assigned ) {
+ bins.add(new ArrayList<Integer>(Arrays.asList(items[i])));
+ binWeights.add(BIN_CAPACITY-itemWeights[i]);
+ }
+ }
+
+ return bins;
+ }
+}