You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2022/06/20 02:04:30 UTC

[spark] branch branch-3.3 updated: [SPARK-39163][SQL][3.3] Throw an exception w/ error class for an invalid bucket file

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 5d3f3365c0b [SPARK-39163][SQL][3.3] Throw an exception w/ error class for an invalid bucket file
5d3f3365c0b is described below

commit 5d3f3365c0b7d4515fd97d0ff7b7b29db69b2faf
Author: panbingkun <pb...@gmail.com>
AuthorDate: Mon Jun 20 11:04:14 2022 +0900

    [SPARK-39163][SQL][3.3] Throw an exception w/ error class for an invalid bucket file
    
    ### What changes were proposed in this pull request?
    In the PR, I propose to use the INVALID_BUCKET_FILE error classes for an invalid bucket file.
    
    This is a backport of https://github.com/apache/spark/pull/36603.
    
    ### Why are the changes needed?
    Porting the executing errors for multiple rows from a subquery used as an expression to the new error framework should improve user experience with Spark SQL.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    UT
    
    Closes #36913 from panbingkun/branch-3.3-SPARK-39163.
    
    Authored-by: panbingkun <pb...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 core/src/main/resources/error/error-classes.json   |  3 +++
 .../spark/sql/errors/QueryExecutionErrors.scala    |  5 +++++
 .../spark/sql/execution/DataSourceScanExec.scala   |  4 ++--
 .../sql/errors/QueryExecutionErrorsSuite.scala     | 25 ++++++++++++++++++++++
 .../adaptive/AdaptiveQueryExecSuite.scala          |  8 +++----
 .../spark/sql/sources/BucketedReadSuite.scala      | 21 ------------------
 6 files changed, 39 insertions(+), 27 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index 4eb3a4e8e1e..fc712fc9c52 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -95,6 +95,9 @@
   "INVALID_ARRAY_INDEX_IN_ELEMENT_AT" : {
     "message" : [ "The index <indexValue> is out of bounds. The array has <arraySize> elements. Use `try_element_at` to tolerate accessing element at invalid index and return NULL instead. If necessary set <config> to \"false\" to bypass this error." ]
   },
+  "INVALID_BUCKET_FILE" : {
+    "message" : [ "Invalid bucket file: <path>" ]
+  },
   "INVALID_FIELD_NAME" : {
     "message" : [ "Field name <fieldName> is invalid: <path> is not a struct." ],
     "sqlState" : "42000"
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 6c6139d2ccc..161bfd3c03d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -2075,4 +2075,9 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
     new SparkException(errorClass = "NULL_COMPARISON_RESULT",
       messageParameters = Array(), cause = null)
   }
+
+  def invalidBucketFile(path: String): Throwable = {
+    new SparkException(errorClass = "INVALID_BUCKET_FILE", messageParameters = Array(path),
+      cause = null)
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 1ec93a614b7..9e8ae9a714d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
 import org.apache.spark.sql.execution.datasources.v2.PushedDownOperators
@@ -592,8 +593,7 @@ case class FileSourceScanExec(
       }.groupBy { f =>
         BucketingUtils
           .getBucketId(new Path(f.filePath).getName)
-          // TODO(SPARK-39163): Throw an exception w/ error class for an invalid bucket file
-          .getOrElse(throw new IllegalStateException(s"Invalid bucket file ${f.filePath}"))
+          .getOrElse(throw QueryExecutionErrors.invalidBucketFile(f.filePath))
       }
 
     val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
index 73c5b12849a..21acea53ed0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.errors
 
+import java.io.File
+import java.net.URI
+
 import org.apache.spark.{SparkArithmeticException, SparkException, SparkIllegalArgumentException, SparkRuntimeException, SparkUnsupportedOperationException, SparkUpgradeException}
 import org.apache.spark.sql.{DataFrame, QueryTest, SaveMode}
 import org.apache.spark.sql.execution.datasources.orc.OrcTest
@@ -286,4 +289,26 @@ class QueryExecutionErrorsSuite extends QueryTest
       assert(e2.getMessage === "The save mode NULL is not supported for: an existent path.")
     }
   }
+
+  test("INVALID_BUCKET_FILE: error if there exists any malformed bucket files") {
+    val df1 = (0 until 50).map(i => (i % 5, i % 13, i.toString)).
+      toDF("i", "j", "k").as("df1")
+
+    withTable("bucketed_table") {
+      df1.write.format("parquet").bucketBy(8, "i").
+        saveAsTable("bucketed_table")
+      val warehouseFilePath = new URI(spark.sessionState.conf.warehousePath).getPath
+      val tableDir = new File(warehouseFilePath, "bucketed_table")
+      Utils.deleteRecursively(tableDir)
+      df1.write.parquet(tableDir.getAbsolutePath)
+
+      val aggregated = spark.table("bucketed_table").groupBy("i").count()
+
+      val e = intercept[SparkException] {
+        aggregated.count()
+      }
+      assert(e.getErrorClass === "INVALID_BUCKET_FILE")
+      assert(e.getMessage.matches("Invalid bucket file: .+"))
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index f068ab8a4e2..831a998dfaa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -23,6 +23,7 @@ import java.net.URI
 import org.apache.logging.log4j.Level
 import org.scalatest.PrivateMethodTester
 
+import org.apache.spark.SparkException
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart}
 import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
@@ -856,12 +857,11 @@ class AdaptiveQueryExecSuite
         df1.write.parquet(tableDir.getAbsolutePath)
 
         val aggregated = spark.table("bucketed_table").groupBy("i").count()
-        val error = intercept[IllegalStateException] {
+        val error = intercept[SparkException] {
           aggregated.count()
         }
-        // TODO(SPARK-39163): Throw an exception w/ error class for an invalid bucket file
-        assert(error.toString contains "Invalid bucket file")
-        assert(error.getSuppressed.size === 0)
+        assert(error.getErrorClass === "INVALID_BUCKET_FILE")
+        assert(error.getMessage contains "Invalid bucket file")
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 8d593a55a7e..bdd642a1f90 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -17,9 +17,6 @@
 
 package org.apache.spark.sql.sources
 
-import java.io.File
-import java.net.URI
-
 import scala.util.Random
 
 import org.apache.spark.sql._
@@ -36,7 +33,6 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
-import org.apache.spark.util.Utils
 import org.apache.spark.util.collection.BitSet
 
 class BucketedReadWithoutHiveSupportSuite
@@ -832,23 +828,6 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
     }
   }
 
-  test("error if there exists any malformed bucket files") {
-    withTable("bucketed_table") {
-      df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
-      val warehouseFilePath = new URI(spark.sessionState.conf.warehousePath).getPath
-      val tableDir = new File(warehouseFilePath, "bucketed_table")
-      Utils.deleteRecursively(tableDir)
-      df1.write.parquet(tableDir.getAbsolutePath)
-
-      val aggregated = spark.table("bucketed_table").groupBy("i").count()
-      val e = intercept[IllegalStateException] {
-        aggregated.count()
-      }
-      // TODO(SPARK-39163): Throw an exception w/ error class for an invalid bucket file
-      assert(e.toString contains "Invalid bucket file")
-    }
-  }
-
   test("disable bucketing when the output doesn't contain all bucketing columns") {
     withTable("bucketed_table") {
       df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org