You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/05/29 04:27:57 UTC

[spark] branch branch-3.0 updated: [SPARK-31862][SQL] Remove exception wrapping in AQE

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 52c2988  [SPARK-31862][SQL] Remove exception wrapping in AQE
52c2988 is described below

commit 52c2988a65d6a2d65f45e222423c59c9fb169b87
Author: Maryann Xue <ma...@gmail.com>
AuthorDate: Fri May 29 04:23:38 2020 +0000

    [SPARK-31862][SQL] Remove exception wrapping in AQE
    
    ### What changes were proposed in this pull request?
    
    This PR removes the excessive exception wrapping in AQE so that error messages are less verbose and mostly consistent with non-AQE execution. Exceptions from stage materialization are now only wrapped with `SparkException` if there are multiple stage failures. Also, stage cancellation errors will no longer be included as part of the exception thrown; they are only logged as errors.
    
    ### Why are the changes needed?
    
    This will make the AQE error reporting more readable and debuggable.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Updated existing tests.
    
    Closes #28668 from maryannxue/spark-31862.
    
    Authored-by: Maryann Xue <ma...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 45864faaf2c9837a2ca48c456d3c2300736aa1ba)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../execution/adaptive/AdaptiveSparkPlanExec.scala | 38 +++++++++-------------
 .../sql/execution/adaptive/QueryStageExec.scala    |  3 +-
 .../org/apache/spark/sql/MetadataCacheSuite.scala  |  9 +++--
 .../adaptive/AdaptiveQueryExecSuite.scala          |  2 +-
 .../sql/execution/adaptive/AdaptiveTestUtils.scala | 22 -------------
 .../sql/execution/datasources/json/JsonSuite.scala |  3 +-
 .../execution/datasources/orc/OrcQuerySuite.scala  |  9 +++--
 .../sql/execution/joins/BroadcastJoinSuite.scala   |  4 +--
 .../spark/sql/sources/BucketedReadSuite.scala      |  3 +-
 .../spark/sql/hive/HiveMetadataCacheSuite.scala    |  5 ++-
 10 files changed, 33 insertions(+), 65 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 90d1db9..f6a3333 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -156,7 +156,7 @@ case class AdaptiveSparkPlanExec(
       var currentLogicalPlan = currentPhysicalPlan.logicalLink.get
       var result = createQueryStages(currentPhysicalPlan)
       val events = new LinkedBlockingQueue[StageMaterializationEvent]()
-      val errors = new mutable.ArrayBuffer[SparkException]()
+      val errors = new mutable.ArrayBuffer[Throwable]()
       var stagesToReplace = Seq.empty[QueryStageExec]
       while (!result.allChildStagesMaterialized) {
         currentPhysicalPlan = result.newPlan
@@ -176,9 +176,7 @@ case class AdaptiveSparkPlanExec(
               }(AdaptiveSparkPlanExec.executionContext)
             } catch {
               case e: Throwable =>
-                val ex = new SparkException(
-                  s"Early failed query stage found: ${stage.treeString}", e)
-                cleanUpAndThrowException(Seq(ex), Some(stage.id))
+                cleanUpAndThrowException(Seq(e), Some(stage.id))
             }
           }
         }
@@ -193,8 +191,7 @@ case class AdaptiveSparkPlanExec(
           case StageSuccess(stage, res) =>
             stage.resultOption = Some(res)
           case StageFailure(stage, ex) =>
-            errors.append(
-              new SparkException(s"Failed to materialize query stage: ${stage.treeString}.", ex))
+            errors.append(ex)
         }
 
         // In case of errors, we cancel all running stages and throw exception.
@@ -536,31 +533,28 @@ case class AdaptiveSparkPlanExec(
    * materialization errors and stage cancellation errors.
    */
   private def cleanUpAndThrowException(
-       errors: Seq[SparkException],
+       errors: Seq[Throwable],
        earlyFailedStage: Option[Int]): Unit = {
-    val runningStages = currentPhysicalPlan.collect {
+    currentPhysicalPlan.foreach {
       // earlyFailedStage is the stage which failed before calling doMaterialize,
       // so we should avoid calling cancel on it to re-trigger the failure again.
-      case s: QueryStageExec if !earlyFailedStage.contains(s.id) => s
-    }
-    val cancelErrors = new mutable.ArrayBuffer[SparkException]()
-    try {
-      runningStages.foreach { s =>
+      case s: QueryStageExec if !earlyFailedStage.contains(s.id) =>
         try {
           s.cancel()
         } catch {
           case NonFatal(t) =>
-            cancelErrors.append(
-              new SparkException(s"Failed to cancel query stage: ${s.treeString}", t))
+            logError(s"Exception in cancelling query stage: ${s.treeString}", t)
         }
-      }
-    } finally {
-      val ex = new SparkException(
-        "Adaptive execution failed due to stage materialization failures.", errors.head)
-      errors.tail.foreach(ex.addSuppressed)
-      cancelErrors.foreach(ex.addSuppressed)
-      throw ex
+      case _ =>
+    }
+    val e = if (errors.size == 1) {
+      errors.head
+    } else {
+      val se = new SparkException("Multiple failures in stage materialization.", errors.head)
+      errors.tail.foreach(se.addSuppressed)
+      se
     }
+    throw e
   }
 }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
index f414f85..9a9a8b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
@@ -25,6 +25,7 @@ import org.apache.spark.{FutureAction, MapOutputStatistics, SparkException}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.errors.attachTree
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
@@ -147,7 +148,7 @@ case class ShuffleQueryStageExec(
       throw new IllegalStateException("wrong plan for shuffle stage:\n " + plan.treeString)
   }
 
-  override def doMaterialize(): Future[Any] = {
+  override def doMaterialize(): Future[Any] = attachTree(this, "execute") {
     shuffle.mapOutputStatisticsFuture
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
index a9f443b..956bd78 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql
 import java.io.File
 
 import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.sql.execution.adaptive.AdaptiveTestUtils.assertExceptionMessage
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 
@@ -56,8 +55,8 @@ abstract class MetadataCacheSuite extends QueryTest with SharedSparkSession {
       val e = intercept[SparkException] {
         df.count()
       }
-      assertExceptionMessage(e, "FileNotFoundException")
-      assertExceptionMessage(e, "recreating the Dataset/DataFrame involved")
+      assert(e.getMessage.contains("FileNotFoundException"))
+      assert(e.getMessage.contains("recreating the Dataset/DataFrame involved"))
     }
   }
 }
@@ -85,8 +84,8 @@ class MetadataCacheV1Suite extends MetadataCacheSuite {
       val e = intercept[SparkException] {
         sql("select count(*) from view_refresh").first()
       }
-      assertExceptionMessage(e, "FileNotFoundException")
-      assertExceptionMessage(e, "REFRESH")
+      assert(e.getMessage.contains("FileNotFoundException"))
+      assert(e.getMessage.contains("REFRESH"))
 
       // Refresh and we should be able to read it again.
       spark.catalog.refreshTable("view_refresh")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 18923b2..37e8e13 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -706,7 +706,7 @@ class AdaptiveQueryExecSuite
         val error = intercept[Exception] {
           agged.count()
         }
-        assert(error.getCause().toString contains "Early failed query stage found")
+        assert(error.getCause().toString contains "Invalid bucket file")
         assert(error.getSuppressed.size === 0)
       }
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveTestUtils.scala
index ddaeb57..48f85ae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveTestUtils.scala
@@ -69,25 +69,3 @@ trait DisableAdaptiveExecutionSuite extends SQLTestUtils {
     }
   }
 }
-
-object AdaptiveTestUtils {
-  def assertExceptionMessage(e: Exception, expected: String): Unit = {
-    val stringWriter = new StringWriter()
-    e.printStackTrace(new PrintWriter(stringWriter))
-    val errorMsg = stringWriter.toString
-    assert(errorMsg.contains(expected))
-  }
-
-  def assertExceptionCause(t: Throwable, causeClass: Class[_]): Unit = {
-    var c = t.getCause
-    var foundCause = false
-    while (c != null && !foundCause) {
-      if (causeClass.isAssignableFrom(c.getClass)) {
-        foundCause = true
-      } else {
-        c = c.getCause
-      }
-    }
-    assert(foundCause, s"Can not find cause: $causeClass")
-  }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 4982991..19ec586 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -35,7 +35,6 @@ import org.apache.spark.sql.{functions => F, _}
 import org.apache.spark.sql.catalyst.json._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.ExternalRDD
-import org.apache.spark.sql.execution.adaptive.AdaptiveTestUtils.assertExceptionMessage
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -2239,7 +2238,7 @@ abstract class JsonSuite extends QueryTest with SharedSparkSession with TestJson
         .count()
     }
 
-    assertExceptionMessage(exception, "Malformed records are detected in record parsing")
+    assert(exception.getMessage.contains("Malformed records are detected in record parsing"))
   }
 
   def checkEncoding(expectedEncoding: String, pathToJsonFiles: String,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
index 60f278b..9caf0c8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
@@ -34,7 +34,6 @@ import org.apache.orc.mapreduce.OrcInputFormat
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.adaptive.AdaptiveTestUtils.assertExceptionMessage
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, RecordReaderIterator}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -599,19 +598,19 @@ abstract class OrcQueryTest extends OrcTest {
       val e1 = intercept[SparkException] {
         testIgnoreCorruptFiles()
       }
-      assertExceptionMessage(e1, "Malformed ORC file")
+      assert(e1.getMessage.contains("Malformed ORC file"))
       val e2 = intercept[SparkException] {
         testIgnoreCorruptFilesWithoutSchemaInfer()
       }
-      assertExceptionMessage(e2, "Malformed ORC file")
+      assert(e2.getMessage.contains("Malformed ORC file"))
       val e3 = intercept[SparkException] {
         testAllCorruptFiles()
       }
-      assertExceptionMessage(e3, "Could not read footer for file")
+      assert(e3.getMessage.contains("Could not read footer for file"))
       val e4 = intercept[SparkException] {
         testAllCorruptFilesWithoutSchemaInfer()
       }
-      assertExceptionMessage(e4, "Malformed ORC file")
+      assert(e4.getMessage.contains("Malformed ORC file"))
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
index 1be9308..335ef25 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.{BitwiseAnd, BitwiseOr, Cast, Literal, ShiftLeft}
 import org.apache.spark.sql.catalyst.plans.logical.BROADCAST
 import org.apache.spark.sql.execution.{SparkPlan, WholeStageCodegenExec}
-import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, AdaptiveTestUtils, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.exchange.EnsureRequirements
 import org.apache.spark.sql.functions._
@@ -411,7 +411,7 @@ abstract class BroadcastJoinSuiteBase extends QueryTest with SQLTestUtils
       val e = intercept[Exception] {
         testDf.collect()
       }
-      AdaptiveTestUtils.assertExceptionMessage(e, s"Could not execute broadcast in $timeout secs.")
+      assert(e.getMessage.contains(s"Could not execute broadcast in $timeout secs."))
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 707d9c2..14ba008 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -29,7 +29,6 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.execution.{DataSourceScanExec, FileSourceScanExec, SortExec, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
-import org.apache.spark.sql.execution.adaptive.AdaptiveTestUtils.assertExceptionMessage
 import org.apache.spark.sql.execution.datasources.BucketingUtils
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.joins.SortMergeJoinExec
@@ -759,7 +758,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
         agged.count()
       }
 
-      assertExceptionMessage(error, "Invalid bucket file")
+      assert(error.getCause().toString contains "Invalid bucket file")
     }
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
index 743cdbd..db8ebcd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
@@ -21,7 +21,6 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.QueryTest
-import org.apache.spark.sql.execution.adaptive.AdaptiveTestUtils.assertExceptionMessage
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
@@ -100,7 +99,7 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSi
             val e = intercept[SparkException] {
               sql("select * from test").count()
             }
-            assertExceptionMessage(e, "FileNotFoundException")
+            assert(e.getMessage.contains("FileNotFoundException"))
 
             // Test refreshing the cache.
             spark.catalog.refreshTable("test")
@@ -115,7 +114,7 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSi
             val e2 = intercept[SparkException] {
               sql("select * from test").count()
             }
-            assertExceptionMessage(e2, "FileNotFoundException")
+            assert(e.getMessage.contains("FileNotFoundException"))
             spark.catalog.refreshByPath(dir.getAbsolutePath)
             assert(sql("select * from test").count() == 3)
           }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org