You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2022/06/08 18:21:27 UTC
[spark] branch branch-3.3 updated: [SPARK-39412][SQL] Exclude IllegalStateException from Spark's internal errors
This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 94f3e4113ef [SPARK-39412][SQL] Exclude IllegalStateException from Spark's internal errors
94f3e4113ef is described below
commit 94f3e4113ef6fbf0940578bcb279f233e43c27f1
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Wed Jun 8 21:20:55 2022 +0300
[SPARK-39412][SQL] Exclude IllegalStateException from Spark's internal errors
### What changes were proposed in this pull request?
In this PR, I propose to exclude `IllegalStateException` from the list of exceptions that are wrapped by `SparkException` with the `INTERNAL_ERROR` error class.
### Why are the changes needed?
See the explanation in SPARK-39412.
### Does this PR introduce _any_ user-facing change?
No, the reverted changes haven't been released yet.
### How was this patch tested?
By running the modified test suites:
```
$ build/sbt "test:testOnly *ContinuousSuite"
$ build/sbt "test:testOnly *MicroBatchExecutionSuite"
$ build/sbt "test:testOnly *KafkaMicroBatchV1SourceSuite"
$ build/sbt "test:testOnly *KafkaMicroBatchV2SourceSuite"
$ build/sbt "test:testOnly *.WholeStageCodegenSuite"
```
Closes #36804 from MaxGekk/exclude-IllegalStateException.
Authored-by: Max Gekk <ma...@gmail.com>
Signed-off-by: Max Gekk <ma...@gmail.com>
(cherry picked from commit 19afe1341d277bc2d7dd47175d142a8c71141138)
Signed-off-by: Max Gekk <ma...@gmail.com>
---
.../spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala | 11 ++++-------
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 2 +-
.../scala/org/apache/spark/sql/execution/QueryExecution.scala | 7 +++----
.../apache/spark/sql/execution/WholeStageCodegenSuite.scala | 11 ++++-------
.../sql/execution/streaming/MicroBatchExecutionSuite.scala | 6 ++----
.../spark/sql/streaming/continuous/ContinuousSuite.scala | 7 +++----
6 files changed, 17 insertions(+), 27 deletions(-)
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 41277a535f5..db71f0fd918 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -34,7 +34,6 @@ import org.apache.kafka.common.TopicPartition
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._
-import org.apache.spark.{SparkException, SparkThrowable}
import org.apache.spark.sql.{Dataset, ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.read.streaming.SparkDataStream
@@ -667,10 +666,9 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
testUtils.sendMessages(topic2, Array("6"))
},
StartStream(),
- ExpectFailure[SparkException](e => {
- assert(e.asInstanceOf[SparkThrowable].getErrorClass === "INTERNAL_ERROR")
+ ExpectFailure[IllegalStateException](e => {
// The offset of `topic2` should be changed from 2 to 1
- assert(e.getCause.getMessage.contains("was changed from 2 to 1"))
+ assert(e.getMessage.contains("was changed from 2 to 1"))
})
)
}
@@ -766,13 +764,12 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
testStream(df)(
StartStream(checkpointLocation = metadataPath.getAbsolutePath),
- ExpectFailure[SparkException](e => {
- assert(e.asInstanceOf[SparkThrowable].getErrorClass === "INTERNAL_ERROR")
+ ExpectFailure[IllegalStateException](e => {
Seq(
s"maximum supported log version is v1, but encountered v99999",
"produced by a newer version of Spark and cannot be read by this version"
).foreach { message =>
- assert(e.getCause.toString.contains(message))
+ assert(e.toString.contains(message))
}
}))
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index a4a40cc0e69..6ef9bc2a703 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -3848,7 +3848,7 @@ class Dataset[T] private[sql](
/**
* Wrap a Dataset action to track the QueryExecution and time cost, then report to the
- * user-registered callback functions, and also to convert asserts/illegal states to
+ * user-registered callback functions, and also to convert asserts/NPE to
* the internal error exception.
*/
private def withAction[U](name: String, qe: QueryExecution)(action: SparkPlan => U) = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index ab9b9861c03..840bd436266 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -489,11 +489,10 @@ object QueryExecution {
}
/**
- * Converts asserts, null pointer, illegal state exceptions to internal errors.
+ * Converts asserts, null pointer exceptions to internal errors.
*/
private[sql] def toInternalError(msg: String, e: Throwable): Throwable = e match {
- case e @ (_: java.lang.IllegalStateException | _: java.lang.NullPointerException |
- _: java.lang.AssertionError) =>
+ case e @ (_: java.lang.NullPointerException | _: java.lang.AssertionError) =>
new SparkException(
errorClass = "INTERNAL_ERROR",
messageParameters = Array(msg +
@@ -504,7 +503,7 @@ object QueryExecution {
}
/**
- * Catches asserts, null pointer, illegal state exceptions, and converts them to internal errors.
+ * Catches asserts, null pointer exceptions, and converts them to internal errors.
*/
private[sql] def withInternalError[T](msg: String)(block: => T): T = {
try {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index 27689bb4d45..2be915f0002 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.execution
-import org.apache.spark.SparkException
import org.apache.spark.sql.{Dataset, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.expressions.codegen.{ByteCodeStats, CodeAndComment, CodeGenerator}
import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
@@ -763,11 +762,10 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
"SELECT AVG(v) FROM VALUES(1) t(v)",
// Tet case with keys
"SELECT k, AVG(v) FROM VALUES((1, 1)) t(k, v) GROUP BY k").foreach { query =>
- val e = intercept[SparkException] {
+ val e = intercept[IllegalStateException] {
sql(query).collect
}
- assert(e.getErrorClass === "INTERNAL_ERROR")
- assert(e.getCause.getMessage.contains(expectedErrMsg))
+ assert(e.getMessage.contains(expectedErrMsg))
}
}
}
@@ -786,11 +784,10 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
// Tet case with keys
"SELECT k, AVG(a + b), SUM(a + b + c) FROM VALUES((1, 1, 1, 1)) t(k, a, b, c) " +
"GROUP BY k").foreach { query =>
- val e = intercept[SparkException] {
+ val e = intercept[IllegalStateException] {
sql(query).collect
}
- assert(e.getErrorClass === "INTERNAL_ERROR")
- assert(e.getCause.getMessage.contains(expectedErrMsg))
+ assert(e.getMessage.contains(expectedErrMsg))
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
index 9d731248ad4..f06e62b33b1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
@@ -22,7 +22,6 @@ import java.io.File
import org.apache.commons.io.FileUtils
import org.scalatest.BeforeAndAfter
-import org.apache.spark.{SparkException, SparkThrowable}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.Range
import org.apache.spark.sql.connector.read.streaming
@@ -94,9 +93,8 @@ class MicroBatchExecutionSuite extends StreamTest with BeforeAndAfter {
testStream(streamEvent) (
AddData(inputData, 1, 2, 3, 4, 5, 6),
StartStream(Trigger.Once, checkpointLocation = checkpointDir.getAbsolutePath),
- ExpectFailure[SparkException] { e =>
- assert(e.asInstanceOf[SparkThrowable].getErrorClass === "INTERNAL_ERROR")
- assert(e.getCause.getMessage.contains("batch 3 doesn't exist"))
+ ExpectFailure[IllegalStateException] { e =>
+ assert(e.getMessage.contains("batch 3 doesn't exist"))
}
)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index a28d44caab0..5893c3da098 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.streaming.continuous
import java.sql.Timestamp
-import org.apache.spark.{SparkContext, SparkException, SparkThrowable}
+import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}
import org.apache.spark.sql._
import org.apache.spark.sql.execution.streaming._
@@ -440,9 +440,8 @@ class ContinuousEpochBacklogSuite extends ContinuousSuiteBase {
testStream(df)(
StartStream(Trigger.Continuous(1)),
- ExpectFailure[SparkException] { e =>
- assert(e.asInstanceOf[SparkThrowable].getErrorClass === "INTERNAL_ERROR")
- e.getCause.getMessage.contains("queue has exceeded its maximum")
+ ExpectFailure[IllegalStateException] { e =>
+ e.getMessage.contains("queue has exceeded its maximum")
}
)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org