You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2022/11/01 08:42:46 UTC

[spark] branch master updated: [SPARK-40890][SQL][TESTS] Check error classes in DataSourceV2SQLSuite

This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ba019fde708 [SPARK-40890][SQL][TESTS] Check error classes in DataSourceV2SQLSuite
ba019fde708 is described below

commit ba019fde708658d0df4f480f93232f8caf2837dd
Author: panbingkun <pb...@gmail.com>
AuthorDate: Tue Nov 1 11:42:32 2022 +0300

    [SPARK-40890][SQL][TESTS] Check error classes in DataSourceV2SQLSuite
    
    ### What changes were proposed in this pull request?
    This PR aims to replace 'intercept' with 'Check error classes' in DataSourceV2SQLSuite.
    
    ### Why are the changes needed?
    The changes improve the error framework.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    By running the modified test suite:
    ```
    $ build/sbt "test:testOnly *DataSourceV2SQLSuite"
    ```
    
    Closes #38439 from panbingkun/SPARK-40890.
    
    Authored-by: panbingkun <pb...@gmail.com>
    Signed-off-by: Max Gekk <ma...@gmail.com>
---
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 1038 +++++++++++++-------
 1 file changed, 665 insertions(+), 373 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 41a1563aebc..c9c66395a3b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -19,10 +19,12 @@ package org.apache.spark.sql.connector
 
 import java.sql.Timestamp
 import java.time.{Duration, LocalDate, Period}
+import java.util.Locale
 
 import scala.collection.JavaConverters._
 import scala.concurrent.duration.MICROSECONDS
 
+import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchDatabaseException, NoSuchNamespaceException, TableAlreadyExistsException}
@@ -61,13 +63,8 @@ abstract class DataSourceV2SQLSuite
     checkAnswer(spark.table(tableName), expected)
   }
 
-  protected def assertAnalysisError(
-      sqlStatement: String,
-      expectedError: String): Unit = {
-    val ex = intercept[AnalysisException] {
-      sql(sqlStatement)
-    }
-    assert(ex.getMessage.contains(expectedError))
+  protected def analysisException(sqlText: String): AnalysisException = {
+    intercept[AnalysisException](sql(sqlText))
   }
 }
 
@@ -118,12 +115,16 @@ class DataSourceV2SQLSuiteV1Filter extends DataSourceV2SQLSuite with AlterTableT
         Row("data_type", "string"),
         Row("comment", "hello")))
 
-      assertAnalysisErrorClass(
-        s"DESCRIBE $t invalid_col",
-        "UNRESOLVED_COLUMN.WITH_SUGGESTION",
-        Map(
+      checkError(
+        exception = analysisException(s"DESCRIBE $t invalid_col"),
+        errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+        parameters = Map(
           "objectName" -> "`invalid_col`",
-          "proposal" -> "`testcat`.`tbl`.`id`, `testcat`.`tbl`.`data`"))
+          "proposal" -> "`testcat`.`tbl`.`id`, `testcat`.`tbl`.`data`"),
+        context = ExpectedContext(
+          fragment = "DESCRIBE testcat.tbl invalid_col",
+          start = 0,
+          stop = 31))
     }
   }
 
@@ -147,9 +148,12 @@ class DataSourceV2SQLSuiteV1Filter extends DataSourceV2SQLSuite with AlterTableT
     val t = "testcat.tbl"
     withTable(t) {
       sql(s"CREATE TABLE $t (d struct<a: INT, b: INT>) USING foo")
-      assertAnalysisError(
-        s"describe $t d.a",
-        "DESC TABLE COLUMN does not support nested column")
+      checkError(
+        exception = analysisException(s"describe $t d.a"),
+        errorClass = "_LEGACY_ERROR_TEMP_1060",
+        parameters = Map(
+          "command" -> "DESC TABLE COLUMN",
+          "column" -> "d.a"))
     }
   }
 
@@ -195,11 +199,13 @@ class DataSourceV2SQLSuiteV1Filter extends DataSourceV2SQLSuite with AlterTableT
     assert(table.schema == new StructType().add("id", LongType).add("data", StringType))
 
     // run a second create query that should fail
-    val exc = intercept[TableAlreadyExistsException] {
-      spark.sql("CREATE TABLE testcat.table_name (id bigint, data string, id2 bigint) USING bar")
-    }
-
-    assert(exc.getMessage.contains("table_name"))
+    checkError(
+      exception = intercept[TableAlreadyExistsException] {
+        spark.sql("CREATE TABLE testcat.table_name " +
+          "(id bigint, data string, id2 bigint) USING bar")
+      },
+      errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS",
+      parameters = Map("relationName" -> "`table_name`"))
 
     // table should not have changed
     val table2 = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
@@ -277,24 +283,38 @@ class DataSourceV2SQLSuiteV1Filter extends DataSourceV2SQLSuite with AlterTableT
 
   test("CreateTable/ReplaceTable: invalid schema if has interval type") {
     Seq("CREATE", "REPLACE").foreach { action =>
-      val e1 = intercept[AnalysisException](
-        sql(s"$action TABLE table_name (id int, value interval) USING $v2Format"))
-      assert(e1.getMessage.contains(s"Cannot use interval type in the table schema."))
-      val e2 = intercept[AnalysisException](
-        sql(s"$action TABLE table_name (id array<interval>) USING $v2Format"))
-      assert(e2.getMessage.contains(s"Cannot use interval type in the table schema."))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"$action TABLE table_name (id int, value interval) USING $v2Format")
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1183",
+        parameters = Map.empty)
+
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"$action TABLE table_name (id array<interval>) USING $v2Format")
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1183",
+        parameters = Map.empty)
     }
   }
 
   test("CTAS/RTAS: invalid schema if has interval type") {
     withSQLConf(SQLConf.LEGACY_INTERVAL_ENABLED.key -> "true") {
       Seq("CREATE", "REPLACE").foreach { action =>
-        val e1 = intercept[AnalysisException](
-          sql(s"$action TABLE table_name USING $v2Format as select interval 1 day"))
-        assert(e1.getMessage.contains(s"Cannot use interval type in the table schema."))
-        val e2 = intercept[AnalysisException](
-          sql(s"$action TABLE table_name USING $v2Format as select array(interval 1 day)"))
-        assert(e2.getMessage.contains(s"Cannot use interval type in the table schema."))
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql(s"$action TABLE table_name USING $v2Format as select interval 1 day")
+          },
+          errorClass = "_LEGACY_ERROR_TEMP_1183",
+          parameters = Map.empty)
+
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql(s"$action TABLE table_name USING $v2Format as select array(interval 1 day)")
+          },
+          errorClass = "_LEGACY_ERROR_TEMP_1183",
+          parameters = Map.empty)
       }
     }
   }
@@ -606,21 +626,28 @@ class DataSourceV2SQLSuiteV1Filter extends DataSourceV2SQLSuite with AlterTableT
   test("ReplaceTableAsSelect: REPLACE TABLE throws exception if table does not exist.") {
     Seq("testcat", "testcat_atomic").foreach { catalog =>
       spark.sql(s"CREATE TABLE $catalog.created USING $v2Source AS SELECT id, data FROM source")
-      intercept[CannotReplaceMissingTableException] {
-        spark.sql(s"REPLACE TABLE $catalog.replaced USING $v2Source AS SELECT id, data FROM source")
-      }
+      checkError(
+        exception = intercept[CannotReplaceMissingTableException] {
+          spark.sql(s"REPLACE TABLE $catalog.replaced USING $v2Source " +
+            s"AS SELECT id, data FROM source")
+        },
+        errorClass = "TABLE_OR_VIEW_NOT_FOUND",
+        parameters = Map("relationName" -> "`replaced`"))
     }
   }
 
   test("ReplaceTableAsSelect: REPLACE TABLE throws exception if table is dropped before commit.") {
     import InMemoryTableCatalog._
     spark.sql(s"CREATE TABLE testcat_atomic.created USING $v2Source AS SELECT id, data FROM source")
-    intercept[CannotReplaceMissingTableException] {
-      spark.sql(s"REPLACE TABLE testcat_atomic.replaced" +
-        s" USING $v2Source" +
-        s" TBLPROPERTIES (`$SIMULATE_DROP_BEFORE_REPLACE_PROPERTY`=true)" +
-        s" AS SELECT id, data FROM source")
-    }
+    checkError(
+      exception = intercept[CannotReplaceMissingTableException] {
+        spark.sql(s"REPLACE TABLE testcat_atomic.replaced" +
+          s" USING $v2Source" +
+          s" TBLPROPERTIES (`$SIMULATE_DROP_BEFORE_REPLACE_PROPERTY`=true)" +
+          s" AS SELECT id, data FROM source")
+      },
+      errorClass = "TABLE_OR_VIEW_NOT_FOUND",
+      parameters = Map("relationName" -> "`replaced`"))
   }
 
   test("CreateTableAsSelect: use v2 plan and session catalog when provider is v2") {
@@ -657,12 +684,13 @@ class DataSourceV2SQLSuiteV1Filter extends DataSourceV2SQLSuite with AlterTableT
     checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), spark.table("source"))
 
     // run a second CTAS query that should fail
-    val exc = intercept[TableAlreadyExistsException] {
-      spark.sql(
-        "CREATE TABLE testcat.table_name USING bar AS SELECT id, data, id as id2 FROM source2")
-    }
-
-    assert(exc.getMessage.contains("table_name"))
+    checkError(
+      exception = intercept[TableAlreadyExistsException] {
+        spark.sql("CREATE TABLE testcat.table_name USING bar AS " +
+          "SELECT id, data, id as id2 FROM source2")
+      },
+      errorClass = "TABLE_OR_VIEW_ALREADY_EXISTS",
+      parameters = Map("relationName" -> "`table_name`"))
 
     // table should not have changed
     val table2 = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
@@ -994,12 +1022,16 @@ class DataSourceV2SQLSuiteV1Filter extends DataSourceV2SQLSuite with AlterTableT
       sql("USE testcat.ns1.ns2")
       check("tbl")
 
-      assertAnalysisErrorClass(
-        s"SELECT ns1.ns2.ns3.tbl.id from $t",
-        "UNRESOLVED_COLUMN.WITH_SUGGESTION",
-        Map(
+      checkError(
+        exception = analysisException(s"SELECT ns1.ns2.ns3.tbl.id from $t"),
+        errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+        parameters = Map(
           "objectName" -> "`ns1`.`ns2`.`ns3`.`tbl`.`id`",
-          "proposal" -> "`testcat`.`ns1`.`ns2`.`tbl`.`id`, `testcat`.`ns1`.`ns2`.`tbl`.`point`"))
+          "proposal" -> "`testcat`.`ns1`.`ns2`.`tbl`.`id`, `testcat`.`ns1`.`ns2`.`tbl`.`point`"),
+        context = ExpectedContext(
+          fragment = "ns1.ns2.ns3.tbl.id",
+          start = 7,
+          stop = 24))
     }
   }
 
@@ -1039,44 +1071,82 @@ class DataSourceV2SQLSuiteV1Filter extends DataSourceV2SQLSuite with AlterTableT
   }
 
   test("ShowViews: using v1 catalog, db name with multipartIdentifier ('a.b') is not allowed.") {
-    val exception = intercept[AnalysisException] {
-      sql("SHOW VIEWS FROM a.b")
-    }
-
-    assert(exception.getMessage.contains(
-      "Nested databases are not supported by v1 session catalog: a.b"))
+    checkError(
+      exception = intercept[AnalysisException] {
+        sql("SHOW VIEWS FROM a.b")
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_1126",
+      parameters = Map("catalog" -> "a.b"))
   }
 
   test("ShowViews: using v2 catalog, command not supported.") {
-    val exception = intercept[AnalysisException] {
-      sql("SHOW VIEWS FROM testcat")
-    }
-
-    assert(exception.getMessage.contains("Catalog testcat does not support views"))
+    checkError(
+      exception = intercept[AnalysisException] {
+        sql("SHOW VIEWS FROM testcat")
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_1184",
+      parameters = Map("plugin" -> "testcat", "ability" -> "views"))
   }
 
   test("create/replace/alter table - reserved properties") {
     import TableCatalog._
+    val keyParameters = Map[String, String](
+      PROP_PROVIDER -> "please use the USING clause to specify it",
+      PROP_LOCATION -> "please use the LOCATION clause to specify it",
+      PROP_OWNER -> "it will be set to the current user",
+      PROP_EXTERNAL -> "please use CREATE EXTERNAL TABLE"
+    )
     withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "false")) {
       CatalogV2Util.TABLE_RESERVED_PROPERTIES.filterNot(_ == PROP_COMMENT).foreach { key =>
         Seq("OPTIONS", "TBLPROPERTIES").foreach { clause =>
           Seq("CREATE", "REPLACE").foreach { action =>
-            val e = intercept[ParseException] {
-              sql(s"$action TABLE testcat.reservedTest (key int) USING foo $clause ('$key'='bar')")
-            }
-            assert(e.getMessage.contains(s"$key is a reserved table property"))
+            val sqlText = s"$action TABLE testcat.reservedTest (key int) " +
+              s"USING foo $clause ('$key'='bar')"
+            checkError(
+              exception = intercept[ParseException] {
+                sql(sqlText)
+              },
+              errorClass = "UNSUPPORTED_FEATURE.SET_TABLE_PROPERTY",
+              parameters = Map(
+                "property" -> key,
+                "msg" -> keyParameters.getOrElse(
+                  key, "please remove it from the TBLPROPERTIES list.")),
+              context = ExpectedContext(
+                fragment = sqlText,
+                start = 0,
+                stop = 58 + key.length + clause.length + action.length))
           }
         }
 
-        val e1 = intercept[ParseException] {
-          sql(s"ALTER TABLE testcat.reservedTest SET TBLPROPERTIES ('$key'='bar')")
-        }
-        assert(e1.getMessage.contains(s"$key is a reserved table property"))
-
-        val e2 = intercept[ParseException] {
-          sql(s"ALTER TABLE testcat.reservedTest UNSET TBLPROPERTIES ('$key')")
-        }
-        assert(e2.getMessage.contains(s"$key is a reserved table property"))
+        val sql1 = s"ALTER TABLE testcat.reservedTest SET TBLPROPERTIES ('$key'='bar')"
+        checkError(
+          exception = intercept[ParseException] {
+            sql(sql1)
+          },
+          errorClass = "UNSUPPORTED_FEATURE.SET_TABLE_PROPERTY",
+          parameters = Map(
+            "property" -> key,
+            "msg" -> keyParameters.getOrElse(
+              key, "please remove it from the TBLPROPERTIES list.")),
+          context = ExpectedContext(
+            fragment = sql1,
+            start = 0,
+            stop = 60 + key.length))
+
+        val sql2 = s"ALTER TABLE testcat.reservedTest UNSET TBLPROPERTIES ('$key')"
+        checkError(
+          exception = intercept[ParseException] {
+            sql(sql2)
+          },
+          errorClass = "UNSUPPORTED_FEATURE.SET_TABLE_PROPERTY",
+          parameters = Map(
+            "property" -> key,
+            "msg" -> keyParameters.getOrElse(
+              key, "please remove it from the TBLPROPERTIES list.")),
+          context = ExpectedContext(
+            fragment = sql2,
+            start = 0,
+            stop = 56 + key.length))
       }
     }
     withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, "true")) {
@@ -1107,17 +1177,31 @@ class DataSourceV2SQLSuiteV1Filter extends DataSourceV2SQLSuite with AlterTableT
       withSQLConf((SQLConf.LEGACY_PROPERTY_NON_RESERVED.key, conf)) {
         withTable("testcat.reservedTest") {
           Seq("CREATE", "REPLACE").foreach { action =>
-            val e1 = intercept[ParseException] {
-              sql(s"$action TABLE testcat.reservedTest USING foo LOCATION 'foo' OPTIONS" +
-                s" ('path'='bar')")
-            }
-            assert(e1.getMessage.contains(s"Duplicated table paths found: 'foo' and 'bar'"))
-
-            val e2 = intercept[ParseException] {
-              sql(s"$action TABLE testcat.reservedTest USING foo OPTIONS" +
-                s" ('path'='foo', 'PaTh'='bar')")
-            }
-            assert(e2.getMessage.contains(s"Duplicated table paths found: 'foo' and 'bar'"))
+            val sql1 = s"$action TABLE testcat.reservedTest USING foo LOCATION 'foo' OPTIONS" +
+              s" ('path'='bar')"
+            checkError(
+              exception = intercept[ParseException] {
+                sql(sql1)
+              },
+              errorClass = "_LEGACY_ERROR_TEMP_0032",
+              parameters = Map("pathOne" -> "foo", "pathTwo" -> "bar"),
+              context = ExpectedContext(
+                fragment = sql1,
+                start = 0,
+                stop = 74 + action.length))
+
+            val sql2 = s"$action TABLE testcat.reservedTest USING foo OPTIONS" +
+              s" ('path'='foo', 'PaTh'='bar')"
+            checkError(
+              exception = intercept[ParseException] {
+                sql(sql2)
+              },
+              errorClass = "_LEGACY_ERROR_TEMP_0032",
+              parameters = Map("pathOne" -> "foo", "pathTwo" -> "bar"),
+              context = ExpectedContext(
+                fragment = sql2,
+                start = 0,
+                stop = 73 + action.length))
 
             sql(s"$action TABLE testcat.reservedTest USING foo LOCATION 'foo' TBLPROPERTIES" +
               s" ('path'='bar', 'Path'='noop')")
@@ -1288,10 +1372,12 @@ class DataSourceV2SQLSuiteV1Filter extends DataSourceV2SQLSuite with AlterTableT
   test("tableCreation: partition column case sensitive resolution") {
     def checkFailure(statement: String): Unit = {
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
-        val e = intercept[AnalysisException] {
-          sql(statement)
-        }
-        assert(e.getMessage.contains("Couldn't find column"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql(statement)
+          },
+          errorClass = null,
+          parameters = Map.empty)
       }
     }
 
@@ -1307,22 +1393,33 @@ class DataSourceV2SQLSuiteV1Filter extends DataSourceV2SQLSuite with AlterTableT
     val errorMsg = "Found duplicate column(s) in the table definition of"
     Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) =>
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
-        assertAnalysisError(
-          s"CREATE TABLE t ($c0 INT, $c1 INT) USING $v2Source",
-          s"$errorMsg default.t"
-        )
-        assertAnalysisError(
-          s"CREATE TABLE testcat.t ($c0 INT, $c1 INT) USING $v2Source",
-          s"$errorMsg t"
-        )
-        assertAnalysisError(
-          s"CREATE OR REPLACE TABLE t ($c0 INT, $c1 INT) USING $v2Source",
-          s"$errorMsg default.t"
-        )
-        assertAnalysisError(
-          s"CREATE OR REPLACE TABLE testcat.t ($c0 INT, $c1 INT) USING $v2Source",
-          s"$errorMsg t"
-        )
+        checkError(
+          exception = analysisException(s"CREATE TABLE t ($c0 INT, $c1 INT) USING $v2Source"),
+          errorClass = "_LEGACY_ERROR_TEMP_1233",
+          parameters = Map(
+            "colType" -> "in the table definition of default.t",
+            "duplicateCol" -> s"`${c0.toLowerCase(Locale.ROOT)}`"))
+        checkError(
+          exception = analysisException(
+            s"CREATE TABLE testcat.t ($c0 INT, $c1 INT) USING $v2Source"),
+          errorClass = "_LEGACY_ERROR_TEMP_1233",
+          parameters = Map(
+            "colType" -> "in the table definition of t",
+            "duplicateCol" -> s"`${c0.toLowerCase(Locale.ROOT)}`"))
+        checkError(
+          exception = analysisException(
+            s"CREATE OR REPLACE TABLE t ($c0 INT, $c1 INT) USING $v2Source"),
+          errorClass = "_LEGACY_ERROR_TEMP_1233",
+          parameters = Map(
+            "colType" -> "in the table definition of default.t",
+            "duplicateCol" -> s"`${c0.toLowerCase(Locale.ROOT)}`"))
+        checkError(
+          exception = analysisException(
+            s"CREATE OR REPLACE TABLE testcat.t ($c0 INT, $c1 INT) USING $v2Source"),
+          errorClass = "_LEGACY_ERROR_TEMP_1233",
+          parameters = Map(
+            "colType" -> "in the table definition of t",
+            "duplicateCol" -> s"`${c0.toLowerCase(Locale.ROOT)}`"))
       }
     }
   }
@@ -1331,46 +1428,61 @@ class DataSourceV2SQLSuiteV1Filter extends DataSourceV2SQLSuite with AlterTableT
     val errorMsg = "Found duplicate column(s) in the table definition of"
     Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) =>
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
-        assertAnalysisError(
-          s"CREATE TABLE t (d struct<$c0: INT, $c1: INT>) USING $v2Source",
-          s"$errorMsg default.t"
-        )
-        assertAnalysisError(
-          s"CREATE TABLE testcat.t (d struct<$c0: INT, $c1: INT>) USING $v2Source",
-          s"$errorMsg t"
-        )
-        assertAnalysisError(
-          s"CREATE OR REPLACE TABLE t (d struct<$c0: INT, $c1: INT>) USING $v2Source",
-          s"$errorMsg default.t"
-        )
-        assertAnalysisError(
-          s"CREATE OR REPLACE TABLE testcat.t (d struct<$c0: INT, $c1: INT>) USING $v2Source",
-          s"$errorMsg t"
+        checkError(
+          exception = analysisException(
+            s"CREATE TABLE t (d struct<$c0: INT, $c1: INT>) USING $v2Source"),
+          errorClass = "_LEGACY_ERROR_TEMP_1233",
+          parameters = Map(
+            "colType" -> "in the table definition of default.t",
+            "duplicateCol" -> s"`d.${c0.toLowerCase(Locale.ROOT)}`"
+          )
         )
+        checkError(
+          exception = analysisException(
+            s"CREATE TABLE testcat.t (d struct<$c0: INT, $c1: INT>) USING $v2Source"),
+          errorClass = "_LEGACY_ERROR_TEMP_1233",
+          parameters = Map(
+            "colType" -> "in the table definition of t",
+            "duplicateCol" -> s"`d.${c0.toLowerCase(Locale.ROOT)}`"))
+        checkError(
+          exception = analysisException(
+            s"CREATE OR REPLACE TABLE t (d struct<$c0: INT, $c1: INT>) USING $v2Source"),
+          errorClass = "_LEGACY_ERROR_TEMP_1233",
+          parameters = Map(
+            "colType" -> "in the table definition of default.t",
+            "duplicateCol" -> s"`d.${c0.toLowerCase(Locale.ROOT)}`"))
+        checkError(
+          exception = analysisException(
+            s"CREATE OR REPLACE TABLE testcat.t (d struct<$c0: INT, $c1: INT>) USING $v2Source"),
+          errorClass = "_LEGACY_ERROR_TEMP_1233",
+          parameters = Map(
+            "colType" -> "in the table definition of t",
+            "duplicateCol" -> s"`d.${c0.toLowerCase(Locale.ROOT)}`"))
       }
     }
   }
 
   test("tableCreation: bucket column names not in table definition") {
-    val errorMsg = "Couldn't find column c in"
-    assertAnalysisError(
-      s"CREATE TABLE tbl (a int, b string) USING $v2Source CLUSTERED BY (c) INTO 4 BUCKETS",
-      errorMsg
-    )
-    assertAnalysisError(
-      s"CREATE TABLE testcat.tbl (a int, b string) USING $v2Source CLUSTERED BY (c) INTO 4 BUCKETS",
-      errorMsg
-    )
-    assertAnalysisError(
-      s"CREATE OR REPLACE TABLE tbl (a int, b string) USING $v2Source " +
-        "CLUSTERED BY (c) INTO 4 BUCKETS",
-      errorMsg
-    )
-    assertAnalysisError(
-      s"CREATE OR REPLACE TABLE testcat.tbl (a int, b string) USING $v2Source " +
-        "CLUSTERED BY (c) INTO 4 BUCKETS",
-      errorMsg
-    )
+    checkError(
+      exception = analysisException(
+        s"CREATE TABLE tbl (a int, b string) USING $v2Source CLUSTERED BY (c) INTO 4 BUCKETS"),
+      errorClass = null,
+      parameters = Map.empty)
+    checkError(
+      exception = analysisException(s"CREATE TABLE testcat.tbl (a int, b string) " +
+        s"USING $v2Source CLUSTERED BY (c) INTO 4 BUCKETS"),
+      errorClass = null,
+      parameters = Map.empty)
+    checkError(
+      exception = analysisException(s"CREATE OR REPLACE TABLE tbl (a int, b string) " +
+        s"USING $v2Source CLUSTERED BY (c) INTO 4 BUCKETS"),
+      errorClass = null,
+      parameters = Map.empty)
+    checkError(
+      exception = analysisException(s"CREATE OR REPLACE TABLE testcat.tbl (a int, b string) " +
+        s"USING $v2Source CLUSTERED BY (c) INTO 4 BUCKETS"),
+      errorClass = null,
+      parameters = Map.empty)
   }
 
   test("tableCreation: bucket column name containing dot") {
@@ -1392,25 +1504,28 @@ class DataSourceV2SQLSuiteV1Filter extends DataSourceV2SQLSuite with AlterTableT
   }
 
   test("tableCreation: column repeated in partition columns") {
-    val errorMsg = "Found duplicate column(s) in the partitioning"
     Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) =>
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
-        assertAnalysisError(
-          s"CREATE TABLE t ($c0 INT) USING $v2Source PARTITIONED BY ($c0, $c1)",
-          errorMsg
-        )
-        assertAnalysisError(
-          s"CREATE TABLE testcat.t ($c0 INT) USING $v2Source PARTITIONED BY ($c0, $c1)",
-          errorMsg
-        )
-        assertAnalysisError(
-          s"CREATE OR REPLACE TABLE t ($c0 INT) USING $v2Source PARTITIONED BY ($c0, $c1)",
-          errorMsg
-        )
-        assertAnalysisError(
-          s"CREATE OR REPLACE TABLE testcat.t ($c0 INT) USING $v2Source PARTITIONED BY ($c0, $c1)",
-          errorMsg
-        )
+        checkError(
+          exception = analysisException(
+            s"CREATE TABLE t ($c0 INT) USING $v2Source PARTITIONED BY ($c0, $c1)"),
+          errorClass = null,
+          parameters = Map.empty)
+        checkError(
+          exception = analysisException(
+            s"CREATE TABLE testcat.t ($c0 INT) USING $v2Source PARTITIONED BY ($c0, $c1)"),
+          errorClass = null,
+          parameters = Map.empty)
+        checkError(
+          exception = analysisException(
+            s"CREATE OR REPLACE TABLE t ($c0 INT) USING $v2Source PARTITIONED BY ($c0, $c1)"),
+          errorClass = null,
+          parameters = Map.empty)
+        checkError(
+          exception = analysisException(s"CREATE OR REPLACE TABLE testcat.t ($c0 INT) " +
+            s"USING $v2Source PARTITIONED BY ($c0, $c1)"),
+          errorClass = null,
+          parameters = Map.empty)
       }
     }
   }
@@ -1419,26 +1534,38 @@ class DataSourceV2SQLSuiteV1Filter extends DataSourceV2SQLSuite with AlterTableT
     val errorMsg = "Found duplicate column(s) in the bucket definition"
     Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) =>
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
-        assertAnalysisError(
-          s"CREATE TABLE t ($c0 INT) USING $v2Source " +
-            s"CLUSTERED BY ($c0, $c1) INTO 2 BUCKETS",
-          errorMsg
-        )
-        assertAnalysisError(
-          s"CREATE TABLE testcat.t ($c0 INT) USING $v2Source " +
-            s"CLUSTERED BY ($c0, $c1) INTO 2 BUCKETS",
-          errorMsg
-        )
-        assertAnalysisError(
-          s"CREATE OR REPLACE TABLE t ($c0 INT) USING $v2Source " +
-            s"CLUSTERED BY ($c0, $c1) INTO 2 BUCKETS",
-          errorMsg
-        )
-        assertAnalysisError(
-          s"CREATE OR REPLACE TABLE testcat.t ($c0 INT) USING $v2Source " +
-            s"CLUSTERED BY ($c0, $c1) INTO 2 BUCKETS",
-          errorMsg
-        )
+        checkError(
+          exception = analysisException(
+            s"CREATE TABLE t ($c0 INT) USING $v2Source " +
+              s"CLUSTERED BY ($c0, $c1) INTO 2 BUCKETS"),
+          errorClass = "_LEGACY_ERROR_TEMP_1233",
+          parameters = Map(
+            "colType" -> "in the bucket definition",
+            "duplicateCol" -> s"`${c0.toLowerCase(Locale.ROOT)}`"))
+        checkError(
+          exception = analysisException(
+            s"CREATE TABLE testcat.t ($c0 INT) USING $v2Source " +
+              s"CLUSTERED BY ($c0, $c1) INTO 2 BUCKETS"),
+          errorClass = "_LEGACY_ERROR_TEMP_1233",
+          parameters = Map(
+            "colType" -> "in the bucket definition",
+            "duplicateCol" -> s"`${c0.toLowerCase(Locale.ROOT)}`"))
+        checkError(
+          exception = analysisException(
+            s"CREATE OR REPLACE TABLE t ($c0 INT) USING $v2Source " +
+              s"CLUSTERED BY ($c0, $c1) INTO 2 BUCKETS"),
+          errorClass = "_LEGACY_ERROR_TEMP_1233",
+          parameters = Map(
+            "colType" -> "in the bucket definition",
+            "duplicateCol" -> s"`${c0.toLowerCase(Locale.ROOT)}`"))
+        checkError(
+          exception = analysisException(
+            s"CREATE OR REPLACE TABLE testcat.t ($c0 INT) USING $v2Source " +
+              s"CLUSTERED BY ($c0, $c1) INTO 2 BUCKETS"),
+          errorClass = "_LEGACY_ERROR_TEMP_1233",
+          parameters = Map(
+            "colType" -> "in the bucket definition",
+            "duplicateCol" -> s"`${c0.toLowerCase(Locale.ROOT)}`"))
       }
     }
   }
@@ -1552,12 +1679,15 @@ class DataSourceV2SQLSuiteV1Filter extends DataSourceV2SQLSuite with AlterTableT
     withTable(t) {
       sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo PARTITIONED BY (id, p)")
       sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)")
-      val exc = intercept[AnalysisException] {
-        sql(s"DELETE FROM $t WHERE id = 2 AND id = id")
-      }
-
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"DELETE FROM $t WHERE id = 2 AND id = id")
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1110",
+        parameters = Map(
+          "table" -> "testcat.ns1.ns2.tbl",
+          "filters" -> "[id = 2, id = id]"))
       assert(spark.table(t).count === 3)
-      assert(exc.getMessage.contains(s"Cannot delete from table $t"))
     }
   }
 
@@ -1572,33 +1702,49 @@ class DataSourceV2SQLSuiteV1Filter extends DataSourceV2SQLSuite with AlterTableT
          """.stripMargin)
 
       // UPDATE non-existing table
-      assertAnalysisErrorClass(
-        "UPDATE dummy SET name='abc'",
-        expectedErrorClass = "TABLE_OR_VIEW_NOT_FOUND",
-        expectedErrorMessageParameters = Map("relationName" -> "`dummy`"))
+      checkError(
+        exception = analysisException("UPDATE dummy SET name='abc'"),
+        errorClass = "TABLE_OR_VIEW_NOT_FOUND",
+        parameters = Map("relationName" -> "`dummy`"),
+        context = ExpectedContext(
+          fragment = "dummy",
+          start = 7,
+          stop = 11))
 
       // UPDATE non-existing column
-      assertAnalysisErrorClass(
-        s"UPDATE $t SET dummy='abc'",
-        "UNRESOLVED_COLUMN.WITH_SUGGESTION",
-        Map(
+      checkError(
+        exception = analysisException(s"UPDATE $t SET dummy='abc'"),
+        errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+        parameters = Map(
           "objectName" -> "`dummy`",
           "proposal" -> ("`testcat`.`ns1`.`ns2`.`tbl`.`p`, `testcat`.`ns1`.`ns2`.`tbl`.`id`, " +
-            "`testcat`.`ns1`.`ns2`.`tbl`.`age`, `testcat`.`ns1`.`ns2`.`tbl`.`name`")))
-      assertAnalysisErrorClass(
-        s"UPDATE $t SET name='abc' WHERE dummy=1",
-        "UNRESOLVED_COLUMN.WITH_SUGGESTION",
-        Map(
+            "`testcat`.`ns1`.`ns2`.`tbl`.`age`, `testcat`.`ns1`.`ns2`.`tbl`.`name`")
+        ),
+        context = ExpectedContext(
+          fragment = "dummy='abc'",
+          start = 31,
+          stop = 41))
+      checkError(
+        exception = analysisException(s"UPDATE $t SET name='abc' WHERE dummy=1"),
+        errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+        parameters = Map(
           "objectName" -> "`dummy`",
-          "proposal" -> ("`testcat`.`ns1`.`ns2`.`tbl`.`p`, " +
-            "`testcat`.`ns1`.`ns2`.`tbl`.`id`, " +
-            "`testcat`.`ns1`.`ns2`.`tbl`.`age`, `testcat`.`ns1`.`ns2`.`tbl`.`name`")))
+          "proposal" -> ("`testcat`.`ns1`.`ns2`.`tbl`.`p`, `testcat`.`ns1`.`ns2`.`tbl`.`id`, " +
+            "`testcat`.`ns1`.`ns2`.`tbl`.`age`, `testcat`.`ns1`.`ns2`.`tbl`.`name`")
+        ),
+        context = ExpectedContext(
+          fragment = "dummy",
+          start = 48,
+          stop = 52))
 
       // UPDATE is not implemented yet.
-      val e = intercept[UnsupportedOperationException] {
-        sql(s"UPDATE $t SET name='Robert', age=32 WHERE p=1")
-      }
-      assert(e.getMessage.contains("UPDATE TABLE is not supported temporarily"))
+      checkError(
+        exception = intercept[SparkUnsupportedOperationException] {
+          sql(s"UPDATE $t SET name='Robert', age=32 WHERE p=1")
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_2096",
+        parameters = Map("ddl" -> "UPDATE TABLE")
+      )
     }
   }
 
@@ -1620,73 +1766,91 @@ class DataSourceV2SQLSuiteV1Filter extends DataSourceV2SQLSuite with AlterTableT
          """.stripMargin)
 
       // MERGE INTO non-existing table
-      assertAnalysisErrorClass(
-        s"""
-           |MERGE INTO testcat.ns1.ns2.dummy AS target
-           |USING testcat.ns1.ns2.source AS source
-           |ON target.id = source.id
-           |WHEN MATCHED AND (target.age < 10) THEN DELETE
-           |WHEN MATCHED AND (target.age > 10) THEN UPDATE SET *
-           |WHEN NOT MATCHED AND (target.col2='insert')
-           |THEN INSERT *
-         """.stripMargin,
-        expectedErrorClass = "TABLE_OR_VIEW_NOT_FOUND",
-        expectedErrorMessageParameters = Map("relationName" -> "`testcat`.`ns1`.`ns2`.`dummy`"))
+      checkError(
+        exception = analysisException(
+          s"""
+             |MERGE INTO testcat.ns1.ns2.dummy AS target
+             |USING testcat.ns1.ns2.source AS source
+             |ON target.id = source.id
+             |WHEN MATCHED AND (target.age < 10) THEN DELETE
+             |WHEN MATCHED AND (target.age > 10) THEN UPDATE SET *
+             |WHEN NOT MATCHED AND (target.col2='insert')
+             |THEN INSERT *
+           """.stripMargin),
+        errorClass = "TABLE_OR_VIEW_NOT_FOUND",
+        parameters = Map("relationName" -> "`testcat`.`ns1`.`ns2`.`dummy`"),
+        context = ExpectedContext(
+          fragment = "testcat.ns1.ns2.dummy",
+          start = 12,
+          stop = 32)
+      )
 
       // USING non-existing table
-      assertAnalysisErrorClass(
-        s"""
-           |MERGE INTO testcat.ns1.ns2.target AS target
-           |USING testcat.ns1.ns2.dummy AS source
-           |ON target.id = source.id
-           |WHEN MATCHED AND (target.age < 10) THEN DELETE
-           |WHEN MATCHED AND (target.age > 10) THEN UPDATE SET *
-           |WHEN NOT MATCHED AND (target.col2='insert')
-           |THEN INSERT *
-         """.stripMargin,
-        expectedErrorClass = "TABLE_OR_VIEW_NOT_FOUND",
-        expectedErrorMessageParameters = Map("relationName" -> "`testcat`.`ns1`.`ns2`.`dummy`"))
-
+      checkError(
+        exception = analysisException(
+          s"""
+             |MERGE INTO testcat.ns1.ns2.target AS target
+             |USING testcat.ns1.ns2.dummy AS source
+             |ON target.id = source.id
+             |WHEN MATCHED AND (target.age < 10) THEN DELETE
+             |WHEN MATCHED AND (target.age > 10) THEN UPDATE SET *
+             |WHEN NOT MATCHED AND (target.col2='insert')
+             |THEN INSERT *
+           """.stripMargin),
+        errorClass = "TABLE_OR_VIEW_NOT_FOUND",
+        parameters = Map("relationName" -> "`testcat`.`ns1`.`ns2`.`dummy`"),
+        context = ExpectedContext(
+          fragment = "testcat.ns1.ns2.dummy",
+          start = 51,
+          stop = 71))
 
       // UPDATE non-existing column
-      assertAnalysisError(
-        s"""
-           |MERGE INTO testcat.ns1.ns2.target AS target
+      val sql1 =
+        s"""MERGE INTO testcat.ns1.ns2.target AS target
            |USING testcat.ns1.ns2.source AS source
            |ON target.id = source.id
            |WHEN MATCHED AND (target.age < 10) THEN DELETE
            |WHEN MATCHED AND (target.age > 10) THEN UPDATE SET target.dummy = source.age
            |WHEN NOT MATCHED AND (target.col2='insert')
-           |THEN INSERT *
-         """.stripMargin,
-        "cannot resolve")
+           |THEN INSERT *""".stripMargin
+      checkError(
+        exception = analysisException(sql1),
+        errorClass = "_LEGACY_ERROR_TEMP_2309",
+        parameters = Map(
+          "sqlExpr" -> "target.dummy",
+          "cols" -> "target.age, target.id, target.name, target.p"),
+        context = ExpectedContext("target.dummy = source.age", 206, 230))
 
       // UPDATE using non-existing column
-      assertAnalysisError(
-        s"""
-           |MERGE INTO testcat.ns1.ns2.target AS target
-           |USING testcat.ns1.ns2.source AS source
-           |ON target.id = source.id
-           |WHEN MATCHED AND (target.age < 10) THEN DELETE
-           |WHEN MATCHED AND (target.age > 10) THEN UPDATE SET target.age = source.dummy
-           |WHEN NOT MATCHED AND (target.col2='insert')
-           |THEN INSERT *
-         """.stripMargin,
-        "cannot resolve")
-
-      // MERGE INTO is not implemented yet.
-      val e = intercept[UnsupportedOperationException] {
-        sql(
-          s"""
-             |MERGE INTO testcat.ns1.ns2.target AS target
+      checkError(
+        exception = analysisException(
+          s"""MERGE INTO testcat.ns1.ns2.target AS target
              |USING testcat.ns1.ns2.source AS source
              |ON target.id = source.id
-             |WHEN MATCHED AND (target.p < 0) THEN DELETE
-             |WHEN MATCHED AND (target.p > 0) THEN UPDATE SET *
-             |WHEN NOT MATCHED THEN INSERT *
-           """.stripMargin)
-      }
-      assert(e.getMessage.contains("MERGE INTO TABLE is not supported temporarily"))
+             |WHEN MATCHED AND (target.age < 10) THEN DELETE
+             |WHEN MATCHED AND (target.age > 10) THEN UPDATE SET target.age = source.dummy
+             |WHEN NOT MATCHED AND (target.col2='insert')
+             |THEN INSERT *""".stripMargin),
+        errorClass = "_LEGACY_ERROR_TEMP_2309",
+        parameters = Map(
+          "sqlExpr" -> "source.dummy",
+          "cols" -> ("target.age, source.age, target.id, source.id, " +
+            "target.name, source.name, target.p, source.p")),
+        context = ExpectedContext("source.dummy", 219, 230))
+
+      // MERGE INTO is not implemented yet.
+      checkError(
+        exception = intercept[SparkUnsupportedOperationException] {
+          sql(
+            s"""MERGE INTO testcat.ns1.ns2.target AS target
+               |USING testcat.ns1.ns2.source AS source
+               |ON target.id = source.id
+               |WHEN MATCHED AND (target.p < 0) THEN DELETE
+               |WHEN MATCHED AND (target.p > 0) THEN UPDATE SET *
+               |WHEN NOT MATCHED THEN INSERT *""".stripMargin)
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_2096",
+        parameters = Map("ddl" -> "MERGE INTO TABLE"))
     }
   }
 
@@ -1694,12 +1858,12 @@ class DataSourceV2SQLSuiteV1Filter extends DataSourceV2SQLSuite with AlterTableT
     withTable("testcat.ns1.new") {
       sql("CREATE TABLE testcat.ns1.ns2.old USING foo AS SELECT id, data FROM source")
       checkAnswer(sql("SHOW TABLES FROM testcat.ns1.ns2"), Seq(Row("ns1.ns2", "old", false)))
-
-      val e = intercept[AnalysisException] {
-        sql("ALTER VIEW testcat.ns1.ns2.old RENAME TO ns1.new")
-      }
-      assert(e.getMessage.contains(
-        "Cannot rename a table with ALTER VIEW. Please use ALTER TABLE instead"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("ALTER VIEW testcat.ns1.ns2.old RENAME TO ns1.new")
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1123",
+        parameters = Map.empty)
     }
   }
 
@@ -1785,10 +1949,12 @@ class DataSourceV2SQLSuiteV1Filter extends DataSourceV2SQLSuite with AlterTableT
 
   test("CREATE VIEW") {
     val v = "testcat.ns1.ns2.v"
-    val e = intercept[AnalysisException] {
-      sql(s"CREATE VIEW $v AS SELECT 1")
-    }
-    assert(e.message.contains("Catalog testcat does not support views"))
+    checkError(
+      exception = intercept[AnalysisException] {
+        sql(s"CREATE VIEW $v AS SELECT 1")
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_1184",
+      parameters = Map("plugin" -> "testcat", "ability" -> "views"))
   }
 
   test("global temp view should not be masked by v2 catalog") {
@@ -1819,13 +1985,14 @@ class DataSourceV2SQLSuiteV1Filter extends DataSourceV2SQLSuite with AlterTableT
   test("SPARK-30104: v2 catalog named global_temp will be masked") {
     val globalTempDB = spark.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE)
     spark.conf.set(s"spark.sql.catalog.$globalTempDB", classOf[InMemoryTableCatalog].getName)
-
-    val e = intercept[AnalysisException] {
-      // Since the following multi-part name starts with `globalTempDB`, it is resolved to
-      // the session catalog, not the `global_temp` v2 catalog.
-      sql(s"CREATE TABLE $globalTempDB.ns1.ns2.tbl (id bigint, data string) USING json")
-    }
-    assert(e.message.contains("requires a single-part namespace"))
+    checkError(
+      exception = intercept[AnalysisException] {
+        // Since the following multi-part name starts with `globalTempDB`, it is resolved to
+        // the session catalog, not the `global_temp` v2 catalog.
+        sql(s"CREATE TABLE $globalTempDB.ns1.ns2.tbl (id bigint, data string) USING json")
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_1117",
+      parameters = Map("sessionCatalog" -> "spark_catalog", "ns" -> "[global_temp, ns1, ns2]"))
   }
 
   test("table name same as catalog can be used") {
@@ -1856,8 +2023,10 @@ class DataSourceV2SQLSuiteV1Filter extends DataSourceV2SQLSuite with AlterTableT
         val t = "spark_catalog.t"
 
         def verify(sql: String): Unit = {
-          val e = intercept[AnalysisException](spark.sql(sql))
-          assert(e.getMessage.contains("requires a single-part namespace"))
+          checkError(
+            exception = intercept[AnalysisException](spark.sql(sql)),
+            errorClass = "_LEGACY_ERROR_TEMP_1117",
+            parameters = Map("sessionCatalog" -> "spark_catalog", "ns" -> "[]"))
         }
 
         verify(s"select * from $t")
@@ -1929,10 +2098,12 @@ class DataSourceV2SQLSuiteV1Filter extends DataSourceV2SQLSuite with AlterTableT
     withTempView("t") {
       spark.range(10).createTempView("t")
       withView(s"$sessionCatalogName.default.v") {
-        val e = intercept[AnalysisException] {
-          sql(s"CREATE VIEW $sessionCatalogName.default.v AS SELECT * FROM t")
-        }
-        assert(e.message.contains("referencing a temporary view"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql(s"CREATE VIEW $sessionCatalogName.default.v AS SELECT * FROM t")
+          },
+          errorClass = "_LEGACY_ERROR_TEMP_1283",
+          parameters = Map("name" -> "`spark_catalog`.`default`.`v`", "nameParts" -> "t"))
       }
     }
 
@@ -1956,14 +2127,21 @@ class DataSourceV2SQLSuiteV1Filter extends DataSourceV2SQLSuite with AlterTableT
     checkNamespaceComment("ns", "minor revision")
     checkNamespaceComment("ns", null)
     checkNamespaceComment("ns", "NULL")
-    intercept[AnalysisException](sql("COMMENT ON NAMESPACE abc IS NULL"))
+
+    checkError(
+      exception = intercept[AnalysisException](sql("COMMENT ON NAMESPACE abc IS NULL")),
+      errorClass = "SCHEMA_NOT_FOUND",
+      parameters = Map("schemaName" -> "`abc`"))
 
     // V2 non-session catalog is used.
     sql("CREATE NAMESPACE testcat.ns1")
     checkNamespaceComment("testcat.ns1", "minor revision")
     checkNamespaceComment("testcat.ns1", null)
     checkNamespaceComment("testcat.ns1", "NULL")
-    intercept[AnalysisException](sql("COMMENT ON NAMESPACE testcat.abc IS NULL"))
+    checkError(
+      exception = intercept[AnalysisException](sql("COMMENT ON NAMESPACE testcat.abc IS NULL")),
+      errorClass = "SCHEMA_NOT_FOUND",
+      parameters = Map("schemaName" -> "`abc`"))
   }
 
   private def checkNamespaceComment(namespace: String, comment: String): Unit = {
@@ -1985,7 +2163,12 @@ class DataSourceV2SQLSuiteV1Filter extends DataSourceV2SQLSuite with AlterTableT
       checkTableComment("t", null)
       checkTableComment("t", "NULL")
     }
-    intercept[AnalysisException](sql("COMMENT ON TABLE abc IS NULL"))
+    val sql1 = "COMMENT ON TABLE abc IS NULL"
+    checkError(
+      exception = intercept[AnalysisException](sql(sql1)),
+      errorClass = "TABLE_OR_VIEW_NOT_FOUND",
+      parameters = Map("relationName" -> "`abc`"),
+      context = ExpectedContext(fragment = "abc", start = 17, stop = 19))
 
     // V2 non-session catalog is used.
     withTable("testcat.ns1.ns2.t") {
@@ -1994,15 +2177,25 @@ class DataSourceV2SQLSuiteV1Filter extends DataSourceV2SQLSuite with AlterTableT
       checkTableComment("testcat.ns1.ns2.t", null)
       checkTableComment("testcat.ns1.ns2.t", "NULL")
     }
-    intercept[AnalysisException](sql("COMMENT ON TABLE testcat.abc IS NULL"))
+    val sql2 = "COMMENT ON TABLE testcat.abc IS NULL"
+    checkError(
+      exception = intercept[AnalysisException](sql(sql2)),
+      errorClass = "TABLE_OR_VIEW_NOT_FOUND",
+      parameters = Map("relationName" -> "`testcat`.`abc`"),
+      context = ExpectedContext(fragment = "testcat.abc", start = 17, stop = 27))
 
     val globalTempDB = spark.conf.get(StaticSQLConf.GLOBAL_TEMP_DATABASE)
     spark.conf.set(s"spark.sql.catalog.$globalTempDB", classOf[InMemoryTableCatalog].getName)
     withTempView("v") {
       sql("create global temp view v as select 1")
-      val e = intercept[AnalysisException](sql("COMMENT ON TABLE global_temp.v IS NULL"))
-      assert(e.getMessage.contains(
-        "global_temp.v is a temp view. 'COMMENT ON TABLE' expects a table"))
+      checkError(
+        exception = intercept[AnalysisException](sql("COMMENT ON TABLE global_temp.v IS NULL")),
+        errorClass = "_LEGACY_ERROR_TEMP_1013",
+        parameters = Map(
+          "nameParts" -> "global_temp.v",
+          "viewStr" -> "temp view",
+          "cmd" -> "COMMENT ON TABLE", "hintStr" -> ""),
+        context = ExpectedContext(fragment = "global_temp.v", start = 17, stop = 29))
     }
   }
 
@@ -2157,10 +2350,12 @@ class DataSourceV2SQLSuiteV1Filter extends DataSourceV2SQLSuite with AlterTableT
     sql("SET CATALOG testcat2")
     assert(catalogManager.currentCatalog.name() == "testcat2")
 
-    val errMsg = intercept[CatalogNotFoundException] {
-      sql("SET CATALOG not_exist_catalog")
-    }.getMessage
-    assert(errMsg.contains("Catalog 'not_exist_catalog' plugin class not found"))
+    checkError(
+      exception = intercept[CatalogNotFoundException] {
+        sql("SET CATALOG not_exist_catalog")
+      },
+      errorClass = null,
+      parameters = Map.empty)
   }
 
   test("SPARK-35973: ShowCatalogs") {
@@ -2188,15 +2383,33 @@ class DataSourceV2SQLSuiteV1Filter extends DataSourceV2SQLSuite with AlterTableT
     val t = "testcat.tbl"
     withTable(t) {
       sql(s"CREATE TABLE $t (id bigint, data string COMMENT 'hello') USING foo")
-      val e1 = intercept[AnalysisException] {
-        sql(s"CREATE index i1 ON $t(non_exist)")
-      }
-      assert(e1.getMessage.contains(s"Missing field non_exist in table $t"))
-
-      val e2 = intercept[AnalysisException] {
-        sql(s"CREATE index i1 ON $t(id)")
-      }
-      assert(e2.getMessage.contains(s"CreateIndex is not supported in this table $t."))
+      val sql1 = s"CREATE index i1 ON $t(non_exist)"
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(sql1)
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1331",
+        parameters = Map(
+          "fieldName" -> "non_exist",
+          "table" -> "testcat.tbl",
+          "schema" ->
+            """root
+              | |-- id: long (nullable = true)
+              | |-- data: string (nullable = true)
+              |""".stripMargin),
+        context = ExpectedContext(
+          fragment = sql1,
+          start = 0,
+          stop = 40))
+
+      val sql2 = s"CREATE index i1 ON $t(id)"
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(sql2)
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1332",
+        parameters = Map(
+          "errorMessage" -> "CreateIndex is not supported in this table testcat.tbl."))
     }
   }
 
@@ -2327,69 +2540,158 @@ class DataSourceV2SQLSuiteV1Filter extends DataSourceV2SQLSuite with AlterTableT
       assert(sql("SELECT * FROM t TIMESTAMP AS OF (SELECT (SELECT make_date(2021, 1, 29)))").collect
         === Array(Row(7), Row(8)))
 
-      val e1 = intercept[AnalysisException](
-        sql("SELECT * FROM t TIMESTAMP AS OF INTERVAL 1 DAY").collect()
-      )
-      assert(e1.message.contains("is not a valid timestamp expression for time travel"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("SELECT * FROM t TIMESTAMP AS OF INTERVAL 1 DAY").collect()
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1335",
+        parameters = Map("expr" -> "INTERVAL '1' DAY"))
 
-      val e2 = intercept[AnalysisException](
-        sql("SELECT * FROM t TIMESTAMP AS OF 'abc'").collect()
-      )
-      assert(e2.message.contains("is not a valid timestamp expression for time travel"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("SELECT * FROM t TIMESTAMP AS OF 'abc'").collect()
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1335",
+        parameters = Map("expr" -> "'abc'"))
 
-      val e3 = intercept[AnalysisException](
-        sql("SELECT * FROM t TIMESTAMP AS OF current_user()").collect()
-      )
-      assert(e3.message.contains("is not a valid timestamp expression for time travel"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("SELECT * FROM t TIMESTAMP AS OF current_user()").collect()
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1335",
+        parameters = Map("expr" -> "current_user()"))
 
-      val e4 = intercept[AnalysisException](
-        sql("SELECT * FROM t TIMESTAMP AS OF CAST(rand() AS STRING)").collect()
-      )
-      assert(e4.message.contains("is not a valid timestamp expression for time travel"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("SELECT * FROM t TIMESTAMP AS OF CAST(rand() AS STRING)").collect()
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1335",
+        parameters = Map("expr" -> "CAST(rand() AS STRING)"))
 
-    checkError(
-      exception = intercept[AnalysisException] {
-        sql("SELECT * FROM t TIMESTAMP AS OF abs(true)").collect()
-      },
-      errorClass = "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
-      sqlState = None,
-      parameters = Map(
-        "sqlExpr" -> "\"abs(true)\"",
-        "paramIndex" -> "1",
-        "inputSql" -> "\"true\"",
-        "inputType" -> "\"BOOLEAN\"",
-        "requiredType" ->
-          "(\"NUMERIC\" or \"INTERVAL DAY TO SECOND\" or \"INTERVAL YEAR TO MONTH\")"),
-      context = ExpectedContext(
-        fragment = "abs(true)",
-        start = 32,
-        stop = 40))
-
-      val e6 = intercept[AnalysisException](
-        sql("SELECT * FROM parquet.`/the/path` VERSION AS OF 1")
-      )
-      assert(e6.message.contains("Cannot time travel path-based tables"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("SELECT * FROM t TIMESTAMP AS OF abs(true)").collect()
+        },
+        errorClass = "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+        sqlState = None,
+        parameters = Map(
+          "sqlExpr" -> "\"abs(true)\"",
+          "paramIndex" -> "1",
+          "inputSql" -> "\"true\"",
+          "inputType" -> "\"BOOLEAN\"",
+          "requiredType" ->
+            "(\"NUMERIC\" or \"INTERVAL DAY TO SECOND\" or \"INTERVAL YEAR TO MONTH\")"),
+        context = ExpectedContext(
+          fragment = "abs(true)",
+          start = 32,
+          stop = 40))
 
-      val e7 = intercept[AnalysisException](
-        sql("WITH x AS (SELECT 1) SELECT * FROM x VERSION AS OF 1")
-      )
-      assert(e7.message.contains("Cannot time travel subqueries from WITH clause"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("SELECT * FROM parquet.`/the/path` VERSION AS OF 1")
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1336",
+        sqlState = None,
+        parameters = Map("target" -> "path-based tables"))
 
-      def checkSubqueryError(subquery: String, errMsg: String): Unit = {
-        val e1 = intercept[Exception](
-          sql(s"SELECT * FROM t TIMESTAMP AS OF ($subquery)").collect()
-        )
-        assert(e1.getMessage.contains(errMsg))
-        // Nested subquery should also report error correctly.
-        val e2 = intercept[Exception](
-          sql(s"SELECT * FROM t TIMESTAMP AS OF (SELECT ($subquery))").collect()
-        )
-        assert(e2.getMessage.contains(errMsg))
-      }
-      checkSubqueryError("SELECT 1 FROM non_exist", "TABLE_OR_VIEW_NOT_FOUND")
-      checkSubqueryError("SELECT col", "UNRESOLVED_COLUMN")
-      checkSubqueryError("SELECT 1, 2", "Scalar subquery must return only one column")
-      checkSubqueryError("SELECT * FROM VALUES (1), (2)", "MULTI_VALUE_SUBQUERY_ERROR")
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("WITH x AS (SELECT 1) SELECT * FROM x VERSION AS OF 1")
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1336",
+        sqlState = None,
+        parameters = Map("target" -> "subqueries from WITH clause"))
+
+      val subquery1 = "SELECT 1 FROM non_exist"
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"SELECT * FROM t TIMESTAMP AS OF ($subquery1)").collect()
+        },
+        errorClass = "TABLE_OR_VIEW_NOT_FOUND",
+        parameters = Map("relationName" -> "`non_exist`"),
+        ExpectedContext(
+          fragment = "non_exist",
+          start = 47,
+          stop = 55))
+      // Nested subquery should also report error correctly.
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"SELECT * FROM t TIMESTAMP AS OF (SELECT ($subquery1))").collect()
+        },
+        errorClass = "TABLE_OR_VIEW_NOT_FOUND",
+        parameters = Map("relationName" -> "`non_exist`"),
+        ExpectedContext(
+          fragment = "non_exist",
+          start = 55,
+          stop = 63))
+
+      val subquery2 = "SELECT col"
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"SELECT * FROM t TIMESTAMP AS OF ($subquery2)").collect()
+        },
+        errorClass = "UNRESOLVED_COLUMN.WITHOUT_SUGGESTION",
+        parameters = Map("objectName" -> "`col`"),
+        ExpectedContext(
+          fragment = "col",
+          start = 40,
+          stop = 42))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"SELECT * FROM t TIMESTAMP AS OF (SELECT ($subquery2))").collect()
+        },
+        errorClass = "UNRESOLVED_COLUMN.WITHOUT_SUGGESTION",
+        parameters = Map("objectName" -> "`col`"),
+        ExpectedContext(
+          fragment = "col",
+          start = 48,
+          stop = 50))
+
+      val subquery3 = "SELECT 1, 2"
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"SELECT * FROM t TIMESTAMP AS OF ($subquery3)").collect()
+        },
+        errorClass =
+          "INVALID_SUBQUERY_EXPRESSION.SCALAR_SUBQUERY_RETURN_MORE_THAN_ONE_OUTPUT_COLUMN",
+        parameters = Map("number" -> "2"),
+        ExpectedContext(
+          fragment = "(SELECT 1, 2)",
+          start = 32,
+          stop = 44))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"SELECT * FROM t TIMESTAMP AS OF (SELECT ($subquery3))").collect()
+        },
+        errorClass =
+          "INVALID_SUBQUERY_EXPRESSION.SCALAR_SUBQUERY_RETURN_MORE_THAN_ONE_OUTPUT_COLUMN",
+        parameters = Map("number" -> "2"),
+        ExpectedContext(
+          fragment = "(SELECT 1, 2)",
+          start = 40,
+          stop = 52))
+
+      val subquery4 = "SELECT * FROM VALUES (1), (2)"
+      checkError(
+        exception = intercept[SparkException] {
+          sql(s"SELECT * FROM t TIMESTAMP AS OF ($subquery4)").collect()
+        },
+        errorClass = "MULTI_VALUE_SUBQUERY_ERROR",
+        parameters = Map.empty,
+        ExpectedContext(
+          fragment = "(SELECT * FROM VALUES (1), (2))",
+          start = 32,
+          stop = 62))
+      checkError(
+        exception = intercept[SparkException] {
+          sql(s"SELECT * FROM t TIMESTAMP AS OF (SELECT ($subquery4))").collect()
+        },
+        errorClass = "MULTI_VALUE_SUBQUERY_ERROR",
+        parameters = Map.empty,
+        ExpectedContext(
+          fragment = "(SELECT * FROM VALUES (1), (2))",
+          start = 40,
+          stop = 70))
     }
   }
 
@@ -2428,23 +2730,13 @@ class DataSourceV2SQLSuiteV1Filter extends DataSourceV2SQLSuite with AlterTableT
   }
 
   private def testNotSupportedV2Command(sqlCommand: String, sqlParams: String): Unit = {
-    val e = intercept[AnalysisException] {
-      sql(s"$sqlCommand $sqlParams")
-    }
-    assert(e.message.contains(s"$sqlCommand is not supported for v2 tables"))
-  }
-
-  private def assertAnalysisErrorClass(
-      sqlStatement: String,
-      expectedErrorClass: String,
-      expectedErrorMessageParameters: Map[String, String]): Unit = {
-    val ex = intercept[AnalysisException] {
-      sql(sqlStatement)
-    }
-    assert(ex.getErrorClass == expectedErrorClass)
-    assert(ex.messageParameters.sameElements(expectedErrorMessageParameters))
+    checkError(
+      exception = intercept[AnalysisException] {
+        sql(s"$sqlCommand $sqlParams")
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_1124",
+      parameters = Map("cmd" -> sqlCommand))
   }
-
 }
 
 class DataSourceV2SQLSuiteV2Filter extends DataSourceV2SQLSuite {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org