Posted to reviews@spark.apache.org by "heyihong (via GitHub)" <gi...@apache.org> on 2023/09/18 23:32:19 UTC

[GitHub] [spark] heyihong opened a new pull request, #42987: [SPARK-45207][SQL][CONNECT] Implement FetchErrorDetails RPC

heyihong opened a new pull request, #42987:
URL: https://github.com/apache/spark/pull/42987

   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
     2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
     3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
     4. Be sure to keep the PR description updated to reflect all changes.
     5. Please write your PR title to summarize what this PR proposes.
     6. If possible, provide a concise example to reproduce the issue for a faster review.
     7. If you want to add a new configuration, please read the guideline first for naming configurations in
        'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
     8. If you want to add or modify an error type or message, please read the guideline first in
        'core/src/main/resources/error/README.md'.
   -->
   
   ### What changes were proposed in this pull request?
   <!--
   Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue. 
   If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
     1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.
     2. If you fix some SQL features, you can provide some references of other DBMSes.
     3. If there is design documentation, please add the link.
     4. If there is a discussion in the mailing list, please add the link.
   -->
   
   
   ### Why are the changes needed?
   <!--
   Please clarify why the changes are needed. For instance,
     1. If you propose a new API, clarify the use case for a new API.
     2. If you fix a bug, you can clarify why it is a bug.
   -->
   
   
   ### Does this PR introduce _any_ user-facing change?
   <!--
   Note that it means *any* user-facing change including all aspects such as the documentation fix.
   If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
   If possible, please also clarify if this is a user-facing change compared to the released Spark versions or within the unreleased branches such as master.
   If no, write 'No'.
   -->
   
   
   ### How was this patch tested?
   <!--
   If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
   If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
   If tests were not added, please describe why they were not added and/or why it was difficult to add.
   If benchmark tests were added, please run the benchmarks in GitHub Actions for a consistent environment; the instructions are at: https://spark.apache.org/developer-tools.html#github-workflow-benchmarks.
   -->
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   <!--
   If generative AI tooling has been used in the process of authoring this patch, please include the
   phrase: 'Generated-by: ' followed by the name of the tool and its version.
   If no, write 'No'.
   Please refer to the [ASF Generative Tooling Guidance](https://www.apache.org/legal/generative-tooling.html) for details.
   -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on pull request #42987: [SPARK-45207][SQL][CONNECT] Implement Error Enrichment for Scala Client

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #42987:
URL: https://github.com/apache/spark/pull/42987#issuecomment-1732724709

   Merged to master.




[GitHub] [spark] heyihong commented on a diff in pull request #42987: [SPARK-45207][SQL][CONNECT] Implement Complete Error Reconstruction for Scala Client

Posted by "heyihong (via GitHub)" <gi...@apache.org>.
heyihong commented on code in PR #42987:
URL: https://github.com/apache/spark/pull/42987#discussion_r1331693297


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2882,8 +2882,7 @@ object SQLConf {
         "level settings.")
       .version("3.0.0")
       .booleanConf
-      // show full stacktrace in tests but hide in production by default.
-      .createWithDefault(Utils.isTesting)

Review Comment:
   The change has been rebased already. I re-enabled the PySpark JVM stacktrace flag for tests but disabled it in one specific test case instead.





[GitHub] [spark] hvanhovell commented on a diff in pull request #42987: [SPARK-45207][SQL][CONNECT] Implement Error Reconstruction for Scala Client

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #42987:
URL: https://github.com/apache/spark/pull/42987#discussion_r1330859817


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala:
##########
@@ -175,6 +175,37 @@ class ClientStreamingQuerySuite extends QueryTest with SQLHelper with Logging {
     }
   }
 
+  test("throw exception in streaming") {
+    val session = spark
+    import session.implicits._
+
+    val checkForTwo = udf((value: Int) => {
+      if (value == 2) {
+        throw new RuntimeException("Number 2 encountered!")
+      }
+      value
+    })
+
+    val query = spark.readStream
+      .format("rate")
+      .option("rowsPerSecond", "1")
+      .load()
+      .select(checkForTwo($"value").as("checkedValue"))
+      .writeStream
+      .outputMode("append")
+      .format("console")
+      .start()
+
+    val exception = intercept[SparkException] {
+      query.awaitTermination()
+    }
+
+    assert(
+      exception.getCause.getCause.getCause.getMessage

Review Comment:
   Remind me again what is the chain here? And why are we not checking this?
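
   The assertion above reaches three levels deep via `getCause.getCause.getCause`. As an illustrative-only aside (not part of the PR), a small helper makes such nested cause chains easier to inspect:

   ```scala
   // Collect every Throwable from the top-level exception down its cause chain.
   // Purely illustrative; not code from this PR.
   def causeChain(t: Throwable): List[Throwable] =
     Iterator
       .iterate(Option(t))(_.flatMap(c => Option(c.getCause)))
       .takeWhile(_.isDefined)
       .flatMap(x => x)
       .toList
   ```

   With such a helper, the test could assert on `causeChain(exception)` instead of chaining `getCause` calls by hand.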





[GitHub] [spark] hvanhovell commented on a diff in pull request #42987: [SPARK-45207][SQL][CONNECT] Implement Error Reconstruction for Scala Client

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #42987:
URL: https://github.com/apache/spark/pull/42987#discussion_r1330870268


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala:
##########
@@ -24,49 +24,135 @@ import scala.reflect.ClassTag
 import com.google.rpc.ErrorInfo
 import io.grpc.StatusRuntimeException
 import io.grpc.protobuf.StatusProto
+import org.json4s.DefaultFormats
+import org.json4s.jackson.JsonMethods
 
 import org.apache.spark.{SparkArithmeticException, SparkArrayIndexOutOfBoundsException, SparkDateTimeException, SparkException, SparkIllegalArgumentException, SparkNumberFormatException, SparkRuntimeException, SparkUnsupportedOperationException, SparkUpgradeException}
+import org.apache.spark.connect.proto.{FetchErrorDetailsRequest, FetchErrorDetailsResponse, UserContext}
+import org.apache.spark.connect.proto.SparkConnectServiceGrpc.SparkConnectServiceBlockingStub
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchDatabaseException, NoSuchTableException, TableAlreadyExistsException, TempTableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.trees.Origin
-import org.apache.spark.util.JsonUtils
 
-private[client] object GrpcExceptionConverter extends JsonUtils {
-  def convert[T](f: => T): T = {
+/**
+ * GrpcExceptionConverter handles the conversion of StatusRuntimeExceptions into Spark exceptions.
+ * It does so by utilizing the ErrorInfo defined in error_details.proto and making an additional
+ * FetchErrorDetails RPC call to retrieve the full error message and optionally the server-side
+ * stacktrace.
+ *
+ * If the FetchErrorDetails RPC call succeeds, the exceptions will be constructed based on the
+ * response. If the RPC call fails, the exception will be constructed based on the ErrorInfo. If
+ * the ErrorInfo is missing, the exception will be constructed based on the StatusRuntimeException
+ * itself.
+ */
+private[client] class GrpcExceptionConverter(grpcStub: SparkConnectServiceBlockingStub)
+    extends Logging {
+  import GrpcExceptionConverter._
+
+  def convert[T](sessionId: String, userContext: UserContext)(f: => T): T = {
     try {
       f
     } catch {
       case e: StatusRuntimeException =>
-        throw toThrowable(e)
+        throw toThrowable(e, sessionId, userContext)
     }
   }
 
-  def convertIterator[T](iter: CloseableIterator[T]): CloseableIterator[T] = {
+  def convertIterator[T](
+      sessionId: String,
+      userContext: UserContext,
+      iter: CloseableIterator[T]): CloseableIterator[T] = {
     new WrappedCloseableIterator[T] {
 
       override def innerIterator: Iterator[T] = iter
 
       override def hasNext: Boolean = {
-        convert {
+        convert(sessionId, userContext) {
           iter.hasNext
         }
       }
 
       override def next(): T = {
-        convert {
+        convert(sessionId, userContext) {
           iter.next()
         }
       }
 
       override def close(): Unit = {
-        convert {
+        convert(sessionId, userContext) {
           iter.close()
         }
       }
     }
   }
 
+  /**
+   * fetchEnrichedError fetches enriched errors with full exception message and optionally
+   * stacktrace by issuing an additional RPC call to fetch error details. The RPC call is
+   * best-effort at-most-once.
+   */
+  private def fetchEnrichedError(
+      info: ErrorInfo,
+      sessionId: String,
+      userContext: UserContext): Option[Throwable] = {
+    val errorId = info.getMetadataOrDefault("errorId", null)
+    if (errorId == null) {
+      logWarning("Unable to fetch enriched error since errorId is missing")
+      return None
+    }
+
+    try {
+      val errorDetailsResponse = grpcStub.fetchErrorDetails(
+        FetchErrorDetailsRequest
+          .newBuilder()
+          .setSessionId(sessionId)
+          .setErrorId(errorId)
+          .setUserContext(UserContext.newBuilder().setUserId(userContext.getUserId).build())
+          .build())
+
+      if (!errorDetailsResponse.hasRootErrorIdx) {
+        logWarning("Unable to fetch enriched error since error is not found")
+        return None
+      }
+
+      Some(
+        errorsToThrowable(
+          errorDetailsResponse.getRootErrorIdx,
+          errorDetailsResponse.getErrorsList.asScala))
+    } catch {
+      case e: StatusRuntimeException =>
+        logWarning("Unable to fetch enriched error", e)
+        None
+    }
+  }
+
+  private def toThrowable(
+      ex: StatusRuntimeException,
+      sessionId: String,
+      userContext: UserContext): Throwable = {
+    val status = StatusProto.fromThrowable(ex)
+
+    val errorInfoOpt = status.getDetailsList.asScala
+      .find(_.is(classOf[ErrorInfo]))
+      .map(_.unpack(classOf[ErrorInfo]))
+
+    if (errorInfoOpt.isDefined) {

Review Comment:
   The code below implements the policy you described in the documentation. Please add comments for every step of the way.
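
   For reference, the three-step fallback policy described in the class doc comment can be sketched in plain Scala. The names below (`ErrorInfoStub`, `toThrowableSketch`) are illustrative stand-ins, not the actual Spark Connect classes:

   ```scala
   // Stand-in for the gRPC ErrorInfo detail; illustrative only.
   final case class ErrorInfoStub(metadata: Map[String, String])

   // Step 1: try the best-effort enrichment RPC.
   // Step 2: if it fails or is skipped, reconstruct from ErrorInfo.
   // Step 3: if no ErrorInfo is attached, wrap the raw gRPC exception.
   def toThrowableSketch(
       raw: Throwable,
       errorInfo: Option[ErrorInfoStub],
       fetchEnriched: ErrorInfoStub => Option[Throwable]): Throwable =
     errorInfo match {
       case Some(info) =>
         fetchEnriched(info)
           .getOrElse(new RuntimeException(
             info.metadata.getOrElse("message", raw.toString), raw))
       case None =>
         new RuntimeException(raw.toString, raw)
     }
   ```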





[GitHub] [spark] heyihong commented on a diff in pull request #42987: [SPARK-45207][SQL][CONNECT] Implement Complete Error Reconstruction for Scala Client

Posted by "heyihong (via GitHub)" <gi...@apache.org>.
heyihong commented on code in PR #42987:
URL: https://github.com/apache/spark/pull/42987#discussion_r1331247912


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2882,8 +2882,7 @@ object SQLConf {
         "level settings.")
       .version("3.0.0")
       .booleanConf
-      // show full stacktrace in tests but hide in production by default.
-      .createWithDefault(Utils.isTesting)

Review Comment:
   The size of the ErrorInfo sometimes exceeds the header limit in the tests I added, so this is disabled by default. It should not be an issue after we migrate to the FetchErrorDetails RPC.





[GitHub] [spark] heyihong commented on pull request #42987: [SPARK-45207][SQL][CONNECT] Implement Error Enrichment for Scala Client

Posted by "heyihong (via GitHub)" <gi...@apache.org>.
heyihong commented on PR #42987:
URL: https://github.com/apache/spark/pull/42987#issuecomment-1729630924

   @juliuszsompolski Please take a look




[GitHub] [spark] heyihong commented on a diff in pull request #42987: [SPARK-45207][SQL][CONNECT] Implement Error Reconstruction for Scala Client

Posted by "heyihong (via GitHub)" <gi...@apache.org>.
heyihong commented on code in PR #42987:
URL: https://github.com/apache/spark/pull/42987#discussion_r1331243346


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala:
##########
@@ -93,33 +179,65 @@ private[client] object GrpcExceptionConverter extends JsonUtils {
       new SparkArrayIndexOutOfBoundsException(message)),
     errorConstructor[DateTimeException]((message, _) => new SparkDateTimeException(message)),
     errorConstructor((message, cause) => new SparkRuntimeException(message, cause)),
-    errorConstructor((message, cause) => new SparkUpgradeException(message, cause)))
-
-  private def errorInfoToThrowable(info: ErrorInfo, message: String): Option[Throwable] = {
-    val classes =
-      mapper.readValue(info.getMetadataOrDefault("classes", "[]"), classOf[Array[String]])
+    errorConstructor((message, cause) => new SparkUpgradeException(message, cause)),
+    errorConstructor((message, cause) => new SparkException(message, cause.orNull)))
+
+  /**
+   * errorsToThrowable reconstructs the exception based on a list of protobuf messages
+   * FetchErrorDetailsResponse.Error with un-truncated error messages and server-side stacktrace
+   * (if set).
+   */
+  private def errorsToThrowable(
+      errorIdx: Int,
+      errors: Seq[FetchErrorDetailsResponse.Error]): Throwable = {
+
+    val error = errors(errorIdx)
+
+    val classHierarchy = error.getErrorTypeHierarchyList.asScala
+
+    val constructor =
+      classHierarchy
+        .flatMap(errorFactory.get)
+        .headOption
+        .getOrElse((message: String, cause: Option[Throwable]) =>
+          new SparkException(

Review Comment:
   Will change. The message pattern is from https://github.com/apache/spark/pull/42377#discussion_r1329224205, but this one seems to be simpler.
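
   The lookup being discussed can be illustrated with plain Scala stand-ins (`Ctor`, `pickConstructor`, and the use of `RuntimeException` in place of `SparkException` are illustrative only):

   ```scala
   // A constructor takes the error message and an optional cause.
   type Ctor = (String, Option[Throwable]) => Throwable

   // Walk the error-type hierarchy, pick the first registered factory, and
   // fall back to a generic exception prefixed with the most specific class.
   def pickConstructor(
       classHierarchy: Seq[String],
       errorFactory: Map[String, Ctor]): Ctor =
     classHierarchy
       .flatMap(errorFactory.get)
       .headOption
       .getOrElse { (message: String, cause: Option[Throwable]) =>
         // Mirrors the `s"${classHierarchy.head}: $message"` fallback pattern.
         new RuntimeException(s"${classHierarchy.head}: $message", cause.orNull)
       }
   ```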




[GitHub] [spark] hvanhovell commented on a diff in pull request #42987: [SPARK-45207][SQL][CONNECT] Implement Error Reconstruction for Scala Client

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #42987:
URL: https://github.com/apache/spark/pull/42987#discussion_r1330859551


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/CustomSparkConnectBlockingStub.scala:
##########
@@ -27,11 +27,17 @@ private[connect] class CustomSparkConnectBlockingStub(
     retryPolicy: GrpcRetryHandler.RetryPolicy) {
 
   private val stub = SparkConnectServiceGrpc.newBlockingStub(channel)
+
   private val retryHandler = new GrpcRetryHandler(retryPolicy)
 
+  // Constructing GrpcExceptionConverter with a GRPC stub for fetching error details from server.

Review Comment:
   I'd remove the `Constructing` bit, because this is pretty clear if you do `new`...





[GitHub] [spark] hvanhovell commented on a diff in pull request #42987: [SPARK-45207][SQL][CONNECT] Implement Error Reconstruction for Scala Client

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #42987:
URL: https://github.com/apache/spark/pull/42987#discussion_r1330877688


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2882,8 +2882,7 @@ object SQLConf {
         "level settings.")
       .version("3.0.0")
       .booleanConf
-      // show full stacktrace in tests but hide in production by default.
-      .createWithDefault(Utils.isTesting)

Review Comment:
   Why is this needed?





[GitHub] [spark] hvanhovell commented on a diff in pull request #42987: [SPARK-45207][SQL][CONNECT] Implement Complete Error Reconstruction for Scala Client

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #42987:
URL: https://github.com/apache/spark/pull/42987#discussion_r1331644650


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2882,8 +2882,7 @@ object SQLConf {
         "level settings.")
       .version("3.0.0")
       .booleanConf
-      // show full stacktrace in tests but hide in production by default.
-      .createWithDefault(Utils.isTesting)

Review Comment:
   Can you rebase and show me that it is not an issue anymore?





[GitHub] [spark] heyihong commented on a diff in pull request #42987: [SPARK-45207][SQL][CONNECT] Implement Complete Error Reconstruction for Scala Client

Posted by "heyihong (via GitHub)" <gi...@apache.org>.
heyihong commented on code in PR #42987:
URL: https://github.com/apache/spark/pull/42987#discussion_r1331251817


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2882,8 +2882,7 @@ object SQLConf {
         "level settings.")
       .version("3.0.0")
       .booleanConf
-      // show full stacktrace in tests but hide in production by default.
-      .createWithDefault(Utils.isTesting)

Review Comment:
   It is disabled by default in python connect tests already: https://github.com/apache/spark/blob/c89221b02bb3000f707a31322e6d40b561e527bd/python/pyspark/testing/connectutils.py#L173





[GitHub] [spark] heyihong commented on a diff in pull request #42987: [SPARK-45207][SQL][CONNECT] Implement Error Enrichment for Scala Client

Posted by "heyihong (via GitHub)" <gi...@apache.org>.
heyihong commented on code in PR #42987:
URL: https://github.com/apache/spark/pull/42987#discussion_r1334246788


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala:
##########
@@ -93,33 +184,63 @@ private[client] object GrpcExceptionConverter extends JsonUtils {
       new SparkArrayIndexOutOfBoundsException(message)),
     errorConstructor[DateTimeException]((message, _) => new SparkDateTimeException(message)),
     errorConstructor((message, cause) => new SparkRuntimeException(message, cause)),
-    errorConstructor((message, cause) => new SparkUpgradeException(message, cause)))
-
-  private def errorInfoToThrowable(info: ErrorInfo, message: String): Option[Throwable] = {
-    val classes =
-      mapper.readValue(info.getMetadataOrDefault("classes", "[]"), classOf[Array[String]])
+    errorConstructor((message, cause) => new SparkUpgradeException(message, cause)),
+    errorConstructor((message, cause) => new SparkException(message, cause.orNull)))
+
+  /**
+   * errorsToThrowable reconstructs the exception based on a list of protobuf messages
+   * FetchErrorDetailsResponse.Error with un-truncated error messages and server-side stacktrace
+   * (if set).
+   */
+  private def errorsToThrowable(
+      errorIdx: Int,
+      errors: Seq[FetchErrorDetailsResponse.Error]): Throwable = {
+
+    val error = errors(errorIdx)
+
+    val classHierarchy = error.getErrorTypeHierarchyList.asScala
+
+    val constructor =
+      classHierarchy
+        .flatMap(errorFactory.get)
+        .headOption
+        .getOrElse((message: String, cause: Option[Throwable]) =>
+          new SparkException(s"${classHierarchy.head}: ${message}", cause.orNull))
+
+    val causeOpt =
+      if (error.hasCauseIdx) Some(errorsToThrowable(error.getCauseIdx, errors)) else None
+
+    val exception = constructor(error.getMessage, causeOpt)
+
+    if (!error.getStackTraceList.isEmpty) {
+      exception.setStackTrace(error.getStackTraceList.asScala.toArray.map { stackTraceElement =>
+        new StackTraceElement(
+          stackTraceElement.getDeclaringClass,
+          stackTraceElement.getMethodName,
+          stackTraceElement.getFileName,
+          stackTraceElement.getLineNumber)
+      })
+    }
 
-    classes
-      .find(errorFactory.contains)
-      .map { cls =>
-        val constructor = errorFactory.get(cls).get
-        constructor(message, None)
-      }
+    exception
   }
 
-  private def toThrowable(ex: StatusRuntimeException): Throwable = {
-    val status = StatusProto.fromThrowable(ex)
-
-    val fallbackEx = new SparkException(ex.toString, ex.getCause)
-
-    val errorInfoOpt = status.getDetailsList.asScala
-      .find(_.is(classOf[ErrorInfo]))
-
-    if (errorInfoOpt.isEmpty) {
-      return fallbackEx
-    }
-
-    errorInfoToThrowable(errorInfoOpt.get.unpack(classOf[ErrorInfo]), status.getMessage)
-      .getOrElse(fallbackEx)
+  /**
+   * errorInfoToThrowable reconstructs the exception based on the error classes hierarchy and the
+   * truncated error message.
+   */
+  private def errorInfoToThrowable(info: ErrorInfo, message: String): Throwable = {
+    implicit val formats = DefaultFormats
+    val classes =
+      JsonMethods.parse(info.getMetadataOrDefault("classes", "[]")).extract[Array[String]]

Review Comment:
   When the header size exceeds the limit, the server returns errors in responses instead of silently truncating data. If the JSON is malformed, it may be better to fail early?





[GitHub] [spark] heyihong commented on a diff in pull request #42987: [SPARK-45207][SQL][CONNECT] Implement Error Enrichment for Scala Client

Posted by "heyihong (via GitHub)" <gi...@apache.org>.
heyihong commented on code in PR #42987:
URL: https://github.com/apache/spark/pull/42987#discussion_r1331977811


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2882,8 +2882,7 @@ object SQLConf {
         "level settings.")
       .version("3.0.0")
       .booleanConf
-      // show full stacktrace in tests but hide in production by default.
-      .createWithDefault(Utils.isTesting)

Review Comment:
   Here is the fix: https://github.com/apache/spark/pull/43017





[GitHub] [spark] heyihong commented on a diff in pull request #42987: [SPARK-45207][SQL][CONNECT] Implement Complete Error Reconstruction for Scala Client

Posted by "heyihong (via GitHub)" <gi...@apache.org>.
heyihong commented on code in PR #42987:
URL: https://github.com/apache/spark/pull/42987#discussion_r1331243346


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala:
##########
@@ -93,33 +179,65 @@ private[client] object GrpcExceptionConverter extends JsonUtils {
       new SparkArrayIndexOutOfBoundsException(message)),
     errorConstructor[DateTimeException]((message, _) => new SparkDateTimeException(message)),
     errorConstructor((message, cause) => new SparkRuntimeException(message, cause)),
-    errorConstructor((message, cause) => new SparkUpgradeException(message, cause)))
-
-  private def errorInfoToThrowable(info: ErrorInfo, message: String): Option[Throwable] = {
-    val classes =
-      mapper.readValue(info.getMetadataOrDefault("classes", "[]"), classOf[Array[String]])
+    errorConstructor((message, cause) => new SparkUpgradeException(message, cause)),
+    errorConstructor((message, cause) => new SparkException(message, cause.orNull)))
+
+  /**
+   * errorsToThrowable reconstructs the exception based on a list of protobuf messages
+   * FetchErrorDetailsResponse.Error with un-truncated error messages and server-side stacktrace
+   * (if set).
+   */
+  private def errorsToThrowable(
+      errorIdx: Int,
+      errors: Seq[FetchErrorDetailsResponse.Error]): Throwable = {
+
+    val error = errors(errorIdx)
+
+    val classHierarchy = error.getErrorTypeHierarchyList.asScala
+
+    val constructor =
+      classHierarchy
+        .flatMap(errorFactory.get)
+        .headOption
+        .getOrElse((message: String, cause: Option[Throwable]) =>
+          new SparkException(

Review Comment:
   Will change. The message pattern is from https://github.com/apache/spark/pull/42377#discussion_r1329224205, but this one seems simpler.
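   For context, the reconstruction discussed here can be sketched without the real protobuf classes. The `ErrorProto` type below is a hypothetical stand-in that only mirrors the fields of `FetchErrorDetailsResponse.Error` the recursion needs, and the sketch always applies the `"<class>: <message>"` fallback pattern from this thread rather than consulting the real `errorFactory`:

   ```scala
   // Hypothetical stand-in for FetchErrorDetailsResponse.Error: the proto stores
   // the exception chain as a flat list, where each entry may reference the
   // index of its cause.
   case class ErrorProto(
       message: String,
       errorTypeHierarchy: Seq[String],
       causeIdx: Option[Int])

   // Rebuild the Throwable chain recursively, mirroring errorsToThrowable. The
   // real client first looks the hierarchy up in errorFactory; this sketch
   // always falls back to the "<class>: <message>" pattern discussed above.
   def reconstruct(errorIdx: Int, errors: Seq[ErrorProto]): Throwable = {
     val error = errors(errorIdx)
     val cause = error.causeIdx.map(reconstruct(_, errors)).orNull
     new RuntimeException(s"${error.errorTypeHierarchy.head}: ${error.message}", cause)
   }

   val errors = Seq(
     ErrorProto("Job aborted", Seq("org.apache.spark.SparkException"), Some(1)),
     ErrorProto("boom", Seq("java.lang.RuntimeException"), None))

   val reconstructed = reconstruct(0, errors)
   ```

   With this shape, `reconstructed.getMessage` carries the original server-side type name in the message, and `getCause` preserves the chain, which is why the simpler pattern suggested above loses no type information.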





[GitHub] [spark] heyihong commented on a diff in pull request #42987: [SPARK-45207][SQL][CONNECT] Implement Error Enrichment for Scala Client

Posted by "heyihong (via GitHub)" <gi...@apache.org>.
heyihong commented on code in PR #42987:
URL: https://github.com/apache/spark/pull/42987#discussion_r1334246788


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala:
##########
@@ -93,33 +184,63 @@ private[client] object GrpcExceptionConverter extends JsonUtils {
       new SparkArrayIndexOutOfBoundsException(message)),
     errorConstructor[DateTimeException]((message, _) => new SparkDateTimeException(message)),
     errorConstructor((message, cause) => new SparkRuntimeException(message, cause)),
-    errorConstructor((message, cause) => new SparkUpgradeException(message, cause)))
-
-  private def errorInfoToThrowable(info: ErrorInfo, message: String): Option[Throwable] = {
-    val classes =
-      mapper.readValue(info.getMetadataOrDefault("classes", "[]"), classOf[Array[String]])
+    errorConstructor((message, cause) => new SparkUpgradeException(message, cause)),
+    errorConstructor((message, cause) => new SparkException(message, cause.orNull)))
+
+  /**
+   * errorsToThrowable reconstructs the exception based on a list of protobuf messages
+   * FetchErrorDetailsResponse.Error with un-truncated error messages and server-side stacktrace
+   * (if set).
+   */
+  private def errorsToThrowable(
+      errorIdx: Int,
+      errors: Seq[FetchErrorDetailsResponse.Error]): Throwable = {
+
+    val error = errors(errorIdx)
+
+    val classHierarchy = error.getErrorTypeHierarchyList.asScala
+
+    val constructor =
+      classHierarchy
+        .flatMap(errorFactory.get)
+        .headOption
+        .getOrElse((message: String, cause: Option[Throwable]) =>
+          new SparkException(s"${classHierarchy.head}: ${message}", cause.orNull))
+
+    val causeOpt =
+      if (error.hasCauseIdx) Some(errorsToThrowable(error.getCauseIdx, errors)) else None
+
+    val exception = constructor(error.getMessage, causeOpt)
+
+    if (!error.getStackTraceList.isEmpty) {
+      exception.setStackTrace(error.getStackTraceList.asScala.toArray.map { stackTraceElement =>
+        new StackTraceElement(
+          stackTraceElement.getDeclaringClass,
+          stackTraceElement.getMethodName,
+          stackTraceElement.getFileName,
+          stackTraceElement.getLineNumber)
+      })
+    }
 
-    classes
-      .find(errorFactory.contains)
-      .map { cls =>
-        val constructor = errorFactory.get(cls).get
-        constructor(message, None)
-      }
+    exception
   }
 
-  private def toThrowable(ex: StatusRuntimeException): Throwable = {
-    val status = StatusProto.fromThrowable(ex)
-
-    val fallbackEx = new SparkException(ex.toString, ex.getCause)
-
-    val errorInfoOpt = status.getDetailsList.asScala
-      .find(_.is(classOf[ErrorInfo]))
-
-    if (errorInfoOpt.isEmpty) {
-      return fallbackEx
-    }
-
-    errorInfoToThrowable(errorInfoOpt.get.unpack(classOf[ErrorInfo]), status.getMessage)
-      .getOrElse(fallbackEx)
+  /**
+   * errorInfoToThrowable reconstructs the exception based on the error classes hierarchy and the
+   * truncated error message.
+   */
+  private def errorInfoToThrowable(info: ErrorInfo, message: String): Throwable = {
+    implicit val formats = DefaultFormats
+    val classes =
+      JsonMethods.parse(info.getMetadataOrDefault("classes", "[]")).extract[Array[String]]

Review Comment:
   When the header size exceeds the limit, the server returns errors in responses instead of silently truncating data. If the JSON is malformed, it may be better to fail early.
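   The fail-early behavior can be sketched without json4s. The `parseClasses` helper below is a hypothetical stand-in for `JsonMethods.parse(...).extract[Array[String]]` that only handles the flat `["a","b"]` shape of the `classes` metadata; a truncated payload raises immediately instead of silently yielding an empty hierarchy:

   ```scala
   import scala.util.Try

   // Hypothetical minimal parser for the `classes` metadata value. It accepts
   // only a flat JSON array of strings and fails fast on anything malformed.
   def parseClasses(json: String): Array[String] = {
     val trimmed = json.trim
     require(trimmed.startsWith("[") && trimmed.endsWith("]"), s"malformed JSON: $json")
     val body = trimmed.substring(1, trimmed.length - 1).trim
     if (body.isEmpty) Array.empty[String]
     else body.split(",").map { s =>
       val q = s.trim
       require(q.length >= 2 && q.head == '"' && q.last == '"', s"malformed entry: $s")
       q.substring(1, q.length - 1)
     }
   }

   val ok = parseClasses("""["org.apache.spark.SparkException"]""")
   // A payload truncated mid-string (as header truncation could produce)
   // surfaces as a failure rather than an empty class hierarchy.
   val bad = Try(parseClasses("""["org.apache.spark.SparkExce"""))
   ```

   Failing fast here matches the reasoning above: since the server reports oversized headers as errors rather than truncating, a malformed `classes` value indicates a real bug that should not be papered over.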





[GitHub] [spark] HyukjinKwon closed pull request #42987: [SPARK-45207][SQL][CONNECT] Implement Error Enrichment for Scala Client

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon closed pull request #42987: [SPARK-45207][SQL][CONNECT] Implement Error Enrichment for Scala Client
URL: https://github.com/apache/spark/pull/42987




[GitHub] [spark] heyihong commented on a diff in pull request #42987: [SPARK-45207][SQL][CONNECT] Implement Complete Error Reconstruction for Scala Client

Posted by "heyihong (via GitHub)" <gi...@apache.org>.
heyihong commented on code in PR #42987:
URL: https://github.com/apache/spark/pull/42987#discussion_r1331297747


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala:
##########
@@ -175,6 +175,37 @@ class ClientStreamingQuerySuite extends QueryTest with SQLHelper with Logging {
     }
   }
 
+  test("throw exception in streaming") {
+    val session = spark
+    import session.implicits._
+
+    val checkForTwo = udf((value: Int) => {
+      if (value == 2) {
+        throw new RuntimeException("Number 2 encountered!")
+      }
+      value
+    })
+
+    val query = spark.readStream
+      .format("rate")
+      .option("rowsPerSecond", "1")
+      .load()
+      .select(checkForTwo($"value").as("checkedValue"))
+      .writeStream
+      .outputMode("append")
+      .format("console")
+      .start()
+
+    val exception = intercept[SparkException] {
+      query.awaitTermination()
+    }
+
+    assert(
+      exception.getCause.getCause.getCause.getMessage

Review Comment:
   https://github.com/apache/spark/pull/42377#discussion_r1323060159
   
   I will add the checks
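   
   A minimal sketch of such checks, using plain JVM exceptions as stand-ins for the actual Spark exception chain: walk the cause chain explicitly and verify the depth and type at each level, instead of chaining `getCause.getCause.getCause` blindly.
   
   ```scala
   // Hypothetical helper: collect the full cause chain of a Throwable,
   // outermost first.
   def causeChain(t: Throwable): List[Throwable] =
     t :: Option(t.getCause).map(causeChain).getOrElse(Nil)
   
   // Simulated three-level chain like the one produced by the streaming query
   // failure above (stand-in exception types, not the real Spark wrappers).
   val udfFailure = new RuntimeException("Number 2 encountered!")
   val chain = causeChain(
     new IllegalStateException(
       "Query terminated",
       new IllegalStateException("Task failed", udfFailure)))
   ```
   
   Asserting on `chain.length` and on the class of each element makes the test fail with a clear message if the server-side wrapping ever changes depth, rather than with an opaque NullPointerException.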





[GitHub] [spark] heyihong commented on a diff in pull request #42987: [SPARK-45207][SQL][CONNECT] Implement Complete Error Reconstruction for Scala Client

Posted by "heyihong (via GitHub)" <gi...@apache.org>.
heyihong commented on code in PR #42987:
URL: https://github.com/apache/spark/pull/42987#discussion_r1331247912


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2882,8 +2882,7 @@ object SQLConf {
         "level settings.")
       .version("3.0.0")
       .booleanConf
-      // show full stacktrace in tests but hide in production by default.
-      .createWithDefault(Utils.isTesting)

Review Comment:
   The size of the ErrorInfo sometimes exceeds the header limit in tests, so this is disabled by default. It should no longer be an issue once we migrate to the FetchErrorDetails RPC.





[GitHub] [spark] hvanhovell commented on a diff in pull request #42987: [SPARK-45207][SQL][CONNECT] Implement Error Reconstruction for Scala Client

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #42987:
URL: https://github.com/apache/spark/pull/42987#discussion_r1330878085


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala:
##########
@@ -24,49 +24,135 @@ import scala.reflect.ClassTag
 import com.google.rpc.ErrorInfo
 import io.grpc.StatusRuntimeException
 import io.grpc.protobuf.StatusProto
+import org.json4s.DefaultFormats
+import org.json4s.jackson.JsonMethods
 
 import org.apache.spark.{SparkArithmeticException, SparkArrayIndexOutOfBoundsException, SparkDateTimeException, SparkException, SparkIllegalArgumentException, SparkNumberFormatException, SparkRuntimeException, SparkUnsupportedOperationException, SparkUpgradeException}
+import org.apache.spark.connect.proto.{FetchErrorDetailsRequest, FetchErrorDetailsResponse, UserContext}
+import org.apache.spark.connect.proto.SparkConnectServiceGrpc.SparkConnectServiceBlockingStub
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchDatabaseException, NoSuchTableException, TableAlreadyExistsException, TempTableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.trees.Origin
-import org.apache.spark.util.JsonUtils
 
-private[client] object GrpcExceptionConverter extends JsonUtils {
-  def convert[T](f: => T): T = {
+/**
+ * GrpcExceptionConverter handles the conversion of StatusRuntimeExceptions into Spark exceptions.
+ * It does so by utilizing the ErrorInfo defined in error_details.proto and making an additional
+ * FetchErrorDetails RPC call to retrieve the full error message and optionally the server-side
+ * stacktrace.
+ *
+ * If the FetchErrorDetails RPC call succeeds, the exceptions will be constructed based on the
+ * response. If the RPC call fails, the exception will be constructed based on the ErrorInfo. If
+ * the ErrorInfo is missing, the exception will be constructed based on the StatusRuntimeException
+ * itself.
+ */
+private[client] class GrpcExceptionConverter(grpcStub: SparkConnectServiceBlockingStub)
+    extends Logging {
+  import GrpcExceptionConverter._
+
+  def convert[T](sessionId: String, userContext: UserContext)(f: => T): T = {
     try {
       f
     } catch {
       case e: StatusRuntimeException =>
-        throw toThrowable(e)
+        throw toThrowable(e, sessionId, userContext)
     }
   }
 
-  def convertIterator[T](iter: CloseableIterator[T]): CloseableIterator[T] = {
+  def convertIterator[T](
+      sessionId: String,
+      userContext: UserContext,
+      iter: CloseableIterator[T]): CloseableIterator[T] = {
     new WrappedCloseableIterator[T] {
 
       override def innerIterator: Iterator[T] = iter
 
       override def hasNext: Boolean = {
-        convert {
+        convert(sessionId, userContext) {
           iter.hasNext
         }
       }
 
       override def next(): T = {
-        convert {
+        convert(sessionId, userContext) {
           iter.next()
         }
       }
 
       override def close(): Unit = {
-        convert {
+        convert(sessionId, userContext) {
           iter.close()
         }
       }
     }
   }
 
+  /**
+   * fetchEnrichedError fetches enriched errors with full exception message and optionally

Review Comment:
   remove the method name from the doc?





[GitHub] [spark] juliuszsompolski commented on a diff in pull request #42987: [SPARK-45207][SQL][CONNECT] Implement Error Enrichment for Scala Client

Posted by "juliuszsompolski (via GitHub)" <gi...@apache.org>.
juliuszsompolski commented on code in PR #42987:
URL: https://github.com/apache/spark/pull/42987#discussion_r1333313454


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala:
##########
@@ -24,49 +24,140 @@ import scala.reflect.ClassTag
 import com.google.rpc.ErrorInfo
 import io.grpc.StatusRuntimeException
 import io.grpc.protobuf.StatusProto
+import org.json4s.DefaultFormats
+import org.json4s.jackson.JsonMethods
 
 import org.apache.spark.{SparkArithmeticException, SparkArrayIndexOutOfBoundsException, SparkDateTimeException, SparkException, SparkIllegalArgumentException, SparkNumberFormatException, SparkRuntimeException, SparkUnsupportedOperationException, SparkUpgradeException}
+import org.apache.spark.connect.proto.{FetchErrorDetailsRequest, FetchErrorDetailsResponse, UserContext}
+import org.apache.spark.connect.proto.SparkConnectServiceGrpc.SparkConnectServiceBlockingStub
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchDatabaseException, NoSuchTableException, TableAlreadyExistsException, TempTableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.trees.Origin
-import org.apache.spark.util.JsonUtils
 
-private[client] object GrpcExceptionConverter extends JsonUtils {
-  def convert[T](f: => T): T = {
+/**
+ * GrpcExceptionConverter handles the conversion of StatusRuntimeExceptions into Spark exceptions.
+ * It does so by utilizing the ErrorInfo defined in error_details.proto and making an additional
+ * FetchErrorDetails RPC call to retrieve the full error message and optionally the server-side
+ * stacktrace.
+ *
+ * If the FetchErrorDetails RPC call succeeds, the exceptions will be constructed based on the
+ * response. If the RPC call fails, the exception will be constructed based on the ErrorInfo. If
+ * the ErrorInfo is missing, the exception will be constructed based on the StatusRuntimeException
+ * itself.
+ */
+private[client] class GrpcExceptionConverter(grpcStub: SparkConnectServiceBlockingStub)
+    extends Logging {
+  import GrpcExceptionConverter._
+
+  def convert[T](sessionId: String, userContext: UserContext)(f: => T): T = {
     try {
       f
     } catch {
       case e: StatusRuntimeException =>
-        throw toThrowable(e)
+        throw toThrowable(e, sessionId, userContext)
     }
   }
 
-  def convertIterator[T](iter: CloseableIterator[T]): CloseableIterator[T] = {
+  def convertIterator[T](
+      sessionId: String,
+      userContext: UserContext,
+      iter: CloseableIterator[T]): CloseableIterator[T] = {
     new WrappedCloseableIterator[T] {
 
       override def innerIterator: Iterator[T] = iter
 
       override def hasNext: Boolean = {
-        convert {
+        convert(sessionId, userContext) {
           iter.hasNext
         }
       }
 
       override def next(): T = {
-        convert {
+        convert(sessionId, userContext) {
           iter.next()
         }
       }
 
       override def close(): Unit = {
-        convert {
+        convert(sessionId, userContext) {
           iter.close()
         }
       }
     }
   }
 
+  /**
+   * Fetches enriched errors with full exception message and optionally stacktrace by issuing an
+   * additional RPC call to fetch error details. The RPC call is best-effort at-most-once.
+   */
+  private def fetchEnrichedError(
+      info: ErrorInfo,
+      sessionId: String,
+      userContext: UserContext): Option[Throwable] = {
+    val errorId = info.getMetadataOrDefault("errorId", null)
+    if (errorId == null) {
+      logWarning("Unable to fetch enriched error since errorId is missing")
+      return None
+    }
+
+    try {
+      val errorDetailsResponse = grpcStub.fetchErrorDetails(
+        FetchErrorDetailsRequest
+          .newBuilder()
+          .setSessionId(sessionId)
+          .setErrorId(errorId)
+          .setUserContext(UserContext.newBuilder().setUserId(userContext.getUserId).build())

Review Comment:
   Why build a new UserContext instead of reusing `userContext`? Why does only the UserId field matter?



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala:
##########
@@ -93,33 +184,63 @@ private[client] object GrpcExceptionConverter extends JsonUtils {
       new SparkArrayIndexOutOfBoundsException(message)),
     errorConstructor[DateTimeException]((message, _) => new SparkDateTimeException(message)),
     errorConstructor((message, cause) => new SparkRuntimeException(message, cause)),
-    errorConstructor((message, cause) => new SparkUpgradeException(message, cause)))
-
-  private def errorInfoToThrowable(info: ErrorInfo, message: String): Option[Throwable] = {
-    val classes =
-      mapper.readValue(info.getMetadataOrDefault("classes", "[]"), classOf[Array[String]])
+    errorConstructor((message, cause) => new SparkUpgradeException(message, cause)),
+    errorConstructor((message, cause) => new SparkException(message, cause.orNull)))
+
+  /**
+   * errorsToThrowable reconstructs the exception based on a list of protobuf messages
+   * FetchErrorDetailsResponse.Error with un-truncated error messages and server-side stacktrace
+   * (if set).
+   */
+  private def errorsToThrowable(
+      errorIdx: Int,
+      errors: Seq[FetchErrorDetailsResponse.Error]): Throwable = {
+
+    val error = errors(errorIdx)
+
+    val classHierarchy = error.getErrorTypeHierarchyList.asScala
+
+    val constructor =
+      classHierarchy
+        .flatMap(errorFactory.get)
+        .headOption
+        .getOrElse((message: String, cause: Option[Throwable]) =>
+          new SparkException(s"${classHierarchy.head}: ${message}", cause.orNull))
+
+    val causeOpt =
+      if (error.hasCauseIdx) Some(errorsToThrowable(error.getCauseIdx, errors)) else None
+
+    val exception = constructor(error.getMessage, causeOpt)
+
+    if (!error.getStackTraceList.isEmpty) {
+      exception.setStackTrace(error.getStackTraceList.asScala.toArray.map { stackTraceElement =>
+        new StackTraceElement(
+          stackTraceElement.getDeclaringClass,
+          stackTraceElement.getMethodName,
+          stackTraceElement.getFileName,
+          stackTraceElement.getLineNumber)
+      })
+    }
 
-    classes
-      .find(errorFactory.contains)
-      .map { cls =>
-        val constructor = errorFactory.get(cls).get
-        constructor(message, None)
-      }
+    exception
   }
 
-  private def toThrowable(ex: StatusRuntimeException): Throwable = {
-    val status = StatusProto.fromThrowable(ex)
-
-    val fallbackEx = new SparkException(ex.toString, ex.getCause)
-
-    val errorInfoOpt = status.getDetailsList.asScala
-      .find(_.is(classOf[ErrorInfo]))
-
-    if (errorInfoOpt.isEmpty) {
-      return fallbackEx
-    }
-
-    errorInfoToThrowable(errorInfoOpt.get.unpack(classOf[ErrorInfo]), status.getMessage)
-      .getOrElse(fallbackEx)
+  /**
+   * errorInfoToThrowable reconstructs the exception based on the error classes hierarchy and the
+   * truncated error message.
+   */
+  private def errorInfoToThrowable(info: ErrorInfo, message: String): Throwable = {
+    implicit val formats = DefaultFormats
+    val classes =
+      JsonMethods.parse(info.getMetadataOrDefault("classes", "[]")).extract[Array[String]]

Review Comment:
   Does this need a try/catch in case the JSON is malformed (e.g. because of truncation) and parsing throws an error?
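   The three-tier fallback described in the class doc (enriched RPC, then ErrorInfo, then the raw exception) can be sketched with stand-in types; `ErrorInfoLite` and `fetchEnriched` below are hypothetical placeholders, not the real proto or stub API:

   ```scala
   import scala.util.Try

   // Hypothetical stand-in for the ErrorInfo detail attached to the gRPC status.
   case class ErrorInfoLite(metadata: Map[String, String], message: String)

   // Best-effort enrichment mirroring fetchEnrichedError: a missing errorId or
   // a failed RPC both degrade gracefully to the ErrorInfo-based message, and
   // a missing ErrorInfo keeps the raw gRPC exception.
   def toThrowableLite(
       info: Option[ErrorInfoLite],
       fetchEnriched: String => Throwable, // stand-in for FetchErrorDetails RPC
       fallback: Throwable): Throwable = {
     info match {
       case Some(i) =>
         i.metadata.get("errorId")
           .flatMap(id => Try(fetchEnriched(id)).toOption) // at-most-once, best effort
           .getOrElse(new RuntimeException(i.message))     // ErrorInfo-based fallback
       case None => fallback
     }
   }

   val raw = new RuntimeException("UNKNOWN: raw gRPC failure")
   // RPC fails: the ErrorInfo-based message is used.
   val e1 = toThrowableLite(
     Some(ErrorInfoLite(Map("errorId" -> "abc"), "truncated message")),
     _ => throw new RuntimeException("rpc unavailable"),
     raw)
   // No ErrorInfo at all: the raw exception is kept.
   val e2 = toThrowableLite(None, id => new RuntimeException("enriched"), raw)
   ```

   Swallowing the enrichment failure inside `Try` is what makes the extra RPC best-effort: the client never turns a real error into an enrichment error.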







[GitHub] [spark] heyihong closed pull request #42987: [SPARK-45207][SQL][CONNECT] Implement FetchErrorDetails RPC

Posted by "heyihong (via GitHub)" <gi...@apache.org>.
heyihong closed pull request #42987: [SPARK-45207][SQL][CONNECT] Implement FetchErrorDetails RPC
URL: https://github.com/apache/spark/pull/42987




[GitHub] [spark] hvanhovell commented on a diff in pull request #42987: [SPARK-45207][SQL][CONNECT] Implement Error Reconstruction for Scala Client

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #42987:
URL: https://github.com/apache/spark/pull/42987#discussion_r1330873176


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala:
##########
@@ -93,33 +179,65 @@ private[client] object GrpcExceptionConverter extends JsonUtils {
       new SparkArrayIndexOutOfBoundsException(message)),
     errorConstructor[DateTimeException]((message, _) => new SparkDateTimeException(message)),
     errorConstructor((message, cause) => new SparkRuntimeException(message, cause)),
-    errorConstructor((message, cause) => new SparkUpgradeException(message, cause)))
-
-  private def errorInfoToThrowable(info: ErrorInfo, message: String): Option[Throwable] = {
-    val classes =
-      mapper.readValue(info.getMetadataOrDefault("classes", "[]"), classOf[Array[String]])
+    errorConstructor((message, cause) => new SparkUpgradeException(message, cause)),
+    errorConstructor((message, cause) => new SparkException(message, cause.orNull)))
+
+  /**
+   * errorsToThrowable reconstructs the exception based on a list of protobuf messages
+   * FetchErrorDetailsResponse.Error with un-truncated error messages and server-side stacktrace
+   * (if set).
+   */
+  private def errorsToThrowable(
+      errorIdx: Int,
+      errors: Seq[FetchErrorDetailsResponse.Error]): Throwable = {
+
+    val error = errors(errorIdx)
+
+    val classHierarchy = error.getErrorTypeHierarchyList.asScala
+
+    val constructor =
+      classHierarchy
+        .flatMap(errorFactory.get)
+        .headOption
+        .getOrElse((message: String, cause: Option[Throwable]) =>
+          new SparkException(

Review Comment:
   How about we just do `s"${classHierarchy.head}: ${message}"`?




