You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by ba...@apache.org on 2021/09/20 11:27:37 UTC
[systemds] 01/03: [SYSTEMDS-2610] CLA updates
This is an automated email from the ASF dual-hosted git repository.
baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
commit 67bb8034611a766ca93c0314ee735a3252e79290
Author: baunsgaard <ba...@tugraz.at>
AuthorDate: Mon Aug 30 16:28:31 2021 +0200
[SYSTEMDS-2610] CLA updates
- Compressed matrix factory improvements
- Add decompression if the data is serialized and larger in compressed format
- Decompress on write to HDFS
- Abort compression after cocode if the compression sizes are bad
- Make cbind decompress in the workload tree (worst case)
- Add a minimum compression ratio argument to the CompressionSettings
- Reduce the sampling size in cbind compression and set a high minimum compression ratio
- Fix order of operations in compressed append
- Add compressed output size to unary hops
- More utilization of the cached decompressed matrix if it fits in
memory by looking for the soft reference of the uncompressed matrix in certain cases
---
src/main/java/org/apache/sysds/hops/UnaryOp.java | 12 +-
.../ipa/IPAPassCompressionWorkloadAnalysis.java | 1 +
.../hops/rewrite/RewriteCompressedReblock.java | 2 +-
.../runtime/compress/CompressedMatrixBlock.java | 29 ++-
.../compress/CompressedMatrixBlockFactory.java | 98 +++++++-
.../runtime/compress/CompressionSettings.java | 8 +-
.../compress/CompressionSettingsBuilder.java | 8 +-
.../runtime/compress/cocode/AColumnCoCoder.java | 22 +-
.../runtime/compress/cocode/CoCodeBinPacking.java | 4 +-
.../runtime/compress/cocode/CoCodeGreedy.java | 2 +-
.../runtime/compress/cocode/CoCodePriorityQue.java | 4 +-
.../compress/colgroup/ColGroupUncompressed.java | 19 +-
.../compress/colgroup/mapping/MapToFactory.java | 12 +-
.../compress/cost/CostEstimatorBuilder.java | 5 +
.../compress/cost/CostEstimatorFactory.java | 2 +-
.../compress/estim/CompressedSizeEstimator.java | 43 +++-
.../estim/CompressedSizeEstimatorExact.java | 5 +-
.../estim/CompressedSizeEstimatorFactory.java | 30 ++-
.../estim/CompressedSizeEstimatorSample.java | 11 +-
.../compress/estim/CompressedSizeInfoColGroup.java | 2 +-
.../sysds/runtime/compress/lib/CLALibAppend.java | 32 +--
.../runtime/compress/lib/CLALibBinaryCellOp.java | 42 ++--
.../sysds/runtime/compress/lib/CLALibCompAgg.java | 51 ++--
.../runtime/compress/lib/CLALibRelationalOp.java | 267 ---------------------
.../sysds/runtime/compress/lib/CLALibScalar.java | 7 -
.../compress/workload/WorkloadAnalyzer.java | 5 +-
.../context/SparkExecutionContext.java | 8 +-
.../spark/BinUaggChainSPInstruction.java | 6 +
.../component/compress/CompressedMatrixTest.java | 33 +++
.../component/compress/CompressedTestBase.java | 27 ---
.../component/compress/workload/WorkloadTest.java | 18 +-
.../compress/workload/WorkloadAlgorithmTest.java | 2 +-
32 files changed, 379 insertions(+), 438 deletions(-)
diff --git a/src/main/java/org/apache/sysds/hops/UnaryOp.java b/src/main/java/org/apache/sysds/hops/UnaryOp.java
index b7df277..38199b2 100644
--- a/src/main/java/org/apache/sysds/hops/UnaryOp.java
+++ b/src/main/java/org/apache/sysds/hops/UnaryOp.java
@@ -545,7 +545,7 @@ public class UnaryOp extends MultiThreadedHop
setDim2(1);
}
else if(_op == OpOp1.TYPEOF || _op == OpOp1.DETECTSCHEMA || _op == OpOp1.COLNAMES) {
- //TODO theses three builtins should rather be moved to unary aggregates
+ //TODO these three builtins should rather be moved to unary aggregates
setDim1(1);
setDim2(input.getDim2());
}
@@ -564,6 +564,16 @@ public class UnaryOp extends MultiThreadedHop
{
setNnz( input.getNnz() );
}
+
+ // if the input is compressed then set the output to be compressed as well.
+ if(input._compressedOutput && ! (_op==OpOp1.DECOMPRESS)){
+ setCompressedOutput(true);
+ // Setting the compressed output to be 2 x larger.
+ // Just in case we change the compressed structure slightly.
+ // this value is overwritten with correct size once the hop is executed
+ // TODO handle overlapping state, since some operations would not lead to compressed output.
+ setCompressedSize(input.compressedSize() * 2);
+ }
}
}
diff --git a/src/main/java/org/apache/sysds/hops/ipa/IPAPassCompressionWorkloadAnalysis.java b/src/main/java/org/apache/sysds/hops/ipa/IPAPassCompressionWorkloadAnalysis.java
index 324b272..b23397f 100644
--- a/src/main/java/org/apache/sysds/hops/ipa/IPAPassCompressionWorkloadAnalysis.java
+++ b/src/main/java/org/apache/sysds/hops/ipa/IPAPassCompressionWorkloadAnalysis.java
@@ -58,6 +58,7 @@ public class IPAPassCompressionWorkloadAnalysis extends IPAPass {
WTreeRoot tree = e.getValue();
CostEstimatorBuilder b = new CostEstimatorBuilder(tree);
// filter out compression plans that is known bad
+
if(b.shouldTryToCompress()){
tree.getRoot().setRequiresCompression(tree);
for(Hop h : tree.getDecompressList())
diff --git a/src/main/java/org/apache/sysds/hops/rewrite/RewriteCompressedReblock.java b/src/main/java/org/apache/sysds/hops/rewrite/RewriteCompressedReblock.java
index 6b51a51..2f0f1f6c 100644
--- a/src/main/java/org/apache/sysds/hops/rewrite/RewriteCompressedReblock.java
+++ b/src/main/java/org/apache/sysds/hops/rewrite/RewriteCompressedReblock.java
@@ -128,7 +128,7 @@ public class RewriteCompressedReblock extends StatementBlockRewriteRule {
public static boolean satisfiesSizeConstraintsForCompression(Hop hop) {
if(hop.getDim2() >= 1) {
- return (hop.getDim1() >= 1000 && hop.getDim2() < 100) || hop.getDim1() / hop.getDim2() >= 75;
+ return (hop.getDim1() >= 1000 && hop.getDim2() < 100) || hop.getDim1() / hop.getDim2() >= 75 || (hop.getSparsity() < 0.0001 && hop.getDim1() > 1000);
}
return false;
}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
index d374d3c..5dcc406 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
@@ -302,7 +302,10 @@ public class CompressedMatrixBlock extends MatrixBlock {
return ret;
}
- private MatrixBlock getCachedDecompressed() {
+ /**
+ * Get the cached decompressed matrix (if it exists otherwise null)
+ */
+ public MatrixBlock getCachedDecompressed() {
if(decompressedVersion != null) {
final MatrixBlock mb = decompressedVersion.get();
if(mb != null) {
@@ -453,6 +456,16 @@ public class CompressedMatrixBlock extends MatrixBlock {
@Override
public void write(DataOutput out) throws IOException {
+ if(getExactSizeOnDisk() > MatrixBlock.estimateSizeOnDisk(rlen, clen, nonZeros)) {
+ // decompress and make a uncompressed column group. this is then used for the serialization, since it is
+ // smaller.
+ // throw new NotImplementedException("Decompressing serialization is not implemented");
+
+ MatrixBlock uncompressed = getUncompressed("Decompressing serialization for smaller serialization");
+ ColGroupUncompressed cg = new ColGroupUncompressed(uncompressed);
+ allocateColGroup(cg);
+ nonZeros = cg.getNumberNonZeros();
+ }
// serialize compressed matrix block
out.writeInt(rlen);
out.writeInt(clen);
@@ -492,11 +505,15 @@ public class CompressedMatrixBlock extends MatrixBlock {
@Override
public MatrixBlock binaryOperations(BinaryOperator op, MatrixValue thatValue, MatrixValue result) {
- return CLALibBinaryCellOp.binaryOperations(op, this, thatValue, result);
+ MatrixBlock that = thatValue == null ? null : (MatrixBlock) thatValue;
+ MatrixBlock ret = result == null ? null : (MatrixBlock) result;
+ return CLALibBinaryCellOp.binaryOperations(op, this, that, ret);
}
public MatrixBlock binaryOperationsLeft(BinaryOperator op, MatrixValue thatValue, MatrixValue result) {
- return CLALibBinaryCellOp.binaryOperationsLeft(op, this, thatValue, result);
+ MatrixBlock that = thatValue == null ? null : (MatrixBlock) thatValue;
+ MatrixBlock ret = result == null ? null : (MatrixBlock) result;
+ return CLALibBinaryCellOp.binaryOperationsLeft(op, this, that, ret);
}
@Override
@@ -686,8 +703,8 @@ public class CompressedMatrixBlock extends MatrixBlock {
.aggregateUnaryOperations(op, result, blen, indexesIn, inCP);
}
-
- return CLALibCompAgg.aggregateUnary(this, result, op, blen, indexesIn, inCP);
+ MatrixBlock ret = (result == null) ? null : (MatrixBlock) result;
+ return CLALibCompAgg.aggregateUnary(this, ret, op, blen, indexesIn, inCP);
}
@Override
@@ -1080,7 +1097,7 @@ public class CompressedMatrixBlock extends MatrixBlock {
boolean m2C = m2 instanceof CompressedMatrixBlock;
boolean m3C = m3 instanceof CompressedMatrixBlock;
printDecompressWarning("aggregateTernaryOperations " + op.aggOp.getClass().getSimpleName() + " "
- + op.indexFn.getClass().getSimpleName() + " " + op.aggOp.increOp.fn.getClass().getSimpleName() + " "
+ + op.indexFn.getClass().getSimpleName() + " " + op.aggOp.increOp.fn.getClass().getSimpleName() + " "
+ op.binaryFn.getClass().getSimpleName() + " m1,m2,m3 " + m1C + " " + m2C + " " + m3C);
MatrixBlock left = getUncompressed();
MatrixBlock right1 = getUncompressed(m2);
diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java
index 20beaf5..5f45d56 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java
@@ -32,18 +32,21 @@ import org.apache.sysds.runtime.compress.colgroup.AColGroup;
import org.apache.sysds.runtime.compress.colgroup.ColGroupConst;
import org.apache.sysds.runtime.compress.colgroup.ColGroupEmpty;
import org.apache.sysds.runtime.compress.colgroup.ColGroupFactory;
+import org.apache.sysds.runtime.compress.colgroup.ColGroupUncompressed;
import org.apache.sysds.runtime.compress.colgroup.ColGroupValue;
import org.apache.sysds.runtime.compress.colgroup.dictionary.Dictionary;
+import org.apache.sysds.runtime.compress.cost.ComputationCostEstimator;
import org.apache.sysds.runtime.compress.cost.CostEstimatorFactory;
import org.apache.sysds.runtime.compress.cost.ICostEstimate;
-import org.apache.sysds.runtime.compress.cost.MemoryCostEstimator;
import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimator;
import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimatorFactory;
import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo;
+import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup;
import org.apache.sysds.runtime.compress.utils.DblArrayIntListHashMap;
import org.apache.sysds.runtime.compress.utils.DoubleCountHashMap;
import org.apache.sysds.runtime.compress.workload.WTreeRoot;
import org.apache.sysds.runtime.controlprogram.parfor.stat.Timing;
+import org.apache.sysds.runtime.data.SparseBlock;
import org.apache.sysds.runtime.matrix.data.LibMatrixReorg;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.utils.DMLCompressionStatistics;
@@ -65,7 +68,7 @@ public class CompressedMatrixBlockFactory {
private final CompressionSettings compSettings;
/** The main cost estimator used for the compression */
private final ICostEstimate costEstimator;
-
+
/** Time stamp of last phase */
private double lastPhase;
/** Pointer to the original matrix Block that is about to be compressed. */
@@ -153,6 +156,20 @@ public class CompressedMatrixBlockFactory {
}
/**
+ * Generate a CompressedMatrixBlock Object that contains a single uncompressed matrix block column group.
+ *
+ * @param mb The matrix block to be contained in the uncompressed matrix block column,
+ * @return a CompressedMatrixBlock
+ */
+ public static CompressedMatrixBlock genUncompressedCompressedMatrixBlock(MatrixBlock mb) {
+ CompressedMatrixBlock ret = new CompressedMatrixBlock(mb.getNumRows(), mb.getNumColumns());
+ AColGroup cg = new ColGroupUncompressed(mb);
+ ret.allocateColGroup(cg);
+ ret.setNonZeros(mb.getNonZeros());
+ return ret;
+ }
+
+ /**
* Method for constructing a compressed matrix out of an constant input.
*
* Since the input is a constant value it is trivially compressable, therefore we skip the entire compression
@@ -191,9 +208,13 @@ public class CompressedMatrixBlockFactory {
res = new CompressedMatrixBlock(mb); // copy metadata and allocate soft reference
- classifyPhase();
- if(coCodeColGroups == null)
- return abortCompression();
+ looksLikeOneHot();
+
+ if(coCodeColGroups == null) {
+ classifyPhase();
+ if(coCodeColGroups == null)
+ return abortCompression();
+ }
transposePhase();
compressPhase();
@@ -217,7 +238,13 @@ public class CompressedMatrixBlockFactory {
_stats.estimatedSizeCols = sizeInfos.memoryEstimate();
logPhase();
- if(!(costEstimator instanceof MemoryCostEstimator) || _stats.estimatedSizeCols < _stats.originalSize)
+ final boolean isValidForComputeBasedCompression = isComputeBasedCompression() &&
+ (compSettings.minimumCompressionRatio != 1.0) ? _stats.estimatedSizeCols *
+ compSettings.minimumCompressionRatio < _stats.originalSize : true;
+ final boolean isValidForMemoryBasedCompression = _stats.estimatedSizeCols *
+ compSettings.minimumCompressionRatio < _stats.originalSize;
+
+ if(isValidForComputeBasedCompression || isValidForMemoryBasedCompression)
coCodePhase(sizeEstimator, sizeInfos, costEstimator);
else {
LOG.info("Estimated Size of singleColGroups: " + _stats.estimatedSizeCols);
@@ -225,13 +252,72 @@ public class CompressedMatrixBlockFactory {
}
}
+ private boolean isComputeBasedCompression() {
+ return costEstimator instanceof ComputationCostEstimator;
+ }
+
private void coCodePhase(CompressedSizeEstimator sizeEstimator, CompressedSizeInfo sizeInfos,
ICostEstimate costEstimator) {
coCodeColGroups = CoCoderFactory.findCoCodesByPartitioning(sizeEstimator, sizeInfos, k, costEstimator,
compSettings);
_stats.estimatedSizeCoCoded = coCodeColGroups.memoryEstimate();
+
logPhase();
+
+ // if cocode is estimated larger than uncompressed abort compression.
+ if(isComputeBasedCompression() &&
+ _stats.estimatedSizeCoCoded * compSettings.minimumCompressionRatio > _stats.originalSize) {
+
+ coCodeColGroups = null;
+ LOG.info("Aborting compression because the cocoded size : " + _stats.estimatedSizeCoCoded);
+ LOG.info("Vs original size : " + _stats.originalSize);
+ }
+
+ }
+
+ private void looksLikeOneHot() {
+ final int numColumns = mb.getNumColumns();
+ final int numRows = mb.getNumRows();
+ final long nnz = mb.getNonZeros();
+ final int colGroupSize = 100;
+ if(nnz == numRows) {
+ boolean onlyOneValues = true;
+ LOG.debug("Looks like one hot encoded.");
+ if(mb.isInSparseFormat()) {
+ final SparseBlock sb = mb.getSparseBlock();
+ for(double v : sb.get(0).values()) {
+ onlyOneValues = v == 1.0;
+ if(!onlyOneValues) {
+ break;
+ }
+ }
+ }
+ else {
+ final double[] vals = mb.getDenseBlock().values(0);
+ for(int i = 0; i < Math.min(vals.length, 1000); i++) {
+ double v = vals[i];
+ onlyOneValues = v == 1.0 || v == 0.0;
+ if(!onlyOneValues) {
+ break;
+ }
+ }
+ }
+ if(onlyOneValues) {
+ List<CompressedSizeInfoColGroup> ng = new ArrayList<>(numColumns / colGroupSize + 1);
+ for(int i = 0; i < numColumns; i += colGroupSize) {
+ int[] columnIds = new int[Math.min(colGroupSize, numColumns - i)];
+ for(int j = 0; j < columnIds.length; j++)
+ columnIds[j] = i + j;
+ ng.add(new CompressedSizeInfoColGroup(columnIds, Math.min(numColumns, colGroupSize), numRows));
+ }
+ coCodeColGroups = new CompressedSizeInfo(ng);
+
+ LOG.debug("Concluded that it probably is one hot encoded skipping analysis");
+ // skipping two phases
+ phase += 2;
+ }
+ }
}
private void transposePhase() {
diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java
index 6dfcdae..ea04e8e 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java
@@ -103,10 +103,15 @@ public class CompressionSettings {
*/
public boolean transposed = false;
+ /**
+ * The minimum compression ratio to achieve.
+ */
+ public final double minimumCompressionRatio;
+
protected CompressionSettings(double samplingRatio, boolean allowSharedDictionary, String transposeInput,
int seed, boolean lossy, EnumSet<CompressionType> validCompressions,
boolean sortValuesByLength, PartitionerType columnPartitioner, int maxColGroupCoCode, double coCodePercentage,
- int minimumSampleSize, EstimationType estimationType, CostType costComputationType) {
+ int minimumSampleSize, EstimationType estimationType, CostType costComputationType, double minimumCompressionRatio) {
this.samplingRatio = samplingRatio;
this.allowSharedDictionary = allowSharedDictionary;
this.transposeInput = transposeInput;
@@ -120,6 +125,7 @@ public class CompressionSettings {
this.minimumSampleSize = minimumSampleSize;
this.estimationType = estimationType;
this.costComputationType = costComputationType;
+ this.minimumCompressionRatio = minimumCompressionRatio;
if(LOG.isDebugEnabled())
LOG.debug(this);
}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java
index a56285b..2864118 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java
@@ -45,6 +45,7 @@ public class CompressionSettingsBuilder {
private EstimationType estimationType = EstimationType.HassAndStokes;
private PartitionerType columnPartitioner;
private CostType costType;
+ private double minimumCompressionRatio = 1.0;
public CompressionSettingsBuilder() {
@@ -267,6 +268,11 @@ public class CompressionSettingsBuilder {
return this;
}
+ public CompressionSettingsBuilder setMinimumCompressionRatio(double ratio){
+ this.minimumCompressionRatio = ratio;
+ return this;
+ }
+
/**
* Create the CompressionSettings object to use in the compression.
*
@@ -275,6 +281,6 @@ public class CompressionSettingsBuilder {
public CompressionSettings create() {
return new CompressionSettings(samplingRatio, allowSharedDictionary, transposeInput, seed, lossy,
validCompressions, sortValuesByLength, columnPartitioner, maxColGroupCoCode, coCodePercentage,
- minimumSampleSize, estimationType, costType);
+ minimumSampleSize, estimationType, costType,minimumCompressionRatio);
}
}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/AColumnCoCoder.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/AColumnCoCoder.java
index 41d986c..3bdcf5d 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/cocode/AColumnCoCoder.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/AColumnCoCoder.java
@@ -26,7 +26,6 @@ import org.apache.sysds.runtime.compress.cost.ICostEstimate;
import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimator;
import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo;
import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup;
-import org.apache.sysds.runtime.compress.utils.Util;
public abstract class AColumnCoCoder {
@@ -52,26 +51,17 @@ public abstract class AColumnCoCoder {
*/
protected abstract CompressedSizeInfo coCodeColumns(CompressedSizeInfo colInfos, int k);
- protected CompressedSizeInfoColGroup join(CompressedSizeInfoColGroup lhs, CompressedSizeInfoColGroup rhs,
- boolean analyze) {
- return analyze ? joinWithAnalysis(lhs, rhs) : joinWithoutAnalysis(lhs, rhs);
+ protected CompressedSizeInfoColGroup join(int[] joined, CompressedSizeInfoColGroup lhs,
+ CompressedSizeInfoColGroup rhs, boolean analyze) {
+ return analyze ? _sest.estimateJoinCompressedSize(joined, lhs, rhs) : joinWithoutAnalysis(joined, lhs, rhs);
}
- protected CompressedSizeInfoColGroup joinWithAnalysis(CompressedSizeInfoColGroup lhs,
+ protected CompressedSizeInfoColGroup joinWithoutAnalysis(int[] joined, CompressedSizeInfoColGroup lhs,
CompressedSizeInfoColGroup rhs) {
- return _sest.estimateJoinCompressedSize(lhs, rhs);
- }
-
- protected CompressedSizeInfoColGroup joinWithoutAnalysis(CompressedSizeInfoColGroup lhs,
- CompressedSizeInfoColGroup rhs) {
- int[] joined = Util.join(lhs.getColumns(), rhs.getColumns());
final int lhsV = lhs.getNumVals();
final int rhsV = rhs.getNumVals();
- final int numVals = lhsV * rhsV;
- if(numVals < 0 || numVals > _sest.getNumRows())
- return null;
- else
- return new CompressedSizeInfoColGroup(joined, numVals, _sest.getNumRows());
+ final int joinedMaxDistinct = (int) Math.min((long) lhsV * (long) rhsV, (long) _sest.getNumRows());
+ return new CompressedSizeInfoColGroup(joined, joinedMaxDistinct, _sest.getNumRows());
}
protected CompressedSizeInfoColGroup analyze(CompressedSizeInfoColGroup g) {
diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeBinPacking.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeBinPacking.java
index 1887bdc..1731ef2 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeBinPacking.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeBinPacking.java
@@ -141,7 +141,7 @@ public class CoCodeBinPacking extends AColumnCoCoder {
for(int j = 0; j < bins.size(); j++) {
double newBinWeight = binWeights[j] - c.getCardinalityRatio();
if(newBinWeight >= 0 && bins.get(j).getColumns().length < MAX_COL_PER_GROUP - 1) {
- bins.set(j, joinWithoutAnalysis(bins.get(j), c));
+ bins.set(j, joinWithoutAnalysis(Util.join(bins.get(j).getColumns(), c.getColumns()),bins.get(j), c));
binWeights[j] = newBinWeight;
assigned = true;
break;
@@ -291,7 +291,7 @@ public class CoCodeBinPacking extends AColumnCoCoder {
g = CompressedSizeInfoColGroup.addConstGroup(c, left, _cs.validCompressions);
else {
st3++;
- g = _sest.estimateJoinCompressedSize(left, right);
+ g = _sest.estimateJoinCompressedSize(c, left, right);
}
if(leftConst || rightConst)
diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeGreedy.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeGreedy.java
index 51bfa46..3091ba0 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeGreedy.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodeGreedy.java
@@ -163,7 +163,7 @@ public class CoCodeGreedy extends AColumnCoCoder {
g = CompressedSizeInfoColGroup.addConstGroup(c, left, _cs.validCompressions);
else {
st3++;
- g = _sest.estimateJoinCompressedSize(left, right);
+ g = _sest.estimateJoinCompressedSize(c, left, right);
}
if(leftConst || rightConst)
diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodePriorityQue.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodePriorityQue.java
index 2e0e7fb..27b678c 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodePriorityQue.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/CoCodePriorityQue.java
@@ -68,7 +68,7 @@ public class CoCodePriorityQue extends AColumnCoCoder {
while(que.peek() != null) {
CompressedSizeInfoColGroup r = que.poll();
- final CompressedSizeInfoColGroup g = joinWithAnalysis(l, r);
+ final CompressedSizeInfoColGroup g = _sest.estimateJoinCompressedSize(l, r);
if(g != null) {
final double costOfJoin = _cest.getCostOfCollectionOfGroups(que, g);
if(costOfJoin < costBeforeJoin) {
@@ -93,7 +93,7 @@ public class CoCodePriorityQue extends AColumnCoCoder {
while(que.peek() != null) {
CompressedSizeInfoColGroup r = que.peek();
if(_cest.shouldTryJoin(l, r)) {
- CompressedSizeInfoColGroup g = joinWithAnalysis(l, r);
+ CompressedSizeInfoColGroup g = _sest.estimateJoinCompressedSize(l, r);
if(g != null) {
double costOfJoin = _cest.getCostOfColumnGroup(g);
double costIndividual = _cest.getCostOfColumnGroup(l) + _cest.getCostOfColumnGroup(r);
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java
index ab94646..b1b0540 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java
@@ -132,6 +132,23 @@ public class ColGroupUncompressed extends AColGroup {
_data = data;
}
+ /**
+ * Constructor for allocating a single uncompressed column group.
+ *
+ * @param data matrix block
+ */
+ public ColGroupUncompressed(MatrixBlock data) {
+ super(generateColumnList(data.getNumColumns()));
+ _data = data;
+ }
+
+ private static int[] generateColumnList(int nCol){
+ int[] cols = new int[nCol];
+ for(int i = 0; i< nCol; i++)
+ cols[i] = i;
+ return cols;
+ }
+
@Override
public CompressionType getCompType() {
return CompressionType.UNCOMPRESSED;
@@ -632,7 +649,7 @@ public class ColGroupUncompressed extends AColGroup {
double[] dv = colSum.getDenseBlockValues();
for(int i = 0; i < _colIndexes.length; i++)
c[_colIndexes[i]] += dv[i];
-
+
}
}
}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToFactory.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToFactory.java
index 5a72ff7..6c0b9fa 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToFactory.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/mapping/MapToFactory.java
@@ -108,16 +108,13 @@ public final class MapToFactory {
final int nVL = left.getUnique();
final int nVR = right.getUnique();
final int size = left.size();
- final int maxUnique = nVL * nVR;
+ final long maxUnique = nVL * nVR;
+ if(maxUnique > (long)Integer.MAX_VALUE)
+ throw new DMLCompressionException("Joining impossible using linearized join, since each side has a large number of unique values");
if(size != right.size())
throw new DMLCompressionException("Invalid input maps to join, must contain same number of rows");
- try {
- return computeJoin(left, right, size, nVL, maxUnique);
- }
- catch(Exception e) {
- throw new DMLCompressionException("Joining failed max unique expected:" + maxUnique, e);
- }
+ return computeJoin(left, right, size, nVL, (int)maxUnique);
}
private static AMapToData computeJoin(AMapToData left, AMapToData right, int size, int nVL, int maxUnique) {
@@ -141,7 +138,6 @@ public final class MapToFactory {
}
tmp.setUnique(newUID-1);
- // LOG.error(Arrays.toString(map));
return tmp;
}
}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/cost/CostEstimatorBuilder.java b/src/main/java/org/apache/sysds/runtime/compress/cost/CostEstimatorBuilder.java
index d7f305c..3d58bff 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/cost/CostEstimatorBuilder.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/cost/CostEstimatorBuilder.java
@@ -125,6 +125,11 @@ public final class CostEstimatorBuilder implements Serializable {
numberOps += counter.scans + counter.leftMultiplications * 2 + counter.rightMultiplications +
counter.compressedMultiplications * 4 + counter.dictionaryOps;
numberOps -= counter.decompressions + counter.overlappingDecompressions;
+
+ if(counter.decompressions > 1 &&
+ counter.leftMultiplications + counter.rightMultiplications + counter.compressedMultiplications < 1)
+ // This condition is added for l2svm and mLogReg y dataset, that is compressing while it should not.
+ return false;
return numberOps > 4;
}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/cost/CostEstimatorFactory.java b/src/main/java/org/apache/sysds/runtime/compress/cost/CostEstimatorFactory.java
index e8bff06..83b794e 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/cost/CostEstimatorFactory.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/cost/CostEstimatorFactory.java
@@ -45,7 +45,7 @@ public final class CostEstimatorFactory {
return b.create(nRows, nCols);
}
else
- return new DistinctCostEstimator(nRows, cs);
+ return new MemoryCostEstimator();
case MEMORY:
default:
return new MemoryCostEstimator();
diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimator.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimator.java
index 3dd092c..ef7d9dd 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimator.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimator.java
@@ -82,7 +82,7 @@ public abstract class CompressedSizeEstimator {
return _numCols;
}
- public MatrixBlock getData(){
+ public MatrixBlock getData() {
return _data;
}
@@ -163,7 +163,7 @@ public abstract class CompressedSizeEstimator {
* @param colIndexes The columns to group together inside a ColGroup
* @return The CompressedSizeInformation associated with the selected ColGroups.
*/
- public CompressedSizeInfoColGroup estimateCompressedColGroupSize(int[] colIndexes){
+ public CompressedSizeInfoColGroup estimateCompressedColGroupSize(int[] colIndexes) {
return estimateCompressedColGroupSize(colIndexes, 8, getNumRows());
}
@@ -173,7 +173,7 @@ public abstract class CompressedSizeEstimator {
* the number estimated in sub groups of the given colIndexes.
*
* @param colIndexes The columns to extract compression information from
- * @param estimate An estimate of number of unique elements in these columns
+ * @param estimate An estimate of number of unique elements in these columns
* @param nrUniqueUpperBound The upper bound of unique elements allowed in the estimate, can be calculated from the
* number of unique elements estimated in sub columns multiplied together. This is
* flexible in the sense that if the sample is small then this unique can be manually
@@ -181,13 +181,16 @@ public abstract class CompressedSizeEstimator {
*
* @return The CompressedSizeInfoColGroup fro the given column indexes.
*/
- public abstract CompressedSizeInfoColGroup estimateCompressedColGroupSize(int[] colIndexes, int estimate, int nrUniqueUpperBound);
+ public abstract CompressedSizeInfoColGroup estimateCompressedColGroupSize(int[] colIndexes, int estimate,
+ int nrUniqueUpperBound);
/**
* Join two analyzed column groups together. without materializing the dictionaries of either side.
*
- * If either side was constructed without analysis then fall back to default materialization of double arrays.
+ * if the number of distinct elements in both sides multiplied is larger than Integer, return null.
*
+ * If either side was constructed without analysis then fall back to default materialization of double arrays.
+ * O
* @param g1 First group
* @param g2 Second group
* @return A joined compressed size estimation for the group.
@@ -195,19 +198,39 @@ public abstract class CompressedSizeEstimator {
public CompressedSizeInfoColGroup estimateJoinCompressedSize(CompressedSizeInfoColGroup g1,
CompressedSizeInfoColGroup g2) {
final int[] joined = Util.join(g1.getColumns(), g2.getColumns());
+ return estimateJoinCompressedSize(joined, g1, g2);
+ }
+
+ /**
+ * Join two analyzed column groups together. without materializing the dictionaries of either side.
+ *
+ * if the number of distinct elements in both sides multiplied is larger than Integer, return null.
+ *
+ * If either side was constructed without analysis then fall back to default materialization of double arrays.
+ *
+ * @param joined The joined column indexes.
+ * @param g1 First group
+ * @param g2 Second group
+ * @return A joined compressed size estimation for the group.
+ */
+ public CompressedSizeInfoColGroup estimateJoinCompressedSize(int[] joined, CompressedSizeInfoColGroup g1,
+ CompressedSizeInfoColGroup g2) {
final int g1V = g1.getNumVals();
final int g2V = g2.getNumVals();
- if(g1V * g2V < 0 || g1V * g2V > getNumRows())
+ if((long) g1V * g2V > (long) Integer.MAX_VALUE)
return null;
- else if((g1.getMap() == null && g2V != 0) || (g2.getMap() == null && g2V != 0))
- return estimateCompressedColGroupSize(joined, Math.max(g1V + 1, g2V+ 1), Math.min((g1V + 1) * (g2V + 1), getNumRows()));
+
+ final int joinedMaxDistinct = (int) Math.min((long) g1V * (long) g2V, getNumRows());
+ if((g1.getMap() == null && g2V != 0) || (g2.getMap() == null && g2V != 0))
+ return estimateCompressedColGroupSize(joined, Math.max(g1V + 1, g2V + 1),
+ Math.min((g1V + 1) * (g2V + 1), getNumRows()));
else
- return estimateJoinCompressedSize(joined, g1, g2);
+ return estimateJoinCompressedSize(joined, g1, g2, joinedMaxDistinct);
}
protected abstract CompressedSizeInfoColGroup estimateJoinCompressedSize(int[] joinedcols,
- CompressedSizeInfoColGroup g1, CompressedSizeInfoColGroup g2);
+ CompressedSizeInfoColGroup g1, CompressedSizeInfoColGroup g2, int joinedMaxDistinct);
/**
* Method used to extract the CompressedSizeEstimationFactors from an constructed UncompressedBitmap. Note this
diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorExact.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorExact.java
index 824034a..1531781 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorExact.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorExact.java
@@ -45,8 +45,9 @@ public class CompressedSizeEstimatorExact extends CompressedSizeEstimator {
}
@Override
- public CompressedSizeInfoColGroup estimateJoinCompressedSize(int[] joined, CompressedSizeInfoColGroup g1,
- CompressedSizeInfoColGroup g2) {
+ protected CompressedSizeInfoColGroup estimateJoinCompressedSize(int[] joined, CompressedSizeInfoColGroup g1,
+ CompressedSizeInfoColGroup g2, int joinedMaxDistinct) {
+
AMapToData map = MapToFactory.join(g1.getMap(), g2.getMap());
EstimationFactors em = EstimationFactors.computeSizeEstimation(joined, map,
_cs.validCompressions.contains(CompressionType.RLE), _numRows, false);
diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorFactory.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorFactory.java
index 8b42258..6e9208a 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorFactory.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorFactory.java
@@ -31,22 +31,31 @@ public class CompressedSizeEstimatorFactory {
public static CompressedSizeEstimator getSizeEstimator(MatrixBlock data, CompressionSettings cs, int k) {
- final int nRows = cs.transposed ? data.getNumColumns() : data.getNumRows();
+ final int nRows = cs.transposed ? data.getNumColumns() : data.getNumRows();
final int nCols = cs.transposed ? data.getNumRows() : data.getNumColumns();
final int nnzRows = (int) Math.ceil(data.getNonZeros() / nCols);
final double sampleRatio = cs.samplingRatio;
final int sampleSize = Math.min(getSampleSize(sampleRatio, nRows, cs.minimumSampleSize), maxSampleSize);
- if(shouldUseExactEstimator(cs, nRows, sampleSize, nnzRows)) {
- if(sampleSize > nnzRows && nRows > 10000 && nCols > 10 && !cs.transposed) {
- data = LibMatrixReorg.transpose(data,
- new MatrixBlock(data.getNumColumns(), data.getNumRows(), data.isInSparseFormat()), k);
- cs.transposed = true;
- }
- return new CompressedSizeEstimatorExact(data, cs);
+
+ if(nCols > 1000) {
+ return tryToMakeSampleEstimator(data, cs, sampleRatio, sampleSize / 10, nRows, nnzRows, k);
}
else {
- return tryToMakeSampleEstimator(data, cs, sampleRatio, sampleSize, nRows, nnzRows, k);
+ if(shouldUseExactEstimator(cs, nRows, sampleSize, nnzRows)) {
+ if(sampleSize > nnzRows && nRows > 10000 && nCols > 10 && !cs.transposed) {
+ LOG.info("Transposing for exact estimator");
+ data = LibMatrixReorg.transpose(data,
+ new MatrixBlock(data.getNumColumns(), data.getNumRows(), data.isInSparseFormat()), k);
+ cs.transposed = true;
+ }
+ LOG.info("Using Exact estimator");
+ return new CompressedSizeEstimatorExact(data, cs);
+ }
+ else {
+ LOG.info("Trying sample size: " + sampleSize);
+ return tryToMakeSampleEstimator(data, cs, sampleRatio, sampleSize, nRows, nnzRows, k);
+ }
}
}
@@ -54,8 +63,9 @@ public class CompressedSizeEstimatorFactory {
private static CompressedSizeEstimator tryToMakeSampleEstimator(MatrixBlock data, CompressionSettings cs,
double sampleRatio, int sampleSize, int nRows, int nnzRows, int k) {
CompressedSizeEstimatorSample estS = new CompressedSizeEstimatorSample(data, cs, sampleSize, k);
+ int double_number = 1;
while(estS.getSample() == null) {
- LOG.warn("Doubling sample size");
+ LOG.error("Warning, doubling sample size " + double_number++);
sampleSize = sampleSize * 2;
if(shouldUseExactEstimator(cs, nRows, sampleSize, nnzRows))
return new CompressedSizeEstimatorExact(data, cs);
diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java
index 24ee358..218e12b 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorSample.java
@@ -107,18 +107,17 @@ public class CompressedSizeEstimatorSample extends CompressedSizeEstimator {
}
@Override
- public CompressedSizeInfoColGroup estimateJoinCompressedSize(int[] joined, CompressedSizeInfoColGroup g1,
- CompressedSizeInfoColGroup g2) {
- final int g1V = g1.getMap().getUnique();
- final int g2V = g2.getMap().getUnique();
- final int nrUniqueUpperBound = g1V * g2V;
+ protected CompressedSizeInfoColGroup estimateJoinCompressedSize(int[] joined, CompressedSizeInfoColGroup g1,
+ CompressedSizeInfoColGroup g2, int joinedMaxDistinct) {
+ if((long)g1.getNumVals() * g2.getNumVals() >(long)Integer.MAX_VALUE )
+ return null;
final AMapToData map = MapToFactory.join(g1.getMap(), g2.getMap());
EstimationFactors sampleFacts = EstimationFactors.computeSizeEstimation(joined, map,
_cs.validCompressions.contains(CompressionType.RLE), map.size(), false);
// result facts
- EstimationFactors em = estimateCompressionFactors(sampleFacts, map, joined, nrUniqueUpperBound);
+ EstimationFactors em = estimateCompressionFactors(sampleFacts, map, joined, joinedMaxDistinct);
return new CompressedSizeInfoColGroup(em, _cs.validCompressions, map);
}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java
index e2b7491..1026aa2 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeInfoColGroup.java
@@ -66,7 +66,7 @@ public class CompressedSizeInfoColGroup {
_facts = new EstimationFactors(columns, numVals, numRows);
_cardinalityRatio = (double) numVals / numRows;
_sizes = null;
- _bestCompressionType = null;
+ _bestCompressionType = CompressionType.DDC;
_minSize = ColGroupSizes.estimateInMemorySizeDDC(columns.length, numVals, numRows, 1.0, false);
_map = null;
}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibAppend.java b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibAppend.java
index fc086e6..ab60d9f 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibAppend.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibAppend.java
@@ -22,12 +22,10 @@ package org.apache.sysds.runtime.compress.lib;
import java.util.ArrayList;
import java.util.List;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
import org.apache.sysds.runtime.compress.CompressedMatrixBlockFactory;
-import org.apache.sysds.runtime.compress.CompressionStatistics;
import org.apache.sysds.runtime.compress.colgroup.AColGroup;
import org.apache.sysds.runtime.compress.colgroup.ColGroupEmpty;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
@@ -41,24 +39,18 @@ public class CLALibAppend {
if(left.isEmpty() && right instanceof CompressedMatrixBlock)
return appendLeftEmpty(left, (CompressedMatrixBlock) right);
else if(right.isEmpty() && left instanceof CompressedMatrixBlock)
- return appendRightEmpty((CompressedMatrixBlock)left, right);
+ return appendRightEmpty((CompressedMatrixBlock) left, right);
final int m = left.getNumRows();
final int n = left.getNumColumns() + right.getNumColumns();
- // try to compress both sides (if not already compressed).
if(!(left instanceof CompressedMatrixBlock) && m > 1000) {
- LOG.warn("Compressing left for append operation");
- Pair<MatrixBlock, CompressionStatistics> x = CompressedMatrixBlockFactory.compress(left);
- if(x.getRight().getRatio() > 3.0)
- left = x.getLeft();
-
+ LOG.info("Appending uncompressed column group left");
+ left = CompressedMatrixBlockFactory.genUncompressedCompressedMatrixBlock(left);
}
if(!(right instanceof CompressedMatrixBlock) && m > 1000) {
- LOG.warn("Compressing right for append operation");
- Pair<MatrixBlock, CompressionStatistics> x = CompressedMatrixBlockFactory.compress(right);
- if(x.getRight().getRatio() > 3.0)
- right = x.getLeft();
+ LOG.info("Appending uncompressed column group right");
+ right = CompressedMatrixBlockFactory.genUncompressedCompressedMatrixBlock(right);
}
// if compression failed then use default append method.
@@ -72,14 +64,22 @@ public class CLALibAppend {
CompressedMatrixBlock ret = new CompressedMatrixBlock(m, n);
ret = appendColGroups(ret, leftC.getColGroups(), rightC.getColGroups(), leftC.getNumColumns());
- return ret;
+
+ double compressedSize = ret.getInMemorySize();
+ double uncompressedSize = MatrixBlock.estimateSizeInMemory(m,n, ret.getSparsity());
+
+
+ if(compressedSize * 10 < uncompressedSize)
+ return ret;
+ else
+ return ret.getUncompressed("Decompressing c bind matrix");
}
private static MatrixBlock appendRightEmpty(CompressedMatrixBlock left, MatrixBlock right) {
final int m = left.getNumRows();
final int n = left.getNumColumns() + right.getNumColumns();
- CompressedMatrixBlock ret = new CompressedMatrixBlock(m,n);
+ CompressedMatrixBlock ret = new CompressedMatrixBlock(m, n);
List<AColGroup> newGroup = new ArrayList<>(1);
newGroup.add(ColGroupEmpty.generate(right.getNumColumns(), right.getNumRows()));
@@ -91,7 +91,7 @@ public class CLALibAppend {
private static MatrixBlock appendLeftEmpty(MatrixBlock left, CompressedMatrixBlock right) {
final int m = left.getNumRows();
final int n = left.getNumColumns() + right.getNumColumns();
- CompressedMatrixBlock ret = new CompressedMatrixBlock(m,n);
+ CompressedMatrixBlock ret = new CompressedMatrixBlock(m, n);
List<AColGroup> newGroup = new ArrayList<>(1);
newGroup.add(ColGroupEmpty.generate(left.getNumColumns(), left.getNumRows()));
diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibBinaryCellOp.java b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibBinaryCellOp.java
index 79cf1c3..99d9c92 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibBinaryCellOp.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibBinaryCellOp.java
@@ -19,7 +19,6 @@
package org.apache.sysds.runtime.compress.lib;
-import java.lang.ref.SoftReference;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
@@ -59,33 +58,41 @@ public class CLALibBinaryCellOp {
private static final Log LOG = LogFactory.getLog(CLALibBinaryCellOp.class.getName());
- public static MatrixBlock binaryOperations(BinaryOperator op, CompressedMatrixBlock m1, MatrixValue thatValue,
- MatrixValue result) {
+ public static MatrixBlock binaryOperations(BinaryOperator op, CompressedMatrixBlock m1, MatrixBlock thatValue,
+ MatrixBlock result) {
MatrixBlock that = CompressedMatrixBlock.getUncompressed(thatValue, "Decompressing right side in BinaryOps");
if(m1.getNumRows() <= 0)
LOG.error(m1);
- if(thatValue.getNumRows() <= 0)
- LOG.error(thatValue);
+ if(that.getNumRows() <= 0)
+ LOG.error(that);
LibMatrixBincell.isValidDimensionsBinary(m1, that);
- thatValue = that;
BinaryAccessType atype = LibMatrixBincell.getBinaryAccessType(m1, that);
- return selectProcessingBasedOnAccessType(op, m1, thatValue, result, atype, false);
+ return selectProcessingBasedOnAccessType(op, m1, that, result, atype, false);
}
- public static MatrixBlock binaryOperationsLeft(BinaryOperator op, CompressedMatrixBlock m1, MatrixValue thatValue,
- MatrixValue result) {
+ public static MatrixBlock binaryOperationsLeft(BinaryOperator op, CompressedMatrixBlock m1, MatrixBlock thatValue,
+ MatrixBlock result) {
MatrixBlock that = CompressedMatrixBlock.getUncompressed(thatValue, "Decompressing left side in BinaryOps");
LibMatrixBincell.isValidDimensionsBinary(that, m1);
thatValue = that;
BinaryAccessType atype = LibMatrixBincell.getBinaryAccessType(that, m1);
- return selectProcessingBasedOnAccessType(op, m1, thatValue, result, atype, true);
+ return selectProcessingBasedOnAccessType(op, m1, that, result, atype, true);
}
private static MatrixBlock selectProcessingBasedOnAccessType(BinaryOperator op, CompressedMatrixBlock m1,
- MatrixValue thatValue, MatrixValue result, BinaryAccessType atype, boolean left) {
- MatrixBlock that = (MatrixBlock) thatValue;
- if(atype == BinaryAccessType.MATRIX_COL_VECTOR)
- return binaryMVCol(m1, that, op, left);
+ MatrixBlock that, MatrixBlock result, BinaryAccessType atype, boolean left) {
+ if(atype == BinaryAccessType.MATRIX_COL_VECTOR) {
+ MatrixBlock d_compressed = m1.getCachedDecompressed();
+ if(d_compressed != null) {
+ if(left)
+ return that.binaryOperations(op, d_compressed, result);
+ else
+ return d_compressed.binaryOperations(op, that, result);
+ }
+ else
+ return binaryMVCol(m1, that, op, left);
+
+ }
else if(atype == BinaryAccessType.MATRIX_MATRIX) {
if(that.isEmpty()) {
ScalarOperator sop = left ? new LeftScalarOperator(op.fn, 0, -1) : new RightScalarOperator(op.fn, 0,
@@ -93,8 +100,7 @@ public class CLALibBinaryCellOp {
return CLALibScalar.scalarOperations(sop, m1, result);
}
else {
- SoftReference<MatrixBlock> msf = m1.getSoftReferenceToDecompressed();
- MatrixBlock d_compressed = msf != null ? msf.get() : null;
+ MatrixBlock d_compressed = m1.getCachedDecompressed();
if(d_compressed != null) {
// copy the decompressed matrix if there is a decompressed matrix already.
MatrixBlock tmp = d_compressed;
@@ -117,7 +123,7 @@ public class CLALibBinaryCellOp {
return bincellOp(m1, that, setupCompressedReturnMatrixBlock(m1, result), op, left);
else {
LOG.warn("Decompressing since Binary Ops" + op.fn + " is not supported compressed");
- return CompressedMatrixBlock.getUncompressed(m1).binaryOperations(op, thatValue, result);
+ return CompressedMatrixBlock.getUncompressed(m1).binaryOperations(op, that, result);
}
}
@@ -295,7 +301,7 @@ public class CLALibBinaryCellOp {
final int blkz = CompressionSettings.BITMAP_BLOCK_SZ;
final int k = op.getNumThreads();
long nnz = 0;
- ;
+
if(k <= 1) {
for(int i = 0; i * blkz < m1.getNumRows(); i++) {
if(left)
diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibCompAgg.java b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibCompAgg.java
index 892ec95..5c9f7b2 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibCompAgg.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibCompAgg.java
@@ -34,6 +34,7 @@ import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
import org.apache.sysds.runtime.compress.CompressionSettings;
import org.apache.sysds.runtime.compress.DMLCompressionException;
import org.apache.sysds.runtime.compress.colgroup.AColGroup;
+import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
import org.apache.sysds.runtime.functionobjects.Builtin;
import org.apache.sysds.runtime.functionobjects.Builtin.BuiltinCode;
import org.apache.sysds.runtime.functionobjects.IndexFunction;
@@ -50,7 +51,6 @@ import org.apache.sysds.runtime.matrix.data.LibMatrixAgg;
import org.apache.sysds.runtime.matrix.data.LibMatrixBincell;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
-import org.apache.sysds.runtime.matrix.data.MatrixValue;
import org.apache.sysds.runtime.matrix.data.MatrixValue.CellIndex;
import org.apache.sysds.runtime.matrix.operators.AggregateOperator;
import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator;
@@ -71,43 +71,64 @@ public class CLALibCompAgg {
}
};
- public static MatrixBlock aggregateUnary(CompressedMatrixBlock inputMatrix, MatrixValue result,
+ public static MatrixBlock aggregateUnary(CompressedMatrixBlock inputMatrix, MatrixBlock result,
AggregateUnaryOperator op, int blen, MatrixIndexes indexesIn, boolean inCP) {
// prepare output dimensions
CellIndex tempCellIndex = new CellIndex(-1, -1);
op.indexFn.computeDimension(inputMatrix.getNumRows(), inputMatrix.getNumColumns(), tempCellIndex);
+ if(requireDecompression(inputMatrix, op)) {
+ // Decide if we should use the cached decompressed Version, or we should decompress.
+ final double denseSize = MatrixBlock.estimateSizeDenseInMemory(inputMatrix.getNumRows(),
+ inputMatrix.getNumColumns());
+ final double currentSize = inputMatrix.getInMemorySize();
+ final double localMaxMemory = InfrastructureAnalyzer.getLocalMaxMemory();
+
+ if(denseSize < 5 * currentSize && inputMatrix.getColGroups().size() > 5 &&
+ denseSize <= localMaxMemory / 2) {
+ LOG.info("Decompressing for unaryAggregate because of overlapping state");
+ inputMatrix.decompress(op.getNumThreads());
+ }
+ MatrixBlock decomp = inputMatrix.getCachedDecompressed();
+ if(decomp != null)
+ return decomp.aggregateUnaryOperations(op, result, blen, indexesIn, inCP);
+ }
+
// initialize and allocate the result
if(result == null)
result = new MatrixBlock(tempCellIndex.row, tempCellIndex.column, false);
else
result.reset(tempCellIndex.row, tempCellIndex.column, false);
- MatrixBlock ret = (MatrixBlock) result;
- ret.allocateDenseBlock();
+ result.allocateDenseBlock();
AggregateUnaryOperator opm = replaceKahnOperations(op);
if(inputMatrix.getColGroups() != null) {
- fillStart(ret, opm);
+ fillStart(result, opm);
- if(inputMatrix.isOverlapping() &&
- (opm.aggOp.increOp.fn instanceof KahanPlusSq || (opm.aggOp.increOp.fn instanceof Builtin &&
- (((Builtin) opm.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN ||
- ((Builtin) opm.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MAX))))
- aggregateUnaryOverlapping(inputMatrix, ret, opm, indexesIn, inCP);
+ if(requireDecompression(inputMatrix, opm))
+ aggregateUnaryOverlapping(inputMatrix, result, opm, indexesIn, inCP);
else
- aggregateUnaryNormalCompressedMatrixBlock(inputMatrix, ret, opm, blen, indexesIn, inCP);
+ aggregateUnaryNormalCompressedMatrixBlock(inputMatrix, result, opm, blen, indexesIn, inCP);
}
- ret.recomputeNonZeros();
+
+ result.recomputeNonZeros();
if(op.aggOp.existsCorrection() && !inCP) {
- ret = addCorrection(ret, op);
+ result = addCorrection(result, op);
if(op.aggOp.increOp.fn instanceof Mean)
- ret = addCellCount(ret, op, inputMatrix.getNumRows(), inputMatrix.getNumColumns());
+ result = addCellCount(result, op, inputMatrix.getNumRows(), inputMatrix.getNumColumns());
}
- return ret;
+ return result;
+
+ }
+ private static boolean requireDecompression(CompressedMatrixBlock inputMatrix, AggregateUnaryOperator op) {
+ return inputMatrix.isOverlapping() &&
+ (op.aggOp.increOp.fn instanceof KahanPlusSq || (op.aggOp.increOp.fn instanceof Builtin &&
+ (((Builtin) op.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN ||
+ ((Builtin) op.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MAX)));
}
private static MatrixBlock addCorrection(MatrixBlock ret, AggregateUnaryOperator op) {
diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibRelationalOp.java b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibRelationalOp.java
deleted file mode 100644
index d4fbb7e..0000000
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibRelationalOp.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-// package org.apache.sysds.runtime.compress.lib;
-
-// import java.util.ArrayList;
-// import java.util.Arrays;
-// import java.util.List;
-// import java.util.concurrent.Callable;
-// import java.util.concurrent.ExecutionException;
-// import java.util.concurrent.ExecutorService;
-// import java.util.concurrent.Future;
-
-// import org.apache.sysds.hops.OptimizerUtils;
-// import org.apache.sysds.runtime.DMLRuntimeException;
-// import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
-// import org.apache.sysds.runtime.compress.CompressionSettings;
-// import org.apache.sysds.runtime.compress.colgroup.AColGroup;
-// import org.apache.sysds.runtime.compress.colgroup.ColGroupConst;
-// import org.apache.sysds.runtime.compress.colgroup.dictionary.Dictionary;
-// import org.apache.sysds.runtime.functionobjects.Equals;
-// import org.apache.sysds.runtime.functionobjects.GreaterThan;
-// import org.apache.sysds.runtime.functionobjects.GreaterThanEquals;
-// import org.apache.sysds.runtime.functionobjects.LessThan;
-// import org.apache.sysds.runtime.functionobjects.LessThanEquals;
-// import org.apache.sysds.runtime.functionobjects.NotEquals;
-// import org.apache.sysds.runtime.matrix.data.MatrixBlock;
-// import org.apache.sysds.runtime.matrix.operators.LeftScalarOperator;
-// import org.apache.sysds.runtime.matrix.operators.RightScalarOperator;
-// import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
-// import org.apache.sysds.runtime.util.CommonThreadPool;
-
-// /**
-// * This class is used for relational operators that return binary values depending on individual cells values in the
-// * compression. This indicate that the resulting vectors/matrices are amenable to compression since they only contain
-// * two distinct values, true or false.
-// *
-// */
-// public class CLALibRelationalOp {
-// // private static final Log LOG = LogFactory.getLog(LibRelationalOp.class.getName());
-
-// /** Thread pool matrix Block for materializing decompressed groups. */
-// private static ThreadLocal<MatrixBlock> memPool = new ThreadLocal<MatrixBlock>() {
-// @Override
-// protected MatrixBlock initialValue() {
-// return null;
-// }
-// };
-
-// protected static boolean isValidForRelationalOperation(ScalarOperator sop, CompressedMatrixBlock m1) {
-// return m1.isOverlapping() &&
-// (sop.fn instanceof LessThan || sop.fn instanceof LessThanEquals || sop.fn instanceof GreaterThan ||
-// sop.fn instanceof GreaterThanEquals || sop.fn instanceof Equals || sop.fn instanceof NotEquals);
-// }
-
-// // public static MatrixBlock overlappingRelativeRelationalOperation(ScalarOperator sop, CompressedMatrixBlock m1) {
-
-// // List<AColGroup> colGroups = m1.getColGroups();
-// // boolean less = ((sop.fn instanceof LessThan || sop.fn instanceof LessThanEquals) &&
-// // sop instanceof LeftScalarOperator) ||
-// // (sop instanceof RightScalarOperator &&
-// // (sop.fn instanceof GreaterThan || sop.fn instanceof GreaterThanEquals));
-// // double v = sop.getConstant();
-// // double min = m1.min();
-// // double max = m1.max();
-
-// // // Shortcut:
-// // // If we know worst case min and worst case max and the values to compare to in all cases is
-// // // less then or greater than worst then we can return a full matrix with either 1 or 0.
-
-// // if(v < min || v > max) {
-// // if(sop.fn instanceof Equals) {
-// // return makeConstZero(m1.getNumRows(), m1.getNumColumns());
-// // }
-// // else if(sop.fn instanceof NotEquals) {
-// // return makeConstOne(m1.getNumRows(), m1.getNumColumns());
-// // }
-// // else if(less) {
-// // if(v < min || ((sop.fn instanceof LessThanEquals || sop.fn instanceof GreaterThan) && v <= min))
-// // return makeConstOne(m1.getNumRows(), m1.getNumColumns());
-// // else
-// // return makeConstZero(m1.getNumRows(), m1.getNumColumns());
-// // }
-// // else {
-// // if(v > max || ((sop.fn instanceof LessThanEquals || sop.fn instanceof GreaterThan) && v >= max))
-// // return makeConstOne(m1.getNumRows(), m1.getNumColumns());
-// // else
-// // return makeConstZero(m1.getNumRows(), m1.getNumColumns());
-// // }
-// // }
-// // else {
-// // return processNonConstant(sop, minMax, min, max, m1.getNumRows(), m1.getNumColumns(), less);
-// // }
-
-// // }
-
-// private static MatrixBlock makeConstOne(int rows, int cols) {
-// // List<AColGroup> newColGroups = new ArrayList<>();
-// // int[] colIndexes = new int[cols];
-// // for(int i = 0; i < colIndexes.length; i++) {
-// // colIndexes[i] = i;
-// // }
-// // double[] values = new double[cols];
-// // Arrays.fill(values, 1);
-
-// // newColGroups.add(new ColGroupConst(colIndexes, rows, new Dictionary(values)));
-// // CompressedMatrixBlock ret = new CompressedMatrixBlock(rows, cols);
-// // ret.allocateColGroupList(newColGroups);
-// // ret.setNonZeros(cols * rows);
-// // ret.setOverlapping(false);
-// // return ret;
-// // }
-
-// // private static MatrixBlock makeConstZero(int rows, int cols) {
-// // MatrixBlock sb = new MatrixBlock(rows, cols, true, 0);
-// // return sb;
-// // }
-
-// // private static MatrixBlock processNonConstant(ScalarOperator sop, MinMaxGroup[] minMax, double minS, double maxS,
-// // final int rows, final int cols, boolean less) {
-
-// // // BitSet res = new BitSet(ret.getNumColumns() * ret.getNumRows());
-// // MatrixBlock res = new MatrixBlock(rows, cols, true, 0).allocateBlock();
-// // int k = OptimizerUtils.getConstrainedNumThreads(-1);
-// // int outRows = rows;
-// // long nnz = 0;
-// // if(k == 1) {
-// // final int b = CompressionSettings.BITMAP_BLOCK_SZ / cols;
-// // final int blkz = (outRows < b) ? outRows : b;
-
-// // MatrixBlock tmp = new MatrixBlock(blkz, cols, false, -1).allocateBlock();
-// // for(int i = 0; i * blkz < outRows; i++) {
-// // for(MinMaxGroup mmg : minMax)
-// // mmg.g.decompressToBlockUnSafe(tmp, i * blkz, Math.min((i + 1) * blkz, rows), 0);
-
-// // for(int row = 0; row < blkz && row < rows - i * blkz; row++) {
-// // int off = (row + i * blkz);
-// // for(int col = 0; col < cols; col++) {
-// // res.quickSetValue(off, col, sop.executeScalar(tmp.quickGetValue(row, col)));
-// // if(res.quickGetValue(off, col) != 0) {
-// // nnz++;
-// // }
-// // }
-// // }
-// // }
-// // tmp.reset();
-// // res.setNonZeros(nnz);
-// // }
-// // else {
-// // final int blkz = CompressionSettings.BITMAP_BLOCK_SZ / 2;
-// // ExecutorService pool = CommonThreadPool.get(k);
-// // ArrayList<RelationalTask> tasks = new ArrayList<>();
-
-// // try {
-// // for(int i = 0; i * blkz < outRows; i++) {
-// // RelationalTask rt = new RelationalTask(minMax, i, blkz, res, rows, cols, sop);
-// // tasks.add(rt);
-// // }
-// // List<Future<Object>> futures = pool.invokeAll(tasks);
-// // pool.shutdown();
-// // for(Future<Object> f : futures)
-// // f.get();
-// // }
-// // catch(InterruptedException | ExecutionException e) {
-// // e.printStackTrace();
-// // throw new DMLRuntimeException(e);
-// // }
-
-// // }
-// // memPool.remove();
-
-// // return res;
-// // }
-
-// // protected static class MinMaxGroup implements Comparable<MinMaxGroup> {
-// // double min;
-// // double max;
-// // AColGroup g;
-// // double[] values;
-
-// // public MinMaxGroup(double min, double max, AColGroup g) {
-// // this.min = min;
-// // this.max = max;
-// // this.g = g;
-
-// // this.values = g.getValues();
-// // }
-
-// // @Override
-// // public int compareTo(MinMaxGroup o) {
-// // double t = max - min;
-// // double ot = o.max - o.min;
-// // return Double.compare(t, ot);
-// // }
-
-// // @Override
-// // public String toString() {
-// // StringBuilder sb = new StringBuilder();
-// // sb.append("MMG: ");
-// // sb.append("[" + min + "," + max + "]");
-// // sb.append(" " + g.getClass().getSimpleName());
-// // return sb.toString();
-// // }
-// // }
-
-// // private static class RelationalTask implements Callable<Object> {
-// // private final MinMaxGroup[] _minMax;
-// // private final int _i;
-// // private final int _blkz;
-// // private final MatrixBlock _res;
-// // private final int _rows;
-// // private final int _cols;
-// // private final ScalarOperator _sop;
-
-// // protected RelationalTask(MinMaxGroup[] minMax, int i, int blkz, MatrixBlock res, int rows, int cols,
-// // ScalarOperator sop) {
-// // _minMax = minMax;
-// // _i = i;
-// // _blkz = blkz;
-// // _res = res;
-// // _rows = rows;
-// // _cols = cols;
-// // _sop = sop;
-// // }
-
-// // @Override
-// // public Object call() {
-// // MatrixBlock tmp = memPool.get();
-// // if(tmp == null) {
-// // memPool.set(new MatrixBlock(_blkz, _cols, false, -1).allocateBlock());
-// // tmp = memPool.get();
-// // }
-// // else {
-// // tmp = memPool.get();
-// // tmp.reset(_blkz, _cols, false, -1);
-// // }
-
-// // for(MinMaxGroup mmg : _minMax) {
-// // if(mmg.g.getNumberNonZeros() != 0)
-// // mmg.g.decompressToBlockUnSafe(tmp, _i * _blkz, Math.min((_i + 1) * _blkz, mmg.g.getNumRows()), 0);
-// // }
-
-// // for(int row = 0, off = _i * _blkz; row < _blkz && row < _rows - _i * _blkz; row++, off++) {
-// // for(int col = 0; col < _cols; col++) {
-// // _res.appendValue(off, col, _sop.executeScalar(tmp.quickGetValue(row, col)));
-// // }
-// // }
-// // return null;
-// // }
-// // }
-// }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibScalar.java b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibScalar.java
index aea5488..1822685 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibScalar.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibScalar.java
@@ -56,13 +56,6 @@ public class CLALibScalar {
private static final int MINIMUM_PARALLEL_SIZE = 8096;
public static MatrixBlock scalarOperations(ScalarOperator sop, CompressedMatrixBlock m1, MatrixValue result) {
- // Special case handling of overlapping relational operations
- // if(CLALibRelationalOp.isValidForRelationalOperation(sop, m1)) {
- // MatrixBlock ret = CLALibRelationalOp.overlappingRelativeRelationalOperation(sop, m1);
- // ret.recomputeNonZeros();
- // return ret;
- // }
-
if(isInvalidForCompressedOutput(m1, sop)) {
LOG.warn("scalar overlapping not supported for op: " + sop.fn);
MatrixBlock m1d = m1.decompress(sop.getNumThreads());
diff --git a/src/main/java/org/apache/sysds/runtime/compress/workload/WorkloadAnalyzer.java b/src/main/java/org/apache/sysds/runtime/compress/workload/WorkloadAnalyzer.java
index b37acc1..31b3714 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/workload/WorkloadAnalyzer.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/workload/WorkloadAnalyzer.java
@@ -404,7 +404,10 @@ public class WorkloadAnalyzer {
ArrayList<Hop> in = hop.getInput();
if(isOverlapping(in.get(0)) || isOverlapping(in.get(1)))
overlapping.add(hop.getHopID());
- return new OpNormal(hop, true);
+ // CBind is in the worst case decompressing, but it can compress the other side if it is trivially compressible.
+ // To keep the optimizer correct we mark this operation as decompressing, since that is the worst possible outcome.
+ // Currently we do not optimize for operations that are located past a cbind.
+ return new OpDecompressing(hop);
}
else if(HopRewriteUtils.isBinary(hop, OpOp2.RBIND)) {
ArrayList<Hop> in = hop.getInput();
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
index 880f31f..faef2c7 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
@@ -66,6 +66,7 @@ import org.apache.sysds.runtime.data.SparseBlock;
import org.apache.sysds.runtime.data.TensorBlock;
import org.apache.sysds.runtime.data.TensorIndexes;
import org.apache.sysds.runtime.instructions.cp.Data;
+import org.apache.sysds.runtime.instructions.spark.DeCompressionSPInstruction;
import org.apache.sysds.runtime.instructions.spark.data.BroadcastObject;
import org.apache.sysds.runtime.instructions.spark.data.LineageObject;
import org.apache.sysds.runtime.instructions.spark.data.PartitionedBlock;
@@ -1280,16 +1281,21 @@ public class SparkExecutionContext extends ExecutionContext
return out;
}
- @SuppressWarnings("unchecked")
+ // @SuppressWarnings("unchecked")
public static long writeMatrixRDDtoHDFS( RDDObject rdd, String path, FileFormat fmt )
{
JavaPairRDD<MatrixIndexes,MatrixBlock> lrdd = (JavaPairRDD<MatrixIndexes, MatrixBlock>) rdd.getRDD();
InputOutputInfo oinfo = InputOutputInfo.get(DataType.MATRIX, fmt);
+ // if compression is enabled, decompress all blocks before writing to disk. TEMPORARY MODIFICATION UNTIL MATRIXBLOCK IS MERGED WITH COMPRESSEDMATRIXBLOCK
+ if(ConfigurationManager.isCompressionEnabled())
+ lrdd = lrdd.mapValues(new DeCompressionSPInstruction.DeCompressionFunction());
+
//piggyback nnz maintenance on write
LongAccumulator aNnz = getSparkContextStatic().sc().longAccumulator("nnz");
lrdd = lrdd.mapValues(new ComputeBinaryBlockNnzFunction(aNnz));
+
//save file is an action which also triggers nnz maintenance
lrdd.saveAsHadoopFile(path,
oinfo.keyClass,
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/BinUaggChainSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/BinUaggChainSPInstruction.java
index 4ea0284..39b2408 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/BinUaggChainSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/BinUaggChainSPInstruction.java
@@ -30,6 +30,12 @@ import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator;
import org.apache.sysds.runtime.matrix.operators.BinaryOperator;
+/**
+ * Instruction that performs
+ *
+ * res = X / rowsum(X)
+ *
+ */
public class BinUaggChainSPInstruction extends UnarySPInstruction {
// operators
private BinaryOperator _bOp = null;
diff --git a/src/test/java/org/apache/sysds/test/component/compress/CompressedMatrixTest.java b/src/test/java/org/apache/sysds/test/component/compress/CompressedMatrixTest.java
index 6115654..a998a74 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/CompressedMatrixTest.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/CompressedMatrixTest.java
@@ -29,6 +29,7 @@ import java.io.DataOutputStream;
import java.util.Collection;
import org.apache.commons.math3.random.Well1024a;
+import org.apache.sysds.common.Types.CorrectionLocationType;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
@@ -36,10 +37,15 @@ import org.apache.sysds.runtime.compress.CompressionSettingsBuilder;
import org.apache.sysds.runtime.compress.CompressionStatistics;
import org.apache.sysds.runtime.compress.colgroup.AColGroup;
import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType;
+import org.apache.sysds.runtime.functionobjects.KahanPlus;
+import org.apache.sysds.runtime.functionobjects.Multiply;
+import org.apache.sysds.runtime.functionobjects.ReduceAll;
import org.apache.sysds.runtime.matrix.data.LibMatrixCountDistinct;
import org.apache.sysds.runtime.matrix.data.LibMatrixDatagen;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.RandomMatrixGenerator;
+import org.apache.sysds.runtime.matrix.operators.AggregateOperator;
+import org.apache.sysds.runtime.matrix.operators.AggregateTernaryOperator;
import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator;
import org.apache.sysds.runtime.matrix.operators.CountDistinctOperator;
import org.apache.sysds.runtime.matrix.operators.CountDistinctOperator.CountDistinctTypes;
@@ -570,6 +576,33 @@ public class CompressedMatrixTest extends AbstractCompressedUnaryTests {
cmb.copy(cmb);
}
+ @Test
+ public void testAggregateTernaryOperation() {
+ try {
+ if(!(cmb instanceof CompressedMatrixBlock) || rows * cols > 10000)
+ return;
+ CorrectionLocationType corr = CorrectionLocationType.LASTCOLUMN;
+ AggregateOperator agg = new AggregateOperator(0, KahanPlus.getKahanPlusFnObject(), corr);
+ AggregateTernaryOperator op = new AggregateTernaryOperator(Multiply.getMultiplyFnObject(), agg,
+ ReduceAll.getReduceAllFnObject());
+
+ int nrow = mb.getNumRows();
+ int ncol = mb.getNumColumns();
+
+ MatrixBlock m2 = new MatrixBlock(nrow, ncol, 13.0);
+ MatrixBlock m3 = new MatrixBlock(nrow, ncol, 14.0);
+
+ MatrixBlock ret1 = cmb.aggregateTernaryOperations(cmb, m2, m3, null, op, true);
+ MatrixBlock ret2 = mb.aggregateTernaryOperations(mb, m2, m3, null, op, true);
+
+ compareResultMatrices(ret2, ret1, 1);
+ }
+ catch(Exception e) {
+ e.printStackTrace();
+ throw new DMLRuntimeException(e);
+ }
+ }
+
private static long getJolSize(CompressedMatrixBlock cmb, CompressionStatistics cStat) {
Layouter l = new HotSpotLayouter(new X86_64_DataModel());
long jolEstimate = 0;
diff --git a/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java b/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java
index eb5ceca..c291235 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java
@@ -31,7 +31,6 @@ import java.util.List;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.sysds.common.Types.CorrectionLocationType;
import org.apache.sysds.lops.MMTSJ.MMTSJType;
import org.apache.sysds.lops.MapMultChain.ChainType;
import org.apache.sysds.runtime.DMLRuntimeException;
@@ -55,22 +54,18 @@ import org.apache.sysds.runtime.functionobjects.Divide;
import org.apache.sysds.runtime.functionobjects.Equals;
import org.apache.sysds.runtime.functionobjects.GreaterThan;
import org.apache.sysds.runtime.functionobjects.GreaterThanEquals;
-import org.apache.sysds.runtime.functionobjects.KahanPlus;
import org.apache.sysds.runtime.functionobjects.LessThan;
import org.apache.sysds.runtime.functionobjects.LessThanEquals;
import org.apache.sysds.runtime.functionobjects.Minus;
import org.apache.sysds.runtime.functionobjects.Multiply;
import org.apache.sysds.runtime.functionobjects.Plus;
import org.apache.sysds.runtime.functionobjects.Power2;
-import org.apache.sysds.runtime.functionobjects.ReduceAll;
import org.apache.sysds.runtime.functionobjects.SwapIndex;
import org.apache.sysds.runtime.functionobjects.ValueFunction;
import org.apache.sysds.runtime.functionobjects.Xor;
import org.apache.sysds.runtime.instructions.InstructionUtils;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.operators.AggregateBinaryOperator;
-import org.apache.sysds.runtime.matrix.operators.AggregateOperator;
-import org.apache.sysds.runtime.matrix.operators.AggregateTernaryOperator;
import org.apache.sysds.runtime.matrix.operators.BinaryOperator;
import org.apache.sysds.runtime.matrix.operators.LeftScalarOperator;
import org.apache.sysds.runtime.matrix.operators.ReorgOperator;
@@ -1253,28 +1248,6 @@ public abstract class CompressedTestBase extends TestBase {
}
@Test
- public void aggregateTernaryOperations() {
- if(!(cmb instanceof CompressedMatrixBlock) || rows * cols > 10000)
- return;
-
- MatrixBlock m1 = new MatrixBlock();
- MatrixBlock m2 = new MatrixBlock();
- MatrixBlock m3 = null;
- CorrectionLocationType corr = CorrectionLocationType.LASTCOLUMN;
- AggregateOperator agg = new AggregateOperator(0, KahanPlus.getKahanPlusFnObject(), corr);
- AggregateTernaryOperator op = new AggregateTernaryOperator(Multiply.getMultiplyFnObject(), agg,
- ReduceAll.getReduceAllFnObject(), _k);
-
- boolean inCP = true;
-
- MatrixBlock ret1 = mb.aggregateTernaryOperations(m1, m2, m3, null, op, inCP);
- MatrixBlock ret2 = cmb.aggregateTernaryOperations(m1, m2, m3, null, op, inCP);
-
- compareResultMatrices(ret1, ret2, 1);
-
- }
-
- @Test
public void unaryOperations() {
if(!(cmb instanceof CompressedMatrixBlock) || cmb.getNumColumns() < 2)
return;
diff --git a/src/test/java/org/apache/sysds/test/component/compress/workload/WorkloadTest.java b/src/test/java/org/apache/sysds/test/component/compress/workload/WorkloadTest.java
index aacb716..9422e58 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/workload/WorkloadTest.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/workload/WorkloadTest.java
@@ -119,7 +119,7 @@ public class WorkloadTest {
args.put("$2", "FALSE");
args.put("$3", "0");
- tests.add(new Object[] {0, 0, 0, 1, 1, 1, 6, 0, true, false, "functions/lmDS.dml", args});
+ tests.add(new Object[] {0, 1, 0, 1, 1, 1, 6, 0, true, false, "functions/lmDS.dml", args});
tests.add(new Object[] {0, 0, 0, 1, 0, 1, 0, 0, true, true, "functions/lmDS.dml", args});
tests.add(new Object[] {0, 0, 0, 1, 10, 10, 1, 0, true, true, "functions/lmCG.dml", args});
@@ -134,15 +134,15 @@ public class WorkloadTest {
args.put("$1", testFile);
args.put("$2", "TRUE");
args.put("$3", "1");
- tests.add(new Object[] {0, 0, 1, 1, 1, 1, 1, 0, true, true, "functions/lmDS.dml", args});
- tests.add(new Object[] {0, 0, 1, 1, 11, 10, 2, 0, true, true, "functions/lmCG.dml", args});
+ tests.add(new Object[] {0, 1, 0, 0, 0, 0, 1, 0, false, true, "functions/lmDS.dml", args});
+ tests.add(new Object[] {0, 1, 1, 1, 11, 10, 2, 0, true, true, "functions/lmCG.dml", args});
args = new HashMap<>();
args.put("$1", testFile);
args.put("$2", "TRUE");
args.put("$3", "2");
- tests.add(new Object[] {0, 0, 1, 1, 1, 1, 3, 0, true, true, "functions/lmDS.dml", args});
- tests.add(new Object[] {0, 0, 1, 1, 11, 10, 4, 0, true, true, "functions/lmCG.dml", args});
+ tests.add(new Object[] {0, 1, 0, 0, 0, 0, 1, 0, false, true, "functions/lmDS.dml", args});
+ tests.add(new Object[] {0, 1, 1, 1, 11, 10, 2, 0, true, true, "functions/lmCG.dml", args});
args = new HashMap<>();
args.put("$1", testFile);
@@ -176,7 +176,7 @@ public class WorkloadTest {
CostEstimatorBuilder ceb = new CostEstimatorBuilder(wtr);
InstructionTypeCounter itc = ceb.getCounter();
- verify(wtr, itc, ceb);
+ verify(wtr, itc, ceb, scriptName, args);
}
catch(Exception e) {
e.printStackTrace();
@@ -184,9 +184,9 @@ public class WorkloadTest {
}
}
- private void verify(WTreeRoot wtr, InstructionTypeCounter itc, CostEstimatorBuilder ceb) {
+ private void verify(WTreeRoot wtr, InstructionTypeCounter itc, CostEstimatorBuilder ceb, String name, Map<String, String> args) {
- String errorString = wtr + "\n" + itc + " \n ";
+ String errorString = wtr + "\n" + itc + " \n " + name + " -- " + args + "\n";
Assert.assertEquals(errorString + "scans:", scans, itc.getScans());
Assert.assertEquals(errorString + "decompressions", decompressions, itc.getDecompressions());
Assert.assertEquals(errorString + "overlappingDecompressions", overlappingDecompressions,
@@ -197,7 +197,7 @@ public class WorkloadTest {
itc.getCompressedMultiplications());
Assert.assertEquals(errorString + "dictionaryOps", dictionaryOps, itc.getDictionaryOps());
Assert.assertEquals(errorString + "lookup", indexing, itc.getIndexing());
- Assert.assertEquals(shouldCompress, ceb.shouldTryToCompress());
+ Assert.assertEquals(errorString + "Should Compresss", shouldCompress, ceb.shouldTryToCompress());
}
private static WTreeRoot getWorkloadTree(DMLProgram prog) {
diff --git a/src/test/java/org/apache/sysds/test/functions/compress/workload/WorkloadAlgorithmTest.java b/src/test/java/org/apache/sysds/test/functions/compress/workload/WorkloadAlgorithmTest.java
index 9ffeb02..5de8880 100644
--- a/src/test/java/org/apache/sysds/test/functions/compress/workload/WorkloadAlgorithmTest.java
+++ b/src/test/java/org/apache/sysds/test/functions/compress/workload/WorkloadAlgorithmTest.java
@@ -73,7 +73,7 @@ public class WorkloadAlgorithmTest extends AutomatedTestBase {
@Test
public void testMLogRegCP() {
- runWorkloadAnalysisTest(TEST_NAME1, ExecMode.HYBRID, 2, false);
+ runWorkloadAnalysisTest(TEST_NAME1, ExecMode.HYBRID, 1, false);
}
@Test