Posted to commits@spark.apache.org by pw...@apache.org on 2014/11/28 17:42:46 UTC

spark git commit: [SPARK-4645][SQL] Disables asynchronous execution in Hive 0.13.1 HiveThriftServer2

Repository: spark
Updated Branches:
  refs/heads/master ceb628197 -> 5b99bf243


[SPARK-4645][SQL] Disables asynchronous execution in Hive 0.13.1 HiveThriftServer2

This PR disables asynchronous execution in HiveThriftServer2 by setting the `runInBackground` argument of `ExecuteStatementOperation` to `false` and reverting `SparkExecuteStatementOperation.run` in the Hive 0.13.1 shim to the Hive 0.12 version. This change makes the Simba ODBC driver v1.0.0.1000 work.
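
The essence of the change is visible in the constructor call in the diff below: the subclass still accepts a `runInBackground` parameter, but passes a literal `false` up to its superclass, so `run()` always executes on the calling thread. Here is a minimal, self-contained sketch of that pattern in Scala; the class names are hypothetical stand-ins, not Hive's actual API:

    import java.util.concurrent.{ExecutorService, Executors}

    abstract class StatementOperation(statement: String, runInBackground: Boolean) {
      private val pool: ExecutorService = Executors.newSingleThreadExecutor()

      protected def execute(): Unit

      def run(): Unit = {
        if (runInBackground) {
          // Asynchronous path: hand the work to a background thread, return immediately.
          pool.submit(new Runnable { def run(): Unit = execute() })
        } else {
          // Synchronous path: block the caller until the statement finishes.
          execute()
        }
      }
    }

    // Mirrors the commit: the flag is still accepted (keeping the signature
    // compatible with existing callers), but `false` is passed up, so execution
    // is always synchronous.
    class SyncStatementOperation(statement: String, runInBackground: Boolean = true)
      extends StatementOperation(statement, false) {

      protected def execute(): Unit =
        println(s"Executing '$statement' on thread ${Thread.currentThread().getName}")
    }

    object Demo extends App {
      new SyncStatementOperation("SELECT 1").run() // runs on "main", not a pool thread
    }

Keeping the parameter while hard-coding `false` at the `extends` clause leaves every call site untouched, which is why the diff below only has to change the class header and delete the background-execution machinery.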

Author: Cheng Lian <li...@databricks.com>

Closes #3506 from liancheng/disable-async-exec and squashes the following commits:

593804d [Cheng Lian] Disables asynchronous execution in Hive 0.13.1 HiveThriftServer2


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5b99bf24
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5b99bf24
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5b99bf24

Branch: refs/heads/master
Commit: 5b99bf243e2956fe933ab2dccd069266e82cad8d
Parents: ceb6281
Author: Cheng Lian <li...@databricks.com>
Authored: Fri Nov 28 11:42:40 2014 -0500
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Fri Nov 28 11:42:40 2014 -0500

----------------------------------------------------------------------
 .../spark/sql/hive/thriftserver/Shim13.scala    | 139 ++++++-------------
 1 file changed, 39 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5b99bf24/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
index 99c1987..17f1ad3 100644
--- a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
+++ b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
@@ -17,30 +17,25 @@
 
 package org.apache.spark.sql.hive.thriftserver
 
-import java.security.PrivilegedExceptionAction
 import java.sql.{Date, Timestamp}
-import java.util.concurrent.Future
 import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, Map => SMap}
 import scala.math._
 
-import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.api.FieldSchema
-import org.apache.hadoop.hive.ql.metadata.Hive
-import org.apache.hadoop.hive.ql.session.SessionState
-import org.apache.hadoop.hive.shims.ShimLoader
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hive.service.cli._
 import org.apache.hive.service.cli.operation.ExecuteStatementOperation
 import org.apache.hive.service.cli.session.HiveSession
 
 import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.plans.logical.SetCommand
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
 import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
-import org.apache.spark.sql.{SchemaRDD, Row => SparkRow}
+import org.apache.spark.sql.{Row => SparkRow, SQLConf, SchemaRDD}
 
 /**
  * A compatibility layer for interacting with Hive version 0.13.1.
@@ -48,7 +43,9 @@ import org.apache.spark.sql.{SchemaRDD, Row => SparkRow}
 private[thriftserver] object HiveThriftServerShim {
   val version = "0.13.1"
 
-  def setServerUserName(sparkServiceUGI: UserGroupInformation, sparkCliService:SparkSQLCLIService) = {
+  def setServerUserName(
+      sparkServiceUGI: UserGroupInformation,
+      sparkCliService:SparkSQLCLIService) = {
     setSuperField(sparkCliService, "serviceUGI", sparkServiceUGI)
   }
 }
@@ -72,39 +69,14 @@ private[hive] class SparkExecuteStatementOperation(
     confOverlay: JMap[String, String],
     runInBackground: Boolean = true)(
     hiveContext: HiveContext,
-    sessionToActivePool: SMap[HiveSession, String]) extends ExecuteStatementOperation(
-  parentSession, statement, confOverlay, runInBackground) with Logging {
+    sessionToActivePool: SMap[HiveSession, String])
+  // NOTE: `runInBackground` is set to `false` intentionally to disable asynchronous execution
+  extends ExecuteStatementOperation(parentSession, statement, confOverlay, false) with Logging {
 
   private var result: SchemaRDD = _
   private var iter: Iterator[SparkRow] = _
   private var dataTypes: Array[DataType] = _
 
-  private def runInternal(cmd: String) = {
-    try {
-      result = hiveContext.sql(cmd)
-      logDebug(result.queryExecution.toString())
-      val groupId = round(random * 1000000).toString
-      hiveContext.sparkContext.setJobGroup(groupId, statement)
-      iter = {
-        val useIncrementalCollect =
-          hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
-        if (useIncrementalCollect) {
-          result.toLocalIterator
-        } else {
-          result.collect().iterator
-        }
-      }
-      dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
-    } catch {
-      // Actually do need to catch Throwable as some failures don't inherit from Exception and
-      // HiveServer will silently swallow them.
-      case e: Throwable =>
-        setState(OperationState.ERROR)
-        logError("Error executing query:",e)
-        throw new HiveSQLException(e.toString)
-    }
-  }
-
   def close(): Unit = {
     // RDDs will be cleaned automatically upon garbage collection.
     logDebug("CLOSING")
@@ -182,76 +154,43 @@ private[hive] class SparkExecuteStatementOperation(
     }
   }
 
-  private def getConfigForOperation: HiveConf = {
-    var sqlOperationConf: HiveConf = getParentSession.getHiveConf
-    if (!getConfOverlay.isEmpty || shouldRunAsync) {
-      sqlOperationConf = new HiveConf(sqlOperationConf)
-      import scala.collection.JavaConversions._
-      for (confEntry <- getConfOverlay.entrySet) {
-        try {
-          sqlOperationConf.verifyAndSet(confEntry.getKey, confEntry.getValue)
-        }
-        catch { case e: IllegalArgumentException =>
-          throw new HiveSQLException("Error applying statement specific settings", e)
-        }
-      }
-    }
-    sqlOperationConf
-  }
-
   def run(): Unit = {
     logInfo(s"Running query '$statement'")
-    val opConfig: HiveConf = getConfigForOperation
     setState(OperationState.RUNNING)
-    setHasResultSet(true)
-
-    if (!shouldRunAsync) {
-      runInternal(statement)
-      setState(OperationState.FINISHED)
-    } else {
-      val parentSessionState = SessionState.get
-      val sessionHive: Hive = Hive.get
-      val currentUGI: UserGroupInformation = ShimLoader.getHadoopShims.getUGIForConf(opConfig)
-
-      val backgroundOperation: Runnable = new Runnable {
-        def run() {
-          val doAsAction: PrivilegedExceptionAction[AnyRef] =
-            new PrivilegedExceptionAction[AnyRef] {
-              def run: AnyRef = {
-                Hive.set(sessionHive)
-                SessionState.setCurrentSessionState(parentSessionState)
-                try {
-                  runInternal(statement)
-                }
-                catch { case e: HiveSQLException =>
-                  setOperationException(e)
-                  logError("Error running hive query: ", e)
-                }
-                null
-              }
-            }
-          try {
-            ShimLoader.getHadoopShims.doAs(currentUGI, doAsAction)
-          }
-          catch { case e: Exception =>
-            setOperationException(new HiveSQLException(e))
-            logError("Error running hive query as user : " + currentUGI.getShortUserName, e)
-          }
-          setState(OperationState.FINISHED)
-        }
+    try {
+      result = hiveContext.sql(statement)
+      logDebug(result.queryExecution.toString())
+      result.queryExecution.logical match {
+        case SetCommand(Some((SQLConf.THRIFTSERVER_POOL, Some(value)))) =>
+          sessionToActivePool(parentSession) = value
+          logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
+        case _ =>
       }
 
-      try {
-        val backgroundHandle: Future[_] = getParentSession.getSessionManager.
-          submitBackgroundOperation(backgroundOperation)
-        setBackgroundHandle(backgroundHandle)
-      } catch {
-        // Actually do need to catch Throwable as some failures don't inherit from Exception and
-        // HiveServer will silently swallow them.
-        case e: Throwable =>
-          logError("Error executing query:",e)
-          throw new HiveSQLException(e.toString)
+      val groupId = round(random * 1000000).toString
+      hiveContext.sparkContext.setJobGroup(groupId, statement)
+      sessionToActivePool.get(parentSession).foreach { pool =>
+        hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
+      }
+      iter = {
+        val useIncrementalCollect =
+          hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
+        if (useIncrementalCollect) {
+          result.toLocalIterator
+        } else {
+          result.collect().iterator
+        }
       }
+      dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
+      setHasResultSet(true)
+    } catch {
+      // Actually do need to catch Throwable as some failures don't inherit from Exception and
+      // HiveServer will silently swallow them.
+      case e: Throwable =>
+        setState(OperationState.ERROR)
+        logError("Error executing query:", e)
+        throw new HiveSQLException(e.toString)
     }
+    setState(OperationState.FINISHED)
   }
 }
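
As a usage footnote: the reverted synchronous `run()` above still honors two session-level settings visible in the diff, namely `spark.sql.thriftServer.incrementalCollect` (fetch results through `toLocalIterator` instead of one big `collect()`) and a `SET spark.sql.thriftserver.scheduler.pool=...` statement (the key behind `SQLConf.THRIFTSERVER_POOL` in this Spark version), which routes later statements in the session to a fair-scheduler pool. A hedged sketch against the Spark 1.2-era API; the local-mode setup and the pool name "reporting" are illustrative assumptions:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext

    object ThriftConfDemo extends App {
      val sc = new SparkContext(
        new SparkConf().setMaster("local[2]").setAppName("thrift-conf-demo"))
      val hiveContext = new HiveContext(sc)

      // Stream result batches via toLocalIterator instead of collect()-ing the
      // whole result set at once (the flag read inside run() in the diff above).
      hiveContext.setConf("spark.sql.thriftServer.incrementalCollect", "true")

      // When issued through the Thrift server, this SET statement is matched by
      // run() and recorded in sessionToActivePool for subsequent statements.
      hiveContext.sql("SET spark.sql.thriftserver.scheduler.pool=reporting")
    }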


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org