You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/11/19 09:24:33 UTC

[spark] branch branch-3.3 updated: [SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes

This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new f431cdf0944 [SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes
f431cdf0944 is described below

commit f431cdf09442b86133d17fa6e56cb8b77ca4e486
Author: Weichen Xu <we...@databricks.com>
AuthorDate: Sat Nov 19 17:23:20 2022 +0800

    [SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes
    
    Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
    
    ### What changes were proposed in this pull request?
    
    Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes.
    
    ### Why are the changes needed?
    
    This limits the number of threads used by OpenBLAS routines to the number of cores assigned to this executor, because some Spark ML algorithms call OpenBLAS via netlib-java,
    e.g.:
    Spark ALS estimator training calls the LAPACK API `dppsv` (which internally calls the BLAS library). If the BLAS library is OpenBLAS, it will by default try to use all CPU cores. But Spark launches multiple tasks on a worker, and each task might call `dppsv` at the same time, with each call internally creating multiple threads (as many threads as there are CPU cores); this causes CPU oversubscription.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Manually.
    
    Closes #38699 from WeichenXu123/SPARK-41188.
    
    Authored-by: Weichen Xu <we...@databricks.com>
    Signed-off-by: Weichen Xu <we...@databricks.com>
    (cherry picked from commit 82a41d8ca273e7a93333268324c6958f8bb14d9e)
    Signed-off-by: Weichen Xu <we...@databricks.com>
---
 core/src/main/scala/org/apache/spark/SparkContext.scala        | 10 ++++++++++
 .../main/scala/org/apache/spark/api/python/PythonRunner.scala  |  7 -------
 .../scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala | 10 ++++++----
 3 files changed, 16 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 02c58d2a9b4..0d0d4fe83a4 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -546,6 +546,16 @@ class SparkContext(config: SparkConf) extends Logging {
     executorEnvs ++= _conf.getExecutorEnv
     executorEnvs("SPARK_USER") = sparkUser
 
+    if (_conf.getOption("spark.executorEnv.OMP_NUM_THREADS").isEmpty) {
+      // if OMP_NUM_THREADS is not explicitly set, override it with the value of "spark.task.cpus"
+      // SPARK-41188: limit the thread number for OpenBLAS routine to the number of cores assigned
+      // to this executor because some Spark ML algorithms call OpenBLAS via netlib-java
+      // SPARK-28843: limit the OpenMP thread pool to the number of cores assigned to this executor
+      // this avoids high memory consumption with pandas/numpy because of a large OpenMP thread pool
+      // see https://github.com/numpy/numpy/issues/10455
+      executorEnvs.put("OMP_NUM_THREADS", _conf.get("spark.task.cpus", "1"))
+    }
+
     _shuffleDriverComponents = ShuffleDataIOUtils.loadShuffleDataIO(config).driver()
     _shuffleDriverComponents.initializeApplication().asScala.foreach { case (k, v) =>
       _conf.set(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX + k, v)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index f32c80f3ef5..bf5b862438a 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -133,13 +133,6 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
     val execCoresProp = Option(context.getLocalProperty(EXECUTOR_CORES_LOCAL_PROPERTY))
     val memoryMb = Option(context.getLocalProperty(PYSPARK_MEMORY_LOCAL_PROPERTY)).map(_.toLong)
     val localdir = env.blockManager.diskBlockManager.localDirs.map(f => f.getPath()).mkString(",")
-    // if OMP_NUM_THREADS is not explicitly set, override it with the number of cores
-    if (conf.getOption("spark.executorEnv.OMP_NUM_THREADS").isEmpty) {
-      // SPARK-28843: limit the OpenMP thread pool to the number of cores assigned to this executor
-      // this avoids high memory consumption with pandas/numpy because of a large OpenMP thread pool
-      // see https://github.com/numpy/numpy/issues/10455
-      execCoresProp.foreach(envVars.put("OMP_NUM_THREADS", _))
-    }
     envVars.put("SPARK_LOCAL_DIRS", localdir) // it's also used in monitor thread
     if (reuseWorker) {
       envVars.put("SPARK_REUSE_WORKER", "1")
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
index 92676cc4e73..0a2c0cef31e 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
@@ -128,8 +128,9 @@ object Utils {
       .getEnvironment
       .getVariablesList
       .asScala
-    assert(envVars
-      .count(!_.getName.startsWith("SPARK_")) == 2) // user-defined secret env vars
+    assert(envVars.count { x =>
+      !x.getName.startsWith("SPARK_") && x.getName != "OMP_NUM_THREADS"
+    } == 2) // user-defined secret env vars
     val variableOne = envVars.filter(_.getName == "SECRET_ENV_KEY").head
     assert(variableOne.getSecret.isInitialized)
     assert(variableOne.getSecret.getType == Secret.Type.REFERENCE)
@@ -157,8 +158,9 @@ object Utils {
       .getEnvironment
       .getVariablesList
       .asScala
-    assert(envVars
-      .count(!_.getName.startsWith("SPARK_")) == 2) // user-defined secret env vars
+    assert(envVars.count { x =>
+      !x.getName.startsWith("SPARK_") && x.getName != "OMP_NUM_THREADS"
+    } == 2) // user-defined secret env vars
     val variableOne = envVars.filter(_.getName == "USER").head
     assert(variableOne.getSecret.isInitialized)
     assert(variableOne.getSecret.getType == Secret.Type.VALUE)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org