You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by ar...@apache.org on 2023/05/28 20:40:00 UTC

[systemds] branch main updated: [SYSTEMDS-3577] Fix default storage memory fractions in Spark

This is an automated email from the ASF dual-hosted git repository.

arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 4a13b7a3c8 [SYSTEMDS-3577] Fix default storage memory fractions in Spark
4a13b7a3c8 is described below

commit 4a13b7a3c8835eeeda32245b4067ef5a2ed8b363
Author: Arnab Phani <ph...@gmail.com>
AuthorDate: Sun May 28 13:25:15 2023 +0200

    [SYSTEMDS-3577] Fix default storage memory fractions in Spark
    
    This patch fixes a bug in analyzing Spark's memory configuration.
    The previous code wrongly treated the storage fraction as relative
    to the heap instead of to the unified memory (execution + storage).
    
    Closes #1835
---
 .../context/SparkExecutionContext.java              | 21 +++++++++++++--------
 1 file changed, 13 insertions(+), 8 deletions(-)

diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
index ce7b43972d..371ec3db51 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
@@ -1804,9 +1804,10 @@ public class SparkExecutionContext extends ExecutionContext
 	 */
 	public static class SparkClusterConfig
 	{
-		//broadcasts are stored in mem-and-disk in data space, this config
-		//defines the fraction of data space to be used as broadcast budget
-		private static final double BROADCAST_DATA_FRACTION = 0.35;
+		//broadcasts are stored in mem-and-disk in storage space, this config
+		//defines the fraction of min storage space to be used as broadcast budget
+		private static final double BROADCAST_DATA_FRACTION = 0.70;
+		private static final double BROADCAST_DATA_FRACTION_LEGACY = 0.35;
 
 		//forward private config from Spark's UnifiedMemoryManager.scala (>1.6)
 		private static final long RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024;
@@ -1894,7 +1895,7 @@ public class SparkExecutionContext extends ExecutionContext
 			double dataFrac = sconf.getDouble("spark.storage.memoryFraction", 0.6); //default 60%
 			_memDataMinFrac = dataFrac;
 			_memDataMaxFrac = dataFrac;
-			_memBroadcastFrac = dataFrac * BROADCAST_DATA_FRACTION; //default 18%
+			_memBroadcastFrac = dataFrac * BROADCAST_DATA_FRACTION_LEGACY; //default 21% (0.6 * 0.35)
 
 			//analyze spark degree of parallelism
 			analyzeSparkParallelismConfiguation(sconf);
@@ -1910,10 +1911,14 @@ public class SparkExecutionContext extends ExecutionContext
 					- RESERVED_SYSTEM_MEMORY_BYTES;
 
 			//get data and shuffle memory ratios (defaults not specified in job conf)
-			_memDataMinFrac = sconf.getDouble("spark.memory.storageFraction", 0.5); //default 50%
-			_memDataMaxFrac = sconf.getDouble("spark.memory.fraction", 0.6); //default 60%
-			_memBroadcastFrac = _memDataMaxFrac * BROADCAST_DATA_FRACTION; //default 21%
-			
+			//first get the unified memory fraction (60%) comprising execution and storage
+			double unifiedMem = sconf.getDouble("spark.memory.fraction", 0.6);
+			//minimum default storage expressed as 50% of unified memory (= 30% of heap)
+			_memDataMinFrac = unifiedMem * sconf.getDouble("spark.memory.storageFraction", 0.5);
+			//storage memory can expand and take up the full unified memory
+			_memDataMaxFrac = unifiedMem;
+			//Heuristic-based broadcast fraction (70% of min storage = 21% of heap)
+			_memBroadcastFrac = _memDataMinFrac * BROADCAST_DATA_FRACTION;
 			//analyze spark degree of parallelism
 			analyzeSparkParallelismConfiguation(sconf);
 		}