You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2022/06/01 17:16:31 UTC
[spark] branch master updated: [SPARK-39346][SQL] Convert asserts/illegal state exception to internal errors on each phase
This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8894e785eda [SPARK-39346][SQL] Convert asserts/illegal state exception to internal errors on each phase
8894e785eda is described below
commit 8894e785edae42a642351ad91e539324c39da8e4
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Wed Jun 1 20:16:17 2022 +0300
[SPARK-39346][SQL] Convert asserts/illegal state exception to internal errors on each phase
### What changes were proposed in this pull request?
In this PR, I propose to catch assertion errors, null pointer exceptions, and illegal state exceptions in each phase of query execution (ANALYSIS, OPTIMIZATION, PLANNING), as well as during stream execution, and to convert them to a `SparkException` with the `INTERNAL_ERROR` error class.
### Why are the changes needed?
To improve the user experience with Spark SQL and to unify the representation of user-facing errors.
### Does this PR introduce _any_ user-facing change?
No. The changes might affect users only in corner cases.
### How was this patch tested?
By running the affected test suites:
```
$ build/sbt "test:testOnly *KafkaMicroBatchV1SourceSuite"
$ build/sbt "test:testOnly *KafkaMicroBatchV2SourceSuite"
```
Closes #36704 from MaxGekk/wrapby-INTERNAL_ERROR-every-phase.
Authored-by: Max Gekk <ma...@gmail.com>
Signed-off-by: Max Gekk <ma...@gmail.com>
---
.../sql/kafka010/KafkaMicroBatchSourceSuite.scala | 11 +++++---
.../main/scala/org/apache/spark/sql/Dataset.scala | 14 +++-------
.../spark/sql/execution/QueryExecution.scala | 31 +++++++++++++++++++++-
.../sql/execution/streaming/StreamExecution.scala | 4 ++-
.../streaming/MicroBatchExecutionSuite.scala | 6 +++--
.../sql/streaming/continuous/ContinuousSuite.scala | 7 ++---
6 files changed, 51 insertions(+), 22 deletions(-)
diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 2396f31b954..0a32b1b54d0 100644
--- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -34,6 +34,7 @@ import org.apache.kafka.common.TopicPartition
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._
+import org.apache.spark.{SparkException, SparkThrowable}
import org.apache.spark.sql.{Dataset, ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.read.streaming.SparkDataStream
@@ -666,9 +667,10 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
testUtils.sendMessages(topic2, Array("6"))
},
StartStream(),
- ExpectFailure[IllegalStateException](e => {
+ ExpectFailure[SparkException](e => {
+ assert(e.asInstanceOf[SparkThrowable].getErrorClass === "INTERNAL_ERROR")
// The offset of `topic2` should be changed from 2 to 1
- assert(e.getMessage.contains("was changed from 2 to 1"))
+ assert(e.getCause.getMessage.contains("was changed from 2 to 1"))
})
)
}
@@ -764,12 +766,13 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
testStream(df)(
StartStream(checkpointLocation = metadataPath.getAbsolutePath),
- ExpectFailure[IllegalStateException](e => {
+ ExpectFailure[SparkException](e => {
+ assert(e.asInstanceOf[SparkThrowable].getErrorClass === "INTERNAL_ERROR")
Seq(
s"maximum supported log version is v1, but encountered v99999",
"produced by a newer version of Spark and cannot be read by this version"
).foreach { message =>
- assert(e.toString.contains(message))
+ assert(e.getCause.toString.contains(message))
}
}))
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index f00ebf51d6d..0a45cf92c6e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -27,7 +27,7 @@ import scala.util.control.NonFatal
import org.apache.commons.lang3.StringUtils
-import org.apache.spark.{SparkException, SparkThrowable, TaskContext}
+import org.apache.spark.TaskContext
import org.apache.spark.annotation.{DeveloperApi, Stable, Unstable}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.java.function._
@@ -3920,19 +3920,11 @@ class Dataset[T] private[sql](
* the internal error exception.
*/
private def withAction[U](name: String, qe: QueryExecution)(action: SparkPlan => U) = {
- try {
- SQLExecution.withNewExecutionId(qe, Some(name)) {
+ SQLExecution.withNewExecutionId(qe, Some(name)) {
+ QueryExecution.withInternalError(s"""The "$name" action failed.""") {
qe.executedPlan.resetMetrics()
action(qe.executedPlan)
}
- } catch {
- case e: SparkThrowable => throw e
- case e @ (_: java.lang.IllegalStateException | _: java.lang.AssertionError) =>
- throw new SparkException(
- errorClass = "INTERNAL_ERROR",
- messageParameters = Array(s"""The "$name" action failed."""),
- cause = e)
- case e: Throwable => throw e
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 9ea769b4cf1..206f2a24e0e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicLong
import org.apache.hadoop.fs.Path
+import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
@@ -180,7 +181,9 @@ class QueryExecution(
}
protected def executePhase[T](phase: String)(block: => T): T = sparkSession.withActive {
- tracker.measurePhase(phase)(block)
+ QueryExecution.withInternalError(s"The Spark SQL phase $phase failed with an internal error.") {
+ tracker.measurePhase(phase)(block)
+ }
}
def simpleString: String = {
@@ -486,4 +489,30 @@ object QueryExecution {
val preparationRules = preparations(session, Option(InsertAdaptiveSparkPlan(context)), true)
prepareForExecution(preparationRules, sparkPlan.clone())
}
+
+ /**
+ * Converts asserts, null pointer, illegal state exceptions to internal errors.
+ */
+ private[sql] def toInternalError(msg: String, e: Throwable): Throwable = e match {
+ case e @ (_: java.lang.IllegalStateException | _: java.lang.NullPointerException |
+ _: java.lang.AssertionError) =>
+ new SparkException(
+ errorClass = "INTERNAL_ERROR",
+ messageParameters = Array(msg +
+ " Please, fill a bug report in, and provide the full stack trace."),
+ cause = e)
+ case e: Throwable =>
+ e
+ }
+
+ /**
+ * Catches asserts, null pointer, illegal state exceptions, and converts them to internal errors.
+ */
+ private[sql] def withInternalError[T](msg: String)(block: => T): T = {
+ try {
+ block
+ } catch {
+ case e: Throwable => throw toInternalError(msg, e)
+ }
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 88896c55455..eeaa37aa7ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table}
import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2, ReadLimit, SparkDataStream}
import org.apache.spark.sql.connector.write.{LogicalWriteInfoImpl, SupportsTruncate, Write}
+import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.command.StreamingExplainCommand
import org.apache.spark.sql.execution.datasources.v2.StreamWriterCommitProgress
import org.apache.spark.sql.internal.SQLConf
@@ -319,7 +320,8 @@ abstract class StreamExecution(
// This is a workaround for HADOOP-12074: `Shell.runCommand` converts `InterruptedException`
// to `new IOException(ie.toString())` before Hadoop 2.8.
updateStatusMessage("Stopped")
- case e: Throwable =>
+ case t: Throwable =>
+ val e = QueryExecution.toInternalError(msg = s"Execution of the stream $name failed.", t)
streamDeathCause = new StreamingQueryException(
toDebugString(includeLogicalPlan = isInitialized),
s"Query $prettyIdString terminated with exception: ${e.getMessage}",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
index f06e62b33b1..9d731248ad4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
@@ -22,6 +22,7 @@ import java.io.File
import org.apache.commons.io.FileUtils
import org.scalatest.BeforeAndAfter
+import org.apache.spark.{SparkException, SparkThrowable}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.Range
import org.apache.spark.sql.connector.read.streaming
@@ -93,8 +94,9 @@ class MicroBatchExecutionSuite extends StreamTest with BeforeAndAfter {
testStream(streamEvent) (
AddData(inputData, 1, 2, 3, 4, 5, 6),
StartStream(Trigger.Once, checkpointLocation = checkpointDir.getAbsolutePath),
- ExpectFailure[IllegalStateException] { e =>
- assert(e.getMessage.contains("batch 3 doesn't exist"))
+ ExpectFailure[SparkException] { e =>
+ assert(e.asInstanceOf[SparkThrowable].getErrorClass === "INTERNAL_ERROR")
+ assert(e.getCause.getMessage.contains("batch 3 doesn't exist"))
}
)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index 26c201d5921..cd1c865f5aa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.streaming.continuous
import java.sql.Timestamp
-import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.{SparkContext, SparkException, SparkThrowable}
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}
import org.apache.spark.sql._
import org.apache.spark.sql.execution.streaming._
@@ -440,8 +440,9 @@ class ContinuousEpochBacklogSuite extends ContinuousSuiteBase {
testStream(df)(
StartStream(Trigger.Continuous(1)),
- ExpectFailure[IllegalStateException] { e =>
- e.getMessage.contains("queue has exceeded its maximum")
+ ExpectFailure[SparkException] { e =>
+ assert(e.asInstanceOf[SparkThrowable].getErrorClass === "INTERNAL_ERROR")
+ e.getCause.getMessage.contains("queue has exceeded its maximum")
}
)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org