Posted to commits@spark.apache.org by ma...@apache.org on 2024/02/13 10:50:36 UTC
(spark) branch master updated: [SPARK-47028][SQL][TESTS] Check `SparkUnsupportedOperationException` instead of `UnsupportedOperationException`
This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 49bcde612c59 [SPARK-47028][SQL][TESTS] Check `SparkUnsupportedOperationException` instead of `UnsupportedOperationException`
49bcde612c59 is described below
commit 49bcde612c598fcf3c76cbd91a3dbf11d1b7f1b2
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Tue Feb 13 13:50:18 2024 +0300
[SPARK-47028][SQL][TESTS] Check `SparkUnsupportedOperationException` instead of `UnsupportedOperationException`
### What changes were proposed in this pull request?
In this PR, I propose to use `checkError()` in the `sql` tests to check `SparkUnsupportedOperationException` and its fields.
### Why are the changes needed?
Checking `SparkUnsupportedOperationException` and its fields, such as the error class and the message parameters, prevents the exception from being accidentally replaced back with a plain `UnsupportedOperationException`.
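As a concrete illustration of the pattern, here is a minimal, self-contained sketch of the before/after shape of the migration. It reuses the `ExpressionEncoder.tuple` case from the `ExpressionEncoderSuite` hunk below; the suite name `ErrorCheckExampleSuite` is hypothetical, and `checkError()` is provided by `SparkFunSuite`:

    import org.apache.spark.{SparkFunSuite, SparkUnsupportedOperationException}
    import org.apache.spark.sql.Encoders
    import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

    class ErrorCheckExampleSuite extends SparkFunSuite {
      test("tuples with more than 22 elements are rejected with the expected error class") {
        val encoders = (0 to 22).map(_ => Encoders.scalaInt.asInstanceOf[ExpressionEncoder[_]])

        // Before: only the JVM exception type was asserted, so the error class
        // and message parameters could regress without any test failing.
        // intercept[UnsupportedOperationException] { ExpressionEncoder.tuple(encoders) }

        // After: checkError() also pins down the Spark error class and its parameters.
        checkError(
          exception = intercept[SparkUnsupportedOperationException] {
            ExpressionEncoder.tuple(encoders)
          },
          errorClass = "_LEGACY_ERROR_TEMP_2150",
          parameters = Map.empty)
      }
    }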
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
By running the modified test suites.
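For example, a single modified suite can typically be run via the sbt build, e.g. `build/sbt "sql/testOnly org.apache.spark.sql.DatasetSuite"` (the module prefix depends on where the suite lives: `sql` for suites under sql/core, `catalyst` for sql/catalyst).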
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #45082 from MaxGekk/intercept-UnsupportedOperationException-tests.
Authored-by: Max Gekk <ma...@gmail.com>
Signed-off-by: Max Gekk <ma...@gmail.com>
---
.../test/scala/org/apache/spark/sql/RowTest.scala | 6 +-
.../spark/sql/catalyst/ScalaReflectionSuite.scala | 23 +++++---
.../encoders/EncoderErrorMessageSuite.scala | 12 ++--
.../catalyst/encoders/ExpressionEncoderSuite.scala | 10 ++--
.../sql/catalyst/json/JacksonGeneratorSuite.scala | 6 +-
.../spark/sql/connector/catalog/CatalogSuite.scala | 4 +-
.../sql/util/CaseInsensitiveStringMapSuite.scala | 6 +-
.../scala/org/apache/spark/sql/DatasetSuite.scala | 33 ++++++-----
.../spark/sql/ScalaReflectionRelationSuite.scala | 29 +++++----
.../sql/connector/DataSourceV2FunctionSuite.scala | 4 +-
.../binaryfile/BinaryFileFormatSuite.scala | 16 ++---
.../datasources/v2/V2SessionCatalogSuite.scala | 11 ++--
.../streaming/CompactibleFileStreamLogSuite.scala | 11 ++--
.../sources/RatePerMicroBatchProviderSuite.scala | 20 ++++---
.../sources/RateStreamProviderSuite.scala | 19 +++---
.../streaming/sources/TextSocketStreamSuite.scala | 12 ++--
.../spark/sql/streaming/GroupStateSuite.scala | 68 +++++++++++-----------
17 files changed, 160 insertions(+), 130 deletions(-)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/RowTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/RowTest.scala
index 840d80ffed13..985443773943 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/RowTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/RowTest.scala
@@ -24,7 +24,7 @@ import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.must.Matchers
import org.scalatest.matchers.should.Matchers._
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericRow, GenericRowWithSchema}
import org.apache.spark.sql.types._
@@ -45,10 +45,10 @@ class RowTest extends AnyFunSpec with Matchers {
describe("Row (without schema)") {
it("throws an exception when accessing by fieldName") {
- intercept[UnsupportedOperationException] {
+ intercept[SparkUnsupportedOperationException] {
noSchemaRow.fieldIndex("col1")
}
- intercept[UnsupportedOperationException] {
+ intercept[SparkUnsupportedOperationException] {
noSchemaRow.getAs("col1")
}
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
index bbb62acd0250..daa8d12613f2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
@@ -22,7 +22,7 @@ import java.sql.{Date, Timestamp}
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.{typeTag, TypeTag}
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkFunSuite, SparkUnsupportedOperationException}
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.FooEnum.FooEnum
import org.apache.spark.sql.catalyst.analysis.UnresolvedExtractValue
@@ -490,17 +490,22 @@ class ScalaReflectionSuite extends SparkFunSuite {
}
test("SPARK-29026: schemaFor for trait without companion object throws exception ") {
- val e = intercept[UnsupportedOperationException] {
- schemaFor[TraitProductWithoutCompanion]
- }
- assert(e.getMessage.contains("Unable to find constructor"))
+ checkError(
+ exception = intercept[SparkUnsupportedOperationException] {
+ schemaFor[TraitProductWithoutCompanion]
+ },
+ errorClass = "_LEGACY_ERROR_TEMP_2144",
+ parameters = Map("tpe" -> "org.apache.spark.sql.catalyst.TraitProductWithoutCompanion"))
}
test("SPARK-29026: schemaFor for trait with no-constructor companion throws exception ") {
- val e = intercept[UnsupportedOperationException] {
- schemaFor[TraitProductWithNoConstructorCompanion]
- }
- assert(e.getMessage.contains("Unable to find constructor"))
+ checkError(
+ exception = intercept[SparkUnsupportedOperationException] {
+ schemaFor[TraitProductWithNoConstructorCompanion]
+ },
+ errorClass = "_LEGACY_ERROR_TEMP_2144",
+ parameters = Map("tpe" ->
+ "org.apache.spark.sql.catalyst.TraitProductWithNoConstructorCompanion"))
}
test("SPARK-27625: annotated data types") {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderErrorMessageSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderErrorMessageSuite.scala
index 2d61f9fbc071..e852b474aa18 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderErrorMessageSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/EncoderErrorMessageSuite.scala
@@ -40,15 +40,15 @@ class EncoderErrorMessageSuite extends SparkFunSuite {
// That is done in Java because Scala cannot create truly private classes.
test("primitive types in encoders using Kryo serialization") {
- intercept[UnsupportedOperationException] { Encoders.kryo[Int] }
- intercept[UnsupportedOperationException] { Encoders.kryo[Long] }
- intercept[UnsupportedOperationException] { Encoders.kryo[Char] }
+ intercept[SparkUnsupportedOperationException] { Encoders.kryo[Int] }
+ intercept[SparkUnsupportedOperationException] { Encoders.kryo[Long] }
+ intercept[SparkUnsupportedOperationException] { Encoders.kryo[Char] }
}
test("primitive types in encoders using Java serialization") {
- intercept[UnsupportedOperationException] { Encoders.javaSerialization[Int] }
- intercept[UnsupportedOperationException] { Encoders.javaSerialization[Long] }
- intercept[UnsupportedOperationException] { Encoders.javaSerialization[Char] }
+ intercept[SparkUnsupportedOperationException] { Encoders.javaSerialization[Int] }
+ intercept[SparkUnsupportedOperationException] { Encoders.javaSerialization[Long] }
+ intercept[SparkUnsupportedOperationException] { Encoders.javaSerialization[Char] }
}
test("nice error message for missing encoder") {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
index 35d8327b9308..7c3857ecfc02 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
@@ -584,10 +584,12 @@ class ExpressionEncoderSuite extends CodegenInterpretedPlanTest with AnalysisTes
test("throw exception for tuples with more than 22 elements") {
val encoders = (0 to 22).map(_ => Encoders.scalaInt.asInstanceOf[ExpressionEncoder[_]])
- val e = intercept[UnsupportedOperationException] {
- ExpressionEncoder.tuple(encoders)
- }
- assert(e.getMessage.contains("tuple with more than 22 elements are not supported"))
+ checkError(
+ exception = intercept[SparkUnsupportedOperationException] {
+ ExpressionEncoder.tuple(encoders)
+ },
+ errorClass = "_LEGACY_ERROR_TEMP_2150",
+ parameters = Map.empty)
}
test("throw exception for unexpected serializer") {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala
index 4b8693cf7fd5..dc9a5816a335 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.json
import java.io.CharArrayWriter
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkFunSuite, SparkUnsupportedOperationException}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
import org.apache.spark.sql.types._
@@ -122,7 +122,7 @@ class JacksonGeneratorSuite extends SparkFunSuite {
val input = ArrayBasedMapData(Map("a" -> 1))
val writer = new CharArrayWriter()
val gen = new JacksonGenerator(dataType, writer, option)
- intercept[UnsupportedOperationException] {
+ intercept[SparkUnsupportedOperationException] {
gen.write(input)
}
}
@@ -132,7 +132,7 @@ class JacksonGeneratorSuite extends SparkFunSuite {
val input = InternalRow(1)
val writer = new CharArrayWriter()
val gen = new JacksonGenerator(dataType, writer, option)
- intercept[UnsupportedOperationException] {
+ intercept[SparkUnsupportedOperationException] {
gen.write(input)
}
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala
index e79fff7479b9..42756d91b39e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala
@@ -22,7 +22,7 @@ import java.util.Collections
import scala.jdk.CollectionConverters._
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkFunSuite, SparkUnsupportedOperationException}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
@@ -659,7 +659,7 @@ class CatalogSuite extends SparkFunSuite {
test("purgeTable") {
val catalog = newCatalog()
- intercept[UnsupportedOperationException](catalog.purgeTable(testIdent))
+ intercept[SparkUnsupportedOperationException](catalog.purgeTable(testIdent))
}
test("renameTable") {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala
index dfed284bc2b9..98c2a3d1e272 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala
@@ -21,20 +21,20 @@ import java.util
import scala.jdk.CollectionConverters._
-import org.apache.spark.{SparkFunSuite, SparkIllegalArgumentException}
+import org.apache.spark.{SparkFunSuite, SparkIllegalArgumentException, SparkUnsupportedOperationException}
class CaseInsensitiveStringMapSuite extends SparkFunSuite {
test("put and get") {
val options = CaseInsensitiveStringMap.empty()
- intercept[UnsupportedOperationException] {
+ intercept[SparkUnsupportedOperationException] {
options.put("kEy", "valUE")
}
}
test("clear") {
val options = new CaseInsensitiveStringMap(Map("kEy" -> "valUE").asJava)
- intercept[UnsupportedOperationException] {
+ intercept[SparkUnsupportedOperationException] {
options.clear()
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 5779fe1c62ff..fe295b0cfa26 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -1883,21 +1883,24 @@ class DatasetSuite extends QueryTest
}
test("SPARK-19896: cannot have circular references in case class") {
- val errMsg1 = intercept[UnsupportedOperationException] {
- Seq(CircularReferenceClassA(null)).toDS()
- }
- assert(errMsg1.getMessage.startsWith("cannot have circular references in class, but got the " +
- "circular reference of class"))
- val errMsg2 = intercept[UnsupportedOperationException] {
- Seq(CircularReferenceClassC(null)).toDS()
- }
- assert(errMsg2.getMessage.startsWith("cannot have circular references in class, but got the " +
- "circular reference of class"))
- val errMsg3 = intercept[UnsupportedOperationException] {
- Seq(CircularReferenceClassD(null)).toDS()
- }
- assert(errMsg3.getMessage.startsWith("cannot have circular references in class, but got the " +
- "circular reference of class"))
+ checkError(
+ exception = intercept[SparkUnsupportedOperationException] {
+ Seq(CircularReferenceClassA(null)).toDS()
+ },
+ errorClass = "_LEGACY_ERROR_TEMP_2139",
+ parameters = Map("t" -> "org.apache.spark.sql.CircularReferenceClassA"))
+ checkError(
+ exception = intercept[SparkUnsupportedOperationException] {
+ Seq(CircularReferenceClassC(null)).toDS()
+ },
+ errorClass = "_LEGACY_ERROR_TEMP_2139",
+ parameters = Map("t" -> "org.apache.spark.sql.CircularReferenceClassC"))
+ checkError(
+ exception = intercept[SparkUnsupportedOperationException] {
+ Seq(CircularReferenceClassD(null)).toDS()
+ },
+ errorClass = "_LEGACY_ERROR_TEMP_2139",
+ parameters = Map("t" -> "org.apache.spark.sql.CircularReferenceClassD"))
}
test("SPARK-20125: option of map") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala
index 66c6fbeabbf5..01b9fdec9be3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ScalaReflectionRelationSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql
import java.sql.{Date, Timestamp}
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkFunSuite, SparkUnsupportedOperationException}
import org.apache.spark.sql.test.SharedSparkSession
case class ReflectData(
@@ -159,18 +159,25 @@ class ScalaReflectionRelationSuite extends SparkFunSuite with SharedSparkSession
}
test("better error message when use java reserved keyword as field name") {
- val e = intercept[UnsupportedOperationException] {
- Seq(InvalidInJava(1)).toDS()
- }
- assert(e.getMessage.contains(
- "`abstract` is not a valid identifier of Java and cannot be used as field name"))
+ checkError(
+ exception = intercept[SparkUnsupportedOperationException] {
+ Seq(InvalidInJava(1)).toDS()
+ },
+ errorClass = "_LEGACY_ERROR_TEMP_2140",
+ parameters = Map(
+ "fieldName" -> "abstract",
+ "walkedTypePath" -> "- root class: \"org.apache.spark.sql.InvalidInJava\""))
}
test("better error message when use invalid java identifier as field name") {
- val e1 = intercept[UnsupportedOperationException] {
- Seq(InvalidInJava2(1)).toDS()
- }
- assert(e1.getMessage.contains(
- "`0` is not a valid identifier of Java and cannot be used as field name"))
+ checkError(
+ exception = intercept[SparkUnsupportedOperationException] {
+ Seq(InvalidInJava2(1)).toDS()
+ },
+ errorClass = "_LEGACY_ERROR_TEMP_2140",
+ parameters = Map(
+ "fieldName" -> "0",
+ "walkedTypePath" ->
+ "- root class: \"org.apache.spark.sql.ScalaReflectionRelationSuite.InvalidInJava2\""))
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala
index 141581e75884..6481a3f3a891 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala
@@ -322,7 +322,7 @@ class DataSourceV2FunctionSuite extends DatasourceV2SQLBase {
test("scalar function: bad magic method") {
catalog("testcat").asInstanceOf[SupportsNamespaces].createNamespace(Array("ns"), emptyProps)
addFunction(Identifier.of(Array("ns"), "strlen"), StrLen(StrLenBadMagic))
- intercept[UnsupportedOperationException](sql("SELECT testcat.ns.strlen('abc')").collect())
+ intercept[SparkUnsupportedOperationException](sql("SELECT testcat.ns.strlen('abc')").collect())
}
test("scalar function: bad magic method with default impl") {
@@ -334,7 +334,7 @@ class DataSourceV2FunctionSuite extends DatasourceV2SQLBase {
test("scalar function: no implementation found") {
catalog("testcat").asInstanceOf[SupportsNamespaces].createNamespace(Array("ns"), emptyProps)
addFunction(Identifier.of(Array("ns"), "strlen"), StrLen(StrLenNoImpl))
- intercept[UnsupportedOperationException](sql("SELECT testcat.ns.strlen('abc')").collect())
+ intercept[SparkUnsupportedOperationException](sql("SELECT testcat.ns.strlen('abc')").collect())
}
test("scalar function: invalid parameter type or length") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
index 12f2205cb1db..c5cf94f8747c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
@@ -27,7 +27,7 @@ import com.google.common.io.{ByteStreams, Closeables}
import org.apache.hadoop.fs.{FileStatus, FileSystem, GlobFilter, Path}
import org.mockito.Mockito.{mock, when}
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
import org.apache.spark.sql.{DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.execution.datasources.PartitionedFile
@@ -162,12 +162,14 @@ class BinaryFileFormatSuite extends QueryTest with SharedSparkSession {
test("binary file data source do not support write operation") {
val df = spark.read.format(BINARY_FILE).load(testDir)
withTempDir { tmpDir =>
- val thrown = intercept[UnsupportedOperationException] {
- df.write
- .format(BINARY_FILE)
- .save(s"$tmpDir/test_save")
- }
- assert(thrown.getMessage.contains("Write is not supported for binary file data source"))
+ checkError(
+ exception = intercept[SparkUnsupportedOperationException] {
+ df.write
+ .format(BINARY_FILE)
+ .save(s"$tmpDir/test_save")
+ },
+ errorClass = "_LEGACY_ERROR_TEMP_2075",
+ parameters = Map.empty)
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
index e5473222d429..2195768e3b08 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
@@ -26,6 +26,7 @@ import scala.jdk.CollectionConverters._
import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfter
+import org.apache.spark.SparkUnsupportedOperationException
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
@@ -1164,10 +1165,12 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
catalog.createNamespace(testNs, emptyProps)
CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES.foreach { p =>
- val exc = intercept[UnsupportedOperationException] {
- catalog.alterNamespace(testNs, NamespaceChange.removeProperty(p))
- }
- assert(exc.getMessage.contains(s"Cannot remove reserved property: $p"))
+ checkError(
+ exception = intercept[SparkUnsupportedOperationException] {
+ catalog.alterNamespace(testNs, NamespaceChange.removeProperty(p))
+ },
+ errorClass = "_LEGACY_ERROR_TEMP_2069",
+ parameters = Map("property" -> p))
}
catalog.dropNamespace(testNs, cascade = false)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
index c7655af98160..42eb9fa17a21 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.streaming
import java.io._
import java.nio.charset.StandardCharsets._
+import org.apache.spark.SparkUnsupportedOperationException
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.test.SharedSparkSession
@@ -242,10 +243,12 @@ class CompactibleFileStreamLogSuite extends SharedSparkSession {
compactibleLog.add(1, Array("some_path_1"))
compactibleLog.add(2, Array("some_path_2"))
- val exc = intercept[UnsupportedOperationException] {
- compactibleLog.purge(2)
- }
- assert(exc.getMessage.contains("Cannot purge as it might break internal state"))
+ checkError(
+ exception = intercept[SparkUnsupportedOperationException] {
+ compactibleLog.purge(2)
+ },
+ errorClass = "_LEGACY_ERROR_TEMP_2260",
+ parameters = Map.empty)
// Below line would fail with IllegalStateException if we don't prevent purge:
// - purge(2) would delete batch 0 and 1 which batch 1 is compaction batch
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProviderSuite.scala
index 31fbf9323140..128b59b26b82 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProviderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProviderSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.streaming.sources
import org.scalatest.concurrent.PatienceConfiguration.Timeout
+import org.apache.spark.SparkUnsupportedOperationException
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.functions.spark_partition_id
import org.apache.spark.sql.streaming.{StreamTest, Trigger}
@@ -193,14 +194,15 @@ class RatePerMicroBatchProviderSuite extends StreamTest {
}
test("user-specified schema given") {
- val exception = intercept[UnsupportedOperationException] {
- spark.readStream
- .format("rate-micro-batch")
- .option("rowsPerBatch", "10")
- .schema(spark.range(1).schema)
- .load()
- }
- assert(exception.getMessage.contains(
- "RatePerMicroBatchProvider source does not support user-specified schema"))
+ checkError(
+ exception = intercept[SparkUnsupportedOperationException] {
+ spark.readStream
+ .format("rate-micro-batch")
+ .option("rowsPerBatch", "10")
+ .schema(spark.range(1).schema)
+ .load()
+ },
+ errorClass = "_LEGACY_ERROR_TEMP_2242",
+ parameters = Map("provider" -> "RatePerMicroBatchProvider"))
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
index 69dc8c291c0b..0732e126a013 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._
-import org.apache.spark.{SparkException, SparkRuntimeException}
+import org.apache.spark.{SparkException, SparkRuntimeException, SparkUnsupportedOperationException}
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream}
@@ -345,14 +345,15 @@ class RateStreamProviderSuite extends StreamTest {
}
test("user-specified schema given") {
- val exception = intercept[UnsupportedOperationException] {
- spark.readStream
- .format("rate")
- .schema(spark.range(1).schema)
- .load()
- }
- assert(exception.getMessage.contains(
- "RateStreamProvider source does not support user-specified schema"))
+ checkError(
+ exception = intercept[SparkUnsupportedOperationException] {
+ spark.readStream
+ .format("rate")
+ .schema(spark.range(1).schema)
+ .load()
+ },
+ errorClass = "_LEGACY_ERROR_TEMP_2242",
+ parameters = Map("provider" -> "RateStreamProvider"))
}
test("continuous data") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
index bfeca5851102..87e34601dc09 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit._
import scala.jdk.CollectionConverters._
+import org.apache.spark.SparkUnsupportedOperationException
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream}
@@ -193,11 +194,12 @@ class TextSocketStreamSuite extends StreamTest with SharedSparkSession {
StructField("name", StringType) ::
StructField("area", StringType) :: Nil)
val params = Map("host" -> "localhost", "port" -> "1234")
- val exception = intercept[UnsupportedOperationException] {
- spark.readStream.schema(userSpecifiedSchema).format("socket").options(params).load()
- }
- assert(exception.getMessage.contains(
- "TextSocketSourceProvider source does not support user-specified schema"))
+ checkError(
+ exception = intercept[SparkUnsupportedOperationException] {
+ spark.readStream.schema(userSpecifiedSchema).format("socket").options(params).load()
+ },
+ errorClass = "_LEGACY_ERROR_TEMP_2242",
+ parameters = Map("provider" -> "TextSocketSourceProvider"))
}
test("input row metrics") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/GroupStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/GroupStateSuite.scala
index d795e7a3d94c..050c1a2d7d97 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/GroupStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/GroupStateSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.streaming
import java.sql.Date
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkFunSuite, SparkUnsupportedOperationException}
import org.apache.spark.api.java.Optional
import org.apache.spark.sql.execution.streaming.GroupStateImpl
import org.apache.spark.sql.execution.streaming.GroupStateImpl.NO_TIMESTAMP
@@ -115,12 +115,12 @@ class GroupStateSuite extends SparkFunSuite {
)
for (state <- states) {
// for streaming queries
- testTimeoutDurationNotAllowed[UnsupportedOperationException](state)
- testTimeoutTimestampNotAllowed[UnsupportedOperationException](state)
+ testTimeoutDurationNotAllowed[SparkUnsupportedOperationException](state)
+ testTimeoutTimestampNotAllowed[SparkUnsupportedOperationException](state)
// for batch queries
- testTimeoutDurationNotAllowed[UnsupportedOperationException](state)
- testTimeoutTimestampNotAllowed[UnsupportedOperationException](state)
+ testTimeoutDurationNotAllowed[SparkUnsupportedOperationException](state)
+ testTimeoutTimestampNotAllowed[SparkUnsupportedOperationException](state)
}
}
}
@@ -135,7 +135,7 @@ class GroupStateSuite extends SparkFunSuite {
assert(state.getTimeoutTimestampMs.get() === 2000)
state.setTimeoutDuration(500)
assert(state.getTimeoutTimestampMs.get() === 1500) // can be set without initializing state
- testTimeoutTimestampNotAllowed[UnsupportedOperationException](state)
+ testTimeoutTimestampNotAllowed[SparkUnsupportedOperationException](state)
state.update(5)
assert(state.getTimeoutTimestampMs.isPresent())
@@ -144,37 +144,37 @@ class GroupStateSuite extends SparkFunSuite {
assert(state.getTimeoutTimestampMs.get() === 2000)
state.setTimeoutDuration("2 second")
assert(state.getTimeoutTimestampMs.get() === 3000)
- testTimeoutTimestampNotAllowed[UnsupportedOperationException](state)
+ testTimeoutTimestampNotAllowed[SparkUnsupportedOperationException](state)
state.remove()
assert(state.getTimeoutTimestampMs.isPresent())
assert(state.getTimeoutTimestampMs.get() === 3000) // does not change
state.setTimeoutDuration(500) // can still be set
assert(state.getTimeoutTimestampMs.get() === 1500)
- testTimeoutTimestampNotAllowed[UnsupportedOperationException](state)
+ testTimeoutTimestampNotAllowed[SparkUnsupportedOperationException](state)
// for batch queries
state = GroupStateImpl.createForBatch(
ProcessingTimeTimeout, watermarkPresent = false).asInstanceOf[GroupStateImpl[Int]]
assert(!state.getTimeoutTimestampMs.isPresent())
state.setTimeoutDuration(500)
- testTimeoutTimestampNotAllowed[UnsupportedOperationException](state)
+ testTimeoutTimestampNotAllowed[SparkUnsupportedOperationException](state)
state.update(5)
state.setTimeoutDuration(1000)
state.setTimeoutDuration("2 second")
- testTimeoutTimestampNotAllowed[UnsupportedOperationException](state)
+ testTimeoutTimestampNotAllowed[SparkUnsupportedOperationException](state)
state.remove()
state.setTimeoutDuration(500)
- testTimeoutTimestampNotAllowed[UnsupportedOperationException](state)
+ testTimeoutTimestampNotAllowed[SparkUnsupportedOperationException](state)
}
test("GroupState - setTimeout - with EventTimeTimeout") {
var state = TestGroupState.create[Int](
Optional.empty[Int], EventTimeTimeout, 1000, Optional.of(1000), hasTimedOut = false)
assert(!state.getTimeoutTimestampMs.isPresent())
- testTimeoutDurationNotAllowed[UnsupportedOperationException](state)
+ testTimeoutDurationNotAllowed[SparkUnsupportedOperationException](state)
state.setTimeoutTimestamp(5000)
assert(state.getTimeoutTimestampMs.get() === 5000) // can be set without initializing state
@@ -184,29 +184,29 @@ class GroupStateSuite extends SparkFunSuite {
assert(state.getTimeoutTimestampMs.get() === 10000)
state.setTimeoutTimestamp(new Date(20000))
assert(state.getTimeoutTimestampMs.get() === 20000)
- testTimeoutDurationNotAllowed[UnsupportedOperationException](state)
+ testTimeoutDurationNotAllowed[SparkUnsupportedOperationException](state)
state.remove()
assert(state.getTimeoutTimestampMs.get() === 20000)
state.setTimeoutTimestamp(5000)
assert(state.getTimeoutTimestampMs.get() === 5000) // can be set after removing state
- testTimeoutDurationNotAllowed[UnsupportedOperationException](state)
+ testTimeoutDurationNotAllowed[SparkUnsupportedOperationException](state)
// for batch queries
state = GroupStateImpl.createForBatch(
EventTimeTimeout, watermarkPresent = false).asInstanceOf[GroupStateImpl[Int]]
assert(!state.getTimeoutTimestampMs.isPresent())
- testTimeoutDurationNotAllowed[UnsupportedOperationException](state)
+ testTimeoutDurationNotAllowed[SparkUnsupportedOperationException](state)
state.setTimeoutTimestamp(5000)
state.update(5)
state.setTimeoutTimestamp(10000)
state.setTimeoutTimestamp(new Date(20000))
- testTimeoutDurationNotAllowed[UnsupportedOperationException](state)
+ testTimeoutDurationNotAllowed[SparkUnsupportedOperationException](state)
state.remove()
state.setTimeoutTimestamp(5000)
- testTimeoutDurationNotAllowed[UnsupportedOperationException](state)
+ testTimeoutDurationNotAllowed[SparkUnsupportedOperationException](state)
}
test("GroupState - illegal params to setTimeout") {
@@ -297,20 +297,19 @@ class GroupStateSuite extends SparkFunSuite {
assert(illegalArgument.getMessage.contains("batchProcessingTimeMs must be 0 or positive"))
// hasTimedOut cannot be true if there's no timeout configured
- var unsupportedOperation = intercept[UnsupportedOperationException] {
- TestGroupState.create[Int](
- Optional.of(5), NoTimeout, 100L, Optional.empty[Long], hasTimedOut = true)
- }
- assert(
- unsupportedOperation
- .getMessage.contains("hasTimedOut is true however there's no timeout configured"))
- unsupportedOperation = intercept[UnsupportedOperationException] {
- GroupStateImpl.createForStreaming[Int](
- Some(5), 100L, NO_TIMESTAMP, NoTimeout, true, false)
- }
- assert(
- unsupportedOperation
- .getMessage.contains("hasTimedOut is true however there's no timeout configured"))
+ checkError(
+ exception = intercept[SparkUnsupportedOperationException] {
+ TestGroupState.create[Int](
+ Optional.of(5), NoTimeout, 100L, Optional.empty[Long], hasTimedOut = true)
+ },
+ errorClass = "_LEGACY_ERROR_TEMP_3168",
+ parameters = Map.empty)
+ checkError(
+ exception = intercept[SparkUnsupportedOperationException] {
+ GroupStateImpl.createForStreaming[Int](Some(5), 100L, NO_TIMESTAMP, NoTimeout, true, false)
+ },
+ errorClass = "_LEGACY_ERROR_TEMP_3168",
+ parameters = Map.empty)
}
test("GroupState - hasTimedOut") {
@@ -348,9 +347,10 @@ class GroupStateSuite extends SparkFunSuite {
}
def assertWrongTimeoutError(test: => Unit): Unit = {
- val e = intercept[UnsupportedOperationException] { test }
- assert(e.getMessage.contains(
- "Cannot get event time watermark timestamp without setting watermark"))
+ checkError(
+ exception = intercept[SparkUnsupportedOperationException] { test },
+ errorClass = "_LEGACY_ERROR_TEMP_2204",
+ parameters = Map.empty)
}
for (timeoutConf <- Seq(NoTimeout, EventTimeTimeout, ProcessingTimeTimeout)) {