You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "heyihong (via GitHub)" <gi...@apache.org> on 2023/09/01 22:50:35 UTC
[GitHub] [spark] heyihong commented on a diff in pull request #42377: [SPARK-44622][SQL][CONNECT] Implement error enrichment and setting server-side stacktrace
heyihong commented on code in PR #42377:
URL: https://github.com/apache/spark/pull/42377#discussion_r1313613119
##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala:
##########
@@ -19,51 +19,184 @@ package org.apache.spark.sql.connect.client
import java.time.DateTimeException
import scala.collection.JavaConverters._
+import scala.collection.mutable
import scala.reflect.ClassTag
import com.google.rpc.ErrorInfo
import io.grpc.StatusRuntimeException
import io.grpc.protobuf.StatusProto
+import org.json4s.DefaultFormats
+import org.json4s.jackson.JsonMethods
import org.apache.spark.{SparkArithmeticException, SparkArrayIndexOutOfBoundsException, SparkDateTimeException, SparkException, SparkIllegalArgumentException, SparkNumberFormatException, SparkRuntimeException, SparkUnsupportedOperationException, SparkUpgradeException}
+import org.apache.spark.connect.proto.{FetchErrorDetailsRequest, FetchErrorDetailsResponse, UserContext}
+import org.apache.spark.connect.proto.SparkConnectServiceGrpc.SparkConnectServiceBlockingStub
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchDatabaseException, NoSuchTableException, TableAlreadyExistsException, TempTableAlreadyExistsException}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.trees.Origin
-import org.apache.spark.util.JsonUtils
-private[client] object GrpcExceptionConverter extends JsonUtils {
- def convert[T](f: => T): T = {
+/**
+ * GrpcExceptionConverter converts StatusRuntimeException to Spark exceptions.
+ * @param grpcStub
+ * grpcStub for fetching error details from server.
+ */
+private[client] class GrpcExceptionConverter(grpcStub: SparkConnectServiceBlockingStub) {
+ import GrpcExceptionConverter._
+
+ def convert[T](sessionId: String, userContext: UserContext)(f: => T): T = {
try {
f
} catch {
case e: StatusRuntimeException =>
- throw toThrowable(e)
+ throw toThrowable(e, sessionId, userContext)
}
}
- def convertIterator[T](iter: CloseableIterator[T]): CloseableIterator[T] = {
+ def convertIterator[T](
+ sessionId: String,
+ userContext: UserContext,
+ iter: CloseableIterator[T]): CloseableIterator[T] = {
new CloseableIterator[T] {
override def hasNext: Boolean = {
- convert {
+ convert(sessionId, userContext) {
iter.hasNext
}
}
override def next(): T = {
- convert {
+ convert(sessionId, userContext) {
iter.next()
}
}
override def close(): Unit = {
- convert {
+ convert(sessionId, userContext) {
iter.close()
}
}
}
}
+ private def exceptionInfosToErrors(
+ errorId: String,
+ exceptionInfos: mutable.Map[String, FetchErrorDetailsResponse.ExceptionInfo]
+ ): List[Error] = {
Review Comment:
Done
##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala:
##########
@@ -19,51 +19,184 @@ package org.apache.spark.sql.connect.client
import java.time.DateTimeException
import scala.collection.JavaConverters._
+import scala.collection.mutable
import scala.reflect.ClassTag
import com.google.rpc.ErrorInfo
import io.grpc.StatusRuntimeException
import io.grpc.protobuf.StatusProto
+import org.json4s.DefaultFormats
+import org.json4s.jackson.JsonMethods
import org.apache.spark.{SparkArithmeticException, SparkArrayIndexOutOfBoundsException, SparkDateTimeException, SparkException, SparkIllegalArgumentException, SparkNumberFormatException, SparkRuntimeException, SparkUnsupportedOperationException, SparkUpgradeException}
+import org.apache.spark.connect.proto.{FetchErrorDetailsRequest, FetchErrorDetailsResponse, UserContext}
+import org.apache.spark.connect.proto.SparkConnectServiceGrpc.SparkConnectServiceBlockingStub
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchDatabaseException, NoSuchTableException, TableAlreadyExistsException, TempTableAlreadyExistsException}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.trees.Origin
-import org.apache.spark.util.JsonUtils
-private[client] object GrpcExceptionConverter extends JsonUtils {
- def convert[T](f: => T): T = {
+/**
+ * GrpcExceptionConverter converts StatusRuntimeException to Spark exceptions.
+ * @param grpcStub
+ * grpcStub for fetching error details from server.
+ */
+private[client] class GrpcExceptionConverter(grpcStub: SparkConnectServiceBlockingStub) {
+ import GrpcExceptionConverter._
+
+ def convert[T](sessionId: String, userContext: UserContext)(f: => T): T = {
try {
f
} catch {
case e: StatusRuntimeException =>
- throw toThrowable(e)
+ throw toThrowable(e, sessionId, userContext)
}
}
- def convertIterator[T](iter: CloseableIterator[T]): CloseableIterator[T] = {
+ def convertIterator[T](
+ sessionId: String,
+ userContext: UserContext,
+ iter: CloseableIterator[T]): CloseableIterator[T] = {
new CloseableIterator[T] {
override def hasNext: Boolean = {
- convert {
+ convert(sessionId, userContext) {
iter.hasNext
}
}
override def next(): T = {
- convert {
+ convert(sessionId, userContext) {
iter.next()
}
}
override def close(): Unit = {
- convert {
+ convert(sessionId, userContext) {
iter.close()
}
}
}
}
+ private def exceptionInfosToErrors(
+ errorId: String,
+ exceptionInfos: mutable.Map[String, FetchErrorDetailsResponse.ExceptionInfo]
+ ): List[Error] = {
Review Comment:
Good point, done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org