You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "rangadi (via GitHub)" <gi...@apache.org> on 2023/09/19 01:31:37 UTC

[GitHub] [spark] rangadi commented on a diff in pull request #42986: [SPARK-44463][SS][CONNECT] Improve error handling for Connect steaming Python worker

rangadi commented on code in PR #42986:
URL: https://github.com/apache/spark/pull/42986#discussion_r1329448797


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala:
##########
@@ -125,8 +128,21 @@ object StreamingForeachBatchHelper extends Logging {
       dataOut.writeLong(args.batchId)
       dataOut.flush()
 
-      val ret = dataIn.readInt()
-      logInfo(s"Python foreach batch for dfId ${args.dfId} completed (ret: $ret)")
+      try {
+        dataIn.readInt() match {
+          case ret if ret == 0 =>

Review Comment:
   Minor: Can simply be `case 0 => `



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingQueryListenerHelper.scala:
##########
@@ -38,33 +43,56 @@ class PythonStreamingQueryListener(listener: SimplePythonFunction, sessionHolder
     sessionHolder.sessionId,
     "pyspark.sql.connect.streaming.worker.listener_worker")
 
-  val (dataOut, _) = runner.init()
+  val (dataOut, dataIn) = runner.init()
 
   override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
     PythonRDD.writeUTF(event.json, dataOut)
     dataOut.writeInt(0)
     dataOut.flush()
+    handlePythonWorkerError("onQueryStarted")
   }
 
   override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
     PythonRDD.writeUTF(event.json, dataOut)
     dataOut.writeInt(1)
     dataOut.flush()
+    handlePythonWorkerError("onQueryProgress")
   }
 
   override def onQueryIdle(event: StreamingQueryListener.QueryIdleEvent): Unit = {
     PythonRDD.writeUTF(event.json, dataOut)
     dataOut.writeInt(2)
     dataOut.flush()
+    handlePythonWorkerError("onQueryIdle")
   }
 
   override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
     PythonRDD.writeUTF(event.json, dataOut)
     dataOut.writeInt(3)
     dataOut.flush()
+    handlePythonWorkerError("onQueryTerminated")
   }
 
   private[spark] def stopListenerProcess(): Unit = {
     runner.stop()
   }
+
+  private def handlePythonWorkerError(functionName: String): Unit = {
+    try {
+      dataIn.readInt() match {

Review Comment:
   Can you add a comment to reuse this code? 



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala:
##########
@@ -125,8 +128,21 @@ object StreamingForeachBatchHelper extends Logging {
       dataOut.writeLong(args.batchId)
       dataOut.flush()
 
-      val ret = dataIn.readInt()
-      logInfo(s"Python foreach batch for dfId ${args.dfId} completed (ret: $ret)")
+      try {
+        dataIn.readInt() match {
+          case ret if ret == 0 =>
+            logInfo(s"Python foreach batch for dfId ${args.dfId} completed (ret: $ret)")
+          case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
+            val exLength = dataIn.readInt()
+            val obj = new Array[Byte](exLength)
+            dataIn.readFully(obj)
+            val msg = new String(obj, StandardCharsets.UTF_8)
+            throw new PythonException(s"Found error inside foreachBatch Python process: $msg", null)

Review Comment:
   Handle other return here.



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingQueryListenerHelper.scala:
##########
@@ -38,33 +43,56 @@ class PythonStreamingQueryListener(listener: SimplePythonFunction, sessionHolder
     sessionHolder.sessionId,
     "pyspark.sql.connect.streaming.worker.listener_worker")
 
-  val (dataOut, _) = runner.init()
+  val (dataOut, dataIn) = runner.init()
 
   override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
     PythonRDD.writeUTF(event.json, dataOut)
     dataOut.writeInt(0)
     dataOut.flush()
+    handlePythonWorkerError("onQueryStarted")
   }
 
   override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
     PythonRDD.writeUTF(event.json, dataOut)
     dataOut.writeInt(1)
     dataOut.flush()
+    handlePythonWorkerError("onQueryProgress")
   }
 
   override def onQueryIdle(event: StreamingQueryListener.QueryIdleEvent): Unit = {
     PythonRDD.writeUTF(event.json, dataOut)
     dataOut.writeInt(2)
     dataOut.flush()
+    handlePythonWorkerError("onQueryIdle")
   }
 
   override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
     PythonRDD.writeUTF(event.json, dataOut)
     dataOut.writeInt(3)
     dataOut.flush()
+    handlePythonWorkerError("onQueryTerminated")
   }
 
   private[spark] def stopListenerProcess(): Unit = {
     runner.stop()
   }
+
+  private def handlePythonWorkerError(functionName: String): Unit = {
+    try {
+      dataIn.readInt() match {
+        case ret if ret == 0 =>
+          logInfo(s"Streaming query listener function $functionName completed (ret: $ret)")
+        case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
+          val exLength = dataIn.readInt()
+          val obj = new Array[Byte](exLength)
+          dataIn.readFully(obj)
+          val msg = new String(obj, StandardCharsets.UTF_8)
+          throw new PythonException(s"Found error inside Streaming query listener Python " +
+            s"process for function $functionName: $msg", null)
+      }
+    } catch {
+      case eof: EOFException =>
+        throw new SparkException("Python worker exited unexpectedly (crashed)", eof)

Review Comment:
   Always include the base exception (same for foreach batch side).



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingQueryListenerHelper.scala:
##########
@@ -76,16 +78,21 @@ class PythonStreamingQueryListener(listener: SimplePythonFunction, sessionHolder
   }
 
   private def handlePythonWorkerError(functionName: String): Unit = {
-    dataIn.readInt() match {
-      case ret if ret == 0 =>
-        logInfo(s"Streaming query listener function $functionName completed (ret: $ret)")
-      case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
-        val exLength = dataIn.readInt()
-        val obj = new Array[Byte](exLength)
-        dataIn.readFully(obj)
-        val msg = new String(obj, StandardCharsets.UTF_8)
-        throw new IllegalStateException(s"Found error inside Streaming query listener Python " +
-          s"process for function $functionName: $msg")
+    try {
+      dataIn.readInt() match {
+        case ret if ret == 0 =>
+          logInfo(s"Streaming query listener function $functionName completed (ret: $ret)")
+        case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
+          val exLength = dataIn.readInt()
+          val obj = new Array[Byte](exLength)
+          dataIn.readFully(obj)
+          val msg = new String(obj, StandardCharsets.UTF_8)
+          throw new PythonException(s"Found error inside Streaming query listener Python " +
+            s"process for function $functionName: $msg", null)
+      }
+    } catch {
+      case eof: EOFException =>

Review Comment:
   Yeah, we can't handle the retries in the PR. we need to implement that feature properly.
   @bogao007 can catch any `NonFatal(ex)` here. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org