Posted to commits@spark.apache.org by we...@apache.org on 2020/05/29 04:27:57 UTC
[spark] branch branch-3.0 updated: [SPARK-31862][SQL] Remove exception wrapping in AQE
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 52c2988 [SPARK-31862][SQL] Remove exception wrapping in AQE
52c2988 is described below
commit 52c2988a65d6a2d65f45e222423c59c9fb169b87
Author: Maryann Xue <ma...@gmail.com>
AuthorDate: Fri May 29 04:23:38 2020 +0000
[SPARK-31862][SQL] Remove exception wrapping in AQE
### What changes were proposed in this pull request?
This PR removes the excessive exception wrapping in AQE so that error messages are less verbose and mostly consistent with non-AQE execution. Exceptions from stage materialization are now wrapped with `SparkException` only if there are multiple stage failures. Also, stage cancellation errors are no longer included as part of the exception thrown, but are instead only logged as errors.
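A minimal sketch of the resulting error-handling shape (the helper name `throwCollected` is illustrative, not a Spark API):
```scala
import org.apache.spark.SparkException

// Hypothetical stand-in for the end of the stage materialization loop:
// raw Throwables are collected instead of pre-wrapping each one.
def throwCollected(errors: Seq[Throwable]): Nothing = {
  val e = if (errors.size == 1) {
    // A single failure propagates as-is, matching non-AQE execution.
    errors.head
  } else {
    // Multiple failures: wrap the first and attach the rest as suppressed.
    val se = new SparkException("Multiple failures in stage materialization.", errors.head)
    errors.tail.foreach(se.addSuppressed)
    se
  }
  throw e
}
```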
### Why are the changes needed?
This will make the AQE error reporting more readable and debuggable.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Updated existing tests.
Closes #28668 from maryannxue/spark-31862.
Authored-by: Maryann Xue <ma...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit 45864faaf2c9837a2ca48c456d3c2300736aa1ba)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../execution/adaptive/AdaptiveSparkPlanExec.scala | 38 +++++++++-------------
.../sql/execution/adaptive/QueryStageExec.scala | 3 +-
.../org/apache/spark/sql/MetadataCacheSuite.scala | 9 +++--
.../adaptive/AdaptiveQueryExecSuite.scala | 2 +-
.../sql/execution/adaptive/AdaptiveTestUtils.scala | 22 -------------
.../sql/execution/datasources/json/JsonSuite.scala | 3 +-
.../execution/datasources/orc/OrcQuerySuite.scala | 9 +++--
.../sql/execution/joins/BroadcastJoinSuite.scala | 4 +--
.../spark/sql/sources/BucketedReadSuite.scala | 3 +-
.../spark/sql/hive/HiveMetadataCacheSuite.scala | 5 ++-
10 files changed, 33 insertions(+), 65 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 90d1db9..f6a3333 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -156,7 +156,7 @@ case class AdaptiveSparkPlanExec(
var currentLogicalPlan = currentPhysicalPlan.logicalLink.get
var result = createQueryStages(currentPhysicalPlan)
val events = new LinkedBlockingQueue[StageMaterializationEvent]()
- val errors = new mutable.ArrayBuffer[SparkException]()
+ val errors = new mutable.ArrayBuffer[Throwable]()
var stagesToReplace = Seq.empty[QueryStageExec]
while (!result.allChildStagesMaterialized) {
currentPhysicalPlan = result.newPlan
@@ -176,9 +176,7 @@ case class AdaptiveSparkPlanExec(
}(AdaptiveSparkPlanExec.executionContext)
} catch {
case e: Throwable =>
- val ex = new SparkException(
- s"Early failed query stage found: ${stage.treeString}", e)
- cleanUpAndThrowException(Seq(ex), Some(stage.id))
+ cleanUpAndThrowException(Seq(e), Some(stage.id))
}
}
}
@@ -193,8 +191,7 @@ case class AdaptiveSparkPlanExec(
case StageSuccess(stage, res) =>
stage.resultOption = Some(res)
case StageFailure(stage, ex) =>
- errors.append(
- new SparkException(s"Failed to materialize query stage: ${stage.treeString}.", ex))
+ errors.append(ex)
}
// In case of errors, we cancel all running stages and throw exception.
@@ -536,31 +533,28 @@ case class AdaptiveSparkPlanExec(
* materialization errors and stage cancellation errors.
*/
private def cleanUpAndThrowException(
- errors: Seq[SparkException],
+ errors: Seq[Throwable],
earlyFailedStage: Option[Int]): Unit = {
- val runningStages = currentPhysicalPlan.collect {
+ currentPhysicalPlan.foreach {
// earlyFailedStage is the stage which failed before calling doMaterialize,
// so we should avoid calling cancel on it, which would re-trigger the failure.
- case s: QueryStageExec if !earlyFailedStage.contains(s.id) => s
- }
- val cancelErrors = new mutable.ArrayBuffer[SparkException]()
- try {
- runningStages.foreach { s =>
+ case s: QueryStageExec if !earlyFailedStage.contains(s.id) =>
try {
s.cancel()
} catch {
case NonFatal(t) =>
- cancelErrors.append(
- new SparkException(s"Failed to cancel query stage: ${s.treeString}", t))
+ logError(s"Exception in cancelling query stage: ${s.treeString}", t)
}
- }
- } finally {
- val ex = new SparkException(
- "Adaptive execution failed due to stage materialization failures.", errors.head)
- errors.tail.foreach(ex.addSuppressed)
- cancelErrors.foreach(ex.addSuppressed)
- throw ex
+ case _ =>
+ }
+ val e = if (errors.size == 1) {
+ errors.head
+ } else {
+ val se = new SparkException("Multiple failures in stage materialization.", errors.head)
+ errors.tail.foreach(se.addSuppressed)
+ se
}
+ throw e
}
}
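For reference, suppressed exceptions attached via the standard `Throwable.addSuppressed` remain visible in the printed stack trace, so dropping the per-stage wrappers loses no failure detail; a minimal self-contained illustration:
```scala
object SuppressedDemo extends App {
  val primary = new RuntimeException("stage 1 failed")
  val secondary = new RuntimeException("stage 2 failed")

  val wrapped = new RuntimeException("Multiple failures in stage materialization.", primary)
  wrapped.addSuppressed(secondary)

  // Prints the main trace with cause "stage 1 failed" and a
  // "Suppressed: java.lang.RuntimeException: stage 2 failed" entry.
  wrapped.printStackTrace()
}
```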
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
index f414f85..9a9a8b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
@@ -25,6 +25,7 @@ import org.apache.spark.{FutureAction, MapOutputStatistics, SparkException}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.errors.attachTree
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
@@ -147,7 +148,7 @@ case class ShuffleQueryStageExec(
throw new IllegalStateException("wrong plan for shuffle stage:\n " + plan.treeString)
}
- override def doMaterialize(): Future[Any] = {
+ override def doMaterialize(): Future[Any] = attachTree(this, "execute") {
shuffle.mapOutputStatisticsFuture
}
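`attachTree` (from `org.apache.spark.sql.catalyst.errors`, as imported above) re-throws any failure from the block together with the operator's plan context, so the shuffle stage's tree string still reaches the error message without an extra `SparkException` layer. A rough self-contained analogue of the pattern (`PlanLike` and `withTreeContext` are illustrative, not Spark APIs):
```scala
import scala.util.control.NonFatal

// Stand-in for a plan node; Spark's real helper throws a TreeNodeException.
trait PlanLike { def treeString: String }

def withTreeContext[A](plan: PlanLike, msg: String)(body: => A): A =
  try body catch {
    case NonFatal(e) =>
      throw new RuntimeException(s"$msg, tree:\n${plan.treeString}", e)
  }
```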
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
index a9f443b..956bd78 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql
import java.io.File
import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.sql.execution.adaptive.AdaptiveTestUtils.assertExceptionMessage
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
@@ -56,8 +55,8 @@ abstract class MetadataCacheSuite extends QueryTest with SharedSparkSession {
val e = intercept[SparkException] {
df.count()
}
- assertExceptionMessage(e, "FileNotFoundException")
- assertExceptionMessage(e, "recreating the Dataset/DataFrame involved")
+ assert(e.getMessage.contains("FileNotFoundException"))
+ assert(e.getMessage.contains("recreating the Dataset/DataFrame involved"))
}
}
}
@@ -85,8 +84,8 @@ class MetadataCacheV1Suite extends MetadataCacheSuite {
val e = intercept[SparkException] {
sql("select count(*) from view_refresh").first()
}
- assertExceptionMessage(e, "FileNotFoundException")
- assertExceptionMessage(e, "REFRESH")
+ assert(e.getMessage.contains("FileNotFoundException"))
+ assert(e.getMessage.contains("REFRESH"))
// Refresh and we should be able to read it again.
spark.catalog.refreshTable("view_refresh")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 18923b2..37e8e13 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -706,7 +706,7 @@ class AdaptiveQueryExecSuite
val error = intercept[Exception] {
agged.count()
}
- assert(error.getCause().toString contains "Early failed query stage found")
+ assert(error.getCause().toString contains "Invalid bucket file")
assert(error.getSuppressed.size === 0)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveTestUtils.scala
index ddaeb57..48f85ae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveTestUtils.scala
@@ -69,25 +69,3 @@ trait DisableAdaptiveExecutionSuite extends SQLTestUtils {
}
}
}
-
-object AdaptiveTestUtils {
- def assertExceptionMessage(e: Exception, expected: String): Unit = {
- val stringWriter = new StringWriter()
- e.printStackTrace(new PrintWriter(stringWriter))
- val errorMsg = stringWriter.toString
- assert(errorMsg.contains(expected))
- }
-
- def assertExceptionCause(t: Throwable, causeClass: Class[_]): Unit = {
- var c = t.getCause
- var foundCause = false
- while (c != null && !foundCause) {
- if (causeClass.isAssignableFrom(c.getClass)) {
- foundCause = true
- } else {
- c = c.getCause
- }
- }
- assert(foundCause, s"Can not find cause: $causeClass")
- }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 4982991..19ec586 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -35,7 +35,6 @@ import org.apache.spark.sql.{functions => F, _}
import org.apache.spark.sql.catalyst.json._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.ExternalRDD
-import org.apache.spark.sql.execution.adaptive.AdaptiveTestUtils.assertExceptionMessage
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
@@ -2239,7 +2238,7 @@ abstract class JsonSuite extends QueryTest with SharedSparkSession with TestJson
.count()
}
- assertExceptionMessage(exception, "Malformed records are detected in record parsing")
+ assert(exception.getMessage.contains("Malformed records are detected in record parsing"))
}
def checkEncoding(expectedEncoding: String, pathToJsonFiles: String,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
index 60f278b..9caf0c8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
@@ -34,7 +34,6 @@ import org.apache.orc.mapreduce.OrcInputFormat
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.adaptive.AdaptiveTestUtils.assertExceptionMessage
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, RecordReaderIterator}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
@@ -599,19 +598,19 @@ abstract class OrcQueryTest extends OrcTest {
val e1 = intercept[SparkException] {
testIgnoreCorruptFiles()
}
- assertExceptionMessage(e1, "Malformed ORC file")
+ assert(e1.getMessage.contains("Malformed ORC file"))
val e2 = intercept[SparkException] {
testIgnoreCorruptFilesWithoutSchemaInfer()
}
- assertExceptionMessage(e2, "Malformed ORC file")
+ assert(e2.getMessage.contains("Malformed ORC file"))
val e3 = intercept[SparkException] {
testAllCorruptFiles()
}
- assertExceptionMessage(e3, "Could not read footer for file")
+ assert(e3.getMessage.contains("Could not read footer for file"))
val e4 = intercept[SparkException] {
testAllCorruptFilesWithoutSchemaInfer()
}
- assertExceptionMessage(e4, "Malformed ORC file")
+ assert(e4.getMessage.contains("Malformed ORC file"))
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
index 1be9308..335ef25 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.{BitwiseAnd, BitwiseOr, Cast, Literal, ShiftLeft}
import org.apache.spark.sql.catalyst.plans.logical.BROADCAST
import org.apache.spark.sql.execution.{SparkPlan, WholeStageCodegenExec}
-import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, AdaptiveTestUtils, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.exchange.EnsureRequirements
import org.apache.spark.sql.functions._
@@ -411,7 +411,7 @@ abstract class BroadcastJoinSuiteBase extends QueryTest with SQLTestUtils
val e = intercept[Exception] {
testDf.collect()
}
- AdaptiveTestUtils.assertExceptionMessage(e, s"Could not execute broadcast in $timeout secs.")
+ assert(e.getMessage.contains(s"Could not execute broadcast in $timeout secs."))
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 707d9c2..14ba008 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -29,7 +29,6 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
import org.apache.spark.sql.execution.{DataSourceScanExec, FileSourceScanExec, SortExec, SparkPlan}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
-import org.apache.spark.sql.execution.adaptive.AdaptiveTestUtils.assertExceptionMessage
import org.apache.spark.sql.execution.datasources.BucketingUtils
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.joins.SortMergeJoinExec
@@ -759,7 +758,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
agged.count()
}
- assertExceptionMessage(error, "Invalid bucket file")
+ assert(error.getCause().toString contains "Invalid bucket file")
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
index 743cdbd..db8ebcd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
@@ -21,7 +21,6 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException
import org.apache.spark.sql.QueryTest
-import org.apache.spark.sql.execution.adaptive.AdaptiveTestUtils.assertExceptionMessage
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils
@@ -100,7 +99,7 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSi
val e = intercept[SparkException] {
sql("select * from test").count()
}
- assertExceptionMessage(e, "FileNotFoundException")
+ assert(e.getMessage.contains("FileNotFoundException"))
// Test refreshing the cache.
spark.catalog.refreshTable("test")
@@ -115,7 +114,7 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSi
val e2 = intercept[SparkException] {
sql("select * from test").count()
}
- assertExceptionMessage(e2, "FileNotFoundException")
+ assert(e2.getMessage.contains("FileNotFoundException"))
spark.catalog.refreshByPath(dir.getAbsolutePath)
assert(sql("select * from test").count() == 3)
}