Posted to commits@spark.apache.org by ma...@apache.org on 2023/12/28 08:14:16 UTC

(spark) branch master updated: [SPARK-46532][CONNECT] Pass message parameters in metadata of `ErrorInfo`

This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6fcc2685cd3 [SPARK-46532][CONNECT] Pass message parameters in metadata of `ErrorInfo`
6fcc2685cd3 is described below

commit 6fcc2685cd3f9681dc85e0d53047f2e647d24b0b
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Thu Dec 28 11:14:03 2023 +0300

    [SPARK-46532][CONNECT] Pass message parameters in metadata of `ErrorInfo`
    
    ### What changes were proposed in this pull request?
    In the PR, I propose to put the message parameters, together with the error class, into the `messageParameters` field in the metadata of `ErrorInfo`.
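    
    For example, for the datetime parsing error exercised by the modified test, the error class and the JSON-serialized message parameters sit side by side in the `ErrorInfo` metadata map. A minimal sketch (`putMetadata` is the builder call used in `ErrorUtils`; the concrete values are the ones asserted in `ClientE2ETestSuite`):
    ```
    import com.google.rpc.ErrorInfo

    // Sketch only: both fields land in the ErrorInfo metadata map, with the
    // message parameters serialized as a JSON object.
    val errorInfo = ErrorInfo
      .newBuilder()
      .putMetadata(
        "errorClass",
        "INCONSISTENT_BEHAVIOR_CROSS_VERSION.PARSE_DATETIME_BY_NEW_PARSER")
      .putMetadata(
        "messageParameters",
        """{"datetime":"'02-29'","config":"\"spark.sql.legacy.timeParserPolicy\""}""")
      .build()
    ```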
    
    ### Why are the changes needed?
    To be able to re-create an error from its error class and message parameters. Before the changes, it was not possible to reconstruct an error from the error class alone.
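    
    A minimal client-side sketch of what this enables, assuming `info` is the received `com.google.rpc.ErrorInfo` and that a `SparkException` constructor taking an error class and message parameters is available (the actual conversion lives in `GrpcExceptionConverter`):
    ```
    import org.apache.spark.SparkException
    import org.json4s.{DefaultFormats, Formats}
    import org.json4s.jackson.JsonMethods

    implicit val formats: Formats = DefaultFormats

    // Read both fields back from the ErrorInfo metadata and rebuild a
    // structured error instead of an opaque message string.
    val messageParameters = JsonMethods
      .parse(info.getMetadataOrDefault("messageParameters", "{}"))
      .extract[Map[String, String]]
    throw new SparkException(
      errorClass = info.getMetadataOrDefault("errorClass", ""),
      messageParameters = messageParameters,
      cause = null)
    ```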
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    By running the modified test:
    ```
    $ build/sbt "connect-client-jvm/testOnly *ClientE2ETestSuite"
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #44468 from MaxGekk/messageParameters-in-metadata.
    
    Authored-by: Max Gekk <ma...@gmail.com>
    Signed-off-by: Max Gekk <ma...@gmail.com>
---
 .../scala/org/apache/spark/sql/ClientE2ETestSuite.scala    |  8 +++++++-
 .../spark/sql/connect/client/GrpcExceptionConverter.scala  |  4 ++++
 .../org/apache/spark/sql/connect/config/Connect.scala      |  9 +++++++++
 .../org/apache/spark/sql/connect/utils/ErrorUtils.scala    | 14 +++++++++++---
 python/pyspark/sql/tests/connect/test_connect_basic.py     |  2 +-
 5 files changed, 32 insertions(+), 5 deletions(-)

diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index c947d948b4c..0740334724e 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -85,7 +85,13 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateM
                 |""".stripMargin)
             .collect()
         }
-        assert(ex.getErrorClass != null)
+        assert(
+          ex.getErrorClass ===
+            "INCONSISTENT_BEHAVIOR_CROSS_VERSION.PARSE_DATETIME_BY_NEW_PARSER")
+        assert(
+          ex.getMessageParameters.asScala == Map(
+            "datetime" -> "'02-29'",
+            "config" -> "\"spark.sql.legacy.timeParserPolicy\""))
         if (enrichErrorEnabled) {
           assert(ex.getCause.isInstanceOf[DateTimeException])
         } else {
diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
index 075526e7521..cc47924de3b 100644
--- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
+++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
@@ -372,10 +372,14 @@ private[client] object GrpcExceptionConverter {
       .addAllErrorTypeHierarchy(classes.toImmutableArraySeq.asJava)
 
     if (errorClass != null) {
+      val messageParameters = JsonMethods
+        .parse(info.getMetadataOrDefault("messageParameters", "{}"))
+        .extract[Map[String, String]]
       builder.setSparkThrowable(
         FetchErrorDetailsResponse.SparkThrowable
           .newBuilder()
           .setErrorClass(errorClass)
+          .putAllMessageParameters(messageParameters.asJava)
           .build())
     }
 
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
index ab4f06d508a..39bf1a630af 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
@@ -256,4 +256,13 @@ object Connect {
       .version("4.0.0")
       .booleanConf
       .createWithDefault(true)
+
+  val CONNECT_GRPC_MAX_METADATA_SIZE =
+    buildStaticConf("spark.connect.grpc.maxMetadataSize")
+      .doc(
+        "Sets the maximum size of metadata fields. For instance, it restricts metadata fields " +
+          "in `ErrorInfo`.")
+      .version("4.0.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(1024)
 }
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala
index 703b11c0c73..f489551a1db 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala
@@ -172,6 +172,7 @@ private[connect] object ErrorUtils extends Logging {
         "classes",
         JsonMethods.compact(JsonMethods.render(allClasses(st.getClass).map(_.getName))))
 
+    val maxMetadataSize = SparkEnv.get.conf.get(Connect.CONNECT_GRPC_MAX_METADATA_SIZE)
     // Add the SQL State and Error Class to the response metadata of the ErrorInfoObject.
     st match {
       case e: SparkThrowable =>
@@ -181,7 +182,12 @@ private[connect] object ErrorUtils extends Logging {
         }
         val errorClass = e.getErrorClass
         if (errorClass != null && errorClass.nonEmpty) {
-          errorInfo.putMetadata("errorClass", errorClass)
+          val messageParameters = JsonMethods.compact(
+            JsonMethods.render(map2jvalue(e.getMessageParameters.asScala.toMap)))
+          if (messageParameters.length <= maxMetadataSize) {
+            errorInfo.putMetadata("errorClass", errorClass)
+            errorInfo.putMetadata("messageParameters", messageParameters)
+          }
         }
       case _ =>
     }
@@ -200,8 +206,10 @@ private[connect] object ErrorUtils extends Logging {
     val withStackTrace =
       if (sessionHolderOpt.exists(
           _.session.conf.get(SQLConf.PYSPARK_JVM_STACKTRACE_ENABLED) && stackTrace.nonEmpty)) {
-        val maxSize = SparkEnv.get.conf.get(Connect.CONNECT_JVM_STACK_TRACE_MAX_SIZE)
-        errorInfo.putMetadata("stackTrace", StringUtils.abbreviate(stackTrace.get, maxSize))
+        val maxSize = Math.min(
+          SparkEnv.get.conf.get(Connect.CONNECT_JVM_STACK_TRACE_MAX_SIZE),
+          maxMetadataSize)
+        errorInfo.putMetadata("stackTrace", StringUtils.abbreviate(stackTrace.get, maxSize.toInt))
       } else {
         errorInfo
       }
diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py b/python/pyspark/sql/tests/connect/test_connect_basic.py
index 32cd4ed6249..7275f40b39a 100755
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -3452,7 +3452,7 @@ class SparkConnectSessionTests(ReusedConnectTestCase):
         self.spark.stop()
         spark = (
             PySparkSession.builder.config(conf=self.conf())
-            .config("spark.connect.jvmStacktrace.maxSize", 128)
+            .config("spark.connect.grpc.maxMetadataSize", 128)
             .remote("local[4]")
             .getOrCreate()
         )

