Posted to commits@spark.apache.org by ma...@apache.org on 2014/11/01 23:03:17 UTC

git commit: [SPARK-4037][SQL] Removes the SessionState instance created in HiveThriftServer2

Repository: spark
Updated Branches:
  refs/heads/master f55218aeb -> ad0fde10b


[SPARK-4037][SQL] Removes the SessionState instance created in HiveThriftServer2

`HiveThriftServer2` creates a global singleton `SessionState` instance and subclasses `HiveContext` to inject that `SessionState` object. This interferes with `SessionState` initialization inside `HiveContext` and causes problems.

This PR replaces the global `SessionState` with `HiveContext.sessionState` to avoid the initialization conflict. In addition, `HiveContext` now reuses an existing started `SessionState` if one is present (this is required by `SparkSQLCLIDriver`, which uses a specialized `CliSessionState`).
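
The heart of the fix is a reuse-or-create pattern around `SessionState` (shown in full in the `HiveContext.scala` hunk below). A minimal sketch of that pattern, with `outputBuffer` standing in for `HiveContext`'s internal output buffer:

    import java.io.{ByteArrayOutputStream, PrintStream}

    import org.apache.hadoop.hive.conf.HiveConf
    import org.apache.hadoop.hive.ql.session.SessionState

    val outputBuffer = new ByteArrayOutputStream()

    // Reuse the thread-local SessionState if one is already started (e.g. the
    // CliSessionState created by SparkSQLCLIDriver); otherwise create and start
    // a fresh instance. Only newly created instances are started here.
    val (hiveconf, sessionState) =
      Option(SessionState.get())
        .orElse {
          val newState = new SessionState(new HiveConf(classOf[SessionState]))
          SessionState.start(newState)
          Some(newState)
        }
        .map { state =>
          if (state.out == null) state.out = new PrintStream(outputBuffer, true, "UTF-8")
          if (state.err == null) state.err = new PrintStream(outputBuffer, true, "UTF-8")
          (state.getConf, state)
        }
        .get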

Author: Cheng Lian <li...@databricks.com>

Closes #2887 from liancheng/spark-4037 and squashes the following commits:

8446675 [Cheng Lian] Removes redundant Driver initialization
a28fef5 [Cheng Lian] Avoid starting HiveContext.sessionState multiple times
49b1c5b [Cheng Lian] Reuses existing started SessionState if any
3cd6fab [Cheng Lian] Fixes SPARK-4037


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ad0fde10
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ad0fde10
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ad0fde10

Branch: refs/heads/master
Commit: ad0fde10b2285e780349be5a8f333db0974a502f
Parents: f55218a
Author: Cheng Lian <li...@databricks.com>
Authored: Sat Nov 1 15:03:11 2014 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Sat Nov 1 15:03:11 2014 -0700

----------------------------------------------------------------------
 .../hive/thriftserver/HiveThriftServer2.scala   | 17 +-------
 .../sql/hive/thriftserver/SparkSQLEnv.scala     | 18 ++++----
 .../thriftserver/HiveThriftServer2Suite.scala   | 10 +++--
 .../org/apache/spark/sql/hive/HiveContext.scala | 44 +++++++++++++-------
 4 files changed, 44 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ad0fde10/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index 3d468d8..bd4e994 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -17,11 +17,8 @@
 
 package org.apache.spark.sql.hive.thriftserver
 
-import scala.collection.JavaConversions._
-
 import org.apache.commons.logging.LogFactory
 import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService
 import org.apache.hive.service.server.{HiveServer2, ServerOptionsProcessor}
 
@@ -51,24 +48,12 @@ object HiveThriftServer2 extends Logging {
 
   def main(args: Array[String]) {
     val optionsProcessor = new ServerOptionsProcessor("HiveThriftServer2")
-
     if (!optionsProcessor.process(args)) {
       System.exit(-1)
     }
 
-    val ss = new SessionState(new HiveConf(classOf[SessionState]))
-
-    // Set all properties specified via command line.
-    val hiveConf: HiveConf = ss.getConf
-    hiveConf.getAllProperties.toSeq.sortBy(_._1).foreach { case (k, v) =>
-      logDebug(s"HiveConf var: $k=$v")
-    }
-
-    SessionState.start(ss)
-
     logInfo("Starting SparkContext")
     SparkSQLEnv.init()
-    SessionState.start(ss)
 
     Runtime.getRuntime.addShutdownHook(
       new Thread() {
@@ -80,7 +65,7 @@ object HiveThriftServer2 extends Logging {
 
     try {
       val server = new HiveThriftServer2(SparkSQLEnv.hiveContext)
-      server.init(hiveConf)
+      server.init(SparkSQLEnv.hiveContext.hiveconf)
       server.start()
       logInfo("HiveThriftServer2 started")
     } catch {

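Condensed, the new startup path in `main` boils down to the lines below, assuming `SparkSQLEnv.init()` has created the shared `HiveContext`; the server is initialized with the `HiveConf` owned by that context's `SessionState` rather than a separately constructed one:

    SparkSQLEnv.init()
    val server = new HiveThriftServer2(SparkSQLEnv.hiveContext)
    server.init(SparkSQLEnv.hiveContext.hiveconf)
    server.start()
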
http://git-wip-us.apache.org/repos/asf/spark/blob/ad0fde10/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
index 2136a2e..5042586 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
@@ -17,12 +17,10 @@
 
 package org.apache.spark.sql.hive.thriftserver
 
-import org.apache.hadoop.hive.ql.session.SessionState
-
-import org.apache.spark.scheduler.{SplitInfo, StatsReportListener}
-import org.apache.spark.Logging
+import org.apache.spark.scheduler.StatsReportListener
 import org.apache.spark.sql.hive.HiveContext
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.{Logging, SparkConf, SparkContext}
+import scala.collection.JavaConversions._
 
 /** A singleton object for the master program. The slaves should not access this. */
 private[hive] object SparkSQLEnv extends Logging {
@@ -37,14 +35,12 @@ private[hive] object SparkSQLEnv extends Logging {
         .setAppName(s"SparkSQL::${java.net.InetAddress.getLocalHost.getHostName}"))
 
       sparkContext.addSparkListener(new StatsReportListener())
+      hiveContext = new HiveContext(sparkContext)
 
-      hiveContext = new HiveContext(sparkContext) {
-        @transient override lazy val sessionState = {
-          val state = SessionState.get()
-          setConf(state.getConf.getAllProperties)
-          state
+      if (log.isDebugEnabled) {
+        hiveContext.hiveconf.getAllProperties.toSeq.sorted.foreach { case (k, v) =>
+          logDebug(s"HiveConf var: $k=$v")
         }
-        @transient override lazy val hiveconf = sessionState.getConf
       }
     }
   }

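A note on the `scala.collection.JavaConversions._` import added to this file: `HiveConf.getAllProperties` returns a `java.util.Properties`, and the implicit conversion is what lets the debug-logging loop treat it as a Scala collection. A self-contained sketch of the same idiom (the property name is for illustration only):

    import java.util.Properties
    import scala.collection.JavaConversions._

    val props = new Properties()
    props.setProperty("hive.exec.mode.local.auto", "true")

    // Implicitly viewed as a Scala Map[String, String], so toSeq and sorted work.
    props.toSeq.sorted.foreach { case (k, v) =>
      println(s"HiveConf var: $k=$v")
    }
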
http://git-wip-us.apache.org/repos/asf/spark/blob/ad0fde10/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
index e3b4e45..c60e8fa 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
@@ -150,10 +150,12 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
       val dataFilePath =
         Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")
 
-      val queries = Seq(
-        "CREATE TABLE test(key INT, val STRING)",
-        s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test",
-        "CACHE TABLE test")
+      val queries =
+        s"""SET spark.sql.shuffle.partitions=3;
+           |CREATE TABLE test(key INT, val STRING);
+           |LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test;
+           |CACHE TABLE test;
+         """.stripMargin.split(";").map(_.trim).filter(_.nonEmpty)
 
       queries.foreach(statement.execute)
 
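For context, `statement` in this suite is a plain `java.sql.Statement` obtained over JDBC. A hedged sketch of driving the server the same way from a client (the driver class and connection URL are the standard HiveServer2 ones, not taken from this diff):

    import java.sql.DriverManager

    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val connection =
      DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")
    val statement = connection.createStatement()

    // execute() accepts a single command at a time, which is why the suite
    // splits its multi-statement string on ';' before this loop.
    val queries = Seq(
      "SET spark.sql.shuffle.partitions=3",
      "SHOW TABLES")
    queries.foreach(statement.execute)

    statement.close()
    connection.close()
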

http://git-wip-us.apache.org/repos/asf/spark/blob/ad0fde10/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index fad4091..ff8fa44 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -224,21 +224,29 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   }
 
   /**
-   * SQLConf and HiveConf contracts: when the hive session is first initialized, params in
-   * HiveConf will get picked up by the SQLConf.  Additionally, any properties set by
-   * set() or a SET command inside sql() will be set in the SQLConf *as well as*
-   * in the HiveConf.
+   * SQLConf and HiveConf contracts:
+   *
+   * 1. reuse existing started SessionState if any
+   * 2. when the Hive session is first initialized, params in HiveConf will get picked up by the
+   *    SQLConf.  Additionally, any properties set by set() or a SET command inside sql() will be
+   *    set in the SQLConf *as well as* in the HiveConf.
    */
-  @transient lazy val hiveconf = new HiveConf(classOf[SessionState])
-  @transient protected[hive] lazy val sessionState = {
-    val ss = new SessionState(hiveconf)
-    setConf(hiveconf.getAllProperties)  // Have SQLConf pick up the initial set of HiveConf.
-    SessionState.start(ss)
-    ss.err = new PrintStream(outputBuffer, true, "UTF-8")
-    ss.out = new PrintStream(outputBuffer, true, "UTF-8")
-
-    ss
-  }
+  @transient protected[hive] lazy val (hiveconf, sessionState) =
+    Option(SessionState.get())
+      .orElse {
+        val newState = new SessionState(new HiveConf(classOf[SessionState]))
+        // Only starts newly created `SessionState` instance.  Any existing `SessionState` instance
+        // returned by `SessionState.get()` must be the most recently started one.
+        SessionState.start(newState)
+        Some(newState)
+      }
+      .map { state =>
+        setConf(state.getConf.getAllProperties)
+        if (state.out == null) state.out = new PrintStream(outputBuffer, true, "UTF-8")
+        if (state.err == null) state.err = new PrintStream(outputBuffer, true, "UTF-8")
+        (state.getConf, state)
+      }
+      .get
 
   override def setConf(key: String, value: String): Unit = {
     super.setConf(key, value)
@@ -288,6 +296,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
       val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim()
       val proc: CommandProcessor = HiveShim.getCommandProcessor(Array(tokens(0)), hiveconf)
 
+      // Makes sure the session represented by the `sessionState` field is activated. This implies
+      // Spark SQL Hive support uses a single `SessionState` for all Hive operations and breaks
+      // session isolation under multi-user scenarios (i.e. HiveThriftServer2).
+      // TODO Fix session isolation
+      if (SessionState.get() != sessionState) {
+        SessionState.start(sessionState)
+      }
+
       proc match {
         case driver: Driver =>
           val results = HiveShim.createDriverResultsArray

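The activation check added above relies on `SessionState`'s thread-local contract: `SessionState.get()` returns whatever state was most recently passed to `SessionState.start()` on the current thread, or null if none. A minimal sketch of that contract as the fix assumes it:

    import org.apache.hadoop.hive.conf.HiveConf
    import org.apache.hadoop.hive.ql.session.SessionState

    val state = new SessionState(new HiveConf(classOf[SessionState]))

    SessionState.start(state)            // binds `state` to the current thread
    assert(SessionState.get() eq state)  // get() now returns the same instance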
