You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2024/02/13 10:50:36 UTC

(spark) branch master updated: [SPARK-47028][SQL][TESTS] Check `SparkUnsupportedOperationException` instead of `UnsupportedOperationException`

This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 49bcde612c59 [SPARK-47028][SQL][TESTS] Check `SparkUnsupportedOperationException` instead of `UnsupportedOperationException`
49bcde612c59 is described below

commit 49bcde612c598fcf3c76cbd91a3dbf11d1b7f1b2
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Tue Feb 13 13:50:18 2024 +0300

    [SPARK-47028][SQL][TESTS] Check `SparkUnsupportedOperationException` instead of `UnsupportedOperationException`
    
    ### What changes were proposed in this pull request?
    In this PR, I propose to use `checkError()` in the `sql` tests to check `SparkUnsupportedOperationException` and its fields.
    
    ### Why are the changes needed?
    Checking `SparkUnsupportedOperationException` and its fields, such as the error class and message parameters, prevents `SparkUnsupportedOperationException` from being replaced back with `UnsupportedOperationException`.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    By running the modified test suites.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #45082 from MaxGekk/intercept-UnsupportedOperationException-tests.
    
    Authored-by: Max Gekk <ma...@gmail.com>
    Signed-off-by: Max Gekk <ma...@gmail.com>
---
 .../test/scala/org/apache/spark/sql/RowTest.scala  |  6 +-
 .../spark/sql/catalyst/ScalaReflectionSuite.scala  | 23 +++++---
 .../encoders/EncoderErrorMessageSuite.scala        | 12 ++--
 .../catalyst/encoders/ExpressionEncoderSuite.scala | 10 ++--
 .../sql/catalyst/json/JacksonGeneratorSuite.scala  |  6 +-
 .../spark/sql/connector/catalog/CatalogSuite.scala |  4 +-
 .../sql/util/CaseInsensitiveStringMapSuite.scala   |  6 +-
 .../scala/org/apache/spark/sql/DatasetSuite.scala  | 33 ++++++-----
 .../spark/sql/ScalaReflectionRelationSuite.scala   | 29 +++++----
 .../sql/connector/DataSourceV2FunctionSuite.scala  |  4 +-
 .../binaryfile/BinaryFileFormatSuite.scala         | 16 ++---
 .../datasources/v2/V2SessionCatalogSuite.scala     | 11 ++--
 .../streaming/CompactibleFileStreamLogSuite.scala  | 11 ++--
 .../sources/RatePerMicroBatchProviderSuite.scala   | 20 ++++---
 .../sources/RateStreamProviderSuite.scala          | 19 +++---
 .../streaming/sources/TextSocketStreamSuite.scala  | 12 ++--
 .../spark/sql/streaming/GroupStateSuite.scala      | 68 +++++++++++-----------
 17 files changed, 160 insertions(+), 130 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/RowTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/RowTest.scala
index 840d80ffed13..985443773943 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/RowTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/RowTest.scala
@@ -24,7 +24,7 @@ import org.scalatest.funspec.AnyFunSpec
 import org.scalatest.matchers.must.Matchers
 import org.scalatest.matchers.should.Matchers._
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{GenericRow, GenericRowWithSchema}
 import org.apache.spark.sql.types._
@@ -45,10 +45,10 @@ class RowTest extends AnyFunSpec with Matchers {
 
   describe("Row (without schema)") {
     it("throws an exception when accessing by fieldName") {
-      intercept[UnsupportedOperationException] {
+      intercept[SparkUnsupportedOperationException] {
         noSchemaRow.fieldIndex("col1")
       }
-      intercept[UnsupportedOperationException] {
+      intercept[SparkUnsupportedOperationException] {
         noSchemaRow.getAs("col1")
       }
     }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
index bbb62acd0250..daa8d12613f2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
@@ -22,7 +22,7 @@ import java.sql.{Date, Timestamp}
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.{typeTag, TypeTag}
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkFunSuite, SparkUnsupportedOperationException}
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.FooEnum.FooEnum
 import org.apache.spark.sql.catalyst.analysis.UnresolvedExtractValue
@@ -490,17 +490,22 @@ class ScalaReflectionSuite extends SparkFunSuite {
   }
 
   test("SPARK-29026: schemaFor for trait without companion object throws exception ") {
-    val e = intercept[UnsupportedOperationException] {
-      schemaFor[TraitProductWithoutCompanion]
-    }
-    assert(e.getMessage.contains("Unable to find constructor"))
+    checkError(
+      exception = intercept[SparkUnsupportedOperationException] {
+        schemaFor[TraitProductWithoutCompanion]
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_2144",
+      parameters = Map("tpe" -> "org.apache.spark.sql.catalyst.TraitProductWithoutCompanion"))
   }
 
   test("SPARK-29026: schemaFor for trait with no-constructor companion throws exception ") {
-    val e = intercept[UnsupportedOperationException] {
-      schemaFor[TraitProductWithNoConstructorCompanion]
-    }
-    assert(e.getMessage.contains("Unable to find constructor"))
+    checkError(
+      exception = intercept[SparkUnsupportedOperationException] {
+        schemaFor[TraitProductWithNoConstructorCompanion]
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_2144",
+      parameters = Map("tpe" ->
+        "org.apache.spark.sql.catalyst.TraitProductWithNoConstructorCompanion"))
   }
 
   test("SPARK-27625: annotated data types") {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderErrorMessageSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderErrorMessageSuite.scala
index 2d61f9fbc071..e852b474aa18 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderErrorMessageSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderErrorMessageSuite.scala
@@ -40,15 +40,15 @@ class EncoderErrorMessageSuite extends SparkFunSuite {
   // That is done in Java because Scala cannot create truly private classes.
 
   test("primitive types in encoders using Kryo serialization") {
-    intercept[UnsupportedOperationException] { Encoders.kryo[Int] }
-    intercept[UnsupportedOperationException] { Encoders.kryo[Long] }
-    intercept[UnsupportedOperationException] { Encoders.kryo[Char] }
+    intercept[SparkUnsupportedOperationException] { Encoders.kryo[Int] }
+    intercept[SparkUnsupportedOperationException] { Encoders.kryo[Long] }
+    intercept[SparkUnsupportedOperationException] { Encoders.kryo[Char] }
   }
 
   test("primitive types in encoders using Java serialization") {
-    intercept[UnsupportedOperationException] { Encoders.javaSerialization[Int] }
-    intercept[UnsupportedOperationException] { Encoders.javaSerialization[Long] }
-    intercept[UnsupportedOperationException] { Encoders.javaSerialization[Char] }
+    intercept[SparkUnsupportedOperationException] { Encoders.javaSerialization[Int] }
+    intercept[SparkUnsupportedOperationException] { Encoders.javaSerialization[Long] }
+    intercept[SparkUnsupportedOperationException] { Encoders.javaSerialization[Char] }
   }
 
   test("nice error message for missing encoder") {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
index 35d8327b9308..7c3857ecfc02 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
@@ -584,10 +584,12 @@ class ExpressionEncoderSuite extends CodegenInterpretedPlanTest with AnalysisTes
   test("throw exception for tuples with more than 22 elements") {
     val encoders = (0 to 22).map(_ => Encoders.scalaInt.asInstanceOf[ExpressionEncoder[_]])
 
-    val e = intercept[UnsupportedOperationException] {
-      ExpressionEncoder.tuple(encoders)
-    }
-    assert(e.getMessage.contains("tuple with more than 22 elements are not supported"))
+    checkError(
+      exception = intercept[SparkUnsupportedOperationException] {
+        ExpressionEncoder.tuple(encoders)
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_2150",
+      parameters = Map.empty)
   }
 
   test("throw exception for unexpected serializer") {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala
index 4b8693cf7fd5..dc9a5816a335 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.json
 
 import java.io.CharArrayWriter
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkFunSuite, SparkUnsupportedOperationException}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.types._
@@ -122,7 +122,7 @@ class JacksonGeneratorSuite extends SparkFunSuite {
     val input = ArrayBasedMapData(Map("a" -> 1))
     val writer = new CharArrayWriter()
     val gen = new JacksonGenerator(dataType, writer, option)
-    intercept[UnsupportedOperationException] {
+    intercept[SparkUnsupportedOperationException] {
       gen.write(input)
     }
   }
@@ -132,7 +132,7 @@ class JacksonGeneratorSuite extends SparkFunSuite {
     val input = InternalRow(1)
     val writer = new CharArrayWriter()
     val gen = new JacksonGenerator(dataType, writer, option)
-    intercept[UnsupportedOperationException] {
+    intercept[SparkUnsupportedOperationException] {
       gen.write(input)
     }
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala
index e79fff7479b9..42756d91b39e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala
@@ -22,7 +22,7 @@ import java.util.Collections
 
 import scala.jdk.CollectionConverters._
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkFunSuite, SparkUnsupportedOperationException}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
@@ -659,7 +659,7 @@ class CatalogSuite extends SparkFunSuite {
 
   test("purgeTable") {
     val catalog = newCatalog()
-    intercept[UnsupportedOperationException](catalog.purgeTable(testIdent))
+    intercept[SparkUnsupportedOperationException](catalog.purgeTable(testIdent))
   }
 
   test("renameTable") {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala
index dfed284bc2b9..98c2a3d1e272 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala
@@ -21,20 +21,20 @@ import java.util
 
 import scala.jdk.CollectionConverters._
 
-import org.apache.spark.{SparkFunSuite, SparkIllegalArgumentException}
+import org.apache.spark.{SparkFunSuite, SparkIllegalArgumentException, SparkUnsupportedOperationException}
 
 class CaseInsensitiveStringMapSuite extends SparkFunSuite {
 
   test("put and get") {
     val options = CaseInsensitiveStringMap.empty()
-    intercept[UnsupportedOperationException] {
+    intercept[SparkUnsupportedOperationException] {
       options.put("kEy", "valUE")
     }
   }
 
   test("clear") {
     val options = new CaseInsensitiveStringMap(Map("kEy" -> "valUE").asJava)
-    intercept[UnsupportedOperationException] {
+    intercept[SparkUnsupportedOperationException] {
       options.clear()
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 5779fe1c62ff..fe295b0cfa26 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -1883,21 +1883,24 @@ class DatasetSuite extends QueryTest
   }
 
   test("SPARK-19896: cannot have circular references in case class") {
-    val errMsg1 = intercept[UnsupportedOperationException] {
-      Seq(CircularReferenceClassA(null)).toDS()
-    }
-    assert(errMsg1.getMessage.startsWith("cannot have circular references in class, but got the " +
-      "circular reference of class"))
-    val errMsg2 = intercept[UnsupportedOperationException] {
-      Seq(CircularReferenceClassC(null)).toDS()
-    }
-    assert(errMsg2.getMessage.startsWith("cannot have circular references in class, but got the " +
-      "circular reference of class"))
-    val errMsg3 = intercept[UnsupportedOperationException] {
-      Seq(CircularReferenceClassD(null)).toDS()
-    }
-    assert(errMsg3.getMessage.startsWith("cannot have circular references in class, but got the " +
-      "circular reference of class"))
+    checkError(
+      exception = intercept[SparkUnsupportedOperationException] {
+        Seq(CircularReferenceClassA(null)).toDS()
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_2139",
+      parameters = Map("t" -> "org.apache.spark.sql.CircularReferenceClassA"))
+    checkError(
+      exception = intercept[SparkUnsupportedOperationException] {
+        Seq(CircularReferenceClassC(null)).toDS()
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_2139",
+      parameters = Map("t" -> "org.apache.spark.sql.CircularReferenceClassC"))
+    checkError(
+      exception = intercept[SparkUnsupportedOperationException] {
+        Seq(CircularReferenceClassD(null)).toDS()
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_2139",
+      parameters = Map("t" -> "org.apache.spark.sql.CircularReferenceClassD"))
   }
 
   test("SPARK-20125: option of map") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala
index 66c6fbeabbf5..01b9fdec9be3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql
 
 import java.sql.{Date, Timestamp}
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkFunSuite, SparkUnsupportedOperationException}
 import org.apache.spark.sql.test.SharedSparkSession
 
 case class ReflectData(
@@ -159,18 +159,25 @@ class ScalaReflectionRelationSuite extends SparkFunSuite with SharedSparkSession
   }
 
   test("better error message when use java reserved keyword as field name") {
-    val e = intercept[UnsupportedOperationException] {
-      Seq(InvalidInJava(1)).toDS()
-    }
-    assert(e.getMessage.contains(
-      "`abstract` is not a valid identifier of Java and cannot be used as field name"))
+    checkError(
+      exception = intercept[SparkUnsupportedOperationException] {
+        Seq(InvalidInJava(1)).toDS()
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_2140",
+      parameters = Map(
+        "fieldName" -> "abstract",
+        "walkedTypePath" -> "- root class: \"org.apache.spark.sql.InvalidInJava\""))
   }
 
   test("better error message when use invalid java identifier as field name") {
-    val e1 = intercept[UnsupportedOperationException] {
-      Seq(InvalidInJava2(1)).toDS()
-    }
-    assert(e1.getMessage.contains(
-      "`0` is not a valid identifier of Java and cannot be used as field name"))
+    checkError(
+      exception = intercept[SparkUnsupportedOperationException] {
+        Seq(InvalidInJava2(1)).toDS()
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_2140",
+      parameters = Map(
+        "fieldName" -> "0",
+        "walkedTypePath" ->
+          "- root class: \"org.apache.spark.sql.ScalaReflectionRelationSuite.InvalidInJava2\""))
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala
index 141581e75884..6481a3f3a891 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala
@@ -322,7 +322,7 @@ class DataSourceV2FunctionSuite extends DatasourceV2SQLBase {
   test("scalar function: bad magic method") {
     catalog("testcat").asInstanceOf[SupportsNamespaces].createNamespace(Array("ns"), emptyProps)
     addFunction(Identifier.of(Array("ns"), "strlen"), StrLen(StrLenBadMagic))
-    intercept[UnsupportedOperationException](sql("SELECT testcat.ns.strlen('abc')").collect())
+    intercept[SparkUnsupportedOperationException](sql("SELECT testcat.ns.strlen('abc')").collect())
   }
 
   test("scalar function: bad magic method with default impl") {
@@ -334,7 +334,7 @@ class DataSourceV2FunctionSuite extends DatasourceV2SQLBase {
   test("scalar function: no implementation found") {
     catalog("testcat").asInstanceOf[SupportsNamespaces].createNamespace(Array("ns"), emptyProps)
     addFunction(Identifier.of(Array("ns"), "strlen"), StrLen(StrLenNoImpl))
-    intercept[UnsupportedOperationException](sql("SELECT testcat.ns.strlen('abc')").collect())
+    intercept[SparkUnsupportedOperationException](sql("SELECT testcat.ns.strlen('abc')").collect())
   }
 
   test("scalar function: invalid parameter type or length") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
index 12f2205cb1db..c5cf94f8747c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
@@ -27,7 +27,7 @@ import com.google.common.io.{ByteStreams, Closeables}
 import org.apache.hadoop.fs.{FileStatus, FileSystem, GlobFilter, Path}
 import org.mockito.Mockito.{mock, when}
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
 import org.apache.spark.sql.{DataFrame, QueryTest, Row}
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.execution.datasources.PartitionedFile
@@ -162,12 +162,14 @@ class BinaryFileFormatSuite extends QueryTest with SharedSparkSession {
   test("binary file data source do not support write operation") {
     val df = spark.read.format(BINARY_FILE).load(testDir)
     withTempDir { tmpDir =>
-      val thrown = intercept[UnsupportedOperationException] {
-        df.write
-          .format(BINARY_FILE)
-          .save(s"$tmpDir/test_save")
-      }
-      assert(thrown.getMessage.contains("Write is not supported for binary file data source"))
+      checkError(
+        exception = intercept[SparkUnsupportedOperationException] {
+          df.write
+            .format(BINARY_FILE)
+            .save(s"$tmpDir/test_save")
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_2075",
+        parameters = Map.empty)
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
index e5473222d429..2195768e3b08 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
@@ -26,6 +26,7 @@ import scala.jdk.CollectionConverters._
 import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfter
 
+import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
@@ -1164,10 +1165,12 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
     catalog.createNamespace(testNs, emptyProps)
 
     CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES.foreach { p =>
-      val exc = intercept[UnsupportedOperationException] {
-        catalog.alterNamespace(testNs, NamespaceChange.removeProperty(p))
-      }
-      assert(exc.getMessage.contains(s"Cannot remove reserved property: $p"))
+      checkError(
+        exception = intercept[SparkUnsupportedOperationException] {
+          catalog.alterNamespace(testNs, NamespaceChange.removeProperty(p))
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_2069",
+        parameters = Map("property" -> p))
 
     }
     catalog.dropNamespace(testNs, cascade = false)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
index c7655af98160..42eb9fa17a21 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.streaming
 import java.io._
 import java.nio.charset.StandardCharsets._
 
+import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.test.SharedSparkSession
 
@@ -242,10 +243,12 @@ class CompactibleFileStreamLogSuite extends SharedSparkSession {
         compactibleLog.add(1, Array("some_path_1"))
         compactibleLog.add(2, Array("some_path_2"))
 
-        val exc = intercept[UnsupportedOperationException] {
-          compactibleLog.purge(2)
-        }
-        assert(exc.getMessage.contains("Cannot purge as it might break internal state"))
+        checkError(
+          exception = intercept[SparkUnsupportedOperationException] {
+            compactibleLog.purge(2)
+          },
+          errorClass = "_LEGACY_ERROR_TEMP_2260",
+          parameters = Map.empty)
 
         // Below line would fail with IllegalStateException if we don't prevent purge:
         // - purge(2) would delete batch 0 and 1 which batch 1 is compaction batch
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProviderSuite.scala
index 31fbf9323140..128b59b26b82 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProviderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProviderSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.streaming.sources
 
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 
+import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.functions.spark_partition_id
 import org.apache.spark.sql.streaming.{StreamTest, Trigger}
@@ -193,14 +194,15 @@ class RatePerMicroBatchProviderSuite extends StreamTest {
   }
 
   test("user-specified schema given") {
-    val exception = intercept[UnsupportedOperationException] {
-      spark.readStream
-        .format("rate-micro-batch")
-        .option("rowsPerBatch", "10")
-        .schema(spark.range(1).schema)
-        .load()
-    }
-    assert(exception.getMessage.contains(
-      "RatePerMicroBatchProvider source does not support user-specified schema"))
+    checkError(
+      exception = intercept[SparkUnsupportedOperationException] {
+        spark.readStream
+          .format("rate-micro-batch")
+          .option("rowsPerBatch", "10")
+          .schema(spark.range(1).schema)
+          .load()
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_2242",
+      parameters = Map("provider" -> "RatePerMicroBatchProvider"))
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
index 69dc8c291c0b..0732e126a013 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit
 import scala.collection.mutable.ArrayBuffer
 import scala.jdk.CollectionConverters._
 
-import org.apache.spark.{SparkException, SparkRuntimeException}
+import org.apache.spark.{SparkException, SparkRuntimeException, SparkUnsupportedOperationException}
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream}
@@ -345,14 +345,15 @@ class RateStreamProviderSuite extends StreamTest {
   }
 
   test("user-specified schema given") {
-    val exception = intercept[UnsupportedOperationException] {
-      spark.readStream
-        .format("rate")
-        .schema(spark.range(1).schema)
-        .load()
-    }
-    assert(exception.getMessage.contains(
-      "RateStreamProvider source does not support user-specified schema"))
+    checkError(
+      exception = intercept[SparkUnsupportedOperationException] {
+        spark.readStream
+          .format("rate")
+          .schema(spark.range(1).schema)
+          .load()
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_2242",
+      parameters = Map("provider" -> "RateStreamProvider"))
   }
 
   test("continuous data") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
index bfeca5851102..87e34601dc09 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit._
 
 import scala.jdk.CollectionConverters._
 
+import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream}
@@ -193,11 +194,12 @@ class TextSocketStreamSuite extends StreamTest with SharedSparkSession {
       StructField("name", StringType) ::
       StructField("area", StringType) :: Nil)
     val params = Map("host" -> "localhost", "port" -> "1234")
-    val exception = intercept[UnsupportedOperationException] {
-      spark.readStream.schema(userSpecifiedSchema).format("socket").options(params).load()
-    }
-    assert(exception.getMessage.contains(
-      "TextSocketSourceProvider source does not support user-specified schema"))
+    checkError(
+      exception = intercept[SparkUnsupportedOperationException] {
+        spark.readStream.schema(userSpecifiedSchema).format("socket").options(params).load()
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_2242",
+      parameters = Map("provider" -> "TextSocketSourceProvider"))
   }
 
   test("input row metrics") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/GroupStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/GroupStateSuite.scala
index d795e7a3d94c..050c1a2d7d97 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/GroupStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/GroupStateSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.streaming
 
 import java.sql.Date
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkFunSuite, SparkUnsupportedOperationException}
 import org.apache.spark.api.java.Optional
 import org.apache.spark.sql.execution.streaming.GroupStateImpl
 import org.apache.spark.sql.execution.streaming.GroupStateImpl.NO_TIMESTAMP
@@ -115,12 +115,12 @@ class GroupStateSuite extends SparkFunSuite {
       )
       for (state <- states) {
         // for streaming queries
-        testTimeoutDurationNotAllowed[UnsupportedOperationException](state)
-        testTimeoutTimestampNotAllowed[UnsupportedOperationException](state)
+        testTimeoutDurationNotAllowed[SparkUnsupportedOperationException](state)
+        testTimeoutTimestampNotAllowed[SparkUnsupportedOperationException](state)
 
         // for batch queries
-        testTimeoutDurationNotAllowed[UnsupportedOperationException](state)
-        testTimeoutTimestampNotAllowed[UnsupportedOperationException](state)
+        testTimeoutDurationNotAllowed[SparkUnsupportedOperationException](state)
+        testTimeoutTimestampNotAllowed[SparkUnsupportedOperationException](state)
       }
     }
   }
@@ -135,7 +135,7 @@ class GroupStateSuite extends SparkFunSuite {
     assert(state.getTimeoutTimestampMs.get() === 2000)
     state.setTimeoutDuration(500)
     assert(state.getTimeoutTimestampMs.get() === 1500) // can be set without initializing state
-    testTimeoutTimestampNotAllowed[UnsupportedOperationException](state)
+    testTimeoutTimestampNotAllowed[SparkUnsupportedOperationException](state)
 
     state.update(5)
     assert(state.getTimeoutTimestampMs.isPresent())
@@ -144,37 +144,37 @@ class GroupStateSuite extends SparkFunSuite {
     assert(state.getTimeoutTimestampMs.get() === 2000)
     state.setTimeoutDuration("2 second")
     assert(state.getTimeoutTimestampMs.get() === 3000)
-    testTimeoutTimestampNotAllowed[UnsupportedOperationException](state)
+    testTimeoutTimestampNotAllowed[SparkUnsupportedOperationException](state)
 
     state.remove()
     assert(state.getTimeoutTimestampMs.isPresent())
     assert(state.getTimeoutTimestampMs.get() === 3000) // does not change
     state.setTimeoutDuration(500) // can still be set
     assert(state.getTimeoutTimestampMs.get() === 1500)
-    testTimeoutTimestampNotAllowed[UnsupportedOperationException](state)
+    testTimeoutTimestampNotAllowed[SparkUnsupportedOperationException](state)
 
     // for batch queries
     state = GroupStateImpl.createForBatch(
       ProcessingTimeTimeout, watermarkPresent = false).asInstanceOf[GroupStateImpl[Int]]
     assert(!state.getTimeoutTimestampMs.isPresent())
     state.setTimeoutDuration(500)
-    testTimeoutTimestampNotAllowed[UnsupportedOperationException](state)
+    testTimeoutTimestampNotAllowed[SparkUnsupportedOperationException](state)
 
     state.update(5)
     state.setTimeoutDuration(1000)
     state.setTimeoutDuration("2 second")
-    testTimeoutTimestampNotAllowed[UnsupportedOperationException](state)
+    testTimeoutTimestampNotAllowed[SparkUnsupportedOperationException](state)
 
     state.remove()
     state.setTimeoutDuration(500)
-    testTimeoutTimestampNotAllowed[UnsupportedOperationException](state)
+    testTimeoutTimestampNotAllowed[SparkUnsupportedOperationException](state)
   }
 
   test("GroupState - setTimeout - with EventTimeTimeout") {
     var state = TestGroupState.create[Int](
       Optional.empty[Int], EventTimeTimeout, 1000, Optional.of(1000), hasTimedOut = false)
     assert(!state.getTimeoutTimestampMs.isPresent())
-    testTimeoutDurationNotAllowed[UnsupportedOperationException](state)
+    testTimeoutDurationNotAllowed[SparkUnsupportedOperationException](state)
     state.setTimeoutTimestamp(5000)
     assert(state.getTimeoutTimestampMs.get() === 5000) // can be set without initializing state
 
@@ -184,29 +184,29 @@ class GroupStateSuite extends SparkFunSuite {
     assert(state.getTimeoutTimestampMs.get() === 10000)
     state.setTimeoutTimestamp(new Date(20000))
     assert(state.getTimeoutTimestampMs.get() === 20000)
-    testTimeoutDurationNotAllowed[UnsupportedOperationException](state)
+    testTimeoutDurationNotAllowed[SparkUnsupportedOperationException](state)
 
     state.remove()
     assert(state.getTimeoutTimestampMs.get() === 20000)
     state.setTimeoutTimestamp(5000)
     assert(state.getTimeoutTimestampMs.get() === 5000) // can be set after removing state
-    testTimeoutDurationNotAllowed[UnsupportedOperationException](state)
+    testTimeoutDurationNotAllowed[SparkUnsupportedOperationException](state)
 
     // for batch queries
     state = GroupStateImpl.createForBatch(
       EventTimeTimeout, watermarkPresent = false).asInstanceOf[GroupStateImpl[Int]]
     assert(!state.getTimeoutTimestampMs.isPresent())
-    testTimeoutDurationNotAllowed[UnsupportedOperationException](state)
+    testTimeoutDurationNotAllowed[SparkUnsupportedOperationException](state)
     state.setTimeoutTimestamp(5000)
 
     state.update(5)
     state.setTimeoutTimestamp(10000)
     state.setTimeoutTimestamp(new Date(20000))
-    testTimeoutDurationNotAllowed[UnsupportedOperationException](state)
+    testTimeoutDurationNotAllowed[SparkUnsupportedOperationException](state)
 
     state.remove()
     state.setTimeoutTimestamp(5000)
-    testTimeoutDurationNotAllowed[UnsupportedOperationException](state)
+    testTimeoutDurationNotAllowed[SparkUnsupportedOperationException](state)
   }
 
   test("GroupState - illegal params to setTimeout") {
@@ -297,20 +297,19 @@ class GroupStateSuite extends SparkFunSuite {
     assert(illegalArgument.getMessage.contains("batchProcessingTimeMs must be 0 or positive"))
 
     // hasTimedOut cannot be true if there's no timeout configured
-    var unsupportedOperation = intercept[UnsupportedOperationException] {
-      TestGroupState.create[Int](
-        Optional.of(5), NoTimeout, 100L, Optional.empty[Long], hasTimedOut = true)
-    }
-    assert(
-      unsupportedOperation
-        .getMessage.contains("hasTimedOut is true however there's no timeout configured"))
-    unsupportedOperation = intercept[UnsupportedOperationException] {
-      GroupStateImpl.createForStreaming[Int](
-        Some(5), 100L, NO_TIMESTAMP, NoTimeout, true, false)
-    }
-    assert(
-      unsupportedOperation
-        .getMessage.contains("hasTimedOut is true however there's no timeout configured"))
+    checkError(
+      exception = intercept[SparkUnsupportedOperationException] {
+        TestGroupState.create[Int](
+          Optional.of(5), NoTimeout, 100L, Optional.empty[Long], hasTimedOut = true)
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_3168",
+      parameters = Map.empty)
+    checkError(
+      exception = intercept[SparkUnsupportedOperationException] {
+        GroupStateImpl.createForStreaming[Int](Some(5), 100L, NO_TIMESTAMP, NoTimeout, true, false)
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_3168",
+      parameters = Map.empty)
   }
 
   test("GroupState - hasTimedOut") {
@@ -348,9 +347,10 @@ class GroupStateSuite extends SparkFunSuite {
     }
 
     def assertWrongTimeoutError(test: => Unit): Unit = {
-      val e = intercept[UnsupportedOperationException] { test }
-      assert(e.getMessage.contains(
-        "Cannot get event time watermark timestamp without setting watermark"))
+      checkError(
+        exception = intercept[SparkUnsupportedOperationException] { test },
+        errorClass = "_LEGACY_ERROR_TEMP_2204",
+        parameters = Map.empty)
     }
 
     for (timeoutConf <- Seq(NoTimeout, EventTimeTimeout, ProcessingTimeTimeout)) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org