You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2022/06/20 02:04:30 UTC
[spark] branch branch-3.3 updated: [SPARK-39163][SQL][3.3] Throw an exception w/ error class for an invalid bucket file
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 5d3f3365c0b [SPARK-39163][SQL][3.3] Throw an exception w/ error class for an invalid bucket file
5d3f3365c0b is described below
commit 5d3f3365c0b7d4515fd97d0ff7b7b29db69b2faf
Author: panbingkun <pb...@gmail.com>
AuthorDate: Mon Jun 20 11:04:14 2022 +0900
[SPARK-39163][SQL][3.3] Throw an exception w/ error class for an invalid bucket file
### What changes were proposed in this pull request?
In this PR, I propose to use the INVALID_BUCKET_FILE error class for an invalid bucket file.
This is a backport of https://github.com/apache/spark/pull/36603.
### Why are the changes needed?
Porting the execution error for an invalid bucket file to the new error framework should improve the user experience with Spark SQL.
### Does this PR introduce _any_ user-facing change?
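No. Note, however, that an invalid bucket file is now reported as a `SparkException` with the `INVALID_BUCKET_FILE` error class instead of an `IllegalStateException`.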
### How was this patch tested?
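By unit tests: the malformed-bucket-file test was moved from BucketedReadSuite to QueryExecutionErrorsSuite and now checks the INVALID_BUCKET_FILE error class, and AdaptiveQueryExecSuite was updated to expect the new SparkException.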
Closes #36913 from panbingkun/branch-3.3-SPARK-39163.
Authored-by: panbingkun <pb...@gmail.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
core/src/main/resources/error/error-classes.json | 3 +++
.../spark/sql/errors/QueryExecutionErrors.scala | 5 +++++
.../spark/sql/execution/DataSourceScanExec.scala | 4 ++--
.../sql/errors/QueryExecutionErrorsSuite.scala | 25 ++++++++++++++++++++++
.../adaptive/AdaptiveQueryExecSuite.scala | 8 +++----
.../spark/sql/sources/BucketedReadSuite.scala | 21 ------------------
6 files changed, 39 insertions(+), 27 deletions(-)
diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index 4eb3a4e8e1e..fc712fc9c52 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -95,6 +95,9 @@
"INVALID_ARRAY_INDEX_IN_ELEMENT_AT" : {
"message" : [ "The index <indexValue> is out of bounds. The array has <arraySize> elements. Use `try_element_at` to tolerate accessing element at invalid index and return NULL instead. If necessary set <config> to \"false\" to bypass this error." ]
},
+ "INVALID_BUCKET_FILE" : {
+ "message" : [ "Invalid bucket file: <path>" ]
+ },
"INVALID_FIELD_NAME" : {
"message" : [ "Field name <fieldName> is invalid: <path> is not a struct." ],
"sqlState" : "42000"
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 6c6139d2ccc..161bfd3c03d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -2075,4 +2075,9 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
new SparkException(errorClass = "NULL_COMPARISON_RESULT",
messageParameters = Array(), cause = null)
}
+
+ def invalidBucketFile(path: String): Throwable = {
+ new SparkException(errorClass = "INVALID_BUCKET_FILE", messageParameters = Array(path),
+ cause = null)
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 1ec93a614b7..9e8ae9a714d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
import org.apache.spark.sql.execution.datasources.v2.PushedDownOperators
@@ -592,8 +593,7 @@ case class FileSourceScanExec(
}.groupBy { f =>
BucketingUtils
.getBucketId(new Path(f.filePath).getName)
- // TODO(SPARK-39163): Throw an exception w/ error class for an invalid bucket file
- .getOrElse(throw new IllegalStateException(s"Invalid bucket file ${f.filePath}"))
+ .getOrElse(throw QueryExecutionErrors.invalidBucketFile(f.filePath))
}
val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
index 73c5b12849a..21acea53ed0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
@@ -17,6 +17,9 @@
package org.apache.spark.sql.errors
+import java.io.File
+import java.net.URI
+
import org.apache.spark.{SparkArithmeticException, SparkException, SparkIllegalArgumentException, SparkRuntimeException, SparkUnsupportedOperationException, SparkUpgradeException}
import org.apache.spark.sql.{DataFrame, QueryTest, SaveMode}
import org.apache.spark.sql.execution.datasources.orc.OrcTest
@@ -286,4 +289,26 @@ class QueryExecutionErrorsSuite extends QueryTest
assert(e2.getMessage === "The save mode NULL is not supported for: an existent path.")
}
}
+
+ test("INVALID_BUCKET_FILE: error if there exists any malformed bucket files") {
+ val df1 = (0 until 50).map(i => (i % 5, i % 13, i.toString)).
+ toDF("i", "j", "k").as("df1")
+
+ withTable("bucketed_table") {
+ df1.write.format("parquet").bucketBy(8, "i").
+ saveAsTable("bucketed_table")
+ val warehouseFilePath = new URI(spark.sessionState.conf.warehousePath).getPath
+ val tableDir = new File(warehouseFilePath, "bucketed_table")
+ Utils.deleteRecursively(tableDir)
+ df1.write.parquet(tableDir.getAbsolutePath)
+
+ val aggregated = spark.table("bucketed_table").groupBy("i").count()
+
+ val e = intercept[SparkException] {
+ aggregated.count()
+ }
+ assert(e.getErrorClass === "INVALID_BUCKET_FILE")
+ assert(e.getMessage.matches("Invalid bucket file: .+"))
+ }
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index f068ab8a4e2..831a998dfaa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -23,6 +23,7 @@ import java.net.URI
import org.apache.logging.log4j.Level
import org.scalatest.PrivateMethodTester
+import org.apache.spark.SparkException
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart}
import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession, Strategy}
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
@@ -856,12 +857,11 @@ class AdaptiveQueryExecSuite
df1.write.parquet(tableDir.getAbsolutePath)
val aggregated = spark.table("bucketed_table").groupBy("i").count()
- val error = intercept[IllegalStateException] {
+ val error = intercept[SparkException] {
aggregated.count()
}
- // TODO(SPARK-39163): Throw an exception w/ error class for an invalid bucket file
- assert(error.toString contains "Invalid bucket file")
- assert(error.getSuppressed.size === 0)
+ assert(error.getErrorClass === "INVALID_BUCKET_FILE")
+ assert(error.getMessage contains "Invalid bucket file")
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 8d593a55a7e..bdd642a1f90 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -17,9 +17,6 @@
package org.apache.spark.sql.sources
-import java.io.File
-import java.net.URI
-
import scala.util.Random
import org.apache.spark.sql._
@@ -36,7 +33,6 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
-import org.apache.spark.util.Utils
import org.apache.spark.util.collection.BitSet
class BucketedReadWithoutHiveSupportSuite
@@ -832,23 +828,6 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti
}
}
- test("error if there exists any malformed bucket files") {
- withTable("bucketed_table") {
- df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
- val warehouseFilePath = new URI(spark.sessionState.conf.warehousePath).getPath
- val tableDir = new File(warehouseFilePath, "bucketed_table")
- Utils.deleteRecursively(tableDir)
- df1.write.parquet(tableDir.getAbsolutePath)
-
- val aggregated = spark.table("bucketed_table").groupBy("i").count()
- val e = intercept[IllegalStateException] {
- aggregated.count()
- }
- // TODO(SPARK-39163): Throw an exception w/ error class for an invalid bucket file
- assert(e.toString contains "Invalid bucket file")
- }
- }
-
test("disable bucketing when the output doesn't contain all bucketing columns") {
withTable("bucketed_table") {
df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org