You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "heyihong (via GitHub)" <gi...@apache.org> on 2023/09/01 22:50:35 UTC

[GitHub] [spark] heyihong commented on a diff in pull request #42377: [SPARK-44622][SQL][CONNECT] Implement error enrichment and setting server-side stacktrace

heyihong commented on code in PR #42377:
URL: https://github.com/apache/spark/pull/42377#discussion_r1313613119


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala:
##########
@@ -19,51 +19,184 @@ package org.apache.spark.sql.connect.client
 import java.time.DateTimeException
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.reflect.ClassTag
 
 import com.google.rpc.ErrorInfo
 import io.grpc.StatusRuntimeException
 import io.grpc.protobuf.StatusProto
+import org.json4s.DefaultFormats
+import org.json4s.jackson.JsonMethods
 
 import org.apache.spark.{SparkArithmeticException, SparkArrayIndexOutOfBoundsException, SparkDateTimeException, SparkException, SparkIllegalArgumentException, SparkNumberFormatException, SparkRuntimeException, SparkUnsupportedOperationException, SparkUpgradeException}
+import org.apache.spark.connect.proto.{FetchErrorDetailsRequest, FetchErrorDetailsResponse, UserContext}
+import org.apache.spark.connect.proto.SparkConnectServiceGrpc.SparkConnectServiceBlockingStub
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchDatabaseException, NoSuchTableException, TableAlreadyExistsException, TempTableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.trees.Origin
-import org.apache.spark.util.JsonUtils
 
-private[client] object GrpcExceptionConverter extends JsonUtils {
-  def convert[T](f: => T): T = {
+/**
+ * GrpcExceptionConverter converts StatusRuntimeException to Spark exceptions.
+ * @param grpcStub
+ *   grpcStub for fetching error details from server.
+ */
+private[client] class GrpcExceptionConverter(grpcStub: SparkConnectServiceBlockingStub) {
+  import GrpcExceptionConverter._
+
+  def convert[T](sessionId: String, userContext: UserContext)(f: => T): T = {
     try {
       f
     } catch {
       case e: StatusRuntimeException =>
-        throw toThrowable(e)
+        throw toThrowable(e, sessionId, userContext)
     }
   }
 
-  def convertIterator[T](iter: CloseableIterator[T]): CloseableIterator[T] = {
+  def convertIterator[T](
+      sessionId: String,
+      userContext: UserContext,
+      iter: CloseableIterator[T]): CloseableIterator[T] = {
     new CloseableIterator[T] {
       override def hasNext: Boolean = {
-        convert {
+        convert(sessionId, userContext) {
           iter.hasNext
         }
       }
 
       override def next(): T = {
-        convert {
+        convert(sessionId, userContext) {
           iter.next()
         }
       }
 
       override def close(): Unit = {
-        convert {
+        convert(sessionId, userContext) {
           iter.close()
         }
       }
     }
   }
 
+  private def exceptionInfosToErrors(
+    errorId: String,
+    exceptionInfos: mutable.Map[String, FetchErrorDetailsResponse.ExceptionInfo]
+  ): List[Error] = {

Review Comment:
   Done



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala:
##########
@@ -19,51 +19,184 @@ package org.apache.spark.sql.connect.client
 import java.time.DateTimeException
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.reflect.ClassTag
 
 import com.google.rpc.ErrorInfo
 import io.grpc.StatusRuntimeException
 import io.grpc.protobuf.StatusProto
+import org.json4s.DefaultFormats
+import org.json4s.jackson.JsonMethods
 
 import org.apache.spark.{SparkArithmeticException, SparkArrayIndexOutOfBoundsException, SparkDateTimeException, SparkException, SparkIllegalArgumentException, SparkNumberFormatException, SparkRuntimeException, SparkUnsupportedOperationException, SparkUpgradeException}
+import org.apache.spark.connect.proto.{FetchErrorDetailsRequest, FetchErrorDetailsResponse, UserContext}
+import org.apache.spark.connect.proto.SparkConnectServiceGrpc.SparkConnectServiceBlockingStub
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchDatabaseException, NoSuchTableException, TableAlreadyExistsException, TempTableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.trees.Origin
-import org.apache.spark.util.JsonUtils
 
-private[client] object GrpcExceptionConverter extends JsonUtils {
-  def convert[T](f: => T): T = {
+/**
+ * GrpcExceptionConverter converts StatusRuntimeException to Spark exceptions.
+ * @param grpcStub
+ *   grpcStub for fetching error details from server.
+ */
+private[client] class GrpcExceptionConverter(grpcStub: SparkConnectServiceBlockingStub) {
+  import GrpcExceptionConverter._
+
+  def convert[T](sessionId: String, userContext: UserContext)(f: => T): T = {
     try {
       f
     } catch {
       case e: StatusRuntimeException =>
-        throw toThrowable(e)
+        throw toThrowable(e, sessionId, userContext)
     }
   }
 
-  def convertIterator[T](iter: CloseableIterator[T]): CloseableIterator[T] = {
+  def convertIterator[T](
+      sessionId: String,
+      userContext: UserContext,
+      iter: CloseableIterator[T]): CloseableIterator[T] = {
     new CloseableIterator[T] {
       override def hasNext: Boolean = {
-        convert {
+        convert(sessionId, userContext) {
           iter.hasNext
         }
       }
 
       override def next(): T = {
-        convert {
+        convert(sessionId, userContext) {
           iter.next()
         }
       }
 
       override def close(): Unit = {
-        convert {
+        convert(sessionId, userContext) {
           iter.close()
         }
       }
     }
   }
 
+  private def exceptionInfosToErrors(
+    errorId: String,
+    exceptionInfos: mutable.Map[String, FetchErrorDetailsResponse.ExceptionInfo]
+  ): List[Error] = {

Review Comment:
   Good point, done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org