You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/12/16 21:38:18 UTC

spark git commit: [SQL] SPARK-4700: Add HTTP protocol spark thrift server

Repository: spark
Updated Branches:
  refs/heads/master d12c0711f -> 17688d142


[SQL] SPARK-4700: Add HTTP protocol spark thrift server

Add HTTP protocol support and test cases to the Spark Thrift server, so users can deploy the Thrift server in either TCP (binary) or HTTP mode.

Author: Judy Nash <ju...@microsoft.com>
Author: judynash <ju...@microsoft.com>

Closes #3672 from judynash/master and squashes the following commits:

526315d [Judy Nash] correct spacing on startThriftServer method
31a6520 [Judy Nash] fix code style issues and update sql programming guide format issue
47bf87e [Judy Nash] modify withJdbcStatement method definition to meet less than 100 line length
2e9c11c [Judy Nash] add thrift server in http mode documentation on sql programming guide
1cbd305 [Judy Nash] Merge remote-tracking branch 'upstream/master'
2b1d312 [Judy Nash] updated http thrift server support based on feedback
377532c [judynash] add HTTP protocol spark thrift server


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/17688d14
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/17688d14
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/17688d14

Branch: refs/heads/master
Commit: 17688d14299f18a93591818ae5fef69e9dc20eb5
Parents: d12c071
Author: Judy Nash <ju...@microsoft.com>
Authored: Tue Dec 16 12:37:26 2014 -0800
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Tue Dec 16 12:37:26 2014 -0800

----------------------------------------------------------------------
 docs/sql-programming-guide.md                   | 12 +++
 .../hive/thriftserver/HiveThriftServer2.scala   | 21 +++++-
 .../thriftserver/HiveThriftServer2Suite.scala   | 77 ++++++++++++++++----
 3 files changed, 93 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/17688d14/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index be284fb..ad51b9c 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -938,6 +938,18 @@ Configuration of Hive is done by placing your `hive-site.xml` file in `conf/`.
 
 You may also use the beeline script that comes with Hive.
 
+The Thrift JDBC server also supports sending Thrift RPC messages over HTTP transport.
+Use the following settings to enable HTTP mode, either as system properties or in the `hive-site.xml` file in `conf/`:
+
+    hive.server2.transport.mode - Set this to value: http 
+    hive.server2.thrift.http.port - HTTP port number to listen on; default is 10001
+    hive.server2.http.endpoint - HTTP endpoint; default is cliservice
+
+To test, use beeline to connect to the JDBC/ODBC server in http mode with:
+
+    beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>
+
+
 ## Running the Spark SQL CLI
 
 The Spark SQL CLI is a convenient tool to run the Hive metastore service in local mode and execute

http://git-wip-us.apache.org/repos/asf/spark/blob/17688d14/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index bd4e994..c5b7323 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -19,7 +19,8 @@ package org.apache.spark.sql.hive.thriftserver
 
 import org.apache.commons.logging.LogFactory
 import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+import org.apache.hive.service.cli.thrift.{ThriftBinaryCLIService, ThriftHttpCLIService}
 import org.apache.hive.service.server.{HiveServer2, ServerOptionsProcessor}
 
 import org.apache.spark.Logging
@@ -85,10 +86,22 @@ private[hive] class HiveThriftServer2(hiveContext: HiveContext)
     setSuperField(this, "cliService", sparkSqlCliService)
     addService(sparkSqlCliService)
 
-    val thriftCliService = new ThriftBinaryCLIService(sparkSqlCliService)
-    setSuperField(this, "thriftCLIService", thriftCliService)
-    addService(thriftCliService)
+    if (isHTTPTransportMode(hiveConf)) {
+      val thriftCliService = new ThriftHttpCLIService(sparkSqlCliService)
+      setSuperField(this, "thriftCLIService", thriftCliService)
+      addService(thriftCliService)
+    } else {
+      val thriftCliService = new ThriftBinaryCLIService(sparkSqlCliService)
+      setSuperField(this, "thriftCLIService", thriftCliService)
+      addService(thriftCliService)
+    }
 
     initCompositeService(hiveConf)
   }
+
+  private def isHTTPTransportMode(hiveConf: HiveConf): Boolean = {
+    val transportMode: String = hiveConf.getVar(ConfVars.HIVE_SERVER2_TRANSPORT_MODE)
+    transportMode.equalsIgnoreCase("http")
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/17688d14/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
index 23d12cb..94d5ed4 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
@@ -70,11 +70,20 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
     port
   }
 
-  def withJdbcStatement(serverStartTimeout: FiniteDuration = 1.minute)(f: Statement => Unit) {
+  def withJdbcStatement(
+      serverStartTimeout: FiniteDuration = 1.minute,
+      httpMode: Boolean = false)(
+      f: Statement => Unit) {
     val port = randomListeningPort
 
-    startThriftServer(port, serverStartTimeout) {
-      val jdbcUri = s"jdbc:hive2://${"localhost"}:$port/"
+    startThriftServer(port, serverStartTimeout, httpMode) {
+      val jdbcUri = if (httpMode) {
+        s"jdbc:hive2://${"localhost"}:$port/" +
+          "default?hive.server2.transport.mode=http;hive.server2.thrift.http.path=cliservice"
+      } else {
+        s"jdbc:hive2://${"localhost"}:$port/"
+      }
+
       val user = System.getProperty("user.name")
       val connection = DriverManager.getConnection(jdbcUri, user, "")
       val statement = connection.createStatement()
@@ -113,7 +122,8 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
 
   def startThriftServer(
       port: Int,
-      serverStartTimeout: FiniteDuration = 1.minute)(
+      serverStartTimeout: FiniteDuration = 1.minute,
+      httpMode: Boolean = false)(
       f: => Unit) {
     val startScript = "../../sbin/start-thriftserver.sh".split("/").mkString(File.separator)
     val stopScript = "../../sbin/stop-thriftserver.sh".split("/").mkString(File.separator)
@@ -121,15 +131,28 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
     val warehousePath = getTempFilePath("warehouse")
     val metastorePath = getTempFilePath("metastore")
     val metastoreJdbcUri = s"jdbc:derby:;databaseName=$metastorePath;create=true"
+
     val command =
-      s"""$startScript
-         |  --master local
-         |  --hiveconf hive.root.logger=INFO,console
-         |  --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$metastoreJdbcUri
-         |  --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$warehousePath
-         |  --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST}=${"localhost"}
-         |  --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_PORT}=$port
-       """.stripMargin.split("\\s+").toSeq
+      if (httpMode) {
+          s"""$startScript
+             |  --master local
+             |  --hiveconf hive.root.logger=INFO,console
+             |  --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$metastoreJdbcUri
+             |  --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$warehousePath
+             |  --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST}=localhost
+             |  --hiveconf ${ConfVars.HIVE_SERVER2_TRANSPORT_MODE}=http
+             |  --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT}=$port
+           """.stripMargin.split("\\s+").toSeq
+      } else {
+          s"""$startScript
+             |  --master local
+             |  --hiveconf hive.root.logger=INFO,console
+             |  --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$metastoreJdbcUri
+             |  --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$warehousePath
+             |  --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST}=localhost
+             |  --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_PORT}=$port
+           """.stripMargin.split("\\s+").toSeq
+      }
 
     val serverRunning = Promise[Unit]()
     val buffer = new ArrayBuffer[String]()
@@ -140,7 +163,8 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
 
     def captureLogOutput(line: String): Unit = {
       buffer += line
-      if (line.contains("ThriftBinaryCLIService listening on")) {
+      if (line.contains("ThriftBinaryCLIService listening on") ||
+          line.contains("Started ThriftHttpCLIService in http")) {
         serverRunning.success(())
       }
     }
@@ -217,6 +241,25 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
     }
   }
 
+  test("Test JDBC query execution in Http Mode") {
+    withJdbcStatement(httpMode = true) { statement =>
+      val queries = Seq(
+        "SET spark.sql.shuffle.partitions=3",
+        "DROP TABLE IF EXISTS test",
+        "CREATE TABLE test(key INT, val STRING)",
+        s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test",
+        "CACHE TABLE test")
+
+      queries.foreach(statement.execute)
+
+      assertResult(5, "Row count mismatch") {
+        val resultSet = statement.executeQuery("SELECT COUNT(*) FROM test")
+        resultSet.next()
+        resultSet.getInt(1)
+      }
+    }
+  }
+
   test("SPARK-3004 regression: result set containing NULL") {
     withJdbcStatement() { statement =>
       val queries = Seq(
@@ -267,6 +310,14 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
     }
   }
 
+  test("Checks Hive version in Http Mode") {
+    withJdbcStatement(httpMode = true) { statement =>
+      val resultSet = statement.executeQuery("SET spark.sql.hive.version")
+      resultSet.next()
+      assert(resultSet.getString(1) === s"spark.sql.hive.version=${HiveShim.version}")
+    }
+  }
+
   test("SPARK-4292 regression: result set iterator issue") {
     withJdbcStatement() { statement =>
       val queries = Seq(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org