You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by da...@apache.org on 2015/10/12 22:50:46 UTC

spark git commit: [SPARK-11042] [SQL] Add a mechanism to ban creating multiple root SQLContexts/HiveContexts in a JVM

Repository: spark
Updated Branches:
  refs/heads/master 2e572c413 -> 8a354bef5


[SPARK-11042] [SQL] Add a mechanism to ban creating multiple root SQLContexts/HiveContexts in a JVM

https://issues.apache.org/jira/browse/SPARK-11042

Author: Yin Huai <yh...@databricks.com>

Closes #9058 from yhuai/SPARK-11042.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8a354bef
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8a354bef
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8a354bef

Branch: refs/heads/master
Commit: 8a354bef55ce9cc0fa77fa1c3a9d62c16438ca1b
Parents: 2e572c4
Author: Yin Huai <yh...@databricks.com>
Authored: Mon Oct 12 13:50:34 2015 -0700
Committer: Davies Liu <da...@gmail.com>
Committed: Mon Oct 12 13:50:34 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/SQLConf.scala    | 10 ++
 .../scala/org/apache/spark/sql/SQLContext.scala | 42 ++++++++-
 .../spark/sql/MultiSQLContextsSuite.scala       | 99 ++++++++++++++++++++
 .../org/apache/spark/sql/hive/HiveContext.scala | 12 ++-
 4 files changed, 156 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8a354bef/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 47397c4..f62df9b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -186,6 +186,16 @@ private[spark] object SQLConf {
 
   import SQLConfEntry._
 
+  val ALLOW_MULTIPLE_CONTEXTS = booleanConf("spark.sql.allowMultipleContexts",
+    defaultValue = Some(true),
+    doc = "When set to true, creating multiple SQLContexts/HiveContexts is allowed. " +
+      "When set to false, only one SQLContext/HiveContext is allowed to be created " +
+      "through the constructor (new SQLContexts/HiveContexts created through newSession " +
+      "method is allowed). Please note that this conf needs to be set in Spark Conf. Once " +
+      "a SQLContext/HiveContext has been created, changing the value of this conf will not " +
+      "have effect.",
+    isPublic = true)
+
   val COMPRESS_CACHED = booleanConf("spark.sql.inMemoryColumnarStorage.compressed",
     defaultValue = Some(true),
     doc = "When set to true Spark SQL will automatically select a compression codec for each " +

http://git-wip-us.apache.org/repos/asf/spark/blob/8a354bef/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 2bdfd82..1bd2913 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -26,7 +26,7 @@ import scala.collection.immutable
 import scala.reflect.runtime.universe.TypeTag
 import scala.util.control.NonFatal
 
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkException, SparkContext}
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.rdd.RDD
@@ -64,14 +64,37 @@ import org.apache.spark.util.Utils
  */
 class SQLContext private[sql](
     @transient val sparkContext: SparkContext,
-    @transient protected[sql] val cacheManager: CacheManager)
+    @transient protected[sql] val cacheManager: CacheManager,
+    val isRootContext: Boolean)
   extends org.apache.spark.Logging with Serializable {
 
   self =>
 
-  def this(sparkContext: SparkContext) = this(sparkContext, new CacheManager)
+  def this(sparkContext: SparkContext) = this(sparkContext, new CacheManager, true)
   def this(sparkContext: JavaSparkContext) = this(sparkContext.sc)
 
+  // If spark.sql.allowMultipleContexts is false, we will throw an exception if a user
+  // wants to create a new root SQLContext (a SQLContext that is not created by newSession).
+  private val allowMultipleContexts = {
+    val entry = SQLConf.ALLOW_MULTIPLE_CONTEXTS
+    sparkContext.conf.getBoolean(entry.key, entry.defaultValue.get) // entry default if unset
+  }
+
+  // Reject construction of a second root context when multiple contexts are disallowed.
+  // Contexts created through newSession are not root contexts and are never rejected here.
+  {
+    val rootContextRestricted = isRootContext && !allowMultipleContexts
+    if (rootContextRestricted && SQLContext.getInstantiatedContextOption().isDefined) {
+      // An instantiated context already exists, so this root context must not be created.
+      val howToProceed =
+        "It is recommended to use SQLContext.getOrCreate to get the instantiated " +
+          "SQLContext/HiveContext. To ignore this error, " +
+          s"set ${SQLConf.ALLOW_MULTIPLE_CONTEXTS.key} = true in SparkConf."
+      throw new SparkException(
+        "Only one SQLContext/HiveContext may be running in this JVM. " + howToProceed)
+    }
+  }
+
   /**
    * Returns a SQLContext as new session, with separated SQL configurations, temporary tables,
    * registered functions, but sharing the same SparkContext and CacheManager.
@@ -79,7 +102,10 @@ class SQLContext private[sql](
    * @since 1.6.0
    */
   def newSession(): SQLContext = {
-    new SQLContext(sparkContext, cacheManager)
+    new SQLContext(
+      sparkContext = sparkContext,
+      cacheManager = cacheManager,
+      isRootContext = false)
   }
 
   /**
@@ -1239,6 +1265,10 @@ object SQLContext {
     instantiatedContext.compareAndSet(null, sqlContext)
   }
 
+  /** Returns the context held by `instantiatedContext`, or None when it holds null. */
+  private[sql] def getInstantiatedContextOption(): Option[SQLContext] =
+    Option(instantiatedContext.get())
+
   /**
    * Changes the SQLContext that will be returned in this thread and its children when
    * SQLContext.getOrCreate() is called. This can be used to ensure that a given thread receives
@@ -1260,6 +1290,10 @@ object SQLContext {
     activeContext.remove()
   }
 
+  /** Returns the context held by `activeContext`, or None when its value is null. */
+  private[sql] def getActiveContextOption(): Option[SQLContext] =
+    Option(activeContext.get())
+
   /**
    * Converts an iterator of Java Beans to InternalRow using the provided
    * bean info & schema. This is not related to the singleton, but is a static

http://git-wip-us.apache.org/repos/asf/spark/blob/8a354bef/sql/core/src/test/scala/org/apache/spark/sql/MultiSQLContextsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/MultiSQLContextsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/MultiSQLContextsSuite.scala
new file mode 100644
index 0000000..0e8fcb6
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/MultiSQLContextsSuite.scala
@@ -0,0 +1,99 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql
+
+import org.apache.spark._
+import org.scalatest.BeforeAndAfterAll
+
+/** Checks that spark.sql.allowMultipleContexts gates creation of extra root SQLContexts. */
+class MultiSQLContextsSuite extends SparkFunSuite with BeforeAndAfterAll {
+
+  // Singleton state captured in beforeAll so that afterAll can restore it.
+  private var originalActiveSQLContext: Option[SQLContext] = _
+  private var originalInstantiatedSQLContext: Option[SQLContext] = _
+  private var sparkConf: SparkConf = _
+
+  override protected def beforeAll(): Unit = {
+    originalActiveSQLContext = SQLContext.getActiveContextOption()
+    originalInstantiatedSQLContext = SQLContext.getInstantiatedContextOption()
+    SQLContext.clearActive()
+    originalInstantiatedSQLContext.foreach(ctx => SQLContext.clearInstantiatedContext(ctx))
+    sparkConf =
+      new SparkConf(false)
+        .setMaster("local[*]")
+        .setAppName("test")
+        .set("spark.ui.enabled", "false")
+        .set("spark.driver.allowMultipleContexts", "true")
+  }
+
+  override protected def afterAll(): Unit = {
+    // Set these states back.
+    originalActiveSQLContext.foreach(ctx => SQLContext.setActive(ctx))
+    originalInstantiatedSQLContext.foreach(ctx => SQLContext.setInstantiatedContext(ctx))
+  }
+
+  /** Clones the base conf with spark.sql.allowMultipleContexts set to the given value. */
+  private def confWith(allowMultiple: Boolean): SparkConf =
+    sparkConf.clone.set(SQLConf.ALLOW_MULTIPLE_CONTEXTS.key, allowMultiple.toString)
+
+  def testNewSession(rootSQLContext: SQLContext): Unit = {
+    // Make sure we can successfully create new Session.
+    rootSQLContext.newSession()
+
+    // Reset the state. It is always safe to clear the active context.
+    SQLContext.clearActive()
+  }
+
+  def testCreatingNewSQLContext(allowsMultipleContexts: Boolean): Unit = {
+    // A second SparkContext; the base conf sets spark.driver.allowMultipleContexts for this.
+    val sparkContext = new SparkContext(confWith(allowsMultipleContexts))
+    try {
+      if (allowsMultipleContexts) {
+        new SQLContext(sparkContext)
+        SQLContext.clearActive()
+      } else {
+        // If allowsMultipleContexts is false, make sure we can get the error.
+        val message = intercept[SparkException] {
+          new SQLContext(sparkContext)
+        }.getMessage
+        assert(message.contains("Only one SQLContext/HiveContext may be running"))
+      }
+    } finally {
+      sparkContext.stop()
+    }
+  }
+
+  test("test the flag to disallow creating multiple root SQLContext") {
+    Seq(false, true).foreach { allowMultipleSQLContexts =>
+      val sc = new SparkContext(confWith(allowMultipleSQLContexts))
+      try {
+        val rootSQLContext = new SQLContext(sc)
+        try {
+          testNewSession(rootSQLContext)
+          testNewSession(rootSQLContext)
+          testCreatingNewSQLContext(allowMultipleSQLContexts)
+        } finally {
+          // Unregister the root context even on failure so it cannot leak into later suites.
+          SQLContext.clearInstantiatedContext(rootSQLContext)
+        }
+      } finally {
+        sc.stop()
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/8a354bef/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index dad1e23..ddeadd3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -89,10 +89,11 @@ class HiveContext private[hive](
     sc: SparkContext,
     cacheManager: CacheManager,
     @transient execHive: ClientWrapper,
-    @transient metaHive: ClientInterface) extends SQLContext(sc, cacheManager) with Logging {
+    @transient metaHive: ClientInterface,
+    isRootContext: Boolean) extends SQLContext(sc, cacheManager, isRootContext) with Logging {
   self =>
 
-  def this(sc: SparkContext) = this(sc, new CacheManager, null, null)
+  def this(sc: SparkContext) = this(sc, new CacheManager, null, null, true)
   def this(sc: JavaSparkContext) = this(sc.sc)
 
   import org.apache.spark.sql.hive.HiveContext._
@@ -105,7 +106,12 @@ class HiveContext private[hive](
    * and Hive client (both of execution and metadata) with existing HiveContext.
    */
   override def newSession(): HiveContext = {
-    new HiveContext(sc, cacheManager, executionHive.newSession(), metadataHive.newSession())
+    new HiveContext(
+      sc = sc,
+      cacheManager = cacheManager,
+      execHive = executionHive.newSession(),
+      metaHive = metadataHive.newSession(),
+      isRootContext = false)
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org