Posted to commits@spark.apache.org by ma...@apache.org on 2023/12/11 22:29:43 UTC

(spark) branch master updated: [SPARK-46351][SQL] Require an error class in `AnalysisException`

This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e0808c33f1 [SPARK-46351][SQL] Require an error class in `AnalysisException`
3e0808c33f1 is described below

commit 3e0808c33f185c13808ce2d547ce9ba0057d31a6
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Tue Dec 12 01:29:26 2023 +0300

    [SPARK-46351][SQL] Require an error class in `AnalysisException`
    
    ### What changes were proposed in this pull request?
    In this PR, I propose to require an error class when constructing `AnalysisException` by making the constructor that takes a raw `message` protected. This way, only sub-classes can create an `AnalysisException` from a plain `message`; all other callers must provide an error class.
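    
    For example, a call site that previously threw `AnalysisException` with a plain message now names an error class and supplies its message parameters. A minimal sketch of the new pattern, based on the `SQLHelper`/`SQLConfHelper` changes in this diff (the `_LEGACY_ERROR_TEMP_3050` entry and the config key `k` come from those hunks):
    ```
    // Before: constructing the exception from a raw message (that constructor is now protected).
    // throw new AnalysisException(s"Cannot modify the value of a static config: $k")
    
    // After: callers must pass an error class plus its message parameters.
    throw new AnalysisException(
      errorClass = "_LEGACY_ERROR_TEMP_3050",
      messageParameters = Map("k" -> k))
    ```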
    
    ### Why are the changes needed?
    To improve the user experience with Spark SQL by unifying error exceptions: the final goal is that every Spark exception carries an error class.
    
    ### Does this PR introduce _any_ user-facing change?
    No, since user code shouldn't throw `AnalysisException` itself, but it can be affected if it depends on error message formats.
    
    ### How was this patch tested?
    By running the existing test suites, for example:
    ```
    $ PYSPARK_PYTHON=python3 build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite"
    ```
    and the modified test suites:
    ```
    $ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *HiveDDLSuite"
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #44277 from MaxGekk/protected-AnalysisException.
    
    Authored-by: Max Gekk <ma...@gmail.com>
    Signed-off-by: Max Gekk <ma...@gmail.com>
---
 .../src/main/resources/error/error-classes.json    | 253 +++++++++++++++++++++
 .../apache/spark/sql/avro/AvroDataToCatalyst.scala |  19 +-
 .../org/apache/spark/sql/test/SQLHelper.scala      |   4 +-
 .../connect/client/GrpcExceptionConverter.scala    |  22 +-
 .../spark/sql/kafka010/KafkaSourceProvider.scala   |   9 +-
 .../apache/spark/sql/kafka010/KafkaWriter.scala    |   5 +-
 .../org/apache/spark/sql/AnalysisException.scala   |  14 +-
 .../apache/spark/sql/catalyst/SQLConfHelper.scala  |   4 +-
 .../catalyst/analysis/ColumnResolutionHelper.scala |   8 +-
 .../ResolveRowLevelCommandAssignments.scala        |   4 +-
 .../catalyst/analysis/RewriteMergeIntoTable.scala  |  12 +-
 .../catalyst/expressions/V2ExpressionUtils.scala   |  12 +-
 .../spark/sql/catalyst/planning/patterns.scala     |   5 +-
 .../sql/catalyst/plans/logical/v2Commands.scala    |   8 +-
 .../org/apache/spark/sql/util/SchemaUtils.scala    |  25 +-
 .../catalyst/catalog/ExternalCatalogSuite.scala    |   4 +-
 .../spark/sql/RelationalGroupedDataset.scala       |   4 +-
 .../spark/sql/execution/SparkStrategies.scala      |   3 +-
 .../spark/sql/execution/aggregate/AggUtils.scala   |   5 +-
 .../execution/datasources/FileSourceStrategy.scala |  13 +-
 .../parquet/ParquetSchemaConverter.scala           |   4 +-
 .../spark/sql/execution/datasources/rules.scala    |   5 +-
 .../execution/datasources/v2/MergeRowsExec.scala   |   4 +-
 .../datasources/v2/state/utils/SchemaUtil.scala    |   6 +-
 .../RowLevelOperationRuntimeGroupFiltering.scala   |   5 +-
 .../execution/streaming/WatermarkPropagator.scala  |   6 +-
 .../execution/streaming/statefulOperators.scala    |   6 +-
 .../org/apache/spark/sql/jdbc/JdbcDialects.scala   |   5 +-
 .../sql/connector/DataSourceV2FunctionSuite.scala  |  10 +-
 .../spark/sql/connector/DataSourceV2SQLSuite.scala |  79 +++++--
 .../connector/FileDataSourceV2FallBackSuite.scala  |  28 ++-
 .../spark/sql/execution/QueryExecutionSuite.scala  |   2 +-
 .../execution/datasources/orc/OrcFilterSuite.scala |   3 +-
 .../sql/execution/datasources/orc/OrcTest.scala    |   3 +-
 .../datasources/parquet/ParquetFilterSuite.scala   |   3 +-
 .../spark/sql/hive/HiveExternalCatalog.scala       |  32 ++-
 .../org/apache/spark/sql/hive/HiveInspectors.scala |  15 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala      |  17 +-
 .../spark/sql/hive/HiveSessionStateBuilder.scala   |   7 +-
 .../org/apache/spark/sql/hive/HiveStrategies.scala |   8 +-
 .../sql/hive/execution/V1WritesHiveUtils.scala     |   4 +-
 .../spark/sql/hive/HiveMetastoreCatalogSuite.scala |  11 +-
 .../spark/sql/hive/execution/HiveDDLSuite.scala    |  72 +++---
 .../spark/sql/hive/execution/HiveQuerySuite.scala  |   8 +-
 .../sql/hive/execution/Hive_2_1_DDLSuite.scala     |   8 +-
 45 files changed, 611 insertions(+), 173 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index 62d10c0d34c..d52ffc011b7 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -6705,6 +6705,259 @@
       "Failed to get block <blockId>, which is not a shuffle block"
     ]
   },
+  "_LEGACY_ERROR_TEMP_3050" : {
+    "message" : [
+      "Cannot modify the value of a static config: <k>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3051" : {
+    "message" : [
+      "When resolving <u>, fail to find subplan with plan_id=<planId> in <q>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3052" : {
+    "message" : [
+      "Unexpected resolved action: <other>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3053" : {
+    "message" : [
+      "Unexpected WHEN NOT MATCHED action: <other>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3054" : {
+    "message" : [
+      "<expr> is not currently supported"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3055" : {
+    "message" : [
+      "ScalarFunction '<scalarFunc.name>' neither implement magic method nor override 'produceResult'"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3056" : {
+    "message" : [
+      "Unexpected row-level read relations (allow multiple = <allowMultipleReads>): <other>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3057" : {
+    "message" : [
+      "Cannot retrieve row-level operation from <table>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3058" : {
+    "message" : [
+      "Found duplicate column(s) <checkType>: <duplicateColumns>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3059" : {
+    "message" : [
+      "The positions provided (<pos>) cannot be resolved in",
+      "<schema>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3060" : {
+    "message" : [
+      "Couldn't find column <i> in:",
+      "<schema>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3061" : {
+    "message" : [
+      "<e>",
+      "<schema>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3062" : {
+    "message" : [
+      "Expected <columnPath> to be a nested data type, but found <o>. Was looking for the index of <attr> in a nested field"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3063" : {
+    "message" : [
+      "pivot is not supported on a streaming DataFrames/Datasets"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3064" : {
+    "message" : [
+      "<msg>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3065" : {
+    "message" : [
+      "<clazz>: <msg>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3066" : {
+    "message" : [
+      "<msg>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3067" : {
+    "message" : [
+      "Streaming aggregation doesn't support group aggregate pandas UDF"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3068" : {
+    "message" : [
+      "Global aggregation with session window in streaming query is not supported."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3069" : {
+    "message" : [
+      "<internalName> is a reserved column name that cannot be read in combination with <colName> column."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3070" : {
+    "message" : [
+      "<internalName> is a reserved column name that cannot be read in combination with <colName> column."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3071" : {
+    "message" : [
+      "<msg>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3072" : {
+    "message" : [
+      "<msg>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3073" : {
+    "message" : [
+      "Unexpected instruction: <other>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3074" : {
+    "message" : [
+      "field <fieldName> not found from given schema <schema>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3075" : {
+    "message" : [
+      "Couldn't find scan attribute for <tableAttr> in <scanAttrs>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3076" : {
+    "message" : [
+      "Redefining watermark is disallowed. You can set the config '<config>' to 'false' to restore the previous behavior. Note that multiple stateful operators will be disallowed."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3077" : {
+    "message" : [
+      "More than one event time columns are available. Please ensure there is at most one event time column per stream. event time columns: <eventTimeCols>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3078" : {
+    "message" : [
+      "Can not match ParquetTable in the query."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3079" : {
+    "message" : [
+      "Dynamic partition cannot be the parent of a static partition."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3080" : {
+    "message" : [
+      "<msg>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3081" : {
+    "message" : [
+      "Save mode <mode> not allowed for Kafka. Allowed save modes are <append> and <errorIfExists> (default)."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3082" : {
+    "message" : [
+      "Creating bucketed Hive serde table is not supported yet."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3083" : {
+    "message" : [
+      "Unable to infer the schema. The schema specification is required to create the table <tableName>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3084" : {
+    "message" : [
+      "No handler for UDF/UDAF/UDTF '<clazz>': <e>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3085" : {
+    "message" : [
+      "from_avro() doesn't support the <name> mode. Acceptable modes are <permissiveMode> and <failFastMode>."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3086" : {
+    "message" : [
+      "Cannot persist <tableName> into Hive metastore as table property keys may not start with 'spark.sql.': <invalidKeys>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3087" : {
+    "message" : [
+      "Cannot set or change the preserved property key: 'EXTERNAL'"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3088" : {
+    "message" : [
+      "The metadata is corrupted. Unable to find the partition column names from the schema. schema: <schema>. Partition columns: <partColumnNames>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3089" : {
+    "message" : [
+      "Corrupted <typeName> in catalog: <numCols> parts expected, but part <index> is missing."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3090" : {
+    "message" : [
+      "Raw list type in java is unsupported because Spark cannot infer the element type."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3091" : {
+    "message" : [
+      "Raw map type in java is unsupported because Spark cannot infer key and value types."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3092" : {
+    "message" : [
+      "Collection types with wildcards (e.g. List<?> or Map<?, ?>) are unsupported because Spark cannot infer the data type for these type parameters."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3093" : {
+    "message" : [
+      "Unsupported java type <c>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3094" : {
+    "message" : [
+      "<dt> is not supported."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3095" : {
+    "message" : [
+      "<dt> cannot be converted to Hive TypeInfo"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3096" : {
+    "message" : [
+      "Converted table has <resLen> columns,",
+      "but source Hive table has <relLen> columns.",
+      "Set <key> to false,",
+      "or recreate table <ident> to workaround."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3097" : {
+    "message" : [
+      "Column in converted table has different data type with source Hive table's.",
+      "Set <key> to false,",
+      "or recreate table <ident> to workaround."
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_3100" : {
+    "message" : [
+      "<message>"
+    ]
+  },
   "_LEGACY_ERROR_USER_RAISED_EXCEPTION" : {
     "message" : [
       "<errorMessage>"
diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala
index 06388409284..9f31a2db55a 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala
@@ -72,16 +72,16 @@ private[sql] case class AvroDataToCatalyst(
   @transient private lazy val parseMode: ParseMode = {
     val mode = avroOptions.parseMode
     if (mode != PermissiveMode && mode != FailFastMode) {
-      throw new AnalysisException(unacceptableModeMessage(mode.name))
+      throw new AnalysisException(
+        errorClass = "_LEGACY_ERROR_TEMP_3085",
+        messageParameters = Map(
+          "name" -> mode.name,
+          "permissiveMode" -> PermissiveMode.name,
+          "failFastMode" -> FailFastMode.name))
     }
     mode
   }
 
-  private def unacceptableModeMessage(name: String): String = {
-    s"from_avro() doesn't support the $name mode. " +
-      s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}."
-  }
-
   @transient private lazy val nullResultRow: Any = dataType match {
       case st: StructType =>
         val resultRow = new SpecificInternalRow(st.map(_.dataType))
@@ -115,7 +115,12 @@ private[sql] case class AvroDataToCatalyst(
             s"Current parse Mode: ${FailFastMode.name}. To process malformed records as null " +
             "result, try setting the option 'mode' as 'PERMISSIVE'.", e)
         case _ =>
-          throw new AnalysisException(unacceptableModeMessage(parseMode.name))
+          throw new AnalysisException(
+            errorClass = "_LEGACY_ERROR_TEMP_3085",
+            messageParameters = Map(
+              "name" -> parseMode.name,
+              "permissiveMode" -> PermissiveMode.name,
+              "failFastMode" -> FailFastMode.name))
       }
     }
   }
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/test/SQLHelper.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/test/SQLHelper.scala
index 727e2a4f420..4a574a15f7a 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/test/SQLHelper.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/test/SQLHelper.scala
@@ -45,7 +45,9 @@ trait SQLHelper {
       if (spark.conf.isModifiable(k)) {
         spark.conf.set(k, v)
       } else {
-        throw new AnalysisException(s"Cannot modify the value of a static config: $k")
+        throw new AnalysisException(
+          errorClass = "_LEGACY_ERROR_TEMP_3050",
+          messageParameters = Map("k" -> k))
       }
 
     }
diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
index c06498684fa..075526e7521 100644
--- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
+++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
@@ -196,13 +196,21 @@ private[client] object GrpcExceptionConverter {
         errorClass = params.errorClass.orNull,
         messageParameters = params.messageParameters,
         queryContext = params.queryContext)),
-    errorConstructor(params =>
-      new AnalysisException(
-        params.message,
-        cause = params.cause,
-        errorClass = params.errorClass,
-        messageParameters = params.messageParameters,
-        context = params.queryContext)),
+    errorConstructor(params => {
+      if (params.errorClass.isEmpty) {
+        new AnalysisException(
+          errorClass = "_LEGACY_ERROR_TEMP_3100",
+          messageParameters = Map("message" -> params.message),
+          cause = params.cause,
+          context = params.queryContext)
+      } else {
+        new AnalysisException(
+          errorClass = params.errorClass.get,
+          messageParameters = params.messageParameters,
+          cause = params.cause,
+          context = params.queryContext)
+      }
+    }),
     errorConstructor(params =>
       new NamespaceAlreadyExistsException(params.errorClass.orNull, params.messageParameters)),
     errorConstructor(params =>
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index f63f5e541e0..73446eddd25 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -172,9 +172,12 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
       data: DataFrame): BaseRelation = {
     mode match {
       case SaveMode.Overwrite | SaveMode.Ignore =>
-        throw new AnalysisException(s"Save mode $mode not allowed for Kafka. " +
-          s"Allowed save modes are ${SaveMode.Append} and " +
-          s"${SaveMode.ErrorIfExists} (default).")
+        throw new AnalysisException(
+          errorClass = "_LEGACY_ERROR_TEMP_3081",
+          messageParameters = Map(
+            "mode" -> mode.toString,
+            "append" -> SaveMode.Append.toString,
+            "errorIfExists" -> SaveMode.ErrorIfExists.toString))
       case _ => // good
     }
     val caseInsensitiveParameters = CaseInsensitiveMap(parameters)
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
index d1c4386e486..d986394eb1c 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
@@ -56,7 +56,10 @@ private[kafka010] object KafkaWriter extends Logging {
       headersExpression(schema)
       partitionExpression(schema)
     } catch {
-      case e: IllegalStateException => throw new AnalysisException(e.getMessage)
+      case e: IllegalStateException =>
+        throw new AnalysisException(
+          errorClass = "_LEGACY_ERROR_TEMP_3080",
+          messageParameters = Map("msg" -> e.getMessage))
     }
   }
 
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/AnalysisException.scala b/sql/api/src/main/scala/org/apache/spark/sql/AnalysisException.scala
index a043df9b42c..7a428f6cc32 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/AnalysisException.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/AnalysisException.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.trees.{Origin, WithOrigin}
  * @since 1.3.0
  */
 @Stable
-class AnalysisException protected[sql] (
+class AnalysisException protected(
     val message: String,
     val line: Option[Int] = None,
     val startPosition: Option[Int] = None,
@@ -49,6 +49,18 @@ class AnalysisException protected[sql] (
       messageParameters = messageParameters,
       cause = cause)
 
+  def this(
+      errorClass: String,
+      messageParameters: Map[String, String],
+      context: Array[QueryContext],
+      cause: Option[Throwable]) =
+    this(
+      SparkThrowableHelper.getMessage(errorClass, messageParameters),
+      errorClass = Some(errorClass),
+      messageParameters = messageParameters,
+      context = context,
+      cause = cause)
+
   def this(
       errorClass: String,
       messageParameters: Map[String, String],
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SQLConfHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SQLConfHelper.scala
index f4605b9218f..bd0455d76a8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SQLConfHelper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SQLConfHelper.scala
@@ -47,7 +47,9 @@ trait SQLConfHelper {
     }
     keys.lazyZip(values).foreach { (k, v) =>
       if (SQLConf.isStaticConfigKey(k)) {
-        throw new AnalysisException(s"Cannot modify the value of a static config: $k")
+        throw new AnalysisException(
+          errorClass = "_LEGACY_ERROR_TEMP_3050",
+          messageParameters = Map("k" -> k))
       }
       conf.setConfString(k, v)
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
index 70b44fbfa79..a90c6156503 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala
@@ -513,8 +513,12 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
         //  df1 = spark.createDataFrame([Row(a = 1, b = 2, c = 3)]])
         //  df2 = spark.createDataFrame([Row(a = 1, b = 2)]])
         //  df1.select(df2.a)   <-   illegal reference df2.a
-        throw new AnalysisException(s"When resolving $u, " +
-          s"fail to find subplan with plan_id=$planId in\n$q")
+        throw new AnalysisException(
+          errorClass = "_LEGACY_ERROR_TEMP_3051",
+          messageParameters = Map(
+            "u" -> u.toString,
+            "planId" -> planId.toString,
+            "q" -> q.toString))
       }
     })
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala
index ee16ce262e3..3f3e707b054 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala
@@ -116,7 +116,9 @@ object ResolveRowLevelCommandAssignments extends Rule[LogicalPlan] {
       case i @ InsertAction(_, assignments) =>
         i.copy(assignments = AssignmentUtils.alignInsertAssignments(attrs, assignments))
       case other =>
-        throw new AnalysisException(s"Unexpected resolved action: $other")
+        throw new AnalysisException(
+          errorClass = "_LEGACY_ERROR_TEMP_3052",
+          messageParameters = Map("other" -> other.toString))
     }
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
index 4ba33f4743e..9e020cb55ed 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
@@ -95,7 +95,9 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper
             case InsertAction(cond, assignments) =>
               Keep(cond.getOrElse(TrueLiteral), assignments.map(_.value))
             case other =>
-              throw new AnalysisException(s"Unexpected WHEN NOT MATCHED action: $other")
+              throw new AnalysisException(
+                errorClass = "_LEGACY_ERROR_TEMP_3053",
+                messageParameters = Map("other" -> other.toString))
           }
 
           val outputs = notMatchedInstructions.flatMap(_.outputs)
@@ -440,7 +442,9 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper
         Keep(cond.getOrElse(TrueLiteral), output)
 
       case other =>
-        throw new AnalysisException(s"Unexpected action: $other")
+        throw new AnalysisException(
+          errorClass = "_LEGACY_ERROR_TEMP_3052",
+          messageParameters = Map("other" -> other.toString))
     }
   }
 
@@ -472,7 +476,9 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper
         Keep(cond.getOrElse(TrueLiteral), output)
 
       case other =>
-        throw new AnalysisException(s"Unexpected action: $other")
+        throw new AnalysisException(
+          errorClass = "_LEGACY_ERROR_TEMP_3052",
+          messageParameters = Map("other" -> other.toString))
     }
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala
index 60e457a776b..621e01eedea 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala
@@ -70,7 +70,8 @@ object V2ExpressionUtils extends SQLConfHelper with Logging {
       query: LogicalPlan,
       funCatalogOpt: Option[FunctionCatalog] = None): Expression =
     toCatalystOpt(expr, query, funCatalogOpt)
-        .getOrElse(throw new AnalysisException(s"$expr is not currently supported"))
+        .getOrElse(throw new AnalysisException(
+          errorClass = "_LEGACY_ERROR_TEMP_3054", messageParameters = Map("expr" -> expr.toString)))
 
   def toCatalystOpt(
       expr: V2Expression,
@@ -88,7 +89,9 @@ object V2ExpressionUtils extends SQLConfHelper with Logging {
       case ref: FieldReference =>
         Some(resolveRef[NamedExpression](ref, query))
       case _ =>
-        throw new AnalysisException(s"$expr is not currently supported")
+        throw new AnalysisException(
+          errorClass = "_LEGACY_ERROR_TEMP_3054",
+          messageParameters = Map("expr" -> expr.toString))
     }
   }
 
@@ -176,8 +179,9 @@ object V2ExpressionUtils extends SQLConfHelper with Logging {
           case Some(_) =>
             ApplyFunctionExpression(scalarFunc, arguments)
           case _ =>
-            throw new AnalysisException(s"ScalarFunction '${scalarFunc.name()}'" +
-              s" neither implement magic method nor override 'produceResult'")
+            throw new AnalysisException(
+              errorClass = "_LEGACY_ERROR_TEMP_3055",
+              messageParameters = Map("scalarFunc" -> scalarFunc.name()))
         }
     }
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 53dd601ac39..e48b44a603a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -478,7 +478,10 @@ object GroupBasedRowLevelOperation {
 
       case other =>
         throw new AnalysisException(
-          s"Unexpected row-level read relations (allow multiple = $allowMultipleReads): $other")
+          errorClass = "_LEGACY_ERROR_TEMP_3056",
+          messageParameters = Map(
+            "allowMultipleReads" -> allowMultipleReads.toString,
+            "other" -> other.toString))
     }
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 0a18532b134..03ea8c8423c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -235,7 +235,9 @@ case class ReplaceData(
       case DataSourceV2Relation(RowLevelOperationTable(_, operation), _, _, _, _) =>
         operation
       case _ =>
-        throw new AnalysisException(s"Cannot retrieve row-level operation from $table")
+        throw new AnalysisException(
+          errorClass = "_LEGACY_ERROR_TEMP_3057",
+          messageParameters = Map("table" -> table.toString))
     }
   }
 
@@ -313,7 +315,9 @@ case class WriteDelta(
       case DataSourceV2Relation(RowLevelOperationTable(_, operation), _, _, _, _) =>
         operation.asInstanceOf[SupportsDelta]
       case _ =>
-        throw new AnalysisException(s"Cannot retrieve row-level operation from $table")
+        throw new AnalysisException(
+          errorClass = "_LEGACY_ERROR_TEMP_3057",
+          messageParameters = Map("table" -> table.toString))
     }
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala
index 76d0a516a13..d061fde27c8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala
@@ -192,7 +192,10 @@ private[spark] object SchemaUtils {
         case (x, ys) if ys.length > 1 => s"${x._2.mkString(".")}"
       }
       throw new AnalysisException(
-        s"Found duplicate column(s) $checkType: ${duplicateColumns.mkString(", ")}")
+        errorClass = "_LEGACY_ERROR_TEMP_3058",
+        messageParameters = Map(
+          "checkType" -> checkType,
+          "duplicateColumns" -> duplicateColumns.mkString(", ")))
     }
   }
 
@@ -225,9 +228,11 @@ private[spark] object SchemaUtils {
         case o =>
           if (column.length > 1) {
             throw new AnalysisException(
-              s"""Expected $columnPath to be a nested data type, but found $o. Was looking for the
-                 |index of ${UnresolvedAttribute(column).name} in a nested field
-              """.stripMargin)
+              errorClass = "_LEGACY_ERROR_TEMP_3062",
+              messageParameters = Map(
+                "columnPath" -> columnPath,
+                "o" -> o.toString,
+                "attr" -> UnresolvedAttribute(column).name))
           }
           Nil
       }
@@ -239,9 +244,12 @@ private[spark] object SchemaUtils {
     } catch {
       case i: IndexOutOfBoundsException =>
         throw new AnalysisException(
-          s"Couldn't find column ${i.getMessage} in:\n${schema.treeString}")
+          errorClass = "_LEGACY_ERROR_TEMP_3060",
+          messageParameters = Map("i" -> i.getMessage, "schema" -> schema.treeString))
       case e: AnalysisException =>
-        throw new AnalysisException(e.getMessage + s":\n${schema.treeString}")
+        throw new AnalysisException(
+          errorClass = "_LEGACY_ERROR_TEMP_3061",
+          messageParameters = Map("e" -> e.getMessage, "schema" -> schema.treeString))
     }
   }
 
@@ -261,7 +269,10 @@ private[spark] object SchemaUtils {
             (nameAndField._1 :+ nowField.name) -> nowField
           case _ =>
             throw new AnalysisException(
-              s"The positions provided ($pos) cannot be resolved in\n${schema.treeString}.")
+              errorClass = "_LEGACY_ERROR_TEMP_3059",
+              messageParameters = Map(
+                "pos" -> pos.toString,
+                "schema" -> schema.treeString))
       }
     }
     field._1
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index a8f73cebf31..1ee20a98cfd 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -568,7 +568,9 @@ abstract class ExternalCatalogSuite extends SparkFunSuite {
         // HiveExternalCatalog may be the first one to notice and throw an exception, which will
         // then be caught and converted to a RuntimeException with a descriptive message.
         case ex: RuntimeException if ex.getMessage.contains("MetaException") =>
-          throw new AnalysisException(ex.getMessage)
+          throw new AnalysisException(
+            errorClass = "_LEGACY_ERROR_TEMP_3066",
+            messageParameters = Map("msg" -> ex.getMessage))
       }
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
index 5ad96cdba21..eef1c9436df 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
@@ -422,7 +422,9 @@ class RelationalGroupedDataset protected[sql](
    */
   def pivot(pivotColumn: Column): RelationalGroupedDataset = {
     if (df.isStreaming) {
-      throw new AnalysisException("pivot is not supported on a streaming DataFrames/Datasets")
+      throw new AnalysisException(
+        errorClass = "_LEGACY_ERROR_TEMP_3063",
+        messageParameters = Map.empty)
     }
     // This is to prevent unintended OOM errors when the number of distinct values is large
     val maxValues = df.sparkSession.sessionState.conf.dataFramePivotMaxValues
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 304ce0cd751..2d24f997d10 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -421,7 +421,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
         if (aggregateExpressions.exists(_.aggregateFunction.isInstanceOf[PythonUDAF])) {
           throw new AnalysisException(
-            "Streaming aggregation doesn't support group aggregate pandas UDF")
+            errorClass = "_LEGACY_ERROR_TEMP_3067",
+            messageParameters = Map.empty)
         }
 
         val sessionWindowOption = namedGroupingExpressions.find { p =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
index 557f0e897ee..1972aeb3826 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala
@@ -427,8 +427,9 @@ object AggUtils {
     }
 
     if (groupWithoutSessionExpression.isEmpty) {
-      throw new AnalysisException("Global aggregation with session window in streaming query" +
-        " is not supported.")
+      throw new AnalysisException(
+        errorClass = "_LEGACY_ERROR_TEMP_3068",
+        messageParameters = Map.empty)
     }
 
     val groupingWithoutSessionAttributes = groupWithoutSessionExpression.map(_.toAttribute)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 1b1eddecdb9..6e5c463ed72 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -259,9 +259,12 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
         metadataStruct.dataType.asInstanceOf[StructType].fields.foreach {
           case FileSourceGeneratedMetadataStructField(field, internalName) =>
             if (schemaColumns.contains(internalName)) {
-              throw new AnalysisException(internalName +
-                s"${internalName} is a reserved column name that cannot be read in combination " +
-                s"with ${FileFormat.METADATA_NAME}.${field.name} column.")
+              throw new AnalysisException(
+                errorClass = "_LEGACY_ERROR_TEMP_3069",
+                messageParameters = Map(
+                  "internalName" -> internalName,
+                  "colName" -> s"${FileFormat.METADATA_NAME}.${field.name}"
+                ))
             }
 
             // NOTE: Readers require the internal column to be nullable because it's not part of the
@@ -276,7 +279,9 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
             metadataColumnsByName.put(field.name, attr)
             constantMetadataColumns += attr
 
-          case field => throw new AnalysisException(s"Unrecognized file metadata field: $field")
+          case field => throw new AnalysisException(
+            errorClass = "_LEGACY_ERROR_TEMP_3070",
+            messageParameters = Map("field" -> field.toString))
         }
       }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index f60f7c11eef..59c99cb998c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -752,7 +752,9 @@ private[sql] object ParquetSchemaConverter {
 
   def checkConversionRequirement(f: => Boolean, message: String): Unit = {
     if (!f) {
-      throw new AnalysisException(message)
+      throw new AnalysisException(
+        errorClass = "_LEGACY_ERROR_TEMP_3071",
+        messageParameters = Map("msg" -> message))
     }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 12c183de19d..c58815b6978 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -364,7 +364,10 @@ case class PreprocessTableCreation(catalog: SessionCatalog) extends Rule[Logical
     }
   }
 
-  private def failAnalysis(msg: String) = throw new AnalysisException(msg)
+  private def failAnalysis(msg: String) = {
+    throw new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_3072", messageParameters = Map("msg" -> msg))
+  }
 }
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala
index 749acbaa7ad..8a25170fcee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala
@@ -117,7 +117,9 @@ case class MergeRowsExec(
         SplitExec(createPredicate(cond), createProjection(output), createProjection(otherOutput))
 
       case other =>
-        throw new AnalysisException(s"Unexpected instruction: $other")
+        throw new AnalysisException(
+          errorClass = "_LEGACY_ERROR_TEMP_3073",
+          messageParameters = Map("other" -> other.toString))
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/utils/SchemaUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/utils/SchemaUtil.scala
index 07f4a4b5bac..54c6b34db97 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/utils/SchemaUtil.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/utils/SchemaUtil.scala
@@ -23,7 +23,11 @@ object SchemaUtil {
   def getSchemaAsDataType(schema: StructType, fieldName: String): DataType = {
     schema.getFieldIndex(fieldName) match {
       case Some(idx) => schema(idx).dataType
-      case _ => throw new AnalysisException(s"field $fieldName not found from given schema $schema")
+      case _ => throw new AnalysisException(
+        errorClass = "_LEGACY_ERROR_TEMP_3074",
+        messageParameters = Map(
+          "fieldName" -> fieldName,
+          "schema" -> schema.toString()))
     }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala
index 7c28f91ee1c..f0ae37fc8e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala
@@ -134,7 +134,10 @@ class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[LogicalPla
         .map(scanAttr => tableAttr -> scanAttr)
         .getOrElse {
           throw new AnalysisException(
-            s"Couldn't find scan attribute for $tableAttr in ${scanAttrs.mkString(",")}")
+            errorClass = "_LEGACY_ERROR_TEMP_3075",
+            messageParameters = Map(
+              "tableAttr" -> tableAttr.toString,
+              "scanAttrs" -> scanAttrs.mkString(",")))
         }
     }
     AttributeMap(attrMapping)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkPropagator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkPropagator.scala
index a4bee7e95b4..f0950063b16 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkPropagator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkPropagator.scala
@@ -189,9 +189,9 @@ class PropagateWatermarkSimulator extends WatermarkPropagator with Logging {
       case node: EventTimeWatermarkExec =>
         val inputWatermarks = getInputWatermarks(node, nodeToOutputWatermark)
         if (inputWatermarks.nonEmpty) {
-          throw new AnalysisException("Redefining watermark is disallowed. You can set the " +
-            s"config '${SQLConf.STATEFUL_OPERATOR_ALLOW_MULTIPLE.key}' to 'false' to restore " +
-            "the previous behavior. Note that multiple stateful operators will be disallowed.")
+          throw new AnalysisException(
+            errorClass = "_LEGACY_ERROR_TEMP_3076",
+            messageParameters = Map("config" -> SQLConf.STATEFUL_OPERATOR_ALLOW_MULTIPLE.key))
         }
 
         nodeToOutputWatermark.put(node.id, Some(originWatermark))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index 57db193a4c8..80f5b3532c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -392,9 +392,9 @@ object WatermarkSupport {
       // with allowing them.
       val eventTimeColsSet = eventTimeCols.map(_.exprId).toSet
       if (eventTimeColsSet.size > 1) {
-        throw new AnalysisException("More than one event time columns are available. Please " +
-          "ensure there is at most one event time column per stream. event time columns: " +
-          eventTimeCols.mkString("(", ",", ")"))
+        throw new AnalysisException(
+          errorClass = "_LEGACY_ERROR_TEMP_3077",
+          messageParameters = Map("eventTimeCols" -> eventTimeCols.mkString("(", ",", ")")))
       }
 
       // With above check, even there are multiple columns in eventTimeCols, all columns must be
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 966092b58d9..5ba4e39e8ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -635,7 +635,10 @@ abstract class JdbcDialect extends Serializable with Logging {
    * @return `AnalysisException` or its sub-class.
    */
   def classifyException(message: String, e: Throwable): AnalysisException = {
-    new AnalysisException(message, cause = Some(e))
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_3064",
+      messageParameters = Map("msg" -> message),
+      cause = Some(e))
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala
index b74d7318a92..f896997b57c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala
@@ -411,11 +411,10 @@ class DataSourceV2FunctionSuite extends DatasourceV2SQLBase {
     catalog("testcat").asInstanceOf[SupportsNamespaces].createNamespace(Array("ns"), emptyProps)
     addFunction(Identifier.of(Array("ns"), "strlen"),
       new JavaStrLen(new JavaStrLenNoImpl))
-    // TODO assign a error-classes name
     checkError(
       exception = intercept[AnalysisException](sql("SELECT testcat.ns.strlen('abc')").collect()),
-      errorClass = null,
-      parameters = Map.empty,
+      errorClass = "_LEGACY_ERROR_TEMP_3055",
+      parameters = Map("scalarFunc" -> "strlen"),
       context = ExpectedContext(
         fragment = "testcat.ns.strlen('abc')",
         start = 7,
@@ -446,11 +445,10 @@ class DataSourceV2FunctionSuite extends DatasourceV2SQLBase {
   test("SPARK-35390: scalar function w/ mismatch type parameters from magic method") {
     catalog("testcat").asInstanceOf[SupportsNamespaces].createNamespace(Array("ns"), emptyProps)
     addFunction(Identifier.of(Array("ns"), "add"), new JavaLongAdd(new JavaLongAddMismatchMagic))
-    // TODO assign a error-classes name
     checkError(
       exception = intercept[AnalysisException](sql("SELECT testcat.ns.add(1L, 2L)").collect()),
-      errorClass = null,
-      parameters = Map.empty,
+      errorClass = "_LEGACY_ERROR_TEMP_3055",
+      parameters = Map("scalarFunc" -> "long_add_mismatch_magic"),
       context = ExpectedContext(
         fragment = "testcat.ns.add(1L, 2L)",
         start = 7,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 302a8e5d41d..47e79e45b73 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -1788,23 +1788,31 @@ class DataSourceV2SQLSuiteV1Filter
   }
 
   test("tableCreation: partition column case sensitive resolution") {
-    def checkFailure(statement: String): Unit = {
+    def checkFailure(statement: String, i: String): Unit = {
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
         checkError(
           exception = intercept[AnalysisException] {
             sql(statement)
           },
-          errorClass = null,
-          parameters = Map.empty)
+          errorClass = "_LEGACY_ERROR_TEMP_3060",
+          parameters = Map(
+            "i" -> i,
+            "schema" ->
+              """root
+                | |-- a: integer (nullable = true)
+                | |-- b: string (nullable = true)
+                |""".stripMargin))
       }
     }
 
-    checkFailure(s"CREATE TABLE tbl (a int, b string) USING $v2Source PARTITIONED BY (A)")
-    checkFailure(s"CREATE TABLE testcat.tbl (a int, b string) USING $v2Source PARTITIONED BY (A)")
+    checkFailure(s"CREATE TABLE tbl (a int, b string) USING $v2Source PARTITIONED BY (A)", "A")
+    checkFailure(s"CREATE TABLE testcat.tbl (a int, b string) USING $v2Source PARTITIONED BY (A)",
+      "A")
     checkFailure(
-      s"CREATE OR REPLACE TABLE tbl (a int, b string) USING $v2Source PARTITIONED BY (B)")
+      s"CREATE OR REPLACE TABLE tbl (a int, b string) USING $v2Source PARTITIONED BY (B)", "B")
     checkFailure(
-      s"CREATE OR REPLACE TABLE testcat.tbl (a int, b string) USING $v2Source PARTITIONED BY (B)")
+      s"CREATE OR REPLACE TABLE testcat.tbl (a int, b string) USING $v2Source PARTITIONED BY (B)",
+      "B")
   }
 
   test("tableCreation: duplicate column names in the table definition") {
@@ -1866,23 +1874,47 @@ class DataSourceV2SQLSuiteV1Filter
     checkError(
       exception = analysisException(
         s"CREATE TABLE tbl (a int, b string) USING $v2Source CLUSTERED BY (c) INTO 4 BUCKETS"),
-      errorClass = null,
-      parameters = Map.empty)
+      errorClass = "_LEGACY_ERROR_TEMP_3060",
+      parameters = Map(
+        "i" -> "c",
+        "schema" ->
+          """root
+            | |-- a: integer (nullable = true)
+            | |-- b: string (nullable = true)
+            |""".stripMargin))
     checkError(
       exception = analysisException(s"CREATE TABLE testcat.tbl (a int, b string) " +
         s"USING $v2Source CLUSTERED BY (c) INTO 4 BUCKETS"),
-      errorClass = null,
-      parameters = Map.empty)
+      errorClass = "_LEGACY_ERROR_TEMP_3060",
+      parameters = Map(
+        "i" -> "c",
+        "schema" ->
+          """root
+            | |-- a: integer (nullable = true)
+            | |-- b: string (nullable = true)
+            |""".stripMargin))
     checkError(
       exception = analysisException(s"CREATE OR REPLACE TABLE tbl (a int, b string) " +
         s"USING $v2Source CLUSTERED BY (c) INTO 4 BUCKETS"),
-      errorClass = null,
-      parameters = Map.empty)
+      errorClass = "_LEGACY_ERROR_TEMP_3060",
+      parameters = Map(
+        "i" -> "c",
+        "schema" ->
+          """root
+            | |-- a: integer (nullable = true)
+            | |-- b: string (nullable = true)
+            |""".stripMargin))
     checkError(
       exception = analysisException(s"CREATE OR REPLACE TABLE testcat.tbl (a int, b string) " +
         s"USING $v2Source CLUSTERED BY (c) INTO 4 BUCKETS"),
-      errorClass = null,
-      parameters = Map.empty)
+      errorClass = "_LEGACY_ERROR_TEMP_3060",
+      parameters = Map(
+        "i" -> "c",
+        "schema" ->
+          """root
+            | |-- a: integer (nullable = true)
+            | |-- b: string (nullable = true)
+            |""".stripMargin))
   }
 
   test("tableCreation: bucket column name containing dot") {
@@ -1906,26 +1938,27 @@ class DataSourceV2SQLSuiteV1Filter
   test("tableCreation: column repeated in partition columns") {
     Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) =>
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
+        val dupCol = c1.toLowerCase(Locale.ROOT)
         checkError(
           exception = analysisException(
             s"CREATE TABLE t ($c0 INT) USING $v2Source PARTITIONED BY ($c0, $c1)"),
-          errorClass = null,
-          parameters = Map.empty)
+          errorClass = "_LEGACY_ERROR_TEMP_3058",
+          parameters = Map("checkType" -> "in the partitioning", "duplicateColumns" -> dupCol))
         checkError(
           exception = analysisException(
             s"CREATE TABLE testcat.t ($c0 INT) USING $v2Source PARTITIONED BY ($c0, $c1)"),
-          errorClass = null,
-          parameters = Map.empty)
+          errorClass = "_LEGACY_ERROR_TEMP_3058",
+          parameters = Map("checkType" -> "in the partitioning", "duplicateColumns" -> dupCol))
         checkError(
           exception = analysisException(
             s"CREATE OR REPLACE TABLE t ($c0 INT) USING $v2Source PARTITIONED BY ($c0, $c1)"),
-          errorClass = null,
-          parameters = Map.empty)
+          errorClass = "_LEGACY_ERROR_TEMP_3058",
+          parameters = Map("checkType" -> "in the partitioning", "duplicateColumns" -> dupCol))
         checkError(
           exception = analysisException(s"CREATE OR REPLACE TABLE testcat.t ($c0 INT) " +
             s"USING $v2Source PARTITIONED BY ($c0, $c1)"),
-          errorClass = null,
-          parameters = Map.empty)
+          errorClass = "_LEGACY_ERROR_TEMP_3058",
+          parameters = Map("checkType" -> "in the partitioning", "duplicateColumns" -> dupCol))
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
index cfc8b2cc845..c6060dcdd51 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
@@ -18,8 +18,8 @@ package org.apache.spark.sql.connector
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
 import org.apache.spark.sql.connector.read.ScanBuilder
@@ -51,7 +51,7 @@ class DummyReadOnlyFileTable extends Table with SupportsRead {
   override def schema(): StructType = StructType(Nil)
 
   override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
-    throw new AnalysisException("Dummy file reader")
+    throw SparkException.internalError("Dummy file reader")
   }
 
   override def capabilities(): java.util.Set[TableCapability] =
@@ -75,7 +75,7 @@ class DummyWriteOnlyFileTable extends Table with SupportsWrite {
   override def schema(): StructType = StructType(Nil)
 
   override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
-    throw new AnalysisException("Dummy file writer")
+    throw SparkException.internalError("Dummy file writer")
 
   override def capabilities(): java.util.Set[TableCapability] =
     java.util.EnumSet.of(TableCapability.BATCH_WRITE, TableCapability.ACCEPT_ANY_SCHEMA)
@@ -99,10 +99,12 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession {
       checkAnswer(spark.read.parquet(path), df)
 
       // Dummy File reader should fail as expected.
-      val exception = intercept[AnalysisException] {
-        spark.read.format(dummyReadOnlyFileSourceV2).load(path).collect()
-      }
-      assert(exception.message.equals("Dummy file reader"))
+      checkError(
+        exception = intercept[SparkException] {
+          spark.read.format(dummyReadOnlyFileSourceV2).load(path).collect()
+        },
+        errorClass = "INTERNAL_ERROR",
+        parameters = Map("message" -> "Dummy file reader"))
     }
   }
 
@@ -125,10 +127,12 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession {
 
       withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "foo,bar") {
         // Dummy File reader should fail as DISABLED_V2_FILE_DATA_SOURCE_READERS doesn't include it.
-        val exception = intercept[AnalysisException] {
-          spark.read.format(dummyReadOnlyFileSourceV2).load(path).collect()
-        }
-        assert(exception.message.equals("Dummy file reader"))
+        checkError(
+          exception = intercept[SparkException] {
+            spark.read.format(dummyReadOnlyFileSourceV2).load(path).collect()
+          },
+          errorClass = "INTERNAL_ERROR",
+          parameters = Map("message" -> "Dummy file reader"))
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
index e987adf6225..583d7fd7ee3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
@@ -160,7 +160,7 @@ class QueryExecutionSuite extends SharedSparkSession {
 
     // Throw an AnalysisException - this should be captured.
     spark.experimental.extraStrategies = Seq[SparkStrategy](
-      (_: LogicalPlan) => throw new AnalysisException("exception"))
+      (_: LogicalPlan) => throw new AnalysisException("_LEGACY_ERROR_TEMP_3078", Map.empty))
     assert(qe.toString.contains("org.apache.spark.sql.AnalysisException"))
 
     // Throw an Error - this should not be captured.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
index ab4389eceec..03b1f937e7b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala
@@ -70,8 +70,7 @@ class OrcFilterSuite extends OrcTest with SharedSparkSession {
           s"${o.pushedFilters.mkString("pushedFilters(", ", ", ")")}")
         checker(maybeFilter.get)
 
-      case _ =>
-        throw new AnalysisException("Can not match OrcTable in the query.")
+      case _ => assert(false, "Can not match OrcTable in the query.")
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
index 8a38075932b..0c696acdeda 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala
@@ -133,8 +133,7 @@ trait OrcTest extends QueryTest with FileBasedDataSourceTest with BeforeAndAfter
             s"${o.pushedFilters.mkString("pushedFilters(", ", ", ")")}")
         }
 
-      case _ =>
-        throw new AnalysisException("Can not match OrcTable in the query.")
+      case _ => assert(false, "Can not match OrcTable in the query.")
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index d7ed5c4d354..da2705f7c72 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -2345,7 +2345,8 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
           checker(stripSparkFilter(query), expected)
 
         case _ =>
-          throw new AnalysisException("Can not match ParquetTable in the query.")
+          throw new AnalysisException(
+            errorClass = "_LEGACY_ERROR_TEMP_3078", messageParameters = Map.empty)
       }
     }
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index c5adb8e27c4..84f75e3ef50 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -106,7 +106,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
           case o => o
         }
         throw new AnalysisException(
-          e.getClass.getCanonicalName + ": " + e.getMessage, cause = Some(e))
+          errorClass = "_LEGACY_ERROR_TEMP_3065",
+          messageParameters = Map(
+            "clazz" -> e.getClass.getCanonicalName,
+            "msg" -> Option(e.getMessage).getOrElse("")),
+          cause = Some(e))
     }
   }
 
@@ -132,14 +136,17 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   private def verifyTableProperties(table: CatalogTable): Unit = {
     val invalidKeys = table.properties.keys.filter(_.startsWith(SPARK_SQL_PREFIX))
     if (invalidKeys.nonEmpty) {
-      throw new AnalysisException(s"Cannot persist ${table.qualifiedName} into Hive metastore " +
-        s"as table property keys may not start with '$SPARK_SQL_PREFIX': " +
-        invalidKeys.mkString("[", ", ", "]"))
+      throw new AnalysisException(
+        errorClass = "_LEGACY_ERROR_TEMP_3086",
+        messageParameters = Map(
+          "tableName" -> table.qualifiedName,
+          "invalidKeys" -> invalidKeys.mkString("[", ", ", "]")))
     }
     // External users are not allowed to set/switch the table type. In Hive metastore, the table
     // type can be switched by changing the value of a case-sensitive table property `EXTERNAL`.
     if (table.properties.contains("EXTERNAL")) {
-      throw new AnalysisException("Cannot set or change the preserved property key: 'EXTERNAL'")
+      throw new AnalysisException(
+        errorClass = "_LEGACY_ERROR_TEMP_3087", messageParameters = Map.empty)
     }
   }
 
@@ -807,9 +814,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   private def reorderSchema(schema: StructType, partColumnNames: Seq[String]): StructType = {
     val partitionFields = partColumnNames.map { partCol =>
       schema.find(_.name == partCol).getOrElse {
-        throw new AnalysisException("The metadata is corrupted. Unable to find the " +
-          s"partition column names from the schema. schema: ${schema.catalogString}. " +
-          s"Partition columns: ${partColumnNames.mkString("[", ", ", "]")}")
+        throw new AnalysisException(
+          errorClass = "_LEGACY_ERROR_TEMP_3088",
+          messageParameters = Map(
+            "schema" -> schema.catalogString,
+            "partColumnNames" -> partColumnNames.mkString("[", ", ", "]")))
       }
     }
     StructType(schema.filterNot(partitionFields.contains) ++ partitionFields)
@@ -1424,9 +1433,12 @@ object HiveExternalCatalog {
     } yield props.getOrElse(
       s"$DATASOURCE_SCHEMA_PREFIX${colType}Col.$index",
       throw new AnalysisException(
-        s"Corrupted $typeName in catalog: $numCols parts expected, but part $index is missing."
+        errorClass = "_LEGACY_ERROR_TEMP_3089",
+        messageParameters = Map(
+          "typeName" -> typeName,
+          "numCols" -> numCols,
+          "index" -> index.toString))
       )
-    )
   }
 
   private def getPartitionColumnsFromTableProperties(metadata: CatalogTable): Seq[String] = {
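
The Hive catalog changes above all follow the same shape: the human-readable text moves into `error-classes.json` under a `_LEGACY_ERROR_TEMP_*` key, and the call site passes only the error class plus the named parameters that template expects (optionally with a `cause`). Below is a hedged sketch of that pattern; the object and helper names are hypothetical, and only the error class and parameter names come from the hunk above:

```scala
import org.apache.spark.sql.AnalysisException

object ErrorClassSketch {
  // Mirrors the verifyTableProperties change above; "forbiddenTableProperties"
  // is an illustrative helper name, not an API introduced by the patch.
  def forbiddenTableProperties(tableName: String, invalidKeys: Seq[String]): AnalysisException =
    new AnalysisException(
      errorClass = "_LEGACY_ERROR_TEMP_3086",
      messageParameters = Map(
        "tableName" -> tableName,
        "invalidKeys" -> invalidKeys.mkString("[", ", ", "]")))
}
```
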
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
index ba87ad37130..279af3f240d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala
@@ -239,19 +239,19 @@ private[hive] trait HiveInspectors {
     // raw java list type unsupported
     case c: Class[_] if isSubClassOf(c, classOf[java.util.List[_]]) =>
       throw new AnalysisException(
-        "Raw list type in java is unsupported because Spark cannot infer the element type.")
+        errorClass = "_LEGACY_ERROR_TEMP_3090", messageParameters = Map.empty)
 
     // raw java map type unsupported
     case c: Class[_] if isSubClassOf(c, classOf[java.util.Map[_, _]]) =>
       throw new AnalysisException(
-        "Raw map type in java is unsupported because Spark cannot infer key and value types.")
+        errorClass = "_LEGACY_ERROR_TEMP_3091", messageParameters = Map.empty)
 
     case _: WildcardType =>
       throw new AnalysisException(
-        "Collection types with wildcards (e.g. List<?> or Map<?, ?>) are unsupported because " +
-          "Spark cannot infer the data type for these type parameters.")
+        errorClass = "_LEGACY_ERROR_TEMP_3092", messageParameters = Map.empty)
 
-    case c => throw new AnalysisException(s"Unsupported java type $c")
+    case c => throw new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_3093", messageParameters = Map("c" -> c.toString))
   }
 
   private def isSubClassOf(t: Type, parent: Class[_]): Boolean = t match {
@@ -1125,7 +1125,8 @@ private[hive] trait HiveInspectors {
 
     private def decimalTypeInfo(decimalType: DecimalType): TypeInfo = decimalType match {
       case DecimalType.Fixed(precision, scale) => new DecimalTypeInfo(precision, scale)
-      case dt => throw new AnalysisException(s"${dt.catalogString} is not supported.")
+      case dt => throw new AnalysisException(
+        errorClass = "_LEGACY_ERROR_TEMP_3094", messageParameters = Map("dt" -> dt.catalogString))
     }
 
     def toTypeInfo: TypeInfo = dt match {
@@ -1154,7 +1155,7 @@ private[hive] trait HiveInspectors {
       case _: YearMonthIntervalType => intervalYearMonthTypeInfo
       case dt =>
         throw new AnalysisException(
-          s"${dt.catalogString} cannot be converted to Hive TypeInfo")
+          errorClass = "_LEGACY_ERROR_TEMP_3095", messageParameters = Map("dt" -> dt.catalogString))
     }
   }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index c9446b37829..f1d99d359cd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -301,17 +301,20 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     // it, but also respect the exprId in table relation output.
     if (result.output.length != relation.output.length) {
       throw new AnalysisException(
-        s"Converted table has ${result.output.length} columns, " +
-        s"but source Hive table has ${relation.output.length} columns. " +
-        s"Set ${HiveUtils.CONVERT_METASTORE_PARQUET.key} to false, " +
-        s"or recreate table ${relation.tableMeta.identifier} to workaround.")
+        errorClass = "_LEGACY_ERROR_TEMP_3096",
+        messageParameters = Map(
+          "resLen" ->  result.output.length.toString,
+          "relLen" -> relation.output.length.toString,
+          "key" -> HiveUtils.CONVERT_METASTORE_PARQUET.key,
+          "ident" -> relation.tableMeta.identifier.toString))
     }
     if (!result.output.zip(relation.output).forall {
           case (a1, a2) => a1.dataType == a2.dataType }) {
       throw new AnalysisException(
-        s"Column in converted table has different data type with source Hive table's. " +
-          s"Set ${HiveUtils.CONVERT_METASTORE_PARQUET.key} to false, " +
-          s"or recreate table ${relation.tableMeta.identifier} to workaround.")
+        errorClass = "_LEGACY_ERROR_TEMP_3097",
+        messageParameters = Map(
+          "key" -> HiveUtils.CONVERT_METASTORE_PARQUET.key,
+          "ident" -> relation.tableMeta.identifier.toString))
     }
     val newOutput = result.output.zip(relation.output).map {
       case (a1, a2) => a1.withExprId(a2.exprId)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
index 8a33645853c..32100d060b0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala
@@ -202,8 +202,11 @@ object HiveUDFExpressionBuilder extends SparkUDFExpressionBuilder {
           case i: InvocationTargetException => i.getCause
           case o => o
         }
-        val errorMsg = s"No handler for UDF/UDAF/UDTF '${clazz.getCanonicalName}': $e"
-        val analysisException = new AnalysisException(errorMsg)
+        val analysisException = new AnalysisException(
+          errorClass = "_LEGACY_ERROR_TEMP_3084",
+          messageParameters = Map(
+            "clazz" -> clazz.getCanonicalName,
+            "e" -> e.toString))
         analysisException.setStackTrace(e.getStackTrace)
         throw analysisException
     }
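
A small sketch of the rethrow pattern in the UDF builder above (the object and method names are illustrative): the error-class constructor still yields an ordinary `Throwable`, so the reflective cause's stack trace can be grafted onto the new exception before it is rethrown:

```scala
import org.apache.spark.sql.AnalysisException

object UdfErrorSketch {
  def rethrowAsNoHandler(cause: Throwable, udfClassName: String): Nothing = {
    val e = new AnalysisException(
      errorClass = "_LEGACY_ERROR_TEMP_3084",
      messageParameters = Map("clazz" -> udfClassName, "e" -> cause.toString))
    // Preserve the original stack trace, as HiveUDFExpressionBuilder does above.
    e.setStackTrace(cause.getStackTrace)
    throw e
  }
}
```
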
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 3da3d4a0eb5..e5de5941d4a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -47,7 +47,8 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
       table
     } else {
       if (table.bucketSpec.isDefined) {
-        throw new AnalysisException("Creating bucketed Hive serde table is not supported yet.")
+        throw new AnalysisException(
+          errorClass = "_LEGACY_ERROR_TEMP_3082", messageParameters = Map.empty)
       }
 
       val defaultStorage = HiveSerDe.getDefaultStorage(conf)
@@ -101,8 +102,9 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
       val withSchema = if (query.isEmpty) {
         val inferred = HiveUtils.inferSchema(withStorage)
         if (inferred.schema.length <= 0) {
-          throw new AnalysisException("Unable to infer the schema. " +
-            s"The schema specification is required to create the table ${inferred.identifier}.")
+          throw new AnalysisException(
+            errorClass = "_LEGACY_ERROR_TEMP_3083",
+            messageParameters = Map("tableName" -> inferred.identifier.toString))
         }
         inferred
       } else {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala
index e6b1019e717..bbfc8364071 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala
@@ -86,7 +86,9 @@ trait V1WritesHiveUtils {
       // Report error if any static partition appears after a dynamic partition
       val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty)
       if (isDynamic.init.zip(isDynamic.tail).contains((true, false))) {
-        throw new AnalysisException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg)
+        throw new AnalysisException(
+          errorClass = "_LEGACY_ERROR_TEMP_3079",
+          messageParameters = Map.empty)
       }
     }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index af3d4555bc5..c580fd0dfa5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -356,9 +356,14 @@ class DataSourceWithHiveMetastoreCatalogSuite
                |CREATE TABLE non_partition_table (id bigint)
                |STORED AS PARQUET LOCATION '$baseDir'
                |""".stripMargin)
-          val e = intercept[AnalysisException](
-            spark.table("non_partition_table")).getMessage
-          assert(e.contains("Converted table has 2 columns, but source Hive table has 1 columns."))
+          checkError(
+            exception = intercept[AnalysisException](spark.table("non_partition_table")),
+            errorClass = "_LEGACY_ERROR_TEMP_3096",
+            parameters = Map(
+              "resLen" -> "2",
+              "relLen" -> "1",
+              "key" -> "spark.sql.hive.convertMetastoreParquet",
+              "ident" -> "`spark_catalog`.`default`.`non_partition_table`"))
         }
       }
     })
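
For the test-suite changes here and below, a hedged sketch of what the `checkError` assertions inspect (the object and method names are illustrative): any `AnalysisException` built with an error class is a `SparkThrowable`, so the class and parameters are also available directly on the intercepted exception, independent of the formatted message text:

```scala
import org.apache.spark.SparkThrowable
import org.apache.spark.sql.AnalysisException

object ThrowableInspectionSketch {
  // Summarize an intercepted exception the way checkError matches it:
  // by error class and message parameters rather than by message string.
  def describe(e: AnalysisException): String = {
    val t: SparkThrowable = e
    s"${t.getErrorClass}: ${t.getMessageParameters}"
  }
}
```
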
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index cf2098641ad..de79e96c412 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -418,15 +418,15 @@ class HiveDDLSuite
           exception = intercept[AnalysisException] {
             sql("CREATE TABLE tab1 USING hive")
           },
-          errorClass = null,
-          parameters = Map.empty
+          errorClass = "_LEGACY_ERROR_TEMP_3083",
+          parameters = Map("tableName" -> "`spark_catalog`.`default`.`tab1`")
         )
         checkError(
           exception = intercept[AnalysisException] {
             sql(s"CREATE TABLE tab2 USING hive location '${tempDir.getCanonicalPath}'")
           },
-          errorClass = null,
-          parameters = Map.empty
+          errorClass = "_LEGACY_ERROR_TEMP_3083",
+          parameters = Map("tableName" -> "`spark_catalog`.`default`.`tab2`")
         )
       }
     }
@@ -812,7 +812,7 @@ class HiveDDLSuite
           sql(s"CREATE TABLE $tabName (height INT, length INT) " +
             s"TBLPROPERTIES('EXTERNAL'='TRUE')")
         },
-        errorClass = null,
+        errorClass = "_LEGACY_ERROR_TEMP_3087",
         parameters = Map.empty
       )
     }
@@ -829,7 +829,7 @@ class HiveDDLSuite
         exception = intercept[AnalysisException] {
           sql(s"ALTER TABLE $tabName SET TBLPROPERTIES ('EXTERNAL' = 'TRUE')")
         },
-        errorClass = null,
+        errorClass = "_LEGACY_ERROR_TEMP_3087",
         parameters = Map.empty
       )
       // The table type is not changed to external
@@ -1395,11 +1395,13 @@ class HiveDDLSuite
           },
           errorClass = caseSensitive match {
             case "false" => "UNSUPPORTED_FEATURE.DROP_DATABASE"
-            case _ => null
+            case _ => "_LEGACY_ERROR_TEMP_3065"
           },
           parameters = caseSensitive match {
             case "false" => Map("database" -> "`default`")
-            case _ => Map.empty
+            case _ => Map(
+              "clazz" -> "org.apache.hadoop.hive.ql.metadata.HiveException",
+              "msg" -> "MetaException(message:Can not drop default database)")
           }
         )
       }
@@ -1892,8 +1894,10 @@ class HiveDDLSuite
           exception = intercept[AnalysisException] {
             sql(s"ALTER TABLE tbl SET TBLPROPERTIES ('${forbiddenPrefix}foo' = 'loser')")
           },
-          errorClass = null,
-          parameters = Map.empty
+          errorClass = "_LEGACY_ERROR_TEMP_3086",
+          parameters = Map(
+            "tableName" -> "spark_catalog.default.tbl",
+            "invalidKeys" -> s"[${forbiddenPrefix}foo]")
         )
         checkError(
           exception = intercept[AnalysisException] {
@@ -1909,8 +1913,10 @@ class HiveDDLSuite
           exception = intercept[AnalysisException] {
             sql(s"CREATE TABLE tbl2 (a INT) TBLPROPERTIES ('${forbiddenPrefix}foo'='anything')")
           },
-          errorClass = null,
-          parameters = Map.empty
+          errorClass = "_LEGACY_ERROR_TEMP_3086",
+          parameters = Map(
+            "tableName" -> "spark_catalog.default.tbl2",
+            "invalidKeys" -> s"[${forbiddenPrefix}foo]")
         )
       }
     }
@@ -2409,8 +2415,10 @@ class HiveDDLSuite
                exception = intercept[AnalysisException] {
                 sql("INSERT INTO TABLE t SELECT 1")
               },
-              errorClass = null,
-              parameters = Map.empty
+              errorClass = "_LEGACY_ERROR_TEMP_3065",
+              parameters = Map(
+                "clazz" -> "java.lang.IllegalArgumentException",
+                "msg" -> "java.net.URISyntaxException: Relative path in absolute URI: a:b")
             )
           }
         }
@@ -2454,16 +2462,20 @@ class HiveDDLSuite
                exception = intercept[AnalysisException] {
                 sql("INSERT INTO TABLE t1 PARTITION(b=2) SELECT 1")
               },
-              errorClass = null,
-              parameters = Map.empty
+              errorClass = "_LEGACY_ERROR_TEMP_3065",
+              parameters = Map(
+                "clazz" -> "java.lang.IllegalArgumentException",
+                "msg" -> "java.net.URISyntaxException: Relative path in absolute URI: a:b")
             )
 
             checkError(
                exception = intercept[AnalysisException] {
                 sql("INSERT INTO TABLE t1 PARTITION(b='2017-03-03 12:13%3A14') SELECT 1")
               },
-              errorClass = null,
-              parameters = Map.empty
+              errorClass = "_LEGACY_ERROR_TEMP_3065",
+              parameters = Map(
+                "clazz" -> "java.lang.IllegalArgumentException",
+                "msg" -> "java.net.URISyntaxException: Relative path in absolute URI: a:b")
             )
           }
         }
@@ -2566,8 +2578,10 @@ class HiveDDLSuite
               exception = intercept[AnalysisException] {
                 sql("ALTER TABLE tab ADD COLUMNS (C2 string)")
               },
-              errorClass = null,
-              parameters = Map.empty
+              errorClass = "_LEGACY_ERROR_TEMP_3065",
+              parameters = Map(
+                "clazz" -> "org.apache.hadoop.hive.ql.metadata.HiveException",
+                "msg" -> "Partition column name c2 conflicts with table columns.")
             )
 
             // hive catalog will still complains that c1 is duplicate column name because hive
@@ -2576,8 +2590,10 @@ class HiveDDLSuite
               exception = intercept[AnalysisException] {
                 sql("ALTER TABLE tab ADD COLUMNS (C1 string)")
               },
-              errorClass = null,
-              parameters = Map.empty
+              errorClass = "_LEGACY_ERROR_TEMP_3065",
+              parameters = Map(
+                "clazz" -> "org.apache.hadoop.hive.ql.metadata.HiveException",
+                "msg" -> "Duplicate column name c1 in the table definition.")
             )
           }
         }
@@ -2603,8 +2619,10 @@ class HiveDDLSuite
         exception = intercept[AnalysisException] {
           sql("CREATE TABLE t2 STORED AS PARQUET AS SELECT null as null_col")
         },
-        errorClass = null,
-        parameters = Map.empty
+        errorClass = "_LEGACY_ERROR_TEMP_3065",
+        parameters = Map(
+          "clazz" -> "org.apache.hadoop.hive.ql.metadata.HiveException",
+          "msg" -> "java.lang.UnsupportedOperationException: Unknown field type: void")
       )
 
       sql("CREATE TABLE t3 AS SELECT NULL AS null_col")
@@ -2627,8 +2645,10 @@ class HiveDDLSuite
         exception = intercept[AnalysisException] {
           sql("CREATE TABLE t2 (v VOID) STORED AS PARQUET")
         },
-        errorClass = null,
-        parameters = Map.empty
+        errorClass = "_LEGACY_ERROR_TEMP_3065",
+        parameters = Map(
+          "clazz" -> "org.apache.hadoop.hive.ql.metadata.HiveException",
+          "msg" -> "java.lang.UnsupportedOperationException: Unknown field type: void")
       )
 
       sql("CREATE TABLE t3 (v VOID) USING hive")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 1412b4c8610..6e94333df4e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -833,8 +833,10 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
           """ALTER TABLE alter1 SET SERDE 'org.apache.hadoop.hive.serde2.TestSerDe'
             |WITH serdeproperties('s1'='9')""".stripMargin)
       },
-      errorClass = null,
-      parameters = Map.empty)
+      errorClass = "_LEGACY_ERROR_TEMP_3065",
+      parameters = Map(
+        "clazz" -> "org.apache.hadoop.hive.ql.metadata.HiveException",
+        "msg" -> "at least one column must be specified for the table"))
     sql("DROP TABLE alter1")
   }
 
@@ -1270,7 +1272,7 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
           """INSERT INTO TABLE dp_test PARTITION(dp, sp = 1)
             |SELECT key, value, key % 5 FROM src""".stripMargin)
       },
-      errorClass = null,
+      errorClass = "_LEGACY_ERROR_TEMP_3079",
       parameters = Map.empty)
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala
index f9dbae9b1aa..98801e0b027 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala
@@ -107,8 +107,12 @@ class Hive_2_1_DDLSuite extends SparkFunSuite with TestHiveSingleton {
           "CREATE TABLE t1 (c1 string) USING parquet",
           StructType(Array(StructField("c2", IntegerType))))
       },
-      errorClass = null,
-      parameters = Map.empty
+      errorClass = "_LEGACY_ERROR_TEMP_3065",
+      parameters = Map(
+        "clazz" -> "org.apache.hadoop.hive.ql.metadata.HiveException",
+        "msg" -> ("Unable to alter table. " +
+          "The following columns have types incompatible with the existing columns " +
+          "in their respective positions :\ncol"))
     )
   }
 

