Posted to commits@spark.apache.org by we...@apache.org on 2016/12/28 02:16:32 UTC

spark git commit: [SPARK-18992][SQL] Move spark.sql.hive.thriftServer.singleSession to SQLConf

Repository: spark
Updated Branches:
  refs/heads/master 28ab0ec49 -> 5ac62043c


[SPARK-18992][SQL] Move spark.sql.hive.thriftServer.singleSession to SQLConf

### What changes were proposed in this pull request?

Since `spark.sql.hive.thriftServer.singleSession` is a configuration of the SQL component, this conf can be moved from `SparkConf` to `StaticSQLConf`.

When we introduced `spark.sql.hive.thriftServer.singleSession`, all SQL configurations were session specific and could be modified in different sessions, so this server-wide flag had to live in `SparkConf`.

In Spark 2.1, static SQL configuration was added, which is a perfect fit for `spark.sql.hive.thriftServer.singleSession`. Previously, we made the same move for `spark.sql.warehouse.dir` from `SparkConf` to `StaticSQLConf`.
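
For illustration, here is a minimal sketch (not part of this patch; it assumes a local Spark 2.1+ build) of the difference between a session conf and a static conf:

```scala
// Sketch only -- Spark 2.1+ and the "local[*]" master are assumptions.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  // A static SQL conf can only be set before the first SparkSession is created.
  .config("spark.sql.hive.thriftServer.singleSession", "true")
  .getOrCreate()

// A session conf stays mutable per session:
spark.conf.set("spark.sql.shuffle.partitions", "3")

// Changing a static conf at runtime fails with
// "Cannot modify the value of a static config: spark.sql.hive.thriftServer.singleSession"
// spark.sql("SET spark.sql.hive.thriftServer.singleSession=false")
```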

### How was this patch tested?
Added test cases in `HiveThriftServer2Suites.scala`.

Author: gatorsmile <ga...@gmail.com>

Closes #16392 from gatorsmile/hiveThriftServerSingleSession.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5ac62043
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5ac62043
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5ac62043

Branch: refs/heads/master
Commit: 5ac62043cf6dc12c986e5ae9d9661fd439f8b5b9
Parents: 28ab0ec
Author: gatorsmile <ga...@gmail.com>
Authored: Wed Dec 28 10:16:22 2016 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Dec 28 10:16:22 2016 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/internal/SQLConf.scala |  10 ++
 .../thriftserver/SparkSQLSessionManager.scala   |   5 +-
 .../thriftserver/HiveThriftServer2Suites.scala  | 115 ++++++++++++-------
 .../sql/hive/thriftserver/UISeleniumSuite.scala |   2 +-
 .../spark/sql/hive/HiveSessionState.scala       |   6 -
 5 files changed, 87 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5ac62043/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index cce1626..304dcb6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -831,6 +831,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def warehousePath: String = new Path(getConf(StaticSQLConf.WAREHOUSE_PATH)).toString
 
+  def hiveThriftServerSingleSession: Boolean =
+    getConf(StaticSQLConf.HIVE_THRIFT_SERVER_SINGLESESSION)
+
   override def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL)
 
   override def groupByOrdinal: Boolean = getConf(GROUP_BY_ORDINAL)
@@ -1008,4 +1011,11 @@ object StaticSQLConf {
     .doc("Only used for internal debugging. Not all functions are supported when it is enabled.")
     .booleanConf
     .createWithDefault(false)
+
+  val HIVE_THRIFT_SERVER_SINGLESESSION = buildConf("spark.sql.hive.thriftServer.singleSession")
+    .doc("When set to true, Hive Thrift server is running in a single session mode. " +
+      "All the JDBC/ODBC connections share the temporary views, function registries, " +
+      "SQL configuration and the current database.")
+    .booleanConf
+    .createWithDefault(false)
 }
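
As a usage note (a sketch, not part of this commit): once the Thrift server is started with this static conf, a JDBC client cannot flip it per connection. Assuming the server listens on localhost:10000 and the Hive JDBC driver is on the classpath:

```scala
// Sketch only -- the server address and credentials are assumptions.
import java.sql.{DriverManager, SQLException}

val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/", "user", "")
val stmt = conn.createStatement()
try {
  // Expected to throw, since the conf is now static:
  // "Cannot modify the value of a static config: spark.sql.hive.thriftServer.singleSession"
  stmt.executeQuery("SET spark.sql.hive.thriftServer.singleSession=false")
} catch {
  case e: SQLException => println(e.getMessage)
} finally {
  stmt.close()
  conn.close()
}
```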

http://git-wip-us.apache.org/repos/asf/spark/blob/5ac62043/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
index 226b7e1..7adaafe 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
@@ -28,7 +28,7 @@ import org.apache.hive.service.cli.thrift.TProtocolVersion
 import org.apache.hive.service.server.HiveServer2
 
 import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.hive.{HiveSessionState, HiveUtils}
+import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
 import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager
 
@@ -72,8 +72,7 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext:
     val session = super.getSession(sessionHandle)
     HiveThriftServer2.listener.onSessionCreated(
       session.getIpAddress, sessionHandle.getSessionId.toString, session.getUsername)
-    val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
-    val ctx = if (sessionState.hiveThriftServerSingleSession) {
+    val ctx = if (sqlContext.conf.hiveThriftServerSingleSession) {
       sqlContext
     } else {
       sqlContext.newSession()
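
For context (a sketch, assuming an existing `SQLContext` named `sqlContext`): `newSession()` shares the underlying `SparkContext` and cached data, but gives each connection its own temporary views, UDFs, and session-local SQL conf, which is what the non-single-session branch above relies on:

```scala
// Sketch only -- `sqlContext` is assumed to already exist.
val isolated = sqlContext.newSession()

sqlContext.range(3).createOrReplaceTempView("t")
sqlContext.sql("SELECT count(*) FROM t").show() // visible in the creating session...
// isolated.sql("SELECT count(*) FROM t")       // ...but not in the new session
```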

http://git-wip-us.apache.org/repos/asf/spark/blob/5ac62043/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 5d20ec9..b6215bd 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -98,9 +98,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
       val user = System.getProperty("user.name")
       val sessionHandle = client.openSession(user, "")
 
-      withJdbcStatement { statement =>
+      withJdbcStatement("test_16563") { statement =>
         val queries = Seq(
-          "DROP TABLE IF EXISTS test_16563",
           "CREATE TABLE test_16563(key INT, val STRING)",
           s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_16563")
 
@@ -134,16 +133,14 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
 
           rows_first.numRows()
         }
-        statement.executeQuery("DROP TABLE IF EXISTS test_16563")
       }
     }
   }
 
   test("JDBC query execution") {
-    withJdbcStatement { statement =>
+    withJdbcStatement("test") { statement =>
       val queries = Seq(
         "SET spark.sql.shuffle.partitions=3",
-        "DROP TABLE IF EXISTS test",
         "CREATE TABLE test(key INT, val STRING)",
         s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test",
         "CACHE TABLE test")
@@ -159,7 +156,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
   }
 
   test("Checks Hive version") {
-    withJdbcStatement { statement =>
+    withJdbcStatement() { statement =>
       val resultSet = statement.executeQuery("SET spark.sql.hive.version")
       resultSet.next()
       assert(resultSet.getString(1) === "spark.sql.hive.version")
@@ -168,9 +165,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
   }
 
   test("SPARK-3004 regression: result set containing NULL") {
-    withJdbcStatement { statement =>
+    withJdbcStatement("test_null") { statement =>
       val queries = Seq(
-        "DROP TABLE IF EXISTS test_null",
         "CREATE TABLE test_null(key INT, val STRING)",
         s"LOAD DATA LOCAL INPATH '${TestData.smallKvWithNull}' OVERWRITE INTO TABLE test_null")
 
@@ -189,9 +185,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
   }
 
   test("SPARK-4292 regression: result set iterator issue") {
-    withJdbcStatement { statement =>
+    withJdbcStatement("test_4292") { statement =>
       val queries = Seq(
-        "DROP TABLE IF EXISTS test_4292",
         "CREATE TABLE test_4292(key INT, val STRING)",
         s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_4292")
 
@@ -203,15 +198,12 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
         resultSet.next()
         assert(resultSet.getInt(1) === key)
       }
-
-      statement.executeQuery("DROP TABLE IF EXISTS test_4292")
     }
   }
 
   test("SPARK-4309 regression: Date type support") {
-    withJdbcStatement { statement =>
+    withJdbcStatement("test_date") { statement =>
       val queries = Seq(
-        "DROP TABLE IF EXISTS test_date",
         "CREATE TABLE test_date(key INT, value STRING)",
         s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_date")
 
@@ -227,9 +219,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
   }
 
   test("SPARK-4407 regression: Complex type support") {
-    withJdbcStatement { statement =>
+    withJdbcStatement("test_map") { statement =>
       val queries = Seq(
-        "DROP TABLE IF EXISTS test_map",
         "CREATE TABLE test_map(key INT, value STRING)",
         s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map")
 
@@ -251,9 +242,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
   }
 
   test("SPARK-12143 regression: Binary type support") {
-    withJdbcStatement { statement =>
+    withJdbcStatement("test_binary") { statement =>
       val queries = Seq(
-        "DROP TABLE IF EXISTS test_binary",
         "CREATE TABLE test_binary(key INT, value STRING)",
         s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_binary")
 
@@ -262,7 +252,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
       val expected: Array[Byte] = "val_238".getBytes
       assertResult(expected) {
         val resultSet = statement.executeQuery(
-          "SELECT CAST(value as BINARY) FROM test_date LIMIT 1")
+          "SELECT CAST(value as BINARY) FROM test_binary LIMIT 1")
         resultSet.next()
         resultSet.getObject(1)
       }
@@ -275,12 +265,11 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
     var defaultV2: String = null
     var data: ArrayBuffer[Int] = null
 
-    withMultipleConnectionJdbcStatement(
+    withMultipleConnectionJdbcStatement("test_map")(
       // create table
       { statement =>
 
         val queries = Seq(
-            "DROP TABLE IF EXISTS test_map",
             "CREATE TABLE test_map(key INT, value STRING)",
             s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map",
             "CACHE TABLE test_table AS SELECT key FROM test_map ORDER BY key DESC",
@@ -418,9 +407,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
   // This test often hangs and then times out, leaving the hanging processes.
   // Let's ignore it and improve the test.
   ignore("test jdbc cancel") {
-    withJdbcStatement { statement =>
+    withJdbcStatement("test_map") { statement =>
       val queries = Seq(
-        "DROP TABLE IF EXISTS test_map",
         "CREATE TABLE test_map(key INT, value STRING)",
         s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map")
 
@@ -478,7 +466,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
   }
 
   test("test add jar") {
-    withMultipleConnectionJdbcStatement(
+    withMultipleConnectionJdbcStatement("smallKV", "addJar")(
       {
         statement =>
           val jarFile =
@@ -492,10 +480,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
       {
         statement =>
           val queries = Seq(
-            "DROP TABLE IF EXISTS smallKV",
             "CREATE TABLE smallKV(key INT, val STRING)",
             s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE smallKV",
-            "DROP TABLE IF EXISTS addJar",
             """CREATE TABLE addJar(key string)
               |ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
             """.stripMargin)
@@ -524,15 +510,12 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
           expectedResult.close()
 
           assert(expectedResultBuffer === actualResultBuffer)
-
-          statement.executeQuery("DROP TABLE IF EXISTS addJar")
-          statement.executeQuery("DROP TABLE IF EXISTS smallKV")
       }
     )
   }
 
   test("Checks Hive version via SET -v") {
-    withJdbcStatement { statement =>
+    withJdbcStatement() { statement =>
       val resultSet = statement.executeQuery("SET -v")
 
       val conf = mutable.Map.empty[String, String]
@@ -545,7 +528,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
   }
 
   test("Checks Hive version via SET") {
-    withJdbcStatement { statement =>
+    withJdbcStatement() { statement =>
       val resultSet = statement.executeQuery("SET")
 
       val conf = mutable.Map.empty[String, String]
@@ -558,7 +541,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
   }
 
   test("SPARK-11595 ADD JAR with input path having URL scheme") {
-    withJdbcStatement { statement =>
+    withJdbcStatement("test_udtf") { statement =>
       try {
         val jarPath = "../hive/src/test/resources/TestUDTF.jar"
         val jarURL = s"file://${System.getProperty("user.dir")}/$jarPath"
@@ -586,7 +569,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
         val dataPath = "../hive/src/test/resources/data/files/kv1.txt"
 
         Seq(
-          s"CREATE TABLE test_udtf(key INT, value STRING)",
+          "CREATE TABLE test_udtf(key INT, value STRING)",
           s"LOAD DATA LOCAL INPATH '$dataPath' OVERWRITE INTO TABLE test_udtf"
         ).foreach(statement.execute)
 
@@ -624,8 +607,8 @@ class SingleSessionSuite extends HiveThriftJdbcTest {
   override protected def extraConf: Seq[String] =
     "--conf spark.sql.hive.thriftServer.singleSession=true" :: Nil
 
-  test("test single session") {
-    withMultipleConnectionJdbcStatement(
+  test("share the temporary functions across JDBC connections") {
+    withMultipleConnectionJdbcStatement()(
       { statement =>
         val jarPath = "../hive/src/test/resources/TestUDTF.jar"
         val jarURL = s"file://${System.getProperty("user.dir")}/$jarPath"
@@ -667,16 +650,63 @@ class SingleSessionSuite extends HiveThriftJdbcTest {
       }
     )
   }
+
+  test("unable to changing spark.sql.hive.thriftServer.singleSession using JDBC connections") {
+    withJdbcStatement() { statement =>
+      // JDBC connections are not able to set the conf spark.sql.hive.thriftServer.singleSession
+      val e = intercept[SQLException] {
+        statement.executeQuery("SET spark.sql.hive.thriftServer.singleSession=false")
+      }.getMessage
+      assert(e.contains(
+        "Cannot modify the value of a static config: spark.sql.hive.thriftServer.singleSession"))
+    }
+  }
+
+  test("share the current database and temporary tables across JDBC connections") {
+    withMultipleConnectionJdbcStatement()(
+      { statement =>
+        statement.execute("CREATE DATABASE IF NOT EXISTS db1")
+      },
+
+      { statement =>
+        val rs1 = statement.executeQuery("SELECT current_database()")
+        assert(rs1.next())
+        assert(rs1.getString(1) === "default")
+
+        statement.execute("USE db1")
+
+        val rs2 = statement.executeQuery("SELECT current_database()")
+        assert(rs2.next())
+        assert(rs2.getString(1) === "db1")
+
+        statement.execute("CREATE TEMP VIEW tempView AS SELECT 123")
+      },
+
+      { statement =>
+        // the current database is set to db1 by another JDBC connection.
+        val rs1 = statement.executeQuery("SELECT current_database()")
+        assert(rs1.next())
+        assert(rs1.getString(1) === "db1")
+
+        val rs2 = statement.executeQuery("SELECT * from tempView")
+        assert(rs2.next())
+        assert(rs2.getString(1) === "123")
+
+        statement.execute("USE default")
+        statement.execute("DROP VIEW tempView")
+        statement.execute("DROP DATABASE db1 CASCADE")
+      }
+    )
+  }
 }
 
 class HiveThriftHttpServerSuite extends HiveThriftJdbcTest {
   override def mode: ServerMode.Value = ServerMode.http
 
   test("JDBC query execution") {
-    withJdbcStatement { statement =>
+    withJdbcStatement("test") { statement =>
       val queries = Seq(
         "SET spark.sql.shuffle.partitions=3",
-        "DROP TABLE IF EXISTS test",
         "CREATE TABLE test(key INT, val STRING)",
         s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test",
         "CACHE TABLE test")
@@ -692,7 +722,7 @@ class HiveThriftHttpServerSuite extends HiveThriftJdbcTest {
   }
 
   test("Checks Hive version") {
-    withJdbcStatement { statement =>
+    withJdbcStatement() { statement =>
       val resultSet = statement.executeQuery("SET spark.sql.hive.version")
       resultSet.next()
       assert(resultSet.getString(1) === "spark.sql.hive.version")
@@ -718,7 +748,7 @@ abstract class HiveThriftJdbcTest extends HiveThriftServer2Test {
     s"jdbc:hive2://localhost:$serverPort/"
   }
 
-  def withMultipleConnectionJdbcStatement(fs: (Statement => Unit)*) {
+  def withMultipleConnectionJdbcStatement(tableNames: String*)(fs: (Statement => Unit)*) {
     val user = System.getProperty("user.name")
     val connections = fs.map { _ => DriverManager.getConnection(jdbcUri, user, "") }
     val statements = connections.map(_.createStatement())
@@ -726,13 +756,16 @@ abstract class HiveThriftJdbcTest extends HiveThriftServer2Test {
     try {
       statements.zip(fs).foreach { case (s, f) => f(s) }
     } finally {
+      tableNames.foreach { name =>
+        statements(0).execute(s"DROP TABLE IF EXISTS $name")
+      }
       statements.foreach(_.close())
       connections.foreach(_.close())
     }
   }
 
-  def withJdbcStatement(f: Statement => Unit) {
-    withMultipleConnectionJdbcStatement(f)
+  def withJdbcStatement(tableNames: String*)(f: Statement => Unit) {
+    withMultipleConnectionJdbcStatement(tableNames: _*)(f)
   }
 }
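
A hypothetical test body using the refactored helper (the table name is an illustration): tables listed up front are now dropped in the `finally` block, so individual tests no longer need explicit cleanup statements:

```scala
withJdbcStatement("test_cleanup") { statement =>
  statement.execute("CREATE TABLE test_cleanup(key INT, val STRING)")
  val rs = statement.executeQuery("SELECT COUNT(*) FROM test_cleanup")
  assert(rs.next())
  // no trailing "DROP TABLE IF EXISTS test_cleanup" needed
}
```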
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5ac62043/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala
index bf431cd..4c53dd8 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/UISeleniumSuite.scala
@@ -74,7 +74,7 @@ class UISeleniumSuite
   }
 
   ignore("thrift server ui test") {
-    withJdbcStatement { statement =>
+    withJdbcStatement("test_map") { statement =>
       val baseURL = s"http://localhost:$uiPort"
 
       val queries = Seq(

http://git-wip-us.apache.org/repos/asf/spark/blob/5ac62043/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 6d4fe1a..bea073f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -141,10 +141,4 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
     conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
   }
 
-  // TODO: why do we get this from SparkConf but not SQLConf?
-  def hiveThriftServerSingleSession: Boolean = {
-    sparkSession.sparkContext.conf.getBoolean(
-      "spark.sql.hive.thriftServer.singleSession", defaultValue = false)
-  }
-
 }

