Posted to commits@spark.apache.org by ma...@apache.org on 2022/06/01 17:16:31 UTC

[spark] branch master updated: [SPARK-39346][SQL] Convert asserts/illegal state exception to internal errors on each phase

This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8894e785eda [SPARK-39346][SQL] Convert asserts/illegal state exception to internal errors on each phase
8894e785eda is described below

commit 8894e785edae42a642351ad91e539324c39da8e4
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Wed Jun 1 20:16:17 2022 +0300

    [SPARK-39346][SQL] Convert asserts/illegal state exception to internal errors on each phase
    
    ### What changes were proposed in this pull request?
    In this PR, I propose to catch assertion errors and illegal state exceptions in each phase of query execution (ANALYSIS, OPTIMIZATION, PLANNING) and convert them to a `SparkException` with the `INTERNAL_ERROR` error class.
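    
    For illustration, here is a minimal, self-contained sketch of the wrapping pattern (the standalone `InternalError` class is only a stand-in for `SparkException` with the `INTERNAL_ERROR` error class; the real helpers are the `toInternalError`/`withInternalError` methods added to `QueryExecution` in the diff below):
    ```
    object WrapInternalError {
      final class InternalError(msg: String, cause: Throwable)
        extends RuntimeException(msg, cause)
    
      // Only asserts, NPEs, and illegal state exceptions are treated as
      // internal errors; any other throwable is rethrown untouched.
      def toInternalError(msg: String, e: Throwable): Throwable = e match {
        case e @ (_: IllegalStateException | _: NullPointerException |
                  _: AssertionError) => new InternalError(msg, e)
        case other => other
      }
    
      def withInternalError[T](msg: String)(block: => T): T =
        try block catch { case e: Throwable => throw toInternalError(msg, e) }
    
      def main(args: Array[String]): Unit = {
        try {
          withInternalError("The Spark SQL phase ANALYSIS failed with an internal error.") {
            throw new IllegalStateException("boom")
          }
        } catch {
          case e: InternalError =>
            println(s"${e.getMessage} (cause: ${e.getCause.getMessage})")
        }
      }
    }
    ```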
    
    ### Why are the changes needed?
    To improve the user experience with Spark SQL and to unify the representation of user-facing errors.
    
    ### Does this PR introduce _any_ user-facing change?
    No. The changes might affect users in corner cases only.
    
    ### How was this patch tested?
    By running the affected test suites:
    ```
    $ build/sbt "test:testOnly *KafkaMicroBatchV1SourceSuite"
    $ build/sbt "test:testOnly *KafkaMicroBatchV2SourceSuite"
    ```
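    
    After this change, the suites assert on the wrapped `SparkException` and inspect the original failure via `getCause`. As a hedged, self-contained sketch of that user-facing contract (assumes spark-core 3.3+ on the classpath; `failingAction` is a hypothetical stand-in for an action that trips an internal assert):
    ```
    import org.apache.spark.SparkException
    
    object InternalErrorCheck {
      // Stand-in for a Spark action whose internal IllegalStateException
      // is now wrapped into a SparkException with the INTERNAL_ERROR class.
      def failingAction(): Unit = throw new SparkException(
        errorClass = "INTERNAL_ERROR",
        messageParameters = Array("""The "collect" action failed."""),
        cause = new IllegalStateException("boom"))
    
      def main(args: Array[String]): Unit = {
        try failingAction()
        catch {
          case e: SparkException if e.getErrorClass == "INTERNAL_ERROR" =>
            // The original failure is preserved as the cause.
            println(s"internal error; original cause: ${e.getCause.getMessage}")
        }
      }
    }
    ```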
    
    Closes #36704 from MaxGekk/wrapby-INTERNAL_ERROR-every-phase.
    
    Authored-by: Max Gekk <ma...@gmail.com>
    Signed-off-by: Max Gekk <ma...@gmail.com>
---
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  | 11 +++++---
 .../main/scala/org/apache/spark/sql/Dataset.scala  | 14 +++-------
 .../spark/sql/execution/QueryExecution.scala       | 31 +++++++++++++++++++++-
 .../sql/execution/streaming/StreamExecution.scala  |  4 ++-
 .../streaming/MicroBatchExecutionSuite.scala       |  6 +++--
 .../sql/streaming/continuous/ContinuousSuite.scala |  7 ++---
 6 files changed, 51 insertions(+), 22 deletions(-)

diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 2396f31b954..0a32b1b54d0 100644
--- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -34,6 +34,7 @@ import org.apache.kafka.common.TopicPartition
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.SpanSugar._
 
+import org.apache.spark.{SparkException, SparkThrowable}
 import org.apache.spark.sql.{Dataset, ForeachWriter, Row, SparkSession}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.read.streaming.SparkDataStream
@@ -666,9 +667,10 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
         testUtils.sendMessages(topic2, Array("6"))
       },
       StartStream(),
-      ExpectFailure[IllegalStateException](e => {
+      ExpectFailure[SparkException](e => {
+        assert(e.asInstanceOf[SparkThrowable].getErrorClass === "INTERNAL_ERROR")
         // The offset of `topic2` should be changed from 2 to 1
-        assert(e.getMessage.contains("was changed from 2 to 1"))
+        assert(e.getCause.getMessage.contains("was changed from 2 to 1"))
       })
     )
   }
@@ -764,12 +766,13 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
 
       testStream(df)(
         StartStream(checkpointLocation = metadataPath.getAbsolutePath),
-        ExpectFailure[IllegalStateException](e => {
+        ExpectFailure[SparkException](e => {
+          assert(e.asInstanceOf[SparkThrowable].getErrorClass === "INTERNAL_ERROR")
           Seq(
             s"maximum supported log version is v1, but encountered v99999",
             "produced by a newer version of Spark and cannot be read by this version"
           ).foreach { message =>
-            assert(e.toString.contains(message))
+            assert(e.getCause.toString.contains(message))
           }
         }))
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index f00ebf51d6d..0a45cf92c6e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -27,7 +27,7 @@ import scala.util.control.NonFatal
 
 import org.apache.commons.lang3.StringUtils
 
-import org.apache.spark.{SparkException, SparkThrowable, TaskContext}
+import org.apache.spark.TaskContext
 import org.apache.spark.annotation.{DeveloperApi, Stable, Unstable}
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.api.java.function._
@@ -3920,19 +3920,11 @@ class Dataset[T] private[sql](
    * the internal error exception.
    */
   private def withAction[U](name: String, qe: QueryExecution)(action: SparkPlan => U) = {
-    try {
-      SQLExecution.withNewExecutionId(qe, Some(name)) {
+    SQLExecution.withNewExecutionId(qe, Some(name)) {
+      QueryExecution.withInternalError(s"""The "$name" action failed.""") {
         qe.executedPlan.resetMetrics()
         action(qe.executedPlan)
       }
-    } catch {
-      case e: SparkThrowable => throw e
-      case e @ (_: java.lang.IllegalStateException | _: java.lang.AssertionError) =>
-        throw new SparkException(
-          errorClass = "INTERNAL_ERROR",
-          messageParameters = Array(s"""The "$name" action failed."""),
-          cause = e)
-      case e: Throwable => throw e
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 9ea769b4cf1..206f2a24e0e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicLong
 
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
@@ -180,7 +181,9 @@ class QueryExecution(
   }
 
   protected def executePhase[T](phase: String)(block: => T): T = sparkSession.withActive {
-    tracker.measurePhase(phase)(block)
+    QueryExecution.withInternalError(s"The Spark SQL phase $phase failed with an internal error.") {
+      tracker.measurePhase(phase)(block)
+    }
   }
 
   def simpleString: String = {
@@ -486,4 +489,30 @@ object QueryExecution {
     val preparationRules = preparations(session, Option(InsertAdaptiveSparkPlan(context)), true)
     prepareForExecution(preparationRules, sparkPlan.clone())
   }
+
+  /**
+   * Converts asserts, null pointer, illegal state exceptions to internal errors.
+   */
+  private[sql] def toInternalError(msg: String, e: Throwable): Throwable = e match {
+    case e @ (_: java.lang.IllegalStateException | _: java.lang.NullPointerException |
+              _: java.lang.AssertionError) =>
+      new SparkException(
+        errorClass = "INTERNAL_ERROR",
+        messageParameters = Array(msg +
+          " Please, fill a bug report in, and provide the full stack trace."),
+        cause = e)
+    case e: Throwable =>
+      e
+  }
+
+  /**
+   * Catches asserts, null pointer, illegal state exceptions, and converts them to internal errors.
+   */
+  private[sql] def withInternalError[T](msg: String)(block: => T): T = {
+    try {
+      block
+    } catch {
+      case e: Throwable => throw toInternalError(msg, e)
+    }
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 88896c55455..eeaa37aa7ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
 import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table}
 import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2, ReadLimit, SparkDataStream}
 import org.apache.spark.sql.connector.write.{LogicalWriteInfoImpl, SupportsTruncate, Write}
+import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.command.StreamingExplainCommand
 import org.apache.spark.sql.execution.datasources.v2.StreamWriterCommitProgress
 import org.apache.spark.sql.internal.SQLConf
@@ -319,7 +320,8 @@ abstract class StreamExecution(
         // This is a workaround for HADOOP-12074: `Shell.runCommand` converts `InterruptedException`
         // to `new IOException(ie.toString())` before Hadoop 2.8.
         updateStatusMessage("Stopped")
-      case e: Throwable =>
+      case t: Throwable =>
+        val e = QueryExecution.toInternalError(msg = s"Execution of the stream $name failed.", t)
         streamDeathCause = new StreamingQueryException(
           toDebugString(includeLogicalPlan = isInitialized),
           s"Query $prettyIdString terminated with exception: ${e.getMessage}",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
index f06e62b33b1..9d731248ad4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
@@ -22,6 +22,7 @@ import java.io.File
 import org.apache.commons.io.FileUtils
 import org.scalatest.BeforeAndAfter
 
+import org.apache.spark.{SparkException, SparkThrowable}
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.plans.logical.Range
 import org.apache.spark.sql.connector.read.streaming
@@ -93,8 +94,9 @@ class MicroBatchExecutionSuite extends StreamTest with BeforeAndAfter {
     testStream(streamEvent) (
       AddData(inputData, 1, 2, 3, 4, 5, 6),
       StartStream(Trigger.Once, checkpointLocation = checkpointDir.getAbsolutePath),
-      ExpectFailure[IllegalStateException] { e =>
-        assert(e.getMessage.contains("batch 3 doesn't exist"))
+      ExpectFailure[SparkException] { e =>
+        assert(e.asInstanceOf[SparkThrowable].getErrorClass === "INTERNAL_ERROR")
+        assert(e.getCause.getMessage.contains("batch 3 doesn't exist"))
       }
     )
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index 26c201d5921..cd1c865f5aa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.streaming.continuous
 
 import java.sql.Timestamp
 
-import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.{SparkContext, SparkException, SparkThrowable}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.streaming._
@@ -440,8 +440,9 @@ class ContinuousEpochBacklogSuite extends ContinuousSuiteBase {
 
       testStream(df)(
         StartStream(Trigger.Continuous(1)),
-        ExpectFailure[IllegalStateException] { e =>
-          e.getMessage.contains("queue has exceeded its maximum")
+        ExpectFailure[SparkException] { e =>
+          assert(e.asInstanceOf[SparkThrowable].getErrorClass === "INTERNAL_ERROR")
+          assert(e.getCause.getMessage.contains("queue has exceeded its maximum"))
         }
       )
     }

