Posted to reviews@spark.apache.org by "juliuszsompolski (via GitHub)" <gi...@apache.org> on 2023/09/21 16:31:20 UTC

[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42987: [SPARK-45207][SQL][CONNECT] Implement Error Enrichment for Scala Client

juliuszsompolski commented on code in PR #42987:
URL: https://github.com/apache/spark/pull/42987#discussion_r1333313454


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala:
##########
@@ -24,49 +24,140 @@ import scala.reflect.ClassTag
 import com.google.rpc.ErrorInfo
 import io.grpc.StatusRuntimeException
 import io.grpc.protobuf.StatusProto
+import org.json4s.DefaultFormats
+import org.json4s.jackson.JsonMethods
 
 import org.apache.spark.{SparkArithmeticException, SparkArrayIndexOutOfBoundsException, SparkDateTimeException, SparkException, SparkIllegalArgumentException, SparkNumberFormatException, SparkRuntimeException, SparkUnsupportedOperationException, SparkUpgradeException}
+import org.apache.spark.connect.proto.{FetchErrorDetailsRequest, FetchErrorDetailsResponse, UserContext}
+import org.apache.spark.connect.proto.SparkConnectServiceGrpc.SparkConnectServiceBlockingStub
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchDatabaseException, NoSuchTableException, TableAlreadyExistsException, TempTableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.trees.Origin
-import org.apache.spark.util.JsonUtils
 
-private[client] object GrpcExceptionConverter extends JsonUtils {
-  def convert[T](f: => T): T = {
+/**
+ * GrpcExceptionConverter handles the conversion of StatusRuntimeExceptions into Spark exceptions.
+ * It does so by utilizing the ErrorInfo defined in error_details.proto and making an additional
+ * FetchErrorDetails RPC call to retrieve the full error message and optionally the server-side
+ * stacktrace.
+ *
+ * If the FetchErrorDetails RPC call succeeds, the exceptions will be constructed based on the
+ * response. If the RPC call fails, the exception will be constructed based on the ErrorInfo. If
+ * the ErrorInfo is missing, the exception will be constructed based on the StatusRuntimeException
+ * itself.
+ */
+private[client] class GrpcExceptionConverter(grpcStub: SparkConnectServiceBlockingStub)
+    extends Logging {
+  import GrpcExceptionConverter._
+
+  def convert[T](sessionId: String, userContext: UserContext)(f: => T): T = {
     try {
       f
     } catch {
       case e: StatusRuntimeException =>
-        throw toThrowable(e)
+        throw toThrowable(e, sessionId, userContext)
     }
   }
 
-  def convertIterator[T](iter: CloseableIterator[T]): CloseableIterator[T] = {
+  def convertIterator[T](
+      sessionId: String,
+      userContext: UserContext,
+      iter: CloseableIterator[T]): CloseableIterator[T] = {
     new WrappedCloseableIterator[T] {
 
       override def innerIterator: Iterator[T] = iter
 
       override def hasNext: Boolean = {
-        convert {
+        convert(sessionId, userContext) {
           iter.hasNext
         }
       }
 
       override def next(): T = {
-        convert {
+        convert(sessionId, userContext) {
           iter.next()
         }
       }
 
       override def close(): Unit = {
-        convert {
+        convert(sessionId, userContext) {
           iter.close()
         }
       }
     }
   }
 
+  /**
+   * Fetches enriched errors with full exception message and optionally stacktrace by issuing an
+   * additional RPC call to fetch error details. The RPC call is best-effort at-most-once.
+   */
+  private def fetchEnrichedError(
+      info: ErrorInfo,
+      sessionId: String,
+      userContext: UserContext): Option[Throwable] = {
+    val errorId = info.getMetadataOrDefault("errorId", null)
+    if (errorId == null) {
+      logWarning("Unable to fetch enriched error since errorId is missing")
+      return None
+    }
+
+    try {
+      val errorDetailsResponse = grpcStub.fetchErrorDetails(
+        FetchErrorDetailsRequest
+          .newBuilder()
+          .setSessionId(sessionId)
+          .setErrorId(errorId)
+          .setUserContext(UserContext.newBuilder().setUserId(userContext.getUserId).build())

Review Comment:
   Why build a new `UserContext` instead of reusing `userContext`? Why does only the `UserId` field matter?
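   
   If the whole context is safe to forward, this could presumably be collapsed to reuse the message that is already in hand, e.g.:
   
   ```scala
   // sketch of the suggestion: forward the caller's context as-is
   .setUserContext(userContext)
   ```
   
   If sending only the user id is intentional (e.g. to avoid forwarding the other fields), a short comment here would make that explicit.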



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala:
##########
@@ -93,33 +184,63 @@ private[client] object GrpcExceptionConverter extends JsonUtils {
       new SparkArrayIndexOutOfBoundsException(message)),
     errorConstructor[DateTimeException]((message, _) => new SparkDateTimeException(message)),
     errorConstructor((message, cause) => new SparkRuntimeException(message, cause)),
-    errorConstructor((message, cause) => new SparkUpgradeException(message, cause)))
-
-  private def errorInfoToThrowable(info: ErrorInfo, message: String): Option[Throwable] = {
-    val classes =
-      mapper.readValue(info.getMetadataOrDefault("classes", "[]"), classOf[Array[String]])
+    errorConstructor((message, cause) => new SparkUpgradeException(message, cause)),
+    errorConstructor((message, cause) => new SparkException(message, cause.orNull)))
+
+  /**
+   * errorsToThrowable reconstructs the exception based on a list of protobuf messages
+   * FetchErrorDetailsResponse.Error with un-truncated error messages and server-side stacktrace
+   * (if set).
+   */
+  private def errorsToThrowable(
+      errorIdx: Int,
+      errors: Seq[FetchErrorDetailsResponse.Error]): Throwable = {
+
+    val error = errors(errorIdx)
+
+    val classHierarchy = error.getErrorTypeHierarchyList.asScala
+
+    val constructor =
+      classHierarchy
+        .flatMap(errorFactory.get)
+        .headOption
+        .getOrElse((message: String, cause: Option[Throwable]) =>
+          new SparkException(s"${classHierarchy.head}: ${message}", cause.orNull))
+
+    val causeOpt =
+      if (error.hasCauseIdx) Some(errorsToThrowable(error.getCauseIdx, errors)) else None
+
+    val exception = constructor(error.getMessage, causeOpt)
+
+    if (!error.getStackTraceList.isEmpty) {
+      exception.setStackTrace(error.getStackTraceList.asScala.toArray.map { stackTraceElement =>
+        new StackTraceElement(
+          stackTraceElement.getDeclaringClass,
+          stackTraceElement.getMethodName,
+          stackTraceElement.getFileName,
+          stackTraceElement.getLineNumber)
+      })
+    }
 
-    classes
-      .find(errorFactory.contains)
-      .map { cls =>
-        val constructor = errorFactory.get(cls).get
-        constructor(message, None)
-      }
+    exception
   }
 
-  private def toThrowable(ex: StatusRuntimeException): Throwable = {
-    val status = StatusProto.fromThrowable(ex)
-
-    val fallbackEx = new SparkException(ex.toString, ex.getCause)
-
-    val errorInfoOpt = status.getDetailsList.asScala
-      .find(_.is(classOf[ErrorInfo]))
-
-    if (errorInfoOpt.isEmpty) {
-      return fallbackEx
-    }
-
-    errorInfoToThrowable(errorInfoOpt.get.unpack(classOf[ErrorInfo]), status.getMessage)
-      .getOrElse(fallbackEx)
+  /**
+   * errorInfoToThrowable reconstructs the exception based on the error classes hierarchy and the
+   * truncated error message.
+   */
+  private def errorInfoToThrowable(info: ErrorInfo, message: String): Throwable = {
+    implicit val formats = DefaultFormats
+    val classes =
+      JsonMethods.parse(info.getMetadataOrDefault("classes", "[]")).extract[Array[String]]

Review Comment:
   Does this need a try/catch in case the JSON is malformed (e.g. because of truncation) and parsing throws an error?
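   
   Something along these lines would keep this from throwing on the error-handling path (just a sketch, assuming a fallback to an empty class hierarchy, and thus the generic SparkException path, is acceptable):
   
   ```scala
   import scala.util.Try
   
   // Sketch only: if the "classes" metadata is malformed (e.g. truncated),
   // degrade to an empty hierarchy instead of throwing while converting an error.
   implicit val formats = DefaultFormats
   val classes = Try {
     JsonMethods.parse(info.getMetadataOrDefault("classes", "[]")).extract[Array[String]]
   }.getOrElse(Array.empty[String])
   ```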



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org