You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/11/19 09:23:43 UTC

[spark] branch master updated: [SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes

This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 82a41d8ca27 [SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes
82a41d8ca27 is described below

commit 82a41d8ca273e7a93333268324c6958f8bb14d9e
Author: Weichen Xu <we...@databricks.com>
AuthorDate: Sat Nov 19 17:23:20 2022 +0800

    [SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes
    
    Signed-off-by: Weichen Xu <weichen.xudatabricks.com>
    
    ### What changes were proposed in this pull request?
    
    Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes.
    
    ### Why are the changes needed?
    
    This is for limiting the thread number for OpenBLAS routine to the number of cores assigned to this executor because some spark ML algorithms calls OpenBlAS via netlib-java,
    e.g.:
    Spark ALS estimator training calls LAPACK API `dppsv` (internally it will call BLAS lib), if it calls OpenBLAS lib, by default OpenBLAS will try to use all CPU cores. But spark will launch multiple spark tasks on a spark worker, and each spark task might call `dppsv` API at the same time, and each call internally it will create multiple threads (threads number equals to CPU cores), this causes CPU oversubscription.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Manually.
    
    Closes #38699 from WeichenXu123/SPARK-41188.
    
    Authored-by: Weichen Xu <we...@databricks.com>
    Signed-off-by: Weichen Xu <we...@databricks.com>
---
 core/src/main/scala/org/apache/spark/SparkContext.scala        | 10 ++++++++++
 .../main/scala/org/apache/spark/api/python/PythonRunner.scala  |  7 -------
 .../scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala | 10 ++++++----
 3 files changed, 16 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d2c7067e596..5cbf2e83371 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -541,6 +541,16 @@ class SparkContext(config: SparkConf) extends Logging {
     executorEnvs ++= _conf.getExecutorEnv
     executorEnvs("SPARK_USER") = sparkUser
 
+    if (_conf.getOption("spark.executorEnv.OMP_NUM_THREADS").isEmpty) {
+      // if OMP_NUM_THREADS is not explicitly set, override it with the value of "spark.task.cpus"
+      // SPARK-41188: limit the thread number for OpenBLAS routine to the number of cores assigned
+      // to this executor because some spark ML algorithms calls OpenBlAS via netlib-java
+      // SPARK-28843: limit the OpenMP thread pool to the number of cores assigned to this executor
+      // this avoids high memory consumption with pandas/numpy because of a large OpenMP thread pool
+      // see https://github.com/numpy/numpy/issues/10455
+      executorEnvs.put("OMP_NUM_THREADS", _conf.get("spark.task.cpus", "1"))
+    }
+
     _shuffleDriverComponents = ShuffleDataIOUtils.loadShuffleDataIO(config).driver()
     _shuffleDriverComponents.initializeApplication().asScala.foreach { case (k, v) =>
       _conf.set(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX + k, v)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 14d5df14ed8..cdb2c620656 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -135,13 +135,6 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
     val execCoresProp = Option(context.getLocalProperty(EXECUTOR_CORES_LOCAL_PROPERTY))
     val memoryMb = Option(context.getLocalProperty(PYSPARK_MEMORY_LOCAL_PROPERTY)).map(_.toLong)
     val localdir = env.blockManager.diskBlockManager.localDirs.map(f => f.getPath()).mkString(",")
-    // if OMP_NUM_THREADS is not explicitly set, override it with the number of cores
-    if (conf.getOption("spark.executorEnv.OMP_NUM_THREADS").isEmpty) {
-      // SPARK-28843: limit the OpenMP thread pool to the number of cores assigned to this executor
-      // this avoids high memory consumption with pandas/numpy because of a large OpenMP thread pool
-      // see https://github.com/numpy/numpy/issues/10455
-      execCoresProp.foreach(envVars.put("OMP_NUM_THREADS", _))
-    }
     envVars.put("SPARK_LOCAL_DIRS", localdir) // it's also used in monitor thread
     if (reuseWorker) {
       envVars.put("SPARK_REUSE_WORKER", "1")
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
index 92676cc4e73..0a2c0cef31e 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
@@ -128,8 +128,9 @@ object Utils {
       .getEnvironment
       .getVariablesList
       .asScala
-    assert(envVars
-      .count(!_.getName.startsWith("SPARK_")) == 2) // user-defined secret env vars
+    assert(envVars.count { x =>
+      !x.getName.startsWith("SPARK_") && x.getName != "OMP_NUM_THREADS"
+    } == 2) // user-defined secret env vars
     val variableOne = envVars.filter(_.getName == "SECRET_ENV_KEY").head
     assert(variableOne.getSecret.isInitialized)
     assert(variableOne.getSecret.getType == Secret.Type.REFERENCE)
@@ -157,8 +158,9 @@ object Utils {
       .getEnvironment
       .getVariablesList
       .asScala
-    assert(envVars
-      .count(!_.getName.startsWith("SPARK_")) == 2) // user-defined secret env vars
+    assert(envVars.count { x =>
+      !x.getName.startsWith("SPARK_") && x.getName != "OMP_NUM_THREADS"
+    } == 2) // user-defined secret env vars
     val variableOne = envVars.filter(_.getName == "USER").head
     assert(variableOne.getSecret.isInitialized)
     assert(variableOne.getSecret.getType == Secret.Type.VALUE)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org