Posted to commits@spark.apache.org by gu...@apache.org on 2020/07/31 08:30:16 UTC

[spark] branch master updated: [SPARK-32160][CORE][PYSPARK] Add a config to switch allow/disallow to create SparkContext in executors

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8014b0b  [SPARK-32160][CORE][PYSPARK] Add a config to switch allow/disallow to create SparkContext in executors
8014b0b is described below

commit 8014b0b5d61237dc4851d4ae9927778302d692da
Author: Takuya UESHIN <ue...@databricks.com>
AuthorDate: Fri Jul 31 17:28:35 2020 +0900

    [SPARK-32160][CORE][PYSPARK] Add a config to switch allow/disallow to create SparkContext in executors
    
    ### What changes were proposed in this pull request?
    
    This is a follow-up of #28986.
    This PR adds a config to allow or disallow creating `SparkContext` in executors.
    
    - `spark.driver.allowSparkContextInExecutors`
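    
    As an illustration only (not part of this patch), a minimal sketch of the intended
    usage: a task running on an executor creates its own `SparkContext`, which is
    permitted because the new config is set.
    
    ```scala
    import org.apache.spark.{SparkConf, SparkContext}
    
    // Driver-side context; the master string is just an example.
    val sc = new SparkContext(
      new SparkConf().setAppName("driver").setMaster("local-cluster[2, 1, 1024]"))
    
    sc.range(0, 1).foreach { _ =>
      // Executor-side: allowed only because the config below is set to true.
      val executorSc = new SparkContext(
        new SparkConf()
          .setAppName("executor-side")
          .setMaster("local")
          .set("spark.driver.allowSparkContextInExecutors", "true"))
      executorSc.stop()
    }
    
    sc.stop()
    ```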
    
    ### Why are the changes needed?
    
    Some users or libraries actually create `SparkContext` in executors.
    We shouldn't break their workloads.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, users will be able to create `SparkContext` in executors with the config enabled.
    
    ### How was this patch tested?
    
    More tests are added.
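    
    For instance, the `SparkSession` builder path exercised by the new suite looks
    roughly like the following sketch (editorial illustration, not the test verbatim):
    
    ```scala
    import org.apache.spark.sql.SparkSession
    
    // Driver-side session; the master string is just an example.
    val session = SparkSession.builder()
      .master("local-cluster[2, 1, 1024]")
      .getOrCreate()
    
    session.range(1).foreach { _ =>
      // Executor-side: getOrCreate() succeeds only because the config is set.
      SparkSession.builder()
        .master("local")
        .config("spark.driver.allowSparkContextInExecutors", "true")
        .getOrCreate()
        .stop()
      ()
    }
    
    session.stop()
    ```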
    
    Closes #29278 from ueshin/issues/SPARK-32160/add_configs.
    
    Authored-by: Takuya UESHIN <ue...@databricks.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../main/scala/org/apache/spark/SparkContext.scala |  6 +++--
 .../org/apache/spark/internal/config/package.scala |  7 ++++++
 .../scala/org/apache/spark/SparkContextSuite.scala |  9 ++++++++
 docs/core-migration-guide.md                       |  4 ++++
 python/pyspark/context.py                          |  6 +++--
 python/pyspark/tests/test_context.py               | 11 +++++++++
 .../scala/org/apache/spark/sql/SparkSession.scala  | 12 ++++++----
 .../spark/sql/SparkSessionBuilderSuite.scala       | 26 +++++++++++++++++++++-
 8 files changed, 72 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 06abc05..9ecf316 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -83,8 +83,10 @@ class SparkContext(config: SparkConf) extends Logging {
   // The call site where this SparkContext was constructed.
   private val creationSite: CallSite = Utils.getCallSite()
 
-  // In order to prevent SparkContext from being created in executors.
-  SparkContext.assertOnDriver()
+  if (!config.get(ALLOW_SPARK_CONTEXT_IN_EXECUTORS)) {
+    // In order to prevent SparkContext from being created in executors.
+    SparkContext.assertOnDriver()
+  }
 
   // In order to prevent multiple SparkContexts from being active at the same time, mark this
   // context as having started construction.
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index e1b598e..fdc9253 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1908,4 +1908,11 @@ package object config {
       .version("3.1.0")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val ALLOW_SPARK_CONTEXT_IN_EXECUTORS =
+    ConfigBuilder("spark.driver.allowSparkContextInExecutors")
+      .doc("If set to true, SparkContext can be created in executors.")
+      .version("3.0.1")
+      .booleanConf
+      .createWithDefault(false)
 }
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 132e994..1f7aa8ee 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -946,6 +946,15 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventually
 
     assert(error.contains("SparkContext should only be created and accessed on the driver."))
   }
+
+  test("SPARK-32160: Allow to create SparkContext in executors if the config is set") {
+    sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
+
+    sc.range(0, 1).foreach { _ =>
+      new SparkContext(new SparkConf().setAppName("test").setMaster("local")
+        .set(ALLOW_SPARK_CONTEXT_IN_EXECUTORS, true)).stop()
+    }
+  }
 }
 
 object SparkContextSuite {
diff --git a/docs/core-migration-guide.md b/docs/core-migration-guide.md
index 63baef1..b2a0850 100644
--- a/docs/core-migration-guide.md
+++ b/docs/core-migration-guide.md
@@ -22,6 +22,10 @@ license: |
 * Table of contents
 {:toc}
 
+## Upgrading from Core 3.0 to 3.1
+
+- In Spark 3.0 and below, `SparkContext` can be created in executors. Since Spark 3.1, an exception will be thrown when creating `SparkContext` in executors. You can allow it by setting the configuration `spark.driver.allowSparkContextInExecutors` when creating `SparkContext` in executors.
+
 ## Upgrading from Core 2.4 to 3.0
 
 - The `org.apache.spark.ExecutorPlugin` interface and related configuration has been replaced with
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 5ddce9f..0816657 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -117,8 +117,10 @@ class SparkContext(object):
             ...
         ValueError:...
         """
-        # In order to prevent SparkContext from being created in executors.
-        SparkContext._assert_on_driver()
+        if (conf is None or
+                conf.get("spark.driver.allowSparkContextInExecutors", "false").lower() != "true"):
+            # In order to prevent SparkContext from being created in executors.
+            SparkContext._assert_on_driver()
 
         self._callsite = first_spark_call() or CallSite(None, None, None)
         if gateway is not None and gateway.gateway_parameters.auth_token is None:
diff --git a/python/pyspark/tests/test_context.py b/python/pyspark/tests/test_context.py
index 168299e..64fe383 100644
--- a/python/pyspark/tests/test_context.py
+++ b/python/pyspark/tests/test_context.py
@@ -275,6 +275,17 @@ class ContextTests(unittest.TestCase):
             self.assertIn("SparkContext should only be created and accessed on the driver.",
                           str(context.exception))
 
+    def test_allow_to_create_spark_context_in_executors(self):
+        # SPARK-32160: SparkContext can be created in executors if the config is set.
+
+        def create_spark_context():
+            conf = SparkConf().set("spark.driver.allowSparkContextInExecutors", "true")
+            with SparkContext(conf=conf):
+                pass
+
+        with SparkContext("local-cluster[3, 1, 1024]") as sc:
+            sc.range(2).foreach(lambda _: create_spark_context())
+
 
 class ContextTestsWithResources(unittest.TestCase):
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 08b0a1c..306c323 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -29,6 +29,7 @@ import org.apache.spark.{SPARK_VERSION, SparkConf, SparkContext, TaskContext}
 import org.apache.spark.annotation.{DeveloperApi, Experimental, Stable, Unstable}
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ALLOW_SPARK_CONTEXT_IN_EXECUTORS
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.catalog.Catalog
@@ -900,7 +901,13 @@ object SparkSession extends Logging {
      * @since 2.0.0
      */
     def getOrCreate(): SparkSession = synchronized {
-      assertOnDriver()
+      val sparkConf = new SparkConf()
+      options.foreach { case (k, v) => sparkConf.set(k, v) }
+
+      if (!sparkConf.get(ALLOW_SPARK_CONTEXT_IN_EXECUTORS)) {
+        assertOnDriver()
+      }
+
       // Get the session from current thread's active session.
       var session = activeThreadSession.get()
       if ((session ne null) && !session.sparkContext.isStopped) {
@@ -919,9 +926,6 @@ object SparkSession extends Logging {
 
         // No active nor global default session. Create a new one.
         val sparkContext = userSuppliedContext.getOrElse {
-          val sparkConf = new SparkConf()
-          options.foreach { case (k, v) => sparkConf.set(k, v) }
-
           // set a random app name if not given.
           if (!sparkConf.contains("spark.app.name")) {
             sparkConf.setAppName(java.util.UUID.randomUUID().toString)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
index e914d83..cc261a9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
@@ -19,7 +19,8 @@ package org.apache.spark.sql
 
 import org.scalatest.BeforeAndAfterEach
 
-import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkContext, SparkException, SparkFunSuite}
+import org.apache.spark.internal.config.ALLOW_SPARK_CONTEXT_IN_EXECUTORS
 import org.apache.spark.internal.config.UI.UI_ENABLED
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf._
@@ -257,4 +258,27 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach {
       context.stop()
     }
   }
+
+  test("SPARK-32160: Disallow to create SparkSession in executors") {
+    val session = SparkSession.builder().master("local-cluster[3, 1, 1024]").getOrCreate()
+
+    val error = intercept[SparkException] {
+      session.range(1).foreach { v =>
+        SparkSession.builder.master("local").getOrCreate()
+        ()
+      }
+    }.getMessage()
+
+    assert(error.contains("SparkSession should only be created and accessed on the driver."))
+  }
+
+  test("SPARK-32160: Allow to create SparkSession in executors if the config is set") {
+    val session = SparkSession.builder().master("local-cluster[3, 1, 1024]").getOrCreate()
+
+    session.range(1).foreach { v =>
+      SparkSession.builder.master("local")
+        .config(ALLOW_SPARK_CONTEXT_IN_EXECUTORS.key, true).getOrCreate().stop()
+      ()
+    }
+  }
 }

