You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ge...@apache.org on 2021/07/12 17:29:33 UTC

[spark] branch master updated: [SPARK-33603][SQL] Grouping exception messages in execution/command

This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d03f716  [SPARK-33603][SQL] Grouping exception messages in execution/command
d03f716 is described below

commit d03f71657ed745247d026ca1e5de2a2d7c9a6a30
Author: dgd-contributor <dg...@viettel.com.vn>
AuthorDate: Tue Jul 13 01:28:43 2021 +0800

    [SPARK-33603][SQL] Grouping exception messages in execution/command
    
    ### What changes were proposed in this pull request?
    This PR groups the exception messages thrown in sql/core/src/main/scala/org/apache/spark/sql/execution/command into QueryCompilationErrors and QueryExecutionErrors.
    
    ### Why are the changes needed?
    It will largely help with the standardization of error messages and their maintenance.
    
    ### Does this PR introduce any user-facing change?
    No. Error messages remain unchanged.
    
    ### How was this patch tested?
    No new tests were added; all existing tests were run and pass, to make sure the change doesn't break any existing behavior.
    
    Closes #32951 from dgd-contributor/SPARK-33603_grouping_execution/command.
    
    Authored-by: dgd-contributor <dg...@viettel.com.vn>
    Signed-off-by: Gengliang Wang <ge...@apache.org>
---
 .../spark/sql/errors/QueryCompilationErrors.scala  | 368 ++++++++++++++++++++-
 .../spark/sql/errors/QueryExecutionErrors.scala    |  18 +
 .../execution/command/AnalyzeColumnCommand.scala   |  16 +-
 .../command/AnalyzePartitionCommand.scala          |  17 +-
 .../spark/sql/execution/command/CommandUtils.scala |   9 +-
 .../sql/execution/command/DataWritingCommand.scala |   9 +-
 .../command/InsertIntoDataSourceDirCommand.scala   |   6 +-
 .../execution/command/createDataSourceTables.scala |   6 +-
 .../apache/spark/sql/execution/command/ddl.scala   |  57 ++--
 .../spark/sql/execution/command/functions.scala    |  22 +-
 .../spark/sql/execution/command/tables.scala       | 117 +++----
 .../apache/spark/sql/execution/command/views.scala |  44 ++-
 12 files changed, 505 insertions(+), 184 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 4f82e25..d1dcbbc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -17,12 +17,15 @@
 
 package org.apache.spark.sql.errors
 
+import scala.collection.mutable
+
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, ResolvedNamespace, ResolvedTable, ResolvedView, TableAlreadyExistsException}
+import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NamespaceAlreadyExistsException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchPartitionException, NoSuchTableException, ResolvedNamespace, ResolvedTable, ResolvedView, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, InvalidUDFClassException}
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, CreateMap, Expression, GroupingID, NamedExpression, SpecifiedWindowFrame, WindowFrame, WindowFunction, WindowSpecDefinition}
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, LogicalPlan, SerdeInfo, Window}
@@ -1696,6 +1699,369 @@ private[spark] object QueryCompilationErrors {
       s"Found duplicate column(s) $colType: ${duplicateCol.sorted.mkString(", ")}")
   }
 
+  def noSuchTableError(db: String, table: String): Throwable = {
+    new NoSuchTableException(db = db, table = table)
+  }
+
+  def tempViewNotCachedForAnalyzingColumnsError(tableIdent: TableIdentifier): Throwable = {
+    new AnalysisException(s"Temporary view $tableIdent is not cached for analyzing columns.")
+  }
+
+  def columnTypeNotSupportStatisticsCollectionError(
+      name: String,
+      tableIdent: TableIdentifier,
+      dataType: DataType): Throwable = {
+    new AnalysisException(s"Column $name in table $tableIdent is of type $dataType, " +
+      "and Spark does not support statistics collection on this column type.")
+  }
+
+  def analyzeTableNotSupportedOnViewsError(): Throwable = {
+    new AnalysisException("ANALYZE TABLE is not supported on views.")
+  }
+
+  def unexpectedPartitionColumnPrefixError(
+      table: String,
+      database: String,
+      schemaColumns: String,
+      specColumns: String): Throwable = {
+    // Built by concatenation so the text matches the pre-refactoring message exactly
+    // (no leading/trailing whitespace left over from a stripMargin block).
+    new AnalysisException(
+      "The list of partition columns with values " +
+        s"in partition specification for table '$table' " +
+        s"in database '$database' is not a prefix of the list of " +
+        "partition columns defined in the table schema. " +
+        s"Expected a prefix of [$schemaColumns], but got [$specColumns].")
+  }
+
+  def noSuchPartitionError(
+      db: String,
+      table: String,
+      partition: TablePartitionSpec): Throwable = {
+    new NoSuchPartitionException(db, table, partition)
+  }
+
+  def analyzingColumnStatisticsNotSupportedForColumnTypeError(
+      name: String,
+      dataType: DataType): Throwable = {
+    new AnalysisException("Analyzing column statistics is not supported for column " +
+      s"$name of data type: $dataType.")
+  }
+
+  def tableAlreadyExistsError(table: String, guide: String = ""): Throwable = {
+    new AnalysisException(s"Table $table already exists." + guide)
+  }
+
+  def createTableAsSelectWithNonEmptyDirectoryError(tablePath: String): Throwable = {
+    new AnalysisException(
+      s"CREATE-TABLE-AS-SELECT cannot create table with location to a non-empty directory " +
+        s"${tablePath} . To allow overwriting the existing non-empty directory, " +
+        s"set '${SQLConf.ALLOW_NON_EMPTY_LOCATION_IN_CTAS.key}' to true.")
+  }
+
+  def tableOrViewNotFoundError(table: String): Throwable = {
+    new AnalysisException(s"Table or view not found: $table")
+  }
+
+  def unsetNonExistentPropertyError(property: String, table: TableIdentifier): Throwable = {
+    new AnalysisException(s"Attempted to unset non-existent property '$property' in table '$table'")
+  }
+
+  def alterTableChangeColumnNotSupportedForColumnTypeError(
+      originColumn: StructField,
+      newColumn: StructField): Throwable = {
+    new AnalysisException("ALTER TABLE CHANGE COLUMN is not supported for changing column " +
+      s"'${originColumn.name}' with type '${originColumn.dataType}' to " +
+      s"'${newColumn.name}' with type '${newColumn.dataType}'")
+  }
+
+  def cannotFindColumnError(name: String, fieldNames: Array[String]): Throwable = {
+    new AnalysisException(s"Can't find column `$name` given table data columns " +
+      s"${fieldNames.mkString("[`", "`, `", "`]")}")
+  }
+
+  def alterTableSetSerdeForSpecificPartitionNotSupportedError(): Throwable = {
+    new AnalysisException("Operation not allowed: ALTER TABLE SET " +
+      "[SERDE | SERDEPROPERTIES] for a specific partition is not supported " +
+      "for tables created with the datasource API")
+  }
+
+  def alterTableSetSerdeNotSupportedError(): Throwable = {
+    new AnalysisException("Operation not allowed: ALTER TABLE SET SERDE is " +
+      "not supported for tables created with the datasource API")
+  }
+
+  def cmdOnlyWorksOnPartitionedTablesError(cmd: String, tableIdentWithDB: String): Throwable = {
+    new AnalysisException(
+      s"Operation not allowed: $cmd only works on partitioned tables: $tableIdentWithDB")
+  }
+
+  def cmdOnlyWorksOnTableWithLocationError(cmd: String, tableIdentWithDB: String): Throwable = {
+    new AnalysisException(s"Operation not allowed: $cmd only works on table with " +
+      s"location provided: $tableIdentWithDB")
+  }
+
+  def actionNotAllowedOnTableWithFilesourcePartitionManagementDisabledError(
+      action: String,
+      tableName: String): Throwable = {
+    new AnalysisException(
+      s"$action is not allowed on $tableName since filesource partition management is " +
+        "disabled (spark.sql.hive.manageFilesourcePartitions = false).")
+  }
+
+  def actionNotAllowedOnTableSincePartitionMetadataNotStoredError(
+     action: String,
+     tableName: String): Throwable = {
+    new AnalysisException(
+      s"$action is not allowed on $tableName since its partition metadata is not stored in " +
+        "the Hive metastore. To import this information into the metastore, run " +
+        s"`msck repair table $tableName`")
+  }
+
+  def cannotAlterViewWithAlterTableError(): Throwable = {
+    new AnalysisException(
+      "Cannot alter a view with ALTER TABLE. Please use ALTER VIEW instead")
+  }
+
+  def cannotAlterTableWithAlterViewError(): Throwable = {
+    new AnalysisException(
+      "Cannot alter a table with ALTER VIEW. Please use ALTER TABLE instead")
+  }
+
+  def cannotOverwritePathBeingReadFromError(): Throwable = {
+    new AnalysisException("Cannot overwrite a path that is also being read from.")
+  }
+
+  def createFuncWithBothIfNotExistsAndReplaceError(): Throwable = {
+    new AnalysisException("CREATE FUNCTION with both IF NOT EXISTS and REPLACE is not allowed.")
+  }
+
+  def defineTempFuncWithIfNotExistsError(): Throwable = {
+    new AnalysisException("It is not allowed to define a TEMPORARY function with IF NOT EXISTS.")
+  }
+
+  def specifyingDBInCreateTempFuncError(databaseName: String): Throwable = {
+    new AnalysisException(
+      s"Specifying a database in CREATE TEMPORARY FUNCTION is not allowed: '$databaseName'")
+  }
+
+  def specifyingDBInDropTempFuncError(databaseName: String): Throwable = {
+    new AnalysisException(
+      s"Specifying a database in DROP TEMPORARY FUNCTION is not allowed: '$databaseName'")
+  }
+
+  def cannotDropNativeFuncError(functionName: String): Throwable = {
+    new AnalysisException(s"Cannot drop native function '$functionName'")
+  }
+
+  def cannotRefreshBuiltInFuncError(functionName: String): Throwable = {
+    new AnalysisException(s"Cannot refresh built-in function $functionName")
+  }
+
+  def cannotRefreshTempFuncError(functionName: String): Throwable = {
+    new AnalysisException(s"Cannot refresh temporary function $functionName")
+  }
+
+  def noSuchFunctionError(identifier: FunctionIdentifier): Throwable = {
+    new NoSuchFunctionException(identifier.database.get, identifier.funcName)
+  }
+
+  def alterAddColNotSupportViewError(table: TableIdentifier): Throwable = {
+    new AnalysisException(
+      s"""
+         |ALTER ADD COLUMNS does not support views.
+         |You must drop and re-create the views for adding the new columns. Views: $table
+       """.stripMargin)
+  }
+
+  def alterAddColNotSupportDatasourceTableError(
+      tableType: Any,
+      table: TableIdentifier): Throwable = {
+    new AnalysisException(
+      s"""
+         |ALTER ADD COLUMNS does not support datasource table with type $tableType.
+         |You must drop and re-create the table for adding the new columns. Tables: $table
+       """.stripMargin)
+  }
+
+  def loadDataNotSupportedForDatasourceTablesError(tableIdentWithDB: String): Throwable = {
+    new AnalysisException(s"LOAD DATA is not supported for datasource tables: $tableIdentWithDB")
+  }
+
+  def loadDataWithoutPartitionSpecProvidedError(tableIdentWithDB: String): Throwable = {
+    new AnalysisException(s"LOAD DATA target table $tableIdentWithDB is partitioned, " +
+      s"but no partition spec is provided")
+  }
+
+  def loadDataPartitionSizeNotMatchNumPartitionColumnsError(
+      tableIdentWithDB: String,
+      partitionSize: Int,
+      targetTableSize: Int): Throwable = {
+    // Plain concatenation avoids the leading space and trailing padding that
+    // stripMargin.replaceAll("\n", " ") leaves around a triple-quoted block.
+    new AnalysisException(
+      s"LOAD DATA target table $tableIdentWithDB is partitioned, " +
+        s"but number of columns in provided partition spec ($partitionSize) " +
+        s"do not match number of partitioned columns in table ($targetTableSize)")
+  }
+
+  def loadDataTargetTableNotPartitionedButPartitionSpecWasProvidedError(
+      tableIdentWithDB: String): Throwable = {
+    new AnalysisException(s"LOAD DATA target table $tableIdentWithDB is not " +
+      s"partitioned, but a partition spec was provided.")
+  }
+
+  def loadDataInputPathNotExistError(path: String): Throwable = {
+    new AnalysisException(s"LOAD DATA input path does not exist: $path")
+  }
+
+  def truncateTableOnExternalTablesError(tableIdentWithDB: String): Throwable = {
+    new AnalysisException(
+      s"Operation not allowed: TRUNCATE TABLE on external tables: $tableIdentWithDB")
+  }
+
+  def truncateTablePartitionNotSupportedForNotPartitionedTablesError(
+      tableIdentWithDB: String): Throwable = {
+    new AnalysisException(s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported" +
+      s" for tables that are not partitioned: $tableIdentWithDB")
+  }
+
+  def failToTruncateTableWhenRemovingDataError(
+      tableIdentWithDB: String,
+      path: Path,
+      e: Throwable): Throwable = {
+    new AnalysisException(s"Failed to truncate table $tableIdentWithDB when " +
+        s"removing data of the path: $path because of ${e.toString}")
+  }
+
+  def descPartitionNotAllowedOnTempView(table: String): Throwable = {
+    new AnalysisException(s"DESC PARTITION is not allowed on a temporary view: $table")
+  }
+
+  def descPartitionNotAllowedOnView(table: String): Throwable = {
+    new AnalysisException(s"DESC PARTITION is not allowed on a view: $table")
+  }
+
+  def showPartitionNotAllowedOnTableNotPartitionedError(tableIdentWithDB: String): Throwable = {
+    new AnalysisException(
+      s"SHOW PARTITIONS is not allowed on a table that is not partitioned: $tableIdentWithDB")
+  }
+
+  def showCreateTableNotSupportedOnTempView(table: String): Throwable = {
+    new AnalysisException(s"SHOW CREATE TABLE is not supported on a temporary view: $table")
+  }
+
+  def showCreateTableFailToExecuteUnsupportedFeatureError(table: CatalogTable): Throwable = {
+    new AnalysisException("Failed to execute SHOW CREATE TABLE against table " +
+      s"${table.identifier}, which is created by Hive and uses the " +
+      s"following unsupported feature(s)\n" +
+      table.unsupportedFeatures.map(" - " + _).mkString("\n") + ". " +
+      s"Please use `SHOW CREATE TABLE ${table.identifier} AS SERDE` to show Hive DDL instead.")
+  }
+
+  def showCreateTableNotSupportTransactionalHiveTableError(table: CatalogTable): Throwable = {
+    new AnalysisException("SHOW CREATE TABLE doesn't support transactional Hive table. " +
+      s"Please use `SHOW CREATE TABLE ${table.identifier} AS SERDE` " +
+      "to show Hive DDL instead.")
+  }
+
+  def showCreateTableFailToExecuteUnsupportedConfError(
+      table: TableIdentifier,
+      builder: mutable.StringBuilder): Throwable = {
+    // Interpolate the whole identifier: `table.identifier` is only the bare table name
+    // and would drop the database qualifier that the sibling messages include.
+    new AnalysisException(s"Failed to execute SHOW CREATE TABLE against table $table, " +
+      "which is created by Hive and uses the following unsupported serde configuration\n" +
+      builder.toString())
+  }
+
+  def descPartitionNotAllowedOnViewError(table: String): Throwable = {
+    new AnalysisException(s"DESC PARTITION is not allowed on a view: $table")
+  }
+
+  def showCreateTableAsSerdeNotAllowedOnSparkDataSourceTableError(
+      table: TableIdentifier): Throwable = {
+    new AnalysisException(
+      s"$table is a Spark data source table. Use `SHOW CREATE TABLE` without `AS SERDE` instead.")
+  }
+
+  def showCreateTableOrViewFailToExecuteUnsupportedFeatureError(
+      table: CatalogTable,
+      features: Seq[String]): Throwable = {
+    new AnalysisException(
+      s"Failed to execute SHOW CREATE TABLE against table/view ${table.identifier}, " +
+        "which is created by Hive and uses the following unsupported feature(s)\n" +
+        features.map(" - " + _).mkString("\n"))
+  }
+
+  def createViewWithBothIfNotExistsAndReplaceError(): Throwable = {
+    new AnalysisException("CREATE VIEW with both IF NOT EXISTS and REPLACE is not allowed.")
+  }
+
+  def defineTempViewWithIfNotExistsError(): Throwable = {
+    new AnalysisException("It is not allowed to define a TEMPORARY view with IF NOT EXISTS.")
+  }
+
+  def notAllowedToAddDBPrefixForTempViewError(database: String): Throwable = {
+    new AnalysisException(
+      s"It is not allowed to add database prefix `$database` for the TEMPORARY view name.")
+  }
+
+  def logicalPlanForViewNotAnalyzedError(): Throwable = {
+    new AnalysisException("The logical plan that represents the view is not analyzed.")
+  }
+
+  def createViewNumColumnsMismatchUserSpecifiedColumnLengthError(
+      analyzedPlanLength: Int,
+      userSpecifiedColumnsLength: Int): Throwable = {
+    new AnalysisException(s"The number of columns produced by the SELECT clause " +
+      s"(num: `$analyzedPlanLength`) does not match the number of column names " +
+      s"specified by CREATE VIEW (num: `$userSpecifiedColumnsLength`).")
+  }
+
+  def tableIsNotViewError(name: TableIdentifier): Throwable = {
+    new AnalysisException(s"$name is not a view")
+  }
+
+  def viewAlreadyExistsError(name: TableIdentifier): Throwable = {
+    new AnalysisException(
+      s"View $name already exists. If you want to update the view definition, " +
+        "please use ALTER VIEW AS or CREATE OR REPLACE VIEW AS")
+  }
+
+  def createPersistedViewFromDatasetAPINotAllowedError(): Throwable = {
+    new AnalysisException("It is not allowed to create a persisted view from the Dataset API")
+  }
+
+  def recursiveViewDetectedError(
+      viewIdent: TableIdentifier,
+      newPath: Seq[TableIdentifier]): Throwable = {
+    new AnalysisException(s"Recursive view $viewIdent detected " +
+      s"(cycle: ${newPath.mkString(" -> ")})")
+  }
+
+  def notAllowedToCreatePermanentViewWithoutAssigningAliasForExpressionError(
+      name: TableIdentifier,
+      attrName: String): Throwable = {
+    new AnalysisException(s"Not allowed to create a permanent view $name without " +
+      s"explicitly assigning an alias for expression $attrName")
+  }
+
+  def notAllowedToCreatePermanentViewByReferencingTempViewError(
+      name: TableIdentifier,
+      nameParts: String): Throwable = {
+    new AnalysisException(s"Not allowed to create a permanent view $name by " +
+      s"referencing a temporary view $nameParts. " +
+      "Please create a temp view instead by CREATE TEMP VIEW")
+  }
+
+  def notAllowedToCreatePermanentViewByReferencingTempFuncError(
+      name: TableIdentifier,
+      funcName: String): Throwable = {
+    new AnalysisException(s"Not allowed to create a permanent view $name by " +
+      s"referencing a temporary function `$funcName`")
+  }
+
   def queryFromRawFilesIncludeCorruptRecordColumnError(): Throwable = {
     new AnalysisException(
       """
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 2435414..e761eb6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeoutException
 
 import com.fasterxml.jackson.core.JsonToken
 import org.apache.hadoop.fs.{FileAlreadyExistsException, FileStatus, Path}
+import org.apache.hadoop.fs.permission.FsPermission
 import org.codehaus.commons.compiler.{CompileException, InternalCompilerException}
 
 import org.apache.spark.{Partition, SparkArithmeticException, SparkException, SparkUpgradeException}
@@ -1543,6 +1544,23 @@ object QueryExecutionErrors {
     new NullPointerException(s"Value at index $index is null")
   }
 
+  def onlySupportDataSourcesProvidingFileFormatError(providingClass: String): Throwable = {
+    new SparkException(s"Only Data Sources providing FileFormat are supported: $providingClass")
+  }
+
+  def failToSetOriginalPermissionBackError(
+      permission: FsPermission,
+      path: Path,
+      e: Throwable): Throwable = {
+    new SecurityException(s"Failed to set original permission $permission back to " +
+      s"the created path: $path. Exception: ${e.getMessage}")
+  }
+
+  def failToSetOriginalACLBackError(aclEntries: String, path: Path, e: Throwable): Throwable = {
+    new SecurityException(s"Failed to set original ACL $aclEntries back to " +
+      s"the created path: $path. Exception: ${e.getMessage}")
+  }
+
   def multiFailuresInStageMaterializationError(error: Throwable): Throwable = {
     new SparkException("Multiple failures in stage materialization.", error)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
index e3c2e90..5cb3478 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
@@ -19,10 +19,10 @@ package org.apache.spark.sql.execution.command
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTableType}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.types._
 
 
@@ -44,7 +44,7 @@ case class AnalyzeColumnCommand(
     tableIdent.database match {
       case Some(db) if db == sparkSession.sharedState.globalTempViewManager.database =>
         val plan = sessionState.catalog.getGlobalTempView(tableIdent.identifier).getOrElse {
-          throw new NoSuchTableException(db = db, table = tableIdent.identifier)
+          throw QueryCompilationErrors.noSuchTableError(db, tableIdent.identifier)
         }
         analyzeColumnInTempView(plan, sparkSession)
       case Some(_) =>
@@ -72,8 +72,7 @@ case class AnalyzeColumnCommand(
 
   private def analyzeColumnInTempView(plan: LogicalPlan, sparkSession: SparkSession): Unit = {
     if (!analyzeColumnInCachedData(plan, sparkSession)) {
-      throw new AnalysisException(
-        s"Temporary view $tableIdent is not cached for analyzing columns.")
+      throw QueryCompilationErrors.tempViewNotCachedForAnalyzingColumnsError(tableIdent)
     }
   }
 
@@ -87,15 +86,14 @@ case class AnalyzeColumnCommand(
     } else {
       columnNames.get.map { col =>
         val exprOption = relation.output.find(attr => conf.resolver(attr.name, col))
-        exprOption.getOrElse(throw new AnalysisException(s"Column $col does not exist."))
+        exprOption.getOrElse(throw QueryCompilationErrors.columnDoesNotExistError(col))
       }
     }
     // Make sure the column types are supported for stats gathering.
     columnsToAnalyze.foreach { attr =>
       if (!supportsType(attr.dataType)) {
-        throw new AnalysisException(
-          s"Column ${attr.name} in table $tableIdent is of type ${attr.dataType}, " +
-            "and Spark does not support statistics collection on this column type.")
+        throw QueryCompilationErrors.columnTypeNotSupportStatisticsCollectionError(
+          attr.name, tableIdent, attr.dataType)
       }
     }
     columnsToAnalyze
@@ -108,7 +106,7 @@ case class AnalyzeColumnCommand(
       // Analyzes a catalog view if the view is cached
       val plan = sparkSession.table(tableIdent.quotedString).logicalPlan
       if (!analyzeColumnInCachedData(plan, sparkSession)) {
-        throw new AnalysisException("ANALYZE TABLE is not supported on views.")
+        throw QueryCompilationErrors.analyzeTableNotSupportedOnViewsError()
       }
     } else {
       val sizeInBytes = CommandUtils.calculateTotalSize(sparkSession, tableMeta)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
index 5b3cb74..38d92ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
@@ -17,12 +17,13 @@
 
 package org.apache.spark.sql.execution.command
 
-import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession}
+import org.apache.spark.sql.{Column, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, ExternalCatalogUtils}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Literal}
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.util.PartitioningUtils
 
 /**
@@ -58,11 +59,8 @@ case class AnalyzePartitionCommand(
       val tableId = table.identifier
       val schemaColumns = table.partitionColumnNames.mkString(",")
       val specColumns = normalizedPartitionSpec.keys.mkString(",")
-      throw new AnalysisException("The list of partition columns with values " +
-        s"in partition specification for table '${tableId.table}' " +
-        s"in database '${tableId.database.get}' is not a prefix of the list of " +
-        "partition columns defined in the table schema. " +
-        s"Expected a prefix of [${schemaColumns}], but got [${specColumns}].")
+      throw QueryCompilationErrors.unexpectedPartitionColumnPrefixError(
+        tableId.table, tableId.database.get, schemaColumns, specColumns)
     }
 
     val filteredSpec = normalizedPartitionSpec.filter(_._2.isDefined).mapValues(_.get)
@@ -79,7 +77,7 @@ case class AnalyzePartitionCommand(
     val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
     val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB)
     if (tableMeta.tableType == CatalogTableType.VIEW) {
-      throw new AnalysisException("ANALYZE TABLE is not supported on views.")
+      throw QueryCompilationErrors.analyzeTableNotSupportedOnViewsError()
     }
 
     val partitionValueSpec = getPartitionSpec(tableMeta)
@@ -88,7 +86,8 @@ case class AnalyzePartitionCommand(
 
     if (partitions.isEmpty) {
       if (partitionValueSpec.isDefined) {
-        throw new NoSuchPartitionException(db, tableIdent.table, partitionValueSpec.get)
+        throw QueryCompilationErrors.noSuchPartitionError(
+          db, tableIdent.table, partitionValueSpec.get)
       } else {
         // the user requested to analyze all partitions for a table which has no partitions
         // return normally, since there is nothing to do
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
index da5d00c..312f175 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -25,13 +25,14 @@ import scala.util.control.NonFatal
 import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.{SparkSession}
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils, InMemoryFileIndex}
 import org.apache.spark.sql.internal.{SessionState, SQLConf}
@@ -217,7 +218,7 @@ object CommandUtils extends Logging {
           table.count()
         }
       } else {
-        throw new AnalysisException("ANALYZE TABLE is not supported on views.")
+        throw QueryCompilationErrors.analyzeTableNotSupportedOnViewsError()
       }
     } else {
       // Compute stats for the whole table
@@ -381,8 +382,8 @@ object CommandUtils extends Logging {
           Coalesce(Seq(Cast(Max(Length(col)), LongType), defaultSize)),
           nullArray)
       case _ =>
-        throw new AnalysisException("Analyzing column statistics is not supported for column " +
-          s"${col.name} of data type: ${col.dataType}.")
+        throw QueryCompilationErrors.analyzingColumnStatisticsNotSupportedForColumnTypeError(
+          col.name, col.dataType)
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
index ca51b88..338ce8ca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
@@ -22,9 +22,10 @@ import java.net.URI
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.SparkContext
-import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
+import org.apache.spark.sql.{Row, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryCommand}
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
@@ -114,10 +115,8 @@ object DataWritingCommand {
       if (fs.exists(filePath) &&
           fs.getFileStatus(filePath).isDirectory &&
           fs.listStatus(filePath).length != 0) {
-        throw new AnalysisException(
-          s"CREATE-TABLE-AS-SELECT cannot create table with location to a non-empty directory " +
-            s"${tablePath} . To allow overwriting the existing non-empty directory, " +
-            s"set '${SQLConf.ALLOW_NON_EMPTY_LOCATION_IN_CTAS.key}' to true.")
+        throw QueryCompilationErrors.createTableAsSelectWithNonEmptyDirectoryError(
+          tablePath.toString)
       }
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala
index be680a7..35c8bec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala
@@ -17,10 +17,10 @@
 
 package org.apache.spark.sql.execution.command
 
-import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources._
 
 /**
@@ -61,8 +61,8 @@ case class InsertIntoDataSourceDirCommand(
 
     val isFileFormat = classOf[FileFormat].isAssignableFrom(dataSource.providingClass)
     if (!isFileFormat) {
-      throw new SparkException(
-        "Only Data Sources providing FileFormat are supported: " + dataSource.providingClass)
+      throw QueryExecutionErrors.onlySupportDataSourcesProvidingFileFormatError(
+        dataSource.providingClass.toString)
     }
 
     val saveMode = if (overwrite) SaveMode.Overwrite else SaveMode.ErrorIfExists
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 3caf850..fcad25d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.{CommandExecutionMode, SparkPlan}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.BaseRelation
@@ -53,7 +54,7 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
       if (ignoreIfExists) {
         return Seq.empty[Row]
       } else {
-        throw new AnalysisException(s"Table ${table.identifier.unquotedString} already exists.")
+        throw QueryCompilationErrors.tableAlreadyExistsError(table.identifier.unquotedString)
       }
     }
 
@@ -156,7 +157,8 @@ case class CreateDataSourceTableAsSelectCommand(
         s"Expect the table $tableName has been dropped when the save mode is Overwrite")
 
       if (mode == SaveMode.ErrorIfExists) {
-        throw new AnalysisException(s"Table $tableName already exists. You need to drop it first.")
+        throw QueryCompilationErrors.tableAlreadyExistsError(
+          tableName, " You need to drop it first.")
       }
       if (mode == SaveMode.Ignore) {
         // Since the table already exists and the save mode is Ignore, we will just return.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 06c6847..605d98e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -30,7 +30,7 @@ import org.apache.hadoop.fs._
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
 import org.apache.spark.internal.config.RDD_PARALLEL_LISTING_THRESHOLD
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.catalog._
@@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, TableCatalog}
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces._
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter
@@ -222,11 +223,9 @@ case class DropTableCommand(
       // issue an exception.
       catalog.getTableMetadata(tableName).tableType match {
         case CatalogTableType.VIEW if !isView =>
-          throw new AnalysisException(
-            "Cannot drop a view with DROP TABLE. Please use DROP VIEW instead")
+          throw QueryCompilationErrors.cannotDropViewWithDropTableError()
         case o if o != CatalogTableType.VIEW && isView =>
-          throw new AnalysisException(
-            s"Cannot drop a table with DROP VIEW. Please use DROP TABLE instead")
+          throw QueryCompilationErrors.cannotDropViewWithDropTableError()
         case _ =>
       }
     }
@@ -245,7 +244,7 @@ case class DropTableCommand(
     } else if (ifExists) {
       // no-op
     } else {
-      throw new AnalysisException(s"Table or view not found: ${tableName.identifier}")
+      throw QueryCompilationErrors.tableOrViewNotFoundError(tableName.identifier)
     }
     Seq.empty[Row]
   }
@@ -303,8 +302,7 @@ case class AlterTableUnsetPropertiesCommand(
     if (!ifExists) {
       propKeys.foreach { k =>
         if (!table.properties.contains(k) && k != TableCatalog.PROP_COMMENT) {
-          throw new AnalysisException(
-            s"Attempted to unset non-existent property '$k' in table '${table.identifier}'")
+          throw QueryCompilationErrors.unsetNonExistentPropertyError(k, table.identifier)
         }
       }
     }
@@ -346,10 +344,8 @@ case class AlterTableChangeColumnCommand(
     val originColumn = findColumnByName(table.dataSchema, columnName, resolver)
     // Throw an AnalysisException if the column name/dataType is changed.
     if (!columnEqual(originColumn, newColumn, resolver)) {
-      throw new AnalysisException(
-        "ALTER TABLE CHANGE COLUMN is not supported for changing column " +
-          s"'${originColumn.name}' with type '${originColumn.dataType}' to " +
-          s"'${newColumn.name}' with type '${newColumn.dataType}'")
+      throw QueryCompilationErrors.alterTableChangeColumnNotSupportedForColumnTypeError(
+        originColumn, newColumn)
     }
 
     val newDataSchema = table.dataSchema.fields.map { field =>
@@ -371,9 +367,7 @@ case class AlterTableChangeColumnCommand(
       schema: StructType, name: String, resolver: Resolver): StructField = {
     schema.fields.collectFirst {
       case field if resolver(field.name, name) => field
-    }.getOrElse(throw new AnalysisException(
-      s"Can't find column `$name` given table data columns " +
-        s"${schema.fieldNames.mkString("[`", "`, `", "`]")}"))
+    }.getOrElse(throw QueryCompilationErrors.cannotFindColumnError(name, schema.fieldNames))
   }
 
   // Add the comment to a column, if comment is empty, return the original column.
@@ -413,13 +407,10 @@ case class AlterTableSerDePropertiesCommand(
     val table = catalog.getTableRawMetadata(tableName)
     // For datasource tables, disallow setting serde or specifying partition
     if (partSpec.isDefined && DDLUtils.isDatasourceTable(table)) {
-      throw new AnalysisException("Operation not allowed: ALTER TABLE SET " +
-        "[SERDE | SERDEPROPERTIES] for a specific partition is not supported " +
-        "for tables created with the datasource API")
+      throw QueryCompilationErrors.alterTableSetSerdeForSpecificPartitionNotSupportedError()
     }
     if (serdeClassName.isDefined && DDLUtils.isDatasourceTable(table)) {
-      throw new AnalysisException("Operation not allowed: ALTER TABLE SET SERDE is " +
-        "not supported for tables created with the datasource API")
+      throw QueryCompilationErrors.alterTableSetSerdeNotSupportedError()
     }
     if (partSpec.isEmpty) {
       val newTable = table.withNewStorage(
@@ -629,13 +620,11 @@ case class RepairTableCommand(
     val table = catalog.getTableRawMetadata(tableName)
     val tableIdentWithDB = table.identifier.quotedString
     if (table.partitionColumnNames.isEmpty) {
-      throw new AnalysisException(
-        s"Operation not allowed: $cmd only works on partitioned tables: $tableIdentWithDB")
+      throw QueryCompilationErrors.cmdOnlyWorksOnPartitionedTablesError(cmd, tableIdentWithDB)
     }
 
     if (table.storage.locationUri.isEmpty) {
-      throw new AnalysisException(s"Operation not allowed: $cmd only works on table with " +
-        s"location provided: $tableIdentWithDB")
+      throw QueryCompilationErrors.cmdOnlyWorksOnTableWithLocationError(cmd, tableIdentWithDB)
     }
 
     val root = new Path(table.location)
@@ -901,15 +890,12 @@ object DDLUtils {
       spark: SparkSession, table: CatalogTable, action: String): Unit = {
     val tableName = table.identifier.table
     if (!spark.sqlContext.conf.manageFilesourcePartitions && isDatasourceTable(table)) {
-      throw new AnalysisException(
-        s"$action is not allowed on $tableName since filesource partition management is " +
-          "disabled (spark.sql.hive.manageFilesourcePartitions = false).")
+      throw QueryCompilationErrors
+        .actionNotAllowedOnTableWithFilesourcePartitionManagementDisabledError(action, tableName)
     }
     if (!table.tracksPartitionsInCatalog && isDatasourceTable(table)) {
-      throw new AnalysisException(
-        s"$action is not allowed on $tableName since its partition metadata is not stored in " +
-          "the Hive metastore. To import this information into the metastore, run " +
-          s"`msck repair table $tableName`")
+      throw QueryCompilationErrors.actionNotAllowedOnTableSincePartitionMetadataNotStoredError(
+        action, tableName)
     }
   }
 
@@ -929,11 +915,9 @@ object DDLUtils {
     if (!catalog.isTempView(tableMetadata.identifier)) {
       tableMetadata.tableType match {
         case CatalogTableType.VIEW if !isView =>
-          throw new AnalysisException(
-            "Cannot alter a view with ALTER TABLE. Please use ALTER VIEW instead")
+          throw QueryCompilationErrors.cannotAlterViewWithAlterTableError()
         case o if o != CatalogTableType.VIEW && isView =>
-          throw new AnalysisException(
-            s"Cannot alter a table with ALTER VIEW. Please use ALTER TABLE instead")
+          throw QueryCompilationErrors.cannotAlterTableWithAlterViewError()
         case _ =>
       }
     }
@@ -972,8 +956,7 @@ object DDLUtils {
     }.flatten
 
     if (inputPaths.contains(outputPath)) {
-      throw new AnalysisException(
-        "Cannot overwrite a path that is also being read from.")
+      throw QueryCompilationErrors.cannotOverwritePathBeingReadFromError()
     }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
index 0eda90a..ae9a77d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command
 
 import java.util.Locale
 
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchFunctionException}
 import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResource}
@@ -58,20 +58,17 @@ case class CreateFunctionCommand(
   extends LeafRunnableCommand {
 
   if (ignoreIfExists && replace) {
-    throw new AnalysisException("CREATE FUNCTION with both IF NOT EXISTS and REPLACE" +
-      " is not allowed.")
+    throw QueryCompilationErrors.createFuncWithBothIfNotExistsAndReplaceError()
   }
 
   // Disallow to define a temporary function with `IF NOT EXISTS`
   if (ignoreIfExists && isTemp) {
-    throw new AnalysisException(
-      "It is not allowed to define a TEMPORARY function with IF NOT EXISTS.")
+    throw QueryCompilationErrors.defineTempFuncWithIfNotExistsError()
   }
 
   // Temporary function names should not contain database prefix like "database.function"
   if (databaseName.isDefined && isTemp) {
-    throw new AnalysisException(s"Specifying a database in CREATE TEMPORARY FUNCTION " +
-      s"is not allowed: '${databaseName.get}'")
+    throw QueryCompilationErrors.specifyingDBInCreateTempFuncError(databaseName.get)
   }
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
@@ -183,11 +180,10 @@ case class DropFunctionCommand(
     val catalog = sparkSession.sessionState.catalog
     if (isTemp) {
       if (databaseName.isDefined) {
-        throw new AnalysisException(s"Specifying a database in DROP TEMPORARY FUNCTION " +
-          s"is not allowed: '${databaseName.get}'")
+        throw QueryCompilationErrors.specifyingDBInDropTempFuncError(databaseName.get)
       }
       if (FunctionRegistry.builtin.functionExists(FunctionIdentifier(functionName))) {
-        throw new AnalysisException(s"Cannot drop native function '$functionName'")
+        throw QueryCompilationErrors.cannotDropNativeFuncError(functionName)
       }
       catalog.dropTempFunction(functionName, ifExists)
     } else {
@@ -260,10 +256,10 @@ case class RefreshFunctionCommand(
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     if (FunctionRegistry.builtin.functionExists(FunctionIdentifier(functionName, databaseName))) {
-      throw new AnalysisException(s"Cannot refresh built-in function $functionName")
+      throw QueryCompilationErrors.cannotRefreshBuiltInFuncError(functionName)
     }
     if (catalog.isTemporaryFunction(FunctionIdentifier(functionName, databaseName))) {
-      throw new AnalysisException(s"Cannot refresh temporary function $functionName")
+      throw QueryCompilationErrors.cannotRefreshTempFuncError(functionName)
     }
 
     val identifier = FunctionIdentifier(
@@ -276,7 +272,7 @@ case class RefreshFunctionCommand(
     } else {
       // clear cached function and throw exception
       catalog.unregisterFunction(identifier)
-      throw new NoSuchFunctionException(identifier.database.get, identifier.funcName)
+      throw QueryCompilationErrors.noSuchFunctionError(identifier)
     }
 
     Seq.empty[Row]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 7d4d227..0599621 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -26,9 +26,9 @@ import scala.util.control.NonFatal
 import org.apache.hadoop.fs.{FileContext, FsConstants, Path}
 import org.apache.hadoop.fs.permission.{AclEntry, AclEntryScope, AclEntryType, FsAction, FsPermission}
 
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.DescribeCommandSchema
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.{escapeSingleQuotedString, quoteIdentifier, CaseInsensitiveMap, CharVarcharUtils}
-import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
@@ -255,11 +255,7 @@ case class AlterTableAddColumnsCommand(
     val catalogTable = catalog.getTempViewOrPermanentTableMetadata(table)
 
     if (catalogTable.tableType == CatalogTableType.VIEW) {
-      throw new AnalysisException(
-        s"""
-          |ALTER ADD COLUMNS does not support views.
-          |You must drop and re-create the views for adding the new columns. Views: $table
-         """.stripMargin)
+      throw QueryCompilationErrors.alterAddColNotSupportViewError(table)
     }
 
     if (DDLUtils.isDatasourceTable(catalogTable)) {
@@ -274,11 +270,7 @@ case class AlterTableAddColumnsCommand(
              _: OrcDataSourceV2 | _: ParquetDataSourceV2 =>
         case s if s.getClass.getCanonicalName.endsWith("OrcFileFormat") =>
         case s =>
-          throw new AnalysisException(
-            s"""
-              |ALTER ADD COLUMNS does not support datasource table with type $s.
-              |You must drop and re-create the table for adding the new columns. Tables: $table
-             """.stripMargin)
+          throw QueryCompilationErrors.alterAddColNotSupportDatasourceTableError(s, table)
       }
     }
     catalogTable
@@ -305,34 +297,30 @@ case class LoadDataCommand(
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val targetTable = catalog.getTableMetadata(table)
-    val tableIdentwithDB = targetTable.identifier.quotedString
+    val tableIdentWithDB = targetTable.identifier.quotedString
     val normalizedSpec = partition.map { spec =>
       PartitioningUtils.normalizePartitionSpec(
         spec,
         targetTable.partitionSchema,
-        tableIdentwithDB,
+        tableIdentWithDB,
         sparkSession.sessionState.conf.resolver)
     }
 
     if (DDLUtils.isDatasourceTable(targetTable)) {
-      throw new AnalysisException(
-        s"LOAD DATA is not supported for datasource tables: $tableIdentwithDB")
+      throw QueryCompilationErrors.loadDataNotSupportedForDatasourceTablesError(tableIdentWithDB)
     }
     if (targetTable.partitionColumnNames.nonEmpty) {
       if (partition.isEmpty) {
-        throw new AnalysisException(s"LOAD DATA target table $tableIdentwithDB is partitioned, " +
-          s"but no partition spec is provided")
+        throw QueryCompilationErrors.loadDataWithoutPartitionSpecProvidedError(tableIdentWithDB)
       }
       if (targetTable.partitionColumnNames.size != partition.get.size) {
-        throw new AnalysisException(s"LOAD DATA target table $tableIdentwithDB is partitioned, " +
-          s"but number of columns in provided partition spec (${partition.get.size}) " +
-          s"do not match number of partitioned columns in table " +
-          s"(${targetTable.partitionColumnNames.size})")
+        throw QueryCompilationErrors.loadDataPartitionSizeNotMatchNumPartitionColumnsError(
+          tableIdentWithDB, partition.get.size, targetTable.partitionColumnNames.size)
       }
     } else {
       if (partition.nonEmpty) {
-        throw new AnalysisException(s"LOAD DATA target table $tableIdentwithDB is not " +
-          s"partitioned, but a partition spec was provided.")
+        throw QueryCompilationErrors
+          .loadDataTargetTableNotPartitionedButPartitionSpecWasProvidedError(tableIdentWithDB)
       }
     }
     val loadPath = {
@@ -367,12 +355,12 @@ case class LoadDataCommand(
     try {
       val fileStatus = fs.globStatus(loadPath)
       if (fileStatus == null || fileStatus.isEmpty) {
-        throw new AnalysisException(s"LOAD DATA input path does not exist: $path")
+        throw QueryCompilationErrors.loadDataInputPathNotExistError(path)
       }
     } catch {
       case e: IllegalArgumentException =>
         log.warn(s"Exception while validating the load path $path ", e)
-        throw new AnalysisException(s"LOAD DATA input path does not exist: $path")
+        throw QueryCompilationErrors.loadDataInputPathNotExistError(path)
     }
     if (partition.nonEmpty) {
       catalog.loadPartition(
@@ -391,7 +379,7 @@ case class LoadDataCommand(
     }
 
     // Refresh the data and metadata cache to ensure the data visible to the users
-    sparkSession.catalog.refreshTable(tableIdentwithDB)
+    sparkSession.catalog.refreshTable(tableIdentWithDB)
 
     CommandUtils.updateTableStats(sparkSession, targetTable)
     Seq.empty[Row]
@@ -449,13 +437,11 @@ case class TruncateTableCommand(
     val tableIdentWithDB = table.identifier.quotedString
 
     if (table.tableType == CatalogTableType.EXTERNAL) {
-      throw new AnalysisException(
-        s"Operation not allowed: TRUNCATE TABLE on external tables: $tableIdentWithDB")
+      throw QueryCompilationErrors.truncateTableOnExternalTablesError(tableIdentWithDB)
     }
     if (table.partitionColumnNames.isEmpty && partitionSpec.isDefined) {
-      throw new AnalysisException(
-        s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
-        s"for tables that are not partitioned: $tableIdentWithDB")
+      throw QueryCompilationErrors.truncateTablePartitionNotSupportedForNotPartitionedTablesError(
+        tableIdentWithDB)
     }
     if (partitionSpec.isDefined) {
       DDLUtils.verifyPartitionProviderIsHive(spark, table, "TRUNCATE TABLE ... PARTITION")
@@ -479,7 +465,8 @@ case class TruncateTableCommand(
         // Fail if the partition spec is fully specified (not partial) and the partition does not
         // exist.
         for (spec <- partitionSpec if partLocations.isEmpty && spec.size == partCols.length) {
-          throw new NoSuchPartitionException(table.database, table.identifier.table, spec)
+          throw QueryCompilationErrors.noSuchPartitionError(table.database,
+            table.identifier.table, spec)
         }
 
         partLocations
@@ -522,9 +509,8 @@ case class TruncateTableCommand(
                 fs.setPermission(path, permission)
               } catch {
                 case NonFatal(e) =>
-                  throw new SecurityException(
-                    s"Failed to set original permission $permission back to " +
-                      s"the created path: $path. Exception: ${e.getMessage}")
+                  throw QueryExecutionErrors.failToSetOriginalPermissionBackError(
+                    permission, path, e)
               }
             }
             optAcls.foreach { acls =>
@@ -547,17 +533,15 @@ case class TruncateTableCommand(
                 fs.setAcl(path, aclEntries)
               } catch {
                 case NonFatal(e) =>
-                  throw new SecurityException(
-                    s"Failed to set original ACL $aclEntries back to " +
-                      s"the created path: $path. Exception: ${e.getMessage}")
+                  throw QueryExecutionErrors.failToSetOriginalACLBackError(aclEntries.toString,
+                    path, e)
               }
             }
           }
         } catch {
           case NonFatal(e) =>
-            throw new AnalysisException(
-              s"Failed to truncate table $tableIdentWithDB when removing data of the path: $path " +
-                s"because of ${e.toString}")
+            throw QueryCompilationErrors.failToTruncateTableWhenRemovingDataError(tableIdentWithDB,
+              path, e)
         }
       }
     }
@@ -617,8 +601,7 @@ case class DescribeTableCommand(
 
     if (catalog.isTempView(table)) {
       if (partitionSpec.nonEmpty) {
-        throw new AnalysisException(
-          s"DESC PARTITION is not allowed on a temporary view: ${table.identifier}")
+        throw QueryCompilationErrors.descPartitionNotAllowedOnTempView(table.identifier)
       }
       val schema = catalog.getTempViewOrPermanentTableMetadata(table).schema
       describeSchema(schema, result, header = false)
@@ -672,8 +655,7 @@ case class DescribeTableCommand(
       metadata: CatalogTable,
       result: ArrayBuffer[Row]): Unit = {
     if (metadata.tableType == CatalogTableType.VIEW) {
-      throw new AnalysisException(
-        s"DESC PARTITION is not allowed on a view: ${table.identifier}")
+      throw QueryCompilationErrors.descPartitionNotAllowedOnView(table.identifier)
     }
     DDLUtils.verifyPartitionProviderIsHive(spark, metadata, "DESC PARTITION")
     val partition = catalog.getPartition(table, partitionSpec)
@@ -970,8 +952,8 @@ case class ShowPartitionsCommand(
      */
 
     if (table.partitionColumnNames.isEmpty) {
-      throw new AnalysisException(
-        s"SHOW PARTITIONS is not allowed on a table that is not partitioned: $tableIdentWithDB")
+      throw QueryCompilationErrors.showPartitionNotAllowedOnTableNotPartitionedError(
+        tableIdentWithDB)
     }
 
     DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "SHOW PARTITIONS")
@@ -1086,8 +1068,7 @@ case class ShowCreateTableCommand(
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     if (catalog.isTempView(table)) {
-      throw new AnalysisException(
-        s"SHOW CREATE TABLE is not supported on a temporary view: ${table.identifier}")
+      throw QueryCompilationErrors.showCreateTableNotSupportedOnTempView(table.identifier)
     } else {
       val tableMetadata = catalog.getTableRawMetadata(table)
 
@@ -1098,21 +1079,13 @@ case class ShowCreateTableCommand(
       } else {
         // For a Hive serde table, we try to convert it to Spark DDL.
         if (tableMetadata.unsupportedFeatures.nonEmpty) {
-          throw new AnalysisException(
-            "Failed to execute SHOW CREATE TABLE against table " +
-              s"${tableMetadata.identifier}, which is created by Hive and uses the " +
-              "following unsupported feature(s)\n" +
-              tableMetadata.unsupportedFeatures.map(" - " + _).mkString("\n") + ". " +
-              s"Please use `SHOW CREATE TABLE ${tableMetadata.identifier} AS SERDE` " +
-              "to show Hive DDL instead."
-          )
+          throw QueryCompilationErrors.showCreateTableFailToExecuteUnsupportedFeatureError(
+            tableMetadata)
         }
 
         if ("true".equalsIgnoreCase(tableMetadata.properties.getOrElse("transactional", "false"))) {
-          throw new AnalysisException(
-            "SHOW CREATE TABLE doesn't support transactional Hive table. " +
-              s"Please use `SHOW CREATE TABLE ${tableMetadata.identifier} AS SERDE` " +
-              "to show Hive DDL instead.")
+          throw QueryCompilationErrors.showCreateTableNotSupportTransactionalHiveTableError(
+            tableMetadata)
         }
 
         if (tableMetadata.tableType == VIEW) {
@@ -1160,12 +1133,7 @@ case class ShowCreateTableCommand(
       hiveSerde.outputFormat.foreach { format =>
         builder ++= s" OUTPUTFORMAT: $format"
       }
-      throw new AnalysisException(
-        "Failed to execute SHOW CREATE TABLE against table " +
-          s"${tableMetadata.identifier}, which is created by Hive and uses the " +
-          "following unsupported serde configuration\n" +
-          builder.toString()
-      )
+      throw QueryCompilationErrors.showCreateTableFailToExecuteUnsupportedConfError(table, builder)
     } else {
       // TODO: should we keep Hive serde properties?
       val newStorage = tableMetadata.storage.copy(properties = Map.empty)
@@ -1242,8 +1210,8 @@ case class ShowCreateTableAsSerdeCommand(
     val tableMetadata = catalog.getTableRawMetadata(table)
 
     val stmt = if (DDLUtils.isDatasourceTable(tableMetadata)) {
-      throw new AnalysisException(
-        s"$table is a Spark data source table. Use `SHOW CREATE TABLE` without `AS SERDE` instead.")
+      throw QueryCompilationErrors.showCreateTableAsSerdeNotAllowedOnSparkDataSourceTableError(
+        table)
     } else {
       showCreateHiveTable(tableMetadata)
     }
@@ -1253,11 +1221,8 @@ case class ShowCreateTableAsSerdeCommand(
 
   private def showCreateHiveTable(metadata: CatalogTable): String = {
     def reportUnsupportedError(features: Seq[String]): Unit = {
-      throw new AnalysisException(
-        s"Failed to execute SHOW CREATE TABLE against table/view ${metadata.identifier}, " +
-          "which is created by Hive and uses the following unsupported feature(s)\n" +
-          features.map(" - " + _).mkString("\n")
-      )
+      throw QueryCompilationErrors.showCreateTableOrViewFailToExecuteUnsupportedFeatureError(
+        metadata, features)
     }
 
     if (metadata.unsupportedFeatures.nonEmpty) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 5e92ce2..2eb5d76 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -23,7 +23,7 @@ import org.json4s.JsonAST.{JArray, JString}
 import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, PersistedView, ViewType}
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, SessionCatalog, TemporaryViewRelation}
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, SubqueryExpr
 import org.apache.spark.sql.catalyst.plans.logical.{AnalysisOnlyCommand, LogicalPlan, Project, View}
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.types.{MetadataBuilder, StructType}
 import org.apache.spark.sql.util.SchemaUtils
@@ -86,35 +87,32 @@ case class CreateViewCommand(
   }
 
   if (allowExisting && replace) {
-    throw new AnalysisException("CREATE VIEW with both IF NOT EXISTS and REPLACE is not allowed.")
+    throw QueryCompilationErrors.createViewWithBothIfNotExistsAndReplaceError()
   }
 
   private def isTemporary = viewType == LocalTempView || viewType == GlobalTempView
 
   // Disallows 'CREATE TEMPORARY VIEW IF NOT EXISTS' to be consistent with 'CREATE TEMPORARY TABLE'
   if (allowExisting && isTemporary) {
-    throw new AnalysisException(
-      "It is not allowed to define a TEMPORARY view with IF NOT EXISTS.")
+    throw QueryCompilationErrors.defineTempViewWithIfNotExistsError()
   }
 
   // Temporary view names should NOT contain database prefix like "database.table"
   if (isTemporary && name.database.isDefined) {
     val database = name.database.get
-    throw new AnalysisException(
-      s"It is not allowed to add database prefix `$database` for the TEMPORARY view name.")
+    throw QueryCompilationErrors.notAllowedToAddDBPrefixForTempViewError(database)
   }
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     if (!isAnalyzed) {
-      throw new AnalysisException("The logical plan that represents the view is not analyzed.")
+      throw QueryCompilationErrors.logicalPlanForViewNotAnalyzedError()
     }
     val analyzedPlan = plan
 
     if (userSpecifiedColumns.nonEmpty &&
         userSpecifiedColumns.length != analyzedPlan.output.length) {
-      throw new AnalysisException(s"The number of columns produced by the SELECT clause " +
-        s"(num: `${analyzedPlan.output.length}`) does not match the number of column names " +
-        s"specified by CREATE VIEW (num: `${userSpecifiedColumns.length}`).")
+      throw QueryCompilationErrors.createViewNumColumnsMismatchUserSpecifiedColumnLengthError(
+        analyzedPlan.output.length, userSpecifiedColumns.length)
     }
 
     val catalog = sparkSession.sessionState.catalog
@@ -154,7 +152,7 @@ case class CreateViewCommand(
         // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view
         // already exists.
       } else if (tableMetadata.tableType != CatalogTableType.VIEW) {
-        throw new AnalysisException(s"$name is not a view")
+        throw QueryCompilationErrors.tableIsNotViewError(name)
       } else if (replace) {
         // Detect cyclic view reference on CREATE OR REPLACE VIEW.
         val viewIdent = tableMetadata.identifier
@@ -171,9 +169,7 @@ case class CreateViewCommand(
       } else {
         // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
         // exists.
-        throw new AnalysisException(
-          s"View $name already exists. If you want to update the view definition, " +
-            "please use ALTER VIEW AS or CREATE OR REPLACE VIEW AS")
+        throw QueryCompilationErrors.viewAlreadyExistsError(name)
       }
     } else {
       // Create the view if it doesn't exist.
@@ -207,8 +203,7 @@ case class CreateViewCommand(
    */
   private def prepareTable(session: SparkSession, analyzedPlan: LogicalPlan): CatalogTable = {
     if (originalText.isEmpty) {
-      throw new AnalysisException(
-        "It is not allowed to create a persisted view from the Dataset API")
+      throw QueryCompilationErrors.createPersistedViewFromDatasetAPINotAllowedError()
     }
     val aliasedSchema = CharVarcharUtils.getRawSchema(
       aliasPlan(session, analyzedPlan).schema)
@@ -517,8 +512,7 @@ object ViewHelper extends SQLConfHelper with Logging {
         // If the table identifier equals to the `viewIdent`, current view node is the same with
         // the altered view. We detect a view reference cycle, should throw an AnalysisException.
         if (ident == viewIdent) {
-          throw new AnalysisException(s"Recursive view $viewIdent detected " +
-            s"(cycle: ${newPath.mkString(" -> ")})")
+          throw QueryCompilationErrors.recursiveViewDetectedError(viewIdent, newPath)
         } else {
           v.children.foreach { child =>
             checkCyclicViewReference(child, newPath, viewIdent)
@@ -543,8 +537,9 @@ object ViewHelper extends SQLConfHelper with Logging {
     if (!isTemporary && !conf.allowAutoGeneratedAliasForView) {
       child.output.foreach { attr =>
         if (attr.metadata.contains("__autoGeneratedAlias")) {
-          throw new AnalysisException(s"Not allowed to create a permanent view $name without " +
-            s"explicitly assigning an alias for expression ${attr.name}")
+          throw QueryCompilationErrors
+            .notAllowedToCreatePermanentViewWithoutAssigningAliasForExpressionError(name,
+              attr.name)
         }
       }
     }
@@ -562,13 +557,12 @@ object ViewHelper extends SQLConfHelper with Logging {
     if (!isTemporary) {
       val (tempViews, tempFunctions) = collectTemporaryObjects(catalog, child)
       tempViews.foreach { nameParts =>
-        throw new AnalysisException(s"Not allowed to create a permanent view $name by " +
-          s"referencing a temporary view ${nameParts.quoted}. " +
-          "Please create a temp view instead by CREATE TEMP VIEW")
+        throw QueryCompilationErrors.notAllowedToCreatePermanentViewByReferencingTempViewError(
+          name, nameParts.quoted)
       }
       tempFunctions.foreach { funcName =>
-        throw new AnalysisException(s"Not allowed to create a permanent view $name by " +
-          s"referencing a temporary function `${funcName}`")
+        throw QueryCompilationErrors.notAllowedToCreatePermanentViewByReferencingTempFuncError(
+          name, funcName)
       }
     }
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org