You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2023/06/21 15:00:49 UTC

[spark] branch master updated: [SPARK-44056][SQL] Include UDF name in UDF execution failure error message when available

This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6165f316063 [SPARK-44056][SQL] Include UDF name in UDF execution failure error message when available
6165f316063 is described below

commit 6165f31606344efdf35f060d07cee46b85948e38
Author: Rob Reeves <ro...@linkedin.com>
AuthorDate: Wed Jun 21 18:00:36 2023 +0300

    [SPARK-44056][SQL] Include UDF name in UDF execution failure error message when available
    
    ### What changes were proposed in this pull request?
    This modifies the error message when a Scala UDF fails to execute by including the UDF name if it is available.
    
    ### Why are the changes needed?
    If there are multiple UDFs defined in the same location with the same method signature, it can be hard to identify which UDF caused the issue, because the function class name alone does not give enough information. Adding the UDF name, if available, makes it easier to identify the exact problematic UDF.
    
    This is particularly helpful when codegen is enabled and the JVM omits the exception stack trace as a performance optimization, because the error message is then the only clue to which UDF failed. Example in 3.1.1:
    ```
    Caused by: org.apache.spark.SparkException: Failed to execute user defined function(UDFRegistration$$Lambda$666/1969461119: (bigint, string) => string)
            at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.subExpr_0$(Unknown Source)
            at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
            at org.apache.spark.sql.execution.FilterExec.$anonfun$doExecute$3(basicPhysicalOperators.scala:249)
            at org.apache.spark.sql.execution.FilterExec.$anonfun$doExecute$3$adapted(basicPhysicalOperators.scala:248)
            at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:513)
            at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
            at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
            at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
            at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
            at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
            at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
            at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:131)
            at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
            at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
            at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
            at org.apache.spark.scheduler.Task.run(Task.scala:131)
            at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:523)
            at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1535)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:526)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.NullPointerException
    ```
    
    ### Does this PR introduce _any_ user-facing change?
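    Yes, it adds the UDF name to the UDF failure error message and quotes the function name as a SQL identifier (in backticks), even when no UDF name is available. Before this change: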
    > [FAILED_EXECUTE_UDF] Failed to execute user defined function (QueryExecutionErrorsSuite$$Lambda$970/181260145: (string, int) => string).
    
    After this change:
    > [FAILED_EXECUTE_UDF] Failed to execute user defined function (`nextChar (QueryExecutionErrorsSuite$$Lambda$970/181260145)`: (string, int) => string).
    
    ### How was this patch tested?
    Unit test added.
    
    Closes #41599 from robreeves/roreeves/roreeves/udf_error.
    
    Lead-authored-by: Rob Reeves <ro...@linkedin.com>
    Co-authored-by: Rob Reeves <ro...@gmail.com>
    Signed-off-by: Max Gekk <ma...@gmail.com>
---
 .../spark/sql/catalyst/expressions/ScalaUDF.scala  |  6 ++--
 .../spark/sql/errors/QueryExecutionErrors.scala    |  4 +--
 .../sql/errors/QueryExecutionErrorsSuite.scala     | 35 ++++++++++++++++++----
 .../spark/sql/hive/execution/HiveUDFSuite.scala    |  6 ++--
 4 files changed, 39 insertions(+), 12 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
index 137a8976a40..40274a83340 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
@@ -1168,7 +1168,7 @@ case class ScalaUDF(
          |  $funcInvocation;
          |} catch (Throwable e) {
          |  throw QueryExecutionErrors.failedExecuteUserDefinedFunctionError(
-         |    "$funcCls", "$inputTypesString", "$outputType", e);
+         |    "$functionName", "$inputTypesString", "$outputType", e);
          |}
        """.stripMargin
 
@@ -1188,6 +1188,8 @@ case class ScalaUDF(
 
   private[this] val resultConverter = catalystConverter
 
+  private def functionName = udfName.map { uName => s"$uName ($funcCls)" }.getOrElse(funcCls)
+
   lazy val funcCls = Utils.getSimpleName(function.getClass)
   lazy val inputTypesString = children.map(_.dataType.catalogString).mkString(", ")
   lazy val outputType = dataType.catalogString
@@ -1198,7 +1200,7 @@ case class ScalaUDF(
     } catch {
       case e: Exception =>
         throw QueryExecutionErrors.failedExecuteUserDefinedFunctionError(
-          funcCls, inputTypesString, outputType, e)
+          functionName, inputTypesString, outputType, e)
     }
 
     resultConverter(result)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index beedc749e30..df2116df8f2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -207,12 +207,12 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
       messageParameters = Map("typeName" -> (dataType + failure)))
   }
 
-  def failedExecuteUserDefinedFunctionError(funcCls: String, inputTypes: String,
+  def failedExecuteUserDefinedFunctionError(functionName: String, inputTypes: String,
       outputType: String, e: Throwable): Throwable = {
     new SparkException(
       errorClass = "FAILED_EXECUTE_UDF",
       messageParameters = Map(
-        "functionName" -> funcCls,
+        "functionName" -> toSQLId(functionName),
         "signature" -> inputTypes,
         "result" -> outputType),
       cause = e)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
index 73a3e088894..61b3610e64e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
@@ -396,22 +396,45 @@ class QueryExecutionErrorsSuite
       sqlState = "22023")
   }
 
+  test("FAILED_EXECUTE_UDF: execute user defined function with registered UDF") {
+    val luckyCharOfWord = udf { (word: String, index: Int) => {
+      word.substring(index, index + 1)
+    }}
+    spark.udf.register("luckyCharOfWord", luckyCharOfWord)
+
+    val e = intercept[SparkException] {
+      Seq(("Jacek", 5), ("Agata", 5), ("Sweet", 6))
+        .toDF("word", "index")
+        .createOrReplaceTempView("words")
+      spark.sql("select luckyCharOfWord(word, index) from words").collect()
+    }
+    assert(e.getCause.isInstanceOf[SparkException])
+
+    checkError(
+      exception = e.getCause.asInstanceOf[SparkException],
+      errorClass = "FAILED_EXECUTE_UDF",
+      parameters = Map(
+        "functionName" ->
+          "`luckyCharOfWord \\(QueryExecutionErrorsSuite\\$\\$Lambda\\$\\d+/\\w+\\)`",
+        "signature" -> "string, int",
+        "result" -> "string"),
+      matchPVals = true)
+  }
+
   test("FAILED_EXECUTE_UDF: execute user defined function") {
     val luckyCharOfWord = udf { (word: String, index: Int) => {
       word.substring(index, index + 1)
     }}
-    val e1 = intercept[SparkException] {
+    val e = intercept[SparkException] {
       val words = Seq(("Jacek", 5), ("Agata", 5), ("Sweet", 6)).toDF("word", "index")
       words.select(luckyCharOfWord($"word", $"index")).collect()
     }
-    assert(e1.getCause.isInstanceOf[SparkException])
-
-    Utils.getSimpleName(luckyCharOfWord.getClass)
+    assert(e.getCause.isInstanceOf[SparkException])
 
     checkError(
-      exception = e1.getCause.asInstanceOf[SparkException],
+      exception = e.getCause.asInstanceOf[SparkException],
       errorClass = "FAILED_EXECUTE_UDF",
-      parameters = Map("functionName" -> "QueryExecutionErrorsSuite\\$\\$Lambda\\$\\d+/\\w+",
+      parameters = Map("functionName" -> "`QueryExecutionErrorsSuite\\$\\$Lambda\\$\\d+/\\w+`",
         "signature" -> "string, int",
         "result" -> "string"),
       matchPVals = true)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
index 8fb9209f9cb..ef430f4b6a2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
@@ -738,7 +738,8 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
           e,
           "FAILED_EXECUTE_UDF",
           parameters = Map(
-            "functionName" -> s"${classOf[GenericUDFAssertTrue].getName}",
+            "functionName" ->
+              "`org`.`apache`.`hadoop`.`hive`.`ql`.`udf`.`generic`.`GenericUDFAssertTrue`",
             "signature" -> "boolean",
             "result" -> "void"))
       }
@@ -768,7 +769,8 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
           exception = intercept[SparkException](df.collect()).getCause.asInstanceOf[SparkException],
           errorClass = "FAILED_EXECUTE_UDF",
           parameters = Map(
-            "functionName" -> s"${classOf[SimpleUDFAssertTrue].getName}",
+            "functionName" ->
+              "`org`.`apache`.`spark`.`sql`.`hive`.`execution`.`SimpleUDFAssertTrue`",
             "signature" -> "boolean",
             "result" -> "boolean"
           )


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org