Posted to commits@spark.apache.org by ma...@apache.org on 2023/12/09 09:32:36 UTC

(spark) branch master updated: [SPARK-46333][SQL] Replace `IllegalStateException` by `SparkException.internalError` in catalyst

This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e95929ac423 [SPARK-46333][SQL] Replace `IllegalStateException` by `SparkException.internalError` in catalyst
8e95929ac423 is described below

commit 8e95929ac4238d02dca379837ccf2fbc1cd1926d
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Sat Dec 9 12:32:21 2023 +0300

    [SPARK-46333][SQL] Replace `IllegalStateException` by `SparkException.internalError` in catalyst
    
    ### What changes were proposed in this pull request?
    In the PR, I propose to replace all `IllegalStateException` exceptions in `catalyst` with `SparkException.internalError`.
    
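    For illustration, the replacement pattern boils down to the sketch below
    (assuming the `SparkException.internalError` helper on the
    `org.apache.spark.SparkException` companion object, which wraps the given
    message into the `INTERNAL_ERROR` error class):
    
        // Before: an opaque JVM exception without an error class
        throw new IllegalStateException("Table stats must be specified.")
    
        // After: a SparkException carrying the INTERNAL_ERROR error class;
        // the original text becomes the error's `message` parameter
        throw SparkException.internalError("Table stats must be specified.")
    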
    ### Why are the changes needed?
    This is part of the migration onto the new error framework and error classes.
    
    ### Does this PR introduce _any_ user-facing change?
    No, users shouldn't encounter `IllegalStateException` in regular cases.
    
    ### How was this patch tested?
    Using the existing GitHub Actions (GAs) checks.
    
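    As a condensed before/after, taken from the `ReassignLambdaVariableIDSuite`
    change in this patch, test assertions move from substring matching on
    `IllegalStateException` to checking the error class and its parameters
    with the `checkError` test helper:
    
        // Old style: catch the JVM exception and match on a message substring
        val e = intercept[IllegalStateException](Optimize.execute(query))
        assert(e.getMessage.contains("should be all positive or negative"))
    
        // New style: assert the error class and the `message` parameter directly
        checkError(
          exception = intercept[SparkException](Optimize.execute(query)),
          errorClass = "INTERNAL_ERROR",
          parameters = Map(
            "message" -> "LambdaVariable IDs in a query should be all positive or negative."))
    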
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #44263 from MaxGekk/bind-ref-internal-error.
    
    Authored-by: Max Gekk <ma...@gmail.com>
    Signed-off-by: Max Gekk <ma...@gmail.com>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  3 ++-
 .../sql/catalyst/analysis/CheckAnalysis.scala      |  6 ++---
 .../sql/catalyst/analysis/v2ResolutionPlans.scala  | 13 +++++-----
 .../sql/catalyst/catalog/SessionCatalog.scala      |  3 ++-
 .../spark/sql/catalyst/catalog/interface.scala     |  3 ++-
 .../sql/catalyst/expressions/BoundAttribute.scala  |  3 ++-
 .../expressions/EquivalentExpressions.scala        |  5 ++--
 .../expressions/InterpretedUnsafeProjection.scala  |  4 ++-
 .../expressions/ProjectionOverSchema.scala         |  5 ++--
 .../sql/catalyst/expressions/arithmetic.scala      |  4 +--
 .../expressions/codegen/CodeGenerator.scala        |  4 +--
 .../catalyst/expressions/codegen/javaCode.scala    |  3 ++-
 .../expressions/collectionOperations.scala         |  6 ++---
 .../catalyst/expressions/complexTypeCreator.scala  |  5 ++--
 .../sql/catalyst/expressions/csvExpressions.scala  |  3 ++-
 .../sql/catalyst/expressions/jsonExpressions.scala |  5 ++--
 .../catalyst/expressions/namedExpressions.scala    |  3 ++-
 .../catalyst/optimizer/DecorrelateInnerQuery.scala |  4 +--
 .../catalyst/optimizer/NestedColumnAliasing.scala  |  3 ++-
 .../optimizer/NormalizeFloatingNumbers.scala       |  5 ++--
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  5 ++--
 .../optimizer/PushExtraPredicateThroughJoin.scala  |  3 ++-
 .../optimizer/ReplaceExceptWithFilter.scala        |  3 ++-
 .../spark/sql/catalyst/optimizer/objects.scala     |  7 ++---
 .../spark/sql/catalyst/optimizer/subquery.scala    |  2 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  6 ++---
 .../sql/catalyst/plans/physical/partitioning.scala |  6 +++--
 .../apache/spark/sql/catalyst/trees/TreeNode.scala |  3 ++-
 .../sql/catalyst/util/ArrayBasedMapBuilder.scala   |  3 ++-
 .../spark/sql/catalyst/util/DateTimeUtils.scala    |  8 +++---
 .../sql/catalyst/analysis/AnalysisSuite.scala      | 22 ++++++++++------
 .../optimizer/ReassignLambdaVariableIDSuite.scala  |  8 ++++--
 .../sql/catalyst/util/DateTimeUtilsSuite.scala     | 30 ++++++++++++----------
 .../sql/execution/WholeStageCodegenSuite.scala     | 14 ++++++----
 34 files changed, 126 insertions(+), 84 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index e5961b46e743..ec91f9b21a76 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -25,6 +25,7 @@ import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.util.{Failure, Random, Success, Try}
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.catalog._
@@ -3706,7 +3707,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
         case u @ UpCast(child, _, _) if !child.resolved => u
 
         case UpCast(_, target, _) if target != DecimalType && !target.isInstanceOf[DataType] =>
-          throw new IllegalStateException(
+          throw SparkException.internalError(
             s"UpCast only supports DecimalType as AbstractDataType yet, but got: $target")
 
         case UpCast(child, target, walkedTypePath) if target == DecimalType
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index ea1af1d3c8cd..1ce984a39b27 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -223,8 +223,8 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
       case p if p.analyzed => // Skip already analyzed sub-plans
 
       case leaf: LeafNode if leaf.output.map(_.dataType).exists(CharVarcharUtils.hasCharVarchar) =>
-        throw new IllegalStateException(
-          "[BUG] logical plan should not have output of char/varchar type: " + leaf)
+        throw SparkException.internalError(
+          "Logical plan should not have output of char/varchar type: " + leaf)
 
       case u: UnresolvedNamespace =>
         u.schemaNotFound(u.multipartIdentifier)
@@ -757,7 +757,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
               messageParameters = Map("sqlExprs" -> o.expressions.map(toSQLExpr(_)).mkString(", "))
             )
 
-          case _: UnresolvedHint => throw new IllegalStateException(
+          case _: UnresolvedHint => throw SparkException.internalError(
             "Logical hint operator should be removed during analysis.")
 
           case f @ Filter(condition, _)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
index 1a7d2501d684..ecdf40e87a89 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -77,9 +78,9 @@ case class UnresolvedTableOrView(
     allowTempView: Boolean) extends UnresolvedLeafNode
 
 sealed trait PartitionSpec extends LeafExpression with Unevaluable {
-  override def dataType: DataType = throw new IllegalStateException(
+  override def dataType: DataType = throw SparkException.internalError(
     "PartitionSpec.dataType should not be called.")
-  override def nullable: Boolean = throw new IllegalStateException(
+  override def nullable: Boolean = throw SparkException.internalError(
     "PartitionSpec.nullable should not be called.")
 }
 
@@ -91,9 +92,9 @@ case class UnresolvedPartitionSpec(
 
 sealed trait FieldName extends LeafExpression with Unevaluable {
   def name: Seq[String]
-  override def dataType: DataType = throw new IllegalStateException(
+  override def dataType: DataType = throw SparkException.internalError(
     "FieldName.dataType should not be called.")
-  override def nullable: Boolean = throw new IllegalStateException(
+  override def nullable: Boolean = throw SparkException.internalError(
     "FieldName.nullable should not be called.")
 }
 
@@ -103,9 +104,9 @@ case class UnresolvedFieldName(name: Seq[String]) extends FieldName {
 
 sealed trait FieldPosition extends LeafExpression with Unevaluable {
   def position: ColumnPosition
-  override def dataType: DataType = throw new IllegalStateException(
+  override def dataType: DataType = throw SparkException.internalError(
     "FieldPosition.dataType should not be called.")
-  override def nullable: Boolean = throw new IllegalStateException(
+  override def nullable: Boolean = throw SparkException.internalError(
     "FieldPosition.nullable should not be called.")
 }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index f1373b2e593a..4016e3bfb067 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -30,6 +30,7 @@ import com.google.common.cache.{Cache, CacheBuilder}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.analysis._
@@ -927,7 +928,7 @@ class SessionCatalog(
 
   private def fromCatalogTable(metadata: CatalogTable, isTempView: Boolean): View = {
     val viewText = metadata.viewText.getOrElse {
-      throw new IllegalStateException("Invalid view without text.")
+      throw SparkException.internalError("Invalid view without text.")
     }
     val viewConfigs = metadata.viewSQLConfigs
     val origin = Origin(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index f716c2a0ccb6..0a1a40a88522 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -31,6 +31,7 @@ import org.apache.commons.lang3.StringUtils
 import org.json4s.JsonAST.{JArray, JString}
 import org.json4s.jackson.JsonMethods._
 
+import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{CurrentUserContext, FunctionIdentifier, InternalRow, SQLConfHelper, TableIdentifier}
@@ -923,7 +924,7 @@ case class HiveTableRelation(
     tableMeta.stats.map(_.toPlanStats(output, conf.cboEnabled || conf.planStatsEnabled))
       .orElse(tableStats)
       .getOrElse {
-      throw new IllegalStateException("Table stats must be specified.")
+      throw SparkException.internalError("Table stats must be specified.")
     }
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
index 4d303aa95b78..2ca2697a3e1f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
+import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode, FalseLiteral, JavaCode}
@@ -76,7 +77,7 @@ object BindReferences extends Logging {
         if (allowFailures) {
           a
         } else {
-          throw new IllegalStateException(
+          throw SparkException.internalError(
             s"Couldn't find $a in ${input.attrs.mkString("[", ",", "]")}")
         }
       } else {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
index 7f43b2b78478..cd8f1bf1d688 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
@@ -21,6 +21,7 @@ import java.util.Objects
 
 import scala.collection.mutable
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.trees.TreePattern.{LAMBDA_VARIABLE, PLAN_EXPRESSION}
 import org.apache.spark.sql.internal.SQLConf
@@ -73,7 +74,7 @@ class EquivalentExpressions(
             false
           } else {
             // Should not happen
-            throw new IllegalStateException(
+            throw SparkException.internalError(
               s"Cannot update expression: $expr in map: $map with use count: $useCount")
           }
         case _ =>
@@ -81,7 +82,7 @@ class EquivalentExpressions(
             map.put(wrapper, ExpressionStats(expr)(useCount))
           } else {
             // Should not happen
-            throw new IllegalStateException(
+            throw SparkException.internalError(
               s"Cannot update expression: $expr in map: $map with use count: $useCount")
           }
           false
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala
index a53903a7c16d..3dcc775d6ab2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.sql.catalyst.expressions
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.codegen.{UnsafeArrayWriter, UnsafeRowWriter, UnsafeWriter}
 import org.apache.spark.sql.catalyst.types._
@@ -233,7 +234,8 @@ object InterpretedUnsafeProjection {
         case PhysicalNullType => (_, _) => {}
 
         case _ =>
-          throw new IllegalStateException(s"The data type '${dt.typeName}' is not supported in " +
+          throw SparkException.internalError(
+            s"The data type '${dt.typeName}' is not supported in " +
             "generating a writer function for a struct field, array element, map key or map value.")
       }
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ProjectionOverSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ProjectionOverSchema.scala
index 57dc1ee8ad92..bb67c173b946 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ProjectionOverSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ProjectionOverSchema.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.types._
 
 /**
@@ -57,7 +58,7 @@ case class ProjectionOverSchema(schema: StructType, output: AttributeSet) {
               projSchema.size,
               a.containsNull)
           case (_, projSchema) =>
-            throw new IllegalStateException(
+            throw SparkException.internalError(
               s"unmatched child schema for GetArrayStructFields: ${projSchema.toString}"
             )
         }
@@ -72,7 +73,7 @@ case class ProjectionOverSchema(schema: StructType, output: AttributeSet) {
           case (projection, projSchema: StructType) =>
             GetStructField(projection, projSchema.fieldIndex(field.name))
           case (_, projSchema) =>
-            throw new IllegalStateException(
+            throw SparkException.internalError(
               s"unmatched child schema for GetStructField: ${projSchema.toString}"
             )
         }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
index e3c5184c5acc..a0fb17cec812 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
 
 import scala.math.{max, min}
 
-import org.apache.spark.QueryContext
+import org.apache.spark.{QueryContext, SparkException}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, TypeCheckResult, TypeCoercion}
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
@@ -248,7 +248,7 @@ abstract class BinaryArithmetic extends BinaryOperator
   protected def allowPrecisionLoss: Boolean = SQLConf.get.decimalOperationsAllowPrecisionLoss
 
   protected def resultDecimalType(p1: Int, s1: Int, p2: Int, s2: Int): DecimalType = {
-    throw new IllegalStateException(
+    throw SparkException.internalError(
       s"${getClass.getSimpleName} must override `resultDecimalType`.")
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index e73b00600764..d10e4a1ced1b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -30,7 +30,7 @@ import org.codehaus.commons.compiler.{CompileException, InternalCompilerExceptio
 import org.codehaus.janino.ClassBodyEvaluator
 import org.codehaus.janino.util.ClassFile
 
-import org.apache.spark.{TaskContext, TaskKilledException}
+import org.apache.spark.{SparkException, TaskContext, TaskKilledException}
 import org.apache.spark.executor.InputMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.CodegenMetrics
@@ -1199,7 +1199,7 @@ class CodegenContext extends Logging {
           "the parameter length of at least one split function went over the JVM limit: " +
           MAX_JVM_METHOD_PARAMS_LENGTH
         if (Utils.isTesting) {
-          throw new IllegalStateException(errMsg)
+          throw SparkException.internalError(errMsg)
         } else {
           logInfo(errMsg)
           (localSubExprEliminationExprsForNonSplit, Seq.empty)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala
index bfa828ae7a6c..49ab06ea2a3d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala
@@ -22,6 +22,7 @@ import java.lang.{Boolean => JBool}
 import scala.collection.mutable.ArrayBuffer
 import scala.language.implicitConversions
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.trees.{LeafLike, TreeNode}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.types.{BooleanType, DataType}
@@ -203,7 +204,7 @@ trait Block extends TreeNode[Block] with JavaCode {
 
   override def verboseString(maxFields: Int): String = toString
   override def simpleStringWithNodeId(): String = {
-    throw new IllegalStateException(s"$nodeName does not implement simpleStringWithNodeId")
+    throw SparkException.internalError(s"$nodeName does not implement simpleStringWithNodeId")
   }
 }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
index e522b6574e87..04f56eaf8c1e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
@@ -22,7 +22,7 @@ import java.util.Comparator
 import scala.collection.mutable
 import scala.reflect.ClassTag
 
-import org.apache.spark.QueryContext
+import org.apache.spark.{QueryContext, SparkException}
 import org.apache.spark.SparkException.internalError
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion, UnresolvedAttribute, UnresolvedSeed}
@@ -2088,7 +2088,7 @@ case class ArrayMin(child: Expression)
 
   @transient override lazy val dataType: DataType = child.dataType match {
     case ArrayType(dt, _) => dt
-    case _ => throw new IllegalStateException(s"$prettyName accepts only arrays.")
+    case _ => throw SparkException.internalError(s"$prettyName accepts only arrays.")
   }
 
   override def prettyName: String = "array_min"
@@ -2161,7 +2161,7 @@ case class ArrayMax(child: Expression)
 
   @transient override lazy val dataType: DataType = child.dataType match {
     case ArrayType(dt, _) => dt
-    case _ => throw new IllegalStateException(s"$prettyName accepts only arrays.")
+    case _ => throw SparkException.internalError(s"$prettyName accepts only arrays.")
   }
 
   override def prettyName: String = "array_max"
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
index b35a7b412e48..1693c6b21484 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
 
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{Resolver, TypeCheckResult, TypeCoercion, UnresolvedAttribute, UnresolvedExtractValue}
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.{FUNC_ALIAS, FunctionBuilder}
@@ -626,10 +627,10 @@ trait StructFieldsOperation extends Expression with Unevaluable {
 
   val resolver: Resolver = SQLConf.get.resolver
 
-  override def dataType: DataType = throw new IllegalStateException(
+  override def dataType: DataType = throw SparkException.internalError(
     "StructFieldsOperation.dataType should not be called.")
 
-  override def nullable: Boolean = throw new IllegalStateException(
+  override def nullable: Boolean = throw SparkException.internalError(
     "StructFieldsOperation.nullable should not be called.")
 
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
index 27be2617a5ae..a7c9e2946d7b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala
@@ -21,6 +21,7 @@ import java.io.CharArrayWriter
 
 import com.univocity.parsers.csv.CsvParser
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
@@ -92,7 +93,7 @@ case class CsvToStructs(
       assert(!rows.hasNext)
       result
     } else {
-      throw new IllegalStateException("Expected one row from CSV parser.")
+      throw SparkException.internalError("Expected one row from CSV parser.")
     }
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 04bc457b66a4..890ab5541985 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -25,6 +25,7 @@ import scala.util.parsing.combinator.RegexParsers
 import com.fasterxml.jackson.core._
 import com.fasterxml.jackson.core.json.JsonReadFeature
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
@@ -361,7 +362,7 @@ class GetJsonObjectEvaluator(cachedPath: UTF8String) {
         val nextStyle = style match {
           case RawStyle => QuotedStyle
           case FlattenStyle => FlattenStyle
-          case QuotedStyle => throw new IllegalStateException()
+          case QuotedStyle => throw SparkException.internalError("Unexpected quoted style.")
         }
 
         // temporarily buffer child matches, the emitted json will need to be
@@ -593,7 +594,7 @@ case class JsonTuple(children: Seq[Expression])
         // a special case that needs to be handled outside of this method.
         // if a requested field is null, the result must be null. the easiest
         // way to achieve this is just by ignoring null tokens entirely
-        throw new IllegalStateException("Do not attempt to copy a null field.")
+        throw SparkException.internalError("Do not attempt to copy a null field.")
 
       case _ =>
         // handle other types including objects, arrays, booleans and numbers
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index d51223eb4b7e..94f90427cc8f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
 
 import java.util.{Objects, UUID}
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions.codegen._
@@ -160,7 +161,7 @@ case class Alias(child: Expression, name: String)(
   /** Just a simple passthrough for code generation. */
   override def genCode(ctx: CodegenContext): ExprCode = child.genCode(ctx)
   override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-    throw new IllegalStateException("Alias.doGenCode should not be called.")
+    throw SparkException.internalError("Alias.doGenCode should not be called.")
   }
 
   override def dataType: DataType = child.dataType
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
index 08d581d6ddfc..feb01d1ce3fb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
@@ -416,7 +416,7 @@ object DecorrelateInnerQuery extends PredicateHelper {
             s"Child of a domain inner join shouldn't contain another domain join.\n$child")
           child
         case o =>
-          throw new IllegalStateException(s"Unexpected domain join type $o")
+          throw SparkException.internalError(s"Unexpected domain join type $o")
       }
 
       // We should only rewrite a domain join when all corresponding outer plan attributes
@@ -442,7 +442,7 @@ object DecorrelateInnerQuery extends PredicateHelper {
           case _ => Join(domain, newChild, joinType, outerJoinCondition, JoinHint.NONE)
         }
       } else {
-        throw new IllegalStateException(
+        throw SparkException.internalError(
           s"Unable to rewrite domain join with conditions: $conditions\n$d.")
       }
     case s @ (_ : Union | _: SetOperation) =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
index 7ed68218f143..ca3c14177e6b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer
 
 import scala.collection.mutable
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -454,7 +455,7 @@ object GeneratorNestedColumnAliasing {
 
           case other =>
             // We should not reach here.
-            throw new IllegalStateException(s"Unreasonable plan after optimization: $other")
+            throw SparkException.internalError(s"Unreasonable plan after optimization: $other")
         }
       }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFloatingNumbers.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFloatingNumbers.scala
index ff638a5abbae..f946fe76bde4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFloatingNumbers.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFloatingNumbers.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.expressions.{Alias, And, ArrayTransform, CaseWhen, Coalesce, CreateArray, CreateMap, CreateNamedStruct, EqualTo, ExpectsInputTypes, Expression, GetStructField, If, IsNull, KnownFloatingPointNormalized, LambdaFunction, Literal, NamedLambdaVariable, UnaryExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
@@ -99,7 +100,7 @@ object NormalizeFloatingNumbers extends Rule[LogicalPlan] {
     case ArrayType(et, _) => needNormalize(et)
     // Currently MapType is not comparable and analyzer should fail earlier if this case happens.
     case _: MapType =>
-      throw new IllegalStateException("grouping/join/window partition keys cannot be map type.")
+      throw SparkException.internalError("grouping/join/window partition keys cannot be map type.")
     case _ => false
   }
 
@@ -143,7 +144,7 @@ object NormalizeFloatingNumbers extends Rule[LogicalPlan] {
       val function = normalize(lv)
       KnownFloatingPointNormalized(ArrayTransform(expr, LambdaFunction(function, Seq(lv))))
 
-    case _ => throw new IllegalStateException(s"fail to normalize $expr")
+    case _ => throw SparkException.internalError(s"fail to normalize $expr")
   }
 
   val FLOAT_NORMALIZER: Any => Any = (input: Any) => {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index a461bf529eb1..960f5e532c08 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer
 
 import scala.collection.mutable
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
@@ -1960,7 +1961,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
             reduceLeftOption(And).map(Filter(_, newJoin)).getOrElse(newJoin)
 
         case other =>
-          throw new IllegalStateException(s"Unexpected join type: $other")
+          throw SparkException.internalError(s"Unexpected join type: $other")
       }
 
     // push down the join filter into sub query scanning if applicable
@@ -1996,7 +1997,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
           Join(newLeft, newRight, joinType, newJoinCond, hint)
 
         case other =>
-          throw new IllegalStateException(s"Unexpected join type: $other")
+          throw SparkException.internalError(s"Unexpected join type: $other")
       }
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushExtraPredicateThroughJoin.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushExtraPredicateThroughJoin.scala
index 134a1eba1dd2..a2bc0bf83a2c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushExtraPredicateThroughJoin.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushExtraPredicateThroughJoin.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.expressions.{And, Expression, PredicateHelper}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan}
@@ -71,7 +72,7 @@ object PushExtraPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHel
           case LeftOuter | LeftAnti | ExistenceJoin(_) =>
             Join(left, newRight, joinType, Some(joinCondition), hint)
           case other =>
-            throw new IllegalStateException(s"Unexpected join type: $other")
+            throw SparkException.internalError(s"Unexpected join type: $other")
         }
         newJoin.setTagValue(processedJoinConditionTag, joinCondition)
         newJoin
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
index f66128dcbc3f..3d249fde0e80 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer
 
 import scala.annotation.tailrec
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -104,7 +105,7 @@ object ReplaceExceptWithFilter extends Rule[LogicalPlan] {
   }
 
   private def nonFilterChild(plan: LogicalPlan) = plan.find(!_.isInstanceOf[Filter]).getOrElse {
-    throw new IllegalStateException("Leaf node is expected")
+    throw SparkException.internalError("Leaf node is expected")
   }
 
   private def combineFilters(plan: LogicalPlan): LogicalPlan = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/objects.scala
index 6655a09402d0..163b6d9ff782 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/objects.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer
 
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.spark.SparkException
 import org.apache.spark.api.java.function.FilterFunction
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.objects._
@@ -261,12 +262,12 @@ object ReassignLambdaVariableID extends Rule[LogicalPlan] {
 
     plan.transformAllExpressionsWithPruning(_.containsPattern(LAMBDA_VARIABLE), ruleId) {
       case lr: LambdaVariable if lr.id == 0 =>
-        throw new IllegalStateException("LambdaVariable should never has 0 as its ID.")
+        throw SparkException.internalError("LambdaVariable should never has 0 as its ID.")
 
       case lr: LambdaVariable if lr.id < 0 =>
         hasNegativeIds = true
         if (hasPositiveIds) {
-          throw new IllegalStateException(
+          throw SparkException.internalError(
             "LambdaVariable IDs in a query should be all positive or negative.")
 
         }
@@ -275,7 +276,7 @@ object ReassignLambdaVariableID extends Rule[LogicalPlan] {
       case lr: LambdaVariable if lr.id > 0 =>
         hasPositiveIds = true
         if (hasNegativeIds) {
-          throw new IllegalStateException(
+          throw SparkException.internalError(
             "LambdaVariable IDs in a query should be all positive or negative.")
         }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
index 6ca2cb79aaf5..ee5b24f76ebf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
@@ -849,7 +849,7 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] with AliasHelpe
   private def checkScalarSubqueryInAgg(a: Aggregate): Unit = {
     if (a.groupingExpressions.exists(hasCorrelatedScalarSubquery) &&
         !a.aggregateExpressions.exists(hasCorrelatedScalarSubquery)) {
-      throw new IllegalStateException(
+      throw SparkException.internalError(
         s"Fail to rewrite correlated scalar subqueries in Aggregate:\n$a")
     }
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 4af1801aaa18..9a4e389fcf99 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -1394,7 +1394,7 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
         case Some(c) if c.booleanExpression != null =>
           (baseJoinType, Option(expression(c.booleanExpression)))
         case Some(c) =>
-          throw new IllegalStateException(s"Unimplemented joinCriteria: $c")
+          throw SparkException.internalError(s"Unimplemented joinCriteria: $c")
         case None if ctx.NATURAL != null =>
           if (ctx.LATERAL != null) {
             throw QueryParsingErrors.incompatibleJoinTypesError(
@@ -3409,7 +3409,7 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
       if (arguments.size > 1) {
         throw QueryParsingErrors.wrongNumberArgumentsForTransformError(name, arguments.size, ctx)
       } else if (arguments.isEmpty) {
-        throw new IllegalStateException(s"Not enough arguments for transform $name")
+        throw SparkException.internalError(s"Not enough arguments for transform $name")
       } else {
         getFieldReference(ctx, arguments.head)
       }
@@ -3470,7 +3470,7 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
           .map(typedVisit[Literal])
           .map(lit => LiteralValue(lit.value, lit.dataType))
       reference.orElse(literal)
-          .getOrElse(throw new IllegalStateException("Invalid transform argument"))
+          .getOrElse(throw SparkException.internalError("Invalid transform argument"))
     }
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index 60e6e42bedf8..8e64cf8b29fb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.plans.physical
 import scala.annotation.tailrec
 import scala.collection.mutable
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.InternalRowComparableWrapper
@@ -55,7 +56,8 @@ case object UnspecifiedDistribution extends Distribution {
   override def requiredNumPartitions: Option[Int] = None
 
   override def createPartitioning(numPartitions: Int): Partitioning = {
-    throw new IllegalStateException("UnspecifiedDistribution does not have default partitioning.")
+    throw SparkException.internalError(
+      "UnspecifiedDistribution does not have default partitioning.")
   }
 }
 
@@ -220,7 +222,7 @@ trait Partitioning {
    * @param distribution the required clustered distribution for this partitioning
    */
   def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec =
-    throw new IllegalStateException(s"Unexpected partitioning: ${getClass.getSimpleName}")
+    throw SparkException.internalError(s"Unexpected partitioning: ${getClass.getSimpleName}")
 
   /**
    * The actual method that defines whether this [[Partitioning]] can satisfy the given
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 917a59d826d8..dbacb833ef59 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -29,6 +29,7 @@ import org.json4s.JsonAST._
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.{AliasIdentifier, CatalystIdentifier}
 import org.apache.spark.sql.catalyst.ScalaReflection._
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, FunctionResource}
@@ -745,7 +746,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
       }
     } catch {
       case e: java.lang.IllegalArgumentException =>
-        throw new IllegalStateException(
+        throw SparkException.internalError(
           s"""
              |Failed to copy node.
              |Is otherCopyArgs specified correctly for $nodeName.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala
index 147197236213..d358c92dd62c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.util
 
 import scala.collection.mutable
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.SQLConf
@@ -71,7 +72,7 @@ class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Seria
         // Overwrite the previous value, as the policy is last wins.
         values(index) = value
       } else {
-        throw new IllegalStateException("Unknown map key dedup policy: " + mapKeyDedupPolicy)
+        throw SparkException.internalError("Unknown map key dedup policy: " + mapKeyDedupPolicy)
       }
     }
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 8fabb4487620..128582f71d11 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit._
 
 import scala.util.control.NonFatal
 
-import org.apache.spark.QueryContext
+import org.apache.spark.{QueryContext, SparkException}
 import org.apache.spark.sql.catalyst.util.DateTimeConstants._
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.types.{Decimal, DoubleExactNumeric, TimestampNTZType, TimestampType}
@@ -678,11 +678,11 @@ object DateTimeUtils extends SparkDateTimeUtils {
       }
     } catch {
       case _: scala.MatchError =>
-        throw new IllegalStateException(s"Got the unexpected unit '$unit'.")
+        throw SparkException.internalError(s"Got the unexpected unit '$unit'.")
       case _: ArithmeticException | _: DateTimeException =>
         throw QueryExecutionErrors.timestampAddOverflowError(micros, quantity, unit)
       case e: Throwable =>
-        throw new IllegalStateException(s"Failure of 'timestampAdd': ${e.getMessage}")
+        throw SparkException.internalError(s"Failure of 'timestampAdd': ${e.getMessage}")
     }
   }
 
@@ -716,7 +716,7 @@ object DateTimeUtils extends SparkDateTimeUtils {
       val endLocalTs = getLocalDateTime(endTs, zoneId)
       timestampDiffMap(unitInUpperCase)(startLocalTs, endLocalTs)
     } else {
-      throw new IllegalStateException(s"Got the unexpected unit '$unit'.")
+      throw SparkException.internalError(s"Got the unexpected unit '$unit'.")
     }
   }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index b86d82a7f24f..93a2efbbf6d8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -26,6 +26,7 @@ import scala.reflect.runtime.universe.TypeTag
 import org.apache.logging.log4j.Level
 import org.scalatest.matchers.must.Matchers
 
+import org.apache.spark.SparkException
 import org.apache.spark.api.python.PythonEvalType
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{AliasIdentifier, QueryPlanningTracker, TableIdentifier}
@@ -63,14 +64,19 @@ class AnalysisSuite extends AnalysisTest with Matchers {
     val schema3 = new StructType().add("c", ArrayType(CharType(5)))
     Seq(schema1, schema2, schema3).foreach { schema =>
       val table = new InMemoryTable("t", schema, Array.empty, Map.empty[String, String].asJava)
-      intercept[IllegalStateException] {
-        DataSourceV2Relation(
-          table,
-          DataTypeUtils.toAttributes(schema),
-          None,
-          None,
-          CaseInsensitiveStringMap.empty()).analyze
-      }
+      checkError(
+        exception = intercept[SparkException] {
+          DataSourceV2Relation(
+            table,
+            DataTypeUtils.toAttributes(schema),
+            None,
+            None,
+            CaseInsensitiveStringMap.empty()).analyze
+        },
+        errorClass = "INTERNAL_ERROR",
+        parameters = Map("message" ->
+          "Logical plan should not have output of char/varchar type.*\n"),
+        matchPVals = true)
     }
   }
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReassignLambdaVariableIDSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReassignLambdaVariableIDSuite.scala
index 057ec956bf22..bf9f922978f6 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReassignLambdaVariableIDSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReassignLambdaVariableIDSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions.objects.LambdaVariable
@@ -55,7 +56,10 @@ class ReassignLambdaVariableIDSuite extends PlanTest {
     val var1 = LambdaVariable("a", BooleanType, true, id = -2)
     val var2 = LambdaVariable("b", BooleanType, true, id = 4)
     val query = testRelation.where(var1 && var2)
-    val e = intercept[IllegalStateException](Optimize.execute(query))
-    assert(e.getMessage.contains("should be all positive or negative"))
+    checkError(
+      exception = intercept[SparkException](Optimize.execute(query)),
+      errorClass = "INTERNAL_ERROR",
+      parameters = Map(
+        "message" -> "LambdaVariable IDs in a query should be all positive or negative."))
   }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
index 178d24352df7..c4e578ee2a51 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
@@ -26,7 +26,7 @@ import java.util.concurrent.TimeUnit
 import org.scalatest.matchers.must.Matchers
 import org.scalatest.matchers.should.Matchers._
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.sql.catalyst.plans.SQLHelper
 import org.apache.spark.sql.catalyst.util.DateTimeConstants._
 import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
@@ -1006,10 +1006,12 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
         date(1, 1, 1, 0, 0, 0, 1, zid))
     }
 
-    val e = intercept[IllegalStateException] {
-      timestampAdd("SECS", 1, date(1969, 1, 1, 0, 0, 0, 1, getZoneId("UTC")), getZoneId("UTC"))
-    }
-    assert(e.getMessage === "Got the unexpected unit 'SECS'.")
+    checkError(
+      exception = intercept[SparkException] {
+        timestampAdd("SECS", 1, date(1969, 1, 1, 0, 0, 0, 1, getZoneId("UTC")), getZoneId("UTC"))
+      },
+      errorClass = "INTERNAL_ERROR",
+      parameters = Map("message" -> "Got the unexpected unit 'SECS'."))
   }
 
   test("SPARK-38284: difference between two timestamps in units") {
@@ -1056,13 +1058,15 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
         zid) === -9998)
     }
 
-    val e = intercept[IllegalStateException] {
-      timestampDiff(
-        "SECS",
-        date(1969, 1, 1, 0, 0, 0, 1, getZoneId("UTC")),
-        date(2022, 1, 1, 0, 0, 0, 1, getZoneId("UTC")),
-        getZoneId("UTC"))
-    }
-    assert(e.getMessage === "Got the unexpected unit 'SECS'.")
+    checkError(
+      exception = intercept[SparkException] {
+        timestampDiff(
+          "SECS",
+          date(1969, 1, 1, 0, 0, 0, 1, getZoneId("UTC")),
+          date(2022, 1, 1, 0, 0, 0, 1, getZoneId("UTC")),
+          getZoneId("UTC"))
+      },
+      errorClass = "INTERNAL_ERROR",
+      parameters = Map("message" -> "Got the unexpected unit 'SECS'."))
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index 146a583d7737..faefc240b794 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.{Dataset, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
 import org.apache.spark.sql.catalyst.expressions.codegen.{ByteCodeStats, CodeAndComment, CodeGenerator}
@@ -877,17 +878,20 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
         SQLConf.CODEGEN_METHOD_SPLIT_THRESHOLD.key -> "1",
         "spark.sql.CodeGenerator.validParamLength" -> "0") {
       withTable("t") {
-        val expectedErrMsg = "Failed to split subexpression code into small functions"
+        val expectedErrMsg = "Failed to split subexpression code into small functions.*"
         Seq(
           // Test case without keys
           "SELECT AVG(a + b), SUM(a + b + c) FROM VALUES((1, 1, 1)) t(a, b, c)",
           // Test case with keys
           "SELECT k, AVG(a + b), SUM(a + b + c) FROM VALUES((1, 1, 1, 1)) t(k, a, b, c) " +
             "GROUP BY k").foreach { query =>
-          val e = intercept[IllegalStateException] {
-            sql(query).collect()
-          }
-          assert(e.getMessage.contains(expectedErrMsg))
+          checkError(
+            exception = intercept[SparkException] {
+              sql(query).collect()
+            },
+            errorClass = "INTERNAL_ERROR",
+            parameters = Map("message" -> expectedErrMsg),
+            matchPVals = true)
         }
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org