Posted to commits@spark.apache.org by ma...@apache.org on 2022/09/30 06:23:37 UTC

[spark] branch master updated: [SPARK-40540][SQL] Migrate compilation errors onto error classes: _LEGACY_ERROR_TEMP_1200-1299

This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 43a6b932759 [SPARK-40540][SQL] Migrate compilation errors onto error classes: _LEGACY_ERROR_TEMP_1200-1299
43a6b932759 is described below

commit 43a6b932759865c45ccf36f3e9cf6898c1b762da
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Fri Sep 30 09:23:08 2022 +0300

    [SPARK-40540][SQL] Migrate compilation errors onto error classes: _LEGACY_ERROR_TEMP_1200-1299
    
    ### What changes were proposed in this pull request?
    In the PR, I propose to migrate 100 compilation errors onto temporary error classes named `_LEGACY_ERROR_TEMP_12xx`. The error messages will not include the error class names, so the existing behaviour is preserved.
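    
    For example, migrating a single error replaces an interpolated message string with an error class plus message parameters; the template text itself moves into `error-classes.json`. The excerpt below is adapted from the `QueryCompilationErrors.scala` changes in this diff:
    ```
    // Before: the message text is built directly in Scala code.
    def hiveTableTypeUnsupportedError(tableType: String): Throwable = {
      new AnalysisException(s"Hive $tableType is not supported.")
    }

    // After: the text lives in error-classes.json under "_LEGACY_ERROR_TEMP_1220",
    // and the call site only supplies the error class and its parameters.
    def hiveTableTypeUnsupportedError(tableType: String): Throwable = {
      new AnalysisException(
        errorClass = "_LEGACY_ERROR_TEMP_1220",
        messageParameters = Map("tableType" -> tableType))
    }
    ```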
    
    ### Why are the changes needed?
    Migrating onto temporary error classes lets us gather statistics about errors and detect the most popular error classes. After that, we can prioritise the migration work.
    
    The new error class name prefix `_LEGACY_ERROR_TEMP_` proposed here marks the error as developer-facing rather than user-facing. Developers can still get the error class programmatically via the `SparkThrowable` interface and build error infrastructure on top of it, but end users won't see the error class in the message. This allows us to do the error migration very quickly; we can refine the error classes and mark them as user-facing later (naming them properly, adding tests, etc.).
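    
    For instance, error-reporting infrastructure could group failures by class along these lines (a minimal sketch: `spark` is assumed to be an existing SparkSession, `some_view` is a made-up view name, and `getErrorClass` is the accessor declared by `SparkThrowable`):
    ```
    import org.apache.spark.sql.AnalysisException

    try {
      spark.sql("ANALYZE TABLE some_view COMPUTE STATISTICS")
    } catch {
      case e: AnalysisException =>
        // AnalysisException implements SparkThrowable, so the error class
        // (for example "_LEGACY_ERROR_TEMP_1236") is still available even
        // though it is not rendered into the message text.
        println(s"error class: ${e.getErrorClass}")
        throw e
    }
    ```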
    
    ### Does this PR introduce _any_ user-facing change?
    No. The error messages should be almost the same by default.
    
    ### How was this patch tested?
    By running the modified test suites:
    ```
    $ build/sbt "core/testOnly *SparkThrowableSuite"
    $ build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite"
    $ build/sbt "test:testOnly *SQLQuerySuite"
    ```
    
    Closes #38027 from MaxGekk/legacy-error-temp-compliation-1200.
    
    Authored-by: Max Gekk <ma...@gmail.com>
    Signed-off-by: Max Gekk <ma...@gmail.com>
---
 core/src/main/resources/error/error-classes.json   | 530 +++++++++++++++++-
 .../catalyst/analysis/NoSuchItemException.scala    |  10 +-
 .../spark/sql/errors/QueryCompilationErrors.scala  | 598 +++++++++++++--------
 .../catalyst/encoders/EncoderResolutionSuite.scala |   8 +-
 .../sql-tests/results/ansi/interval.sql.out        |  60 ++-
 .../sql-tests/results/ansi/literals.sql.out        |  24 +-
 .../sql-tests/results/change-column.sql.out        |  10 +-
 .../sql-tests/results/charvarchar.sql.out          |  19 +-
 .../resources/sql-tests/results/cte-nested.sql.out |  64 ++-
 .../resources/sql-tests/results/describe.sql.out   |  18 +-
 .../sql-tests/results/inline-table.sql.out         |  15 +-
 .../resources/sql-tests/results/interval.sql.out   |  60 ++-
 .../resources/sql-tests/results/literals.sql.out   |  24 +-
 .../results/postgreSQL/create_view.sql.out         | 104 +++-
 .../sql-tests/results/postgreSQL/numeric.sql.out   |  11 +-
 .../sql-tests/results/postgreSQL/strings.sql.out   |  64 ++-
 .../results/postgreSQL/window_part3.sql.out        |  15 +-
 .../sql-tests/results/show-tables.sql.out          |  26 +-
 .../sql-tests/results/udf/udf-inline-table.sql.out |  15 +-
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala |  13 +-
 .../spark/sql/execution/command/DDLSuite.scala     |  30 +-
 .../command/v1/AlterTableSetLocationSuite.scala    |  14 +-
 .../command/v1/AlterTableSetSerdeSuite.scala       |  76 +--
 23 files changed, 1453 insertions(+), 355 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index 48e90cc617d..3fa7f7e9e4b 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -2077,7 +2077,7 @@
   },
   "_LEGACY_ERROR_TEMP_1169" : {
     "message" : [
-      "Requested partitioning does not match the table $tableName:",
+      "Requested partitioning does not match the table <tableName>:",
       "Requested partitions: <normalizedPartSpec>",
       "Table partitions: <partColNames>"
     ]
@@ -2130,7 +2130,7 @@
   },
   "_LEGACY_ERROR_TEMP_1179" : {
     "message" : [
-      "Table-valued function $name with alternatives: <usage>",
+      "Table-valued function <name> with alternatives: <usage>",
       "cannot be applied to (<arguments>): <details>."
     ]
   },
@@ -2234,5 +2234,531 @@
     "message" : [
       "Invalid bound function '<bound>: there are <argsLen> arguments but <inputTypesLen> parameters returned from 'inputTypes()'."
     ]
+  },
+  "_LEGACY_ERROR_TEMP_1200" : {
+    "message" : [
+      "<name> is not supported for v2 tables."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1201" : {
+    "message" : [
+      "Cannot resolve column name \"<colName>\" among (<fieldNames>)."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1202" : {
+    "message" : [
+      "Cannot write to '<tableName>', too many data columns:",
+      "Table columns: <tableColumns>",
+      "Data columns: <dataColumns>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1203" : {
+    "message" : [
+      "Cannot write to '<tableName>', not enough data columns:",
+      "Table columns: <tableColumns>",
+      "Data columns: <dataColumns>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1204" : {
+    "message" : [
+      "Cannot write incompatible data to table '<tableName>':",
+      "- <errors>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1205" : {
+    "message" : [
+      "Expected only partition pruning predicates: <nonPartitionPruningPredicates>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1206" : {
+    "message" : [
+      "<colType> column <colName> is not defined in table <tableName>, defined table columns are: <tableCols>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1207" : {
+    "message" : [
+      "The duration and time inputs to window must be an integer, long or string literal."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1208" : {
+    "message" : [
+      "No such struct field <fieldName> in <fields>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1209" : {
+    "message" : [
+      "Ambiguous reference to fields <fields>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1210" : {
+    "message" : [
+      "The second argument in <funcName> should be a boolean literal."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1211" : {
+    "message" : [
+      "Detected implicit cartesian product for <joinType> join between logical plans",
+      "<leftPlan>",
+      "and",
+      "rightPlan",
+      "Join condition is missing or trivial.",
+      "Either: use the CROSS JOIN syntax to allow cartesian products between these relations, or: enable implicit cartesian products by setting the configuration variable spark.sql.crossJoin.enabled=true."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1212" : {
+    "message" : [
+      "Found conflicting attributes <conflictingAttrs> in the condition joining outer plan:",
+      "<outerPlan>",
+      "and subplan:",
+      "<subplan>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1213" : {
+    "message" : [
+      "Window expression is empty in <expr>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1214" : {
+    "message" : [
+      "Found different window function type in <windowExpressions>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1215" : {
+    "message" : [
+      "char/varchar type can only be used in the table schema. You can set <config> to true, so that Spark treat them as string type as same as Spark 3.0 and earlier."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1216" : {
+    "message" : [
+      "The pattern '<pattern>' is invalid, <message>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1217" : {
+    "message" : [
+      "<tableIdentifier> already exists."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1218" : {
+    "message" : [
+      "<tableIdentifier> should be converted to HadoopFsRelation."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1219" : {
+    "message" : [
+      "Hive metastore does not support altering database location"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1220" : {
+    "message" : [
+      "Hive <tableType> is not supported."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1221" : {
+    "message" : [
+      "Hive 0.12 doesn't support creating permanent functions. Please use Hive 0.13 or higher."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1222" : {
+    "message" : [
+      "Unknown resource type: <resourceType>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1223" : {
+    "message" : [
+      "Invalid field id '<field>' in day-time interval. Supported interval fields: <supportedIds>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1224" : {
+    "message" : [
+      "'interval <startFieldName> to <endFieldName>' is invalid."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1225" : {
+    "message" : [
+      "Invalid field id '<field>' in year-month interval. Supported interval fields: <supportedIds>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1226" : {
+    "message" : [
+      "The SQL config '<configName>' was removed in the version <version>. <comment>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1227" : {
+    "message" : [
+      "<msg><e1>",
+      "Failed fallback parsing: <e2>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1228" : {
+    "message" : [
+      "Decimal scale (<scale>) cannot be greater than precision (<precision>)."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1229" : {
+    "message" : [
+      "<decimalType> can only support precision up to <precision>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1230" : {
+    "message" : [
+      "Negative scale is not allowed: <scale>. You can use <config>=true to enable legacy mode to allow it."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1231" : {
+    "message" : [
+      "<key> is not a valid partition column in table <tblName>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1232" : {
+    "message" : [
+      "Partition spec is invalid. The spec (<specKeys>) must match the partition spec (<partitionColumnNames>) defined in table '<tableName>'."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1233" : {
+    "message" : [
+      "Found duplicate column(s) <colType>: <duplicateCol>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1234" : {
+    "message" : [
+      "Temporary view <tableIdent> is not cached for analyzing columns."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1235" : {
+    "message" : [
+      "Column <name> in table <tableIdent> is of type <dataType>, and Spark does not support statistics collection on this column type."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1236" : {
+    "message" : [
+      "ANALYZE TABLE is not supported on views."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1237" : {
+    "message" : [
+      "The list of partition columns with values in partition specification for table '<table>' in database '<database>' is not a prefix of the list of partition columns defined in the table schema. Expected a prefix of [<schemaColumns>], but got [<specColumns>]."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1238" : {
+    "message" : [
+      "<msg>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1239" : {
+    "message" : [
+      "Analyzing column statistics is not supported for column <name> of data type: <dataType>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1240" : {
+    "message" : [
+      "Table <table> already exists.<guide>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1241" : {
+    "message" : [
+      "CREATE-TABLE-AS-SELECT cannot create table with location to a non-empty directory <tablePath>. To allow overwriting the existing non-empty directory, set '<config>' to true."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1242" : {
+    "message" : [
+      "Undefined function: <rawName>. This function is neither a built-in/temporary function, nor a persistent function that is qualified as <fullName>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1243" : {
+    "message" : [
+      "Undefined function: <rawName>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1244" : {
+    "message" : [
+      "Attempted to unset non-existent property '<property>' in table '<table>'."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1245" : {
+    "message" : [
+      "ALTER TABLE CHANGE COLUMN is not supported for changing column '<originName>' with type '<originType>' to '<newName>' with type '<newType>'."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1246" : {
+    "message" : [
+      "Can't find column `<name>` given table data columns <fieldNames>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1247" : {
+    "message" : [
+      "Operation not allowed: ALTER TABLE SET [SERDE | SERDEPROPERTIES] for a specific partition is not supported for tables created with the datasource API."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1248" : {
+    "message" : [
+      "Operation not allowed: ALTER TABLE SET SERDE is not supported for tables created with the datasource API."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1249" : {
+    "message" : [
+      "Operation not allowed: <cmd> only works on partitioned tables: <tableIdentWithDB>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1250" : {
+    "message" : [
+      "<action> is not allowed on <tableName> since filesource partition management is disabled (spark.sql.hive.manageFilesourcePartitions = false)."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1251" : {
+    "message" : [
+      "<action> is not allowed on <tableName> since its partition metadata is not stored in the Hive metastore. To import this information into the metastore, run `msck repair table <tableName>`."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1252" : {
+    "message" : [
+      "Cannot alter a view with ALTER TABLE. Please use ALTER VIEW instead."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1253" : {
+    "message" : [
+      "Cannot alter a table with ALTER VIEW. Please use ALTER TABLE instead."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1254" : {
+    "message" : [
+      "Cannot overwrite a path that is also being read from."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1255" : {
+    "message" : [
+      "Cannot drop built-in function '<functionName>'."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1256" : {
+    "message" : [
+      "Cannot refresh built-in function <functionName>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1257" : {
+    "message" : [
+      "Cannot refresh temporary function <functionName>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1258" : {
+    "message" : [
+      "<msg>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1259" : {
+    "message" : [
+      "ALTER ADD COLUMNS does not support views. You must drop and re-create the views for adding the new columns. Views: <table>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1260" : {
+    "message" : [
+      "ALTER ADD COLUMNS does not support datasource table with type <tableType>. You must drop and re-create the table for adding the new columns. Tables: <table>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1261" : {
+    "message" : [
+      "LOAD DATA is not supported for datasource tables: <tableIdentWithDB>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1262" : {
+    "message" : [
+      "LOAD DATA target table <tableIdentWithDB> is partitioned, but no partition spec is provided."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1263" : {
+    "message" : [
+      "LOAD DATA target table <tableIdentWithDB> is partitioned, but number of columns in provided partition spec (<partitionSize>) do not match number of partitioned columns in table (<targetTableSize>)."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1264" : {
+    "message" : [
+      "LOAD DATA target table <tableIdentWithDB> is not partitioned, but a partition spec was provided."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1265" : {
+    "message" : [
+      "LOAD DATA input path does not exist: <path>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1266" : {
+    "message" : [
+      "Operation not allowed: TRUNCATE TABLE on external tables: <tableIdentWithDB>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1267" : {
+    "message" : [
+      "Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported for tables that are not partitioned: <tableIdentWithDB>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1268" : {
+    "message" : [
+      "Failed to truncate table <tableIdentWithDB> when removing data of the path: <path>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1269" : {
+    "message" : [
+      "SHOW PARTITIONS is not allowed on a table that is not partitioned: <tableIdentWithDB>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1270" : {
+    "message" : [
+      "SHOW CREATE TABLE is not supported on a temporary view: <table>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1271" : {
+    "message" : [
+      "Failed to execute SHOW CREATE TABLE against table <table>, which is created by Hive and uses the following unsupported feature(s)",
+      "<unsupportedFeatures>",
+      "Please use `SHOW CREATE TABLE <table> AS SERDE` to show Hive DDL instead."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1272" : {
+    "message" : [
+      "SHOW CREATE TABLE doesn't support transactional Hive table. Please use `SHOW CREATE TABLE <table> AS SERDE` to show Hive DDL instead."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1273" : {
+    "message" : [
+      "Failed to execute SHOW CREATE TABLE against table <table>, which is created by Hive and uses the following unsupported serde configuration",
+      "<configs>",
+      "Please use `SHOW CREATE TABLE <table> AS SERDE` to show Hive DDL instead."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1274" : {
+    "message" : [
+      "<table> is a Spark data source table. Use `SHOW CREATE TABLE` without `AS SERDE` instead."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1275" : {
+    "message" : [
+      "Failed to execute SHOW CREATE TABLE against table/view <table>, which is created by Hive and uses the following unsupported feature(s)",
+      "<features>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1276" : {
+    "message" : [
+      "The logical plan that represents the view is not analyzed."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1277" : {
+    "message" : [
+      "The number of columns produced by the SELECT clause (num: `<analyzedPlanLength>`) does not match the number of column names specified by CREATE VIEW (num: `<userSpecifiedColumnsLength>`)."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1278" : {
+    "message" : [
+      "<name> is not a view."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1279" : {
+    "message" : [
+      "View <name> already exists. If you want to update the view definition, please use ALTER VIEW AS or CREATE OR REPLACE VIEW AS."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1280" : {
+    "message" : [
+      "It is not allowed to create a persisted view from the Dataset API."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1281" : {
+    "message" : [
+      "Recursive view <viewIdent> detected (cycle: <newPath>)."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1282" : {
+    "message" : [
+      "Not allowed to create a permanent view <name> without explicitly assigning an alias for expression <attrName>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1283" : {
+    "message" : [
+      "Not allowed to create a permanent view <name> by referencing a temporary view <nameParts>. Please create a temp view instead by CREATE TEMP VIEW."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1284" : {
+    "message" : [
+      "Not allowed to create a permanent view <name> by referencing a temporary function `<funcName>`."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1285" : {
+    "message" : [
+      "Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the",
+      "referenced columns only include the internal corrupt record column",
+      "(named _corrupt_record by default). For example:",
+      "spark.read.schema(schema).csv(file).filter($\"_corrupt_record\".isNotNull).count()",
+      "and spark.read.schema(schema).csv(file).select(\"_corrupt_record\").show().",
+      "Instead, you can cache or save the parsed results and then send the same query.",
+      "For example, val df = spark.read.schema(schema).csv(file).cache() and then",
+      "df.filter($\"_corrupt_record\".isNotNull).count()."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1286" : {
+    "message" : [
+      "User-defined partition column <columnName> not found in the JDBC relation: <schema>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1287" : {
+    "message" : [
+      "Partition column type should be <numericType>, <dateType>, or <timestampType>, but <dataType> found."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1288" : {
+    "message" : [
+      "Table or view '<name>' already exists. SaveMode: ErrorIfExists."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1289" : {
+    "message" : [
+      "Column name \"<name>\" contains invalid character(s). Please use alias to rename it."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1290" : {
+    "message" : [
+      "Text data source supports only a single column, and you have <schemaSize> columns."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1291" : {
+    "message" : [
+      "Can't find required partition column <readField> in partition schema <partitionSchema>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1292" : {
+    "message" : [
+      "Temporary view '<tableIdent>' should not have specified a database."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1293" : {
+    "message" : [
+      "Hive data source can only be used with tables, you can't use it with CREATE TEMP VIEW USING."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1294" : {
+    "message" : [
+      "The timestamp provided for the '<strategy>' option is invalid. The expected format is 'YYYY-MM-DDTHH:mm:ss', but the provided timestamp: <timeString>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1295" : {
+    "message" : [
+      "Set a host to read from with option(\"host\", ...)."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1296" : {
+    "message" : [
+      "Set a port to read from with option(\"port\", ...)."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1297" : {
+    "message" : [
+      "IncludeTimestamp must be set to either \"true\" or \"false\"."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1298" : {
+    "message" : [
+      "checkpointLocation must be specified either through option(\"checkpointLocation\", ...) or SparkSession.conf.set(\"<config>\", ...)."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_1299" : {
+    "message" : [
+      "This query does not support recovering from checkpoint location. Delete <checkpointPath> to start over."
+    ]
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
index bf990afad6d..6b4353acc96 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala
@@ -66,7 +66,10 @@ case class NoSuchTableException(
 
 case class NoSuchPartitionException(
     override val message: String)
-  extends AnalysisException(message) {
+  extends AnalysisException(
+    message,
+    errorClass = Some("_LEGACY_ERROR_TEMP_1238"),
+    messageParameters = Map("msg" -> message)) {
 
   def this(db: String, table: String, spec: TablePartitionSpec) = {
     this(s"Partition not found in table '$table' database '$db':\n" + spec.mkString("\n"))
@@ -83,7 +86,10 @@ case class NoSuchPermanentFunctionException(db: String, func: String)
   extends AnalysisException(s"Function '$func' not found in database '$db'")
 
 case class NoSuchFunctionException(override val message: String)
-  extends AnalysisException(message) {
+  extends AnalysisException(
+    message,
+    errorClass = Some("_LEGACY_ERROR_TEMP_1258"),
+    messageParameters = Map("msg" -> message)) {
 
   def this(db: String, func: String) = {
     this(s"Undefined function: '$func'. " +
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index f1da38f1734..f62620fcd7a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -1979,43 +1979,54 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
   }
 
   def ambiguousRelationAliasNameInNestedCTEError(name: String): Throwable = {
-    new AnalysisException(s"Name $name is ambiguous in nested CTE. " +
-      s"Please set ${LEGACY_CTE_PRECEDENCE_POLICY.key} to CORRECTED so that name " +
-      "defined in inner CTE takes precedence. If set it to LEGACY, outer CTE " +
-      "definitions will take precedence. See more details in SPARK-28228.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1200",
+      messageParameters = Map(
+        "name" -> name,
+        "config" -> LEGACY_CTE_PRECEDENCE_POLICY.key))
   }
 
   def commandUnsupportedInV2TableError(name: String): Throwable = {
-    new AnalysisException(s"$name is not supported for v2 tables.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1200",
+      messageParameters = Map("name" -> name))
   }
 
   def cannotResolveColumnNameAmongAttributesError(
       colName: String, fieldNames: String): Throwable = {
-    new AnalysisException(s"""Cannot resolve column name "$colName" among ($fieldNames)""")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1201",
+      messageParameters = Map(
+        "colName" -> colName,
+        "fieldNames" -> fieldNames))
   }
 
   def cannotWriteTooManyColumnsToTableError(
       tableName: String, expected: Seq[Attribute], query: LogicalPlan): Throwable = {
     new AnalysisException(
-      s"""
-         |Cannot write to '$tableName', too many data columns:
-         |Table columns: ${expected.map(c => s"'${c.name}'").mkString(", ")}
-         |Data columns: ${query.output.map(c => s"'${c.name}'").mkString(", ")}
-       """.stripMargin)
+      errorClass = "_LEGACY_ERROR_TEMP_1202",
+      messageParameters = Map(
+        "tableName" -> tableName,
+        "tableColumns" -> expected.map(c => s"'${c.name}'").mkString(", "),
+        "dataColumns" -> query.output.map(c => s"'${c.name}'").mkString(", ")))
   }
 
   def cannotWriteNotEnoughColumnsToTableError(
       tableName: String, expected: Seq[Attribute], query: LogicalPlan): Throwable = {
     new AnalysisException(
-      s"""Cannot write to '$tableName', not enough data columns:
-         |Table columns: ${expected.map(c => s"'${c.name}'").mkString(", ")}
-         |Data columns: ${query.output.map(c => s"'${c.name}'").mkString(", ")}"""
-        .stripMargin)
+      errorClass = "_LEGACY_ERROR_TEMP_1203",
+      messageParameters = Map(
+        "tableName" -> tableName,
+        "tableColumns" -> expected.map(c => s"'${c.name}'").mkString(", "),
+        "dataColumns" -> query.output.map(c => s"'${c.name}'").mkString(", ")))
   }
 
   def cannotWriteIncompatibleDataToTableError(tableName: String, errors: Seq[String]): Throwable = {
     new AnalysisException(
-      s"Cannot write incompatible data to table '$tableName':\n- ${errors.mkString("\n- ")}")
+      errorClass = "_LEGACY_ERROR_TEMP_1204",
+      messageParameters = Map(
+        "tableName" -> tableName,
+        "errors" -> errors.mkString("\n- ")))
   }
 
   def secondArgumentOfFunctionIsNotIntegerError(
@@ -2030,46 +2041,57 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
   def nonPartitionPruningPredicatesNotExpectedError(
       nonPartitionPruningPredicates: Seq[Expression]): Throwable = {
     new AnalysisException(
-      s"Expected only partition pruning predicates: $nonPartitionPruningPredicates")
+      errorClass = "_LEGACY_ERROR_TEMP_1205",
+      messageParameters = Map(
+        "nonPartitionPruningPredicates" -> nonPartitionPruningPredicates.toString()))
   }
 
   def columnNotDefinedInTableError(
       colType: String, colName: String, tableName: String, tableCols: Seq[String]): Throwable = {
-    new AnalysisException(s"$colType column $colName is not defined in table $tableName, " +
-      s"defined table columns are: ${tableCols.mkString(", ")}")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1206",
+      messageParameters = Map(
+        "colType" -> colType,
+        "colName" -> colName,
+        "tableName" -> tableName,
+        "tableCols" -> tableCols.mkString(", ")))
   }
 
   def invalidLiteralForWindowDurationError(): Throwable = {
-    new AnalysisException("The duration and time inputs to window must be " +
-      "an integer, long or string literal.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1207",
+      messageParameters = Map.empty)
   }
 
   def noSuchStructFieldInGivenFieldsError(
       fieldName: String, fields: Array[StructField]): Throwable = {
     new AnalysisException(
-      s"No such struct field $fieldName in ${fields.map(_.name).mkString(", ")}")
+      errorClass = "_LEGACY_ERROR_TEMP_1208",
+      messageParameters = Map(
+        "fieldName" -> fieldName,
+        "fields" -> fields.map(_.name).mkString(", ")))
   }
 
   def ambiguousReferenceToFieldsError(fields: String): Throwable = {
-    new AnalysisException(s"Ambiguous reference to fields $fields")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1209",
+      messageParameters = Map("fields" -> fields))
   }
 
   def secondArgumentInFunctionIsNotBooleanLiteralError(funcName: String): Throwable = {
-    new AnalysisException(s"The second argument in $funcName should be a boolean literal.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1210",
+      messageParameters = Map("funcName" -> funcName))
   }
 
   def joinConditionMissingOrTrivialError(
       join: Join, left: LogicalPlan, right: LogicalPlan): Throwable = {
     new AnalysisException(
-      s"""Detected implicit cartesian product for ${join.joinType.sql} join between logical plans
-         |${left.treeString(false).trim}
-         |and
-         |${right.treeString(false).trim}
-         |Join condition is missing or trivial.
-         |Either: use the CROSS JOIN syntax to allow cartesian products between these
-         |relations, or: enable implicit cartesian products by setting the configuration
-         |variable spark.sql.crossJoin.enabled=true"""
-        .stripMargin)
+      errorClass = "_LEGACY_ERROR_TEMP_1211",
+      messageParameters = Map(
+        "joinType" -> join.joinType.sql,
+        "leftPlan" -> left.treeString(false).trim,
+        "rightPlan" -> right.treeString(false).trim))
   }
 
   def usePythonUDFInJoinConditionUnsupportedError(joinType: JoinType): Throwable = {
@@ -2081,73 +2103,101 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
 
   def conflictingAttributesInJoinConditionError(
       conflictingAttrs: AttributeSet, outerPlan: LogicalPlan, subplan: LogicalPlan): Throwable = {
-    new AnalysisException("Found conflicting attributes " +
-      s"${conflictingAttrs.mkString(",")} in the condition joining outer plan:\n  " +
-      s"$outerPlan\nand subplan:\n  $subplan")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1212",
+      messageParameters = Map(
+        "conflictingAttrs" -> conflictingAttrs.mkString(","),
+        "outerPlan" -> outerPlan.toString,
+        "subplan" -> subplan.toString))
   }
 
   def emptyWindowExpressionError(expr: Window): Throwable = {
-    new AnalysisException(s"Window expression is empty in $expr")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1213",
+      messageParameters = Map("expr" -> expr.toString))
   }
 
   def foundDifferentWindowFunctionTypeError(windowExpressions: Seq[NamedExpression]): Throwable = {
     new AnalysisException(
-      s"Found different window function type in $windowExpressions")
+      errorClass = "_LEGACY_ERROR_TEMP_1214",
+      messageParameters = Map("windowExpressions" -> windowExpressions.toString()))
   }
 
   def charOrVarcharTypeAsStringUnsupportedError(): Throwable = {
-    new AnalysisException("char/varchar type can only be used in the table schema. " +
-      s"You can set ${SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key} to true, so that Spark" +
-      s" treat them as string type as same as Spark 3.0 and earlier")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1215",
+      messageParameters = Map("config" -> SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key))
   }
 
   def invalidPatternError(pattern: String, message: String): Throwable = {
     new AnalysisException(
-      s"the pattern '$pattern' is invalid, $message")
+      errorClass = "_LEGACY_ERROR_TEMP_1216",
+      messageParameters = Map("pattern" -> pattern, "message" -> message))
   }
 
   def tableIdentifierExistsError(tableIdentifier: TableIdentifier): Throwable = {
-    new AnalysisException(s"$tableIdentifier already exists.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1217",
+      messageParameters = Map("tableIdentifier" -> tableIdentifier.toString))
   }
 
   def tableIdentifierNotConvertedToHadoopFsRelationError(
       tableIdentifier: TableIdentifier): Throwable = {
-    new AnalysisException(s"$tableIdentifier should be converted to HadoopFsRelation.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1218",
+      messageParameters = Map("tableIdentifier" -> tableIdentifier.toString))
   }
 
   def alterDatabaseLocationUnsupportedError(): Throwable = {
-    new AnalysisException("Hive metastore does not support altering database location")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1219",
+      messageParameters = Map.empty)
   }
 
   def hiveTableTypeUnsupportedError(tableType: String): Throwable = {
-    new AnalysisException(s"Hive $tableType is not supported.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1220",
+      messageParameters = Map("tableType" -> tableType))
   }
 
   def hiveCreatePermanentFunctionsUnsupportedError(): Throwable = {
-    new AnalysisException("Hive 0.12 doesn't support creating permanent functions. " +
-      "Please use Hive 0.13 or higher.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1221",
+      messageParameters = Map.empty)
   }
 
   def unknownHiveResourceTypeError(resourceType: String): Throwable = {
-    new AnalysisException(s"Unknown resource type: $resourceType")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1222",
+      messageParameters = Map("resourceType" -> resourceType))
   }
 
   def invalidDayTimeField(field: Byte): Throwable = {
     val supportedIds = DayTimeIntervalType.dayTimeFields
       .map(i => s"$i (${DayTimeIntervalType.fieldToString(i)})")
-    new AnalysisException(s"Invalid field id '$field' in day-time interval. " +
-      s"Supported interval fields: ${supportedIds.mkString(", ")}.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1223",
+      messageParameters = Map(
+        "field" -> field.toString,
+        "supportedIds" -> supportedIds.mkString(", ")))
   }
 
   def invalidDayTimeIntervalType(startFieldName: String, endFieldName: String): Throwable = {
-    new AnalysisException(s"'interval $startFieldName to $endFieldName' is invalid.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1224",
+      messageParameters = Map(
+        "startFieldName" -> startFieldName,
+        "endFieldName" -> endFieldName))
   }
 
   def invalidYearMonthField(field: Byte): Throwable = {
     val supportedIds = YearMonthIntervalType.yearMonthFields
       .map(i => s"$i (${YearMonthIntervalType.fieldToString(i)})")
-    new AnalysisException(s"Invalid field id '$field' in year-month interval. " +
-      s"Supported interval fields: ${supportedIds.mkString(", ")}.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1225",
+      messageParameters = Map(
+        "field" -> field.toString,
+        "supportedIds" -> supportedIds.mkString(", ")))
   }
 
   def configRemovedInVersionError(
@@ -2155,31 +2205,50 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       version: String,
       comment: String): Throwable = {
     new AnalysisException(
-      s"The SQL config '$configName' was removed in the version $version. $comment")
+      errorClass = "_LEGACY_ERROR_TEMP_1226",
+      messageParameters = Map(
+        "configName" -> configName,
+        "version" -> version,
+        "comment" -> comment))
   }
 
   def failedFallbackParsingError(msg: String, e1: Throwable, e2: Throwable): Throwable = {
-    new AnalysisException(s"$msg${e1.getMessage}\nFailed fallback parsing: ${e2.getMessage}",
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1227",
+      messageParameters = Map("msg" -> msg, "e1" -> e1.getMessage, "e2" -> e2.getMessage),
       cause = Some(e1.getCause))
   }
 
   def decimalCannotGreaterThanPrecisionError(scale: Int, precision: Int): Throwable = {
-    new AnalysisException(s"Decimal scale ($scale) cannot be greater than precision ($precision).")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1228",
+      messageParameters = Map(
+        "scale" -> scale.toString,
+        "precision" -> precision.toString))
   }
 
   def decimalOnlySupportPrecisionUptoError(decimalType: String, precision: Int): Throwable = {
-    new AnalysisException(s"$decimalType can only support precision up to $precision")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1229",
+      messageParameters = Map(
+        "decimalType" -> decimalType,
+        "precision" -> precision.toString))
   }
 
   def negativeScaleNotAllowedError(scale: Int): Throwable = {
     new AnalysisException(
-      s"""|Negative scale is not allowed: $scale.
-         |You can use ${LEGACY_ALLOW_NEGATIVE_SCALE_OF_DECIMAL_ENABLED.key}=true
-         |to enable legacy mode to allow it.""".stripMargin.replaceAll("\n", " "))
+      errorClass = "_LEGACY_ERROR_TEMP_1230",
+      messageParameters = Map(
+        "scale" -> scale.toString,
+        "config" -> LEGACY_ALLOW_NEGATIVE_SCALE_OF_DECIMAL_ENABLED.key))
   }
 
   def invalidPartitionColumnKeyInTableError(key: String, tblName: String): Throwable = {
-    new AnalysisException(s"$key is not a valid partition column in table $tblName.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1231",
+      messageParameters = Map(
+        "key" -> key,
+        "tblName" -> tblName))
   }
 
   def invalidPartitionSpecError(
@@ -2187,14 +2256,19 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       partitionColumnNames: Seq[String],
       tableName: String): Throwable = {
     new AnalysisException(
-      s"""|Partition spec is invalid. The spec ($specKeys) must match
-        |the partition spec (${partitionColumnNames.mkString(", ")}) defined in
-        |table '$tableName'""".stripMargin.replaceAll("\n", " "))
+      errorClass = "_LEGACY_ERROR_TEMP_1232",
+      messageParameters = Map(
+        "specKeys" -> specKeys,
+        "partitionColumnNames" -> partitionColumnNames.mkString(", "),
+        "tableName" -> tableName))
   }
 
   def foundDuplicateColumnError(colType: String, duplicateCol: Seq[String]): Throwable = {
     new AnalysisException(
-      s"Found duplicate column(s) $colType: ${duplicateCol.sorted.mkString(", ")}")
+      errorClass = "_LEGACY_ERROR_TEMP_1233",
+      messageParameters = Map(
+        "colType" -> colType,
+        "duplicateCol" -> duplicateCol.sorted.mkString(", ")))
   }
 
   def noSuchTableError(db: String, table: String): Throwable = {
@@ -2202,19 +2276,27 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
   }
 
   def tempViewNotCachedForAnalyzingColumnsError(tableIdent: TableIdentifier): Throwable = {
-    new AnalysisException(s"Temporary view $tableIdent is not cached for analyzing columns.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1234",
+      messageParameters = Map("tableIdent" -> tableIdent.toString))
   }
 
   def columnTypeNotSupportStatisticsCollectionError(
       name: String,
       tableIdent: TableIdentifier,
       dataType: DataType): Throwable = {
-    new AnalysisException(s"Column $name in table $tableIdent is of type $dataType, " +
-      "and Spark does not support statistics collection on this column type.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1235",
+      messageParameters = Map(
+        "name" -> name,
+        "tableIdent" -> tableIdent.toString,
+        "dataType" -> dataType.toString))
   }
 
   def analyzeTableNotSupportedOnViewsError(): Throwable = {
-    new AnalysisException("ANALYZE TABLE is not supported on views.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1236",
+      messageParameters = Map.empty)
   }
 
   def unexpectedPartitionColumnPrefixError(
@@ -2223,13 +2305,12 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       schemaColumns: String,
       specColumns: String): Throwable = {
     new AnalysisException(
-      s"""
-         |The list of partition columns with values
-         |in partition specification for table '${table}'
-         |in database '${database}' is not a prefix of the list of
-         |partition columns defined in the table schema.
-         |Expected a prefix of [${schemaColumns}], but got [${specColumns}].
-       """.stripMargin.replaceAll("\n", " "))
+      errorClass = "_LEGACY_ERROR_TEMP_1237",
+      messageParameters = Map(
+        "table" -> table,
+        "database" -> database,
+        "schemaColumns" -> schemaColumns,
+        "specColumns" -> specColumns))
   }
 
   def noSuchPartitionError(
@@ -2242,23 +2323,27 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
   def analyzingColumnStatisticsNotSupportedForColumnTypeError(
       name: String,
       dataType: DataType): Throwable = {
-    new AnalysisException("Analyzing column statistics is not supported for column " +
-      s"$name of data type: $dataType.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1239",
+      messageParameters = Map(
+        "name" -> name,
+        "dataType" -> dataType.toString))
   }
 
   def tableAlreadyExistsError(table: String, guide: String = ""): Throwable = {
-    new AnalysisException(s"Table $table already exists." + guide)
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1240",
+      messageParameters = Map(
+        "table" -> table,
+        "guide" -> guide))
   }
 
   def createTableAsSelectWithNonEmptyDirectoryError(tablePath: String): Throwable = {
     new AnalysisException(
-      s"CREATE-TABLE-AS-SELECT cannot create table with location to a non-empty directory " +
-        s"${tablePath} . To allow overwriting the existing non-empty directory, " +
-        s"set '${SQLConf.ALLOW_NON_EMPTY_LOCATION_IN_CTAS.key}' to true.")
-  }
-
-  def tableOrViewNotFoundError(table: String): Throwable = {
-    new AnalysisException(s"Table or view not found: $table")
+      errorClass = "_LEGACY_ERROR_TEMP_1241",
+      messageParameters = Map(
+        "tablePath" -> tablePath,
+        "config" -> SQLConf.ALLOW_NON_EMPTY_LOCATION_IN_CTAS.key))
   }
 
   def noSuchFunctionError(
@@ -2266,50 +2351,76 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       t: TreeNode[_],
       fullName: Option[Seq[String]] = None): Throwable = {
     if (rawName.length == 1 && fullName.isDefined) {
-      new AnalysisException(s"Undefined function: ${rawName.head}. " +
-        "This function is neither a built-in/temporary function, nor a persistent " +
-        s"function that is qualified as ${fullName.get.quoted}.",
-        t.origin.line, t.origin.startPosition)
+      new AnalysisException(
+        errorClass = "_LEGACY_ERROR_TEMP_1242",
+        messageParameters = Map(
+          "rawName" -> rawName.head,
+          "fullName" -> fullName.get.quoted
+        ),
+        origin = t.origin)
     } else {
-      new AnalysisException(s"Undefined function: ${rawName.quoted}",
-        t.origin.line, t.origin.startPosition)
+      new AnalysisException(
+        errorClass = "_LEGACY_ERROR_TEMP_1243",
+        messageParameters = Map("rawName" -> rawName.quoted),
+        origin = t.origin)
     }
   }
 
   def unsetNonExistentPropertyError(property: String, table: TableIdentifier): Throwable = {
-    new AnalysisException(s"Attempted to unset non-existent property '$property' in table '$table'")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1244",
+      messageParameters = Map(
+        "property" -> property,
+        "table" -> table.toString))
   }
 
   def alterTableChangeColumnNotSupportedForColumnTypeError(
       originColumn: StructField,
       newColumn: StructField): Throwable = {
-    new AnalysisException("ALTER TABLE CHANGE COLUMN is not supported for changing column " +
-      s"'${originColumn.name}' with type '${originColumn.dataType}' to " +
-      s"'${newColumn.name}' with type '${newColumn.dataType}'")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1245",
+      messageParameters = Map(
+        "originName" -> originColumn.name,
+        "originType" -> originColumn.dataType.toString,
+        "newName" -> newColumn.name,
+        "newType"-> newColumn.dataType.toString))
   }
 
   def cannotFindColumnError(name: String, fieldNames: Array[String]): Throwable = {
-    new AnalysisException(s"Can't find column `$name` given table data columns " +
-      s"${fieldNames.mkString("[`", "`, `", "`]")}")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1246",
+      messageParameters = Map(
+        "name" -> name,
+        "fieldNames" -> fieldNames.mkString("[`", "`, `", "`]")))
+
   }
 
   def alterTableSetSerdeForSpecificPartitionNotSupportedError(): Throwable = {
-    new AnalysisException("Operation not allowed: ALTER TABLE SET " +
-      "[SERDE | SERDEPROPERTIES] for a specific partition is not supported " +
-      "for tables created with the datasource API")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1247",
+      messageParameters = Map.empty)
   }
 
   def alterTableSetSerdeNotSupportedError(): Throwable = {
-    new AnalysisException("Operation not allowed: ALTER TABLE SET SERDE is " +
-      "not supported for tables created with the datasource API")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1248",
+      messageParameters = Map.empty)
   }
 
   def cmdOnlyWorksOnPartitionedTablesError(cmd: String, tableIdentWithDB: String): Throwable = {
     new AnalysisException(
-      s"Operation not allowed: $cmd only works on partitioned tables: $tableIdentWithDB")
+      errorClass = "_LEGACY_ERROR_TEMP_1249",
+      messageParameters = Map(
+        "cmd" -> cmd,
+        "tableIdentWithDB" -> tableIdentWithDB))
   }
 
   def cmdOnlyWorksOnTableWithLocationError(cmd: String, tableIdentWithDB: String): Throwable = {
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1249",
+      messageParameters = Map(
+        "cmd" -> cmd,
+        "tableIdentWithDB" -> tableIdentWithDB))
     new AnalysisException(s"Operation not allowed: $cmd only works on table with " +
       s"location provided: $tableIdentWithDB")
   }
@@ -2318,43 +2429,56 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       action: String,
       tableName: String): Throwable = {
     new AnalysisException(
-      s"$action is not allowed on $tableName since filesource partition management is " +
-        "disabled (spark.sql.hive.manageFilesourcePartitions = false).")
+      errorClass = "_LEGACY_ERROR_TEMP_1250",
+      messageParameters = Map(
+        "action" -> action,
+        "tableName" -> tableName))
   }
 
   def actionNotAllowedOnTableSincePartitionMetadataNotStoredError(
      action: String,
      tableName: String): Throwable = {
     new AnalysisException(
-      s"$action is not allowed on $tableName since its partition metadata is not stored in " +
-        "the Hive metastore. To import this information into the metastore, run " +
-        s"`msck repair table $tableName`")
+      errorClass = "_LEGACY_ERROR_TEMP_1251",
+      messageParameters = Map(
+        "action" -> action,
+        "tableName" -> tableName))
   }
 
   def cannotAlterViewWithAlterTableError(): Throwable = {
     new AnalysisException(
-      "Cannot alter a view with ALTER TABLE. Please use ALTER VIEW instead")
+      errorClass = "_LEGACY_ERROR_TEMP_1252",
+      messageParameters = Map.empty)
   }
 
   def cannotAlterTableWithAlterViewError(): Throwable = {
     new AnalysisException(
-      "Cannot alter a table with ALTER VIEW. Please use ALTER TABLE instead")
+      errorClass = "_LEGACY_ERROR_TEMP_1253",
+      messageParameters = Map.empty)
   }
 
   def cannotOverwritePathBeingReadFromError(): Throwable = {
-    new AnalysisException("Cannot overwrite a path that is also being read from.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1254",
+      messageParameters = Map.empty)
   }
 
   def cannotDropBuiltinFuncError(functionName: String): Throwable = {
-    new AnalysisException(s"Cannot drop built-in function '$functionName'")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1255",
+      messageParameters = Map("functionName" -> functionName))
   }
 
   def cannotRefreshBuiltInFuncError(functionName: String): Throwable = {
-    new AnalysisException(s"Cannot refresh built-in function $functionName")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1256",
+      messageParameters = Map("functionName" -> functionName))
   }
 
   def cannotRefreshTempFuncError(functionName: String): Throwable = {
-    new AnalysisException(s"Cannot refresh temporary function $functionName")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1257",
+      messageParameters = Map("functionName" -> functionName))
   }
 
   def noSuchFunctionError(identifier: FunctionIdentifier): Throwable = {
@@ -2363,29 +2487,30 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
 
   def alterAddColNotSupportViewError(table: TableIdentifier): Throwable = {
     new AnalysisException(
-      s"""
-         |ALTER ADD COLUMNS does not support views.
-         |You must drop and re-create the views for adding the new columns. Views: $table
-       """.stripMargin)
+      errorClass = "_LEGACY_ERROR_TEMP_1259",
+      messageParameters = Map("table" -> table.toString))
   }
 
   def alterAddColNotSupportDatasourceTableError(
       tableType: Any,
       table: TableIdentifier): Throwable = {
     new AnalysisException(
-      s"""
-         |ALTER ADD COLUMNS does not support datasource table with type $tableType.
-         |You must drop and re-create the table for adding the new columns. Tables: $table
-       """.stripMargin)
+      errorClass = "_LEGACY_ERROR_TEMP_1260",
+      messageParameters = Map(
+        "tableType" -> tableType.toString,
+        "table" -> table.toString))
   }
 
   def loadDataNotSupportedForDatasourceTablesError(tableIdentWithDB: String): Throwable = {
-    new AnalysisException(s"LOAD DATA is not supported for datasource tables: $tableIdentWithDB")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1261",
+      messageParameters = Map("tableIdentWithDB" -> tableIdentWithDB))
   }
 
   def loadDataWithoutPartitionSpecProvidedError(tableIdentWithDB: String): Throwable = {
-    new AnalysisException(s"LOAD DATA target table $tableIdentWithDB is partitioned, " +
-      s"but no partition spec is provided")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1262",
+      messageParameters = Map("tableIdentWithDB" -> tableIdentWithDB))
   }
 
   def loadDataPartitionSizeNotMatchNumPartitionColumnsError(
@@ -2393,40 +2518,49 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       partitionSize: Int,
       targetTableSize: Int): Throwable = {
     new AnalysisException(
-      s"""
-         |LOAD DATA target table $tableIdentWithDB is partitioned,
-         |but number of columns in provided partition spec ($partitionSize)
-         |do not match number of partitioned columns in table ($targetTableSize)
-       """.stripMargin.replaceAll("\n", " "))
+      errorClass = "_LEGACY_ERROR_TEMP_1263",
+      messageParameters = Map(
+        "partitionSize" -> partitionSize.toString,
+        "targetTableSize" -> targetTableSize.toString,
+        "tableIdentWithDB" -> tableIdentWithDB))
   }
 
   def loadDataTargetTableNotPartitionedButPartitionSpecWasProvidedError(
       tableIdentWithDB: String): Throwable = {
-    new AnalysisException(s"LOAD DATA target table $tableIdentWithDB is not " +
-      s"partitioned, but a partition spec was provided.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1264",
+      messageParameters = Map("tableIdentWithDB" -> tableIdentWithDB))
   }
 
   def loadDataInputPathNotExistError(path: String): Throwable = {
-    new AnalysisException(s"LOAD DATA input path does not exist: $path")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1265",
+      messageParameters = Map("path" -> path))
   }
 
   def truncateTableOnExternalTablesError(tableIdentWithDB: String): Throwable = {
     new AnalysisException(
-      s"Operation not allowed: TRUNCATE TABLE on external tables: $tableIdentWithDB")
+      errorClass = "_LEGACY_ERROR_TEMP_1266",
+      messageParameters = Map("tableIdentWithDB" -> tableIdentWithDB))
   }
 
   def truncateTablePartitionNotSupportedForNotPartitionedTablesError(
       tableIdentWithDB: String): Throwable = {
-    new AnalysisException(s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported" +
-      s" for tables that are not partitioned: $tableIdentWithDB")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1267",
+      messageParameters = Map("tableIdentWithDB" -> tableIdentWithDB))
   }
 
   def failToTruncateTableWhenRemovingDataError(
       tableIdentWithDB: String,
       path: Path,
       e: Throwable): Throwable = {
-    new AnalysisException(s"Failed to truncate table $tableIdentWithDB when " +
-        s"removing data of the path: $path because of ${e.toString}", cause = Some(e))
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1268",
+      messageParameters = Map(
+        "tableIdentWithDB" -> tableIdentWithDB,
+        "path" -> path.toString),
+      cause = Some(e))
   }
 
   def descPartitionNotAllowedOnTempView(table: String): Throwable = {
@@ -2449,202 +2583,232 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
 
   def showPartitionNotAllowedOnTableNotPartitionedError(tableIdentWithDB: String): Throwable = {
     new AnalysisException(
-      s"SHOW PARTITIONS is not allowed on a table that is not partitioned: $tableIdentWithDB")
+      errorClass = "_LEGACY_ERROR_TEMP_1269",
+      messageParameters = Map("tableIdentWithDB" -> tableIdentWithDB))
   }
 
   def showCreateTableNotSupportedOnTempView(table: String): Throwable = {
-    new AnalysisException(s"SHOW CREATE TABLE is not supported on a temporary view: $table")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1270",
+      messageParameters = Map("table" -> table))
   }
 
   def showCreateTableFailToExecuteUnsupportedFeatureError(table: CatalogTable): Throwable = {
-    new AnalysisException("Failed to execute SHOW CREATE TABLE against table " +
-      s"${table.identifier}, which is created by Hive and uses the " +
-      s"following unsupported feature(s)\n" +
-      table.unsupportedFeatures.map(" - " + _).mkString("\n") + ". " +
-      s"Please use `SHOW CREATE TABLE ${table.identifier} AS SERDE` to show Hive DDL instead.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1271",
+      messageParameters = Map(
+        "unsupportedFeatures" -> table.unsupportedFeatures.map(" - " + _).mkString("\n"),
+        "table" -> table.identifier.toString))
   }
 
   def showCreateTableNotSupportTransactionalHiveTableError(table: CatalogTable): Throwable = {
-    new AnalysisException("SHOW CREATE TABLE doesn't support transactional Hive table. " +
-      s"Please use `SHOW CREATE TABLE ${table.identifier} AS SERDE` " +
-      "to show Hive DDL instead.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1272",
+      messageParameters = Map("table" -> table.identifier.toString))
   }
 
   def showCreateTableFailToExecuteUnsupportedConfError(
       table: TableIdentifier,
       builder: mutable.StringBuilder): Throwable = {
-    new AnalysisException("Failed to execute SHOW CREATE TABLE against table " +
-        s"${table.identifier}, which is created by Hive and uses the " +
-        "following unsupported serde configuration\n" +
-        builder.toString() + "\n" +
-        s"Please use `SHOW CREATE TABLE ${table.identifier} AS SERDE` to show Hive DDL instead."
-    )
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1273",
+      messageParameters = Map(
+        "table" -> table.identifier,
+        "configs" -> builder.toString()))
   }
 
   def showCreateTableAsSerdeNotAllowedOnSparkDataSourceTableError(
       table: TableIdentifier): Throwable = {
     new AnalysisException(
-      s"$table is a Spark data source table. Use `SHOW CREATE TABLE` without `AS SERDE` instead.")
+      errorClass = "_LEGACY_ERROR_TEMP_1274",
+      messageParameters = Map("table" -> table.toString))
   }
 
   def showCreateTableOrViewFailToExecuteUnsupportedFeatureError(
       table: CatalogTable,
       features: Seq[String]): Throwable = {
     new AnalysisException(
-      s"Failed to execute SHOW CREATE TABLE against table/view ${table.identifier}, " +
-        "which is created by Hive and uses the following unsupported feature(s)\n" +
-        features.map(" - " + _).mkString("\n"))
+      errorClass = "_LEGACY_ERROR_TEMP_1275",
+      messageParameters = Map(
+        "table" -> table.identifier.toString,
+        "features" -> features.map(" - " + _).mkString("\n")))
   }
 
   def logicalPlanForViewNotAnalyzedError(): Throwable = {
-    new AnalysisException("The logical plan that represents the view is not analyzed.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1276",
+      messageParameters = Map.empty)
   }
 
   def createViewNumColumnsMismatchUserSpecifiedColumnLengthError(
       analyzedPlanLength: Int,
       userSpecifiedColumnsLength: Int): Throwable = {
-    new AnalysisException(s"The number of columns produced by the SELECT clause " +
-      s"(num: `$analyzedPlanLength`) does not match the number of column names " +
-      s"specified by CREATE VIEW (num: `$userSpecifiedColumnsLength`).")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1277",
+      messageParameters = Map(
+        "analyzedPlanLength" -> analyzedPlanLength.toString,
+        "userSpecifiedColumnsLength" -> userSpecifiedColumnsLength.toString))
   }
 
   def tableIsNotViewError(name: TableIdentifier): Throwable = {
-    new AnalysisException(s"$name is not a view")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1278",
+      messageParameters = Map("name" -> name.toString))
   }
 
   def viewAlreadyExistsError(name: TableIdentifier): Throwable = {
     new AnalysisException(
-      s"View $name already exists. If you want to update the view definition, " +
-        "please use ALTER VIEW AS or CREATE OR REPLACE VIEW AS")
+      errorClass = "_LEGACY_ERROR_TEMP_1279",
+      messageParameters = Map("name" -> name.toString))
   }
 
   def createPersistedViewFromDatasetAPINotAllowedError(): Throwable = {
-    new AnalysisException("It is not allowed to create a persisted view from the Dataset API")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1280",
+      messageParameters = Map.empty)
   }
 
   def recursiveViewDetectedError(
       viewIdent: TableIdentifier,
       newPath: Seq[TableIdentifier]): Throwable = {
-    new AnalysisException(s"Recursive view $viewIdent detected " +
-      s"(cycle: ${newPath.mkString(" -> ")})")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1281",
+      messageParameters = Map(
+        "viewIdent" -> viewIdent.toString,
+        "newPath" -> newPath.mkString(" -> ")))
   }
 
   def notAllowedToCreatePermanentViewWithoutAssigningAliasForExpressionError(
       name: TableIdentifier,
       attrName: String): Throwable = {
-    new AnalysisException(s"Not allowed to create a permanent view $name without " +
-      s"explicitly assigning an alias for expression $attrName")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1282",
+      messageParameters = Map(
+        "name" -> name.toString,
+        "attrName" -> attrName))
   }
 
   def notAllowedToCreatePermanentViewByReferencingTempViewError(
       name: TableIdentifier,
       nameParts: String): Throwable = {
-    new AnalysisException(s"Not allowed to create a permanent view $name by " +
-      s"referencing a temporary view $nameParts. " +
-      "Please create a temp view instead by CREATE TEMP VIEW")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1283",
+      messageParameters = Map(
+        "name" -> name.toString,
+        "nameParts" -> nameParts))
   }
 
   def notAllowedToCreatePermanentViewByReferencingTempFuncError(
       name: TableIdentifier,
       funcName: String): Throwable = {
-    new AnalysisException(s"Not allowed to create a permanent view $name by " +
-      s"referencing a temporary function `$funcName`")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1284",
+      messageParameters = Map(
+        "name" -> name.toString,
+        "funcName" -> funcName))
   }
 
   def queryFromRawFilesIncludeCorruptRecordColumnError(): Throwable = {
     new AnalysisException(
-      """
-        |Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the
-        |referenced columns only include the internal corrupt record column
-        |(named _corrupt_record by default). For example:
-        |spark.read.schema(schema).csv(file).filter($"_corrupt_record".isNotNull).count()
-        |and spark.read.schema(schema).csv(file).select("_corrupt_record").show().
-        |Instead, you can cache or save the parsed results and then send the same query.
-        |For example, val df = spark.read.schema(schema).csv(file).cache() and then
-        |df.filter($"_corrupt_record".isNotNull).count().
-      """.stripMargin)
+      errorClass = "_LEGACY_ERROR_TEMP_1285",
+      messageParameters = Map.empty)
   }
 
   def userDefinedPartitionNotFoundInJDBCRelationError(
       columnName: String, schema: String): Throwable = {
-    new AnalysisException(s"User-defined partition column $columnName not " +
-      s"found in the JDBC relation: $schema")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1286",
+      messageParameters = Map(
+        "columnName" -> columnName,
+        "schema" -> schema))
   }
 
   def invalidPartitionColumnTypeError(column: StructField): Throwable = {
     new AnalysisException(
-      s"""
-         |Partition column type should be ${NumericType.simpleString},
-         |${DateType.catalogString}, or ${TimestampType.catalogString}, but
-         |${column.dataType.catalogString} found.
-       """.stripMargin.replaceAll("\n", " "))
+      errorClass = "_LEGACY_ERROR_TEMP_1287",
+      messageParameters = Map(
+        "numericType" -> NumericType.simpleString,
+        "dateType" -> DateType.catalogString,
+        "timestampType" -> TimestampType.catalogString,
+        "dataType" -> column.dataType.catalogString))
   }
 
   def tableOrViewAlreadyExistsError(name: String): Throwable = {
     new AnalysisException(
-      s"Table or view '$name' already exists. SaveMode: ErrorIfExists.")
+      errorClass = "_LEGACY_ERROR_TEMP_1288",
+      messageParameters = Map("name" -> name))
   }
 
   def columnNameContainsInvalidCharactersError(name: String): Throwable = {
     new AnalysisException(
-      s"""
-         |Column name "$name" contains invalid character(s).
-         |Please use alias to rename it.
-       """.stripMargin.replaceAll("\n", " "))
+      errorClass = "_LEGACY_ERROR_TEMP_1289",
+      messageParameters = Map("name" -> name))
   }
 
   def textDataSourceWithMultiColumnsError(schema: StructType): Throwable = {
     new AnalysisException(
-      s"Text data source supports only a single column, and you have ${schema.size} columns.")
+      errorClass = "_LEGACY_ERROR_TEMP_1290",
+      messageParameters = Map("schemaSize" -> schema.size.toString))
   }
 
   def cannotFindPartitionColumnInPartitionSchemaError(
       readField: StructField, partitionSchema: StructType): Throwable = {
-    new AnalysisException(s"Can't find required partition column ${readField.name} " +
-      s"in partition schema $partitionSchema")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1291",
+      messageParameters = Map(
+        "readField" -> readField.name,
+        "partitionSchema" -> partitionSchema.toString()))
   }
 
   def cannotSpecifyDatabaseForTempViewError(tableIdent: TableIdentifier): Throwable = {
     new AnalysisException(
-      s"Temporary view '$tableIdent' should not have specified a database")
+      errorClass = "_LEGACY_ERROR_TEMP_1292",
+      messageParameters = Map("tableIdent" -> tableIdent.toString))
   }
 
   def cannotCreateTempViewUsingHiveDataSourceError(): Throwable = {
-    new AnalysisException("Hive data source can only be used with tables, " +
-      "you can't use it with CREATE TEMP VIEW USING")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1293",
+      messageParameters = Map.empty)
   }
 
   def invalidTimestampProvidedForStrategyError(
       strategy: String, timeString: String): Throwable = {
     new AnalysisException(
-      s"The timestamp provided for the '$strategy' option is invalid. The expected format " +
-        s"is 'YYYY-MM-DDTHH:mm:ss', but the provided timestamp: $timeString")
+      errorClass = "_LEGACY_ERROR_TEMP_1294",
+      messageParameters = Map(
+        "strategy" -> strategy,
+        "timeString" -> timeString))
   }
 
   def hostOptionNotSetError(): Throwable = {
-    new AnalysisException("Set a host to read from with option(\"host\", ...).")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1295",
+      messageParameters = Map.empty)
   }
 
   def portOptionNotSetError(): Throwable = {
-    new AnalysisException("Set a port to read from with option(\"port\", ...).")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1296",
+      messageParameters = Map.empty)
   }
 
   def invalidIncludeTimestampValueError(): Throwable = {
-    new AnalysisException("includeTimestamp must be set to either \"true\" or \"false\"")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1297",
+      messageParameters = Map.empty)
   }
 
   def checkpointLocationNotSpecifiedError(): Throwable = {
     new AnalysisException(
-      s"""
-         |checkpointLocation must be specified either
-         |through option("checkpointLocation", ...) or
-         |SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)
-       """.stripMargin.replaceAll("\n", " "))
+      errorClass = "_LEGACY_ERROR_TEMP_1298",
+      messageParameters = Map("config" -> SQLConf.CHECKPOINT_LOCATION.key))
   }
 
   def recoverQueryFromCheckpointUnsupportedError(checkpointPath: Path): Throwable = {
-    new AnalysisException("This query does not support recovering from checkpoint location. " +
-      s"Delete $checkpointPath to start over.")
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_1299",
+      messageParameters = Map("checkpointPath" -> checkpointPath.toString))
   }
 
   def cannotFindColumnInRelationOutputError(
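
The error-classes.json hunk itself is not reproduced in this excerpt, so as a rough sketch, one of the new entries could look like the block below. It is reconstructed from the removed Scala message string and the messageParameters keys of _LEGACY_ERROR_TEMP_1283 above, not copied from the actual file, and the exact wording and layout in the commit may differ; JSON allows no comments, so all of these caveats live in this paragraph.

{
  "_LEGACY_ERROR_TEMP_1283" : {
    "message" : [
      "Not allowed to create a permanent view <name> by referencing a temporary view <nameParts>. Please create a temp view instead by CREATE TEMP VIEW."
    ]
  }
}

At runtime the <name> and <nameParts> placeholders are filled in from the messageParameters map passed to AnalysisException, which is how the existing message text is preserved while the error gains a machine-readable class.
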
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderResolutionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderResolutionSuite.scala
index c8d2a002f2a..2e22e36b926 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderResolutionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderResolutionSuite.scala
@@ -132,7 +132,7 @@ class EncoderResolutionSuite extends PlanTest {
     val encoder = ExpressionEncoder[ArrayClass]
     val attrs = Seq($"arr".array(new StructType().add("c", "int")))
     assert(intercept[AnalysisException](encoder.resolveAndBind(attrs)).message ==
-      "No such struct field a in c")
+      "No such struct field a in c.")
   }
 
   test("the real type is not compatible with encoder schema: nested array element type") {
@@ -150,8 +150,10 @@ class EncoderResolutionSuite extends PlanTest {
     withClue("nested array element type is not compatible") {
       val attrs = Seq($"nestedArr".array(new StructType()
         .add("arr", ArrayType(new StructType().add("c", "int")))))
-      assert(intercept[AnalysisException](encoder.resolveAndBind(attrs)).message ==
-        "No such struct field a in c")
+      checkError(
+        exception = intercept[AnalysisException](encoder.resolveAndBind(attrs)),
+        errorClass = "_LEGACY_ERROR_TEMP_1208",
+        parameters = Map("fieldName" -> "a", "fields" -> "c"))
     }
   }
 
diff --git a/sql/core/src/test/resources/sql-tests/results/ansi/interval.sql.out b/sql/core/src/test/resources/sql-tests/results/ansi/interval.sql.out
index a39066acbeb..de87bbb4b99 100644
--- a/sql/core/src/test/resources/sql-tests/results/ansi/interval.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/ansi/interval.sql.out
@@ -1692,7 +1692,20 @@ select interval (-30) day
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Undefined function: interval. This function is neither a built-in/temporary function, nor a persistent function that is qualified as spark_catalog.default.interval.; line 1 pos 7
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1242",
+  "messageParameters" : {
+    "fullName" : "spark_catalog.default.interval",
+    "rawName" : "interval"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 21,
+    "fragment" : "interval (-30)"
+  } ]
+}
 
 
 -- !query
@@ -1701,7 +1714,20 @@ select interval (a + 1) day
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Undefined function: interval. This function is neither a built-in/temporary function, nor a persistent function that is qualified as spark_catalog.default.interval.; line 1 pos 7
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1242",
+  "messageParameters" : {
+    "fullName" : "spark_catalog.default.interval",
+    "rawName" : "interval"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 23,
+    "fragment" : "interval (a + 1)"
+  } ]
+}
 
 
 -- !query
@@ -1726,7 +1752,20 @@ select interval (-30) days
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Undefined function: interval. This function is neither a built-in/temporary function, nor a persistent function that is qualified as spark_catalog.default.interval.; line 1 pos 7
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1242",
+  "messageParameters" : {
+    "fullName" : "spark_catalog.default.interval",
+    "rawName" : "interval"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 21,
+    "fragment" : "interval (-30)"
+  } ]
+}
 
 
 -- !query
@@ -1735,7 +1774,20 @@ select interval (a + 1) days
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Undefined function: interval. This function is neither a built-in/temporary function, nor a persistent function that is qualified as spark_catalog.default.interval.; line 1 pos 7
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1242",
+  "messageParameters" : {
+    "fullName" : "spark_catalog.default.interval",
+    "rawName" : "interval"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 23,
+    "fragment" : "interval (a + 1)"
+  } ]
+}
 
 
 -- !query
diff --git a/sql/core/src/test/resources/sql-tests/results/ansi/literals.sql.out b/sql/core/src/test/resources/sql-tests/results/ansi/literals.sql.out
index 72b7601f6f5..391500dc4e4 100644
--- a/sql/core/src/test/resources/sql-tests/results/ansi/literals.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/ansi/literals.sql.out
@@ -173,10 +173,13 @@ select 1234567890123456789012345678901234567890
 struct<>
 -- !query output
 org.apache.spark.sql.catalyst.parser.ParseException
-
-decimal can only support precision up to 38
-== SQL ==
-select 1234567890123456789012345678901234567890
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1229",
+  "messageParameters" : {
+    "decimalType" : "decimal",
+    "precision" : "38"
+  }
+}
 
 
 -- !query
@@ -185,10 +188,13 @@ select 1234567890123456789012345678901234567890.0
 struct<>
 -- !query output
 org.apache.spark.sql.catalyst.parser.ParseException
-
-decimal can only support precision up to 38
-== SQL ==
-select 1234567890123456789012345678901234567890.0
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1229",
+  "messageParameters" : {
+    "decimalType" : "decimal",
+    "precision" : "38"
+  }
+}
 
 
 -- !query
@@ -467,7 +473,7 @@ org.apache.spark.sql.catalyst.parser.ParseException
 {
   "errorClass" : "_LEGACY_ERROR_TEMP_0061",
   "messageParameters" : {
-    "msg" : "decimal can only support precision up to 38"
+    "msg" : "decimal can only support precision up to 38."
   },
   "queryContext" : [ {
     "objectType" : "",
diff --git a/sql/core/src/test/resources/sql-tests/results/change-column.sql.out b/sql/core/src/test/resources/sql-tests/results/change-column.sql.out
index 9d2ef445e16..ce07f60a748 100644
--- a/sql/core/src/test/resources/sql-tests/results/change-column.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/change-column.sql.out
@@ -81,7 +81,15 @@ ALTER TABLE test_change CHANGE a TYPE STRING
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-ALTER TABLE CHANGE COLUMN is not supported for changing column 'a' with type 'IntegerType' to 'a' with type 'StringType'
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1245",
+  "messageParameters" : {
+    "newName" : "a",
+    "newType" : "StringType",
+    "originName" : "a",
+    "originType" : "IntegerType"
+  }
+}
 
 
 -- !query
diff --git a/sql/core/src/test/resources/sql-tests/results/charvarchar.sql.out b/sql/core/src/test/resources/sql-tests/results/charvarchar.sql.out
index babce91d999..ae88227121b 100644
--- a/sql/core/src/test/resources/sql-tests/results/charvarchar.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/charvarchar.sql.out
@@ -259,7 +259,15 @@ alter table char_tbl1 change column c type char(6)
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-ALTER TABLE CHANGE COLUMN is not supported for changing column 'c' with type 'CharType(5)' to 'c' with type 'CharType(6)'
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1245",
+  "messageParameters" : {
+    "newName" : "c",
+    "newType" : "CharType(6)",
+    "originName" : "c",
+    "originType" : "CharType(5)"
+  }
+}
 
 
 -- !query
@@ -575,7 +583,14 @@ alter table char_part partition (v2='ke') rename to partition (v2='nt')
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Partition spec is invalid. The spec (v2) must match the partition spec (v2, c2) defined in table '`spark_catalog`.`default`.`char_part`'
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1232",
+  "messageParameters" : {
+    "partitionColumnNames" : "v2, c2",
+    "specKeys" : "v2",
+    "tableName" : "`spark_catalog`.`default`.`char_part`"
+  }
+}
 
 
 -- !query
diff --git a/sql/core/src/test/resources/sql-tests/results/cte-nested.sql.out b/sql/core/src/test/resources/sql-tests/results/cte-nested.sql.out
index e6382d74309..75eb2809006 100644
--- a/sql/core/src/test/resources/sql-tests/results/cte-nested.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/cte-nested.sql.out
@@ -45,7 +45,13 @@ SELECT * FROM t2
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Name t is ambiguous in nested CTE. Please set spark.sql.legacy.ctePrecedencePolicy to CORRECTED so that name defined in inner CTE takes precedence. If set it to LEGACY, outer CTE definitions will take precedence. See more details in SPARK-28228.
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1200",
+  "messageParameters" : {
+    "config" : "spark.sql.legacy.ctePrecedencePolicy",
+    "name" : "t"
+  }
+}
 
 
 -- !query
@@ -82,7 +88,13 @@ SELECT * FROM t2
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Name t is ambiguous in nested CTE. Please set spark.sql.legacy.ctePrecedencePolicy to CORRECTED so that name defined in inner CTE takes precedence. If set it to LEGACY, outer CTE definitions will take precedence. See more details in SPARK-28228.
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1200",
+  "messageParameters" : {
+    "config" : "spark.sql.legacy.ctePrecedencePolicy",
+    "name" : "t"
+  }
+}
 
 
 -- !query
@@ -136,7 +148,13 @@ SELECT (
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Name t is ambiguous in nested CTE. Please set spark.sql.legacy.ctePrecedencePolicy to CORRECTED so that name defined in inner CTE takes precedence. If set it to LEGACY, outer CTE definitions will take precedence. See more details in SPARK-28228.
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1200",
+  "messageParameters" : {
+    "config" : "spark.sql.legacy.ctePrecedencePolicy",
+    "name" : "t"
+  }
+}
 
 
 -- !query
@@ -151,7 +169,13 @@ SELECT (
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Name t is ambiguous in nested CTE. Please set spark.sql.legacy.ctePrecedencePolicy to CORRECTED so that name defined in inner CTE takes precedence. If set it to LEGACY, outer CTE definitions will take precedence. See more details in SPARK-28228.
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1200",
+  "messageParameters" : {
+    "config" : "spark.sql.legacy.ctePrecedencePolicy",
+    "name" : "t"
+  }
+}
 
 
 -- !query
@@ -167,7 +191,13 @@ SELECT (
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Name t is ambiguous in nested CTE. Please set spark.sql.legacy.ctePrecedencePolicy to CORRECTED so that name defined in inner CTE takes precedence. If set it to LEGACY, outer CTE definitions will take precedence. See more details in SPARK-28228.
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1200",
+  "messageParameters" : {
+    "config" : "spark.sql.legacy.ctePrecedencePolicy",
+    "name" : "t"
+  }
+}
 
 
 -- !query
@@ -181,7 +211,13 @@ WHERE c IN (
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Name t is ambiguous in nested CTE. Please set spark.sql.legacy.ctePrecedencePolicy to CORRECTED so that name defined in inner CTE takes precedence. If set it to LEGACY, outer CTE definitions will take precedence. See more details in SPARK-28228.
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1200",
+  "messageParameters" : {
+    "config" : "spark.sql.legacy.ctePrecedencePolicy",
+    "name" : "t"
+  }
+}
 
 
 -- !query
@@ -210,7 +246,13 @@ SELECT * FROM t
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Name aBc is ambiguous in nested CTE. Please set spark.sql.legacy.ctePrecedencePolicy to CORRECTED so that name defined in inner CTE takes precedence. If set it to LEGACY, outer CTE definitions will take precedence. See more details in SPARK-28228.
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1200",
+  "messageParameters" : {
+    "config" : "spark.sql.legacy.ctePrecedencePolicy",
+    "name" : "aBc"
+  }
+}
 
 
 -- !query
@@ -223,7 +265,13 @@ SELECT (
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Name aBc is ambiguous in nested CTE. Please set spark.sql.legacy.ctePrecedencePolicy to CORRECTED so that name defined in inner CTE takes precedence. If set it to LEGACY, outer CTE definitions will take precedence. See more details in SPARK-28228.
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1200",
+  "messageParameters" : {
+    "config" : "spark.sql.legacy.ctePrecedencePolicy",
+    "name" : "aBc"
+  }
+}
 
 
 -- !query
diff --git a/sql/core/src/test/resources/sql-tests/results/describe.sql.out b/sql/core/src/test/resources/sql-tests/results/describe.sql.out
index 5bb86a398c8..5c689cacf4d 100644
--- a/sql/core/src/test/resources/sql-tests/results/describe.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/describe.sql.out
@@ -362,9 +362,12 @@ DESC t PARTITION (c='Us', d=2)
 struct<>
 -- !query output
 org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException
-Partition not found in table 't' database 'default':
-c -> Us
-d -> 2
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1238",
+  "messageParameters" : {
+    "msg" : "Partition not found in table 't' database 'default':\nc -> Us\nd -> 2"
+  }
+}
 
 
 -- !query
@@ -373,7 +376,14 @@ DESC t PARTITION (c='Us')
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Partition spec is invalid. The spec (c) must match the partition spec (c, d) defined in table '`spark_catalog`.`default`.`t`'
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1232",
+  "messageParameters" : {
+    "partitionColumnNames" : "c, d",
+    "specKeys" : "c",
+    "tableName" : "`spark_catalog`.`default`.`t`"
+  }
+}
 
 
 -- !query
diff --git a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out
index f2eee23a52c..6ed91289f9c 100644
--- a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out
@@ -146,7 +146,20 @@ select * from values ("one", random_not_exist_func(1)), ("two", 2) as data(a, b)
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Undefined function: random_not_exist_func. This function is neither a built-in/temporary function, nor a persistent function that is qualified as spark_catalog.default.random_not_exist_func.; line 1 pos 29
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1242",
+  "messageParameters" : {
+    "fullName" : "spark_catalog.default.random_not_exist_func",
+    "rawName" : "random_not_exist_func"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 30,
+    "stopIndex" : 53,
+    "fragment" : "random_not_exist_func(1)"
+  } ]
+}
 
 
 -- !query
diff --git a/sql/core/src/test/resources/sql-tests/results/interval.sql.out b/sql/core/src/test/resources/sql-tests/results/interval.sql.out
index 9b91f9a5b8c..dc2fd3a0619 100644
--- a/sql/core/src/test/resources/sql-tests/results/interval.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/interval.sql.out
@@ -1573,7 +1573,20 @@ select interval (-30) day
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Undefined function: interval. This function is neither a built-in/temporary function, nor a persistent function that is qualified as spark_catalog.default.interval.; line 1 pos 7
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1242",
+  "messageParameters" : {
+    "fullName" : "spark_catalog.default.interval",
+    "rawName" : "interval"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 21,
+    "fragment" : "interval (-30)"
+  } ]
+}
 
 
 -- !query
@@ -1582,7 +1595,20 @@ select interval (a + 1) day
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Undefined function: interval. This function is neither a built-in/temporary function, nor a persistent function that is qualified as spark_catalog.default.interval.; line 1 pos 7
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1242",
+  "messageParameters" : {
+    "fullName" : "spark_catalog.default.interval",
+    "rawName" : "interval"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 23,
+    "fragment" : "interval (a + 1)"
+  } ]
+}
 
 
 -- !query
@@ -1607,7 +1633,20 @@ select interval (-30) days
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Undefined function: interval. This function is neither a built-in/temporary function, nor a persistent function that is qualified as spark_catalog.default.interval.; line 1 pos 7
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1242",
+  "messageParameters" : {
+    "fullName" : "spark_catalog.default.interval",
+    "rawName" : "interval"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 21,
+    "fragment" : "interval (-30)"
+  } ]
+}
 
 
 -- !query
@@ -1616,7 +1655,20 @@ select interval (a + 1) days
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Undefined function: interval. This function is neither a built-in/temporary function, nor a persistent function that is qualified as spark_catalog.default.interval.; line 1 pos 7
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1242",
+  "messageParameters" : {
+    "fullName" : "spark_catalog.default.interval",
+    "rawName" : "interval"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 23,
+    "fragment" : "interval (a + 1)"
+  } ]
+}
 
 
 -- !query
diff --git a/sql/core/src/test/resources/sql-tests/results/literals.sql.out b/sql/core/src/test/resources/sql-tests/results/literals.sql.out
index 72b7601f6f5..391500dc4e4 100644
--- a/sql/core/src/test/resources/sql-tests/results/literals.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/literals.sql.out
@@ -173,10 +173,13 @@ select 1234567890123456789012345678901234567890
 struct<>
 -- !query output
 org.apache.spark.sql.catalyst.parser.ParseException
-
-decimal can only support precision up to 38
-== SQL ==
-select 1234567890123456789012345678901234567890
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1229",
+  "messageParameters" : {
+    "decimalType" : "decimal",
+    "precision" : "38"
+  }
+}
 
 
 -- !query
@@ -185,10 +188,13 @@ select 1234567890123456789012345678901234567890.0
 struct<>
 -- !query output
 org.apache.spark.sql.catalyst.parser.ParseException
-
-decimal can only support precision up to 38
-== SQL ==
-select 1234567890123456789012345678901234567890.0
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1229",
+  "messageParameters" : {
+    "decimalType" : "decimal",
+    "precision" : "38"
+  }
+}
 
 
 -- !query
@@ -467,7 +473,7 @@ org.apache.spark.sql.catalyst.parser.ParseException
 {
   "errorClass" : "_LEGACY_ERROR_TEMP_0061",
   "messageParameters" : {
-    "msg" : "decimal can only support precision up to 38"
+    "msg" : "decimal can only support precision up to 38."
   },
   "queryContext" : [ {
     "objectType" : "",
diff --git a/sql/core/src/test/resources/sql-tests/results/postgreSQL/create_view.sql.out b/sql/core/src/test/resources/sql-tests/results/postgreSQL/create_view.sql.out
index fcdd42551d1..1889f78574a 100644
--- a/sql/core/src/test/resources/sql-tests/results/postgreSQL/create_view.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/postgreSQL/create_view.sql.out
@@ -277,7 +277,13 @@ CREATE VIEW v1_temp AS SELECT * FROM temp_table
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Not allowed to create a permanent view `spark_catalog`.`temp_view_test`.`v1_temp` by referencing a temporary view temp_table. Please create a temp view instead by CREATE TEMP VIEW
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1283",
+  "messageParameters" : {
+    "name" : "`spark_catalog`.`temp_view_test`.`v1_temp`",
+    "nameParts" : "temp_table"
+  }
+}
 
 
 -- !query
@@ -333,7 +339,13 @@ CREATE VIEW temp_view_test.v3_temp AS SELECT * FROM temp_table
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Not allowed to create a permanent view `spark_catalog`.`temp_view_test`.`v3_temp` by referencing a temporary view temp_table. Please create a temp view instead by CREATE TEMP VIEW
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1283",
+  "messageParameters" : {
+    "name" : "`spark_catalog`.`temp_view_test`.`v3_temp`",
+    "nameParts" : "temp_table"
+  }
+}
 
 
 -- !query
@@ -382,7 +394,13 @@ CREATE VIEW v4_temp AS
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Not allowed to create a permanent view `spark_catalog`.`temp_view_test`.`v4_temp` by referencing a temporary view temp_table. Please create a temp view instead by CREATE TEMP VIEW
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1283",
+  "messageParameters" : {
+    "name" : "`spark_catalog`.`temp_view_test`.`v4_temp`",
+    "nameParts" : "temp_table"
+  }
+}
 
 
 -- !query
@@ -394,7 +412,13 @@ CREATE VIEW v5_temp AS
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Not allowed to create a permanent view `spark_catalog`.`temp_view_test`.`v5_temp` by referencing a temporary view temp_table. Please create a temp view instead by CREATE TEMP VIEW
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1283",
+  "messageParameters" : {
+    "name" : "`spark_catalog`.`temp_view_test`.`v5_temp`",
+    "nameParts" : "temp_table"
+  }
+}
 
 
 -- !query
@@ -553,7 +577,13 @@ CREATE VIEW v6_temp AS SELECT * FROM base_table WHERE id IN (SELECT id FROM temp
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Not allowed to create a permanent view `spark_catalog`.`temp_view_test`.`v6_temp` by referencing a temporary view temp_table. Please create a temp view instead by CREATE TEMP VIEW
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1283",
+  "messageParameters" : {
+    "name" : "`spark_catalog`.`temp_view_test`.`v6_temp`",
+    "nameParts" : "temp_table"
+  }
+}
 
 
 -- !query
@@ -562,7 +592,13 @@ CREATE VIEW v7_temp AS SELECT t1.id, t2.a FROM base_table t1, (SELECT * FROM tem
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Not allowed to create a permanent view `spark_catalog`.`temp_view_test`.`v7_temp` by referencing a temporary view temp_table. Please create a temp view instead by CREATE TEMP VIEW
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1283",
+  "messageParameters" : {
+    "name" : "`spark_catalog`.`temp_view_test`.`v7_temp`",
+    "nameParts" : "temp_table"
+  }
+}
 
 
 -- !query
@@ -571,7 +607,13 @@ CREATE VIEW v8_temp AS SELECT * FROM base_table WHERE EXISTS (SELECT 1 FROM temp
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Not allowed to create a permanent view `spark_catalog`.`temp_view_test`.`v8_temp` by referencing a temporary view temp_table. Please create a temp view instead by CREATE TEMP VIEW
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1283",
+  "messageParameters" : {
+    "name" : "`spark_catalog`.`temp_view_test`.`v8_temp`",
+    "nameParts" : "temp_table"
+  }
+}
 
 
 -- !query
@@ -580,7 +622,13 @@ CREATE VIEW v9_temp AS SELECT * FROM base_table WHERE NOT EXISTS (SELECT 1 FROM
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Not allowed to create a permanent view `spark_catalog`.`temp_view_test`.`v9_temp` by referencing a temporary view temp_table. Please create a temp view instead by CREATE TEMP VIEW
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1283",
+  "messageParameters" : {
+    "name" : "`spark_catalog`.`temp_view_test`.`v9_temp`",
+    "nameParts" : "temp_table"
+  }
+}
 
 
 -- !query
@@ -689,7 +737,13 @@ CREATE VIEW temporal1 AS SELECT * FROM t1 CROSS JOIN tt
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Not allowed to create a permanent view `spark_catalog`.`testviewschm2`.`temporal1` by referencing a temporary view tt. Please create a temp view instead by CREATE TEMP VIEW
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1283",
+  "messageParameters" : {
+    "name" : "`spark_catalog`.`testviewschm2`.`temporal1`",
+    "nameParts" : "tt"
+  }
+}
 
 
 -- !query
@@ -730,7 +784,13 @@ CREATE VIEW temporal2 AS SELECT * FROM t1 INNER JOIN tt ON t1.num = tt.num2
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Not allowed to create a permanent view `spark_catalog`.`testviewschm2`.`temporal2` by referencing a temporary view tt. Please create a temp view instead by CREATE TEMP VIEW
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1283",
+  "messageParameters" : {
+    "name" : "`spark_catalog`.`testviewschm2`.`temporal2`",
+    "nameParts" : "tt"
+  }
+}
 
 
 -- !query
@@ -771,7 +831,13 @@ CREATE VIEW temporal3 AS SELECT * FROM t1 LEFT JOIN tt ON t1.num = tt.num2
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Not allowed to create a permanent view `spark_catalog`.`testviewschm2`.`temporal3` by referencing a temporary view tt. Please create a temp view instead by CREATE TEMP VIEW
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1283",
+  "messageParameters" : {
+    "name" : "`spark_catalog`.`testviewschm2`.`temporal3`",
+    "nameParts" : "tt"
+  }
+}
 
 
 -- !query
@@ -812,7 +878,13 @@ CREATE VIEW temporal4 AS SELECT * FROM t1 LEFT JOIN tt ON t1.num = tt.num2 AND t
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Not allowed to create a permanent view `spark_catalog`.`testviewschm2`.`temporal4` by referencing a temporary view tt. Please create a temp view instead by CREATE TEMP VIEW
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1283",
+  "messageParameters" : {
+    "name" : "`spark_catalog`.`testviewschm2`.`temporal4`",
+    "nameParts" : "tt"
+  }
+}
 
 
 -- !query
@@ -821,7 +893,13 @@ CREATE VIEW temporal5 AS SELECT * FROM t1 WHERE num IN (SELECT num FROM t1 WHERE
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Not allowed to create a permanent view `spark_catalog`.`testviewschm2`.`temporal5` by referencing a temporary view tt. Please create a temp view instead by CREATE TEMP VIEW
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1283",
+  "messageParameters" : {
+    "name" : "`spark_catalog`.`testviewschm2`.`temporal5`",
+    "nameParts" : "tt"
+  }
+}
 
 
 -- !query
diff --git a/sql/core/src/test/resources/sql-tests/results/postgreSQL/numeric.sql.out b/sql/core/src/test/resources/sql-tests/results/postgreSQL/numeric.sql.out
index 18226b0fd03..53a57ee270b 100644
--- a/sql/core/src/test/resources/sql-tests/results/postgreSQL/numeric.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/postgreSQL/numeric.sql.out
@@ -3581,10 +3581,13 @@ INSERT INTO num_exp_power_10_ln VALUES (7,1716699575118597095.423308199106402476
 struct<>
 -- !query output
 org.apache.spark.sql.catalyst.parser.ParseException
-
-decimal can only support precision up to 38
-== SQL ==
-INSERT INTO num_exp_power_10_ln VALUES (7,1716699575118597095.42330819910640247627)
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1229",
+  "messageParameters" : {
+    "decimalType" : "decimal",
+    "precision" : "38"
+  }
+}
 
 
 -- !query
diff --git a/sql/core/src/test/resources/sql-tests/results/postgreSQL/strings.sql.out b/sql/core/src/test/resources/sql-tests/results/postgreSQL/strings.sql.out
index 81f964b4bea..b3c1e94314d 100644
--- a/sql/core/src/test/resources/sql-tests/results/postgreSQL/strings.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/postgreSQL/strings.sql.out
@@ -443,7 +443,13 @@ SELECT 'maca' LIKE 'm%aca' ESCAPE '%' AS `true`
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-the pattern 'm%aca' is invalid, the escape character is not allowed to precede 'a'
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1216",
+  "messageParameters" : {
+    "message" : "the escape character is not allowed to precede 'a'",
+    "pattern" : "m%aca"
+  }
+}
 
 
 -- !query
@@ -452,7 +458,13 @@ SELECT 'maca' NOT LIKE 'm%aca' ESCAPE '%' AS `false`
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-the pattern 'm%aca' is invalid, the escape character is not allowed to precede 'a'
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1216",
+  "messageParameters" : {
+    "message" : "the escape character is not allowed to precede 'a'",
+    "pattern" : "m%aca"
+  }
+}
 
 
 -- !query
@@ -461,7 +473,13 @@ SELECT 'ma%a' LIKE 'm%a%%a' ESCAPE '%' AS `true`
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-the pattern 'm%a%%a' is invalid, the escape character is not allowed to precede 'a'
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1216",
+  "messageParameters" : {
+    "message" : "the escape character is not allowed to precede 'a'",
+    "pattern" : "m%a%%a"
+  }
+}
 
 
 -- !query
@@ -470,7 +488,13 @@ SELECT 'ma%a' NOT LIKE 'm%a%%a' ESCAPE '%' AS `false`
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-the pattern 'm%a%%a' is invalid, the escape character is not allowed to precede 'a'
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1216",
+  "messageParameters" : {
+    "message" : "the escape character is not allowed to precede 'a'",
+    "pattern" : "m%a%%a"
+  }
+}
 
 
 -- !query
@@ -479,7 +503,13 @@ SELECT 'bear' LIKE 'b_ear' ESCAPE '_' AS `true`
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-the pattern 'b_ear' is invalid, the escape character is not allowed to precede 'e'
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1216",
+  "messageParameters" : {
+    "message" : "the escape character is not allowed to precede 'e'",
+    "pattern" : "b_ear"
+  }
+}
 
 
 -- !query
@@ -488,7 +518,13 @@ SELECT 'bear' NOT LIKE 'b_ear' ESCAPE '_' AS `false`
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-the pattern 'b_ear' is invalid, the escape character is not allowed to precede 'e'
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1216",
+  "messageParameters" : {
+    "message" : "the escape character is not allowed to precede 'e'",
+    "pattern" : "b_ear"
+  }
+}
 
 
 -- !query
@@ -497,7 +533,13 @@ SELECT 'be_r' LIKE 'b_e__r' ESCAPE '_' AS `true`
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-the pattern 'b_e__r' is invalid, the escape character is not allowed to precede 'e'
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1216",
+  "messageParameters" : {
+    "message" : "the escape character is not allowed to precede 'e'",
+    "pattern" : "b_e__r"
+  }
+}
 
 
 -- !query
@@ -506,7 +548,13 @@ SELECT 'be_r' NOT LIKE 'b_e__r' ESCAPE '_' AS `false`
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-the pattern 'b_e__r' is invalid, the escape character is not allowed to precede 'e'
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1216",
+  "messageParameters" : {
+    "message" : "the escape character is not allowed to precede 'e'",
+    "pattern" : "b_e__r"
+  }
+}
 
 
 -- !query
diff --git a/sql/core/src/test/resources/sql-tests/results/postgreSQL/window_part3.sql.out b/sql/core/src/test/resources/sql-tests/results/postgreSQL/window_part3.sql.out
index d4d8165ddea..d9deb9aa31d 100644
--- a/sql/core/src/test/resources/sql-tests/results/postgreSQL/window_part3.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/postgreSQL/window_part3.sql.out
@@ -407,7 +407,20 @@ SELECT range(1, 100) OVER () FROM empsalary
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Undefined function: range. This function is neither a built-in/temporary function, nor a persistent function that is qualified as spark_catalog.default.range.; line 1 pos 7
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1242",
+  "messageParameters" : {
+    "fullName" : "spark_catalog.default.range",
+    "rawName" : "range"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 8,
+    "stopIndex" : 28,
+    "fragment" : "range(1, 100) OVER ()"
+  } ]
+}
 
 
 -- !query
diff --git a/sql/core/src/test/resources/sql-tests/results/show-tables.sql.out b/sql/core/src/test/resources/sql-tests/results/show-tables.sql.out
index f095f98fd84..7170a8d6081 100644
--- a/sql/core/src/test/resources/sql-tests/results/show-tables.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/show-tables.sql.out
@@ -223,7 +223,14 @@ SHOW TABLE EXTENDED LIKE 'show_t1' PARTITION(c='Us')
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Partition spec is invalid. The spec (c) must match the partition spec (c, d) defined in table '`spark_catalog`.`showdb`.`show_t1`'
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1232",
+  "messageParameters" : {
+    "partitionColumnNames" : "c, d",
+    "specKeys" : "c",
+    "tableName" : "`spark_catalog`.`showdb`.`show_t1`"
+  }
+}
 
 
 -- !query
@@ -232,7 +239,13 @@ SHOW TABLE EXTENDED LIKE 'show_t1' PARTITION(a='Us', d=1)
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-a is not a valid partition column in table `spark_catalog`.`showdb`.`show_t1`.
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1231",
+  "messageParameters" : {
+    "key" : "a",
+    "tblName" : "`spark_catalog`.`showdb`.`show_t1`"
+  }
+}
 
 
 -- !query
@@ -241,9 +254,12 @@ SHOW TABLE EXTENDED LIKE 'show_t1' PARTITION(c='Ch', d=1)
 struct<>
 -- !query output
 org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException
-Partition not found in table 'show_t1' database 'showdb':
-c -> Ch
-d -> 1
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1238",
+  "messageParameters" : {
+    "msg" : "Partition not found in table 'show_t1' database 'showdb':\nc -> Ch\nd -> 1"
+  }
+}
 
 
 -- !query
diff --git a/sql/core/src/test/resources/sql-tests/results/udf/udf-inline-table.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-inline-table.sql.out
index b6c017118a6..688f6103ff3 100644
--- a/sql/core/src/test/resources/sql-tests/results/udf/udf-inline-table.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-inline-table.sql.out
@@ -130,7 +130,20 @@ select udf(a), udf(b) from values ("one", random_not_exist_func(1)), ("two", 2)
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Undefined function: random_not_exist_func. This function is neither a built-in/temporary function, nor a persistent function that is qualified as spark_catalog.default.random_not_exist_func.; line 1 pos 42
+{
+  "errorClass" : "_LEGACY_ERROR_TEMP_1242",
+  "messageParameters" : {
+    "fullName" : "spark_catalog.default.random_not_exist_func",
+    "rawName" : "random_not_exist_func"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 43,
+    "stopIndex" : 66,
+    "fragment" : "random_not_exist_func(1)"
+  } ]
+}
 
 
 -- !query
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 9fa9cd16371..b13548721b9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -3701,11 +3701,14 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
     withTempView("df") {
       Seq("m@ca").toDF("s").createOrReplaceTempView("df")
 
-      val e = intercept[AnalysisException] {
-        sql("SELECT s LIKE 'm%@ca' ESCAPE '%' FROM df").collect()
-      }
-      assert(e.message.contains("the pattern 'm%@ca' is invalid, " +
-        "the escape character is not allowed to precede '@'"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("SELECT s LIKE 'm%@ca' ESCAPE '%' FROM df").collect()
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1216",
+        parameters = Map(
+          "pattern" -> "m%@ca",
+          "message" -> "the escape character is not allowed to precede '@'"))
 
       checkAnswer(sql("SELECT s LIKE 'm@@ca' ESCAPE '@' FROM df"), Row(true))
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 14af2b82411..91de56334e8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -525,19 +525,29 @@ abstract class DDLSuite extends QueryTest with DDLSuiteBase {
   }
 
   test("create table - partition column names not in table definition") {
-    val e = intercept[AnalysisException] {
-      sql("CREATE TABLE tbl(a int, b string) USING json PARTITIONED BY (c)")
-    }
-    assert(e.message == "partition column c is not defined in table " +
-      s"$SESSION_CATALOG_NAME.default.tbl, defined table columns are: a, b")
+    checkError(
+      exception = intercept[AnalysisException] {
+        sql("CREATE TABLE tbl(a int, b string) USING json PARTITIONED BY (c)")
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_1206",
+      parameters = Map(
+        "colType" -> "partition",
+        "colName" -> "c",
+        "tableName" -> s"$SESSION_CATALOG_NAME.default.tbl",
+        "tableCols" -> "a, b"))
   }
 
   test("create table - bucket column names not in table definition") {
-    val e = intercept[AnalysisException] {
-      sql("CREATE TABLE tbl(a int, b string) USING json CLUSTERED BY (c) INTO 4 BUCKETS")
-    }
-    assert(e.message == "bucket column c is not defined in table " +
-      s"$SESSION_CATALOG_NAME.default.tbl, defined table columns are: a, b")
+    checkError(
+      exception = intercept[AnalysisException] {
+        sql("CREATE TABLE tbl(a int, b string) USING json CLUSTERED BY (c) INTO 4 BUCKETS")
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_1206",
+      parameters = Map(
+        "colType" -> "bucket",
+        "colName" -> "c",
+        "tableName" -> s"$SESSION_CATALOG_NAME.default.tbl",
+        "tableCols" -> "a, b"))
   }
 
   test("create table - column repeated in partition columns") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableSetLocationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableSetLocationSuite.scala
index a8af349a89b..d6eea8ae8ed 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableSetLocationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableSetLocationSuite.scala
@@ -119,11 +119,15 @@ trait AlterTableSetLocationSuiteBase extends command.AlterTableSetLocationSuiteB
       sql(buildCreateTableSQL(t))
 
       sql(s"INSERT INTO $t PARTITION (a = '1', b = '2') SELECT 1, 'abc'")
-      val e = intercept[AnalysisException] {
-        sql(s"ALTER TABLE $t PARTITION (b='2') SET LOCATION '/mister/spark'")
-      }
-      assert(e.getMessage == "Partition spec is invalid. The spec (b) must match the partition " +
-        "spec (a, b) defined in table '`spark_catalog`.`ns`.`tbl`'")
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"ALTER TABLE $t PARTITION (b='2') SET LOCATION '/mister/spark'")
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1232",
+        parameters = Map(
+          "specKeys" -> "b",
+          "partitionColumnNames" -> "a, b",
+          "tableName" -> "`spark_catalog`.`ns`.`tbl`"))
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableSetSerdeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableSetSerdeSuite.scala
index b34d7b03d47..708c7f7a618 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableSetSerdeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableSetSerdeSuite.scala
@@ -82,17 +82,19 @@ class AlterTableSetSerdeSuite extends AlterTableSetSerdeSuiteBase with CommandSu
       checkSerdeProps(tableIdent, Map.empty[String, String])
 
       // set table serde and/or properties (should fail on datasource tables)
-      val e1 = intercept[AnalysisException] {
-        sql(s"ALTER TABLE $t SET SERDE 'whatever'")
-      }
-      assert(e1.getMessage == "Operation not allowed: " +
-        "ALTER TABLE SET SERDE is not supported for tables created with the datasource API")
-      val e2 = intercept[AnalysisException] {
-        sql(s"ALTER TABLE $t SET SERDE 'org.apache.madoop' " +
-          "WITH SERDEPROPERTIES ('k' = 'v', 'kay' = 'vee')")
-      }
-      assert(e2.getMessage == "Operation not allowed: " +
-        "ALTER TABLE SET SERDE is not supported for tables created with the datasource API")
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"ALTER TABLE $t SET SERDE 'whatever'")
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1248",
+        parameters = Map.empty)
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"ALTER TABLE $t SET SERDE 'org.apache.madoop' " +
+            "WITH SERDEPROPERTIES ('k' = 'v', 'kay' = 'vee')")
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1248",
+        parameters = Map.empty)
 
       // set serde properties only
       sql(s"ALTER TABLE $t SET SERDEPROPERTIES ('k' = 'vvv', 'kay' = 'vee')")
@@ -126,37 +128,37 @@ class AlterTableSetSerdeSuite extends AlterTableSetSerdeSuiteBase with CommandSu
       checkPartitionSerdeProps(tableIdent, spec, Map.empty[String, String])
 
       // set table serde and/or properties (should fail on datasource tables)
-      val e1 = intercept[AnalysisException] {
-        sql(s"ALTER TABLE $t PARTITION (a=1, b=2) SET SERDE 'whatever'")
-      }
-      assert(e1.getMessage == "Operation not allowed: " +
-        "ALTER TABLE SET [SERDE | SERDEPROPERTIES] for a specific partition " +
-        "is not supported for tables created with the datasource API")
-      val e2 = intercept[AnalysisException] {
-        sql(s"ALTER TABLE $t PARTITION (a=1, b=2) SET SERDE 'org.apache.madoop' " +
-          "WITH SERDEPROPERTIES ('k' = 'v', 'kay' = 'vee')")
-      }
-      assert(e2.getMessage == "Operation not allowed: " +
-        "ALTER TABLE SET [SERDE | SERDEPROPERTIES] for a specific partition " +
-        "is not supported for tables created with the datasource API")
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"ALTER TABLE $t PARTITION (a=1, b=2) SET SERDE 'whatever'")
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1247",
+        parameters = Map.empty)
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"ALTER TABLE $t PARTITION (a=1, b=2) SET SERDE 'org.apache.madoop' " +
+            "WITH SERDEPROPERTIES ('k' = 'v', 'kay' = 'vee')")
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1247",
+        parameters = Map.empty)
 
       // set serde properties only
-      val e3 = intercept[AnalysisException] {
-        sql(s"ALTER TABLE $t PARTITION (a=1, b=2) " +
-          "SET SERDEPROPERTIES ('k' = 'vvv', 'kay' = 'vee')")
-      }
-      assert(e3.getMessage == "Operation not allowed: " +
-        "ALTER TABLE SET [SERDE | SERDEPROPERTIES] for a specific partition " +
-        "is not supported for tables created with the datasource API")
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"ALTER TABLE $t PARTITION (a=1, b=2) " +
+            "SET SERDEPROPERTIES ('k' = 'vvv', 'kay' = 'vee')")
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1247",
+        parameters = Map.empty)
 
       // set things without explicitly specifying database
       sessionCatalog.setCurrentDatabase("ns")
-      val e4 = intercept[AnalysisException] {
-        sql(s"ALTER TABLE tbl PARTITION (a=1, b=2) SET SERDEPROPERTIES ('kay' = 'veee')")
-      }
-      assert(e4.getMessage == "Operation not allowed: " +
-        "ALTER TABLE SET [SERDE | SERDEPROPERTIES] for a specific partition " +
-        "is not supported for tables created with the datasource API")
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"ALTER TABLE tbl PARTITION (a=1, b=2) SET SERDEPROPERTIES ('kay' = 'veee')")
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1247",
+        parameters = Map.empty)
 
       // table to alter does not exist
       val e5 = intercept[AnalysisException] {

