You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemds.apache.org by ba...@apache.org on 2021/09/25 18:35:56 UTC
[systemds] 02/02: [SYSTEMDS-3144] Spark Local Context from command
line
This is an automated email from the ASF dual-hosted git repository.
baunsgaard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemds.git
commit 535263b28e4f36b7575f487bcf9967d9d42bffdd
Author: baunsgaard <ba...@tugraz.at>
AuthorDate: Sat Sep 25 19:54:01 2021 +0200
[SYSTEMDS-3144] Spark Local Context from command line
This commit adds the ability to start a SystemDS instance with a
local Spark context; this enables us to use our Spark instructions
even without a Spark cluster.
Also added in this commit is a fallback to a local Spark instance,
in case creation of the Spark context is attempted but fails.
Closes #1398
Closes #1399
---
src/main/java/org/apache/sysds/conf/DMLConfig.java | 4 +-
.../context/SparkExecutionContext.java | 50 ++++++++++++++--------
2 files changed, 36 insertions(+), 18 deletions(-)
diff --git a/src/main/java/org/apache/sysds/conf/DMLConfig.java b/src/main/java/org/apache/sysds/conf/DMLConfig.java
index a59101b..6194bc2 100644
--- a/src/main/java/org/apache/sysds/conf/DMLConfig.java
+++ b/src/main/java/org/apache/sysds/conf/DMLConfig.java
@@ -89,7 +89,8 @@ public class DMLConfig
public static final String SYNCHRONIZE_GPU = "sysds.gpu.sync.postProcess"; // boolean: whether to synchronize GPUs after every instruction
public static final String EAGER_CUDA_FREE = "sysds.gpu.eager.cudaFree"; // boolean: whether to perform eager CUDA free on rmvar
public static final String GPU_EVICTION_POLICY = "sysds.gpu.eviction.policy"; // string: can be lru, lfu, min_evict
- public static final String LOCAL_SPARK_NUM_THREADS = "sysds.local.spark.number.threads";
+ public static final String USE_LOCAL_SPARK_CONFIG = "sysds.local.spark"; // If set to true, it forces spark execution to a local spark context.
+ public static final String LOCAL_SPARK_NUM_THREADS = "sysds.local.spark.number.threads"; // the number of threads allowed to be used in the local spark configuration, default is * to enable use of all threads.
public static final String LINEAGECACHESPILL = "sysds.lineage.cachespill"; // boolean: whether to spill cache entries to disk
public static final String COMPILERASSISTED_RW = "sysds.lineage.compilerassisted"; // boolean: whether to apply compiler assisted rewrites
@@ -152,6 +153,7 @@ public class DMLConfig
_defaultVals.put(GPU_MEMORY_ALLOCATOR, "cuda");
_defaultVals.put(AVAILABLE_GPUS, "-1");
_defaultVals.put(GPU_EVICTION_POLICY, "min_evict");
+ _defaultVals.put(USE_LOCAL_SPARK_CONFIG, "false");
_defaultVals.put(LOCAL_SPARK_NUM_THREADS, "*"); // * Means it allocates the number of available threads on the local host machine.
_defaultVals.put(SYNCHRONIZE_GPU, "false" );
_defaultVals.put(EAGER_CUDA_FREE, "false" );
diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
index ca3f69c..67efd5c 100644
--- a/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysds/runtime/controlprogram/context/SparkExecutionContext.java
@@ -42,6 +42,7 @@ import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.storage.RDDInfo;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.util.LongAccumulator;
+import org.apache.sysds.api.DMLException;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.api.mlcontext.MLContext;
import org.apache.sysds.api.mlcontext.MLContextUtil;
@@ -213,23 +214,15 @@ public class SparkExecutionContext extends ExecutionContext
}
else
{
- if(DMLScript.USE_LOCAL_SPARK_CONFIG) {
- // For now set 4 cores for integration testing :)
- SparkConf conf = createSystemDSSparkConf()
- .setMaster("local[" +
- ConfigurationManager.getDMLConfig().getTextValue(DMLConfig.LOCAL_SPARK_NUM_THREADS)+
- "]").setAppName("My local integration test app");
- // This is discouraged in spark but have added only for those testcase that cannot stop the context properly
- // conf.set("spark.driver.allowMultipleContexts", "true");
- conf.set("spark.ui.enabled", "false");
- _spctx = new JavaSparkContext(conf);
- }
- else //default cluster setup
- {
- //setup systemds-preferred spark configuration (w/o user choice)
- SparkConf conf = createSystemDSSparkConf();
- _spctx = new JavaSparkContext(conf);
- }
+ final SparkConf conf = createSystemDSSparkConf();
+ final DMLConfig dmlConfig= ConfigurationManager.getDMLConfig();
+ // Use Spark local config, if already set to True ... keep true, otherwise look up if it should be local.
+ DMLScript.USE_LOCAL_SPARK_CONFIG = DMLScript.USE_LOCAL_SPARK_CONFIG ? true : dmlConfig.getBooleanValue(DMLConfig.USE_LOCAL_SPARK_CONFIG);
+
+ if(DMLScript.USE_LOCAL_SPARK_CONFIG)
+ setLocalConfSettings(conf);
+
+ _spctx = createContext(conf);
_parRDDs.clear();
}
@@ -253,6 +246,29 @@ public class SparkExecutionContext extends ExecutionContext
}
}
+
+ private static JavaSparkContext createContext(SparkConf conf){
+ try{
+ return new JavaSparkContext(conf);
+ }
+ catch(Exception e){
+ if(e.getMessage().contains("A master URL must be set in your configuration")){
+ LOG.error("Error constructing Spark Context, falling back to local Spark context creation");
+ setLocalConfSettings(conf);
+ return createContext(conf);
+ }
+ else
+ throw new DMLException("Error while creating Spark context", e);
+ }
+ }
+
+ private static void setLocalConfSettings(SparkConf conf){
+ final String threads = ConfigurationManager.getDMLConfig().getTextValue(DMLConfig.LOCAL_SPARK_NUM_THREADS);
+ conf.setMaster("local[" + threads + "]");
+ conf.setAppName("LocalSparkContextApp");
+ conf.set("spark.ui.enabled", "false");
+ }
+
/**
* Sets up a SystemDS-preferred Spark configuration based on the implicit
* default configuration (as passed via configurations from outside).