You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2023/06/30 15:37:02 UTC

[spark] branch master updated: [SPARK-44260][SQL] Assign names to the error class _LEGACY_ERROR_TEMP_[1215-1245-2329] & Use checkError() to check Exception in *CharVarchar*Suite

This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3fb9a2c6135 [SPARK-44260][SQL] Assign names to the error class _LEGACY_ERROR_TEMP_[1215-1245-2329] & Use checkError() to check Exception in *CharVarchar*Suite
3fb9a2c6135 is described below

commit 3fb9a2c6135d49cc7b80546c0f228d7d2bc78bf6
Author: panbingkun <pb...@gmail.com>
AuthorDate: Fri Jun 30 18:36:46 2023 +0300

    [SPARK-44260][SQL] Assign names to the error class _LEGACY_ERROR_TEMP_[1215-1245-2329] & Use checkError() to check Exception in *CharVarchar*Suite
    
    ### What changes were proposed in this pull request?
    This PR aims to:
    1. Assign a clear error class name to the logic in `CharVarcharCodegenUtils` that throws a raw `RuntimeException` directly
    - EXCEED_LIMIT_LENGTH
    
    2. Assign names to the legacy error classes
    - _LEGACY_ERROR_TEMP_1215 -> UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING
    - _LEGACY_ERROR_TEMP_1245 -> NOT_SUPPORTED_CHANGE_COLUMN
    - _LEGACY_ERROR_TEMP_2329 -> merged into NOT_SUPPORTED_CHANGE_COLUMN (formerly _LEGACY_ERROR_TEMP_1245)
    
    3. Use checkError() to check exceptions in `*CharVarchar*Suite`
    
    ### Why are the changes needed?
    The changes improve the error framework.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    - Updated the existing unit tests (UT).
    - Passed GitHub Actions (GA).
    - Tested manually.
    
    Closes #41768 from panbingkun/CharVarchar_checkError.
    
    Authored-by: panbingkun <pb...@gmail.com>
    Signed-off-by: Max Gekk <ma...@gmail.com>
---
 .../src/main/resources/error/error-classes.json    |  30 +--
 .../spark/sql/jdbc/v2/DB2IntegrationSuite.scala    |  19 +-
 .../sql/jdbc/v2/MsSqlServerIntegrationSuite.scala  |  19 +-
 .../spark/sql/jdbc/v2/MySQLIntegrationSuite.scala  |  19 +-
 .../spark/sql/jdbc/v2/OracleIntegrationSuite.scala |  19 +-
 .../sql/jdbc/v2/PostgresIntegrationSuite.scala     |  19 +-
 .../sql/catalyst/util/CharVarcharCodegenUtils.java |   3 +-
 .../sql/catalyst/analysis/CheckAnalysis.scala      |  11 +-
 .../spark/sql/errors/QueryCompilationErrors.scala  |  17 +-
 .../spark/sql/errors/QueryExecutionErrors.scala    |   7 +
 .../apache/spark/sql/execution/command/ddl.scala   |   3 +-
 .../analyzer-results/change-column.sql.out         |  11 +-
 .../sql-tests/analyzer-results/charvarchar.sql.out |  11 +-
 .../sql-tests/results/change-column.sql.out        |  11 +-
 .../sql-tests/results/charvarchar.sql.out          |  11 +-
 .../apache/spark/sql/CharVarcharTestSuite.scala    | 291 +++++++++++++++++----
 .../spark/sql/connector/AlterTableTests.scala      |  25 +-
 .../execution/command/CharVarcharDDLTestBase.scala | 120 +++++++--
 .../spark/sql/HiveCharVarcharTestSuite.scala       |  12 +-
 19 files changed, 494 insertions(+), 164 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index 1b2a1ce305a..db6b9a97012 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -680,6 +680,11 @@
       "The event time <eventName> has the invalid type <eventType>, but expected \"TIMESTAMP\"."
     ]
   },
+  "EXCEED_LIMIT_LENGTH" : {
+    "message" : [
+      "Exceeds char/varchar type length limitation: <limit>."
+    ]
+  },
   "EXPRESSION_TYPE_IS_NOT_ORDERABLE" : {
     "message" : [
       "Column expression <expr> cannot be sorted because its type <exprType> is not orderable."
@@ -1817,6 +1822,11 @@
     },
     "sqlState" : "42000"
   },
+  "NOT_SUPPORTED_CHANGE_COLUMN" : {
+    "message" : [
+      "ALTER TABLE ALTER/CHANGE COLUMN is not supported for changing <table>'s column <originName> with type <originType> to <newName> with type <newType>."
+    ]
+  },
   "NOT_SUPPORTED_COMMAND_FOR_V2_TABLE" : {
     "message" : [
       "<cmd> is not supported for v2 tables."
@@ -2351,6 +2361,11 @@
     ],
     "sqlState" : "0A000"
   },
+  "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING" : {
+    "message" : [
+      "The char/varchar type can't be used in the table schema. If you want Spark treat them as string type as same as Spark 3.0 and earlier, please set \"spark.sql.legacy.charVarcharAsString\" to \"true\"."
+    ]
+  },
   "UNSUPPORTED_DATASOURCE_FOR_DIRECT_QUERY" : {
     "message" : [
       "Unsupported data source type for direct query on files: <dataSourceType>"
@@ -3875,11 +3890,6 @@
       "Found different window function type in <windowExpressions>."
     ]
   },
-  "_LEGACY_ERROR_TEMP_1215" : {
-    "message" : [
-      "char/varchar type can only be used in the table schema. You can set <config> to true, so that Spark treat them as string type as same as Spark 3.0 and earlier."
-    ]
-  },
   "_LEGACY_ERROR_TEMP_1218" : {
     "message" : [
       "<tableIdentifier> should be converted to HadoopFsRelation."
@@ -3955,11 +3965,6 @@
       "CREATE-TABLE-AS-SELECT cannot create table with location to a non-empty directory <tablePath>. To allow overwriting the existing non-empty directory, set '<config>' to true."
     ]
   },
-  "_LEGACY_ERROR_TEMP_1245" : {
-    "message" : [
-      "ALTER TABLE CHANGE COLUMN is not supported for changing column '<originName>' with type '<originType>' to '<newName>' with type '<newType>'."
-    ]
-  },
   "_LEGACY_ERROR_TEMP_1246" : {
     "message" : [
       "Can't find column `<name>` given table data columns <fieldNames>."
@@ -5678,11 +5683,6 @@
       "Cannot update <table> field <fieldName> to interval type."
     ]
   },
-  "_LEGACY_ERROR_TEMP_2329" : {
-    "message" : [
-      "Cannot update <table> field <fieldName>: <oldType> cannot be cast to <newType>."
-    ]
-  },
   "_LEGACY_ERROR_TEMP_2330" : {
     "message" : [
       "Cannot change nullable column to non-nullable: <fieldName>."
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/DB2IntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/DB2IntegrationSuite.scala
index 291276c1981..661b1277e9f 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/DB2IntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/DB2IntegrationSuite.scala
@@ -92,11 +92,20 @@ class DB2IntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest {
     expectedSchema = new StructType().add("ID", DoubleType, true, defaultMetadata)
     assert(t.schema === expectedSchema)
     // Update column type from DOUBLE to STRING
-    val msg1 = intercept[AnalysisException] {
-      sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE VARCHAR(10)")
-    }.getMessage
-    assert(msg1.contains(
-      s"Cannot update $catalogName.alt_table field ID: double cannot be cast to varchar"))
+    val sql1 = s"ALTER TABLE $tbl ALTER COLUMN id TYPE VARCHAR(10)"
+    checkError(
+      exception = intercept[AnalysisException] {
+        sql(sql1)
+      },
+      errorClass = "NOT_SUPPORTED_CHANGE_COLUMN",
+      parameters = Map(
+        "originType" -> "\"DOUBLE\"",
+        "newType" -> "\"VARCHAR(10)\"",
+        "newName" -> "`ID`",
+        "originName" -> "`ID`",
+        "table" -> s"`$catalogName`.`alt_table`"),
+      context = ExpectedContext(fragment = sql1, start = 0, stop = 57)
+    )
   }
 
   override def testCreateTableWithProperty(tbl: String): Unit = {
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerIntegrationSuite.scala
index 9e7279ecaf1..fc93f5cba4c 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MsSqlServerIntegrationSuite.scala
@@ -100,11 +100,20 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JD
     expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata)
     assert(t.schema === expectedSchema)
     // Update column type from STRING to INTEGER
-    val msg1 = intercept[AnalysisException] {
-      sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE INTEGER")
-    }.getMessage
-    assert(msg1.contains(
-      s"Cannot update $catalogName.alt_table field ID: string cannot be cast to int"))
+    val sql1 = s"ALTER TABLE $tbl ALTER COLUMN id TYPE INTEGER"
+    checkError(
+      exception = intercept[AnalysisException] {
+        sql(sql1)
+      },
+      errorClass = "NOT_SUPPORTED_CHANGE_COLUMN",
+      parameters = Map(
+        "originType" -> "\"STRING\"",
+        "newType" -> "\"INT\"",
+        "newName" -> "`ID`",
+        "originName" -> "`ID`",
+        "table" -> s"`$catalogName`.`alt_table`"),
+      context = ExpectedContext(fragment = sql1, start = 0, stop = 55)
+    )
   }
 
   override def testUpdateColumnNullability(tbl: String): Unit = {
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
index 23bb95b2b7c..5e340f135c8 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
@@ -100,11 +100,20 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest
     expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata)
     assert(t.schema === expectedSchema)
     // Update column type from STRING to INTEGER
-    val msg1 = intercept[AnalysisException] {
-      sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE INTEGER")
-    }.getMessage
-    assert(msg1.contains(
-      s"Cannot update $catalogName.alt_table field ID: string cannot be cast to int"))
+    val sql1 = s"ALTER TABLE $tbl ALTER COLUMN id TYPE INTEGER"
+    checkError(
+      exception = intercept[AnalysisException] {
+        sql(sql1)
+      },
+      errorClass = "NOT_SUPPORTED_CHANGE_COLUMN",
+      parameters = Map(
+        "originType" -> "\"STRING\"",
+        "newType" -> "\"INT\"",
+        "newName" -> "`ID`",
+        "originName" -> "`ID`",
+        "table" -> s"`$catalogName`.`alt_table`"),
+      context = ExpectedContext(fragment = sql1, start = 0, stop = 55)
+    )
   }
 
   override def testRenameColumn(tbl: String): Unit = {
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala
index 966083615ea..5124199328c 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/OracleIntegrationSuite.scala
@@ -111,11 +111,20 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTes
     expectedSchema = new StructType().add("ID", DecimalType(19, 0), true, defaultMetadata)
     assert(t.schema === expectedSchema)
     // Update column type from LONG to INTEGER
-    val msg1 = intercept[AnalysisException] {
-      sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE INTEGER")
-    }.getMessage
-    assert(msg1.contains(
-      s"Cannot update $catalogName.alt_table field ID: decimal(19,0) cannot be cast to int"))
+    val sql1 = s"ALTER TABLE $tbl ALTER COLUMN id TYPE INTEGER"
+    checkError(
+      exception = intercept[AnalysisException] {
+        sql(sql1)
+      },
+      errorClass = "NOT_SUPPORTED_CHANGE_COLUMN",
+      parameters = Map(
+        "originType" -> "\"DECIMAL(19,0)\"",
+        "newType" -> "\"INT\"",
+        "newName" -> "`ID`",
+        "originName" -> "`ID`",
+        "table" -> s"`$catalogName`.`alt_table`"),
+      context = ExpectedContext(fragment = sql1, start = 0, stop = 56)
+    )
   }
 
   override def caseConvert(tableName: String): String = tableName.toUpperCase(Locale.ROOT)
diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala
index 9a6d35b46da..85e85f8bf38 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala
@@ -71,11 +71,20 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT
     expectedSchema = new StructType().add("ID", StringType, true, defaultMetadata)
     assert(t.schema === expectedSchema)
     // Update column type from STRING to INTEGER
-    val msg = intercept[AnalysisException] {
-      sql(s"ALTER TABLE $tbl ALTER COLUMN id TYPE INTEGER")
-    }.getMessage
-    assert(msg.contains(
-      s"Cannot update $catalogName.alt_table field ID: string cannot be cast to int"))
+    val sql1 = s"ALTER TABLE $tbl ALTER COLUMN id TYPE INTEGER"
+    checkError(
+      exception = intercept[AnalysisException] {
+        sql(sql1)
+      },
+      errorClass = "NOT_SUPPORTED_CHANGE_COLUMN",
+      parameters = Map(
+        "originType" -> "\"STRING\"",
+        "newType" -> "\"INT\"",
+        "newName" -> "`ID`",
+        "originName" -> "`ID`",
+        "table" -> s"`$catalogName`.`alt_table`"),
+      context = ExpectedContext(fragment = sql1, start = 0, stop = 60)
+    )
   }
 
   override def testCreateTableWithProperty(tbl: String): Unit = {
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/CharVarcharCodegenUtils.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/CharVarcharCodegenUtils.java
index 582b697c92a..1e183b9be59 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/CharVarcharCodegenUtils.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/util/CharVarcharCodegenUtils.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.util;
 
+import org.apache.spark.sql.errors.QueryExecutionErrors;
 import org.apache.spark.unsafe.types.UTF8String;
 
 public class CharVarcharCodegenUtils {
@@ -27,7 +28,7 @@ public class CharVarcharCodegenUtils {
     int numTailSpacesToTrim = numChars - limit;
     UTF8String trimmed = inputStr.trimTrailingSpaces(numTailSpacesToTrim);
     if (trimmed.numChars() > limit) {
-      throw new RuntimeException("Exceeds char/varchar type length limitation: " + limit);
+      throw QueryExecutionErrors.exceedMaxLimit(limit);
     } else {
       return trimmed;
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index a0296d27361..2b4753a027d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -1450,12 +1450,13 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
 
           if (!canAlterColumnType(field.dataType, newDataType)) {
             alter.failAnalysis(
-              errorClass = "_LEGACY_ERROR_TEMP_2329",
+              errorClass = "NOT_SUPPORTED_CHANGE_COLUMN",
               messageParameters = Map(
-                "table" -> table.name,
-                "fieldName" -> fieldName,
-                "oldType" -> field.dataType.simpleString,
-                "newType" -> newDataType.simpleString))
+                "table" -> toSQLId(table.name),
+                "originName" -> toSQLId(fieldName),
+                "originType" -> toSQLType(field.dataType),
+                "newName" -> toSQLId(fieldName),
+                "newType" -> toSQLType(newDataType)))
           }
         }
         if (a.nullable.isDefined) {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index be296f4a87c..94cbf880b57 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -2197,8 +2197,8 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
 
   def charOrVarcharTypeAsStringUnsupportedError(): Throwable = {
     new AnalysisException(
-      errorClass = "_LEGACY_ERROR_TEMP_1215",
-      messageParameters = Map("config" -> SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key))
+      errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING",
+      messageParameters = Map.empty)
   }
 
   def escapeCharacterInTheMiddleError(pattern: String, char: String): Throwable = {
@@ -2420,15 +2420,17 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
   }
 
   def alterTableChangeColumnNotSupportedForColumnTypeError(
+      tableName: String,
       originColumn: StructField,
       newColumn: StructField): Throwable = {
     new AnalysisException(
-      errorClass = "_LEGACY_ERROR_TEMP_1245",
+      errorClass = "NOT_SUPPORTED_CHANGE_COLUMN",
       messageParameters = Map(
-        "originName" -> originColumn.name,
-        "originType" -> originColumn.dataType.toString,
-        "newName" -> newColumn.name,
-        "newType"-> newColumn.dataType.toString))
+        "table" -> tableName,
+        "originName" -> toSQLId(originColumn.name),
+        "originType" -> toSQLType(originColumn.dataType),
+        "newName" -> toSQLId(newColumn.name),
+        "newType"-> toSQLType(newColumn.dataType)))
   }
 
   def cannotFindColumnError(name: String, fieldNames: Array[String]): Throwable = {
@@ -2437,7 +2439,6 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       messageParameters = Map(
         "name" -> name,
         "fieldNames" -> fieldNames.mkString("[`", "`, `", "`]")))
-
   }
 
   def alterTableSetSerdeForSpecificPartitionNotSupportedError(): Throwable = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index cfaf39cb7ef..c31d01162c5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -2756,6 +2756,13 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
       messageParameters = Map("namespace" -> toSQLId(namespace)))
   }
 
+  def exceedMaxLimit(limit: Int): SparkRuntimeException = {
+    new SparkRuntimeException(
+      errorClass = "EXCEED_LIMIT_LENGTH",
+      messageParameters = Map("limit" -> limit.toString)
+    )
+  }
+
   def timestampAddOverflowError(micros: Long, amount: Int, unit: String): ArithmeticException = {
     new SparkArithmeticException(
       errorClass = "DATETIME_OVERFLOW",
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index b4c98108efa..8acf52b1250 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
+import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLId
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, TableCatalog}
 import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces._
@@ -380,7 +381,7 @@ case class AlterTableChangeColumnCommand(
     // Throw an AnalysisException if the column name/dataType is changed.
     if (!columnEqual(originColumn, newColumn, resolver)) {
       throw QueryCompilationErrors.alterTableChangeColumnNotSupportedForColumnTypeError(
-        originColumn, newColumn)
+        toSQLId(table.identifier.nameParts), originColumn, newColumn)
     }
 
     val newDataSchema = table.dataSchema.fields.map { field =>
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/change-column.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/change-column.sql.out
index 0a1b745531c..94dcd99bbd6 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/change-column.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/change-column.sql.out
@@ -61,12 +61,13 @@ ALTER TABLE test_change CHANGE a TYPE STRING
 -- !query analysis
 org.apache.spark.sql.AnalysisException
 {
-  "errorClass" : "_LEGACY_ERROR_TEMP_1245",
+  "errorClass" : "NOT_SUPPORTED_CHANGE_COLUMN",
   "messageParameters" : {
-    "newName" : "a",
-    "newType" : "StringType",
-    "originName" : "a",
-    "originType" : "IntegerType"
+    "newName" : "`a`",
+    "newType" : "\"STRING\"",
+    "originName" : "`a`",
+    "originType" : "\"INT\"",
+    "table" : "`spark_catalog`.`default`.`test_change`"
   }
 }
 
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/charvarchar.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/charvarchar.sql.out
index 61bac3e4d14..6e72fd28686 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/charvarchar.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/charvarchar.sql.out
@@ -125,12 +125,13 @@ alter table char_tbl1 change column c type char(6)
 -- !query analysis
 org.apache.spark.sql.AnalysisException
 {
-  "errorClass" : "_LEGACY_ERROR_TEMP_1245",
+  "errorClass" : "NOT_SUPPORTED_CHANGE_COLUMN",
   "messageParameters" : {
-    "newName" : "c",
-    "newType" : "CharType(6)",
-    "originName" : "c",
-    "originType" : "CharType(5)"
+    "newName" : "`c`",
+    "newType" : "\"CHAR(6)\"",
+    "originName" : "`c`",
+    "originType" : "\"CHAR(5)\"",
+    "table" : "`spark_catalog`.`default`.`char_tbl1`"
   }
 }
 
diff --git a/sql/core/src/test/resources/sql-tests/results/change-column.sql.out b/sql/core/src/test/resources/sql-tests/results/change-column.sql.out
index dbcf5f7fcbb..5937f52c468 100644
--- a/sql/core/src/test/resources/sql-tests/results/change-column.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/change-column.sql.out
@@ -81,12 +81,13 @@ struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
 {
-  "errorClass" : "_LEGACY_ERROR_TEMP_1245",
+  "errorClass" : "NOT_SUPPORTED_CHANGE_COLUMN",
   "messageParameters" : {
-    "newName" : "a",
-    "newType" : "StringType",
-    "originName" : "a",
-    "originType" : "IntegerType"
+    "newName" : "`a`",
+    "newType" : "\"STRING\"",
+    "originName" : "`a`",
+    "originType" : "\"INT\"",
+    "table" : "`spark_catalog`.`default`.`test_change`"
   }
 }
 
diff --git a/sql/core/src/test/resources/sql-tests/results/charvarchar.sql.out b/sql/core/src/test/resources/sql-tests/results/charvarchar.sql.out
index d248ed405c2..54cdc1e3528 100644
--- a/sql/core/src/test/resources/sql-tests/results/charvarchar.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/charvarchar.sql.out
@@ -260,12 +260,13 @@ struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
 {
-  "errorClass" : "_LEGACY_ERROR_TEMP_1245",
+  "errorClass" : "NOT_SUPPORTED_CHANGE_COLUMN",
   "messageParameters" : {
-    "newName" : "c",
-    "newType" : "CharType(6)",
-    "originName" : "c",
-    "originType" : "CharType(5)"
+    "newName" : "`c`",
+    "newType" : "\"CHAR(6)\"",
+    "originName" : "`c`",
+    "originType" : "\"CHAR(5)\"",
+    "table" : "`spark_catalog`.`default`.`char_tbl1`"
   }
 }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
index a6c310cd925..40b31e5850f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.{SparkConf, SparkException, SparkRuntimeException}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.logical.Project
@@ -147,17 +147,28 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
       withTable("t") {
         sql(s"CREATE TABLE t(i STRING, c $typ) USING $format PARTITIONED BY (c)")
         Seq("ADD", "DROP").foreach { op =>
-          val e = intercept[RuntimeException](sql(s"ALTER TABLE t $op PARTITION(c='abcdef')"))
-          assert(e.getMessage.contains("Exceeds char/varchar type length limitation: 5"))
+          checkError(
+            exception = intercept[SparkRuntimeException] {
+              sql(s"ALTER TABLE t $op PARTITION(c='abcdef')")
+            },
+            errorClass = "EXCEED_LIMIT_LENGTH",
+            parameters = Map("limit" -> "5")
+          )
         }
-        val e1 = intercept[RuntimeException] {
-          sql(s"ALTER TABLE t PARTITION (c='abcdef') RENAME TO PARTITION (c='2')")
-        }
-        assert(e1.getMessage.contains("Exceeds char/varchar type length limitation: 5"))
-        val e2 = intercept[RuntimeException] {
-          sql(s"ALTER TABLE t PARTITION (c='1') RENAME TO PARTITION (c='abcdef')")
-        }
-        assert(e2.getMessage.contains("Exceeds char/varchar type length limitation: 5"))
+        checkError(
+          exception = intercept[SparkRuntimeException] {
+            sql(s"ALTER TABLE t PARTITION (c='abcdef') RENAME TO PARTITION (c='2')")
+          },
+          errorClass = "EXCEED_LIMIT_LENGTH",
+          parameters = Map("limit" -> "5")
+        )
+        checkError(
+          exception = intercept[SparkRuntimeException] {
+            sql(s"ALTER TABLE t PARTITION (c='1') RENAME TO PARTITION (c='abcdef')")
+          },
+          errorClass = "EXCEED_LIMIT_LENGTH",
+          parameters = Map("limit" -> "5")
+        )
       }
     }
   }
@@ -304,8 +315,17 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
       sql(s"CREATE TABLE t(c $typeName(5)) USING $format")
       sql("INSERT INTO t VALUES (null)")
       checkAnswer(spark.table("t"), Row(null))
-      val e = intercept[SparkException](sql("INSERT INTO t VALUES ('123456')"))
-      assert(e.getMessage.contains(s"Exceeds char/varchar type length limitation: 5"))
+      val e = intercept[SparkException] {
+        sql("INSERT INTO t VALUES ('123456')")
+      }
+      checkError(
+        exception = e.getCause match {
+          case c: SparkRuntimeException => c
+          case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
+        },
+        errorClass = "EXCEED_LIMIT_LENGTH",
+        parameters = Map("limit" -> "5")
+      )
     }
   }
 
@@ -333,8 +353,13 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
       sql(s"CREATE TABLE t(c STRUCT<c: $typeName(5)>) USING $format")
       sql("INSERT INTO t SELECT struct(null)")
       checkAnswer(spark.table("t"), Row(Row(null)))
-      val e = intercept[RuntimeException](sql("INSERT INTO t SELECT struct('123456')"))
-      assert(e.getMessage.contains(s"Exceeds char/varchar type length limitation: 5"))
+      checkError(
+        exception = intercept[SparkRuntimeException] {
+          sql("INSERT INTO t SELECT struct('123456')")
+        },
+        errorClass = "EXCEED_LIMIT_LENGTH",
+        parameters = Map("limit" -> "5")
+      )
     }
   }
 
@@ -343,8 +368,17 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
       sql(s"CREATE TABLE t(c ARRAY<$typeName(5)>) USING $format")
       sql("INSERT INTO t VALUES (array(null))")
       checkAnswer(spark.table("t"), Row(Seq(null)))
-      val e = intercept[SparkException](sql("INSERT INTO t VALUES (array('a', '123456'))"))
-      assert(e.getMessage.contains(s"Exceeds char/varchar type length limitation: 5"))
+      val e = intercept[SparkException] {
+        sql("INSERT INTO t VALUES (array('a', '123456'))")
+      }
+      checkError(
+        exception = e.getCause match {
+          case c: SparkRuntimeException => c
+          case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
+        },
+        errorClass = "EXCEED_LIMIT_LENGTH",
+        parameters = Map("limit" -> "5")
+      )
     }
   }
 
@@ -352,7 +386,14 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
     testTableWrite { typeName =>
       sql(s"CREATE TABLE t(c MAP<$typeName(5), STRING>) USING $format")
       val e = intercept[SparkException](sql("INSERT INTO t VALUES (map('123456', 'a'))"))
-      assert(e.getMessage.contains(s"Exceeds char/varchar type length limitation: 5"))
+      checkError(
+        exception = e.getCause match {
+          case c: SparkRuntimeException => c
+          case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
+        },
+        errorClass = "EXCEED_LIMIT_LENGTH",
+        parameters = Map("limit" -> "5")
+      )
     }
   }
 
@@ -362,7 +403,14 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
       sql("INSERT INTO t VALUES (map('a', null))")
       checkAnswer(spark.table("t"), Row(Map("a" -> null)))
       val e = intercept[SparkException](sql("INSERT INTO t VALUES (map('a', '123456'))"))
-      assert(e.getMessage.contains(s"Exceeds char/varchar type length limitation: 5"))
+      checkError(
+        exception = e.getCause match {
+          case c: SparkRuntimeException => c
+          case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
+        },
+        errorClass = "EXCEED_LIMIT_LENGTH",
+        parameters = Map("limit" -> "5")
+      )
     }
   }
 
@@ -370,9 +418,23 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
     testTableWrite { typeName =>
       sql(s"CREATE TABLE t(c MAP<$typeName(5), $typeName(5)>) USING $format")
       val e1 = intercept[SparkException](sql("INSERT INTO t VALUES (map('123456', 'a'))"))
-      assert(e1.getMessage.contains(s"Exceeds char/varchar type length limitation: 5"))
+      checkError(
+        exception = e1.getCause match {
+          case c: SparkRuntimeException => c
+          case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
+        },
+        errorClass = "EXCEED_LIMIT_LENGTH",
+        parameters = Map("limit" -> "5")
+      )
       val e2 = intercept[SparkException](sql("INSERT INTO t VALUES (map('a', '123456'))"))
-      assert(e2.getMessage.contains(s"Exceeds char/varchar type length limitation: 5"))
+      checkError(
+        exception = e2.getCause match {
+          case c: SparkRuntimeException => c
+          case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
+        },
+        errorClass = "EXCEED_LIMIT_LENGTH",
+        parameters = Map("limit" -> "5")
+      )
     }
   }
 
@@ -382,7 +444,14 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
       sql("INSERT INTO t SELECT struct(array(null))")
       checkAnswer(spark.table("t"), Row(Row(Seq(null))))
       val e = intercept[SparkException](sql("INSERT INTO t SELECT struct(array('123456'))"))
-      assert(e.getMessage.contains(s"Exceeds char/varchar type length limitation: 5"))
+      checkError(
+        exception = e.getCause match {
+          case c: SparkRuntimeException => c
+          case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
+        },
+        errorClass = "EXCEED_LIMIT_LENGTH",
+        parameters = Map("limit" -> "5")
+      )
     }
   }
 
@@ -392,7 +461,14 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
       sql("INSERT INTO t VALUES (array(struct(null)))")
       checkAnswer(spark.table("t"), Row(Seq(Row(null))))
       val e = intercept[SparkException](sql("INSERT INTO t VALUES (array(struct('123456')))"))
-      assert(e.getMessage.contains(s"Exceeds char/varchar type length limitation: 5"))
+      checkError(
+        exception = e.getCause match {
+          case c: SparkRuntimeException => c
+          case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
+        },
+        errorClass = "EXCEED_LIMIT_LENGTH",
+        parameters = Map("limit" -> "5")
+      )
     }
   }
 
@@ -402,7 +478,14 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
       sql("INSERT INTO t VALUES (array(array(null)))")
       checkAnswer(spark.table("t"), Row(Seq(Seq(null))))
       val e = intercept[SparkException](sql("INSERT INTO t VALUES (array(array('123456')))"))
-      assert(e.getMessage.contains(s"Exceeds char/varchar type length limitation: 5"))
+      checkError(
+        exception = e.getCause match {
+          case c: SparkRuntimeException => c
+          case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
+        },
+        errorClass = "EXCEED_LIMIT_LENGTH",
+        parameters = Map("limit" -> "5")
+      )
     }
   }
 
@@ -423,9 +506,23 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
       sql("INSERT INTO t VALUES (1234, 1234)")
       checkAnswer(spark.table("t"), Row("1234 ", "1234"))
       val e1 = intercept[SparkException](sql("INSERT INTO t VALUES (123456, 1)"))
-      assert(e1.getMessage.contains("Exceeds char/varchar type length limitation: 5"))
+      checkError(
+        exception = e1.getCause match {
+          case c: SparkRuntimeException => c
+          case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
+        },
+        errorClass = "EXCEED_LIMIT_LENGTH",
+        parameters = Map("limit" -> "5")
+      )
       val e2 = intercept[SparkException](sql("INSERT INTO t VALUES (1, 123456)"))
-      assert(e2.getMessage.contains("Exceeds char/varchar type length limitation: 5"))
+      checkError(
+        exception = e2.getCause match {
+          case c: SparkRuntimeException => c
+          case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
+        },
+        errorClass = "EXCEED_LIMIT_LENGTH",
+        parameters = Map("limit" -> "5")
+      )
     }
   }
 
@@ -695,13 +792,18 @@ class BasicCharVarcharTestSuite extends QueryTest with SharedSparkSession {
     }
   }
 
-  def failWithInvalidCharUsage[T](fn: => T): Unit = {
-    val e = intercept[AnalysisException](fn)
-    assert(e.getMessage contains "char/varchar type can only be used in the table schema")
-  }
-
   test("invalidate char/varchar in functions") {
-    failWithInvalidCharUsage(sql("""SELECT from_json('{"a": "str"}', 'a CHAR(5)')"""))
+    checkError(
+      exception = intercept[AnalysisException] {
+        sql("""SELECT from_json('{"a": "str"}', 'a CHAR(5)')""")
+      },
+      errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING",
+      parameters = Map.empty,
+      context = ExpectedContext(
+        fragment = "from_json('{\"a\": \"str\"}', 'a CHAR(5)')",
+        start = 7,
+        stop = 44)
+    )
     withSQLConf((SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key, "true")) {
       val df = sql("""SELECT from_json('{"a": "str"}', 'a CHAR(5)')""")
       checkAnswer(df, Row(Row("str")))
@@ -713,9 +815,24 @@ class BasicCharVarcharTestSuite extends QueryTest with SharedSparkSession {
   test("invalidate char/varchar in SparkSession createDataframe") {
     val df = spark.range(10).map(_.toString).toDF()
     val schema = new StructType().add("id", CharType(5))
-    failWithInvalidCharUsage(spark.createDataFrame(df.collectAsList(), schema))
-    failWithInvalidCharUsage(spark.createDataFrame(df.rdd, schema))
-    failWithInvalidCharUsage(spark.createDataFrame(df.toJavaRDD, schema))
+    checkError(
+      exception = intercept[AnalysisException] {
+        spark.createDataFrame(df.collectAsList(), schema)
+      },
+      errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
+    )
+    checkError(
+      exception = intercept[AnalysisException] {
+        spark.createDataFrame(df.rdd, schema)
+      },
+      errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
+    )
+    checkError(
+      exception = intercept[AnalysisException] {
+        spark.createDataFrame(df.toJavaRDD, schema)
+      },
+      errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
+    )
     withSQLConf((SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key, "true")) {
       val df1 = spark.createDataFrame(df.collectAsList(), schema)
       checkAnswer(df1, df)
@@ -724,8 +841,17 @@ class BasicCharVarcharTestSuite extends QueryTest with SharedSparkSession {
   }
 
   test("invalidate char/varchar in spark.read.schema") {
-    failWithInvalidCharUsage(spark.read.schema(new StructType().add("id", CharType(5))))
-    failWithInvalidCharUsage(spark.read.schema("id char(5)"))
+    checkError(
+      exception = intercept[AnalysisException] {
+        spark.read.schema(new StructType().add("id", CharType(5)))
+      },
+      errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING")
+    checkError(
+      exception = intercept[AnalysisException] {
+        spark.read.schema("id char(5)")
+      },
+      errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
+    )
     withSQLConf((SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key, "true")) {
       val ds = spark.range(10).map(_.toString)
       val df1 = spark.read.schema(new StructType().add("id", CharType(5))).csv(ds)
@@ -757,8 +883,18 @@ class BasicCharVarcharTestSuite extends QueryTest with SharedSparkSession {
   }
 
   test("invalidate char/varchar in udf's result type") {
-    failWithInvalidCharUsage(spark.udf.register("testchar", () => "B", VarcharType(1)))
-    failWithInvalidCharUsage(spark.udf.register("testchar2", (x: String) => x, VarcharType(1)))
+    checkError(
+      exception = intercept[AnalysisException] {
+        spark.udf.register("testchar", () => "B", VarcharType(1))
+      },
+      errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
+    )
+    checkError(
+      exception = intercept[AnalysisException] {
+        spark.udf.register("testchar2", (x: String) => x, VarcharType(1))
+      },
+      errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
+    )
     withSQLConf((SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key, "true")) {
       spark.udf.register("testchar", () => "B", VarcharType(1))
       spark.udf.register("testchar2", (x: String) => x, VarcharType(1))
@@ -772,8 +908,18 @@ class BasicCharVarcharTestSuite extends QueryTest with SharedSparkSession {
   }
 
   test("invalidate char/varchar in spark.readStream.schema") {
-    failWithInvalidCharUsage(spark.readStream.schema(new StructType().add("id", CharType(5))))
-    failWithInvalidCharUsage(spark.readStream.schema("id char(5)"))
+    checkError(
+      exception = intercept[AnalysisException] {
+        spark.readStream.schema(new StructType().add("id", CharType(5)))
+      },
+      errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
+    )
+    checkError(
+      exception = intercept[AnalysisException] {
+        spark.readStream.schema("id char(5)")
+      },
+      errorClass = "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING"
+    )
     withSQLConf((SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key, "true")) {
       withTempPath { dir =>
         spark.range(2).write.save(dir.toString)
@@ -891,8 +1037,13 @@ class FileSourceCharVarcharTestSuite extends CharVarcharTestSuite with SharedSpa
           matchPVals = true
         )
 
-        val e2 = intercept[RuntimeException](sql("ALTER TABLE t DROP PARTITION(c=100000)"))
-        assert(e2.getMessage.contains("Exceeds char/varchar type length limitation: 5"))
+        checkError(
+          exception = intercept[SparkRuntimeException] {
+            sql("ALTER TABLE t DROP PARTITION(c=100000)")
+          },
+          errorClass = "EXCEED_LIMIT_LENGTH",
+          parameters = Map("limit" -> "5")
+        )
       }
     }
   }
@@ -919,10 +1070,22 @@ class DSV2CharVarcharTestSuite extends CharVarcharTestSuite
         }
 
         val e1 = intercept[SparkException](sql(s"INSERT OVERWRITE t VALUES ('1', 100000)"))
-        assert(e1.getCause.getMessage.contains("Exceeds char/varchar type length limitation: 5"))
+        checkError(
+          exception = e1.getCause match {
+            case c: SparkRuntimeException => c
+            case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
+          },
+          errorClass = "EXCEED_LIMIT_LENGTH",
+          parameters = Map("limit" -> "5")
+        )
 
-        val e2 = intercept[RuntimeException](sql("ALTER TABLE t DROP PARTITION(c=100000)"))
-        assert(e2.getMessage.contains("Exceeds char/varchar type length limitation: 5"))
+        checkError(
+          exception = intercept[SparkRuntimeException] {
+            sql("ALTER TABLE t DROP PARTITION(c=100000)")
+          },
+          errorClass = "EXCEED_LIMIT_LENGTH",
+          parameters = Map("limit" -> "5")
+        )
       }
     }
   }
@@ -934,8 +1097,13 @@ class DSV2CharVarcharTestSuite extends CharVarcharTestSuite
 
         val inputDF = sql("SELECT named_struct('n_i', 1, 'n_c', '123456') AS s")
 
-        val e = intercept[RuntimeException](inputDF.writeTo("t").append())
-        assert(e.getMessage.contains("Exceeds char/varchar type length limitation: 5"))
+        checkError(
+          exception = intercept[SparkRuntimeException] {
+            inputDF.writeTo("t").append()
+          },
+          errorClass = "EXCEED_LIMIT_LENGTH",
+          parameters = Map("limit" -> "5")
+        )
       }
     }
   }
@@ -948,7 +1116,14 @@ class DSV2CharVarcharTestSuite extends CharVarcharTestSuite
         val inputDF = sql("SELECT array(named_struct('n_i', 1, 'n_c', '123456')) AS a")
 
         val e = intercept[SparkException](inputDF.writeTo("t").append())
-        assert(e.getCause.getMessage.contains("Exceeds char/varchar type length limitation: 5"))
+        checkError(
+          exception = e.getCause match {
+            case c: SparkRuntimeException => c
+            case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
+          },
+          errorClass = "EXCEED_LIMIT_LENGTH",
+          parameters = Map("limit" -> "5")
+        )
       }
     }
   }
@@ -961,7 +1136,14 @@ class DSV2CharVarcharTestSuite extends CharVarcharTestSuite
         val inputDF = sql("SELECT map(named_struct('n_i', 1, 'n_c', '123456'), 1) AS m")
 
         val e = intercept[SparkException](inputDF.writeTo("t").append())
-        assert(e.getCause.getMessage.contains("Exceeds char/varchar type length limitation: 5"))
+        checkError(
+          exception = e.getCause match {
+            case c: SparkRuntimeException => c
+            case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
+          },
+          errorClass = "EXCEED_LIMIT_LENGTH",
+          parameters = Map("limit" -> "5")
+        )
       }
     }
   }
@@ -974,7 +1156,14 @@ class DSV2CharVarcharTestSuite extends CharVarcharTestSuite
         val inputDF = sql("SELECT map(1, named_struct('n_i', 1, 'n_c', '123456')) AS m")
 
         val e = intercept[SparkException](inputDF.writeTo("t").append())
-        assert(e.getCause.getMessage.contains("Exceeds char/varchar type length limitation: 5"))
+        checkError(
+          exception = e.getCause match {
+            case c: SparkRuntimeException => c
+            case c: SparkException => c.getCause.asInstanceOf[SparkRuntimeException]
+          },
+          errorClass = "EXCEED_LIMIT_LENGTH",
+          parameters = Map("limit" -> "5")
+        )
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
index 122b3ab07e6..fb8b456f618 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
@@ -693,13 +693,24 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
     val t = fullTableName("table_name")
     withTable(t) {
       sql(s"CREATE TABLE $t (id int) USING $v2Format")
-
-      val exc = intercept[AnalysisException] {
-        sql(s"ALTER TABLE $t ALTER COLUMN id TYPE boolean")
-      }
-
-      assert(exc.getMessage.contains("id"))
-      assert(exc.getMessage.contains("int cannot be cast to boolean"))
+      val sql1 = s"ALTER TABLE $t ALTER COLUMN id TYPE boolean"
+      checkErrorMatchPVals(
+        exception = intercept[AnalysisException] {
+          sql(sql1)
+        },
+        errorClass = "NOT_SUPPORTED_CHANGE_COLUMN",
+        sqlState = None,
+        parameters = Map(
+          "originType" -> "\"INT\"",
+          "newType" -> "\"BOOLEAN\"",
+          "newName" -> "`id`",
+          "originName" -> "`id`",
+          "table" -> ".*table_name.*"),
+        context = ExpectedContext(
+          fragment = sql1,
+          start = 0,
+          stop = sql1.length - 1)
+      )
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/CharVarcharDDLTestBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/CharVarcharDDLTestBase.scala
index f77b6336b81..12d5870309f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/CharVarcharDDLTestBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/CharVarcharDDLTestBase.scala
@@ -29,6 +29,8 @@ trait CharVarcharDDLTestBase extends QueryTest with SQLTestUtils {
 
   def format: String
 
+  def getTableName(name: String): String
+
   def checkColType(f: StructField, dt: DataType): Unit = {
     assert(f.dataType == CharVarcharUtils.replaceCharVarcharWithString(dt))
     assert(CharVarcharUtils.getRawType(f.metadata).contains(dt))
@@ -45,36 +47,72 @@ trait CharVarcharDDLTestBase extends QueryTest with SQLTestUtils {
   test("not allow to change column for char(x) to char(y), x != y") {
     withTable("t") {
       sql(s"CREATE TABLE t(i STRING, c CHAR(4)) USING $format")
-      val e = intercept[AnalysisException] {
-        sql("ALTER TABLE t CHANGE COLUMN c TYPE CHAR(5)")
-      }
-      val v1 = e.getMessage contains "'CharType(4)' to 'c' with type 'CharType(5)'"
-      val v2 = e.getMessage contains "char(4) cannot be cast to char(5)"
-      assert(v1 || v2)
+      val sql1 = "ALTER TABLE t CHANGE COLUMN c TYPE CHAR(5)"
+      val table = getTableName("t")
+      checkError(
+          exception = intercept[AnalysisException] {
+            sql(sql1)
+          },
+          errorClass = "NOT_SUPPORTED_CHANGE_COLUMN",
+          parameters = Map(
+            "originType" -> "\"CHAR(4)\"",
+            "newType" -> "\"CHAR(5)\"",
+            "newName" -> "`c`",
+            "originName" -> "`c`",
+            "table" -> table),
+          queryContext = table match {
+            case "`spark_catalog`.`default`.`t`" => Array.empty
+            case _ => Array(ExpectedContext(fragment = sql1, start = 0, stop = 41))
+          }
+      )
     }
   }
 
   test("not allow to change column from string to char type") {
     withTable("t") {
       sql(s"CREATE TABLE t(i STRING, c STRING) USING $format")
-      val e = intercept[AnalysisException] {
-        sql("ALTER TABLE t CHANGE COLUMN c TYPE CHAR(5)")
-      }
-      val v1 = e.getMessage contains "'StringType' to 'c' with type 'CharType(5)'"
-      val v2 = e.getMessage contains "string cannot be cast to char(5)"
-      assert(v1 || v2)
+      val sql1 = "ALTER TABLE t CHANGE COLUMN c TYPE CHAR(5)"
+      val table = getTableName("t")
+      checkError(
+          exception = intercept[AnalysisException] {
+            sql(sql1)
+          },
+          errorClass = "NOT_SUPPORTED_CHANGE_COLUMN",
+          parameters = Map(
+            "originType" -> "\"STRING\"",
+            "newType" -> "\"CHAR(5)\"",
+            "newName" -> "`c`",
+            "originName" -> "`c`",
+            "table" -> table),
+          queryContext = table match {
+            case "`spark_catalog`.`default`.`t`" => Array.empty
+            case _ => Array(ExpectedContext(fragment = sql1, start = 0, stop = 41))
+          }
+      )
     }
   }
 
   test("not allow to change column from int to char type") {
     withTable("t") {
       sql(s"CREATE TABLE t(i int, c CHAR(4)) USING $format")
-      val e = intercept[AnalysisException] {
-        sql("ALTER TABLE t CHANGE COLUMN i TYPE CHAR(5)")
-      }
-      val v1 = e.getMessage contains "'IntegerType' to 'i' with type 'CharType(5)'"
-      val v2 = e.getMessage contains "int cannot be cast to char(5)"
-      assert(v1 || v2)
+      val sql1 = "ALTER TABLE t CHANGE COLUMN i TYPE CHAR(5)"
+      val table = getTableName("t")
+      checkError(
+          exception = intercept[AnalysisException] {
+            sql(sql1)
+          },
+          errorClass = "NOT_SUPPORTED_CHANGE_COLUMN",
+          parameters = Map(
+            "originType" -> "\"INT\"",
+            "newType" -> "\"CHAR(5)\"",
+            "newName" -> "`i`",
+            "originName" -> "`i`",
+            "table" -> table),
+          queryContext = table match {
+            case "`spark_catalog`.`default`.`t`" => Array.empty
+            case _ => Array(ExpectedContext(fragment = sql1, start = 0, stop = 41))
+          }
+      )
     }
   }
 
@@ -89,12 +127,24 @@ trait CharVarcharDDLTestBase extends QueryTest with SQLTestUtils {
   test("not allow to change column for varchar(x) to varchar(y), x > y") {
     withTable("t") {
       sql(s"CREATE TABLE t(i STRING, c VARCHAR(4)) USING $format")
-      val e = intercept[AnalysisException] {
-        sql("ALTER TABLE t CHANGE COLUMN c TYPE VARCHAR(3)")
-      }
-      val v1 = e.getMessage contains "'VarcharType(4)' to 'c' with type 'VarcharType(3)'"
-      val v2 = e.getMessage contains "varchar(4) cannot be cast to varchar(3)"
-      assert(v1 || v2)
+      val sql1 = "ALTER TABLE t CHANGE COLUMN c TYPE VARCHAR(3)"
+      val table = getTableName("t")
+      checkError(
+          exception = intercept[AnalysisException] {
+            sql(sql1)
+          },
+          errorClass = "NOT_SUPPORTED_CHANGE_COLUMN",
+          parameters = Map(
+            "originType" -> "\"VARCHAR(4)\"",
+            "newType" -> "\"VARCHAR(3)\"",
+            "newName" -> "`c`",
+            "originName" -> "`c`",
+            "table" -> table),
+          queryContext = table match {
+            case "`spark_catalog`.`default`.`t`" => Array.empty
+            case _ => Array(ExpectedContext(fragment = sql1, start = 0, stop = 44))
+          }
+      )
     }
   }
 
@@ -177,6 +227,8 @@ class FileSourceCharVarcharDDLTestSuite extends CharVarcharDDLTestBase with Shar
     super.sparkConf.set(SQLConf.USE_V1_SOURCE_LIST, "parquet")
   }
 
+  override def getTableName(name: String): String = s"`spark_catalog`.`default`.`$name`"
+
   // TODO(SPARK-33902): MOVE TO SUPER CLASS AFTER THE TARGET TICKET RESOLVED
   test("SPARK-33901: create table like should should not change table's schema") {
     withTable("t", "tt") {
@@ -219,6 +271,8 @@ class DSV2CharVarcharDDLTestSuite extends CharVarcharDDLTestBase
       .set(SQLConf.DEFAULT_CATALOG.key, "testcat")
   }
 
+  override def getTableName(name: String): String = s"`testcat`.`$name`"
+
   test("allow to change change column from char to string type") {
     withTable("t") {
       sql(s"CREATE TABLE t(i STRING, c CHAR(4)) USING $format")
@@ -254,10 +308,20 @@ class DSV2CharVarcharDDLTestSuite extends CharVarcharDDLTestBase
   test("not allow to change column from char(x) to varchar(y) type x > y") {
     withTable("t") {
       sql(s"CREATE TABLE t(i STRING, c CHAR(4)) USING $format")
-      val e = intercept[AnalysisException] {
-        sql("ALTER TABLE t CHANGE COLUMN c TYPE VARCHAR(3)")
-      }
-      assert(e.getMessage contains "char(4) cannot be cast to varchar(3)")
+      val sql1 = "ALTER TABLE t CHANGE COLUMN c TYPE VARCHAR(3)"
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(sql1)
+        },
+        errorClass = "NOT_SUPPORTED_CHANGE_COLUMN",
+        parameters = Map(
+          "originType" -> "\"CHAR(4)\"",
+          "newType" -> "\"VARCHAR(3)\"",
+          "newName" -> "`c`",
+          "originName" -> "`c`",
+          "table" -> getTableName("t")),
+        context = ExpectedContext(fragment = sql1, start = 0, stop = 44)
+      )
     }
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/HiveCharVarcharTestSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/HiveCharVarcharTestSuite.scala
index 1e7820f0c19..50f959d2758 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/HiveCharVarcharTestSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/HiveCharVarcharTestSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkException, SparkRuntimeException}
 import org.apache.spark.sql.execution.command.CharVarcharDDLTestBase
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 
@@ -95,8 +95,12 @@ class HiveCharVarcharTestSuite extends CharVarcharTestSuite with TestHiveSinglet
           matchPVals = true
         )
 
-        val e2 = intercept[RuntimeException](sql("ALTER TABLE t DROP PARTITION(c=100000)"))
-        assert(e2.getMessage.contains("Exceeds char/varchar type length limitation: 5"))
+        val e2 = intercept[SparkRuntimeException](sql("ALTER TABLE t DROP PARTITION(c=100000)"))
+        checkError(
+          exception = e2,
+          errorClass = "EXCEED_LIMIT_LENGTH",
+          parameters = Map("limit" -> "5")
+        )
       }
     }
   }
@@ -107,6 +111,8 @@ class HiveCharVarcharDDLTestSuite extends CharVarcharDDLTestBase with TestHiveSi
   // The default Hive serde doesn't support nested null values.
   override def format: String = "hive OPTIONS(fileFormat='parquet')"
 
+  override def getTableName(name: String): String = s"`spark_catalog`.`default`.`$name`"
+
   private var originalPartitionMode = ""
 
   override protected def beforeAll(): Unit = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org