Posted to commits@spark.apache.org by ka...@apache.org on 2024/03/10 23:44:43 UTC

(spark) branch master updated: [SPARK-47200][SS] Make Foreach batch sink user function error handling backward compatible

This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new edb970b8a73e [SPARK-47200][SS] Make Foreach batch sink user function error handling backward compatible
edb970b8a73e is described below

commit edb970b8a73e5b1e08b01f9370dadb05a3e231e3
Author: micheal-o <mi...@gmail.com>
AuthorDate: Mon Mar 11 08:44:30 2024 +0900

    [SPARK-47200][SS] Make Foreach batch sink user function error handling backward compatible
    
    ### What changes were proposed in this pull request?
    A previous PR (https://github.com/apache/spark/pull/45299) handles and classifies exceptions thrown in user-provided functions for the foreach batch sink. This change makes that handling backward compatible so as not to break current users, since users may depend on getting the user code error from `StreamingQueryException.cause` rather than `StreamingQueryException.cause.cause`.
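
    For illustration, a minimal, hypothetical sketch (not part of this patch) of the caller pattern being preserved; `df` stands in for an arbitrary streaming DataFrame:

        // Hypothetical user code: the user function throws, and the caller
        // expects the user error directly at StreamingQueryException.cause.
        val query = df.writeStream
          .foreachBatch { (batch: Dataset[Row], batchId: Long) =>
            throw new IllegalStateException("user bug")
          }
          .start()
        try {
          query.awaitTermination()
        } catch {
          case e: StreamingQueryException =>
            // With the wrapping from #45299 alone, the user error would sit at
            // e.getCause.getCause; this patch unwraps it so the check below
            // keeps holding.
            assert(e.getCause.isInstanceOf[IllegalStateException])
        }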
    
    ### Why are the changes needed?
    To avoid breaking the existing usage pattern.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, a better error message with an error class for ForeachBatchSink user function failures.
    
    ### How was this patch tested?
    Updated existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #45449 from micheal-o/ForeachBatchExBackwardCompat.
    
    Authored-by: micheal-o <mi...@gmail.com>
    Signed-off-by: Jungtaek Lim <ka...@gmail.com>
---
 .../src/main/resources/error/error-classes.json    |  2 +-
 docs/sql-error-conditions.md                       |  2 +-
 .../sql/execution/streaming/StreamExecution.scala  | 29 +++++++++++++++-------
 .../streaming/sources/ForeachBatchSink.scala       | 14 ++++++++---
 .../sql/errors/QueryExecutionErrorsSuite.scala     |  2 +-
 .../streaming/sources/ForeachBatchSinkSuite.scala  | 17 +++++++------
 6 files changed, 43 insertions(+), 23 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index 57746d6dbf1e..9717ff2ed49c 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -1297,7 +1297,7 @@
   },
   "FOREACH_BATCH_USER_FUNCTION_ERROR" : {
     "message" : [
-      "An error occurred in the user provided function in foreach batch sink."
+      "An error occurred in the user provided function in foreach batch sink. Reason: <reason>"
     ],
     "sqlState" : "39000"
   },
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index 7be01f8cb513..0be75cde968f 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -778,7 +778,7 @@ The operation `<statement>` is not allowed on the `<objectType>`: `<objectName>`
 
 [SQLSTATE: 39000](sql-error-conditions-sqlstates.html#class-39-external-routine-invocation-exception)
 
-An error occurred in the user provided function in foreach batch sink.
+An error occurred in the user provided function in foreach batch sink. Reason: `<reason>`
 
 ### FOUND_MULTIPLE_DATA_SOURCES
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 859fce8b1154..50a73082a8c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -40,6 +40,7 @@ import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table}
 import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2, ReadLimit, SparkDataStream}
 import org.apache.spark.sql.connector.write.{LogicalWriteInfoImpl, SupportsTruncate, Write}
 import org.apache.spark.sql.execution.command.StreamingExplainCommand
+import org.apache.spark.sql.execution.streaming.sources.ForeachBatchUserFuncException
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.connector.SupportsStreamingUpdateAsAppend
 import org.apache.spark.sql.streaming._
@@ -279,6 +280,7 @@ abstract class StreamExecution(
    * `start()` method returns.
    */
   private def runStream(): Unit = {
+    var errorClassOpt: Option[String] = None
     try {
       sparkSession.sparkContext.setJobGroup(runId.toString, getBatchDescriptionString,
         interruptOnCancel = true)
@@ -330,9 +332,17 @@ abstract class StreamExecution(
         getLatestExecutionContext().updateStatusMessage("Stopped")
       case e: Throwable =>
         val message = if (e.getMessage == null) "" else e.getMessage
+        val cause = if (e.isInstanceOf[ForeachBatchUserFuncException]) {
+          // We want to maintain the current way users get the causing exception
+          // from the StreamingQueryException. Hence the ForeachBatch exception is unwrapped here.
+          e.getCause
+        } else {
+          e
+        }
+
         streamDeathCause = new StreamingQueryException(
           toDebugString(includeLogicalPlan = isInitialized),
-          cause = e,
+          cause = cause,
           getLatestExecutionContext().startOffsets
             .toOffsetSeq(sources.toSeq, getLatestExecutionContext().offsetSeqMetadata)
             .toString,
@@ -350,12 +360,18 @@ abstract class StreamExecution(
             "endOffset" -> getLatestExecutionContext().endOffsets.toOffsetSeq(
               sources.toSeq, getLatestExecutionContext().offsetSeqMetadata).toString
           ))
+
+        errorClassOpt = e match {
+          case t: SparkThrowable => Option(t.getErrorClass)
+          case _ => None
+        }
+
         logError(s"Query $prettyIdString terminated with error", e)
         getLatestExecutionContext().updateStatusMessage(s"Terminated with exception: $message")
         // Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to
         // handle them
-        if (!NonFatal(e)) {
-          throw e
+        if (!NonFatal(cause)) {
+          throw cause
         }
     } finally queryExecutionThread.runUninterruptibly {
       // The whole `finally` block must run inside `runUninterruptibly` to avoid being interrupted
@@ -379,12 +395,6 @@ abstract class StreamExecution(
 
         // Notify others
         sparkSession.streams.notifyQueryTermination(StreamExecution.this)
-        val errorClassOpt = exception.flatMap {
-          _.cause match {
-            case t: SparkThrowable => Some(t.getErrorClass)
-            case _ => None
-          }
-        }
         postEvent(
           new QueryTerminatedEvent(id, runId, exception.map(_.cause).map(Utils.exceptionString),
             errorClassOpt))
@@ -691,6 +701,7 @@ object StreamExecution {
     case e2 @ (_: UncheckedIOException | _: ExecutionException | _: UncheckedExecutionException)
         if e2.getCause != null =>
       isInterruptionException(e2.getCause, sc)
+    case fe: ForeachBatchUserFuncException => isInterruptionException(fe.getCause, sc)
     case se: SparkException =>
       val jobGroup = sc.getLocalProperty("spark.jobGroup.id")
       if (jobGroup == null) return false
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSink.scala
index 5cf98dab21eb..1262731790be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSink.scala
@@ -45,14 +45,20 @@ class ForeachBatchSink[T](batchWriter: (Dataset[T], Long) => Unit, encoder: Expr
     } catch {
       // The user code can throw any type of exception.
       case NonFatal(e) if !e.isInstanceOf[SparkThrowable] =>
-        throw new SparkException(
-          errorClass = "FOREACH_BATCH_USER_FUNCTION_ERROR",
-          messageParameters = Map.empty,
-          cause = e)
+        throw ForeachBatchUserFuncException(e)
     }
   }
 }
 
+/**
+ * Exception that wraps the exception thrown in the user provided function in ForeachBatch sink.
+ */
+private[streaming] case class ForeachBatchUserFuncException(cause: Throwable)
+  extends SparkException(
+    errorClass = "FOREACH_BATCH_USER_FUNCTION_ERROR",
+    messageParameters = Map("reason" -> Option(cause.getMessage).getOrElse("")),
+    cause = cause)
+
 /**
  * Interface that is meant to be extended by Python classes via Py4J.
  * Py4J allows Python classes to implement Java interfaces so that the JVM can call back
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
index 9f7224315b0e..d4697773742f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
@@ -889,7 +889,7 @@ class QueryExecutionErrorsSuite
       query.awaitTermination()
     }
     assert(e.getErrorClass === "STREAM_FAILED")
-    assert(e.getCause.getCause.isInstanceOf[NullPointerException])
+    assert(e.getCause.isInstanceOf[NullPointerException])
   }
 
   test("CONCURRENT_QUERY: streaming query is resumed from many sessions") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala
index 7d04d14a17ab..5304ea3b69dd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.streaming.sources
 import scala.collection.mutable
 import scala.language.implicitConversions
 
-import org.apache.spark.{ExecutorDeadException, SparkException}
+import org.apache.spark.ExecutorDeadException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.execution.SerializeFromObjectExec
@@ -193,23 +193,26 @@ class ForeachBatchSinkSuite extends StreamTest {
     mem.addData(1, 2, 3, 4, 5)
 
     val funcEx = new IllegalAccessException("access error")
-    val wrapperEx = intercept[StreamingQueryException] {
+    val queryEx = intercept[StreamingQueryException] {
       val query = ds.writeStream.foreachBatch((_: Dataset[Int], _: Long) => throw funcEx).start()
       query.awaitTermination()
-    }.getCause
+    }
+
+    val errClass = "FOREACH_BATCH_USER_FUNCTION_ERROR"
 
     // verify that we classified the exception
-    checkError(wrapperEx.asInstanceOf[SparkException], "FOREACH_BATCH_USER_FUNCTION_ERROR")
-    assert(wrapperEx.getCause == funcEx)
+    assert(queryEx.getMessage.contains(errClass))
+    assert(queryEx.getCause == funcEx)
 
     val sparkEx = ExecutorDeadException("network error")
     val ex = intercept[StreamingQueryException] {
       val query = ds.writeStream.foreachBatch((_: Dataset[Int], _: Long) => throw sparkEx).start()
       query.awaitTermination()
-    }.getCause
+    }
 
     // we didn't wrap the spark exception
-    assert(ex == sparkEx)
+    assert(!ex.getMessage.contains(errClass))
+    assert(ex.getCause == sparkEx)
   }
 
   // ============== Helper classes and methods =================

