Posted to commits@systemds.apache.org by ba...@apache.org on 2021/09/25 18:35:56 UTC

[systemds] 02/02: [SYSTEMDS-3144] Spark Local Context from command line

This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git

commit 535263b28e4f36b7575f487bcf9967d9d42bffdd
Author: baunsgaard <ba...@tugraz.at>
AuthorDate: Sat Sep 25 19:54:01 2021 +0200

    [SYSTEMDS-3144] Spark Local Context from command line
    
    This commit adds the ability to start a SystemDS instance with a
    local Spark context, which enables use of our Spark instructions
    even without a Spark cluster.
    
    Also added in this commit is a fallback to a local Spark context,
    used in case creation of the requested Spark context fails.
    
    Closes #1398
    Closes #1399
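
    For reference, a minimal sketch of enabling the new flag through a
    SystemDS configuration file (the file name and the -config/-exec
    command-line flags follow common SystemDS usage and are assumptions
    here, not spelled out by this commit):

        <!-- SystemDS-config.xml -->
        <root>
          <sysds.local.spark>true</sysds.local.spark>
          <!-- "*" allocates all available threads on the local host -->
          <sysds.local.spark.number.threads>*</sysds.local.spark.number.threads>
        </root>

    invoked, for example, as:

        ./bin/systemds MyScript.dml -config SystemDS-config.xml -exec spark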
---
 src/main/java/org/apache/sysds/conf/DMLConfig.java |  4 +-
 .../context/SparkExecutionContext.java             | 50 ++++++++++++++--------
 2 files changed, 36 insertions(+), 18 deletions(-)

diff --git a/src/main/java/org/apache/sysds/conf/DMLConfig.java b/src/main/java/org/apache/sysds/conf/DMLConfig.java
index a59101b..6194bc2 100644
--- a/src/main/java/org/apache/sysds/conf/DMLConfig.java
+++ b/src/main/java/org/apache/sysds/conf/DMLConfig.java
@@ -89,7 +89,8 @@ public class DMLConfig
 	public static final String SYNCHRONIZE_GPU      = "sysds.gpu.sync.postProcess"; // boolean: whether to synchronize GPUs after every instruction
 	public static final String EAGER_CUDA_FREE      = "sysds.gpu.eager.cudaFree"; // boolean: whether to perform eager CUDA free on rmvar
 	public static final String GPU_EVICTION_POLICY  = "sysds.gpu.eviction.policy"; // string: can be lru, lfu, min_evict
-	public static final String LOCAL_SPARK_NUM_THREADS = "sysds.local.spark.number.threads";
+	public static final String USE_LOCAL_SPARK_CONFIG = "sysds.local.spark"; // boolean: whether to force Spark execution onto a local Spark context
+	public static final String LOCAL_SPARK_NUM_THREADS = "sysds.local.spark.number.threads"; // string: number of threads for the local Spark configuration; default "*" uses all available threads
 	public static final String LINEAGECACHESPILL    = "sysds.lineage.cachespill"; // boolean: whether to spill cache entries to disk
 	public static final String COMPILERASSISTED_RW  = "sysds.lineage.compilerassisted"; // boolean: whether to apply compiler assisted rewrites
 	
@@ -152,6 +153,7 @@ public class DMLConfig
 		_defaultVals.put(GPU_MEMORY_ALLOCATOR,   "cuda");
 		_defaultVals.put(AVAILABLE_GPUS,         "-1");
 		_defaultVals.put(GPU_EVICTION_POLICY,    "min_evict");
+		_defaultVals.put(USE_LOCAL_SPARK_CONFIG, "false");
 		_defaultVals.put(LOCAL_SPARK_NUM_THREADS, "*"); // * Means it allocates the number of available threads on the local host machine.
 		_defaultVals.put(SYNCHRONIZE_GPU,        "false" );
 		_defaultVals.put(EAGER_CUDA_FREE,        "false" );
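
The SparkExecutionContext changes below consume these entries through the
existing DMLConfig getters; roughly (a sketch using only calls that appear
in this diff):

    import org.apache.sysds.conf.ConfigurationManager;
    import org.apache.sysds.conf.DMLConfig;

    DMLConfig cfg = ConfigurationManager.getDMLConfig();
    boolean forceLocal = cfg.getBooleanValue(DMLConfig.USE_LOCAL_SPARK_CONFIG); // default: false
    String threads = cfg.getTextValue(DMLConfig.LOCAL_SPARK_NUM_THREADS);       // default: "*" (all threads)
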
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
index ca3f69c..67efd5c 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
@@ -42,6 +42,7 @@ import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.storage.RDDInfo;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.util.LongAccumulator;
+import org.apache.sysds.api.DMLException;
 import org.apache.sysds.api.DMLScript;
 import org.apache.sysds.api.mlcontext.MLContext;
 import org.apache.sysds.api.mlcontext.MLContextUtil;
@@ -213,23 +214,15 @@ public class SparkExecutionContext extends ExecutionContext
 		}
 		else
 		{
-			if(DMLScript.USE_LOCAL_SPARK_CONFIG) {
-				// For now set 4 cores for integration testing :)
-				SparkConf conf = createSystemDSSparkConf()
-						.setMaster("local[" +
-							ConfigurationManager.getDMLConfig().getTextValue(DMLConfig.LOCAL_SPARK_NUM_THREADS)+
-							"]").setAppName("My local integration test app");
-				// This is discouraged in spark but have added only for those testcase that cannot stop the context properly
-				// conf.set("spark.driver.allowMultipleContexts", "true");
-				conf.set("spark.ui.enabled", "false");
-				_spctx = new JavaSparkContext(conf);
-			}
-			else //default cluster setup
-			{
-				//setup systemds-preferred spark configuration (w/o user choice)
-				SparkConf conf = createSystemDSSparkConf();
-				_spctx = new JavaSparkContext(conf);
-			}
+			final SparkConf conf = createSystemDSSparkConf();
+			final DMLConfig dmlConfig = ConfigurationManager.getDMLConfig();
+			// Keep the local Spark config if already forced (e.g., from the command line), otherwise consult the DML config.
+			DMLScript.USE_LOCAL_SPARK_CONFIG = DMLScript.USE_LOCAL_SPARK_CONFIG || dmlConfig.getBooleanValue(DMLConfig.USE_LOCAL_SPARK_CONFIG);
+
+			if(DMLScript.USE_LOCAL_SPARK_CONFIG)
+				setLocalConfSettings(conf);
+			
+			_spctx = createContext(conf);
 
 			_parRDDs.clear();
 		}
@@ -253,6 +246,29 @@ public class SparkExecutionContext extends ExecutionContext
 		}
 	}
 
+
+	private static JavaSparkContext createContext(SparkConf conf){
+		try{
+			return new JavaSparkContext(conf);
+		}
+		catch(Exception e){
+			if(e.getMessage() != null && e.getMessage().contains("A master URL must be set in your configuration")){
+				LOG.error("Error constructing Spark Context, falling back to local Spark context creation");
+				setLocalConfSettings(conf);
+				return createContext(conf);
+			}
+			else
+				throw new DMLException("Error while creating Spark context", e);
+		}
+	}
+
+	private static void setLocalConfSettings(SparkConf conf){
+		final String threads = ConfigurationManager.getDMLConfig().getTextValue(DMLConfig.LOCAL_SPARK_NUM_THREADS);
+		conf.setMaster("local[" + threads + "]");
+		conf.setAppName("LocalSparkContextApp");
+		conf.set("spark.ui.enabled", "false");
+	}
+
 	/**
 	 * Sets up a SystemDS-preferred Spark configuration based on the implicit
 	 * default configuration (as passed via configurations from outside).
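
To illustrate what setLocalConfSettings produces at runtime, here is a
self-contained sketch that creates such a local context directly and runs a
trivial job (a standalone example, not part of this commit; the class name
is hypothetical):

    import java.util.Arrays;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class LocalSparkSketch {
        public static void main(String[] args) {
            // Mirror setLocalConfSettings: local master using all host threads,
            // a fixed application name, and the web UI disabled.
            SparkConf conf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("LocalSparkContextApp");
            conf.set("spark.ui.enabled", "false");
            try (JavaSparkContext sc = new JavaSparkContext(conf)) {
                // A small distributed count confirms the local context works.
                long count = sc.parallelize(Arrays.asList(1, 2, 3, 4)).count();
                System.out.println("count = " + count);
            }
        }
    }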