You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2023/12/28 08:14:16 UTC
(spark) branch master updated: [SPARK-46532][CONNECT] Pass message parameters in metadata of `ErrorInfo`
This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 6fcc2685cd3 [SPARK-46532][CONNECT] Pass message parameters in metadata of `ErrorInfo`
6fcc2685cd3 is described below
commit 6fcc2685cd3f9681dc85e0d53047f2e647d24b0b
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Thu Dec 28 11:14:03 2023 +0300
[SPARK-46532][CONNECT] Pass message parameters in metadata of `ErrorInfo`
### What changes were proposed in this pull request?
In the PR, I propose to put message parameters together with an error class in the `messageParameters` field in metadata of `ErrorInfo`.
### Why are the changes needed?
To be able to create an error from an error class and message parameters. Before the changes, it is not possible to reconstruct an error when only an error class is available.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
By running the modified test:
```
$ build/sbt "connect-client-jvm/testOnly *ClientE2ETestSuite"
```
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #44468 from MaxGekk/messageParameters-in-metadata.
Authored-by: Max Gekk <ma...@gmail.com>
Signed-off-by: Max Gekk <ma...@gmail.com>
---
.../scala/org/apache/spark/sql/ClientE2ETestSuite.scala | 8 +++++++-
.../spark/sql/connect/client/GrpcExceptionConverter.scala | 4 ++++
.../org/apache/spark/sql/connect/config/Connect.scala | 9 +++++++++
.../org/apache/spark/sql/connect/utils/ErrorUtils.scala | 14 +++++++++++---
python/pyspark/sql/tests/connect/test_connect_basic.py | 2 +-
5 files changed, 32 insertions(+), 5 deletions(-)
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index c947d948b4c..0740334724e 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -85,7 +85,13 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateM
|""".stripMargin)
.collect()
}
- assert(ex.getErrorClass != null)
+ assert(
+ ex.getErrorClass ===
+ "INCONSISTENT_BEHAVIOR_CROSS_VERSION.PARSE_DATETIME_BY_NEW_PARSER")
+ assert(
+ ex.getMessageParameters.asScala == Map(
+ "datetime" -> "'02-29'",
+ "config" -> "\"spark.sql.legacy.timeParserPolicy\""))
if (enrichErrorEnabled) {
assert(ex.getCause.isInstanceOf[DateTimeException])
} else {
diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
index 075526e7521..cc47924de3b 100644
--- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
+++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
@@ -372,10 +372,14 @@ private[client] object GrpcExceptionConverter {
.addAllErrorTypeHierarchy(classes.toImmutableArraySeq.asJava)
if (errorClass != null) {
+ val messageParameters = JsonMethods
+ .parse(info.getMetadataOrDefault("messageParameters", "{}"))
+ .extract[Map[String, String]]
builder.setSparkThrowable(
FetchErrorDetailsResponse.SparkThrowable
.newBuilder()
.setErrorClass(errorClass)
+ .putAllMessageParameters(messageParameters.asJava)
.build())
}
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
index ab4f06d508a..39bf1a630af 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
@@ -256,4 +256,13 @@ object Connect {
.version("4.0.0")
.booleanConf
.createWithDefault(true)
+
+ val CONNECT_GRPC_MAX_METADATA_SIZE =
+ buildStaticConf("spark.connect.grpc.maxMetadataSize")
+ .doc(
+ "Sets the maximum size of metadata fields. For instance, it restricts metadata fields " +
+ "in `ErrorInfo`.")
+ .version("4.0.0")
+ .bytesConf(ByteUnit.BYTE)
+ .createWithDefault(1024)
}
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala
index 703b11c0c73..f489551a1db 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala
@@ -172,6 +172,7 @@ private[connect] object ErrorUtils extends Logging {
"classes",
JsonMethods.compact(JsonMethods.render(allClasses(st.getClass).map(_.getName))))
+ val maxMetadataSize = SparkEnv.get.conf.get(Connect.CONNECT_GRPC_MAX_METADATA_SIZE)
// Add the SQL State and Error Class to the response metadata of the ErrorInfoObject.
st match {
case e: SparkThrowable =>
@@ -181,7 +182,12 @@ private[connect] object ErrorUtils extends Logging {
}
val errorClass = e.getErrorClass
if (errorClass != null && errorClass.nonEmpty) {
- errorInfo.putMetadata("errorClass", errorClass)
+ val messageParameters = JsonMethods.compact(
+ JsonMethods.render(map2jvalue(e.getMessageParameters.asScala.toMap)))
+ if (messageParameters.length <= maxMetadataSize) {
+ errorInfo.putMetadata("errorClass", errorClass)
+ errorInfo.putMetadata("messageParameters", messageParameters)
+ }
}
case _ =>
}
@@ -200,8 +206,10 @@ private[connect] object ErrorUtils extends Logging {
val withStackTrace =
if (sessionHolderOpt.exists(
_.session.conf.get(SQLConf.PYSPARK_JVM_STACKTRACE_ENABLED) && stackTrace.nonEmpty)) {
- val maxSize = SparkEnv.get.conf.get(Connect.CONNECT_JVM_STACK_TRACE_MAX_SIZE)
- errorInfo.putMetadata("stackTrace", StringUtils.abbreviate(stackTrace.get, maxSize))
+ val maxSize = Math.min(
+ SparkEnv.get.conf.get(Connect.CONNECT_JVM_STACK_TRACE_MAX_SIZE),
+ maxMetadataSize)
+ errorInfo.putMetadata("stackTrace", StringUtils.abbreviate(stackTrace.get, maxSize.toInt))
} else {
errorInfo
}
diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py b/python/pyspark/sql/tests/connect/test_connect_basic.py
index 32cd4ed6249..7275f40b39a 100755
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -3452,7 +3452,7 @@ class SparkConnectSessionTests(ReusedConnectTestCase):
self.spark.stop()
spark = (
PySparkSession.builder.config(conf=self.conf())
- .config("spark.connect.jvmStacktrace.maxSize", 128)
+ .config("spark.connect.grpc.maxMetadataSize", 128)
.remote("local[4]")
.getOrCreate()
)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org