Posted to commits@systemds.apache.org by ba...@apache.org on 2021/09/25 21:43:21 UTC

[systemds] branch master updated: [SYSTEMDS-3145] Spark Memory Estimation of Compressed Objects

This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/master by this push:
     new b202f0f  [SYSTEMDS-3145] Spark Memory Estimation of Compressed Objects
b202f0f is described below

commit b202f0f1356d537fde7ef3989ca0c1675772f52c
Author: baunsgaard <ba...@tugraz.at>
AuthorDate: Sat Sep 25 21:47:25 2021 +0200

    [SYSTEMDS-3145] Spark Memory Estimation of Compressed Objects
    
    This commit fixes a bug where the memory estimate of compressed objects
    in Spark was corrupted by the soft reference to the uncompressed matrix
    blocks. To address this, we clear the soft reference once compression is
    done in Spark instructions.
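
    A minimal sketch of the idea (the helper class and method below are
    made up; only the calls visible in the diff are assumed):

        import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
        import org.apache.sysds.runtime.compress.CompressedMatrixBlockFactory;
        import org.apache.sysds.runtime.matrix.data.MatrixBlock;

        public class SparkCompressSketch {
            // Hypothetical helper, not part of this commit: compress a block
            // and drop the cached uncompressed data, mirroring what the
            // factory now does internally when isInSparkInstruction is set.
            public static MatrixBlock compressForSpark(MatrixBlock in) {
                MatrixBlock comp = CompressedMatrixBlockFactory.compress(in).getLeft();
                if(comp instanceof CompressedMatrixBlock)
                    // otherwise getInMemorySize() also counts the soft-referenced
                    // uncompressed block and the Spark size estimate is inflated
                    ((CompressedMatrixBlock) comp).clearSoftReferenceToDecompressed();
                return comp;
            }
        }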
    
    This commit also contains slight changes to logging when compressing
    via a Spark instruction: if trace is enabled on the Spark compression
    instruction, compression information is printed, and the per-block
    printing from each individual compression call is reduced.
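
    A sketch of the logging gate, assuming the new
    CompressionSettings.isInSparkInstruction flag is passed through to the
    call sites (the class below is hypothetical):

        import org.apache.commons.logging.Log;
        import org.apache.commons.logging.LogFactory;

        public class CompressionLoggingSketch {
            private static final Log LOG = LogFactory.getLog(CompressionLoggingSketch.class);

            // Hypothetical illustration: per-block messages are suppressed
            // inside Spark instructions so executor logs stay quiet, leaving
            // only the instruction-level trace summary.
            public static void logEstimatorChoice(boolean isInSparkInstruction) {
                if(!isInSparkInstruction)
                    LOG.info("Using Exact estimator");
            }
        }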
---
 .../runtime/compress/CompressedMatrixBlock.java    |   2 +-
 .../compress/CompressedMatrixBlockFactory.java     | 113 +++++++++++----------
 .../runtime/compress/CompressionSettings.java      |  23 ++---
 .../compress/CompressionSettingsBuilder.java       |  13 ++-
 .../estim/CompressedSizeEstimatorFactory.java      |  12 ++-
 .../context/SparkExecutionContext.java             |   1 +
 .../spark/CompressionSPInstruction.java            |  51 +++++++---
 7 files changed, 131 insertions(+), 84 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
index 4ff5bcb..c4eb8fa 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
@@ -434,7 +434,7 @@ public class CompressedMatrixBlock extends MatrixBlock {
 			nonZeros = cg.getNumberNonZeros();
 			// clear the soft reference to the decompressed version, since the one
 			// column group perfectly represents the decompressed version.
-			decompressedVersion = null;
+			clearSoftReferenceToDecompressed();
 		}
 		// serialize compressed matrix block
 		out.writeInt(rlen);
diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java
index d77ab82..a1d802a 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java
@@ -227,6 +227,11 @@ public class CompressedMatrixBlockFactory {
 		if(res == null)
 			return abortCompression();
 
+		if(compSettings.isInSparkInstruction) {
+			// clear the soft reference to the uncompressed block when running in Spark.
+			res.clearSoftReferenceToDecompressed();
+		}
+
 		return new ImmutablePair<>(res, _stats);
 	}
 
@@ -476,58 +481,64 @@ public class CompressedMatrixBlockFactory {
 		setNextTimePhase(time.stop());
 		DMLCompressionStatistics.addCompressionTime(getLastTimePhase(), phase);
 		if(LOG.isDebugEnabled()) {
-			switch(phase) {
-				case 0:
-					LOG.debug("--compression phase " + phase + " Classify  : " + getLastTimePhase());
-					LOG.debug("--Individual Columns Estimated Compression: " + _stats.estimatedSizeCols);
-					break;
-				case 1:
-					LOG.debug("--compression phase " + phase + " Grouping  : " + getLastTimePhase());
-					LOG.debug("Grouping using: " + compSettings.columnPartitioner);
-					LOG.debug("Cost Calculated using: " + costEstimator);
-					LOG.debug("--Cocoded Columns estimated Compression:" + _stats.estimatedSizeCoCoded);
-					break;
-				case 2:
-					LOG.debug("--compression phase " + phase + " Transpose : " + getLastTimePhase());
-					LOG.debug("Did transpose: " + compSettings.transposed);
-					break;
-				case 3:
-					LOG.debug("--compression phase " + phase + " Compress  : " + getLastTimePhase());
-					LOG.debug("--compression Hash collisions:" + "(" + DblArrayIntListHashMap.hashMissCount + ","
-						+ DoubleCountHashMap.hashMissCount + ")");
-					DblArrayIntListHashMap.hashMissCount = 0;
-					DoubleCountHashMap.hashMissCount = 0;
-					LOG.debug("--compressed initial actual size:" + _stats.compressedInitialSize);
-					break;
-				case 4:
-					LOG.debug("--compression phase " + phase + " Share     : " + getLastTimePhase());
-					break;
-				case 5:
-					LOG.debug("--num col groups: " + res.getColGroups().size());
-					LOG.debug("--compression phase " + phase + " Cleanup   : " + getLastTimePhase());
-					LOG.debug("--col groups types " + _stats.getGroupsTypesString());
-					LOG.debug("--col groups sizes " + _stats.getGroupsSizesString());
-					LOG.debug("--dense size:        " + _stats.denseSize);
-					LOG.debug("--original size:     " + _stats.originalSize);
-					LOG.debug("--compressed size:   " + _stats.size);
-					LOG.debug("--compression ratio: " + _stats.getRatio());
-					LOG.debug("--Dense       ratio: " + _stats.getDenseRatio());
-					int[] lengths = new int[res.getColGroups().size()];
-					int i = 0;
-					for(AColGroup colGroup : res.getColGroups())
-						lengths[i++] = colGroup.getNumValues();
-
-					LOG.debug("--compressed colGroup dictionary sizes: " + Arrays.toString(lengths));
-					if(LOG.isTraceEnabled()) {
-						for(AColGroup colGroup : res.getColGroups()) {
-							LOG.trace("--colGroups type       : " + colGroup.getClass().getSimpleName() + " size: "
-								+ colGroup.estimateInMemorySize()
-								+ ((colGroup instanceof ColGroupValue) ? "  numValues :"
-									+ ((ColGroupValue) colGroup).getNumValues() : "")
-								+ "  colIndexes : " + Arrays.toString(colGroup.getColIndices()));
+			if(compSettings.isInSparkInstruction) {
+				if(phase == 5)
+					LOG.debug(_stats);
+			}
+			else {
+				switch(phase) {
+					case 0:
+						LOG.debug("--compression phase " + phase + " Classify  : " + getLastTimePhase());
+						LOG.debug("--Individual Columns Estimated Compression: " + _stats.estimatedSizeCols);
+						break;
+					case 1:
+						LOG.debug("--compression phase " + phase + " Grouping  : " + getLastTimePhase());
+						LOG.debug("Grouping using: " + compSettings.columnPartitioner);
+						LOG.debug("Cost Calculated using: " + costEstimator);
+						LOG.debug("--Cocoded Columns estimated Compression:" + _stats.estimatedSizeCoCoded);
+						break;
+					case 2:
+						LOG.debug("--compression phase " + phase + " Transpose : " + getLastTimePhase());
+						LOG.debug("Did transpose: " + compSettings.transposed);
+						break;
+					case 3:
+						LOG.debug("--compression phase " + phase + " Compress  : " + getLastTimePhase());
+						LOG.debug("--compression Hash collisions:" + "(" + DblArrayIntListHashMap.hashMissCount + ","
+							+ DoubleCountHashMap.hashMissCount + ")");
+						DblArrayIntListHashMap.hashMissCount = 0;
+						DoubleCountHashMap.hashMissCount = 0;
+						LOG.debug("--compressed initial actual size:" + _stats.compressedInitialSize);
+						break;
+					case 4:
+						LOG.debug("--compression phase " + phase + " Share     : " + getLastTimePhase());
+						break;
+					case 5:
+						LOG.debug("--num col groups: " + res.getColGroups().size());
+						LOG.debug("--compression phase " + phase + " Cleanup   : " + getLastTimePhase());
+						LOG.debug("--col groups types " + _stats.getGroupsTypesString());
+						LOG.debug("--col groups sizes " + _stats.getGroupsSizesString());
+						LOG.debug("--dense size:        " + _stats.denseSize);
+						LOG.debug("--original size:     " + _stats.originalSize);
+						LOG.debug("--compressed size:   " + _stats.size);
+						LOG.debug("--compression ratio: " + _stats.getRatio());
+						LOG.debug("--Dense       ratio: " + _stats.getDenseRatio());
+						int[] lengths = new int[res.getColGroups().size()];
+						int i = 0;
+						for(AColGroup colGroup : res.getColGroups())
+							lengths[i++] = colGroup.getNumValues();
+
+						LOG.debug("--compressed colGroup dictionary sizes: " + Arrays.toString(lengths));
+						if(LOG.isTraceEnabled()) {
+							for(AColGroup colGroup : res.getColGroups()) {
+								LOG.trace("--colGroups type       : " + colGroup.getClass().getSimpleName() + " size: "
+									+ colGroup.estimateInMemorySize()
+									+ ((colGroup instanceof ColGroupValue) ? "  numValues :"
+										+ ((ColGroupValue) colGroup).getNumValues() : "")
+									+ "  colIndexes : " + Arrays.toString(colGroup.getColIndices()));
+							}
 						}
-					}
-				default:
+					default:
+				}
 			}
 		}
 		phase++;
diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java
index c1a9cd4..c871a64 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java
@@ -87,14 +87,10 @@ public class CompressionSettings {
 	 */
 	public final EnumSet<CompressionType> validCompressions;
 
-	/**
-	 * The minimum size of the sample extracted.
-	 */
+	/** The minimum size of the sample extracted. */
 	public final int minimumSampleSize;
 
-	/**
-	 * The maximum size of the sample extracted.
-	 */
+	/** The maximum size of the sample extracted. */
 	public final int maxSampleSize;
 
 	/** The sample type used for sampling */
@@ -108,15 +104,17 @@ public class CompressionSettings {
 	 */
 	public boolean transposed = false;
 
-	/**
-	 * The minimum compression ratio to achieve.
-	 */
+	/** The minimum compression ratio to achieve. */
 	public final double minimumCompressionRatio;
 
+	/** True if the compression is run inside a Spark instruction */
+	public final boolean isInSparkInstruction;
+
 	protected CompressionSettings(double samplingRatio, boolean allowSharedDictionary, String transposeInput, int seed,
 		boolean lossy, EnumSet<CompressionType> validCompressions, boolean sortValuesByLength,
-		PartitionerType columnPartitioner, int maxColGroupCoCode, double coCodePercentage, int minimumSampleSize, int maxSampleSize,
-		EstimationType estimationType, CostType costComputationType, double minimumCompressionRatio) {
+		PartitionerType columnPartitioner, int maxColGroupCoCode, double coCodePercentage, int minimumSampleSize,
+		int maxSampleSize, EstimationType estimationType, CostType costComputationType, double minimumCompressionRatio,
+		boolean isInSparkInstruction) {
 		this.samplingRatio = samplingRatio;
 		this.allowSharedDictionary = allowSharedDictionary;
 		this.transposeInput = transposeInput;
@@ -128,10 +126,11 @@ public class CompressionSettings {
 		this.maxColGroupCoCode = maxColGroupCoCode;
 		this.coCodePercentage = coCodePercentage;
 		this.minimumSampleSize = minimumSampleSize;
-		this.maxSampleSize= maxSampleSize;
+		this.maxSampleSize = maxSampleSize;
 		this.estimationType = estimationType;
 		this.costComputationType = costComputationType;
 		this.minimumCompressionRatio = minimumCompressionRatio;
+		this.isInSparkInstruction = isInSparkInstruction;
 		if(LOG.isDebugEnabled())
 			LOG.debug(this);
 	}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java
index d5fd036..30bb1d1 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java
@@ -47,6 +47,7 @@ public class CompressionSettingsBuilder {
 	private PartitionerType columnPartitioner;
 	private CostType costType;
 	private double minimumCompressionRatio = 1.0;
+	private boolean isInSparkInstruction = false;
 
 	public CompressionSettingsBuilder() {
 
@@ -293,6 +294,16 @@ public class CompressionSettingsBuilder {
 	}
 
 	/**
+	 * Inform the compression that it is executed in a spark instruction.
+	 * 
+	 * @return The CompressionSettingsBuilder
+	 */
+	public CompressionSettingsBuilder setIsInSparkInstruction() {
+		this.isInSparkInstruction = true;
+		return this;
+	}
+
+	/**
 	 * Create the CompressionSettings object to use in the compression.
 	 * 
 	 * @return The CompressionSettings
@@ -300,6 +311,6 @@ public class CompressionSettingsBuilder {
 	public CompressionSettings create() {
 		return new CompressionSettings(samplingRatio, allowSharedDictionary, transposeInput, seed, lossy,
 			validCompressions, sortValuesByLength, columnPartitioner, maxColGroupCoCode, coCodePercentage,
-			minimumSampleSize, maxSampleSize, estimationType, costType, minimumCompressionRatio);
+			minimumSampleSize, maxSampleSize, estimationType, costType, minimumCompressionRatio, isInSparkInstruction);
 	}
 }
diff --git a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorFactory.java b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorFactory.java
index a9a8e44..8eb2da0 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorFactory.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/estim/CompressedSizeEstimatorFactory.java
@@ -43,16 +43,19 @@ public class CompressedSizeEstimatorFactory {
 		else {
 			if(shouldUseExactEstimator(cs, nRows, sampleSize, nnzRows)) {
 				if(sampleSize > nnzRows && nRows > 10000 && nCols > 10 && !cs.transposed) {
-					LOG.info("Transposing for exact estimator");
+					if(! cs.isInSparkInstruction)
+						LOG.info("Transposing for exact estimator");
 					data = LibMatrixReorg.transpose(data,
 						new MatrixBlock(data.getNumColumns(), data.getNumRows(), data.isInSparseFormat()), k);
 					cs.transposed = true;
 				}
-				LOG.info("Using Exact estimator");
+				if(! cs.isInSparkInstruction)
+					LOG.info("Using Exact estimator");
 				return new CompressedSizeEstimatorExact(data, cs);
 			}
 			else {
-				LOG.info("Trying sample size: " + sampleSize);
+				if(! cs.isInSparkInstruction)
+					LOG.info("Trying sample size: " + sampleSize);
 				return tryToMakeSampleEstimator(data, cs, sampleRatio, sampleSize, nRows, nnzRows, k);
 			}
 		}
@@ -64,7 +67,8 @@ public class CompressedSizeEstimatorFactory {
 		CompressedSizeEstimatorSample estS = new CompressedSizeEstimatorSample(data, cs, sampleSize, k);
 		int double_number = 1;
 		while(estS.getSample() == null) {
-			LOG.warn("Doubling sample size " + double_number++);
+			if(! cs.isInSparkInstruction)
+				LOG.warn("Doubling sample size " + double_number++);
 			sampleSize = sampleSize * 2;
 			if(shouldUseExactEstimator(cs, nRows, sampleSize, nnzRows))
 				return new CompressedSizeEstimatorExact(data, cs);
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
index 921219a..ca73700 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
@@ -269,6 +269,7 @@ public class SparkExecutionContext extends ExecutionContext
 		final String threads = ConfigurationManager.getDMLConfig().getTextValue(DMLConfig.LOCAL_SPARK_NUM_THREADS);
 		conf.setMaster("local[" + threads + "]");
 		conf.setAppName("LocalSparkContextApp");
+		conf.set("spark.ui.showConsoleProgress", "false");
 		conf.set("spark.ui.enabled", "false");
 	}
 
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/CompressionSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/CompressionSPInstruction.java
index e6b62ee..49f7ba5 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/CompressionSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/CompressionSPInstruction.java
@@ -25,9 +25,13 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.Function;
+import org.apache.spark.storage.RDDInfo;
+import org.apache.spark.storage.StorageLevel;
 import org.apache.sysds.runtime.compress.CompressedMatrixBlockFactory;
+import org.apache.sysds.runtime.compress.CompressionSettingsBuilder;
 import org.apache.sysds.runtime.compress.SingletonLookupHashMap;
 import org.apache.sysds.runtime.compress.cost.CostEstimatorBuilder;
+import org.apache.sysds.runtime.compress.cost.CostEstimatorFactory.CostType;
 import org.apache.sysds.runtime.compress.cost.ICostEstimate;
 import org.apache.sysds.runtime.compress.workload.WTreeRoot;
 import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
@@ -87,9 +91,25 @@ public class CompressionSPInstruction extends UnarySPInstruction {
 		// execute compression
 		JavaPairRDD<MatrixIndexes, MatrixBlock> out = in.mapValues(mappingFunction);
 		if(LOG.isTraceEnabled()) {
-			out.checkpoint();
-			LOG.trace("\nSpark compressed    : " + reduceSizes(out.mapValues(new SizeFunction()).collect())
-				+ "\nSpark uncompressed  : " + reduceSizes(in.mapValues(new SizeFunction()).collect()));
+			in.persist(StorageLevel.MEMORY_AND_DISK());
+			out.persist(StorageLevel.MEMORY_AND_DISK());
+			long sparkSizeIn = 0;
+			long sparkSizeOut = 0;
+			long blockSizesIn = reduceSizes(in.mapValues(new SizeFunction()).collect());
+			long blockSizesOut = reduceSizes(out.mapValues(new SizeFunction()).collect());
+			for(RDDInfo info : sec.getSparkContext().sc().getRDDStorageInfo()) {
+				if(info.id() == out.id())
+					sparkSizeOut = info.memSize();
+				else if(info.id() == in.id())
+					sparkSizeIn = info.memSize();
+			}
+			StringBuilder sb = new StringBuilder();
+			sb.append("Spark Compression Instruction sizes:");
+			sb.append(String.format("\nSBCompress: InSize:       %16d", sparkSizeIn));
+			sb.append(String.format("\nSBCompress: InBlockSize:  %16d", blockSizesIn));
+			sb.append(String.format("\nSBCompress: OutSize:      %16d", sparkSizeOut));
+			sb.append(String.format("\nSBCompress: OutBlockSize: %16d", blockSizesOut));
+			LOG.trace(sb.toString());
 		}
 
 		// set outputs
@@ -102,7 +122,9 @@ public class CompressionSPInstruction extends UnarySPInstruction {
 
 		@Override
 		public MatrixBlock call(MatrixBlock arg0) throws Exception {
-			return CompressedMatrixBlockFactory.compress(arg0).getLeft();
+			CompressionSettingsBuilder csb = new CompressionSettingsBuilder().setIsInSparkInstruction()
+				.setCostType(CostType.MEMORY);
+			return CompressedMatrixBlockFactory.compress(arg0, csb).getLeft();
 		}
 	}
 
@@ -117,13 +139,14 @@ public class CompressionSPInstruction extends UnarySPInstruction {
 
 		@Override
 		public MatrixBlock call(MatrixBlock arg0) throws Exception {
-			ICostEstimate a = costBuilder.create(arg0.getNumRows(), arg0.getNumColumns());
-			return CompressedMatrixBlockFactory.compress(arg0, InfrastructureAnalyzer.getLocalParallelism(), a)
+			ICostEstimate ce = costBuilder.create(arg0.getNumRows(), arg0.getNumColumns());
+			CompressionSettingsBuilder csb = new CompressionSettingsBuilder().setIsInSparkInstruction();
+			return CompressedMatrixBlockFactory.compress(arg0, InfrastructureAnalyzer.getLocalParallelism(), csb, ce)
 				.getLeft();
 		}
 	}
 
-	public static class SizeFunction implements Function<MatrixBlock, Double> {
+	public static class SizeFunction implements Function<MatrixBlock, Long> {
 		private static final long serialVersionUID = 1L;
 
 		public SizeFunction() {
@@ -131,17 +154,15 @@ public class CompressionSPInstruction extends UnarySPInstruction {
 		}
 
 		@Override
-		public Double call(MatrixBlock arg0) throws Exception {
-			return (double) arg0.getInMemorySize();
+		public Long call(MatrixBlock arg0) throws Exception {
+			return arg0.getInMemorySize();
 		}
 	}
 
-	public static String reduceSizes(List<Tuple2<MatrixIndexes, Double>> in) {
-		double sum = 0;
-		for(Tuple2<MatrixIndexes, Double> e : in) {
+	public static Long reduceSizes(List<Tuple2<MatrixIndexes, Long>> in) {
+		long sum = 0;
+		for(Tuple2<MatrixIndexes, Long> e : in)
 			sum += e._2();
-		}
-
-		return "sum: " + sum + " mean: " + (sum / in.size());
+		return sum;
 	}
 }