Posted to commits@spark.apache.org by ma...@apache.org on 2023/06/03 19:22:36 UTC

[spark] branch master updated: [SPARK-43957][SQL][TESTS] Use `checkError()` to check `Exception` in `*Insert*Suite`

This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 90ec7eaddb6 [SPARK-43957][SQL][TESTS] Use `checkError()` to check `Exception` in `*Insert*Suite`
90ec7eaddb6 is described below

commit 90ec7eaddb66b6b2fe3afb8cdb68a9cf88f714de
Author: panbingkun <pb...@gmail.com>
AuthorDate: Sat Jun 3 22:22:20 2023 +0300

    [SPARK-43957][SQL][TESTS] Use `checkError()` to check `Exception` in `*Insert*Suite`
    
    ### What changes were proposed in this pull request?
    This PR aims to use `checkError()` to check `Exception` in `*Insert*Suite`, including:
    - sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite
    - sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedUpdateAsDeleteAndInsertTableSuite
    - sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite
    - sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite
    
    #### Note:
    However, this PR does not cover some cases that directly throw `AnalysisException` with a plain message, such as:
    https://github.com/apache/spark/blob/898ad77900d887ac64800a616bd382def816eea6/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala#L505-L515
    After this PR, I will refactor those cases, assign them error class names, and move them onto the error framework. Once that is done, all exception checks in `*Insert*Suite` will have been migrated to `checkError()`.
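    
    For context, a minimal sketch of the distinction, assuming the public `AnalysisException(errorClass, messageParameters)` constructor and the `getErrorClass` accessor behave as in current Spark (the error class and parameters shown are taken from the diff below); `checkError()` can only assert on exceptions that carry an error class:
    
    ```scala
    import org.apache.spark.sql.AnalysisException
    
    // Raised through the error framework: carries an error class and parameters,
    // so tests can use checkError() and stay independent of the message wording.
    val named = new AnalysisException(
      errorClass = "_LEGACY_ERROR_TEMP_1038",
      messageParameters = Map("columnSize" -> "2", "outputSize" -> "3"))
    assert(named.getErrorClass == "_LEGACY_ERROR_TEMP_1038")
    
    // Exceptions thrown directly with a plain message (as in the rules.scala
    // lines linked above) have no error class, so tests can only match on the
    // message text until the follow-up refactoring assigns one.
    ```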
    
    ### Why are the changes needed?
    Migrating to `checkError()` makes the tests independent of the text of error messages.
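    
    For illustration, a before/after sketch of the pattern applied throughout this patch, lifted from one of the hunks below (the surrounding test scaffolding is assumed to be in scope, as it is in these suites: `sql`, `intercept`, `checkError`, `ExpectedContext`):
    
    ```scala
    // Before: assert on the raw message text, which breaks whenever the wording changes.
    val e = intercept[AnalysisException] {
      sql("INSERT OVERWRITE t PARTITION (c='2', C='3') VALUES (1)")
    }
    assert(e.getMessage.contains("Found duplicate keys `c`"))
    
    // After: assert on the error class, its parameters, and the query context.
    checkError(
      exception = intercept[AnalysisException] {
        sql("INSERT OVERWRITE t PARTITION (c='2', C='3') VALUES (1)")
      },
      errorClass = "DUPLICATE_KEY",
      parameters = Map("keyColumn" -> "`c`"),
      context = ExpectedContext("PARTITION (c='2', C='3')", 19, 42))
    ```
    
    Where parameter values are nondeterministic, the tests pass `matchPVals = true` or use `checkErrorMatchPVals` so that the expected values are matched as regular expressions (e.g. `"pk#\\d+"`).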
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    - Manually tested.
    - Passed GA (GitHub Actions).
    
    Closes #41447 from panbingkun/check_error_for_insert_suites.
    
    Authored-by: panbingkun <pb...@gmail.com>
    Signed-off-by: Max Gekk <ma...@gmail.com>
---
 .../org/apache/spark/sql/SQLInsertTestSuite.scala  |  82 ++-
 ...ltaBasedUpdateAsDeleteAndInsertTableSuite.scala |  11 +-
 .../org/apache/spark/sql/sources/InsertSuite.scala | 570 ++++++++++++++-------
 .../org/apache/spark/sql/hive/InsertSuite.scala    |  50 +-
 4 files changed, 477 insertions(+), 236 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
index 904980d58d6..af85e44519b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql
 
 import org.apache.spark.SparkConf
+import org.apache.spark.SparkNumberFormatException
 import org.apache.spark.sql.catalyst.expressions.Hex
 import org.apache.spark.sql.connector.catalog.InMemoryPartitionTableCatalog
 import org.apache.spark.sql.internal.SQLConf
@@ -181,16 +182,28 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils {
   }
 
   test("insert with column list - mismatched column list size") {
-    val msgs = Seq("Cannot write to table due to mismatched user specified column size",
-      "expected 3 columns but found")
     def test: Unit = {
       withTable("t1") {
         val cols = Seq("c1", "c2", "c3")
         createTable("t1", cols, Seq("int", "long", "string"))
-        val e1 = intercept[AnalysisException](sql(s"INSERT INTO t1 (c1, c2) values(1, 2, 3)"))
-        assert(e1.getMessage.contains(msgs(0)) || e1.getMessage.contains(msgs(1)))
-        val e2 = intercept[AnalysisException](sql(s"INSERT INTO t1 (c1, c2, c3) values(1, 2)"))
-        assert(e2.getMessage.contains(msgs(0)) || e2.getMessage.contains(msgs(1)))
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql(s"INSERT INTO t1 (c1, c2) values(1, 2, 3)")
+          },
+          sqlState = None,
+          errorClass = "_LEGACY_ERROR_TEMP_1038",
+          parameters = Map("columnSize" -> "2", "outputSize" -> "3"),
+          context = ExpectedContext("values(1, 2, 3)", 24, 38)
+        )
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql(s"INSERT INTO t1 (c1, c2, c3) values(1, 2)")
+          },
+          sqlState = None,
+          errorClass = "_LEGACY_ERROR_TEMP_1038",
+          parameters = Map("columnSize" -> "3", "outputSize" -> "2"),
+          context = ExpectedContext("values(1, 2)", 28, 39)
+        )
       }
     }
     withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") {
@@ -259,10 +272,15 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils {
     "checking duplicate static partition columns should respect case sensitive conf") {
     withTable("t") {
       sql(s"CREATE TABLE t(i STRING, c string) USING PARQUET PARTITIONED BY (c)")
-      val e = intercept[AnalysisException] {
-        sql("INSERT OVERWRITE t PARTITION (c='2', C='3') VALUES (1)")
-      }
-      assert(e.getMessage.contains("Found duplicate keys `c`"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("INSERT OVERWRITE t PARTITION (c='2', C='3') VALUES (1)")
+        },
+        sqlState = None,
+        errorClass = "DUPLICATE_KEY",
+        parameters = Map("keyColumn" -> "`c`"),
+        context = ExpectedContext("PARTITION (c='2', C='3')", 19, 42)
+      )
     }
     // The following code is skipped for Hive because columns stored in Hive Metastore is always
     // case insensitive and we cannot create such table in Hive Metastore.
@@ -297,11 +315,19 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils {
         withTable("t") {
           sql("create table t(a int, b string) using parquet partitioned by (a)")
           if (shouldThrowException(policy)) {
-            val errorMsg = intercept[NumberFormatException] {
-              sql("insert into t partition(a='ansi') values('ansi')")
-            }.getMessage
-            assert(errorMsg.contains(
-              """The value 'ansi' of the type "STRING" cannot be cast to "INT""""))
+            checkError(
+              exception = intercept[SparkNumberFormatException] {
+                sql("insert into t partition(a='ansi') values('ansi')")
+              },
+              errorClass = "CAST_INVALID_INPUT",
+              parameters = Map(
+                "expression" -> "'ansi'",
+                "sourceType" -> "\"STRING\"",
+                "targetType" -> "\"INT\"",
+                "ansiConfig" -> "\"spark.sql.ansi.enabled\""
+              ),
+              context = ExpectedContext("insert into t partition(a='ansi')", 0, 32)
+            )
           } else {
             sql("insert into t partition(a='ansi') values('ansi')")
             checkAnswer(sql("select * from t"), Row("ansi", null) :: Nil)
@@ -340,8 +366,17 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils {
         ).foreach { query =>
           checkAnswer(sql(query), Seq(Row("a", 10, "08")))
         }
-        val e = intercept[AnalysisException](sql("alter table t drop partition(dt='8')"))
-        assert(e.getMessage.contains("PARTITIONS_NOT_FOUND"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql("alter table t drop partition(dt='8')")
+          },
+          errorClass = "PARTITIONS_NOT_FOUND",
+          sqlState = None,
+          parameters = Map(
+            "partitionList" -> "PARTITION \\(`dt` = 8\\)",
+            "tableName" -> ".*`t`"),
+          matchPVals = true
+        )
       }
     }
 
@@ -351,8 +386,17 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils {
         sql("insert into t partition(dt=08) values('a', 10)")
         checkAnswer(sql("select * from t where dt='08'"), sql("select * from t where dt='07'"))
         checkAnswer(sql("select * from t where dt=08"), Seq(Row("a", 10, "8")))
-        val e = intercept[AnalysisException](sql("alter table t drop partition(dt='08')"))
-        assert(e.getMessage.contains("PARTITIONS_NOT_FOUND"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql("alter table t drop partition(dt='08')")
+          },
+          errorClass = "PARTITIONS_NOT_FOUND",
+          sqlState = None,
+          parameters = Map(
+            "partitionList" -> "PARTITION \\(`dt` = 08\\)",
+            "tableName" -> ".*.`t`"),
+          matchPVals = true
+        )
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedUpdateAsDeleteAndInsertTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedUpdateAsDeleteAndInsertTableSuite.scala
index c88d6d7e905..51f3d49372c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedUpdateAsDeleteAndInsertTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedUpdateAsDeleteAndInsertTableSuite.scala
@@ -35,10 +35,13 @@ class DeltaBasedUpdateAsDeleteAndInsertTableSuite extends UpdateTableSuiteBase {
         |{ "pk": 3, "salary": 120, "dep": 'hr' }
         |""".stripMargin)
 
-    val exception = intercept[AnalysisException] {
-      sql(s"UPDATE $tableNameAsString SET salary = -1 WHERE pk = 1")
-    }
-    assert(exception.message.contains("Row ID attributes cannot be nullable"))
+    checkErrorMatchPVals(
+      exception = intercept[AnalysisException] {
+        sql(s"UPDATE $tableNameAsString SET salary = -1 WHERE pk = 1")
+      },
+      errorClass = "NULLABLE_ROW_ID_ATTRIBUTES",
+      parameters = Map("nullableRowIdAttrs" -> "pk#\\d+")
+    )
   }
 
   test("update with assignments to row ID") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 1df860ef9c4..10f46fb0e47 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -23,12 +23,11 @@ import java.time.{Duration, Period}
 
 import org.apache.hadoop.fs.{FileAlreadyExistsException, FSDataOutputStream, Path, RawLocalFileSystem}
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkArithmeticException, SparkException}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.parser.ParseException
-import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.DataSourceUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
@@ -448,7 +447,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
 
       val v1 =
         s"""
-           | INSERT OVERWRITE DIRECTORY '${path}'
+           | INSERT OVERWRITE DIRECTORY '$path'
            | USING json
            | OPTIONS (a 1, b 0.1, c TRUE)
            | SELECT 1 as a, 'c' as b
@@ -470,7 +469,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
         s"""
            | INSERT OVERWRITE DIRECTORY
            | USING json
-           | OPTIONS ('path' '${path}')
+           | OPTIONS ('path' '$path')
            | SELECT 1 as a, 'c' as b
          """.stripMargin
 
@@ -504,16 +503,20 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
 
       val v1 =
         s"""
-           | INSERT OVERWRITE DIRECTORY '${path}'
+           | INSERT OVERWRITE DIRECTORY '$path'
            | USING JDBC
            | OPTIONS (a 1, b 0.1, c TRUE)
            | SELECT 1 as a, 'c' as b
          """.stripMargin
-      val e = intercept[SparkException] {
-        spark.sql(v1)
-      }.getMessage
-
-      assert(e.contains("Only Data Sources providing FileFormat are supported"))
+      checkError(
+        exception = intercept[SparkException] {
+          spark.sql(v1)
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_2233",
+        parameters = Map(
+          "providingClass" -> ("class org.apache.spark.sql.execution.datasources." +
+            "jdbc.JdbcRelationProvider"))
+      )
     }
   }
 
@@ -615,21 +618,33 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
       SQLConf.STORE_ASSIGNMENT_POLICY.key -> SQLConf.StoreAssignmentPolicy.STRICT.toString) {
       withTable("t") {
         sql("create table t(i int, d double) using parquet")
-        var msg = intercept[AnalysisException] {
-          sql("insert into t select 1L, 2")
-        }.getMessage
-        assert(msg.contains("Cannot safely cast 'i': bigint to int"))
-
-        msg = intercept[AnalysisException] {
-          sql("insert into t select 1, 2.0")
-        }.getMessage
-        assert(msg.contains("Cannot safely cast 'd': decimal(2,1) to double"))
-
-        msg = intercept[AnalysisException] {
-          sql("insert into t select 1, 2.0D, 3")
-        }.getMessage
-        assert(msg.contains(
-          "Cannot write to '`spark_catalog`.`default`.`t`', too many data columns"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql("insert into t select 1L, 2")
+          },
+          errorClass = "_LEGACY_ERROR_TEMP_1204",
+          parameters = Map("tableName" -> "`spark_catalog`.`default`.`t`",
+            "errors" -> "Cannot safely cast 'i': bigint to int"))
+
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql("insert into t select 1, 2.0")
+          },
+          errorClass = "_LEGACY_ERROR_TEMP_1204",
+          parameters = Map(
+            "tableName" -> "`spark_catalog`.`default`.`t`",
+            "errors" -> "Cannot safely cast 'd': decimal(2,1) to double"))
+
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql("insert into t select 1, 2.0D, 3")
+          },
+          errorClass = "INSERT_COLUMN_ARITY_MISMATCH",
+          parameters = Map(
+            "tableName" -> "`spark_catalog`.`default`.`t`",
+            "reason" -> "too many data columns",
+            "tableColumns" -> "'i', 'd'",
+            "dataColumns" -> "'1', '2.0', '3'"))
 
         // Insert into table successfully.
         sql("insert into t select 1, 2.0D")
@@ -644,21 +659,36 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
       SQLConf.STORE_ASSIGNMENT_POLICY.key -> SQLConf.StoreAssignmentPolicy.ANSI.toString) {
       withTable("t") {
         sql("create table t(i int, d double) using parquet")
-        var msg = intercept[AnalysisException] {
-          sql("insert into t values('a', 'b')")
-        }.getMessage
-        assert(msg.contains("Cannot safely cast 'i': string to int") &&
-          msg.contains("Cannot safely cast 'd': string to double"))
-        msg = intercept[AnalysisException] {
-          sql("insert into t values(now(), now())")
-        }.getMessage
-        assert(msg.contains("Cannot safely cast 'i': timestamp to int") &&
-          msg.contains("Cannot safely cast 'd': timestamp to double"))
-        msg = intercept[AnalysisException] {
-          sql("insert into t values(true, false)")
-        }.getMessage
-        assert(msg.contains("Cannot safely cast 'i': boolean to int") &&
-          msg.contains("Cannot safely cast 'd': boolean to double"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql("insert into t values('a', 'b')")
+          },
+          errorClass = "_LEGACY_ERROR_TEMP_1204",
+          parameters = Map(
+            "tableName" -> "`spark_catalog`.`default`.`t`",
+            "errors" -> ("Cannot safely cast 'i': string to int\n- " +
+              "Cannot safely cast 'd': string to double"))
+        )
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql("insert into t values(now(), now())")
+          },
+          errorClass = "_LEGACY_ERROR_TEMP_1204",
+          parameters = Map(
+            "tableName" -> "`spark_catalog`.`default`.`t`",
+            "errors" -> ("Cannot safely cast 'i': timestamp to int\n- " +
+              "Cannot safely cast 'd': timestamp to double"))
+        )
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql("insert into t values(true, false)")
+          },
+          errorClass = "_LEGACY_ERROR_TEMP_1204",
+          parameters = Map(
+            "tableName" -> "`spark_catalog`.`default`.`t`",
+            "errors" -> ("Cannot safely cast 'i': boolean to int\n- " +
+              "Cannot safely cast 'd': boolean to double"))
+        )
       }
     }
   }
@@ -695,18 +725,25 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
       withTable("t") {
         sql("create table t(b int) using parquet")
         val outOfRangeValue1 = (Int.MaxValue + 1L).toString
-        val expectedMsg = "Fail to insert a value of \"BIGINT\" type into the \"INT\" type column" +
-          " `b` due to an overflow."
-        var msg = intercept[SparkException] {
-          sql(s"insert into t values($outOfRangeValue1)")
-        }.getMessage
-        assert(msg.contains(expectedMsg))
-
+        checkError(
+          exception = intercept[SparkException] {
+            sql(s"insert into t values($outOfRangeValue1)")
+          }.getCause.asInstanceOf[SparkException].getCause.asInstanceOf[SparkArithmeticException],
+          errorClass = "CAST_OVERFLOW_IN_TABLE_INSERT",
+          parameters = Map(
+            "sourceType" -> "\"BIGINT\"",
+            "targetType" -> "\"INT\"",
+            "columnName" -> "`b`"))
         val outOfRangeValue2 = (Int.MinValue - 1L).toString
-        msg = intercept[SparkException] {
-          sql(s"insert into t values($outOfRangeValue2)")
-        }.getMessage
-        assert(msg.contains(expectedMsg))
+        checkError(
+          exception = intercept[SparkException] {
+            sql(s"insert into t values($outOfRangeValue2)")
+          }.getCause.asInstanceOf[SparkException].getCause.asInstanceOf[SparkArithmeticException],
+          errorClass = "CAST_OVERFLOW_IN_TABLE_INSERT",
+          parameters = Map(
+            "sourceType" -> "\"BIGINT\"",
+            "targetType" -> "\"INT\"",
+            "columnName" -> "`b`"))
       }
     }
   }
@@ -717,18 +754,25 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
       withTable("t") {
         sql("create table t(b long) using parquet")
         val outOfRangeValue1 = Math.nextUp(Long.MaxValue)
-        val expectedMsg = "Fail to insert a value of \"DOUBLE\" type into the \"BIGINT\" type " +
-          "column `b` due to an overflow."
-        var msg = intercept[SparkException] {
-          sql(s"insert into t values(${outOfRangeValue1}D)")
-        }.getMessage
-        assert(msg.contains(expectedMsg))
-
+        checkError(
+          exception = intercept[SparkException] {
+            sql(s"insert into t values(${outOfRangeValue1}D)")
+          }.getCause.asInstanceOf[SparkException].getCause.asInstanceOf[SparkArithmeticException],
+          errorClass = "CAST_OVERFLOW_IN_TABLE_INSERT",
+          parameters = Map(
+            "sourceType" -> "\"DOUBLE\"",
+            "targetType" -> "\"BIGINT\"",
+            "columnName" -> "`b`"))
         val outOfRangeValue2 = Math.nextDown(Long.MinValue)
-        msg = intercept[SparkException] {
-          sql(s"insert into t values(${outOfRangeValue2}D)")
-        }.getMessage
-        assert(msg.contains(expectedMsg))
+        checkError(
+          exception = intercept[SparkException] {
+            sql(s"insert into t values(${outOfRangeValue2}D)")
+          }.getCause.asInstanceOf[SparkException].getCause.asInstanceOf[SparkArithmeticException],
+          errorClass = "CAST_OVERFLOW_IN_TABLE_INSERT",
+          parameters = Map(
+            "sourceType" -> "\"DOUBLE\"",
+            "targetType" -> "\"BIGINT\"",
+            "columnName" -> "`b`"))
       }
     }
   }
@@ -739,12 +783,15 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
       withTable("t") {
         sql("create table t(b decimal(3,2)) using parquet")
         val outOfRangeValue = "123.45"
-        val expectedMsg = "Fail to insert a value of \"DECIMAL(5,2)\" type into the " +
-          "\"DECIMAL(3,2)\" type column `b` due to an overflow."
-        val msg = intercept[SparkException] {
-          sql(s"insert into t values(${outOfRangeValue})")
-        }.getMessage
-        assert(msg.contains(expectedMsg))
+        checkError(
+          exception = intercept[SparkException] {
+            sql(s"insert into t values($outOfRangeValue)")
+          }.getCause.asInstanceOf[SparkException].getCause.asInstanceOf[SparkArithmeticException],
+          errorClass = "CAST_OVERFLOW_IN_TABLE_INSERT",
+          parameters = Map(
+            "sourceType" -> "\"DECIMAL(5,2)\"",
+            "targetType" -> "\"DECIMAL(3,2)\"",
+            "columnName" -> "`b`"))
       }
     }
   }
@@ -754,38 +801,58 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
       SQLConf.STORE_ASSIGNMENT_POLICY.key -> SQLConf.StoreAssignmentPolicy.ANSI.toString) {
       withTable("t") {
         sql("CREATE TABLE t(i int, t timestamp) USING parquet")
-        val msg = intercept[AnalysisException] {
-          sql("INSERT INTO t VALUES (TIMESTAMP('2010-09-02 14:10:10'), 1)")
-        }.getMessage
-        assert(msg.contains("Cannot safely cast 'i': timestamp to int"))
-        assert(msg.contains("Cannot safely cast 't': int to timestamp"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql("INSERT INTO t VALUES (TIMESTAMP('2010-09-02 14:10:10'), 1)")
+          },
+          errorClass = "_LEGACY_ERROR_TEMP_1204",
+          parameters = Map(
+            "tableName" -> "`spark_catalog`.`default`.`t`",
+            "errors" -> ("Cannot safely cast 'i': timestamp to int\n- " +
+              "Cannot safely cast 't': int to timestamp"))
+        )
       }
 
       withTable("t") {
         sql("CREATE TABLE t(i int, d date) USING parquet")
-        val msg = intercept[AnalysisException] {
-          sql("INSERT INTO t VALUES (date('2010-09-02'), 1)")
-        }.getMessage
-        assert(msg.contains("Cannot safely cast 'i': date to int"))
-        assert(msg.contains("Cannot safely cast 'd': int to date"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql("INSERT INTO t VALUES (date('2010-09-02'), 1)")
+          },
+          errorClass = "_LEGACY_ERROR_TEMP_1204",
+          parameters = Map(
+            "tableName" -> "`spark_catalog`.`default`.`t`",
+            "errors" -> ("Cannot safely cast 'i': date to int\n- " +
+              "Cannot safely cast 'd': int to date"))
+        )
       }
 
       withTable("t") {
         sql("CREATE TABLE t(b boolean, t timestamp) USING parquet")
-        val msg = intercept[AnalysisException] {
-          sql("INSERT INTO t VALUES (TIMESTAMP('2010-09-02 14:10:10'), true)")
-        }.getMessage
-        assert(msg.contains("Cannot safely cast 'b': timestamp to boolean"))
-        assert(msg.contains("Cannot safely cast 't': boolean to timestamp"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql("INSERT INTO t VALUES (TIMESTAMP('2010-09-02 14:10:10'), true)")
+          },
+          errorClass = "_LEGACY_ERROR_TEMP_1204",
+          parameters = Map(
+            "tableName" -> "`spark_catalog`.`default`.`t`",
+            "errors" -> ("Cannot safely cast 'b': timestamp to boolean\n- " +
+              "Cannot safely cast 't': boolean to timestamp"))
+        )
       }
 
       withTable("t") {
         sql("CREATE TABLE t(b boolean, d date) USING parquet")
-        val msg = intercept[AnalysisException] {
-          sql("INSERT INTO t VALUES (date('2010-09-02'), true)")
-        }.getMessage
-        assert(msg.contains("Cannot safely cast 'b': date to boolean"))
-        assert(msg.contains("Cannot safely cast 'd': boolean to date"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql("INSERT INTO t VALUES (date('2010-09-02'), true)")
+          },
+          errorClass = "_LEGACY_ERROR_TEMP_1204",
+          parameters = Map(
+            "tableName" -> "`spark_catalog`.`default`.`t`",
+            "errors" -> ("Cannot safely cast 'b': date to boolean\n- " +
+              "Cannot safely cast 'd': boolean to date"))
+        )
       }
     }
   }
@@ -866,10 +933,16 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     )
 
     withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") {
-      val message = intercept[AnalysisException] {
-        sql("INSERT OVERWRITE TABLE jsonTable SELECT a FROM jt")
-      }.getMessage
-      assert(message.contains("Cannot write to 'unknown', not enough data columns"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("INSERT OVERWRITE TABLE jsonTable SELECT a FROM jt")
+        },
+        errorClass = "INSERT_COLUMN_ARITY_MISMATCH",
+        parameters = Map(
+          "tableName" -> "unknown",
+          "reason" -> "not enough data columns",
+          "tableColumns" -> "'a', 'b'",
+          "dataColumns" -> "'a'"))
     }
   }
 
@@ -1026,15 +1099,17 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
   }
 
   test("SPARK-38336 INSERT INTO statements with tables with default columns: negative tests") {
-    object Errors {
-      val COMMON_SUBSTRING = " has a DEFAULT value"
-      val TARGET_TABLE_NOT_FOUND = "The table or view `t` cannot be found"
-    }
     // The default value fails to analyze.
     withTable("t") {
-      assert(intercept[AnalysisException] {
-        sql("create table t(i boolean, s bigint default badvalue) using parquet")
-      }.getMessage.contains(Errors.COMMON_SUBSTRING))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("create table t(i boolean, s bigint default badvalue) using parquet")
+        },
+        errorClass = "INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION",
+        parameters = Map(
+          "statement" -> "CREATE TABLE",
+          "colName" -> "`s`",
+          "defaultValue" -> "badvalue"))
     }
     // The default value analyzes to a table not in the catalog.
     withTable("t") {
@@ -1079,50 +1154,78 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     // Explicit default values may not participate in complex expressions in the VALUES list.
     withTable("t") {
       sql("create table t(i boolean, s bigint default 42) using parquet")
-      assert(intercept[AnalysisException] {
-        sql("insert into t values(false, default + 1)")
-      }.getMessage.contains(
-        QueryCompilationErrors.defaultReferencesNotAllowedInComplexExpressionsInInsertValuesList()
-          .getMessage))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("insert into t values(false, default + 1)")
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1339",
+        parameters = Map.empty
+      )
     }
     // Explicit default values may not participate in complex expressions in the SELECT query.
     withTable("t") {
       sql("create table t(i boolean, s bigint default 42) using parquet")
-      assert(intercept[AnalysisException] {
-        sql("insert into t select false, default + 1")
-      }.getMessage.contains(
-        QueryCompilationErrors.defaultReferencesNotAllowedInComplexExpressionsInInsertValuesList()
-          .getMessage))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("insert into t select false, default + 1")
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1339",
+        parameters = Map.empty
+      )
     }
     // Explicit default values have a reasonable error path if the table is not found.
     withTable("t") {
-      assert(intercept[AnalysisException] {
-        sql("insert into t values(false, default)")
-      }.getMessage.contains(Errors.TARGET_TABLE_NOT_FOUND))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("insert into t values(false, default)")
+        },
+        errorClass = "TABLE_OR_VIEW_NOT_FOUND",
+        parameters = Map("relationName" -> "`t`"),
+        context = ExpectedContext("t", 12, 12)
+      )
     }
     // The default value parses but the type is not coercible.
     withTable("t") {
-      assert(intercept[AnalysisException] {
-        sql("create table t(i boolean, s bigint default false) using parquet")
-      }.getMessage.contains(Errors.COMMON_SUBSTRING))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("create table t(i boolean, s bigint default false) using parquet")
+        },
+        errorClass = "INVALID_DEFAULT_VALUE.DATA_TYPE",
+        parameters = Map(
+          "statement" -> "CREATE TABLE",
+          "colName" -> "`s`",
+          "expectedType" -> "\"BIGINT\"",
+          "defaultValue" -> "false",
+          "actualType" -> "\"BOOLEAN\""))
     }
     // The number of columns in the INSERT INTO statement is greater than the number of columns in
     // the table.
     withTable("t") {
       sql("create table num_data(id int, val decimal(38,10)) using parquet")
       sql("create table t(id1 int, int2 int, result decimal(38,10)) using parquet")
-      assert(intercept[AnalysisException] {
-        sql("insert into t select t1.id, t2.id, t1.val, t2.val, t1.val * t2.val " +
-          "from num_data t1, num_data t2")
-      }.getMessage.contains(
-        "Cannot write to '`spark_catalog`.`default`.`t`', too many data columns"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("insert into t select t1.id, t2.id, t1.val, t2.val, t1.val * t2.val " +
+            "from num_data t1, num_data t2")
+        },
+        errorClass = "INSERT_COLUMN_ARITY_MISMATCH",
+        parameters = Map(
+          "tableName" -> "`spark_catalog`.`default`.`t`",
+          "reason" -> "too many data columns",
+          "tableColumns" -> "'id1', 'int2', 'result'",
+          "dataColumns" -> "'id', 'id', 'val', 'val', '(val * val)'"))
     }
     // The default value is disabled per configuration.
     withTable("t") {
       withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") {
-        assert(intercept[AnalysisException] {
-          sql("create table t(i boolean, s bigint default 42L) using parquet")
-        }.getMessage.contains("Support for DEFAULT column values is not allowed"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql("create table t(i boolean, s bigint default 42L) using parquet")
+          },
+          errorClass = "_LEGACY_ERROR_TEMP_0058",
+          parameters = Map.empty,
+          context = ExpectedContext("s bigint default 42L", 26, 45)
+        )
       }
     }
     // The table has a partitioning column with a default value; this is not allowed.
@@ -1145,10 +1248,16 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") {
       withTable("t") {
         sql("create table t(i boolean, s bigint) using parquet")
-        assert(intercept[AnalysisException] {
-          sql("insert into t values(true)")
-        }.getMessage.contains(
-          "Cannot write to '`spark_catalog`.`default`.`t`', not enough data columns"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql("insert into t values(true)")
+          },
+          errorClass = "INSERT_COLUMN_ARITY_MISMATCH",
+          parameters = Map(
+            "tableName" -> "`spark_catalog`.`default`.`t`",
+            "reason" -> "not enough data columns",
+            "tableColumns" -> "'i', 's'",
+            "dataColumns" -> "'col1'"))
       }
     }
   }
@@ -1211,14 +1320,17 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
   }
 
   test("SPARK- 38795 INSERT INTO with user specified columns and defaults: negative tests") {
-    val missingColError = "Cannot find data for output column "
     // The missing columns in these INSERT INTO commands do not have explicit default values.
     withTable("t") {
       sql("create table t(i boolean, s bigint, q int default 43) using parquet")
-      assert(intercept[AnalysisException] {
-        sql("insert into t (i, q) select true from (select 1)")
-      }.getMessage.contains("Cannot write to table due to mismatched user specified column " +
-        "size(2) and data column size(1)"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("insert into t (i, q) select true from (select 1)")
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1038",
+        parameters = Map("columnSize" -> "2", "outputSize" -> "1"),
+        ExpectedContext(
+          fragment = "select true from (select 1)", start = 21, stop = 47))
     }
     // When the USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES configuration is disabled, and no
     // explicit DEFAULT value is available when the INSERT INTO statement provides fewer
@@ -1226,39 +1338,69 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") {
       withTable("t") {
         sql("create table t(i boolean, s bigint) using parquet")
-        assert(intercept[AnalysisException] {
-          sql("insert into t (i) values (true)")
-        }.getMessage.contains(missingColError + "'s'"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql("insert into t (i) values (true)")
+          },
+          errorClass = "_LEGACY_ERROR_TEMP_1204",
+          parameters = Map(
+            "tableName" -> "`spark_catalog`.`default`.`t`",
+            "errors" -> "Cannot find data for output column 's'"))
       }
       withTable("t") {
         sql("create table t(i boolean default true, s bigint) using parquet")
-        assert(intercept[AnalysisException] {
-          sql("insert into t (i) values (default)")
-        }.getMessage.contains(missingColError + "'s'"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql("insert into t (i) values (default)")
+          },
+          errorClass = "_LEGACY_ERROR_TEMP_1204",
+          parameters = Map(
+            "tableName" -> "`spark_catalog`.`default`.`t`",
+            "errors" -> "Cannot find data for output column 's'"))
       }
       withTable("t") {
         sql("create table t(i boolean, s bigint default 42) using parquet")
-        assert(intercept[AnalysisException] {
-          sql("insert into t (s) values (default)")
-        }.getMessage.contains(missingColError + "'i'"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql("insert into t (s) values (default)")
+          },
+          errorClass = "_LEGACY_ERROR_TEMP_1204",
+          parameters = Map(
+            "tableName" -> "`spark_catalog`.`default`.`t`",
+            "errors" -> "Cannot find data for output column 'i'"))
       }
       withTable("t") {
         sql("create table t(i boolean, s bigint, q int) using parquet partitioned by (i)")
-        assert(intercept[AnalysisException] {
-          sql("insert into t partition(i='true') (s) values(5)")
-        }.getMessage.contains(missingColError + "'q'"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql("insert into t partition(i='true') (s) values(5)")
+          },
+          errorClass = "_LEGACY_ERROR_TEMP_1204",
+          parameters = Map(
+            "tableName" -> "`spark_catalog`.`default`.`t`",
+            "errors" -> "Cannot find data for output column 'q'"))
       }
       withTable("t") {
         sql("create table t(i boolean, s bigint, q int) using parquet partitioned by (i)")
-        assert(intercept[AnalysisException] {
-          sql("insert into t partition(i='false') (q) select 43")
-        }.getMessage.contains(missingColError + "'s'"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql("insert into t partition(i='false') (q) select 43")
+          },
+          errorClass = "_LEGACY_ERROR_TEMP_1204",
+          parameters = Map(
+            "tableName" -> "`spark_catalog`.`default`.`t`",
+            "errors" -> "Cannot find data for output column 's'"))
       }
       withTable("t") {
         sql("create table t(i boolean, s bigint, q int) using parquet partitioned by (i)")
-        assert(intercept[AnalysisException] {
-          sql("insert into t partition(i='false') (q) select default")
-        }.getMessage.contains(missingColError + "'s'"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql("insert into t partition(i='false') (q) select default")
+          },
+          errorClass = "_LEGACY_ERROR_TEMP_1204",
+          parameters = Map(
+            "tableName" -> "`spark_catalog`.`default`.`t`",
+            "errors" -> "Cannot find data for output column 's'"))
       }
     }
     // When the CASE_SENSITIVE configuration is enabled, then using different cases for the required
@@ -1394,15 +1536,18 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
   }
 
   test("SPARK-38811 INSERT INTO on columns added with ALTER TABLE ADD COLUMNS: Negative tests") {
-    object Errors {
-      val COMMON_SUBSTRING = " has a DEFAULT value"
-    }
     // The default value fails to analyze.
     withTable("t") {
       sql("create table t(i boolean) using parquet")
-      assert(intercept[AnalysisException] {
-        sql("alter table t add column s bigint default badvalue")
-      }.getMessage.contains(Errors.COMMON_SUBSTRING))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("alter table t add column s bigint default badvalue")
+        },
+        errorClass = "INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION",
+        parameters = Map(
+          "statement" -> "ALTER TABLE ADD COLUMNS",
+          "colName" -> "`s`",
+          "defaultValue" -> "badvalue"))
     }
     // The default value analyzes to a table not in the catalog.
     withTable("t") {
@@ -1450,9 +1595,17 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     withTable("t") {
       withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") {
         sql("create table t(i boolean) using parquet")
-        assert(intercept[AnalysisException] {
-          sql("alter table t add column s bigint default 42L")
-        }.getMessage.contains("Support for DEFAULT column values is not allowed"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql("alter table t add column s bigint default 42L")
+          },
+          errorClass = "_LEGACY_ERROR_TEMP_0058",
+          parameters = Map.empty,
+          context = ExpectedContext(
+            fragment = "s bigint default 42L",
+            start = 25,
+            stop = 44)
+        )
       }
     }
   }
@@ -1482,16 +1635,19 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
   }
 
   test("SPARK-38838 INSERT INTO with defaults set by ALTER TABLE ALTER COLUMN: negative tests") {
-    object Errors {
-      val COMMON_SUBSTRING = " has a DEFAULT value"
-    }
     val createTable = "create table t(i boolean, s bigint) using parquet"
     withTable("t") {
       sql(createTable)
       // The default value fails to analyze.
-      assert(intercept[AnalysisException] {
-        sql("alter table t alter column s set default badvalue")
-      }.getMessage.contains(Errors.COMMON_SUBSTRING))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("alter table t alter column s set default badvalue")
+        },
+        errorClass = "INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION",
+        parameters = Map(
+          "statement" -> "ALTER TABLE ALTER COLUMN",
+          "colName" -> "`s`",
+          "defaultValue" -> "badvalue"))
       // The default value analyzes to a table not in the catalog.
       checkError(
         exception = intercept[AnalysisException] {
@@ -1543,9 +1699,12 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     // Attempting to set a default value for a partitioning column is not allowed.
     withTable("t") {
       sql("create table t(i boolean, s bigint, q int default 42) using parquet partitioned by (i)")
-      assert(intercept[AnalysisException] {
-        sql("alter table t alter column i set default false")
-      }.getMessage.contains("Can't find column `i` given table data columns [`s`, `q`]"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("alter table t alter column i set default false")
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1246",
+        parameters = Map("name" -> "i", "fieldNames" -> "[`s`, `q`]"))
     }
   }
 
@@ -1703,14 +1862,16 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     // "default" generates a "column not found" error before any following .insertInto.
     withTable("t") {
       sql(s"create table t(a string, i int default 42) using parquet")
-      assert(intercept[AnalysisException] {
-        Seq(("xyz")).toDF.select("value", "default").write.insertInto("t")
-      }.getMessage.contains("column or function parameter with name `default` cannot be resolved"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          Seq("xyz").toDF.select("value", "default").write.insertInto("t")
+        },
+        errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+        parameters = Map("objectName" -> "`default`", "proposal" -> "`value`"))
     }
   }
 
   test("SPARK-40001 JSON DEFAULT columns = JSON_GENERATOR_WRITE_NULL_IF_WITH_DEFAULT_VALUE off") {
-    val error = "DEFAULT values are not supported for JSON tables"
     // Check that the JSON_GENERATOR_WRITE_NULL_IF_WITH_DEFAULT_VALUE config overrides the
     // JSON_GENERATOR_IGNORE_NULL_FIELDS config.
     withSQLConf(SQLConf.JSON_GENERATOR_WRITE_NULL_IF_WITH_DEFAULT_VALUE.key -> "true",
@@ -1733,15 +1894,22 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
 
   test("SPARK-39359 Restrict DEFAULT columns to allowlist of supported data source types") {
     withSQLConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS.key -> "csv,json,orc") {
-      val unsupported = "DEFAULT values are not supported for target data source"
-      assert(intercept[AnalysisException] {
-        sql(s"create table t(a string default 'abc') using parquet")
-      }.getMessage.contains(unsupported))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"create table t(a string default 'abc') using parquet")
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1345",
+        parameters = Map("statementType" -> "CREATE TABLE", "dataSource" -> "parquet"))
       withTable("t") {
         sql(s"create table t(a string, b int) using parquet")
-        assert(intercept[AnalysisException] {
-          sql("alter table t add column s bigint default 42")
-        }.getMessage.contains(unsupported))
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql("alter table t add column s bigint default 42")
+          },
+          errorClass = "_LEGACY_ERROR_TEMP_1345",
+          parameters = Map(
+            "statementType" -> "ALTER TABLE ADD COLUMNS",
+            "dataSource" -> "parquet"))
       }
     }
   }
@@ -1765,7 +1933,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
       withTable("t") {
         sql(s"create table t(i boolean) using ${config.dataSource}")
         if (config.useDataFrames) {
-          Seq((false)).toDF.write.insertInto("t")
+          Seq(false).toDF.write.insertInto("t")
         } else {
           sql("insert into t select false")
         }
@@ -2022,13 +2190,16 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
         // Now update the allowlist of table providers to prohibit ALTER TABLE ADD COLUMN commands
         // from assigning DEFAULT values.
         withSQLConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS.key -> s"$provider*") {
-          assert(intercept[AnalysisException] {
-            // Try to add another column to the existing table again. This fails because the table
-            // provider is now in the denylist.
-            sql(s"alter table t1 add column (b string default 'abc')")
-          }.getMessage.contains(
-            QueryCompilationErrors.addNewDefaultColumnToExistingTableNotAllowed(
-              "ALTER TABLE ADD COLUMNS", provider).getMessage))
+          checkError(
+            exception = intercept[AnalysisException] {
+              // Try to add another column to the existing table again. This fails because the table
+              // provider is now in the denylist.
+              sql(s"alter table t1 add column (b string default 'abc')")
+            },
+            errorClass = "_LEGACY_ERROR_TEMP_1346",
+            parameters = Map(
+              "statementType" -> "ALTER TABLE ADD COLUMNS",
+              "dataSource" -> provider))
           withTable("t2") {
             // It is still OK to create a new table with a column DEFAULT value assigned, even if
             // the table provider is in the above denylist.
@@ -2146,13 +2317,24 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
           |CREATE TABLE insertTable(i int, part1 string, part2 string) USING PARQUET
           |PARTITIONED BY (part1, part2)
             """.stripMargin)
-      val msg = "Partition spec is invalid"
-      assert(intercept[AnalysisException] {
-        sql("INSERT INTO TABLE insertTable PARTITION(part1=1, part2='') SELECT 1")
-      }.getMessage.contains(msg))
-      assert(intercept[AnalysisException] {
-        sql("INSERT INTO TABLE insertTable PARTITION(part1='', part2) SELECT 1 ,'' AS part2")
-      }.getMessage.contains(msg))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("INSERT INTO TABLE insertTable PARTITION(part1=1, part2='') SELECT 1")
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1076",
+        parameters = Map(
+          "details" -> ("The spec ([part1=Some(1), part2=Some()]) " +
+            "contains an empty partition column value"))
+      )
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("INSERT INTO TABLE insertTable PARTITION(part1='', part2) SELECT 1 ,'' AS part2")
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1076",
+        parameters = Map(
+          "details" -> ("The spec ([part1=Some(), part2=None]) " +
+            "contains an empty partition column value"))
+      )
 
       sql("INSERT INTO TABLE insertTable PARTITION(part1='1', part2='2') SELECT 1")
       sql("INSERT INTO TABLE insertTable PARTITION(part1='1', part2) SELECT 1 ,'2' AS part2")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
index 9cc26d894ba..03bbcb1a5c0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
@@ -346,20 +346,30 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
   }
 
   testPartitionedTable("partitionBy() can't be used together with insertInto()") { tableName =>
-    val cause = intercept[AnalysisException] {
-      Seq((1, 2, 3, 4)).toDF("a", "b", "c", "d").write.partitionBy("b", "c").insertInto(tableName)
-    }
-
-    assert(cause.getMessage.contains("insertInto() can't be used together with partitionBy()."))
+    checkError(
+      exception = intercept[AnalysisException] {
+        Seq((1, 2, 3, 4)).toDF("a", "b", "c", "d").write.partitionBy("b", "c").insertInto(tableName)
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_1309",
+      parameters = Map.empty
+    )
   }
 
   testPartitionedTable(
     "SPARK-16036: better error message when insert into a table with mismatch schema") {
     tableName =>
-      val e = intercept[AnalysisException] {
-        sql(s"INSERT INTO TABLE $tableName PARTITION(b=1, c=2) SELECT 1, 2, 3")
-      }
-      assert(e.message.contains("Cannot write to") && e.message.contains("too many data columns"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"INSERT INTO TABLE $tableName PARTITION(b=1, c=2) SELECT 1, 2, 3")
+        },
+        errorClass = "INSERT_PARTITION_COLUMN_ARITY_MISMATCH",
+        parameters = Map(
+          "staticPartCols" -> "'b', 'c'",
+          "tableColumns" -> "'a', 'd', 'b', 'c'",
+          "reason" -> "too many data columns",
+          "dataColumns" -> "'1', '2', '3'",
+          "tableName" -> s"`spark_catalog`.`default`.`$tableName`")
+      )
   }
 
   testPartitionedTable("SPARK-16037: INSERT statement should match columns by position") {
@@ -841,16 +851,18 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
           |PARTITIONED BY (d string)
           """.stripMargin)
 
-      val e = intercept[AnalysisException] {
-        spark.sql(
-          """
-            |INSERT OVERWRITE TABLE t1 PARTITION(d='')
-            |SELECT 1
-          """.stripMargin)
-      }.getMessage
-
-      assert(!e.contains("get partition: Value for key d is null or empty"))
-      assert(e.contains("Partition spec is invalid"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          spark.sql(
+            """
+              |INSERT OVERWRITE TABLE t1 PARTITION(d='')
+              |SELECT 1
+            """.stripMargin)
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1076",
+        parameters = Map(
+          "details" -> "The spec ([d=Some()]) contains an empty partition column value")
+      )
     }
   }
 

