You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/07/30 06:45:32 UTC
[spark] branch master updated: [SPARK-32412][SQL] Unify error
handling for spark thrift server operations
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 510a165 [SPARK-32412][SQL] Unify error handling for spark thrift server operations
510a165 is described below
commit 510a1656e650246a708d3866c8a400b7a1b9f962
Author: Kent Yao <ya...@hotmail.com>
AuthorDate: Thu Jul 30 06:44:13 2020 +0000
[SPARK-32412][SQL] Unify error handling for spark thrift server operations
### What changes were proposed in this pull request?
Log each error/warning message only once on the server side, for both sync and async modes.
### Why are the changes needed?
In https://github.com/apache/spark/commit/b151194299f5ba15e0d9d8d7d2980fd164fe0822 we removed the duplicated error logging for SparkExecuteStatementOperation with `runInBackground=true`, but operations with `runInBackground=false` and the other metadata operations still log each error twice: once in the operation's `runInternal` method and once in ThriftCLIService.
In this PR, I propose to refactor the logic to get a unified error handling approach.
### Does this PR introduce _any_ user-facing change?
Yes. When `spark.sql.hive.thriftServer.async=false` and clients call the sync APIs, the error message is logged only once on the server side.
### How was this patch tested?
Verified locally by checking the result in target/unit-test.log.
Added unit tests.
Closes #29204 from yaooqinn/SPARK-32412.
Authored-by: Kent Yao <ya...@hotmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../SparkExecuteStatementOperation.scala | 56 +++-----------------
.../sql/hive/thriftserver/SparkOperation.scala | 35 ++++++++++---
.../ThriftServerWithSparkContextSuite.scala | 60 +++++++++++++++-------
3 files changed, 76 insertions(+), 75 deletions(-)
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index d30951f..922af72 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -19,11 +19,9 @@ package org.apache.spark.sql.hive.thriftserver
import java.security.PrivilegedExceptionAction
import java.util.{Arrays, Map => JMap}
-import java.util.concurrent.RejectedExecutionException
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
-import scala.util.control.NonFatal
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.shims.Utils
@@ -38,7 +36,6 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.VariableSubstitution
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval
-import org.apache.spark.util.{Utils => SparkUtils}
private[hive] class SparkExecuteStatementOperation(
val sqlContext: SQLContext,
@@ -113,7 +110,7 @@ private[hive] class SparkExecuteStatementOperation(
}
def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = withLocalProperties {
- log.info(s"Received getNextRowSet request order=${order} and maxRowsL=${maxRowsL} " +
+ logInfo(s"Received getNextRowSet request order=${order} and maxRowsL=${maxRowsL} " +
s"with ${statementId}")
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
@@ -182,7 +179,7 @@ private[hive] class SparkExecuteStatementOperation(
resultOffset += 1
}
previousFetchEndOffset = resultOffset
- log.info(s"Returning result set with ${curRow} rows from offsets " +
+ logInfo(s"Returning result set with ${curRow} rows from offsets " +
s"[$previousFetchStartOffset, $previousFetchEndOffset) with $statementId")
resultRowSet
}
@@ -219,7 +216,9 @@ private[hive] class SparkExecuteStatementOperation(
execute()
}
} catch {
- case e: HiveSQLException => setOperationException(e)
+ case e: HiveSQLException =>
+ setOperationException(e)
+ logError(s"Error executing query with $statementId,", e)
}
}
}
@@ -239,21 +238,7 @@ private[hive] class SparkExecuteStatementOperation(
val backgroundHandle =
parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation)
setBackgroundHandle(backgroundHandle)
- } catch {
- case rejected: RejectedExecutionException =>
- logError("Error submitting query in background, query rejected", rejected)
- setState(OperationState.ERROR)
- HiveThriftServer2.eventManager.onStatementError(
- statementId, rejected.getMessage, SparkUtils.exceptionString(rejected))
- throw new HiveSQLException("The background threadpool cannot accept" +
- " new task for execution, please retry the operation", rejected)
- case NonFatal(e) =>
- logError(s"Error executing query in background", e)
- setState(OperationState.ERROR)
- HiveThriftServer2.eventManager.onStatementError(
- statementId, e.getMessage, SparkUtils.exceptionString(e))
- throw new HiveSQLException(e)
- }
+ } catch onError()
}
}
@@ -294,30 +279,7 @@ private[hive] class SparkExecuteStatementOperation(
}
dataTypes = result.schema.fields.map(_.dataType)
} catch {
- // Actually do need to catch Throwable as some failures don't inherit from Exception and
- // HiveServer will silently swallow them.
- case e: Throwable =>
- // When cancel() or close() is called very quickly after the query is started,
- // then they may both call cleanup() before Spark Jobs are started. But before background
- // task interrupted, it may have start some spark job, so we need to cancel again to
- // make sure job was cancelled when background thread was interrupted
- if (statementId != null) {
- sqlContext.sparkContext.cancelJobGroup(statementId)
- }
- val currentState = getStatus().getState()
- if (currentState.isTerminal) {
- // This may happen if the execution was cancelled, and then closed from another thread.
- logWarning(s"Ignore exception in terminal state with $statementId: $e")
- } else {
- logError(s"Error executing query with $statementId, currentState $currentState, ", e)
- setState(OperationState.ERROR)
- HiveThriftServer2.eventManager.onStatementError(
- statementId, e.getMessage, SparkUtils.exceptionString(e))
- e match {
- case _: HiveSQLException => throw e
- case _ => throw new HiveSQLException("Error running query: " + e.toString, e)
- }
- }
+ onError(needCancel = true)
} finally {
synchronized {
if (!getStatus.getState.isTerminal) {
@@ -348,9 +310,7 @@ private[hive] class SparkExecuteStatementOperation(
}
}
// RDDs will be cleaned automatically upon garbage collection.
- if (statementId != null) {
- sqlContext.sparkContext.cancelJobGroup(statementId)
- }
+ sqlContext.sparkContext.cancelJobGroup(statementId)
}
}
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala
index bbfc1b8..8e8b2d7 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkOperation.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.hive.thriftserver
+import java.util.concurrent.RejectedExecutionException
+
import org.apache.hive.service.cli.{HiveSQLException, OperationState}
import org.apache.hive.service.cli.operation.Operation
@@ -94,15 +96,32 @@ private[hive] trait SparkOperation extends Operation with Logging {
throw new IllegalArgumentException(s"Unknown table type is found: $t")
}
- protected def onError(): PartialFunction[Throwable, Unit] = {
+ protected def onError(needCancel: Boolean = false): PartialFunction[Throwable, Unit] = {
+ // Actually do need to catch Throwable as some failures don't inherit from Exception and
+ // HiveServer will silently swallow them.
case e: Throwable =>
- logError(s"Error operating $getType with $statementId", e)
- super.setState(OperationState.ERROR)
- HiveThriftServer2.eventManager.onStatementError(
- statementId, e.getMessage, Utils.exceptionString(e))
- e match {
- case _: HiveSQLException => throw e
- case _ => throw new HiveSQLException(s"Error operating $getType ${e.getMessage}", e)
+ // When cancel() or close() is called very quickly after the query is started,
+ // then they may both call cleanup() before Spark Jobs are started. But before background
+ // task interrupted, it may have start some spark job, so we need to cancel again to
+ // make sure job was cancelled when background thread was interrupted
+ if (needCancel) sqlContext.sparkContext.cancelJobGroup(statementId)
+ val currentState = getStatus.getState
+ if (currentState.isTerminal) {
+ // This may happen if the execution was cancelled, and then closed from another thread.
+ logWarning(s"Ignore exception in terminal state with $statementId: $e")
+ } else {
+ super.setState(OperationState.ERROR)
+ HiveThriftServer2.eventManager.onStatementError(
+ statementId, e.getMessage, Utils.exceptionString(e))
+ e match {
+ case _: HiveSQLException => throw e
+ case rejected: RejectedExecutionException =>
+ throw new HiveSQLException("The background threadpool cannot accept" +
+ " new task for execution, please retry the operation", rejected)
+ case _ =>
+ val tips = if (shouldRunAsync()) " in background" else ""
+ throw new HiveSQLException(s"Error operating $getType$tips: ${e.getMessage}", e)
+ }
}
}
}
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
index fd3a638..2bb9169 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
@@ -21,6 +21,8 @@ import java.sql.SQLException
import org.apache.hive.service.cli.HiveSQLException
+import org.apache.spark.sql.hive.HiveUtils
+
trait ThriftServerWithSparkContextSuite extends SharedThriftServer {
test("the scratch dir will be deleted during server start but recreated with new operation") {
@@ -52,31 +54,51 @@ trait ThriftServerWithSparkContextSuite extends SharedThriftServer {
test("Full stack traces as error message for jdbc or thrift client") {
val sql = "select date_sub(date'2011-11-11', '1.2')"
- withCLIServiceClient { client =>
- val sessionHandle = client.openSession(user, "")
+ val confOverlay = new java.util.HashMap[java.lang.String, java.lang.String]
- val confOverlay = new java.util.HashMap[java.lang.String, java.lang.String]
- val e = intercept[HiveSQLException] {
- client.executeStatement(
- sessionHandle,
- sql,
- confOverlay)
+ withSQLConf((HiveUtils.HIVE_THRIFT_SERVER_ASYNC.key, "false")) {
+ withCLIServiceClient { client =>
+ val sessionHandle = client.openSession(user, "")
+ val e = intercept[HiveSQLException] {
+ client.executeStatement(sessionHandle, sql, confOverlay)
+ }
+ assert(e.getMessage
+ .contains("The second argument of 'date_sub' function needs to be an integer."))
+ assert(!e.getMessage
+ .contains("java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2"))
}
+ }
+
+ withSQLConf((HiveUtils.HIVE_THRIFT_SERVER_ASYNC.key, "true")) {
+ withCLIServiceClient { client =>
+ val sessionHandle = client.openSession(user, "")
+ val opHandle = client.executeStatementAsync(sessionHandle, sql, confOverlay)
+ var status = client.getOperationStatus(opHandle)
+ while (!status.getState.isTerminal) {
+ Thread.sleep(10)
+ status = client.getOperationStatus(opHandle)
+ }
+ val e = status.getOperationException
- assert(e.getMessage
- .contains("The second argument of 'date_sub' function needs to be an integer."))
- assert(!e.getMessage.contains("" +
- "java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2"))
+ assert(e.getMessage
+ .contains("The second argument of 'date_sub' function needs to be an integer."))
+ assert(e.getMessage
+ .contains("java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2"))
+ }
}
- withJdbcStatement { statement =>
- val e = intercept[SQLException] {
- statement.executeQuery(sql)
+ Seq("true", "false").foreach { value =>
+ withSQLConf((HiveUtils.HIVE_THRIFT_SERVER_ASYNC.key, value)) {
+ withJdbcStatement { statement =>
+ val e = intercept[SQLException] {
+ statement.executeQuery(sql)
+ }
+ assert(e.getMessage.contains(
+ "The second argument of 'date_sub' function needs to be an integer."))
+ assert(e.getMessage.contains(
+ "java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2"))
+ }
}
- assert(e.getMessage
- .contains("The second argument of 'date_sub' function needs to be an integer."))
- assert(e.getMessage.contains("" +
- "java.lang.NumberFormatException: invalid input syntax for type numeric: 1.2"))
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org