Posted to commits@spark.apache.org by gu...@apache.org on 2021/07/15 02:44:37 UTC

[spark] branch branch-3.2 updated: [SPARK-36106][SQL][CORE] Label error classes for subset of QueryCompilationErrors

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 8b35bc4  [SPARK-36106][SQL][CORE] Label error classes for subset of QueryCompilationErrors
8b35bc4 is described below

commit 8b35bc4d2b328a151e7df37f83bea72c4a100e5d
Author: Karen Feng <ka...@databricks.com>
AuthorDate: Thu Jul 15 11:43:18 2021 +0900

    [SPARK-36106][SQL][CORE] Label error classes for subset of QueryCompilationErrors
    
    ### What changes were proposed in this pull request?
    
    Adds error classes (and SQLSTATEs, where applicable) to a subset of the exceptions in `QueryCompilationErrors`.
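    
    For example, each labeled error maps a definition in `error-classes.json` to a structured `AnalysisException`. The following sketch shows the pattern, taken directly from the diff below:
    
    ```scala
    // core/src/main/resources/error/error-classes.json:
    // "GROUPING_SIZE_LIMIT_EXCEEDED" : {
    //   "message" : [ "Grouping sets size cannot be greater than %s" ]
    // }
    
    // QueryCompilationErrors.scala: the exception now carries the error class
    // and its message parameters instead of a hand-formatted message string.
    def groupingSizeTooLargeError(sizeLimit: Int): Throwable = {
      new AnalysisException(
        errorClass = "GROUPING_SIZE_LIMIT_EXCEEDED",
        messageParameters = Array(sizeLimit.toString))
    }
    ```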
    
    ### Why are the changes needed?
    
    Improves auditing for developers and adds useful fields for users (error class and SQLSTATE).
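    
    For example, callers can now branch on the machine-readable error class instead of pattern-matching on message text. A minimal sketch, assuming a `SparkSession` named `spark` and a hypothetical partitioned table `t` and view `v`; `getErrorClass` is the accessor exercised by the new tests:
    
    ```scala
    import org.apache.spark.sql.AnalysisException
    
    try {
      spark.sql("INSERT OVERWRITE TABLE t PARTITION (id = 1) IF NOT EXISTS SELECT * FROM v")
    } catch {
      case e: AnalysisException if e.getErrorClass == "IF_PARTITION_NOT_EXISTS_UNSUPPORTED" =>
        // Handle the labeled error; no need to parse e.getMessage.
        println(s"Unsupported write: ${e.getMessage}")
    }
    ```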
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Exceptions raised from the affected code paths now carry the previously missing error class and SQLSTATE fields.
    
    ### How was this patch tested?
    
    Existing tests, plus new unit tests that assert the expected error class on each newly labeled exception.
    
    Closes #33309 from karenfeng/group-compilation-errors-1.
    
    Authored-by: Karen Feng <ka...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
    (cherry picked from commit e92b8ea6f881db4be6e1f2c10588cd2eb5055db8)
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 core/src/main/resources/error/error-classes.json   | 34 +++++++++++++++++++
 .../org/apache/spark/sql/AnalysisException.scala   |  3 ++
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  4 +--
 .../spark/sql/errors/QueryCompilationErrors.scala  | 39 +++++++++++-----------
 .../apache/spark/sql/DataFrameAggregateSuite.scala | 18 ++++++++++
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 24 +++++++------
 .../spark/sql/connector/InsertIntoTests.scala      | 18 ++++++++++
 7 files changed, 108 insertions(+), 32 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index 6ab113be..9ac5f06 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -11,6 +11,24 @@
     "message" : [ "Found duplicate keys '%s'" ],
     "sqlState" : "23000"
   },
+  "GROUPING_COLUMN_MISMATCH" : {
+    "message" : [ "Column of grouping (%s) can't be found in grouping columns %s" ],
+    "sqlState" : "42000"
+  },
+  "GROUPING_ID_COLUMN_MISMATCH" : {
+    "message" : [ "Columns of grouping_id (%s) does not match grouping columns (%s)" ],
+    "sqlState" : "42000"
+  },
+  "GROUPING_SIZE_LIMIT_EXCEEDED" : {
+    "message" : [ "Grouping sets size cannot be greater than %s" ]
+  },
+  "IF_PARTITION_NOT_EXISTS_UNSUPPORTED" : {
+    "message" : [ "Cannot write, IF NOT EXISTS is not supported for table: %s" ]
+  },
+  "INCOMPARABLE_PIVOT_COLUMN" : {
+    "message" : [ "Invalid pivot column '%s'. Pivot columns must be comparable." ],
+    "sqlState" : "42000"
+  },
   "INVALID_FIELD_NAME" : {
     "message" : [ "Field name %s is invalid: %s is not a struct." ],
     "sqlState" : "42000"
@@ -19,6 +37,22 @@
     "message" : [ "cannot resolve '%s' given input columns: [%s]" ],
     "sqlState" : "42000"
   },
+  "MISSING_STATIC_PARTITION_COLUMN" : {
+    "message" : [ "Unknown static partition column: %s" ],
+    "sqlState" : "42000"
+  },
+  "NON_LITERAL_PIVOT_VALUES" : {
+    "message" : [ "Literal expressions required for pivot values, found '%s'" ],
+    "sqlState" : "42000"
+  },
+  "NON_PARTITION_COLUMN" : {
+    "message" : [ "PARTITION clause cannot contain a non-partition column name: %s" ],
+    "sqlState" : "42000"
+  },
+  "PIVOT_VALUE_DATA_TYPE_MISMATCH" : {
+    "message" : [ "Invalid pivot value '%s': value data type %s does not match pivot column data type %s" ],
+    "sqlState" : "42000"
+  },
   "SECOND_FUNCTION_ARGUMENT_NOT_INTEGER" : {
     "message" : [ "The second argument of '%s' function needs to be an integer." ],
     "sqlState" : "22023"
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
index d0a3a71..35eb040 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
@@ -46,6 +46,9 @@ class AnalysisException protected[sql] (
       messageParameters = messageParameters,
       cause = cause)
 
+  def this(errorClass: String, messageParameters: Array[String]) =
+    this(errorClass = errorClass, messageParameters = messageParameters, cause = None)
+
   def this(
       errorClass: String,
       messageParameters: Array[String],
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 64f6b79..3bab58d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1344,7 +1344,7 @@ class Analyzer(override val catalogManager: CatalogManager)
               case Some(attr) =>
                 attr.name -> staticName
               case _ =>
-                throw QueryCompilationErrors.addStaticValToUnknownColError(staticName)
+                throw QueryCompilationErrors.missingStaticPartitionColumn(staticName)
             }).toMap
 
           val queryColumns = query.output.iterator
@@ -1392,7 +1392,7 @@ class Analyzer(override val catalogManager: CatalogManager)
                 UnresolvedAttribute.quoted(attr.name),
                 Cast(Literal(value), attr.dataType))
             case None =>
-              throw QueryCompilationErrors.unknownStaticPartitionColError(name)
+              throw QueryCompilationErrors.missingStaticPartitionColumn(name)
           }
         }.reduce(And)
       }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 6322676..2cee614 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -50,56 +50,57 @@ private[spark] object QueryCompilationErrors {
 
   def groupingIDMismatchError(groupingID: GroupingID, groupByExprs: Seq[Expression]): Throwable = {
     new AnalysisException(
-      s"Columns of grouping_id (${groupingID.groupByExprs.mkString(",")}) " +
-        s"does not match grouping columns (${groupByExprs.mkString(",")})")
+      errorClass = "GROUPING_ID_COLUMN_MISMATCH",
+      messageParameters = Array(groupingID.groupByExprs.mkString(","), groupByExprs.mkString(",")))
   }
 
   def groupingColInvalidError(groupingCol: Expression, groupByExprs: Seq[Expression]): Throwable = {
     new AnalysisException(
-      s"Column of grouping ($groupingCol) can't be found " +
-        s"in grouping columns ${groupByExprs.mkString(",")}")
+      errorClass = "GROUPING_COLUMN_MISMATCH",
+      messageParameters = Array(groupingCol.toString, groupByExprs.mkString(",")))
   }
 
   def groupingSizeTooLargeError(sizeLimit: Int): Throwable = {
     new AnalysisException(
-      s"Grouping sets size cannot be greater than $sizeLimit")
+      errorClass = "GROUPING_SIZE_LIMIT_EXCEEDED",
+      messageParameters = Array(sizeLimit.toString))
   }
 
   def unorderablePivotColError(pivotCol: Expression): Throwable = {
     new AnalysisException(
-      s"Invalid pivot column '$pivotCol'. Pivot columns must be comparable."
-    )
+      errorClass = "INCOMPARABLE_PIVOT_COLUMN",
+      messageParameters = Array(pivotCol.toString))
   }
 
   def nonLiteralPivotValError(pivotVal: Expression): Throwable = {
     new AnalysisException(
-      s"Literal expressions required for pivot values, found '$pivotVal'")
+      errorClass = "NON_LITERAL_PIVOT_VALUES",
+      messageParameters = Array(pivotVal.toString))
   }
 
   def pivotValDataTypeMismatchError(pivotVal: Expression, pivotCol: Expression): Throwable = {
     new AnalysisException(
-      s"Invalid pivot value '$pivotVal': " +
-        s"value data type ${pivotVal.dataType.simpleString} does not match " +
-        s"pivot column data type ${pivotCol.dataType.catalogString}")
+      errorClass = "PIVOT_VALUE_DATA_TYPE_MISMATCH",
+      messageParameters = Array(
+        pivotVal.toString, pivotVal.dataType.simpleString, pivotCol.dataType.catalogString))
   }
 
   def unsupportedIfNotExistsError(tableName: String): Throwable = {
     new AnalysisException(
-      s"Cannot write, IF NOT EXISTS is not supported for table: $tableName")
+      errorClass = "IF_PARTITION_NOT_EXISTS_UNSUPPORTED",
+      messageParameters = Array(tableName))
   }
 
   def nonPartitionColError(partitionName: String): Throwable = {
     new AnalysisException(
-      s"PARTITION clause cannot contain a non-partition column name: $partitionName")
+      errorClass = "NON_PARTITION_COLUMN",
+      messageParameters = Array(partitionName))
   }
 
-  def addStaticValToUnknownColError(staticName: String): Throwable = {
+  def missingStaticPartitionColumn(staticName: String): Throwable = {
     new AnalysisException(
-      s"Cannot add static value for unknown column: $staticName")
-  }
-
-  def unknownStaticPartitionColError(name: String): Throwable = {
-    new AnalysisException(s"Unknown static partition column: $name")
+      errorClass = "MISSING_STATIC_PARTITION_COLUMN",
+      messageParameters = Array(staticName))
   }
 
   def nestedGeneratorError(trimmedNestedGenerator: Expression): Throwable = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index 62d68b8..d0a122e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -193,9 +193,27 @@ class DataFrameAggregateSuite extends QueryTest
     intercept[AnalysisException] {
       courseSales.groupBy().agg(grouping("course")).explain()
     }
+
     intercept[AnalysisException] {
       courseSales.groupBy().agg(grouping_id("course")).explain()
     }
+
+    val groupingColMismatchEx = intercept[AnalysisException] {
+      courseSales.cube("course", "year").agg(grouping("earnings")).explain()
+    }
+    assert(groupingColMismatchEx.getErrorClass == "GROUPING_COLUMN_MISMATCH")
+    assert(groupingColMismatchEx.getMessage.matches(
+      "Column of grouping \\(earnings.*\\) can't be found in grouping columns course.*,year.*"))
+
+
+    val groupingIdColMismatchEx = intercept[AnalysisException] {
+      courseSales.cube("course", "year").agg(grouping_id("earnings")).explain()
+    }
+    assert(groupingIdColMismatchEx.getErrorClass == "GROUPING_ID_COLUMN_MISMATCH")
+    assert(groupingIdColMismatchEx.getMessage.matches(
+      "Columns of grouping_id \\(earnings.*\\) does not match " +
+        "grouping columns \\(course.*,year.*\\)"),
+      groupingIdColMismatchEx.getMessage)
   }
 
   test("grouping/grouping_id inside window function") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 9bd83ca..b0d5c89 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -3377,7 +3377,7 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
         s"VALUES(${(0 until 65).map { _ => 1 }.mkString(", ")}, 3) AS " +
         s"t(${(0 until 65).map { i => s"k$i" }.mkString(", ")}, v)")
 
-      def testGropingIDs(numGroupingSet: Int, expectedIds: Seq[Any] = Nil): Unit = {
+      def testGroupingIDs(numGroupingSet: Int, expectedIds: Seq[Any] = Nil): Unit = {
         val groupingCols = (0 until numGroupingSet).map { i => s"k$i" }
         val df = sql("SELECT GROUPING_ID(), SUM(v) FROM t GROUP BY " +
           s"GROUPING SETS ((${groupingCols.mkString(",")}), (${groupingCols.init.mkString(",")}))")
@@ -3385,19 +3385,21 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
       }
 
       withSQLConf(SQLConf.LEGACY_INTEGER_GROUPING_ID.key -> "true") {
-        testGropingIDs(32, Seq(0, 1))
-        val errMsg = intercept[AnalysisException] {
-          testGropingIDs(33)
-        }.getMessage
-        assert(errMsg.contains("Grouping sets size cannot be greater than 32"))
+        testGroupingIDs(32, Seq(0, 1))
+        val ex = intercept[AnalysisException] {
+          testGroupingIDs(33)
+        }
+        assert(ex.getMessage.contains("Grouping sets size cannot be greater than 32"))
+        assert(ex.getErrorClass == "GROUPING_SIZE_LIMIT_EXCEEDED")
       }
 
       withSQLConf(SQLConf.LEGACY_INTEGER_GROUPING_ID.key -> "false") {
-        testGropingIDs(64, Seq(0L, 1L))
-        val errMsg = intercept[AnalysisException] {
-          testGropingIDs(65)
-        }.getMessage
-        assert(errMsg.contains("Grouping sets size cannot be greater than 64"))
+        testGroupingIDs(64, Seq(0L, 1L))
+        val ex = intercept[AnalysisException] {
+          testGroupingIDs(65)
+        }
+        assert(ex.getMessage.contains("Grouping sets size cannot be greater than 64"))
+        assert(ex.getErrorClass == "GROUPING_SIZE_LIMIT_EXCEEDED")
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala
index ad73037..0dee48f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala
@@ -261,6 +261,7 @@ trait InsertIntoSQLOnlyTests
         assert(exc.getMessage.contains(
           "PARTITION clause cannot contain a non-partition column name"))
         assert(exc.getMessage.contains("id"))
+        assert(exc.getErrorClass == "NON_PARTITION_COLUMN")
       }
     }
 
@@ -277,6 +278,23 @@ trait InsertIntoSQLOnlyTests
         assert(exc.getMessage.contains(
           "PARTITION clause cannot contain a non-partition column name"))
         assert(exc.getMessage.contains("data"))
+        assert(exc.getErrorClass == "NON_PARTITION_COLUMN")
+      }
+    }
+
+    test("InsertInto: IF PARTITION NOT EXISTS not supported") {
+      val t1 = s"${catalogAndNamespace}tbl"
+      withTableAndData(t1) { view =>
+        sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format PARTITIONED BY (id)")
+
+        val exc = intercept[AnalysisException] {
+          sql(s"INSERT OVERWRITE TABLE $t1 PARTITION (id = 1) IF NOT EXISTS SELECT * FROM $view")
+        }
+
+        verifyTable(t1, spark.emptyDataFrame)
+        assert(exc.getMessage.contains("Cannot write, IF NOT EXISTS is not supported for table"))
+        assert(exc.getMessage.contains(t1))
+        assert(exc.getErrorClass == "IF_PARTITION_NOT_EXISTS_UNSUPPORTED")
       }
     }
 
