You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2021/07/15 02:44:37 UTC
[spark] branch branch-3.2 updated: [SPARK-36106][SQL][CORE] Label error classes for subset of QueryCompilationErrors
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 8b35bc4 [SPARK-36106][SQL][CORE] Label error classes for subset of QueryCompilationErrors
8b35bc4 is described below
commit 8b35bc4d2b328a151e7df37f83bea72c4a100e5d
Author: Karen Feng <ka...@databricks.com>
AuthorDate: Thu Jul 15 11:43:18 2021 +0900
[SPARK-36106][SQL][CORE] Label error classes for subset of QueryCompilationErrors
### What changes were proposed in this pull request?
Adds error classes to some of the exceptions in QueryCompilationErrors.
### Why are the changes needed?
Improves auditing for developers and adds useful fields for users (error class and SQLSTATE).
### Does this PR introduce _any_ user-facing change?
Yes, fills in missing error class and SQLSTATE fields.
### How was this patch tested?
Existing tests and new unit tests.
Closes #33309 from karenfeng/group-compilation-errors-1.
Authored-by: Karen Feng <ka...@databricks.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
(cherry picked from commit e92b8ea6f881db4be6e1f2c10588cd2eb5055db8)
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
core/src/main/resources/error/error-classes.json | 34 +++++++++++++++++++
.../org/apache/spark/sql/AnalysisException.scala | 3 ++
.../spark/sql/catalyst/analysis/Analyzer.scala | 4 +--
.../spark/sql/errors/QueryCompilationErrors.scala | 39 +++++++++++-----------
.../apache/spark/sql/DataFrameAggregateSuite.scala | 18 ++++++++++
.../scala/org/apache/spark/sql/SQLQuerySuite.scala | 24 +++++++------
.../spark/sql/connector/InsertIntoTests.scala | 18 ++++++++++
7 files changed, 108 insertions(+), 32 deletions(-)
diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index 6ab113be..9ac5f06 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -11,6 +11,24 @@
"message" : [ "Found duplicate keys '%s'" ],
"sqlState" : "23000"
},
+ "GROUPING_COLUMN_MISMATCH" : {
+ "message" : [ "Column of grouping (%s) can't be found in grouping columns %s" ],
+ "sqlState" : "42000"
+ },
+ "GROUPING_ID_COLUMN_MISMATCH" : {
+ "message" : [ "Columns of grouping_id (%s) does not match grouping columns (%s)" ],
+ "sqlState" : "42000"
+ },
+ "GROUPING_SIZE_LIMIT_EXCEEDED" : {
+ "message" : [ "Grouping sets size cannot be greater than %s" ]
+ },
+ "IF_PARTITION_NOT_EXISTS_UNSUPPORTED" : {
+ "message" : [ "Cannot write, IF NOT EXISTS is not supported for table: %s" ]
+ },
+ "INCOMPARABLE_PIVOT_COLUMN" : {
+ "message" : [ "Invalid pivot column '%s'. Pivot columns must be comparable." ],
+ "sqlState" : "42000"
+ },
"INVALID_FIELD_NAME" : {
"message" : [ "Field name %s is invalid: %s is not a struct." ],
"sqlState" : "42000"
@@ -19,6 +37,22 @@
"message" : [ "cannot resolve '%s' given input columns: [%s]" ],
"sqlState" : "42000"
},
+ "MISSING_STATIC_PARTITION_COLUMN" : {
+ "message" : [ "Unknown static partition column: %s" ],
+ "sqlState" : "42000"
+ },
+ "NON_LITERAL_PIVOT_VALUES" : {
+ "message" : [ "Literal expressions required for pivot values, found '%s'" ],
+ "sqlState" : "42000"
+ },
+ "NON_PARTITION_COLUMN" : {
+ "message" : [ "PARTITION clause cannot contain a non-partition column name: %s" ],
+ "sqlState" : "42000"
+ },
+ "PIVOT_VALUE_DATA_TYPE_MISMATCH" : {
+ "message" : [ "Invalid pivot value '%s': value data type %s does not match pivot column data type %s" ],
+ "sqlState" : "42000"
+ },
"SECOND_FUNCTION_ARGUMENT_NOT_INTEGER" : {
"message" : [ "The second argument of '%s' function needs to be an integer." ],
"sqlState" : "22023"
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
index d0a3a71..35eb040 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
@@ -46,6 +46,9 @@ class AnalysisException protected[sql] (
messageParameters = messageParameters,
cause = cause)
+ def this(errorClass: String, messageParameters: Array[String]) =
+ this(errorClass = errorClass, messageParameters = messageParameters, cause = None)
+
def this(
errorClass: String,
messageParameters: Array[String],
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 64f6b79..3bab58d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1344,7 +1344,7 @@ class Analyzer(override val catalogManager: CatalogManager)
case Some(attr) =>
attr.name -> staticName
case _ =>
- throw QueryCompilationErrors.addStaticValToUnknownColError(staticName)
+ throw QueryCompilationErrors.missingStaticPartitionColumn(staticName)
}).toMap
val queryColumns = query.output.iterator
@@ -1392,7 +1392,7 @@ class Analyzer(override val catalogManager: CatalogManager)
UnresolvedAttribute.quoted(attr.name),
Cast(Literal(value), attr.dataType))
case None =>
- throw QueryCompilationErrors.unknownStaticPartitionColError(name)
+ throw QueryCompilationErrors.missingStaticPartitionColumn(name)
}
}.reduce(And)
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 6322676..2cee614 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -50,56 +50,57 @@ private[spark] object QueryCompilationErrors {
def groupingIDMismatchError(groupingID: GroupingID, groupByExprs: Seq[Expression]): Throwable = {
new AnalysisException(
- s"Columns of grouping_id (${groupingID.groupByExprs.mkString(",")}) " +
- s"does not match grouping columns (${groupByExprs.mkString(",")})")
+ errorClass = "GROUPING_ID_COLUMN_MISMATCH",
+ messageParameters = Array(groupingID.groupByExprs.mkString(","), groupByExprs.mkString(",")))
}
def groupingColInvalidError(groupingCol: Expression, groupByExprs: Seq[Expression]): Throwable = {
new AnalysisException(
- s"Column of grouping ($groupingCol) can't be found " +
- s"in grouping columns ${groupByExprs.mkString(",")}")
+ errorClass = "GROUPING_COLUMN_MISMATCH",
+ messageParameters = Array(groupingCol.toString, groupByExprs.mkString(",")))
}
def groupingSizeTooLargeError(sizeLimit: Int): Throwable = {
new AnalysisException(
- s"Grouping sets size cannot be greater than $sizeLimit")
+ errorClass = "GROUPING_SIZE_LIMIT_EXCEEDED",
+ messageParameters = Array(sizeLimit.toString))
}
def unorderablePivotColError(pivotCol: Expression): Throwable = {
new AnalysisException(
- s"Invalid pivot column '$pivotCol'. Pivot columns must be comparable."
- )
+ errorClass = "INCOMPARABLE_PIVOT_COLUMN",
+ messageParameters = Array(pivotCol.toString))
}
def nonLiteralPivotValError(pivotVal: Expression): Throwable = {
new AnalysisException(
- s"Literal expressions required for pivot values, found '$pivotVal'")
+ errorClass = "NON_LITERAL_PIVOT_VALUES",
+ messageParameters = Array(pivotVal.toString))
}
def pivotValDataTypeMismatchError(pivotVal: Expression, pivotCol: Expression): Throwable = {
new AnalysisException(
- s"Invalid pivot value '$pivotVal': " +
- s"value data type ${pivotVal.dataType.simpleString} does not match " +
- s"pivot column data type ${pivotCol.dataType.catalogString}")
+ errorClass = "PIVOT_VALUE_DATA_TYPE_MISMATCH",
+ messageParameters = Array(
+ pivotVal.toString, pivotVal.dataType.simpleString, pivotCol.dataType.catalogString))
}
def unsupportedIfNotExistsError(tableName: String): Throwable = {
new AnalysisException(
- s"Cannot write, IF NOT EXISTS is not supported for table: $tableName")
+ errorClass = "IF_PARTITION_NOT_EXISTS_UNSUPPORTED",
+ messageParameters = Array(tableName))
}
def nonPartitionColError(partitionName: String): Throwable = {
new AnalysisException(
- s"PARTITION clause cannot contain a non-partition column name: $partitionName")
+ errorClass = "NON_PARTITION_COLUMN",
+ messageParameters = Array(partitionName))
}
- def addStaticValToUnknownColError(staticName: String): Throwable = {
+ def missingStaticPartitionColumn(staticName: String): Throwable = {
new AnalysisException(
- s"Cannot add static value for unknown column: $staticName")
- }
-
- def unknownStaticPartitionColError(name: String): Throwable = {
- new AnalysisException(s"Unknown static partition column: $name")
+ errorClass = "MISSING_STATIC_PARTITION_COLUMN",
+ messageParameters = Array(staticName))
}
def nestedGeneratorError(trimmedNestedGenerator: Expression): Throwable = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index 62d68b8..d0a122e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -193,9 +193,27 @@ class DataFrameAggregateSuite extends QueryTest
intercept[AnalysisException] {
courseSales.groupBy().agg(grouping("course")).explain()
}
+
intercept[AnalysisException] {
courseSales.groupBy().agg(grouping_id("course")).explain()
}
+
+ val groupingColMismatchEx = intercept[AnalysisException] {
+ courseSales.cube("course", "year").agg(grouping("earnings")).explain()
+ }
+ assert(groupingColMismatchEx.getErrorClass == "GROUPING_COLUMN_MISMATCH")
+ assert(groupingColMismatchEx.getMessage.matches(
+ "Column of grouping \\(earnings.*\\) can't be found in grouping columns course.*,year.*"))
+
+
+ val groupingIdColMismatchEx = intercept[AnalysisException] {
+ courseSales.cube("course", "year").agg(grouping_id("earnings")).explain()
+ }
+ assert(groupingIdColMismatchEx.getErrorClass == "GROUPING_ID_COLUMN_MISMATCH")
+ assert(groupingIdColMismatchEx.getMessage.matches(
+ "Columns of grouping_id \\(earnings.*\\) does not match " +
+ "grouping columns \\(course.*,year.*\\)"),
+ groupingIdColMismatchEx.getMessage)
}
test("grouping/grouping_id inside window function") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 9bd83ca..b0d5c89 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -3377,7 +3377,7 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
s"VALUES(${(0 until 65).map { _ => 1 }.mkString(", ")}, 3) AS " +
s"t(${(0 until 65).map { i => s"k$i" }.mkString(", ")}, v)")
- def testGropingIDs(numGroupingSet: Int, expectedIds: Seq[Any] = Nil): Unit = {
+ def testGroupingIDs(numGroupingSet: Int, expectedIds: Seq[Any] = Nil): Unit = {
val groupingCols = (0 until numGroupingSet).map { i => s"k$i" }
val df = sql("SELECT GROUPING_ID(), SUM(v) FROM t GROUP BY " +
s"GROUPING SETS ((${groupingCols.mkString(",")}), (${groupingCols.init.mkString(",")}))")
@@ -3385,19 +3385,21 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
}
withSQLConf(SQLConf.LEGACY_INTEGER_GROUPING_ID.key -> "true") {
- testGropingIDs(32, Seq(0, 1))
- val errMsg = intercept[AnalysisException] {
- testGropingIDs(33)
- }.getMessage
- assert(errMsg.contains("Grouping sets size cannot be greater than 32"))
+ testGroupingIDs(32, Seq(0, 1))
+ val ex = intercept[AnalysisException] {
+ testGroupingIDs(33)
+ }
+ assert(ex.getMessage.contains("Grouping sets size cannot be greater than 32"))
+ assert(ex.getErrorClass == "GROUPING_SIZE_LIMIT_EXCEEDED")
}
withSQLConf(SQLConf.LEGACY_INTEGER_GROUPING_ID.key -> "false") {
- testGropingIDs(64, Seq(0L, 1L))
- val errMsg = intercept[AnalysisException] {
- testGropingIDs(65)
- }.getMessage
- assert(errMsg.contains("Grouping sets size cannot be greater than 64"))
+ testGroupingIDs(64, Seq(0L, 1L))
+ val ex = intercept[AnalysisException] {
+ testGroupingIDs(65)
+ }
+ assert(ex.getMessage.contains("Grouping sets size cannot be greater than 64"))
+ assert(ex.getErrorClass == "GROUPING_SIZE_LIMIT_EXCEEDED")
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala
index ad73037..0dee48f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala
@@ -261,6 +261,7 @@ trait InsertIntoSQLOnlyTests
assert(exc.getMessage.contains(
"PARTITION clause cannot contain a non-partition column name"))
assert(exc.getMessage.contains("id"))
+ assert(exc.getErrorClass == "NON_PARTITION_COLUMN")
}
}
@@ -277,6 +278,23 @@ trait InsertIntoSQLOnlyTests
assert(exc.getMessage.contains(
"PARTITION clause cannot contain a non-partition column name"))
assert(exc.getMessage.contains("data"))
+ assert(exc.getErrorClass == "NON_PARTITION_COLUMN")
+ }
+ }
+
+ test("InsertInto: IF PARTITION NOT EXISTS not supported") {
+ val t1 = s"${catalogAndNamespace}tbl"
+ withTableAndData(t1) { view =>
+ sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format PARTITIONED BY (id)")
+
+ val exc = intercept[AnalysisException] {
+ sql(s"INSERT OVERWRITE TABLE $t1 PARTITION (id = 1) IF NOT EXISTS SELECT * FROM $view")
+ }
+
+ verifyTable(t1, spark.emptyDataFrame)
+ assert(exc.getMessage.contains("Cannot write, IF NOT EXISTS is not supported for table"))
+ assert(exc.getMessage.contains(t1))
+ assert(exc.getErrorClass == "IF_PARTITION_NOT_EXISTS_UNSUPPORTED")
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org