Posted to commits@spark.apache.org by ma...@apache.org on 2022/06/08 18:21:27 UTC

[spark] branch branch-3.3 updated: [SPARK-39412][SQL] Exclude IllegalStateException from Spark's internal errors

This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 94f3e4113ef [SPARK-39412][SQL] Exclude IllegalStateException from Spark's internal errors
94f3e4113ef is described below

commit 94f3e4113ef6fbf0940578bcb279f233e43c27f1
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Wed Jun 8 21:20:55 2022 +0300

    [SPARK-39412][SQL] Exclude IllegalStateException from Spark's internal errors
    
    ### What changes were proposed in this pull request?
    In the PR, I propose to exclude `IllegalStateException` from the list of exceptions that are wrapped by `SparkException` with the `INTERNAL_ERROR` error class.
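    
    For illustration, here is a minimal standalone sketch of the resulting behavior (`RuntimeException` is a stand-in here; Spark actually raises a `SparkException` with the `INTERNAL_ERROR` error class, as the `QueryExecution` diff below shows):
    ```scala
    object InternalErrorSketch {
      // Only null pointer exceptions and assertion errors are still
      // wrapped as internal errors after this change.
      def toInternalError(msg: String, e: Throwable): Throwable = e match {
        case _: NullPointerException | _: AssertionError =>
          new RuntimeException(s"[INTERNAL_ERROR] $msg", e)
        // IllegalStateException (and anything else) now reaches the caller unchanged.
        case _ => e
      }
    }
    ```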
    
    ### Why are the changes needed?
    See explanation in SPARK-39412.
    
    ### Does this PR introduce _any_ user-facing change?
    No, the reverted changes haven't been released yet.
    
    ### How was this patch tested?
    By running the modified test suites:
    ```
    $ build/sbt "test:testOnly *ContinuousSuite"
    $ build/sbt "test:testOnly *MicroBatchExecutionSuite"
    $ build/sbt "test:testOnly *KafkaMicroBatchV1SourceSuite"
    $ build/sbt "test:testOnly *KafkaMicroBatchV2SourceSuite"
    $ build/sbt "test:testOnly *.WholeStageCodegenSuite"
    ```
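    
    After this change, the modified suites intercept `IllegalStateException` directly rather than unwrapping a `SparkException`, e.g. (condensed from the `WholeStageCodegenSuite` change in the diff below):
    ```scala
    val e = intercept[IllegalStateException] {
      sql(query).collect
    }
    assert(e.getMessage.contains(expectedErrMsg))
    ```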
    
    Closes #36804 from MaxGekk/exclude-IllegalStateException.
    
    Authored-by: Max Gekk <ma...@gmail.com>
    Signed-off-by: Max Gekk <ma...@gmail.com>
    (cherry picked from commit 19afe1341d277bc2d7dd47175d142a8c71141138)
    Signed-off-by: Max Gekk <ma...@gmail.com>
---
 .../spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala       | 11 ++++-------
 sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala    |  2 +-
 .../scala/org/apache/spark/sql/execution/QueryExecution.scala |  7 +++----
 .../apache/spark/sql/execution/WholeStageCodegenSuite.scala   | 11 ++++-------
 .../sql/execution/streaming/MicroBatchExecutionSuite.scala    |  6 ++----
 .../spark/sql/streaming/continuous/ContinuousSuite.scala      |  7 +++----
 6 files changed, 17 insertions(+), 27 deletions(-)

diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 41277a535f5..db71f0fd918 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -34,7 +34,6 @@ import org.apache.kafka.common.TopicPartition
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.{SparkException, SparkThrowable}
 import org.apache.spark.sql.{Dataset, ForeachWriter, Row, SparkSession}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.read.streaming.SparkDataStream
@@ -667,10 +666,9 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
         testUtils.sendMessages(topic2, Array("6"))
       },
       StartStream(),
-      ExpectFailure[SparkException](e => {
-        assert(e.asInstanceOf[SparkThrowable].getErrorClass === "INTERNAL_ERROR")
+      ExpectFailure[IllegalStateException](e => {
         // The offset of `topic2` should be changed from 2 to 1
-        assert(e.getCause.getMessage.contains("was changed from 2 to 1"))
+        assert(e.getMessage.contains("was changed from 2 to 1"))
       })
     )
   }
@@ -766,13 +764,12 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
 
       testStream(df)(
         StartStream(checkpointLocation = metadataPath.getAbsolutePath),
-        ExpectFailure[SparkException](e => {
-          assert(e.asInstanceOf[SparkThrowable].getErrorClass === "INTERNAL_ERROR")
+        ExpectFailure[IllegalStateException](e => {
           Seq(
             s"maximum supported log version is v1, but encountered v99999",
             "produced by a newer version of Spark and cannot be read by this version"
           ).foreach { message =>
-            assert(e.getCause.toString.contains(message))
+            assert(e.toString.contains(message))
           }
         }))
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index a4a40cc0e69..6ef9bc2a703 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -3848,7 +3848,7 @@ class Dataset[T] private[sql](
 
   /**
    * Wrap a Dataset action to track the QueryExecution and time cost, then report to the
-   * user-registered callback functions, and also to convert asserts/illegal states to
+   * user-registered callback functions, and also to convert asserts/NPE to
    * the internal error exception.
    */
   private def withAction[U](name: String, qe: QueryExecution)(action: SparkPlan => U) = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index ab9b9861c03..840bd436266 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -489,11 +489,10 @@ object QueryExecution {
   }
 
   /**
-   * Converts asserts, null pointer, illegal state exceptions to internal errors.
+   * Converts asserts and null pointer exceptions to internal errors.
    */
   private[sql] def toInternalError(msg: String, e: Throwable): Throwable = e match {
-    case e @ (_: java.lang.IllegalStateException | _: java.lang.NullPointerException |
-              _: java.lang.AssertionError) =>
+    case e @ (_: java.lang.NullPointerException | _: java.lang.AssertionError) =>
       new SparkException(
         errorClass = "INTERNAL_ERROR",
         messageParameters = Array(msg +
@@ -504,7 +503,7 @@ object QueryExecution {
   }
 
   /**
-   * Catches asserts, null pointer, illegal state exceptions, and converts them to internal errors.
+   * Catches asserts and null pointer exceptions, and converts them to internal errors.
    */
   private[sql] def withInternalError[T](msg: String)(block: => T): T = {
     try {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index 27689bb4d45..2be915f0002 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.SparkException
 import org.apache.spark.sql.{Dataset, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.expressions.codegen.{ByteCodeStats, CodeAndComment, CodeGenerator}
 import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
@@ -763,11 +762,10 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
           "SELECT AVG(v) FROM VALUES(1) t(v)",
          // Test case with keys
           "SELECT k, AVG(v) FROM VALUES((1, 1)) t(k, v) GROUP BY k").foreach { query =>
-          val e = intercept[SparkException] {
+          val e = intercept[IllegalStateException] {
             sql(query).collect
           }
-          assert(e.getErrorClass === "INTERNAL_ERROR")
-          assert(e.getCause.getMessage.contains(expectedErrMsg))
+          assert(e.getMessage.contains(expectedErrMsg))
         }
       }
     }
@@ -786,11 +784,10 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
          // Test case with keys
           "SELECT k, AVG(a + b), SUM(a + b + c) FROM VALUES((1, 1, 1, 1)) t(k, a, b, c) " +
             "GROUP BY k").foreach { query =>
-          val e = intercept[SparkException] {
+          val e = intercept[IllegalStateException] {
             sql(query).collect
           }
-          assert(e.getErrorClass === "INTERNAL_ERROR")
-          assert(e.getCause.getMessage.contains(expectedErrMsg))
+          assert(e.getMessage.contains(expectedErrMsg))
         }
       }
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
index 9d731248ad4..f06e62b33b1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
@@ -22,7 +22,6 @@ import java.io.File
 import org.apache.commons.io.FileUtils
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.{SparkException, SparkThrowable}
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.plans.logical.Range
 import org.apache.spark.sql.connector.read.streaming
@@ -94,9 +93,8 @@ class MicroBatchExecutionSuite extends StreamTest with BeforeAndAfter {
     testStream(streamEvent) (
       AddData(inputData, 1, 2, 3, 4, 5, 6),
       StartStream(Trigger.Once, checkpointLocation = checkpointDir.getAbsolutePath),
-      ExpectFailure[SparkException] { e =>
-        assert(e.asInstanceOf[SparkThrowable].getErrorClass === "INTERNAL_ERROR")
-        assert(e.getCause.getMessage.contains("batch 3 doesn't exist"))
+      ExpectFailure[IllegalStateException] { e =>
+        assert(e.getMessage.contains("batch 3 doesn't exist"))
       }
     )
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index a28d44caab0..5893c3da098 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.streaming.continuous
 
 import java.sql.Timestamp
 
-import org.apache.spark.{SparkContext, SparkException, SparkThrowable}
+import org.apache.spark.{SparkContext, SparkException}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.streaming._
@@ -440,9 +440,8 @@ class ContinuousEpochBacklogSuite extends ContinuousSuiteBase {
 
       testStream(df)(
         StartStream(Trigger.Continuous(1)),
-        ExpectFailure[SparkException] { e =>
-          assert(e.asInstanceOf[SparkThrowable].getErrorClass === "INTERNAL_ERROR")
-          e.getCause.getMessage.contains("queue has exceeded its maximum")
+        ExpectFailure[IllegalStateException] { e =>
+          e.getMessage.contains("queue has exceeded its maximum")
         }
       )
     }

