You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2023/06/19 22:44:13 UTC

[spark] branch master updated: [SPARK-43969][SQL] Refactor & Assign names to the error class _LEGACY_ERROR_TEMP_1170

This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f3db20c17df [SPARK-43969][SQL] Refactor & Assign names to the error class _LEGACY_ERROR_TEMP_1170
f3db20c17df is described below

commit f3db20c17dfdc1cb5daa42c154afa732e5e3800b
Author: panbingkun <pb...@gmail.com>
AuthorDate: Tue Jun 20 01:43:32 2023 +0300

    [SPARK-43969][SQL] Refactor & Assign names to the error class _LEGACY_ERROR_TEMP_1170
    
    ### What changes were proposed in this pull request?
    This PR aims to:
    - Refactor `PreWriteCheck` to use error framework.
    - Make `INSERT_COLUMN_ARITY_MISMATCH` more generic and avoid embedding the error text in source code.
    - Assign a name to the error class _LEGACY_ERROR_TEMP_1170.
    - In the `INSERT_PARTITION_COLUMN_ARITY_MISMATCH` error message, quote table column names with `toSQLId` instead of wrapping them in single quotes ('...').
    
    ### Why are the changes needed?
    The changes improve the error framework.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    - Manual testing.
    - Pass GA.
    
    Closes #41458 from panbingkun/refactor_PreWriteCheck.
    
    Lead-authored-by: panbingkun <pb...@gmail.com>
    Co-authored-by: panbingkun <84...@qq.com>
    Signed-off-by: Max Gekk <ma...@gmail.com>
---
 core/src/main/resources/error/error-classes.json   |  62 ++++++++---
 python/pyspark/sql/tests/test_readwriter.py        |   4 +-
 .../spark/sql/catalyst/analysis/Analyzer.scala     |   2 +-
 .../catalyst/analysis/ResolveInsertionBase.scala   |  13 ++-
 .../catalyst/analysis/TableOutputResolver.scala    |   4 +-
 .../spark/sql/errors/QueryCompilationErrors.scala  |  40 +++----
 .../catalyst/analysis/V2WriteAnalysisSuite.scala   |  48 +++++---
 .../spark/sql/execution/datasources/rules.scala    |  32 ++++--
 .../analyzer-results/postgreSQL/numeric.sql.out    |   7 +-
 .../sql-tests/results/postgreSQL/numeric.sql.out   |   7 +-
 .../org/apache/spark/sql/DataFrameSuite.scala      |  33 ++++--
 .../org/apache/spark/sql/SQLInsertTestSuite.scala  |  31 ++++--
 .../spark/sql/connector/InsertIntoTests.scala      |  34 ++++--
 .../apache/spark/sql/execution/SQLViewSuite.scala  |  11 +-
 .../spark/sql/execution/command/DDLSuite.scala     |  54 +++++----
 .../org/apache/spark/sql/sources/InsertSuite.scala | 122 +++++++++++++--------
 .../spark/sql/hive/thriftserver/CliSuite.scala     |   2 +-
 .../org/apache/spark/sql/hive/InsertSuite.scala    |  11 +-
 18 files changed, 324 insertions(+), 193 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index 54b920cc36f..d9e729effeb 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -888,10 +888,24 @@
   },
   "INSERT_COLUMN_ARITY_MISMATCH" : {
     "message" : [
-      "Cannot write to '<tableName>', <reason>:",
-      "Table columns: <tableColumns>.",
-      "Data columns: <dataColumns>."
+      "Cannot write to <tableName>, the reason is"
     ],
+    "subClass" : {
+      "NOT_ENOUGH_DATA_COLUMNS" : {
+        "message" : [
+          "not enough data columns:",
+          "Table columns: <tableColumns>.",
+          "Data columns: <dataColumns>."
+        ]
+      },
+      "TOO_MANY_DATA_COLUMNS" : {
+        "message" : [
+          "too many data columns:",
+          "Table columns: <tableColumns>.",
+          "Data columns: <dataColumns>."
+        ]
+      }
+    },
     "sqlState" : "21S01"
   },
   "INSERT_PARTITION_COLUMN_ARITY_MISMATCH" : {
@@ -1715,6 +1729,11 @@
     ],
     "sqlState" : "46110"
   },
+  "NOT_SUPPORTED_COMMAND_WITHOUT_HIVE_SUPPORT" : {
+    "message" : [
+      "<cmd> is not supported, if you want to enable it, please set \"spark.sql.catalogImplementation\" to \"hive\"."
+    ]
+  },
   "NOT_SUPPORTED_IN_JDBC_CATALOG" : {
     "message" : [
       "Not supported command in JDBC catalog:"
@@ -2464,6 +2483,33 @@
       "grouping()/grouping_id() can only be used with GroupingSets/Cube/Rollup."
     ]
   },
+  "UNSUPPORTED_INSERT" : {
+    "message" : [
+      "Can't insert into the target."
+    ],
+    "subClass" : {
+      "NOT_ALLOWED" : {
+        "message" : [
+          "The target relation <relationId> does not allow insertion."
+        ]
+      },
+      "NOT_PARTITIONED" : {
+        "message" : [
+          "The target relation <relationId> is not partitioned."
+        ]
+      },
+      "RDD_BASED" : {
+        "message" : [
+          "An RDD-based table is not allowed."
+        ]
+      },
+      "READ_FROM" : {
+        "message" : [
+          "The target relation <relationId> is also being read from."
+        ]
+      }
+    }
+  },
   "UNSUPPORTED_OVERWRITE" : {
     "message" : [
       "Can't overwrite the target that is also being read from."
@@ -3005,11 +3051,6 @@
       "Window function <wf> requires window to be ordered, please add ORDER BY clause. For example SELECT <wf>(value_expr) OVER (PARTITION BY window_partition ORDER BY window_ordering) from table."
     ]
   },
-  "_LEGACY_ERROR_TEMP_1038" : {
-    "message" : [
-      "Cannot write to table due to mismatched user specified column size(<columnSize>) and data column size(<outputSize>)."
-    ]
-  },
   "_LEGACY_ERROR_TEMP_1039" : {
     "message" : [
       "Multiple time/session window expressions would result in a cartesian product of rows, therefore they are currently not supported."
@@ -3506,11 +3547,6 @@
       "Table partitions: <partColNames>."
     ]
   },
-  "_LEGACY_ERROR_TEMP_1170" : {
-    "message" : [
-      "Hive support is required to <detail>."
-    ]
-  },
   "_LEGACY_ERROR_TEMP_1171" : {
     "message" : [
       "createTableColumnTypes option column <col> not found in schema <schema>."
diff --git a/python/pyspark/sql/tests/test_readwriter.py b/python/pyspark/sql/tests/test_readwriter.py
index 17c158a870a..6bcef51732f 100644
--- a/python/pyspark/sql/tests/test_readwriter.py
+++ b/python/pyspark/sql/tests/test_readwriter.py
@@ -226,7 +226,9 @@ class ReadwriterV2TestsMixin:
 
     def test_create_without_provider(self):
         df = self.df
-        with self.assertRaisesRegex(AnalysisException, "Hive support is required"):
+        with self.assertRaisesRegex(
+            AnalysisException, "NOT_SUPPORTED_COMMAND_WITHOUT_HIVE_SUPPORT"
+        ):
             df.writeTo("test_table").create()
 
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 1b023413aa9..488c39d5dd8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1289,7 +1289,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
 
         // Create a project if this is an INSERT INTO BY NAME query.
         val projectByName = if (i.userSpecifiedCols.nonEmpty) {
-          Some(createProjectForByNameQuery(i))
+          Some(createProjectForByNameQuery(r.table.name, i))
         } else {
           None
         }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInsertionBase.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInsertionBase.scala
index 71d36867951..8b120095bc6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInsertionBase.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInsertionBase.scala
@@ -28,12 +28,19 @@ abstract class ResolveInsertionBase extends Rule[LogicalPlan] {
   def resolver: Resolver = conf.resolver
 
   /** Add a project to use the table column names for INSERT INTO BY NAME */
-  protected def createProjectForByNameQuery(i: InsertIntoStatement): LogicalPlan = {
+  protected def createProjectForByNameQuery(
+      tblName: String,
+      i: InsertIntoStatement): LogicalPlan = {
     SchemaUtils.checkColumnNameDuplication(i.userSpecifiedCols, resolver)
 
     if (i.userSpecifiedCols.size != i.query.output.size) {
-      throw QueryCompilationErrors.writeTableWithMismatchedColumnsError(
-        i.userSpecifiedCols.size, i.query.output.size, i.query)
+      if (i.userSpecifiedCols.size > i.query.output.size) {
+        throw QueryCompilationErrors.cannotWriteNotEnoughColumnsToTableError(
+          tblName, i.userSpecifiedCols, i.query)
+      } else {
+        throw QueryCompilationErrors.cannotWriteTooManyColumnsToTableError(
+          tblName, i.userSpecifiedCols, i.query)
+      }
     }
     val projectByName = i.userSpecifiedCols.zip(i.query.output)
       .map { case (userSpecifiedCol, queryOutputCol) =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
index b9aca30c754..3b721cf5d0d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
@@ -47,7 +47,7 @@ object TableOutputResolver {
 
     if (actualExpectedCols.size < query.output.size) {
       throw QueryCompilationErrors.cannotWriteTooManyColumnsToTableError(
-        tableName, actualExpectedCols, query)
+        tableName, actualExpectedCols.map(_.name), query)
     }
 
     val errors = new mutable.ArrayBuffer[String]()
@@ -74,7 +74,7 @@ object TableOutputResolver {
       }
       if (actualExpectedCols.size > queryOutputCols.size) {
         throw QueryCompilationErrors.cannotWriteNotEnoughColumnsToTableError(
-          tableName, actualExpectedCols, query)
+          tableName, actualExpectedCols.map(_.name), query)
       }
 
       resolveColumnsByPosition(queryOutputCols, actualExpectedCols, conf, errors += _)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 01b90210047..1b5062e985b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -613,16 +613,6 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       messageParameters = Map("wf" -> wf.toString))
   }
 
-  def writeTableWithMismatchedColumnsError(
-      columnSize: Int, outputSize: Int, t: TreeNode[_]): Throwable = {
-    new AnalysisException(
-      errorClass = "_LEGACY_ERROR_TEMP_1038",
-      messageParameters = Map(
-        "columnSize" -> columnSize.toString,
-        "outputSize" -> outputSize.toString),
-      origin = t.origin)
-  }
-
   def multiTimeWindowExpressionsNotSupportedError(t: TreeNode[_]): Throwable = {
     new AnalysisException(
       errorClass = "_LEGACY_ERROR_TEMP_1039",
@@ -1743,10 +1733,10 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
         "partColNames" -> partColNames.map(_.name).mkString(",")))
   }
 
-  def ddlWithoutHiveSupportEnabledError(detail: String): Throwable = {
+  def ddlWithoutHiveSupportEnabledError(cmd: String): Throwable = {
     new AnalysisException(
-      errorClass = "_LEGACY_ERROR_TEMP_1170",
-      messageParameters = Map("detail" -> detail))
+      errorClass = "NOT_SUPPORTED_COMMAND_WITHOUT_HIVE_SUPPORT",
+      messageParameters = Map("cmd" -> cmd))
   }
 
   def createTableColumnTypesOptionColumnNotFoundInSchemaError(
@@ -2056,26 +2046,26 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
 
   def cannotWriteTooManyColumnsToTableError(
       tableName: String,
-      expected: Seq[Attribute],
+      expected: Seq[String],
       query: LogicalPlan): Throwable = {
     new AnalysisException(
-      errorClass = "INSERT_COLUMN_ARITY_MISMATCH",
+      errorClass = "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS",
       messageParameters = Map(
-        "tableName" -> tableName,
-        "reason" -> "too many data columns",
-        "tableColumns" -> expected.map(c => s"'${c.name}'").mkString(", "),
-        "dataColumns" -> query.output.map(c => s"'${c.name}'").mkString(", ")))
+        "tableName" -> toSQLId(tableName),
+        "tableColumns" -> expected.map(c => toSQLId(c)).mkString(", "),
+        "dataColumns" -> query.output.map(c => toSQLId(c.name)).mkString(", ")))
   }
 
   def cannotWriteNotEnoughColumnsToTableError(
-      tableName: String, expected: Seq[Attribute], query: LogicalPlan): Throwable = {
+      tableName: String,
+      expected: Seq[String],
+      query: LogicalPlan): Throwable = {
     new AnalysisException(
-      errorClass = "INSERT_COLUMN_ARITY_MISMATCH",
+      errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
       messageParameters = Map(
-        "tableName" -> tableName,
-        "reason" -> "not enough data columns",
-        "tableColumns" -> expected.map(c => s"'${c.name}'").mkString(", "),
-        "dataColumns" -> query.output.map(c => s"'${c.name}'").mkString(", ")))
+        "tableName" -> toSQLId(tableName),
+        "tableColumns" -> expected.map(c => toSQLId(c)).mkString(", "),
+        "dataColumns" -> query.output.map(c => toSQLId(c.name)).mkString(", ")))
   }
 
   def cannotWriteIncompatibleDataToTableError(tableName: String, errors: Seq[String]): Throwable = {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala
index 0b698ae07a2..5b51730e759 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala
@@ -443,10 +443,14 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest {
     val parsedPlan = byName(table, query)
 
     assertNotResolved(parsedPlan)
-    assertAnalysisError(parsedPlan, Seq(
-      "Cannot write", "'table-name'", "too many data columns",
-      "Table columns: 'x', 'y'",
-      "Data columns: 'x', 'y', 'z'"))
+    assertAnalysisErrorClass(
+      inputPlan = parsedPlan,
+      expectedErrorClass = "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS",
+      expectedMessageParameters = Map(
+        "tableName" -> "`table-name`",
+        "tableColumns" -> "`x`, `y`",
+        "dataColumns" -> "`x`, `y`, `z`")
+    )
   }
 
   test("byName: fail extra data fields in struct") {
@@ -523,10 +527,14 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest {
     val parsedPlan = byPosition(requiredTable, query)
 
     assertNotResolved(parsedPlan)
-    assertAnalysisError(parsedPlan, Seq(
-      "Cannot write", "'table-name'", "not enough data columns",
-      "Table columns: 'x', 'y'",
-      "Data columns: 'y'"))
+    assertAnalysisErrorClass(
+      inputPlan = parsedPlan,
+      expectedErrorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
+      expectedMessageParameters = Map(
+        "tableName" -> "`table-name`",
+        "tableColumns" -> "`x`, `y`",
+        "dataColumns" -> "`y`")
+    )
   }
 
   test("byPosition: missing optional columns cause failure") {
@@ -537,10 +545,14 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest {
     val parsedPlan = byPosition(table, query)
 
     assertNotResolved(parsedPlan)
-    assertAnalysisError(parsedPlan, Seq(
-      "Cannot write", "'table-name'", "not enough data columns",
-      "Table columns: 'x', 'y'",
-      "Data columns: 'y'"))
+    assertAnalysisErrorClass(
+      inputPlan = parsedPlan,
+      expectedErrorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
+      expectedMessageParameters = Map(
+        "tableName" -> "`table-name`",
+        "tableColumns" -> "`x`, `y`",
+        "dataColumns" -> "`y`")
+    )
   }
 
   test("byPosition: insert safe cast") {
@@ -572,10 +584,14 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest {
     val parsedPlan = byName(table, query)
 
     assertNotResolved(parsedPlan)
-    assertAnalysisError(parsedPlan, Seq(
-      "Cannot write", "'table-name'", "too many data columns",
-      "Table columns: 'x', 'y'",
-      "Data columns: 'a', 'b', 'c'"))
+    assertAnalysisErrorClass(
+      inputPlan = parsedPlan,
+      expectedErrorClass = "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS",
+      expectedMessageParameters = Map(
+        "tableName" -> "`table-name`",
+        "tableColumns" -> "`x`, `y`",
+        "dataColumns" -> "`a`, `b`, `c`")
+    )
   }
 
   test("bypass output column resolution") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 0b07ae1d11c..750f39a252f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -391,7 +391,7 @@ object PreprocessTableInsertion extends ResolveInsertionBase {
     // Create a project if this INSERT has a user-specified column list.
     val hasColumnList = insert.userSpecifiedCols.nonEmpty
     val query = if (hasColumnList) {
-      createProjectForByNameQuery(insert)
+      createProjectForByNameQuery(tblName, insert)
     } else {
       insert.query
     }
@@ -401,12 +401,13 @@ object PreprocessTableInsertion extends ResolveInsertionBase {
         supportColDefaultValue = true)
     } catch {
       case e: AnalysisException if staticPartCols.nonEmpty &&
-          e.getErrorClass == "INSERT_COLUMN_ARITY_MISMATCH" =>
+        (e.getErrorClass == "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS" ||
+          e.getErrorClass == "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS") =>
         val newException = e.copy(
           errorClass = Some("INSERT_PARTITION_COLUMN_ARITY_MISMATCH"),
           messageParameters = e.messageParameters ++ Map(
-            "tableColumns" -> insert.table.output.map(c => s"'${c.name}'").mkString(", "),
-            "staticPartCols" -> staticPartCols.toSeq.sorted.map(c => s"'$c'").mkString(", ")
+            "tableColumns" -> insert.table.output.map(c => toSQLId(c.name)).mkString(", "),
+            "staticPartCols" -> staticPartCols.toSeq.sorted.map(c => toSQLId(c)).mkString(", ")
           ))
         newException.setStackTrace(e.getStackTrace)
         throw newException
@@ -503,8 +504,6 @@ object PreReadCheck extends (LogicalPlan => Unit) {
  */
 object PreWriteCheck extends (LogicalPlan => Unit) {
 
-  def failAnalysis(msg: String): Unit = { throw new AnalysisException(msg) }
-
   def apply(plan: LogicalPlan): Unit = {
     plan.foreach {
       case InsertIntoStatement(l @ LogicalRelation(relation, _, _, _), partition,
@@ -514,7 +513,9 @@ object PreWriteCheck extends (LogicalPlan => Unit) {
           case LogicalRelation(src, _, _, _) => src
         }
         if (srcRelations.contains(relation)) {
-          failAnalysis("Cannot insert into table that is also being read from.")
+          throw new AnalysisException(
+            errorClass = "UNSUPPORTED_INSERT.READ_FROM",
+            messageParameters = Map("relationId" -> toSQLId(relation.toString)))
         } else {
           // OK
         }
@@ -524,10 +525,15 @@ object PreWriteCheck extends (LogicalPlan => Unit) {
 
           // Right now, we do not support insert into a non-file-based data source table with
           // partition specs.
-          case _: InsertableRelation if partition.nonEmpty =>
-            failAnalysis(s"Insert into a partition is not allowed because $l is not partitioned.")
-
-          case _ => failAnalysis(s"$relation does not allow insertion.")
+          case i: InsertableRelation if partition.nonEmpty =>
+            throw new AnalysisException(
+              errorClass = "UNSUPPORTED_INSERT.NOT_PARTITIONED",
+              messageParameters = Map("relationId" -> toSQLId(i.toString)))
+
+          case _ =>
+            throw new AnalysisException(
+              errorClass = "UNSUPPORTED_INSERT.NOT_ALLOWED",
+              messageParameters = Map("relationId" -> toSQLId(relation.toString)))
         }
 
       case InsertIntoStatement(t, _, _, _, _, _, _)
@@ -535,7 +541,9 @@ object PreWriteCheck extends (LogicalPlan => Unit) {
           t.isInstanceOf[Range] ||
           t.isInstanceOf[OneRowRelation] ||
           t.isInstanceOf[LocalRelation] =>
-        failAnalysis(s"Inserting into an RDD-based table is not allowed.")
+        throw new AnalysisException(
+          errorClass = "UNSUPPORTED_INSERT.RDD_BASED",
+          messageParameters = Map.empty)
 
       case _ => // OK
     }
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/numeric.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/numeric.sql.out
index 95c81ff75ed..536631179d8 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/numeric.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/numeric.sql.out
@@ -3841,12 +3841,11 @@ INSERT INTO num_result SELECT t1.id, t2.id, t1.val, t2.val, t1.val * t2.val
 -- !query analysis
 org.apache.spark.sql.AnalysisException
 {
-  "errorClass" : "INSERT_COLUMN_ARITY_MISMATCH",
+  "errorClass" : "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS",
   "sqlState" : "21S01",
   "messageParameters" : {
-    "dataColumns" : "'id', 'id', 'val', 'val', '(val * val)'",
-    "reason" : "too many data columns",
-    "tableColumns" : "'id1', 'id2', 'result'",
+    "dataColumns" : "`id`, `id`, `val`, `val`, `(val * val)`",
+    "tableColumns" : "`id1`, `id2`, `result`",
     "tableName" : "`spark_catalog`.`default`.`num_result`"
   }
 }
diff --git a/sql/core/src/test/resources/sql-tests/results/postgreSQL/numeric.sql.out b/sql/core/src/test/resources/sql-tests/results/postgreSQL/numeric.sql.out
index 5840e1164fa..61b7a07631c 100644
--- a/sql/core/src/test/resources/sql-tests/results/postgreSQL/numeric.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/postgreSQL/numeric.sql.out
@@ -3832,12 +3832,11 @@ struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
 {
-  "errorClass" : "INSERT_COLUMN_ARITY_MISMATCH",
+  "errorClass" : "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS",
   "sqlState" : "21S01",
   "messageParameters" : {
-    "dataColumns" : "'id', 'id', 'val', 'val', '(val * val)'",
-    "reason" : "too many data columns",
-    "tableColumns" : "'id1', 'id2', 'result'",
+    "dataColumns" : "`id`, `id`, `val`, `val`, `(val * val)`",
+    "tableColumns" : "`id1`, `id2`, `result`",
     "tableName" : "`spark_catalog`.`default`.`num_result`"
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index c94c6d7e50e..55ea09b5945 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1831,25 +1831,34 @@ class DataFrameSuite extends QueryTest
 
         // error cases: insert into an RDD
         df.createOrReplaceTempView("rdd_base")
-        val e1 = intercept[AnalysisException] {
-          insertion.write.insertInto("rdd_base")
-        }
-        assert(e1.getMessage.contains("Inserting into an RDD-based table is not allowed."))
+        checkError(
+          exception = intercept[AnalysisException] {
+            insertion.write.insertInto("rdd_base")
+          },
+          errorClass = "UNSUPPORTED_INSERT.RDD_BASED",
+          parameters = Map.empty
+        )
 
         // error case: insert into a logical plan that is not a LeafNode
         val indirectDS = pdf.select("_1").filter($"_1" > 5)
         indirectDS.createOrReplaceTempView("indirect_ds")
-        val e2 = intercept[AnalysisException] {
-          insertion.write.insertInto("indirect_ds")
-        }
-        assert(e2.getMessage.contains("Inserting into an RDD-based table is not allowed."))
+        checkError(
+          exception = intercept[AnalysisException] {
+            insertion.write.insertInto("indirect_ds")
+          },
+          errorClass = "UNSUPPORTED_INSERT.RDD_BASED",
+          parameters = Map.empty
+        )
 
         // error case: insert into an OneRowRelation
         Dataset.ofRows(spark, OneRowRelation()).createOrReplaceTempView("one_row")
-        val e3 = intercept[AnalysisException] {
-          insertion.write.insertInto("one_row")
-        }
-        assert(e3.getMessage.contains("Inserting into an RDD-based table is not allowed."))
+        checkError(
+          exception = intercept[AnalysisException] {
+            insertion.write.insertInto("one_row")
+          },
+          errorClass = "UNSUPPORTED_INSERT.RDD_BASED",
+          parameters = Map.empty
+        )
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
index bb3125de9c4..98e71362fda 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
@@ -223,13 +223,14 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils {
         exception = intercept[AnalysisException] {
           processInsert("t1", df2, overwrite = false, byName = true)
         },
-        v1ErrorClass = "INSERT_COLUMN_ARITY_MISMATCH",
-        v2ErrorClass = "INSERT_COLUMN_ARITY_MISMATCH",
+        v1ErrorClass = "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS",
+        v2ErrorClass = "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS",
         v1Parameters = Map("tableName" -> "`spark_catalog`.`default`.`t1`",
-          "reason" -> "too many data columns", "tableColumns" -> "'c1', 'c2', 'c3'",
-          "dataColumns" -> "'c3', 'c2', 'c1', 'c0'"),
-        v2Parameters = Map("tableName" -> "testcat.t1", "reason" -> "too many data columns",
-          "tableColumns" -> "'c1', 'c2', 'c3'", "dataColumns" -> "'c3', 'c2', 'c1', 'c0'")
+          "tableColumns" -> "`c1`, `c2`, `c3`",
+          "dataColumns" -> "`c3`, `c2`, `c1`, `c0`"),
+        v2Parameters = Map("tableName" -> "`testcat`.`t1`",
+          "tableColumns" -> "`c1`, `c2`, `c3`",
+          "dataColumns" -> "`c3`, `c2`, `c1`, `c0`")
       )
     }
   }
@@ -302,18 +303,24 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils {
             sql(s"INSERT INTO t1 (c1, c2) values(1, 2, 3)")
           },
           sqlState = None,
-          errorClass = "_LEGACY_ERROR_TEMP_1038",
-          parameters = Map("columnSize" -> "2", "outputSize" -> "3"),
-          context = ExpectedContext("values(1, 2, 3)", 24, 38)
+          errorClass = "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS",
+          parameters = Map(
+            "tableName" -> ".*`t1`",
+            "tableColumns" -> "`c1`, `c2`",
+            "dataColumns" -> "`col1`, `col2`, `col3`"),
+          matchPVals = true
         )
         checkError(
           exception = intercept[AnalysisException] {
             sql(s"INSERT INTO t1 (c1, c2, c3) values(1, 2)")
           },
           sqlState = None,
-          errorClass = "_LEGACY_ERROR_TEMP_1038",
-          parameters = Map("columnSize" -> "3", "outputSize" -> "2"),
-          context = ExpectedContext("values(1, 2)", 28, 39)
+          errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
+          parameters = Map(
+            "tableName" -> ".*`t1`",
+            "tableColumns" -> "`c1`, `c2`, `c3`",
+            "dataColumns" -> "`col1`, `col2`"),
+          matchPVals = true
         )
       }
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala
index 2ff9981c8d9..63bb0148972 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala
@@ -21,6 +21,7 @@ import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.catalyst.util.TypeUtils._
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
 import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode}
 import org.apache.spark.sql.test.SharedSparkSession
@@ -123,13 +124,19 @@ abstract class InsertIntoTests(
     val t1 = s"${catalogAndNamespace}tbl"
     sql(s"CREATE TABLE $t1 (id bigint, data string, missing string) USING $v2Format")
     val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data")
-    val exc = intercept[AnalysisException] {
-      doInsert(t1, df)
-    }
 
     verifyTable(t1, Seq.empty[(Long, String, String)].toDF("id", "data", "missing"))
-    val tableName = if (catalogAndNamespace.isEmpty) s"default.$t1" else t1
-    assert(exc.getMessage.contains(s"Cannot write to '$tableName', not enough data columns"))
+    val tableName = if (catalogAndNamespace.isEmpty) toSQLId(s"default.$t1") else toSQLId(t1)
+    checkError(
+      exception = intercept[AnalysisException] {
+        doInsert(t1, df)
+      },
+      errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
+      parameters = Map(
+        "tableName" -> tableName,
+        "tableColumns" -> "`id`, `data`, `missing`",
+        "dataColumns" -> "`id`, `data`")
+    )
   }
 
   test("insertInto: fails when an extra column is present") {
@@ -137,13 +144,18 @@ abstract class InsertIntoTests(
     withTable(t1) {
       sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
       val df = Seq((1L, "a", "mango")).toDF("id", "data", "fruit")
-      val exc = intercept[AnalysisException] {
-        doInsert(t1, df)
-      }
-
       verifyTable(t1, Seq.empty[(Long, String)].toDF("id", "data"))
-      val tableName = if (catalogAndNamespace.isEmpty) s"default.$t1" else t1
-      assert(exc.getMessage.contains(s"Cannot write to '$tableName', too many data columns"))
+      val tableName = if (catalogAndNamespace.isEmpty) toSQLId(s"default.$t1") else toSQLId(t1)
+      checkError(
+        exception = intercept[AnalysisException] {
+          doInsert(t1, df)
+        },
+        errorClass = "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS",
+        parameters = Map(
+          "tableName" -> tableName,
+          "tableColumns" -> "`id`, `data`",
+          "dataColumns" -> "`id`, `data`, `fruit`")
+      )
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
index af083dc8447..35c6ecb1dcd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
@@ -203,10 +203,13 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {
     withTempView(viewName) {
       spark.range(10).createTempView(viewName)
 
-      val e = intercept[AnalysisException] {
-        sql(s"INSERT INTO TABLE $viewName SELECT 1")
-      }.getMessage
-      assert(e.contains("Inserting into an RDD-based table is not allowed"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"INSERT INTO TABLE $viewName SELECT 1")
+        },
+        errorClass = "UNSUPPORTED_INSERT.RDD_BASED",
+        parameters = Map.empty
+      )
 
       val dataFilePath =
         Thread.currentThread().getContextClassLoader.getResource("data/files/employee.dat")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 05ba4e72fe1..21e6980db8f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -82,10 +82,13 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSparkSession {
     assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
     val tabName = "tbl"
     withTable(tabName) {
-      val e = intercept[AnalysisException] {
-        sql(s"CREATE TABLE $tabName (i INT, j STRING) STORED AS parquet")
-      }.getMessage
-      assert(e.contains("Hive support is required to CREATE Hive TABLE"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"CREATE TABLE $tabName (i INT, j STRING) STORED AS parquet")
+        },
+        errorClass = "NOT_SUPPORTED_COMMAND_WITHOUT_HIVE_SUPPORT",
+        parameters = Map("cmd" -> "CREATE Hive TABLE (AS SELECT)")
+      )
     }
   }
 
@@ -94,15 +97,18 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSparkSession {
     withTempDir { tempDir =>
       val tabName = "tbl"
       withTable(tabName) {
-        val e = intercept[AnalysisException] {
-          sql(
-            s"""
-               |CREATE EXTERNAL TABLE $tabName (i INT, j STRING)
-               |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
-               |LOCATION '${tempDir.toURI}'
-             """.stripMargin)
-        }.getMessage
-        assert(e.contains("Hive support is required to CREATE Hive TABLE"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql(
+              s"""
+                 |CREATE EXTERNAL TABLE $tabName (i INT, j STRING)
+                 |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+                 |LOCATION '${tempDir.toURI}'
+               """.stripMargin)
+          },
+          errorClass = "NOT_SUPPORTED_COMMAND_WITHOUT_HIVE_SUPPORT",
+          parameters = Map("cmd" -> "CREATE Hive TABLE (AS SELECT)")
+        )
       }
     }
   }
@@ -110,16 +116,22 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSparkSession {
   test("Create Hive Table As Select") {
     import testImplicits._
     withTable("t", "t1") {
-      var e = intercept[AnalysisException] {
-        sql("CREATE TABLE t STORED AS parquet SELECT 1 as a, 1 as b")
-      }.getMessage
-      assert(e.contains("Hive support is required to CREATE Hive TABLE (AS SELECT)"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("CREATE TABLE t STORED AS parquet SELECT 1 as a, 1 as b")
+        },
+        errorClass = "NOT_SUPPORTED_COMMAND_WITHOUT_HIVE_SUPPORT",
+        parameters = Map("cmd" -> "CREATE Hive TABLE (AS SELECT)")
+      )
 
       spark.range(1).select($"id" as Symbol("a"), $"id" as Symbol("b")).write.saveAsTable("t1")
-      e = intercept[AnalysisException] {
-        sql("CREATE TABLE t STORED AS parquet SELECT a, b from t1")
-      }.getMessage
-      assert(e.contains("Hive support is required to CREATE Hive TABLE (AS SELECT)"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("CREATE TABLE t STORED AS parquet SELECT a, b from t1")
+        },
+        errorClass = "NOT_SUPPORTED_COMMAND_WITHOUT_HIVE_SUPPORT",
+        parameters = Map("cmd" -> "CREATE Hive TABLE (AS SELECT)")
+      )
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index f3b18863175..71c3301a0eb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -112,10 +112,47 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
         """.stripMargin)
       sparkContext.parallelize(1 to 10).toDF("a").createOrReplaceTempView("t2")
 
-      val message = intercept[AnalysisException] {
-        sql("INSERT INTO TABLE t1 SELECT a FROM t2")
-      }.getMessage
-      assert(message.contains("does not allow insertion"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("INSERT INTO TABLE t1 SELECT a FROM t2")
+        },
+        errorClass = "UNSUPPORTED_INSERT.NOT_ALLOWED",
+        parameters = Map("relationId" -> "`SimpleScan(1,10)`")
+      )
+    }
+  }
+
+  test("UNSUPPORTED_INSERT.RDD_BASED: Inserting into an RDD-based table is not allowed") {
+    import testImplicits._
+    withTempView("t1") {
+      sparkContext.parallelize(1 to 10).toDF("a").createOrReplaceTempView("t1")
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("INSERT INTO TABLE t1 SELECT a FROM t1")
+        },
+        errorClass = "UNSUPPORTED_INSERT.RDD_BASED",
+        parameters = Map.empty
+      )
+    }
+  }
+
+  test("UNSUPPORTED_INSERT.READ_FROM: Cannot insert into table that is also being read from") {
+    withTempView("t1") {
+      sql(
+        """
+          |CREATE TEMPORARY VIEW t1
+          |USING org.apache.spark.sql.sources.SimpleScanSource
+          |OPTIONS (
+          |  From '1',
+          |  To '10')
+        """.stripMargin)
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("INSERT INTO TABLE t1 SELECT * FROM t1")
+        },
+        errorClass = "UNSUPPORTED_INSERT.READ_FROM",
+        parameters = Map("relationId" -> "`SimpleScan(1,10)`")
+      )
     }
   }
 
@@ -376,16 +413,12 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
       (1 to 10).map(Row(_)).toSeq
     )
 
-    val message = intercept[AnalysisException] {
-      sql(
-        s"""
-        |INSERT OVERWRITE TABLE oneToTen SELECT CAST(a AS INT) FROM jt
-        """.stripMargin)
-    }.getMessage
-    assert(
-      message.contains("does not allow insertion."),
-      "It is not allowed to insert into a table that is not an InsertableRelation."
-    )
+    checkError(
+      exception = intercept[AnalysisException] {
+        sql("INSERT OVERWRITE TABLE oneToTen SELECT CAST(a AS INT) FROM jt")
+      },
+      errorClass = "UNSUPPORTED_INSERT.NOT_ALLOWED",
+      parameters = Map("relationId" -> "`SimpleScan(1,10)`"))
 
     spark.catalog.dropTempView("oneToTen")
   }
@@ -484,16 +517,18 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
   test("Insert overwrite directory using Hive serde without turning on Hive support") {
     withTempDir { dir =>
       val path = dir.toURI.getPath
-      val e = intercept[AnalysisException] {
-        sql(
-          s"""
-             |INSERT OVERWRITE LOCAL DIRECTORY '$path'
-             |STORED AS orc
-             |SELECT 1, 2
-           """.stripMargin)
-      }.getMessage
-      assert(e.contains(
-        "Hive support is required to INSERT OVERWRITE DIRECTORY with the Hive format"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(
+            s"""
+               |INSERT OVERWRITE LOCAL DIRECTORY '$path'
+               |STORED AS orc
+               |SELECT 1, 2
+             """.stripMargin)
+        },
+        errorClass = "NOT_SUPPORTED_COMMAND_WITHOUT_HIVE_SUPPORT",
+        parameters = Map("cmd" -> "INSERT OVERWRITE DIRECTORY with the Hive format")
+      )
     }
   }
 
@@ -639,12 +674,11 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
           exception = intercept[AnalysisException] {
             sql("insert into t select 1, 2.0D, 3")
           },
-          errorClass = "INSERT_COLUMN_ARITY_MISMATCH",
+          errorClass = "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS",
           parameters = Map(
             "tableName" -> "`spark_catalog`.`default`.`t`",
-            "reason" -> "too many data columns",
-            "tableColumns" -> "'i', 'd'",
-            "dataColumns" -> "'1', '2.0', '3'"))
+            "tableColumns" -> "`i`, `d`",
+            "dataColumns" -> "`1`, `2`.`0`, `3`"))
 
         // Insert into table successfully.
         sql("insert into t select 1, 2.0D")
@@ -937,12 +971,11 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
         exception = intercept[AnalysisException] {
           sql("INSERT OVERWRITE TABLE jsonTable SELECT a FROM jt")
         },
-        errorClass = "INSERT_COLUMN_ARITY_MISMATCH",
+        errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
         parameters = Map(
-          "tableName" -> "unknown",
-          "reason" -> "not enough data columns",
-          "tableColumns" -> "'a', 'b'",
-          "dataColumns" -> "'a'"))
+          "tableName" -> "`unknown`",
+          "tableColumns" -> "`a`, `b`",
+          "dataColumns" -> "`a`"))
     }
   }
 
@@ -1208,12 +1241,11 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
           sql("insert into t select t1.id, t2.id, t1.val, t2.val, t1.val * t2.val " +
             "from num_data t1, num_data t2")
         },
-        errorClass = "INSERT_COLUMN_ARITY_MISMATCH",
+        errorClass = "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS",
         parameters = Map(
           "tableName" -> "`spark_catalog`.`default`.`t`",
-          "reason" -> "too many data columns",
-          "tableColumns" -> "'id1', 'int2', 'result'",
-          "dataColumns" -> "'id', 'id', 'val', 'val', '(val * val)'"))
+          "tableColumns" -> "`id1`, `int2`, `result`",
+          "dataColumns" -> "`id`, `id`, `val`, `val`, `(val * val)`"))
     }
     // The default value is disabled per configuration.
     withTable("t") {
@@ -1252,12 +1284,11 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
           exception = intercept[AnalysisException] {
             sql("insert into t values(true)")
           },
-          errorClass = "INSERT_COLUMN_ARITY_MISMATCH",
+          errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
           parameters = Map(
             "tableName" -> "`spark_catalog`.`default`.`t`",
-            "reason" -> "not enough data columns",
-            "tableColumns" -> "'i', 's'",
-            "dataColumns" -> "'col1'"))
+            "tableColumns" -> "`i`, `s`",
+            "dataColumns" -> "`col1`"))
       }
     }
   }
@@ -1327,10 +1358,11 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
         exception = intercept[AnalysisException] {
           sql("insert into t (i, q) select true from (select 1)")
         },
-        errorClass = "_LEGACY_ERROR_TEMP_1038",
-        parameters = Map("columnSize" -> "2", "outputSize" -> "1"),
-        ExpectedContext(
-          fragment = "select true from (select 1)", start = 21, stop = 47))
+        errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
+        parameters = Map(
+          "tableName" -> "`spark_catalog`.`default`.`t`",
+          "tableColumns" -> "`i`, `q`",
+          "dataColumns" -> "`true`"))
     }
     // When the USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES configuration is disabled, and no
     // explicit DEFAULT value is available when the INSERT INTO statement provides fewer
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index 90f371c7ec7..d3a9a9f0841 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -692,7 +692,7 @@ class CliSuite extends SparkFunSuite {
         // names.
         runCliWithin(1.minute, extraArgs = extraConf)(
           "create table src(key int) using hive;" ->
-            "Hive support is required to CREATE Hive TABLE",
+            "NOT_SUPPORTED_COMMAND_WITHOUT_HIVE_SUPPORT",
           "create table src(key int) using parquet;" -> "")
         cd.countDown()
       }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
index 620ce88df19..adf0db957e4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
@@ -39,7 +39,7 @@ case class TestData(key: Int, value: String)
 case class ThreeColumnTable(key: Int, value: String, key1: String)
 
 class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
-    with SQLTestUtils  with PrivateMethodTester  {
+    with SQLTestUtils with PrivateMethodTester {
   import spark.implicits._
 
   override lazy val testData = spark.sparkContext.parallelize(
@@ -364,11 +364,10 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
         },
         errorClass = "INSERT_PARTITION_COLUMN_ARITY_MISMATCH",
         parameters = Map(
-          "staticPartCols" -> "'b', 'c'",
-          "tableColumns" -> "'a', 'd', 'b', 'c'",
-          "reason" -> "too many data columns",
-          "dataColumns" -> "'1', '2', '3'",
-          "tableName" -> s"`spark_catalog`.`default`.`$tableName`")
+          "staticPartCols" -> "`b`, `c`",
+          "tableColumns" -> "`a`, `d`, `b`, `c`",
+          "dataColumns" -> "`1`, `2`, `3`",
+          "tableName" -> s"`spark_catalog`.`default`.`${tableName}`")
       )
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org