You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/09/07 11:51:00 UTC

[spark] branch master updated: [SPARK-40185][SQL] Remove column suggestion when the candidate list is empty

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 32567e94b8a [SPARK-40185][SQL] Remove column suggestion when the candidate list is empty
32567e94b8a is described below

commit 32567e94b8ad2550d8b0b4d73e2dfd441d426ecc
Author: Vitalii Li <vi...@databricks.com>
AuthorDate: Wed Sep 7 19:50:33 2022 +0800

    [SPARK-40185][SQL] Remove column suggestion when the candidate list is empty
    
    ### What changes were proposed in this pull request?
    
    1. Remove column, attribute or map key suggestion from `UNRESOLVED_*` error if candidate list is empty.
    2. Sort suggested columns by closeness to unresolved column
    3. Limit number of candidates to 5. Previously the entire list of existing columns was shown as suggestions.
    
    ### Why are the changes needed?
    
    When the list of candidates is empty the error message looks incomplete:
    
    `[UNRESOLVED_COLUMN] A column or function parameter with name 'YrMo' cannot be resolved. Did you mean one of the following? []`
    
    This PR is to introduce `WITHOUT_SUGGESTION` error subclass without suggestion and `WITH_SUGGESTION` subclass where error message includes suggested fields/columns:
    
    `[UNRESOLVED_COLUMN.WITHOUT_SUGGESTION] A column or function parameter with name 'YrMo' cannot be resolved.`
    
    OR
    
    `[UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name 'YrMo' cannot be resolved. Did you mean one of the following? ['YearAndMonth', 'Year', 'Month']`
    
    In addition, suggested column names are sorted by Levenshtein distance and capped to 5.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Unit test
    
    Closes #37621 from vitaliili-db/SC-108622.
    
    Authored-by: Vitalii Li <vi...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 core/src/main/resources/error/error-classes.json   | 42 +++++++++-
 .../org/apache/spark/SparkThrowableSuite.scala     | 16 ++--
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 11 ++-
 .../plans/logical/basicLogicalOperators.scala      |  5 +-
 .../spark/sql/errors/QueryCompilationErrors.scala  | 33 +++++---
 .../sql/catalyst/analysis/AnalysisErrorSuite.scala | 21 ++++-
 .../sql/catalyst/analysis/AnalysisSuite.scala      | 12 ++-
 .../spark/sql/catalyst/analysis/AnalysisTest.scala | 23 +++++-
 .../catalyst/analysis/ResolveSubquerySuite.scala   | 24 +++---
 .../catalyst/analysis/V2WriteAnalysisSuite.scala   | 14 +++-
 .../results/columnresolution-negative.sql.out      | 12 ++-
 .../resources/sql-tests/results/group-by.sql.out   |  6 +-
 .../sql-tests/results/join-lateral.sql.out         | 15 ++--
 .../sql-tests/results/natural-join.sql.out         |  3 +-
 .../test/resources/sql-tests/results/pivot.sql.out |  6 +-
 .../results/postgreSQL/aggregates_part1.sql.out    |  3 +-
 .../results/postgreSQL/create_view.sql.out         |  4 +-
 .../sql-tests/results/postgreSQL/join.sql.out      | 28 ++++---
 .../results/postgreSQL/select_having.sql.out       |  3 +-
 .../results/postgreSQL/select_implicit.sql.out     |  6 +-
 .../sql-tests/results/postgreSQL/union.sql.out     |  3 +-
 .../sql-tests/results/query_regex_column.sql.out   | 24 ++++--
 .../negative-cases/invalid-correlation.sql.out     |  3 +-
 .../sql-tests/results/table-aliases.sql.out        |  3 +-
 .../udf/postgreSQL/udf-aggregates_part1.sql.out    |  3 +-
 .../results/udf/postgreSQL/udf-join.sql.out        | 28 ++++---
 .../udf/postgreSQL/udf-select_having.sql.out       |  3 +-
 .../udf/postgreSQL/udf-select_implicit.sql.out     |  6 +-
 .../sql-tests/results/udf/udf-group-by.sql.out     |  3 +-
 .../sql-tests/results/udf/udf-pivot.sql.out        |  6 +-
 .../apache/spark/sql/DataFrameFunctionsSuite.scala | 89 ++++++++++++----------
 .../apache/spark/sql/DataFrameSelfJoinSuite.scala  |  3 +-
 .../apache/spark/sql/DataFrameToSchemaSuite.scala  |  4 +-
 .../spark/sql/DataFrameWindowFunctionsSuite.scala  |  9 ++-
 .../scala/org/apache/spark/sql/DatasetSuite.scala  | 44 ++++++-----
 .../org/apache/spark/sql/DatasetUnpivotSuite.scala |  9 ++-
 .../org/apache/spark/sql/SQLInsertTestSuite.scala  | 10 ++-
 .../scala/org/apache/spark/sql/SubquerySuite.scala | 11 ++-
 .../test/scala/org/apache/spark/sql/UDFSuite.scala | 11 ++-
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 19 +++--
 .../sql/errors/QueryCompilationErrorsSuite.scala   | 11 ++-
 .../apache/spark/sql/execution/SQLViewSuite.scala  |  6 +-
 .../execution/command/v2/DescribeTableSuite.scala  |  6 +-
 .../org/apache/spark/sql/sources/InsertSuite.scala | 11 +--
 .../apache/spark/sql/hive/HiveParquetSuite.scala   |  9 ++-
 45 files changed, 423 insertions(+), 198 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index b923d5a39e0..f39ee465768 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -448,20 +448,56 @@
   },
   "UNRESOLVED_COLUMN" : {
     "message" : [
-      "A column or function parameter with name <objectName> cannot be resolved. Did you mean one of the following? [<objectList>]"
+      "A column or function parameter with name <objectName> cannot be resolved."
     ],
+    "subClass" : {
+      "WITHOUT_SUGGESTION" : {
+        "message" : [
+          ""
+        ]
+      },
+      "WITH_SUGGESTION" : {
+        "message" : [
+          "Did you mean one of the following? [<proposal>]"
+        ]
+      }
+    },
     "sqlState" : "42000"
   },
   "UNRESOLVED_FIELD" : {
     "message" : [
-      "A field with name <fieldName> cannot be resolved with the struct-type column <columnPath>. Did you mean one of the following? [<proposal>]"
+      "A field with name <fieldName> cannot be resolved with the struct-type column <columnPath>."
     ],
+    "subClass" : {
+      "WITHOUT_SUGGESTION" : {
+        "message" : [
+          ""
+        ]
+      },
+      "WITH_SUGGESTION" : {
+        "message" : [
+          "Did you mean one of the following? [<proposal>]"
+        ]
+      }
+    },
     "sqlState" : "42000"
   },
   "UNRESOLVED_MAP_KEY" : {
     "message" : [
-      "Cannot resolve column <columnName> as a map key. If the key is a string literal, please add single quotes around it. Otherwise, did you mean one of the following column(s)? [<proposal>]"
+      "Cannot resolve column <columnName> as a map key. If the key is a string literal, please add single quotes around it."
     ],
+    "subClass" : {
+      "WITHOUT_SUGGESTION" : {
+        "message" : [
+          ""
+        ]
+      },
+      "WITH_SUGGESTION" : {
+        "message" : [
+          "Otherwise did you mean one of the following column(s)? [<proposal>]"
+        ]
+      }
+    },
     "sqlState" : "42000"
   },
   "UNSUPPORTED_DATATYPE" : {
diff --git a/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala b/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala
index 8f5b1ba645c..dda86387f15 100644
--- a/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala
@@ -126,7 +126,7 @@ class SparkThrowableSuite extends SparkFunSuite {
 
   test("Message format invariants") {
     val messageFormats = errorClassToInfoMap.values.toSeq.flatMap { i =>
-      Seq(i.messageFormat) ++ i.subClass.getOrElse(Map.empty).values.toSeq.map(_.messageFormat)
+      Seq(i.messageFormat)
     }
     checkCondition(messageFormats, s => s != null)
     checkIfUnique(messageFormats)
@@ -159,7 +159,7 @@ class SparkThrowableSuite extends SparkFunSuite {
   test("Check if message parameters match message format") {
     // Requires 2 args
     intercept[IllegalFormatException] {
-      getMessage("UNRESOLVED_COLUMN", null, Array.empty)
+      getMessage("UNRESOLVED_COLUMN", "WITHOUT_SUGGESTION", Array.empty)
     }
 
     // Does not fail with too many args (expects 0 args)
@@ -171,9 +171,15 @@ class SparkThrowableSuite extends SparkFunSuite {
   }
 
   test("Error message is formatted") {
-    assert(getMessage("UNRESOLVED_COLUMN", null, Array("`foo`", "`bar`, `baz`")) ==
-      "[UNRESOLVED_COLUMN] A column or function parameter with name `foo` cannot be resolved. " +
-        "Did you mean one of the following? [`bar`, `baz`]")
+    assert(
+      getMessage(
+        "UNRESOLVED_COLUMN",
+        "WITH_SUGGESTION",
+        Array("`foo`", "`bar`, `baz`")
+      ) ==
+      "[UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with " +
+        "name `foo` cannot be resolved. Did you mean one of the following? [`bar`, `baz`]"
+    )
   }
 
   test("Try catching legacy SparkError") {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 6636dcc63e6..6fc9d756c99 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
 import org.apache.spark.sql.catalyst.trees.{AlwaysProcess, CurrentOrigin}
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin.withOrigin
 import org.apache.spark.sql.catalyst.trees.TreePattern._
-import org.apache.spark.sql.catalyst.util.{toPrettySQL, CharVarcharUtils}
+import org.apache.spark.sql.catalyst.util.{toPrettySQL, CharVarcharUtils, StringUtils}
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
 import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
@@ -3440,9 +3440,12 @@ class Analyzer(override val catalogManager: CatalogManager)
         i.userSpecifiedCols, "in the column list", resolver)
 
       i.userSpecifiedCols.map { col =>
-        i.table.resolve(Seq(col), resolver).getOrElse(
-          throw QueryCompilationErrors.unresolvedAttributeError(
-            "UNRESOLVED_COLUMN", col, i.table.output.map(_.name), i.origin))
+        i.table.resolve(Seq(col), resolver).getOrElse {
+          val candidates = i.table.output.map(_.name)
+          val orderedCandidates = StringUtils.orderStringsBySimilarity(col, candidates)
+          throw QueryCompilationErrors
+            .unresolvedAttributeError("UNRESOLVED_COLUMN", col, orderedCandidates, i.origin)
+        }
       }
     }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 0a24134e4af..1baa5c20ba4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -178,7 +178,10 @@ object Project {
           createNewColumn(columnExpr, f.name, f.metadata, Metadata.empty)
         } else {
           if (columnPath.isEmpty) {
-            throw QueryCompilationErrors.unresolvedColumnError(f.name, fields.map(_._1))
+            val candidates = fields.map(_._1)
+            val orderedCandidates =
+              StringUtils.orderStringsBySimilarity(f.name, candidates)
+            throw QueryCompilationErrors.unresolvedColumnError(f.name, orderedCandidates)
           } else {
             throw QueryCompilationErrors.unresolvedFieldError(f.name, columnPath, fields.map(_._1))
           }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index d142be68b52..11e1bb2c6b9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -168,30 +168,43 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       colName: String,
       candidates: Seq[String],
       origin: Origin): Throwable = {
-    val candidateIds = candidates.map(candidate => toSQLId(candidate))
     new AnalysisException(
       errorClass = errorClass,
-      messageParameters = Array(toSQLId(colName), candidateIds.mkString(", ")),
-      origin = origin)
+      errorSubClass = if (candidates.isEmpty) "WITHOUT_SUGGESTION" else "WITH_SUGGESTION",
+      messageParameters = Array.concat(Array(toSQLId(colName)), if (candidates.isEmpty) {
+        Array.empty
+      } else {
+        Array(candidates.take(5).map(toSQLId).mkString(", "))
+      }),
+      origin = origin
+    )
   }
 
-  def unresolvedColumnError(
-      columnName: String,
-      proposal: Seq[String]): Throwable = {
-    val proposalStr = proposal.map(toSQLId).mkString(", ")
+  def unresolvedColumnError(columnName: String, proposal: Seq[String]): Throwable = {
     new AnalysisException(
       errorClass = "UNRESOLVED_COLUMN",
-      messageParameters = Array(toSQLId(columnName), proposalStr))
+      errorSubClass = if (proposal.isEmpty) "WITHOUT_SUGGESTION" else "WITH_SUGGESTION",
+      messageParameters = Array.concat(Array(toSQLId(columnName)), if (proposal.isEmpty) {
+        Array.empty
+      } else {
+        Array(proposal.take(5).map(toSQLId).mkString(", "))
+      }))
   }
 
   def unresolvedFieldError(
       fieldName: String,
       columnPath: Seq[String],
       proposal: Seq[String]): Throwable = {
-    val proposalStr = proposal.map(toSQLId).mkString(", ")
     new AnalysisException(
       errorClass = "UNRESOLVED_FIELD",
-      messageParameters = Array(toSQLId(fieldName), toSQLId(columnPath), proposalStr))
+      errorSubClass = if (proposal.isEmpty) "WITHOUT_SUGGESTION" else "WITH_SUGGESTION",
+      messageParameters =
+        Array.concat(Array(toSQLId(fieldName), toSQLId(columnPath)), if (proposal.isEmpty) {
+          Array.empty
+        } else {
+          Array(proposal.map(toSQLId).mkString(", "))
+        })
+    )
   }
 
   def dataTypeMismatchForDeserializerError(
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index 257922eb81c..cb89531757c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -117,8 +117,18 @@ class AnalysisErrorSuite extends AnalysisTest {
       plan: LogicalPlan,
       errorClass: String,
       messageParameters: Array[String]): Unit = {
+    errorClassTest(name, plan, errorClass, null, messageParameters)
+  }
+
+  def errorClassTest(
+      name: String,
+      plan: LogicalPlan,
+      errorClass: String,
+      errorSubClass: String,
+      messageParameters: Array[String]): Unit = {
     test(name) {
-      assertAnalysisErrorClass(plan, errorClass, messageParameters)
+      assertAnalysisErrorClass(plan, errorClass, errorSubClass, messageParameters,
+        caseSensitive = true)
     }
   }
 
@@ -292,6 +302,7 @@ class AnalysisErrorSuite extends AnalysisTest {
     "unresolved attributes",
     testRelation.select($"abcd"),
     "UNRESOLVED_COLUMN",
+    "WITH_SUGGESTION",
     Array("`abcd`", "`a`"))
 
   errorClassTest(
@@ -300,6 +311,7 @@ class AnalysisErrorSuite extends AnalysisTest {
       .where(sum($"b") > 0)
       .orderBy($"havingCondition".asc),
     "UNRESOLVED_COLUMN",
+    "WITH_SUGGESTION",
     Array("`havingCondition`", "`max(b)`"))
 
   errorTest(
@@ -316,6 +328,7 @@ class AnalysisErrorSuite extends AnalysisTest {
     "sorting by attributes are not from grouping expressions",
     testRelation2.groupBy($"a", $"c")($"a", $"c", count($"a").as("a3")).orderBy($"b".asc),
     "UNRESOLVED_COLUMN",
+    "WITH_SUGGESTION",
     Array("`b`", "`a`, `c`, `a3`"))
 
   errorTest(
@@ -410,6 +423,7 @@ class AnalysisErrorSuite extends AnalysisTest {
     // When parse SQL string, we will wrap aggregate expressions with UnresolvedAlias.
     testRelation2.where($"bad_column" > 1).groupBy($"a")(UnresolvedAlias(max($"b"))),
     "UNRESOLVED_COLUMN",
+    "WITH_SUGGESTION",
     Array("`bad_column`", "`a`, `b`, `c`, `d`, `e`"))
 
   errorTest(
@@ -830,8 +844,9 @@ class AnalysisErrorSuite extends AnalysisTest {
   errorTest(
     "SPARK-34920: error code to error message",
     testRelation2.where($"bad_column" > 1).groupBy($"a")(UnresolvedAlias(max($"b"))),
-    "[UNRESOLVED_COLUMN] A column or function parameter with name `bad_column` cannot be " +
-      "resolved. Did you mean one of the following? [`a`, `b`, `c`, `d`, `e`]"
+    "[UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name " +
+      "`bad_column` cannot be resolved. Did you mean one of the following? " +
+      "[`a`, `b`, `c`, `d`, `e`]"
       :: Nil)
 
   test("SPARK-35080: Unsupported correlated equality predicates in subquery") {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 1b397935a89..1fd001c27d6 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -104,7 +104,9 @@ class AnalysisSuite extends AnalysisTest with Matchers {
       Project(Seq(UnresolvedAttribute("tBl.a")),
         SubqueryAlias("TbL", UnresolvedRelation(TableIdentifier("TaBlE")))),
       "UNRESOLVED_COLUMN",
-      Array("`tBl`.`a`", "`TbL`.`a`"))
+      "WITH_SUGGESTION",
+      Array("`tBl`.`a`", "`TbL`.`a`"),
+      caseSensitive = true)
 
     checkAnalysisWithoutViewWrapper(
       Project(Seq(UnresolvedAttribute("TbL.a")),
@@ -712,7 +714,9 @@ class AnalysisSuite extends AnalysisTest with Matchers {
   test("CTE with non-existing column alias") {
     assertAnalysisErrorClass(parsePlan("WITH t(x) AS (SELECT 1) SELECT * FROM t WHERE y = 1"),
       "UNRESOLVED_COLUMN",
-      Array("`y`", "`t`.`x`"))
+      "WITH_SUGGESTION",
+      Array("`y`", "`t`.`x`"),
+      caseSensitive = true)
   }
 
   test("CTE with non-matching column alias") {
@@ -1150,7 +1154,9 @@ class AnalysisSuite extends AnalysisTest with Matchers {
         |ORDER BY c.x + c.y
         |""".stripMargin),
       "UNRESOLVED_COLUMN",
-      Array("`c`.`y`", "`x`"))
+      "WITH_SUGGESTION",
+      Array("`c`.`y`", "`x`"),
+      caseSensitive = true)
   }
 
   test("SPARK-38118: Func(wrong_type) in the HAVING clause should throw data mismatch error") {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
index 7dde85014e7..94cb68a26f7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
@@ -175,6 +175,20 @@ trait AnalysisTest extends PlanTest {
       expectedErrorClass: String,
       expectedMessageParameters: Array[String],
       caseSensitive: Boolean = true): Unit = {
+    assertAnalysisErrorClass(
+      inputPlan,
+      expectedErrorClass,
+      null,
+      expectedMessageParameters,
+      caseSensitive)
+  }
+
+  protected def assertAnalysisErrorClass(
+      inputPlan: LogicalPlan,
+      expectedErrorClass: String,
+      expectedErrorSubClass: String,
+      expectedMessageParameters: Array[String],
+      caseSensitive: Boolean): Unit = {
     withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
       val analyzer = getAnalyzer
       val e = intercept[AnalysisException] {
@@ -182,7 +196,8 @@ trait AnalysisTest extends PlanTest {
       }
 
       if (e.getErrorClass != expectedErrorClass ||
-        !e.messageParameters.sameElements(expectedMessageParameters)) {
+        !e.messageParameters.sameElements(expectedMessageParameters) ||
+        e.getErrorSubClass != expectedErrorSubClass) {
         var failMsg = ""
         if (e.getErrorClass != expectedErrorClass) {
           failMsg +=
@@ -190,6 +205,12 @@ trait AnalysisTest extends PlanTest {
                |Actual error class: ${e.getErrorClass}
              """.stripMargin
         }
+        if (e.getErrorSubClass != expectedErrorSubClass) {
+          failMsg +=
+            s"""Error sub class should be: $expectedErrorSubClass
+               |Actual error sub class: ${e.getErrorSubClass}
+             """.stripMargin
+        }
         if (!e.messageParameters.sameElements(expectedMessageParameters)) {
           failMsg +=
             s"""Message parameters should be: ${expectedMessageParameters.mkString("\n  ")}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveSubquerySuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveSubquerySuite.scala
index 11dcae12406..b3a19041220 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveSubquerySuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveSubquerySuite.scala
@@ -134,7 +134,9 @@ class ResolveSubquerySuite extends AnalysisTest {
     assertAnalysisErrorClass(
       lateralJoin(t1, lateralJoin(t2, t0.select($"a", $"b", $"c"))),
       "UNRESOLVED_COLUMN",
-      Array("`a`", ""))
+      "WITHOUT_SUGGESTION",
+      Array("`a`"),
+      caseSensitive = true)
   }
 
   test("lateral subquery with unresolvable attributes") {
@@ -142,26 +144,30 @@ class ResolveSubquerySuite extends AnalysisTest {
     assertAnalysisErrorClass(
       lateralJoin(t1, t0.select($"a", $"c")),
       "UNRESOLVED_COLUMN",
-      Array("`c`", "")
-    )
+      "WITHOUT_SUGGESTION",
+      Array("`c`"),
+      caseSensitive = true)
     // SELECT * FROM t1, LATERAL (SELECT a, b, c, d FROM t2)
     assertAnalysisErrorClass(
       lateralJoin(t1, t2.select($"a", $"b", $"c", $"d")),
       "UNRESOLVED_COLUMN",
-      Array("`d`", "`b`, `c`")
-    )
+      "WITH_SUGGESTION",
+      Array("`d`", "`b`, `c`"),
+      caseSensitive = true)
     // SELECT * FROM t1, LATERAL (SELECT * FROM t2, LATERAL (SELECT t1.a))
     assertAnalysisErrorClass(
       lateralJoin(t1, lateralJoin(t2, t0.select($"t1.a"))),
       "UNRESOLVED_COLUMN",
-      Array("`t1`.`a`", "")
-    )
+      "WITHOUT_SUGGESTION",
+      Array("`t1`.`a`"),
+      caseSensitive = true)
     // SELECT * FROM t1, LATERAL (SELECT * FROM t2, LATERAL (SELECT a, b))
     assertAnalysisErrorClass(
       lateralJoin(t1, lateralJoin(t2, t0.select($"a", $"b"))),
       "UNRESOLVED_COLUMN",
-      Array("`a`", "")
-    )
+      "WITHOUT_SUGGESTION",
+      Array("`a`"),
+      caseSensitive = true)
   }
 
   test("lateral subquery with struct type") {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala
index 7fbedc7b312..e5fda40cf51 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala
@@ -688,7 +688,12 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest {
       LessThanOrEqual(UnresolvedAttribute(Seq("a")), Literal(15.0d)))
 
     assertNotResolved(parsedPlan)
-    assertAnalysisErrorClass(parsedPlan, "UNRESOLVED_COLUMN", Array("`a`", "`x`, `y`"))
+    assertAnalysisErrorClass(
+      parsedPlan,
+      "UNRESOLVED_COLUMN",
+      "WITH_SUGGESTION",
+      Array("`a`", "`x`, `y`"),
+      caseSensitive = true)
 
     val tableAcceptAnySchema = TestRelationAcceptAnySchema(StructType(Seq(
       StructField("x", DoubleType, nullable = false),
@@ -697,7 +702,12 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest {
     val parsedPlan2 = OverwriteByExpression.byPosition(tableAcceptAnySchema, query,
       LessThanOrEqual(UnresolvedAttribute(Seq("a")), Literal(15.0d)))
     assertNotResolved(parsedPlan2)
-    assertAnalysisErrorClass(parsedPlan2, "UNRESOLVED_COLUMN", Array("`a`", "`x`, `y`"))
+    assertAnalysisErrorClass(
+      parsedPlan2,
+      "UNRESOLVED_COLUMN",
+      "WITH_SUGGESTION",
+      Array("`a`", "`x`, `y`"),
+      caseSensitive = true)
   }
 
   test("SPARK-36498: reorder inner fields with byName mode") {
diff --git a/sql/core/src/test/resources/sql-tests/results/columnresolution-negative.sql.out b/sql/core/src/test/resources/sql-tests/results/columnresolution-negative.sql.out
index 1eaf5f03f58..3afc3f35476 100644
--- a/sql/core/src/test/resources/sql-tests/results/columnresolution-negative.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/columnresolution-negative.sql.out
@@ -160,10 +160,11 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITH_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
     "objectName" : "`db1`.`t1`.`i1`",
-    "objectList" : "`spark_catalog`.`mydb2`.`t1`.`i1`, `spark_catalog`.`mydb2`.`t1`.`i1`"
+    "proposal" : "`spark_catalog`.`mydb2`.`t1`.`i1`, `spark_catalog`.`mydb2`.`t1`.`i1`"
   }
 }
 
@@ -192,10 +193,11 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITH_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
     "objectName" : "`mydb1`.`t1`",
-    "objectList" : "`spark_catalog`.`mydb1`.`t1`.`i1`"
+    "proposal" : "`spark_catalog`.`mydb1`.`t1`.`i1`"
   }
 }
 
@@ -217,10 +219,11 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITH_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
     "objectName" : "`t1`",
-    "objectList" : "`spark_catalog`.`mydb1`.`t1`.`i1`"
+    "proposal" : "`spark_catalog`.`mydb1`.`t1`.`i1`"
   }
 }
 
@@ -241,10 +244,11 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITH_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
     "objectName" : "`mydb1`.`t1`.`i1`",
-    "objectList" : "`spark_catalog`.`mydb2`.`t1`.`i1`"
+    "proposal" : "`spark_catalog`.`mydb2`.`t1`.`i1`"
   }
 }
 
diff --git a/sql/core/src/test/resources/sql-tests/results/group-by.sql.out b/sql/core/src/test/resources/sql-tests/results/group-by.sql.out
index 2b5e8cfcc50..834d1972cfd 100644
--- a/sql/core/src/test/resources/sql-tests/results/group-by.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/group-by.sql.out
@@ -166,10 +166,11 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITH_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
     "objectName" : "`non_existing`",
-    "objectList" : "`testdata`.`a`, `testdata`.`b`"
+    "proposal" : "`testdata`.`a`, `testdata`.`b`"
   }
 }
 
@@ -217,10 +218,11 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITH_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
     "objectName" : "`k`",
-    "objectList" : "`testdata`.`a`, `testdata`.`b`"
+    "proposal" : "`testdata`.`a`, `testdata`.`b`"
   }
 }
 
diff --git a/sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out b/sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out
index eee62aba35f..5180df6fcde 100644
--- a/sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out
@@ -280,10 +280,10 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITHOUT_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
-    "objectName" : "`t2`.`c1`",
-    "objectList" : ""
+    "objectName" : "`t2`.`c1`"
   }
 }
 
@@ -405,10 +405,10 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITHOUT_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
-    "objectName" : "`t1`.`c1`",
-    "objectList" : ""
+    "objectName" : "`t1`.`c1`"
   }
 }
 
@@ -421,10 +421,10 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITHOUT_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
-    "objectName" : "`c2`",
-    "objectList" : ""
+    "objectName" : "`c2`"
   }
 }
 
@@ -455,10 +455,11 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITH_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
     "objectName" : "`t1`.`c1`",
-    "objectList" : "`spark_catalog`.`default`.`t2`.`c1`, `spark_catalog`.`default`.`t2`.`c2`"
+    "proposal" : "`spark_catalog`.`default`.`t2`.`c1`, `spark_catalog`.`default`.`t2`.`c2`"
   }
 }
 
diff --git a/sql/core/src/test/resources/sql-tests/results/natural-join.sql.out b/sql/core/src/test/resources/sql-tests/results/natural-join.sql.out
index fa5f47938a9..debb6b626bd 100644
--- a/sql/core/src/test/resources/sql-tests/results/natural-join.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/natural-join.sql.out
@@ -231,10 +231,11 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITH_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
     "objectName" : "`nt2`.`k`",
-    "objectList" : "`__auto_generated_subquery_name`.`k`, `__auto_generated_subquery_name`.`v1`, `__auto_generated_subquery_name`.`v2`"
+    "proposal" : "`__auto_generated_subquery_name`.`k`, `__auto_generated_subquery_name`.`v1`, `__auto_generated_subquery_name`.`v2`"
   }
 }
 
diff --git a/sql/core/src/test/resources/sql-tests/results/pivot.sql.out b/sql/core/src/test/resources/sql-tests/results/pivot.sql.out
index 239ce7a8eda..bacc541deff 100644
--- a/sql/core/src/test/resources/sql-tests/results/pivot.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/pivot.sql.out
@@ -231,10 +231,11 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITH_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
     "objectName" : "`year`",
-    "objectList" : "`__auto_generated_subquery_name`.`course`, `__auto_generated_subquery_name`.`earnings`"
+    "proposal" : "`__auto_generated_subquery_name`.`course`, `__auto_generated_subquery_name`.`earnings`"
   }
 }
 
@@ -340,10 +341,11 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITH_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
     "objectName" : "`s`",
-    "objectList" : "`coursesales`.`year`, `coursesales`.`course`, `coursesales`.`earnings`"
+    "proposal" : "`coursesales`.`year`, `coursesales`.`course`, `coursesales`.`earnings`"
   }
 }
 
diff --git a/sql/core/src/test/resources/sql-tests/results/postgreSQL/aggregates_part1.sql.out b/sql/core/src/test/resources/sql-tests/results/postgreSQL/aggregates_part1.sql.out
index 79bfc138c74..1a6f3869984 100644
--- a/sql/core/src/test/resources/sql-tests/results/postgreSQL/aggregates_part1.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/postgreSQL/aggregates_part1.sql.out
@@ -497,9 +497,10 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITH_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
     "objectName" : "`o`.`unique1`",
-    "objectList" : "`i`.`unique1`, `i`.`unique2`, `i`.`hundred`, `i`.`even`, `i`.`four`, `i`.`stringu1`, `i`.`ten`, `i`.`odd`, `i`.`string4`, `i`.`stringu2`, `i`.`tenthous`, `i`.`twenty`, `i`.`two`, `i`.`thousand`, `i`.`fivethous`, `i`.`twothousand`"
+    "proposal" : "`i`.`unique1`, `i`.`unique2`, `i`.`hundred`, `i`.`even`, `i`.`four`"
   }
 }
diff --git a/sql/core/src/test/resources/sql-tests/results/postgreSQL/create_view.sql.out b/sql/core/src/test/resources/sql-tests/results/postgreSQL/create_view.sql.out
index 6556538304f..5fc399525c9 100644
--- a/sql/core/src/test/resources/sql-tests/results/postgreSQL/create_view.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/postgreSQL/create_view.sql.out
@@ -65,10 +65,10 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITHOUT_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
-    "objectName" : "`FROM`",
-    "objectList" : ""
+    "objectName" : "`FROM`"
   }
 }
 
diff --git a/sql/core/src/test/resources/sql-tests/results/postgreSQL/join.sql.out b/sql/core/src/test/resources/sql-tests/results/postgreSQL/join.sql.out
index 4b2013ba206..19e5d94f24b 100644
--- a/sql/core/src/test/resources/sql-tests/results/postgreSQL/join.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/postgreSQL/join.sql.out
@@ -3247,10 +3247,11 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITH_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
     "objectName" : "`y`.`f1`",
-    "objectList" : "`j`.`f1`, `j`.`f1`, `x`.`q1`, `x`.`q2`"
+    "proposal" : "`j`.`f1`, `j`.`f1`, `x`.`q1`, `x`.`q2`"
   }
 }
 
@@ -3273,10 +3274,11 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITH_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
     "objectName" : "`t1`.`uunique1`",
-    "objectList" : "`t1`.`unique1`, `t2`.`unique1`, `t1`.`unique2`, `t2`.`unique2`, `t1`.`hundred`, `t2`.`hundred`, `t1`.`stringu1`, `t1`.`even`, `t1`.`four`, `t1`.`string4`, `t2`.`stringu1`, `t1`.`stringu2`, `t1`.`ten`, `t1`.`tenthous`, `t2`.`even`, `t2`.`four`, `t1`.`odd`, `t2`.`string4`, `t2`.`stringu2`, `t2`.`ten`, `t2`.`tenthous`, `t1`.`thousand`, `t1`.`twenty`, `t1`.`two`, `t1`.`fivethous`, `t2`.`odd`, `t2`.`thousand`, `t2`.`twenty`, `t2`.`two`, `t2`.`fivethous`, `t1`.`twothousand` [...]
+    "proposal" : "`t1`.`unique1`, `t2`.`unique1`, `t1`.`unique2`, `t2`.`unique2`, `t1`.`hundred`"
   }
 }
 
@@ -3290,10 +3292,11 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITH_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
     "objectName" : "`t2`.`uunique1`",
-    "objectList" : "`t2`.`unique1`, `t1`.`unique1`, `t2`.`unique2`, `t1`.`unique2`, `t2`.`hundred`, `t1`.`hundred`, `t2`.`stringu1`, `t2`.`even`, `t2`.`four`, `t2`.`string4`, `t1`.`stringu1`, `t2`.`stringu2`, `t2`.`ten`, `t2`.`tenthous`, `t1`.`even`, `t1`.`four`, `t2`.`odd`, `t1`.`string4`, `t1`.`stringu2`, `t1`.`ten`, `t1`.`tenthous`, `t2`.`thousand`, `t2`.`twenty`, `t2`.`two`, `t2`.`fivethous`, `t1`.`odd`, `t1`.`thousand`, `t1`.`twenty`, `t1`.`two`, `t1`.`fivethous`, `t2`.`twothousand` [...]
+    "proposal" : "`t2`.`unique1`, `t1`.`unique1`, `t2`.`unique2`, `t1`.`unique2`, `t2`.`hundred`"
   }
 }
 
@@ -3307,10 +3310,11 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITH_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
     "objectName" : "`uunique1`",
-    "objectList" : "`t1`.`unique1`, `t2`.`unique1`, `t1`.`unique2`, `t2`.`unique2`, `t1`.`even`, `t2`.`even`, `t1`.`four`, `t2`.`four`, `t1`.`ten`, `t2`.`ten`, `t1`.`hundred`, `t2`.`hundred`, `t1`.`odd`, `t2`.`odd`, `t1`.`two`, `t2`.`two`, `t1`.`stringu1`, `t2`.`stringu1`, `t1`.`twenty`, `t2`.`twenty`, `t1`.`string4`, `t2`.`string4`, `t1`.`stringu2`, `t2`.`stringu2`, `t1`.`tenthous`, `t2`.`tenthous`, `t1`.`thousand`, `t2`.`thousand`, `t1`.`fivethous`, `t2`.`fivethous`, `t1`.`twothousand` [...]
+    "proposal" : "`t1`.`unique1`, `t2`.`unique1`, `t1`.`unique2`, `t2`.`unique2`, `t1`.`even`"
   }
 }
 
@@ -3514,10 +3518,10 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITHOUT_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
-    "objectName" : "`f1`",
-    "objectList" : ""
+    "objectName" : "`f1`"
   }
 }
 
@@ -3530,10 +3534,10 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITHOUT_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
-    "objectName" : "`a`.`f1`",
-    "objectList" : ""
+    "objectName" : "`a`.`f1`"
   }
 }
 
@@ -3546,10 +3550,10 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITHOUT_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
-    "objectName" : "`f1`",
-    "objectList" : ""
+    "objectName" : "`f1`"
   }
 }
 
@@ -3562,10 +3566,10 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITHOUT_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
-    "objectName" : "`a`.`f1`",
-    "objectList" : ""
+    "objectName" : "`a`.`f1`"
   }
 }
 
diff --git a/sql/core/src/test/resources/sql-tests/results/postgreSQL/select_having.sql.out b/sql/core/src/test/resources/sql-tests/results/postgreSQL/select_having.sql.out
index 4ab2e877902..0c2b1701830 100644
--- a/sql/core/src/test/resources/sql-tests/results/postgreSQL/select_having.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/postgreSQL/select_having.sql.out
@@ -151,10 +151,11 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITH_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
     "objectName" : "`a`",
-    "objectList" : "`one`"
+    "proposal" : "`one`"
   }
 }
 
diff --git a/sql/core/src/test/resources/sql-tests/results/postgreSQL/select_implicit.sql.out b/sql/core/src/test/resources/sql-tests/results/postgreSQL/select_implicit.sql.out
index cd5bc39d7c6..6f1e11e0afe 100755
--- a/sql/core/src/test/resources/sql-tests/results/postgreSQL/select_implicit.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/postgreSQL/select_implicit.sql.out
@@ -121,10 +121,11 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITH_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
     "objectName" : "`b`",
-    "objectList" : "`count(1)`"
+    "proposal" : "`count(1)`"
   }
 }
 
@@ -340,10 +341,11 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITH_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
     "objectName" : "`b`",
-    "objectList" : "`count(a)`"
+    "proposal" : "`count(a)`"
   }
 }
 
diff --git a/sql/core/src/test/resources/sql-tests/results/postgreSQL/union.sql.out b/sql/core/src/test/resources/sql-tests/results/postgreSQL/union.sql.out
index a1055bcb0dd..20b26d895b0 100644
--- a/sql/core/src/test/resources/sql-tests/results/postgreSQL/union.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/postgreSQL/union.sql.out
@@ -583,10 +583,11 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITH_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
     "objectName" : "`q2`",
-    "objectList" : "`int8_tbl`.`q1`"
+    "proposal" : "`int8_tbl`.`q1`"
   }
 }
 
diff --git a/sql/core/src/test/resources/sql-tests/results/query_regex_column.sql.out b/sql/core/src/test/resources/sql-tests/results/query_regex_column.sql.out
index 6d82f5b290f..16f6ad9d449 100644
--- a/sql/core/src/test/resources/sql-tests/results/query_regex_column.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/query_regex_column.sql.out
@@ -35,10 +35,11 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITH_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
     "objectName" : "`(a)?+.+`",
-    "objectList" : "`testdata2`.`A`, `testdata2`.`B`, `testdata2`.`c`, `testdata2`.`d`"
+    "proposal" : "`testdata2`.`A`, `testdata2`.`B`, `testdata2`.`c`, `testdata2`.`d`"
   }
 }
 
@@ -51,10 +52,11 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITH_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
     "objectName" : "`t`.`(a)?+.+`",
-    "objectList" : "`t`.`A`, `t`.`B`, `t`.`c`, `t`.`d`"
+    "proposal" : "`t`.`A`, `t`.`B`, `t`.`c`, `t`.`d`"
   }
 }
 
@@ -67,10 +69,11 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITH_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
     "objectName" : "`(a|b)`",
-    "objectList" : "`testdata2`.`A`, `testdata2`.`B`, `testdata2`.`c`, `testdata2`.`d`"
+    "proposal" : "`testdata2`.`A`, `testdata2`.`B`, `testdata2`.`c`, `testdata2`.`d`"
   }
 }
 
@@ -83,10 +86,11 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITH_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
     "objectName" : "`(a|b)?+.+`",
-    "objectList" : "`testdata2`.`A`, `testdata2`.`B`, `testdata2`.`c`, `testdata2`.`d`"
+    "proposal" : "`testdata2`.`A`, `testdata2`.`B`, `testdata2`.`c`, `testdata2`.`d`"
   }
 }
 
@@ -99,10 +103,11 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITH_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
     "objectName" : "`(a|b)?+.+`",
-    "objectList" : "`testdata2`.`A`, `testdata2`.`B`, `testdata2`.`c`, `testdata2`.`d`"
+    "proposal" : "`testdata2`.`A`, `testdata2`.`B`, `testdata2`.`c`, `testdata2`.`d`"
   }
 }
 
@@ -115,10 +120,11 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITH_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
     "objectName" : "`(a)`",
-    "objectList" : "`testdata2`.`A`, `testdata2`.`B`, `testdata2`.`c`, `testdata2`.`d`"
+    "proposal" : "`testdata2`.`A`, `testdata2`.`B`, `testdata2`.`c`, `testdata2`.`d`"
   }
 }
 
@@ -342,10 +348,11 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITH_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
     "objectName" : "`(a)`",
-    "objectList" : "`testdata3`.`a`, `testdata3`.`b`"
+    "proposal" : "`testdata3`.`a`, `testdata3`.`b`"
   }
 }
 
@@ -358,9 +365,10 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITH_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
     "objectName" : "`(a)?+.+`",
-    "objectList" : "`testdata3`.`a`, `testdata3`.`b`"
+    "proposal" : "`testdata3`.`a`, `testdata3`.`b`"
   }
 }
diff --git a/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out b/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out
index 9ae9778b106..0b7581afc25 100644
--- a/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out
@@ -136,9 +136,10 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITH_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
     "objectName" : "`t1a`",
-    "objectList" : "`t2`.`t2a`, `t2`.`t2b`, `t2`.`t2c`"
+    "proposal" : "`t2`.`t2a`, `t2`.`t2b`, `t2`.`t2c`"
   }
 }
diff --git a/sql/core/src/test/resources/sql-tests/results/table-aliases.sql.out b/sql/core/src/test/resources/sql-tests/results/table-aliases.sql.out
index d1e3357a9cc..41998dcab17 100644
--- a/sql/core/src/test/resources/sql-tests/results/table-aliases.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/table-aliases.sql.out
@@ -59,10 +59,11 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITH_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
     "objectName" : "`a`",
-    "objectList" : "`t`.`c`, `t`.`d`"
+    "proposal" : "`t`.`c`, `t`.`d`"
   }
 }
 
diff --git a/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-aggregates_part1.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-aggregates_part1.sql.out
index f3b4c72a488..fc11234112b 100644
--- a/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-aggregates_part1.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-aggregates_part1.sql.out
@@ -488,9 +488,10 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITH_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
     "objectName" : "`o`.`unique1`",
-    "objectList" : "`i`.`unique1`, `i`.`unique2`, `i`.`hundred`, `i`.`even`, `i`.`four`, `i`.`stringu1`, `i`.`ten`, `i`.`odd`, `i`.`string4`, `i`.`stringu2`, `i`.`tenthous`, `i`.`twenty`, `i`.`two`, `i`.`thousand`, `i`.`fivethous`, `i`.`twothousand`"
+    "proposal" : "`i`.`unique1`, `i`.`unique2`, `i`.`hundred`, `i`.`even`, `i`.`four`"
   }
 }
diff --git a/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-join.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-join.sql.out
index 5ba04859797..328f4de9aa1 100644
--- a/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-join.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-join.sql.out
@@ -3275,10 +3275,11 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITH_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
     "objectName" : "`y`.`f1`",
-    "objectList" : "`j`.`f1`, `j`.`f1`, `x`.`q1`, `x`.`q2`"
+    "proposal" : "`j`.`f1`, `j`.`f1`, `x`.`q1`, `x`.`q2`"
   }
 }
 
@@ -3301,10 +3302,11 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITH_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
     "objectName" : "`t1`.`uunique1`",
-    "objectList" : "`t1`.`unique1`, `t2`.`unique1`, `t1`.`unique2`, `t2`.`unique2`, `t1`.`hundred`, `t2`.`hundred`, `t1`.`stringu1`, `t1`.`even`, `t1`.`four`, `t1`.`string4`, `t2`.`stringu1`, `t1`.`stringu2`, `t1`.`ten`, `t1`.`tenthous`, `t2`.`even`, `t2`.`four`, `t1`.`odd`, `t2`.`string4`, `t2`.`stringu2`, `t2`.`ten`, `t2`.`tenthous`, `t1`.`thousand`, `t1`.`twenty`, `t1`.`two`, `t1`.`fivethous`, `t2`.`odd`, `t2`.`thousand`, `t2`.`twenty`, `t2`.`two`, `t2`.`fivethous`, `t1`.`twothousand` [...]
+    "proposal" : "`t1`.`unique1`, `t2`.`unique1`, `t1`.`unique2`, `t2`.`unique2`, `t1`.`hundred`"
   }
 }
 
@@ -3318,10 +3320,11 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITH_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
     "objectName" : "`t2`.`uunique1`",
-    "objectList" : "`t2`.`unique1`, `t1`.`unique1`, `t2`.`unique2`, `t1`.`unique2`, `t2`.`hundred`, `t1`.`hundred`, `t2`.`stringu1`, `t2`.`even`, `t2`.`four`, `t2`.`string4`, `t1`.`stringu1`, `t2`.`stringu2`, `t2`.`ten`, `t2`.`tenthous`, `t1`.`even`, `t1`.`four`, `t2`.`odd`, `t1`.`string4`, `t1`.`stringu2`, `t1`.`ten`, `t1`.`tenthous`, `t2`.`thousand`, `t2`.`twenty`, `t2`.`two`, `t2`.`fivethous`, `t1`.`odd`, `t1`.`thousand`, `t1`.`twenty`, `t1`.`two`, `t1`.`fivethous`, `t2`.`twothousand` [...]
+    "proposal" : "`t2`.`unique1`, `t1`.`unique1`, `t2`.`unique2`, `t1`.`unique2`, `t2`.`hundred`"
   }
 }
 
@@ -3335,10 +3338,11 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITH_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
     "objectName" : "`uunique1`",
-    "objectList" : "`t1`.`unique1`, `t2`.`unique1`, `t1`.`unique2`, `t2`.`unique2`, `t1`.`even`, `t2`.`even`, `t1`.`four`, `t2`.`four`, `t1`.`ten`, `t2`.`ten`, `t1`.`hundred`, `t2`.`hundred`, `t1`.`odd`, `t2`.`odd`, `t1`.`two`, `t2`.`two`, `t1`.`stringu1`, `t2`.`stringu1`, `t1`.`twenty`, `t2`.`twenty`, `t1`.`string4`, `t2`.`string4`, `t1`.`stringu2`, `t2`.`stringu2`, `t1`.`tenthous`, `t2`.`tenthous`, `t1`.`thousand`, `t2`.`thousand`, `t1`.`fivethous`, `t2`.`fivethous`, `t1`.`twothousand` [...]
+    "proposal" : "`t1`.`unique1`, `t2`.`unique1`, `t1`.`unique2`, `t2`.`unique2`, `t1`.`even`"
   }
 }
 
@@ -3542,10 +3546,10 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITHOUT_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
-    "objectName" : "`f1`",
-    "objectList" : ""
+    "objectName" : "`f1`"
   }
 }
 
@@ -3558,10 +3562,10 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITHOUT_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
-    "objectName" : "`a`.`f1`",
-    "objectList" : ""
+    "objectName" : "`a`.`f1`"
   }
 }
 
@@ -3574,10 +3578,10 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITHOUT_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
-    "objectName" : "`f1`",
-    "objectList" : ""
+    "objectName" : "`f1`"
   }
 }
 
@@ -3590,10 +3594,10 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITHOUT_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
-    "objectName" : "`a`.`f1`",
-    "objectList" : ""
+    "objectName" : "`a`.`f1`"
   }
 }
 
diff --git a/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-select_having.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-select_having.sql.out
index 37ab12fbf7f..f46dd188ec4 100644
--- a/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-select_having.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-select_having.sql.out
@@ -151,10 +151,11 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITH_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
     "objectName" : "`a`",
-    "objectList" : "`one`"
+    "proposal" : "`one`"
   }
 }
 
diff --git a/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-select_implicit.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-select_implicit.sql.out
index db2a855bf0f..fa7693046b8 100755
--- a/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-select_implicit.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-select_implicit.sql.out
@@ -124,10 +124,11 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITH_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
     "objectName" : "`b`",
-    "objectList" : "`udf(count(1))`"
+    "proposal" : "`udf(count(1))`"
   }
 }
 
@@ -343,10 +344,11 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITH_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
     "objectName" : "`b`",
-    "objectList" : "`udf(count(udf(a)))`"
+    "proposal" : "`udf(count(udf(a)))`"
   }
 }
 
diff --git a/sql/core/src/test/resources/sql-tests/results/udf/udf-group-by.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-group-by.sql.out
index cf1cc0dce01..95c4da26299 100644
--- a/sql/core/src/test/resources/sql-tests/results/udf/udf-group-by.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-group-by.sql.out
@@ -201,10 +201,11 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITH_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
     "objectName" : "`k`",
-    "objectList" : "`testdata`.`a`, `testdata`.`b`"
+    "proposal" : "`testdata`.`a`, `testdata`.`b`"
   }
 }
 
diff --git a/sql/core/src/test/resources/sql-tests/results/udf/udf-pivot.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-pivot.sql.out
index c097acf18b1..dec5e2b5268 100644
--- a/sql/core/src/test/resources/sql-tests/results/udf/udf-pivot.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-pivot.sql.out
@@ -231,10 +231,11 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITH_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
     "objectName" : "`year`",
-    "objectList" : "`__auto_generated_subquery_name`.`course`, `__auto_generated_subquery_name`.`earnings`"
+    "proposal" : "`__auto_generated_subquery_name`.`course`, `__auto_generated_subquery_name`.`earnings`"
   }
 }
 
@@ -340,10 +341,11 @@ struct<>
 org.apache.spark.sql.AnalysisException
 {
   "errorClass" : "UNRESOLVED_COLUMN",
+  "errorSubClass" : "WITH_SUGGESTION",
   "sqlState" : "42000",
   "messageParameters" : {
     "objectName" : "`s`",
-    "objectList" : "`coursesales`.`year`, `coursesales`.`course`, `coursesales`.`earnings`"
+    "proposal" : "`coursesales`.`year`, `coursesales`.`course`, `coursesales`.`earnings`"
   }
 }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
index ee41b1efba2..a910bdf9db1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
@@ -2514,11 +2514,12 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
     }
     assert(ex2.getMessage.contains("data type mismatch: argument 1 requires array type"))
 
-    val ex3 = intercept[AnalysisException] {
-      df.selectExpr("transform(a, x -> x)")
-    }
-    assert(ex3.getErrorClass == "UNRESOLVED_COLUMN")
-    assert(ex3.messageParameters.head == "`a`")
+    checkError(
+      exception =
+        intercept[AnalysisException](df.selectExpr("transform(a, x -> x)")),
+      errorClass = "UNRESOLVED_COLUMN",
+      errorSubClass = Some("WITH_SUGGESTION"),
+      parameters = Map("objectName" -> "`a`", "proposal" -> "`i`, `s`"))
   }
 
   test("map_filter") {
@@ -2586,11 +2587,12 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
     }
     assert(ex3a.getMessage.contains("data type mismatch: argument 1 requires map type"))
 
-    val ex4 = intercept[AnalysisException] {
-      df.selectExpr("map_filter(a, (k, v) -> k > v)")
-    }
-    assert(ex4.getErrorClass == "UNRESOLVED_COLUMN")
-    assert(ex4.messageParameters.head == "`a`")
+    checkError(
+      exception =
+        intercept[AnalysisException](df.selectExpr("map_filter(a, (k, v) -> k > v)")),
+      errorClass = "UNRESOLVED_COLUMN",
+      errorSubClass = Some("WITH_SUGGESTION"),
+      parameters = Map("objectName" -> "`a`", "proposal" -> "`i`, `s`"))
   }
 
   test("filter function - array for primitive type not containing null") {
@@ -2746,11 +2748,12 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
     }
     assert(ex3a.getMessage.contains("data type mismatch: argument 2 requires boolean type"))
 
-    val ex4 = intercept[AnalysisException] {
-      df.selectExpr("filter(a, x -> x)")
-    }
-    assert(ex4.getErrorClass == "UNRESOLVED_COLUMN")
-    assert(ex4.messageParameters.head == "`a`")
+    checkError(
+      exception =
+        intercept[AnalysisException](df.selectExpr("filter(a, x -> x)")),
+      errorClass = "UNRESOLVED_COLUMN",
+      errorSubClass = Some("WITH_SUGGESTION"),
+      parameters = Map("objectName" -> "`a`", "proposal" -> "`i`, `s`"))
   }
 
   test("exists function - array for primitive type not containing null") {
@@ -2879,11 +2882,12 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
     }
     assert(ex3a.getMessage.contains("data type mismatch: argument 2 requires boolean type"))
 
-    val ex4 = intercept[AnalysisException] {
-      df.selectExpr("exists(a, x -> x)")
-    }
-    assert(ex4.getErrorClass == "UNRESOLVED_COLUMN")
-    assert(ex4.messageParameters.head == "`a`")
+    checkError(
+      exception =
+        intercept[AnalysisException](df.selectExpr("exists(a, x -> x)")),
+      errorClass = "UNRESOLVED_COLUMN",
+      errorSubClass = Some("WITH_SUGGESTION"),
+      parameters = Map("objectName" -> "`a`", "proposal" -> "`i`, `s`"))
   }
 
   test("forall function - array for primitive type not containing null") {
@@ -3026,17 +3030,19 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
     }
     assert(ex3a.getMessage.contains("data type mismatch: argument 2 requires boolean type"))
 
-    val ex4 = intercept[AnalysisException] {
-      df.selectExpr("forall(a, x -> x)")
-    }
-    assert(ex4.getErrorClass == "UNRESOLVED_COLUMN")
-    assert(ex4.messageParameters.head == "`a`")
+    checkError(
+      exception =
+        intercept[AnalysisException](df.selectExpr("forall(a, x -> x)")),
+      errorClass = "UNRESOLVED_COLUMN",
+      errorSubClass = Some("WITH_SUGGESTION"),
+      parameters = Map("objectName" -> "`a`", "proposal" -> "`i`, `s`"))
 
-    val ex4a = intercept[AnalysisException] {
-      df.select(forall(col("a"), x => x))
-    }
-    assert(ex4a.getErrorClass == "UNRESOLVED_COLUMN")
-    assert(ex4a.messageParameters.head == "`a`")
+    checkError(
+      exception =
+        intercept[AnalysisException](df.select(forall(col("a"), x => x))),
+      errorClass = "UNRESOLVED_COLUMN",
+      errorSubClass = Some("WITH_SUGGESTION"),
+      parameters = Map("objectName" -> "`a`", "proposal" -> "`i`, `s`"))
   }
 
   test("aggregate function - array for primitive type not containing null") {
@@ -3210,11 +3216,12 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
     }
     assert(ex4a.getMessage.contains("data type mismatch: argument 3 requires int type"))
 
-    val ex5 = intercept[AnalysisException] {
-      df.selectExpr("aggregate(a, 0, (acc, x) -> x)")
-    }
-    assert(ex5.getErrorClass == "UNRESOLVED_COLUMN")
-    assert(ex5.messageParameters.head == "`a`")
+    checkError(
+      exception =
+        intercept[AnalysisException](df.selectExpr("aggregate(a, 0, (acc, x) -> x)")),
+      errorClass = "UNRESOLVED_COLUMN",
+      errorSubClass = Some("WITH_SUGGESTION"),
+      parameters = Map("objectName" -> "`a`", "proposal" -> "`i`, `s`"))
   }
 
   test("map_zip_with function - map of primitive types") {
@@ -3764,11 +3771,13 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession {
       df.select(zip_with(df("i"), df("a2"), (acc, x) => x))
     }
     assert(ex3a.getMessage.contains("data type mismatch: argument 1 requires array type"))
-    val ex4 = intercept[AnalysisException] {
-      df.selectExpr("zip_with(a1, a, (acc, x) -> x)")
-    }
-    assert(ex4.getErrorClass == "UNRESOLVED_COLUMN")
-    assert(ex4.messageParameters.head == "`a`")
+
+    checkError(
+      exception =
+        intercept[AnalysisException](df.selectExpr("zip_with(a1, a, (acc, x) -> x)")),
+      errorClass = "UNRESOLVED_COLUMN",
+      errorSubClass = Some("WITH_SUGGESTION"),
+      parameters = Map("objectName" -> "`a`", "proposal" -> "`a1`, `a2`, `i`"))
   }
 
   private def assertValuesDoNotChangeAfterCoalesceOrUnion(v: Column): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala
index 5052dc0fc7f..067daa044c1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala
@@ -483,8 +483,9 @@ class DataFrameSelfJoinSuite extends QueryTest with SharedSparkSession {
       )
       checkError(ex,
         errorClass = "UNRESOLVED_COLUMN",
+        errorSubClass = Some("WITH_SUGGESTION"),
         parameters = Map("objectName" -> "`df1`.`timeStr`",
-          "objectList" -> "`df3`.`timeStr`, `df1`.`tsStr`"))
+          "proposal" -> "`df3`.`timeStr`, `df1`.`tsStr`"))
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameToSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameToSchemaSuite.scala
index b61b83896e5..882ba7b5395 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameToSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameToSchemaSuite.scala
@@ -59,9 +59,10 @@ class DataFrameToSchemaSuite extends QueryTest with SharedSparkSession {
     checkError(
       exception = e,
       errorClass = "UNRESOLVED_COLUMN",
+      errorSubClass = Some("WITH_SUGGESTION"),
       parameters = Map(
         "objectName" -> "`non_exist`",
-        "objectList" -> "`i`, `j`"))
+        "proposal" -> "`i`, `j`"))
   }
 
   test("negative: ambiguous column") {
@@ -161,6 +162,7 @@ class DataFrameToSchemaSuite extends QueryTest with SharedSparkSession {
     checkError(
       exception = e,
       errorClass = "UNRESOLVED_FIELD",
+      errorSubClass = Some("WITH_SUGGESTION"),
       parameters = Map(
         "fieldName" -> "`non_exist`",
         "columnPath" -> "`struct`",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
index 2581afd1df3..e1f3cc060c8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
@@ -404,8 +404,13 @@ class DataFrameWindowFunctionsSuite extends QueryTest
     val df = Seq((1, "1")).toDF("key", "value")
     val e = intercept[AnalysisException](
       df.select($"key", count("invalid").over()))
-    assert(e.getErrorClass == "UNRESOLVED_COLUMN")
-    assert(e.messageParameters.sameElements(Array("`invalid`", "`value`, `key`")))
+    checkError(
+      exception = e,
+      errorClass = "UNRESOLVED_COLUMN",
+      errorSubClass = Some("WITH_SUGGESTION"),
+      parameters = Map(
+        "objectName" -> "`invalid`",
+        "proposal" -> "`value`, `key`"))
   }
 
   test("numerical aggregate functions on string column") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 463e79166fc..d7ea766b21b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -322,19 +322,27 @@ class DatasetSuite extends QueryTest
     val ds = Seq(("a", 1), ("b", 2), ("c", 3)).toDS()
 
     withSQLConf(SQLConf.SUPPORT_QUOTED_REGEX_COLUMN_NAME.key -> "false") {
-      var e = intercept[AnalysisException] {
-        ds.select(expr("`(_1)?+.+`").as[Int])
-      }
-      assert(e.getErrorClass == "UNRESOLVED_COLUMN")
-      assert(e.messageParameters.head == "`(_1)?+.+`")
-
-      e = intercept[AnalysisException] {
-        ds.select(expr("`(_1|_2)`").as[Int])
-      }
-      assert(e.getErrorClass == "UNRESOLVED_COLUMN")
-      assert(e.messageParameters.head == "`(_1|_2)`")
+      checkError(
+        exception = intercept[AnalysisException] {
+          ds.select(expr("`(_1)?+.+`").as[Int])
+        },
+        errorClass = "UNRESOLVED_COLUMN",
+        errorSubClass = Some("WITH_SUGGESTION"),
+        parameters = Map(
+          "objectName" -> "`(_1)?+.+`",
+          "proposal" -> "`_1`, `_2`"))
+
+      checkError(
+        exception = intercept[AnalysisException] {
+          ds.select(expr("`(_1|_2)`").as[Int])
+        },
+        errorClass = "UNRESOLVED_COLUMN",
+        errorSubClass = Some("WITH_SUGGESTION"),
+        parameters = Map(
+          "objectName" -> "`(_1|_2)`",
+          "proposal" -> "`_1`, `_2`"))
 
-      e = intercept[AnalysisException] {
+      var e = intercept[AnalysisException] {
         ds.select(ds("`(_1)?+.+`"))
       }
       assert(e.getMessage.contains("Cannot resolve column name \"`(_1)?+.+`\""))
@@ -931,11 +939,13 @@ class DatasetSuite extends QueryTest
 
   test("verify mismatching field names fail with a good error") {
     val ds = Seq(ClassData("a", 1)).toDS()
-    val e = intercept[AnalysisException] {
-      ds.as[ClassData2]
-    }
-    assert(e.getErrorClass == "UNRESOLVED_COLUMN")
-    assert(e.messageParameters.sameElements(Array("`c`", "`a`, `b`")))
+    checkError(
+      exception = intercept[AnalysisException] (ds.as[ClassData2]),
+      errorClass = "UNRESOLVED_COLUMN",
+      errorSubClass = Some("WITH_SUGGESTION"),
+      parameters = Map(
+        "objectName" -> "`c`",
+        "proposal" -> "`a`, `b`"))
   }
 
   test("runtime nullability check") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetUnpivotSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetUnpivotSuite.scala
index b81383149a5..5c14e9df09d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetUnpivotSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetUnpivotSuite.scala
@@ -362,9 +362,10 @@ class DatasetUnpivotSuite extends QueryTest
     checkError(
       exception = e1,
       errorClass = "UNRESOLVED_COLUMN",
+      errorSubClass = Some("WITH_SUGGESTION"),
       parameters = Map(
         "objectName" -> "`1`",
-        "objectList" -> "`id`, `int1`, `str1`, `str2`, `long1`"))
+        "proposal" -> "`id`, `int1`, `str1`, `str2`, `long1`"))
 
     // unpivoting where value column does not exist
     val e2 = intercept[AnalysisException] {
@@ -378,9 +379,10 @@ class DatasetUnpivotSuite extends QueryTest
     checkError(
       exception = e2,
       errorClass = "UNRESOLVED_COLUMN",
+      errorSubClass = Some("WITH_SUGGESTION"),
       parameters = Map(
         "objectName" -> "`does`",
-        "objectList" -> "`id`, `int1`, `long1`, `str1`, `str2`"))
+        "proposal" -> "`id`, `int1`, `long1`, `str1`, `str2`"))
 
     // unpivoting with empty list of value columns
     // where potential value columns are of incompatible types
@@ -499,10 +501,11 @@ class DatasetUnpivotSuite extends QueryTest
     checkError(
       exception = e,
       errorClass = "UNRESOLVED_COLUMN",
+      errorSubClass = Some("WITH_SUGGESTION"),
       // expected message is wrong: https://issues.apache.org/jira/browse/SPARK-39783
       parameters = Map(
         "objectName" -> "`an`.`id`",
-        "objectList" -> "`an`.`id`, `int1`, `long1`, `str`.`one`, `str`.`two`"))
+        "proposal" -> "`an`.`id`, `int1`, `long1`, `str`.`one`, `str`.`two`"))
   }
 
   test("unpivot with struct fields") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
index 7fd6a5dbea0..6d1b4eaf36b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
@@ -165,10 +165,12 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils {
     withTable("t1") {
       val cols = Seq("c1", "c2", "c3")
       createTable("t1", cols, Seq("int", "long", "string"))
-      val e1 = intercept[AnalysisException](sql(s"INSERT INTO t1 (c1, c2, c4) values(1, 2, 3)"))
-      assert(e1.getMessage.contains(
-        "[UNRESOLVED_COLUMN] A column or function parameter with name `c4` cannot be resolved. " +
-          "Did you mean one of the following? [`c1`, `c2`, `c3`]"))
+      checkError(
+        exception =
+          intercept[AnalysisException](sql(s"INSERT INTO t1 (c1, c2, c4) values(1, 2, 3)")),
+        errorClass = "UNRESOLVED_COLUMN",
+        errorSubClass = Some("WITH_SUGGESTION"),
+        parameters = Map("objectName" -> "`c4`", "proposal" -> "`c1`, `c2`, `c3`"))
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 1ae5ae68d07..a0ccfb10ca8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -886,9 +886,14 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
   test("SPARK-20688: correctly check analysis for scalar sub-queries") {
     withTempView("t") {
       Seq(1 -> "a").toDF("i", "j").createOrReplaceTempView("t")
-      val e = intercept[AnalysisException](sql("SELECT (SELECT count(*) FROM t WHERE a = 1)"))
-      assert(e.getErrorClass == "UNRESOLVED_COLUMN")
-      assert(e.messageParameters.sameElements(Array("`a`", "`t`.`i`, `t`.`j`")))
+      checkError(
+        exception =
+          intercept[AnalysisException](sql("SELECT (SELECT count(*) FROM t WHERE a = 1)")),
+        errorClass = "UNRESOLVED_COLUMN",
+        errorSubClass = Some("WITH_SUGGESTION"),
+        parameters = Map(
+          "objectName" -> "`a`",
+          "proposal" -> "`t`.`i`, `t`.`j`"))
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
index ba8af623893..675426720b0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -724,9 +724,14 @@ class UDFSuite extends QueryTest with SharedSparkSession {
     val df = spark.range(1)
       .select(lit(50).as("a"))
       .select(struct("a").as("col"))
-    val error = intercept[AnalysisException](df.select(myUdf(Column("col"))))
-    assert(error.getErrorClass == "UNRESOLVED_COLUMN")
-    assert(error.messageParameters.sameElements(Array("`b`", "`a`")))
+    checkError(
+      exception =
+        intercept[AnalysisException](df.select(myUdf(Column("col")))),
+      errorClass = "UNRESOLVED_COLUMN",
+      errorSubClass = Some("WITH_SUGGESTION"),
+      parameters = Map(
+        "objectName" -> "`b`",
+        "proposal" -> "`a`"))
   }
 
   test("wrong order of input fields for case class") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index d14e961c03a..f621ea14325 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -121,6 +121,7 @@ class DataSourceV2SQLSuiteV1Filter extends DataSourceV2SQLSuite with AlterTableT
       assertAnalysisErrorClass(
         s"DESCRIBE $t invalid_col",
         "UNRESOLVED_COLUMN",
+        "WITH_SUGGESTION",
         Array("`invalid_col`", "`testcat`.`tbl`.`id`, `testcat`.`tbl`.`data`"))
     }
   }
@@ -992,11 +993,12 @@ class DataSourceV2SQLSuiteV1Filter extends DataSourceV2SQLSuite with AlterTableT
       sql("USE testcat.ns1.ns2")
       check("tbl")
 
-      val ex = intercept[AnalysisException] {
-        sql(s"SELECT ns1.ns2.ns3.tbl.id from $t")
-      }
-      assert(ex.getErrorClass == "UNRESOLVED_COLUMN")
-      assert(ex.messageParameters.head == "`ns1`.`ns2`.`ns3`.`tbl`.`id`")
+      assertAnalysisErrorClass(
+        s"SELECT ns1.ns2.ns3.tbl.id from $t",
+        "UNRESOLVED_COLUMN",
+        "WITH_SUGGESTION",
+        Array("`ns1`.`ns2`.`ns3`.`tbl`.`id`",
+          "`testcat`.`ns1`.`ns2`.`tbl`.`id`, `testcat`.`ns1`.`ns2`.`tbl`.`point`"))
     }
   }
 
@@ -1568,6 +1570,7 @@ class DataSourceV2SQLSuiteV1Filter extends DataSourceV2SQLSuite with AlterTableT
       assertAnalysisErrorClass(
         s"UPDATE $t SET dummy='abc'",
         "UNRESOLVED_COLUMN",
+        "WITH_SUGGESTION",
         Array(
           "`dummy`",
           "`testcat`.`ns1`.`ns2`.`tbl`.`p`, `testcat`.`ns1`.`ns2`.`tbl`.`id`, " +
@@ -1575,9 +1578,11 @@ class DataSourceV2SQLSuiteV1Filter extends DataSourceV2SQLSuite with AlterTableT
       assertAnalysisErrorClass(
         s"UPDATE $t SET name='abc' WHERE dummy=1",
         "UNRESOLVED_COLUMN",
+        "WITH_SUGGESTION",
         Array(
           "`dummy`",
-          "`testcat`.`ns1`.`ns2`.`tbl`.`p`, `testcat`.`ns1`.`ns2`.`tbl`.`id`, " +
+          "`testcat`.`ns1`.`ns2`.`tbl`.`p`, " +
+            "`testcat`.`ns1`.`ns2`.`tbl`.`id`, " +
             "`testcat`.`ns1`.`ns2`.`tbl`.`age`, `testcat`.`ns1`.`ns2`.`tbl`.`name`"))
 
       // UPDATE is not implemented yet.
@@ -2416,11 +2421,13 @@ class DataSourceV2SQLSuiteV1Filter extends DataSourceV2SQLSuite with AlterTableT
   private def assertAnalysisErrorClass(
       sqlStatement: String,
       expectedErrorClass: String,
+      expectedErrorSubClass: String,
       expectedErrorMessageParameters: Array[String]): Unit = {
     val ex = intercept[AnalysisException] {
       sql(sqlStatement)
     }
     assert(ex.getErrorClass == expectedErrorClass)
+    assert(ex.getErrorSubClass == expectedErrorSubClass)
     assert(ex.messageParameters.sameElements(expectedErrorMessageParameters))
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
index 9766fe2120a..595d632512f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
@@ -407,6 +407,7 @@ class QueryCompilationErrorsSuite
         sql("select m[a] from (select map('a', 'b') as m, 'aa' as aa)")
       },
       errorClass = "UNRESOLVED_MAP_KEY",
+      errorSubClass = Some("WITH_SUGGESTION"),
       parameters = Map("columnName" -> "`a`",
         "proposal" ->
           "`__auto_generated_subquery_name`.`m`, `__auto_generated_subquery_name`.`aa`"))
@@ -436,7 +437,12 @@ class QueryCompilationErrorsSuite
             |""".stripMargin)
       },
       errorClass = "UNRESOLVED_COLUMN",
-      parameters = Map("objectName" -> "`struct`.`a`", "objectList" -> "`a`, `b`"))
+      errorSubClass = Some("WITH_SUGGESTION"),
+      parameters = Map(
+        "objectName" -> "`struct`.`a`",
+        "proposal" -> "`a`, `b`"
+      )
+    )
   }
 
   test("UNRESOLVED_COLUMN - SPARK-21335: support un-aliased subquery") {
@@ -447,9 +453,10 @@ class QueryCompilationErrorsSuite
       checkError(
         exception = intercept[AnalysisException](sql("SELECT v.i from (SELECT i FROM v)")),
         errorClass = "UNRESOLVED_COLUMN",
+        errorSubClass = Some("WITH_SUGGESTION"),
         parameters = Map(
           "objectName" -> "`v`.`i`",
-          "objectList" -> "`__auto_generated_subquery_name`.`i`"))
+          "proposal" -> "`__auto_generated_subquery_name`.`i`"))
 
       checkAnswer(sql("SELECT __auto_generated_subquery_name.i from (SELECT i FROM v)"), Row(1))
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
index 016e24a1c4b..1b033d98589 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
@@ -886,9 +886,10 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {
             }
             checkError(e,
               errorClass = "UNRESOLVED_COLUMN",
+              errorSubClass = Some("WITH_SUGGESTION"),
               parameters = Map(
                 "objectName" -> "`C1`",
-                "objectList" -> "`spark_catalog`.`default`.`t`.`c1`"))
+                "proposal" -> "`spark_catalog`.`default`.`t`.`c1`"))
           }
           withSQLConf(ORDER_BY_ORDINAL.key -> "false") {
             checkAnswer(sql("SELECT * FROM v2"), Seq(Row(3), Row(2), Row(1)))
@@ -908,9 +909,10 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {
             }
             checkError(e,
               errorClass = "UNRESOLVED_COLUMN",
+              errorSubClass = Some("WITH_SUGGESTION"),
               parameters = Map(
                 "objectName" -> "`a`",
-                "objectList" -> "`spark_catalog`.`default`.`t`.`c1`"))
+                "proposal" -> "`spark_catalog`.`default`.`t`.`c1`"))
           }
           withSQLConf(ANSI_ENABLED.key -> "true") {
             val e = intercept[ArithmeticException] {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala
index a317c562276..2f2a55642e1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala
@@ -104,10 +104,11 @@ class DescribeTableSuite extends command.DescribeTableSuiteBase with CommandSuit
           sql(s"DESC $tbl key1").collect()
         },
         errorClass = "UNRESOLVED_COLUMN",
+        errorSubClass = "WITH_SUGGESTION",
         sqlState = "42000",
         parameters = Map(
           "objectName" -> "`key1`",
-          "objectList" -> "`test_catalog`.`ns`.`tbl`.`key`, `test_catalog`.`ns`.`tbl`.`col`"))
+          "proposal" -> "`test_catalog`.`ns`.`tbl`.`key`, `test_catalog`.`ns`.`tbl`.`col`"))
     }
   }
 
@@ -129,10 +130,11 @@ class DescribeTableSuite extends command.DescribeTableSuiteBase with CommandSuit
             sql(s"DESC $tbl KEY").collect()
           },
           errorClass = "UNRESOLVED_COLUMN",
+          errorSubClass = "WITH_SUGGESTION",
           sqlState = "42000",
           parameters = Map(
             "objectName" -> "`KEY`",
-            "objectList" -> "`test_catalog`.`ns`.`tbl`.`key`"))
+            "proposal" -> "`test_catalog`.`ns`.`tbl`.`key`"))
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 3936f2b995c..bcd485a129c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -1294,11 +1294,12 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
       withTable("t") {
         sql("create table t(i boolean default true, s bigint default 42) using parquet")
-        assert(intercept[AnalysisException] {
-          sql("insert into t (I) select true from (select 1)")
-        }.getMessage.contains(
-          "[UNRESOLVED_COLUMN] A column or function parameter with name `I` cannot be resolved. " +
-            "Did you mean one of the following? [`i`, `s`]"))
+        checkError(
+          exception =
+            intercept[AnalysisException](sql("insert into t (I) select true from (select 1)")),
+          errorClass = "UNRESOLVED_COLUMN",
+          errorSubClass = Some("WITH_SUGGESTION"),
+          parameters = Map("objectName" -> "`I`", "proposal" -> "`i`, `s`"))
       }
     }
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
index e5ecc2c889c..593a0339d2e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
@@ -123,8 +123,13 @@ class HiveParquetSuite extends QueryTest with ParquetTest with TestHiveSingleton
              |)
           """.stripMargin)
       }
-      assert(ex.getErrorClass == "UNRESOLVED_COLUMN")
-      assert(ex.messageParameters.head == "`c3`")
+      checkError(
+        exception = ex,
+        errorClass = "UNRESOLVED_COLUMN",
+        errorSubClass = Some("WITH_SUGGESTION"),
+        parameters = Map("objectName" -> "`c3`",
+          "proposal" -> ("`__auto_generated_subquery_name`.`c1`, " +
+            "`__auto_generated_subquery_name`.`c2`")))
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org