Posted to commits@spark.apache.org by we...@apache.org on 2020/07/30 06:45:32 UTC

[spark] branch master updated: [SPARK-32412][SQL] Unify error handling for spark thrift server operations

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 510a165  [SPARK-32412][SQL] Unify error handling for spark thrift server operations
510a165 is described below

commit 510a1656e650246a708d3866c8a400b7a1b9f962
Author: Kent Yao <ya...@hotmail.com>
AuthorDate: Thu Jul 30 06:44:13 2020 +0000

    [SPARK-32412][SQL] Unify error handling for spark thrift server operations
    
    ### What changes were proposed in this pull request?
    
    Log the error/warning message only once on the server side, for both sync and async modes.
    
    ### Why are the changes needed?
    
    In https://github.com/apache/spark/commit/b151194299f5ba15e0d9d8d7d2980fd164fe0822 we stopped duplicating the error logging for `SparkExecuteStatementOperation` with `runInBackground=true`, but operations with `runInBackground=false` and the other metadata operations are still logged twice: once in the operation's `runInternal` method and once in `ThriftCLIService`.
    
    In this PR, I propose to refactor the error handling logic into a single, unified approach.
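
    To illustrate the pattern, here is a minimal, self-contained Scala sketch (not the actual Spark code; `OperationException` and the println-based logging are placeholders): a method returning a `PartialFunction[Throwable, Unit]` can be used directly as the `catch` handler, so sync, async, and metadata operations all share one `onError()` that logs and converts the error exactly once.

        // Sketch only: a shared handler loosely modeled on SparkOperation.onError().
        object UnifiedErrorHandling {
          class OperationException(msg: String, cause: Throwable)
            extends RuntimeException(msg, cause)

          def onError(needCancel: Boolean = false): PartialFunction[Throwable, Unit] = {
            case e: Throwable =>
              // cancel jobs the interrupted task may have already started
              if (needCancel) println("cancelling job group")
              println(s"Error operating: ${e.getMessage}") // the single server-side log
              throw new OperationException(s"Error operating: ${e.getMessage}", e)
          }

          def runInternal(): Unit =
            try {
              throw new IllegalStateException("boom")
            } catch onError(needCancel = true) // a PartialFunction as the whole catch block

          def main(args: Array[String]): Unit =
            try runInternal()
            catch { case e: OperationException => println(s"caught once: ${e.getMessage}") }
        }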
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. When `spark.sql.hive.thriftServer.async=false` and clients call the sync APIs, the error message will be logged only once on the server side.
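
    For reference, a hedged sketch of toggling the mode (the conf key comes from the PR's new test via `HiveUtils.HIVE_THRIFT_SERVER_ASYNC`; whether a given deployment allows changing it at runtime is an assumption):

        // Assumption: `spark` is the SparkSession backing the Thrift server and
        // the conf may be changed at runtime on your deployment.
        spark.conf.set("spark.sql.hive.thriftServer.async", "false") // sync execution
        spark.conf.set("spark.sql.hive.thriftServer.async", "true")  // async (default)
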
    ### How was this patch tested?
    
    Locally verified the result in `target/unit-test.log`.
    
    Added unit tests.
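
    To reproduce locally, something along these lines should exercise the touched suite (the sbt module name is an assumption; adjust to your checkout):

        build/sbt "hive-thriftserver/testOnly *ThriftServerWithSparkContextSuite"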
    
    Closes #29204 from yaooqinn/SPARK-32412.
    
    Authored-by: Kent Yao <ya...@hotmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../SparkExecuteStatementOperation.scala           | 56 +++-----------------
 .../sql/hive/thriftserver/SparkOperation.scala     | 35 ++++++++++---
 .../ThriftServerWithSparkContextSuite.scala        | 60 +++++++++++++++-------
 3 files changed, 76 insertions(+), 75 deletions(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index d30951f..922af72 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -19,11 +19,9 @@ package org.apache.spark.sql.hive.thriftserver
 
 import java.security.PrivilegedExceptionAction
 import java.util.{Arrays, Map => JMap}
-import java.util.concurrent.RejectedExecutionException
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
-import scala.util.control.NonFatal
 
 import org.apache.hadoop.hive.metastore.api.FieldSchema
 import org.apache.hadoop.hive.shims.Utils
@@ -38,7 +36,6 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.VariableSubstitution
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.CalendarInterval
-import org.apache.spark.util.{Utils => SparkUtils}
 
 private[hive] class SparkExecuteStatementOperation(
     val sqlContext: SQLContext,
@@ -113,7 +110,7 @@ private[hive] class SparkExecuteStatementOperation(
   }
 
   def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = withLocalProperties {
-    log.info(s"Received getNextRowSet request order=${order} and maxRowsL=${maxRowsL} " +
+    logInfo(s"Received getNextRowSet request order=${order} and maxRowsL=${maxRowsL} " +
       s"with ${statementId}")
     validateDefaultFetchOrientation(order)
     assertState(OperationState.FINISHED)
@@ -182,7 +179,7 @@ private[hive] class SparkExecuteStatementOperation(
         resultOffset += 1
       }
       previousFetchEndOffset = resultOffset
-      log.info(s"Returning result set with ${curRow} rows from offsets " +
+      logInfo(s"Returning result set with ${curRow} rows from offsets " +
         s"[$previousFetchStartOffset, $previousFetchEndOffset) with $statementId")
       resultRowSet
     }
@@ -219,7 +216,9 @@ private[hive] class SparkExecuteStatementOperation(
                   execute()
                 }
               } catch {
-                case e: HiveSQLException => setOperationException(e)
+                case e: HiveSQLException =>
+                  setOperationException(e)
+                  logError(s"Error executing query with $statementId", e)
               }
             }
           }
@@ -239,21 +238,7 @@ private[hive] class SparkExecuteStatementOperation(
         val backgroundHandle =
           parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation)
         setBackgroundHandle(backgroundHandle)
-      } catch {
-        case rejected: RejectedExecutionException =>
-          logError("Error submitting query in background, query rejected", rejected)
-          setState(OperationState.ERROR)
-          HiveThriftServer2.eventManager.onStatementError(
-            statementId, rejected.getMessage, SparkUtils.exceptionString(rejected))
-          throw new HiveSQLException("The background threadpool cannot accept" +
-            " new task for execution, please retry the operation", rejected)
-        case NonFatal(e) =>
-          logError(s"Error executing query in background", e)
-          setState(OperationState.ERROR)
-          HiveThriftServer2.eventManager.onStatementError(
-            statementId, e.getMessage, SparkUtils.exceptionString(e))
-          throw new HiveSQLException(e)
-      }
+      } catch onError()
     }
   }
 
@@ -294,30 +279,7 @@ private[hive] class SparkExecuteStatementOperation(
       }
       dataTypes = result.schema.fields.map(_.dataType)
     } catch {
-      // Actually do need to catch Throwable as some failures don't inherit from Exception and
-      // HiveServer will silently swallow them.
-      case e: Throwable =>
-        // When cancel() or close() is called very quickly after the query is started,
-        // then they may both call cleanup() before Spark Jobs are started. But before background
-        // task interrupted, it may have start some spark job, so we need to cancel again to
-        // make sure job was cancelled when background thread was interrupted
-        if (statementId != null) {
-          sqlContext.sparkContext.cancelJobGroup(statementId)
-        }
-        val currentState = getStatus().getState()
-        if (currentState.isTerminal) {
-          // This may happen if the execution was cancelled, and then closed from another thread.
-          logWarning(s"Ignore exception in terminal state with $statementId: $e")
-        } else {
-          logError(s"Error executing query with $statementId, currentState $currentState, ", e)
-          setState(OperationState.ERROR)
-          HiveThriftServer2.eventManager.onStatementError(
-            statementId, e.getMessage, SparkUtils.exceptionString(e))
-          e match {
-            case _: HiveSQLException => throw e
-            case _ => throw new HiveSQLException("Error running query: " + e.toString, e)
-          }
-        }
+      onError(needCancel = true)
     } finally {
       synchronized {
         if (!getStatus.getState.isTerminal) {
@@ -348,9 +310,7 @@ private[hive] class SparkExecuteStatementOperation(
       }
     }
     // RDDs will be cleaned automatically upon garbage collection.
-    if (statementId != null) {
-      sqlContext.sparkContext.cancelJobGroup(statementId)
-    }
+    sqlContext.sparkContext.cancelJobGroup(statementId)
   }
 }
 
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala
index bbfc1b8..8e8b2d7 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.hive.thriftserver
 
+import java.util.concurrent.RejectedExecutionException
+
 import org.apache.hive.service.cli.{HiveSQLException, OperationState}
 import org.apache.hive.service.cli.operation.Operation
 
@@ -94,15 +96,32 @@ private[hive] trait SparkOperation extends Operation with Logging {
       throw new IllegalArgumentException(s"Unknown table type is found: $t")
   }
 
-  protected def onError(): PartialFunction[Throwable, Unit] = {
+  protected def onError(needCancel: Boolean = false): PartialFunction[Throwable, Unit] = {
+    // Actually do need to catch Throwable as some failures don't inherit from Exception and
+    // HiveServer will silently swallow them.
     case e: Throwable =>
-      logError(s"Error operating $getType with $statementId", e)
-      super.setState(OperationState.ERROR)
-      HiveThriftServer2.eventManager.onStatementError(
-        statementId, e.getMessage, Utils.exceptionString(e))
-      e match {
-        case _: HiveSQLException => throw e
-        case _ => throw new HiveSQLException(s"Error operating $getType ${e.getMessage}", e)
+      // When cancel() or close() is called very quickly after the query is started, they
+      // may both call cleanup() before any Spark jobs are started. But before the background
+      // task is interrupted, it may have started some Spark jobs, so we need to cancel
+      // again to make sure the jobs are cancelled when the background thread is interrupted.
+      if (needCancel) sqlContext.sparkContext.cancelJobGroup(statementId)
+      val currentState = getStatus.getState
+      if (currentState.isTerminal) {
+        // This may happen if the execution was cancelled, and then closed from another thread.
+        logWarning(s"Ignore exception in terminal state with $statementId: $e")
+      } else {
+        super.setState(OperationState.ERROR)
+        HiveThriftServer2.eventManager.onStatementError(
+          statementId, e.getMessage, Utils.exceptionString(e))
+        e match {
+          case _: HiveSQLException => throw e
+          case rejected: RejectedExecutionException =>
+            throw new HiveSQLException("The background threadpool cannot accept" +
+              " new task for execution, please retry the operation", rejected)
+          case _ =>
+            val tips = if (shouldRunAsync()) " in background" else ""
+            throw new HiveSQLException(s"Error operating $getType$tips: ${e.getMessage}", e)
+        }
       }
   }
 }
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
index fd3a638..2bb9169 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
@@ -21,6 +21,8 @@ import java.sql.SQLException
 
 import org.apache.hive.service.cli.HiveSQLException
 
+import org.apache.spark.sql.hive.HiveUtils
+
 trait ThriftServerWithSparkContextSuite extends SharedThriftServer {
 
   test("the scratch dir will be deleted during server start but recreated with new operation") {
@@ -52,31 +54,51 @@ trait ThriftServerWithSparkContextSuite extends SharedThriftServer {
 
   test("Full stack traces as error message for jdbc or thrift client") {
     val sql = "select date_sub(date'2011-11-11', '1.2')"
-    withCLIServiceClient { client =>
-      val sessionHandle = client.openSession(user, "")
+    val confOverlay = new java.util.HashMap[java.lang.String, java.lang.String]
 
-      val confOverlay = new java.util.HashMap[java.lang.String, java.lang.String]
-      val e = intercept[HiveSQLException] {
-        client.executeStatement(
-          sessionHandle,
-          sql,
-          confOverlay)
+    withSQLConf((HiveUtils.HIVE_THRIFT_SERVER_ASYNC.key, "false")) {
+      withCLIServiceClient { client =>
+        val sessionHandle = client.openSession(user, "")
+        val e = intercept[HiveSQLException] {
+          client.executeStatement(sessionHandle, sql, confOverlay)
+        }
+        assert(e.getMessage
+          .contains("The second argument of 'date_sub' function needs to be an integer."))
+        assert(!e.getMessage
+          .contains("java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2"))
       }
+    }
+
+    withSQLConf((HiveUtils.HIVE_THRIFT_SERVER_ASYNC.key, "true")) {
+      withCLIServiceClient { client =>
+        val sessionHandle = client.openSession(user, "")
+        val opHandle = client.executeStatementAsync(sessionHandle, sql, confOverlay)
+        var status = client.getOperationStatus(opHandle)
+        while (!status.getState.isTerminal) {
+          Thread.sleep(10)
+          status = client.getOperationStatus(opHandle)
+        }
+        val e = status.getOperationException
 
-      assert(e.getMessage
-        .contains("The second argument of 'date_sub' function needs to be an integer."))
-      assert(!e.getMessage.contains("" +
-        "java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2"))
+        assert(e.getMessage
+          .contains("The second argument of 'date_sub' function needs to be an integer."))
+        assert(e.getMessage
+          .contains("java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2"))
+      }
     }
 
-    withJdbcStatement { statement =>
-      val e = intercept[SQLException] {
-        statement.executeQuery(sql)
+    Seq("true", "false").foreach { value =>
+      withSQLConf((HiveUtils.HIVE_THRIFT_SERVER_ASYNC.key, value)) {
+        withJdbcStatement { statement =>
+          val e = intercept[SQLException] {
+            statement.executeQuery(sql)
+          }
+          assert(e.getMessage.contains(
+            "The second argument of 'date_sub' function needs to be an integer."))
+          assert(e.getMessage.contains(
+            "java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2"))
+        }
       }
-      assert(e.getMessage
-        .contains("The second argument of 'date_sub' function needs to be an integer."))
-      assert(e.getMessage.contains("" +
-        "java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2"))
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org