You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/11/11 01:56:48 UTC

spark git commit: [SPARK-4308][SQL] Sets SQL operation state to ERROR when exception is thrown

Repository: spark
Updated Branches:
  refs/heads/master 534b23141 -> acb55aedd


[SPARK-4308][SQL] Sets SQL operation state to ERROR when exception is thrown

In `HiveThriftServer2`, when an exception is thrown during a SQL execution, the SQL operation state should be set to `ERROR`, but now it remains `RUNNING`. This affects the result of the `GetOperationStatus` Thrift API.

Author: Cheng Lian <li...@databricks.com>

Closes #3175 from liancheng/fix-op-state and squashes the following commits:

6d4c1fe [Cheng Lian] Sets SQL operation state to ERROR when exception is thrown


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/acb55aed
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/acb55aed
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/acb55aed

Branch: refs/heads/master
Commit: acb55aeddbe58758d75b9aed130634afe21797cf
Parents: 534b231
Author: Cheng Lian <li...@databricks.com>
Authored: Mon Nov 10 16:56:36 2014 -0800
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Mon Nov 10 16:56:36 2014 -0800

----------------------------------------------------------------------
 .../thriftserver/AbstractSparkSQLDriver.scala   |  2 --
 .../spark/sql/hive/thriftserver/Shim12.scala    | 12 +++----
 .../spark/sql/hive/thriftserver/Shim13.scala    | 36 ++++++++------------
 3 files changed, 21 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/acb55aed/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala
index fcb302e..6ed8fd2 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala
@@ -19,8 +19,6 @@ package org.apache.spark.sql.hive.thriftserver
 
 import scala.collection.JavaConversions._
 
-import java.util.{ArrayList => JArrayList}
-
 import org.apache.commons.lang.exception.ExceptionUtils
 import org.apache.hadoop.hive.metastore.api.{FieldSchema, Schema}
 import org.apache.hadoop.hive.ql.Driver

http://git-wip-us.apache.org/repos/asf/spark/blob/acb55aed/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
index e3ba991..aa2e3ca 100644
--- a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
+++ b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
@@ -25,9 +25,7 @@ import scala.collection.mutable.{ArrayBuffer, Map => SMap}
 import scala.math._
 
 import org.apache.hadoop.hive.common.`type`.HiveDecimal
-import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.api.FieldSchema
-import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory
 import org.apache.hadoop.hive.shims.ShimLoader
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hive.service.cli._
@@ -37,9 +35,9 @@ import org.apache.hive.service.cli.session.HiveSession
 import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.plans.logical.SetCommand
 import org.apache.spark.sql.catalyst.types._
-import org.apache.spark.sql.{Row => SparkRow, SQLConf, SchemaRDD}
-import org.apache.spark.sql.hive.{HiveMetastoreTypes, HiveContext}
 import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
+import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
+import org.apache.spark.sql.{SQLConf, SchemaRDD, Row => SparkRow}
 
 /**
  * A compatibility layer for interacting with Hive version 0.12.0.
@@ -71,8 +69,9 @@ private[hive] class SparkExecuteStatementOperation(
     statement: String,
     confOverlay: JMap[String, String])(
     hiveContext: HiveContext,
-    sessionToActivePool: SMap[HiveSession, String]) extends ExecuteStatementOperation(
-  parentSession, statement, confOverlay) with Logging {
+    sessionToActivePool: SMap[HiveSession, String])
+  extends ExecuteStatementOperation(parentSession, statement, confOverlay) with Logging {
+
   private var result: SchemaRDD = _
   private var iter: Iterator[SparkRow] = _
   private var dataTypes: Array[DataType] = _
@@ -216,6 +215,7 @@ private[hive] class SparkExecuteStatementOperation(
       // Actually do need to catch Throwable as some failures don't inherit from Exception and
       // HiveServer will silently swallow them.
       case e: Throwable =>
+        setState(OperationState.ERROR)
         logError("Error executing query:",e)
         throw new HiveSQLException(e.toString)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/acb55aed/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
index f2ceba8..a642478 100644
--- a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
+++ b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
@@ -27,10 +27,9 @@ import scala.collection.mutable.{ArrayBuffer, Map => SMap}
 import scala.math._
 
 import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.metastore.api.FieldSchema
 import org.apache.hadoop.hive.ql.metadata.Hive
-import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory
 import org.apache.hadoop.hive.ql.session.SessionState
-import org.apache.hadoop.hive.metastore.api.FieldSchema
 import org.apache.hadoop.hive.shims.ShimLoader
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hive.service.cli._
@@ -39,9 +38,9 @@ import org.apache.hive.service.cli.session.HiveSession
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.types._
-import org.apache.spark.sql.{Row => SparkRow, SchemaRDD}
-import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
 import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
+import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
+import org.apache.spark.sql.{SchemaRDD, Row => SparkRow}
 
 /**
  * A compatibility layer for interacting with Hive version 0.12.0.
@@ -100,6 +99,7 @@ private[hive] class SparkExecuteStatementOperation(
       // Actually do need to catch Throwable as some failures don't inherit from Exception and
       // HiveServer will silently swallow them.
       case e: Throwable =>
+        setState(OperationState.ERROR)
         logError("Error executing query:",e)
         throw new HiveSQLException(e.toString)
     }
@@ -194,14 +194,12 @@ private[hive] class SparkExecuteStatementOperation(
         try {
           sqlOperationConf.verifyAndSet(confEntry.getKey, confEntry.getValue)
         }
-        catch {
-          case e: IllegalArgumentException => {
-            throw new HiveSQLException("Error applying statement specific settings", e)
-          }
+        catch { case e: IllegalArgumentException =>
+          throw new HiveSQLException("Error applying statement specific settings", e)
         }
       }
     }
-    return sqlOperationConf
+    sqlOperationConf
   }
 
   def run(): Unit = {
@@ -219,7 +217,7 @@ private[hive] class SparkExecuteStatementOperation(
       val currentUGI: UserGroupInformation = ShimLoader.getHadoopShims.getUGIForConf(opConfig)
 
       val backgroundOperation: Runnable = new Runnable {
-        def run {
+        def run() {
           val doAsAction: PrivilegedExceptionAction[AnyRef] =
             new PrivilegedExceptionAction[AnyRef] {
               def run: AnyRef = {
@@ -228,23 +226,19 @@ private[hive] class SparkExecuteStatementOperation(
                 try {
                   runInternal(statement)
                 }
-                catch {
-                  case e: HiveSQLException => {
-                    setOperationException(e)
-                    logError("Error running hive query: ", e)
-                  }
+                catch { case e: HiveSQLException =>
+                  setOperationException(e)
+                  logError("Error running hive query: ", e)
                 }
-                return null
+                null
               }
             }
           try {
             ShimLoader.getHadoopShims.doAs(currentUGI, doAsAction)
           }
-          catch {
-            case e: Exception => {
-              setOperationException(new HiveSQLException(e))
-              logError("Error running hive query as user : " + currentUGI.getShortUserName, e)
-            }
+          catch { case e: Exception =>
+            setOperationException(new HiveSQLException(e))
+            logError("Error running hive query as user : " + currentUGI.getShortUserName, e)
           }
           setState(OperationState.FINISHED)
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org