You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "MaxGekk (via GitHub)" <gi...@apache.org> on 2023/07/12 09:38:23 UTC
[GitHub] [spark] MaxGekk commented on a diff in pull request #41705: [SPARK-44252][SS] Define a new error class and apply for the case where loading state from DFS fails
MaxGekk commented on code in PR #41705:
URL: https://github.com/apache/spark/pull/41705#discussion_r1260886244
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala:
##########
@@ -2461,26 +2461,34 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
s"but it's ${endSeconds.toString} now.")
}
- def failedToReadDeltaFileError(fileToRead: Path, clazz: String, keySize: Int): Throwable = {
+ def failedToReadDeltaFileError(fileToRead: Path, clazz: String, message: String): Throwable = {
new SparkException(
- errorClass = "_LEGACY_ERROR_TEMP_2258",
+ errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_DELTA_FILE",
messageParameters = Map(
"fileToRead" -> fileToRead.toString(),
"clazz" -> clazz,
- "keySize" -> keySize.toString()),
+ "message" -> message),
cause = null)
}
def failedToReadSnapshotFileError(fileToRead: Path, clazz: String, message: String): Throwable = {
new SparkException(
- errorClass = "_LEGACY_ERROR_TEMP_2259",
+ errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_SNAPSHOT_FILE",
messageParameters = Map(
"fileToRead" -> fileToRead.toString(),
"clazz" -> clazz,
"message" -> message),
cause = null)
}
+ def failedToReadStreamingStateFileError(fileToRead: Path, f: Throwable): Throwable = {
+ new SparkException(
+ errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_STREAMING_STATE_FILE",
+ messageParameters = Map(
+ "fileToRead" -> fileToRead.toString()),
Review Comment:
nit:
```suggestion
messageParameters = Map("fileToRead" -> fileToRead.toString()),
```
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala:
##########
@@ -2461,26 +2461,34 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
s"but it's ${endSeconds.toString} now.")
}
- def failedToReadDeltaFileError(fileToRead: Path, clazz: String, keySize: Int): Throwable = {
+ def failedToReadDeltaFileError(fileToRead: Path, clazz: String, message: String): Throwable = {
new SparkException(
- errorClass = "_LEGACY_ERROR_TEMP_2258",
+ errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_DELTA_FILE",
messageParameters = Map(
"fileToRead" -> fileToRead.toString(),
"clazz" -> clazz,
- "keySize" -> keySize.toString()),
+ "message" -> message),
Review Comment:
Please put the error message text in error-classes.json. As far as I can see, `failedToReadDeltaFileError` is invoked twice with `message`:
1.
```scala
throw QueryExecutionErrors.failedToReadDeltaFileError(
fileToRead, toString(), s"$fileToRead does not exist"
)
```
2.
```scala
throw QueryExecutionErrors.failedToReadDeltaFileError(
fileToRead, toString(), s"key size cannot be ${keySize}"
)
```
##########
common/utils/src/main/resources/error/error-classes.json:
##########
@@ -143,6 +143,50 @@
"Could not load Protobuf class with name <protobufClassName>. <explanation>."
]
},
+ "CANNOT_LOAD_STATE_STORE" : {
+ "message" : [
+ "An error occurred during loading state."
+ ],
+ "subClass" : {
+ "CANNOT_READ_CHECKPOINT" : {
+ "message" : [
+ "Cannot read RocksDB checkpoint metadata of version <versionLine>"
Review Comment:
Can you provide some hints to users, such as what we expect? From the source code:
```scala
if (versionLine != s"v$VERSION") {
throw QueryExecutionErrors.cannotReadCheckpoint(versionLine)
}
```
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -123,12 +124,33 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
}
test("RocksDB: load version that doesn't exist") {
+ val provider = new RocksDBStateStoreProvider()
+ var ex = intercept[SparkException] {
+ provider.getStore(-1)
+ }
+ checkError(
+ ex,
+ errorClass = "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED",
+ parameters = Map.empty
+ )
+ ex = intercept[SparkException] {
+ provider.getReadStore(-1)
+ }
+ checkError(
+ ex,
+ errorClass = "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED",
+ parameters = Map.empty
+ )
+
val remoteDir = Utils.createTempDir().toString
new File(remoteDir).delete() // to make sure that the directory gets created
withDB(remoteDir) { db =>
- intercept[IllegalStateException] {
+ ex = intercept[SparkException] {
db.load(1)
}
+ assert(ex.isInstanceOf[SparkException])
+ assert(ex.getMessage.contains("Error reading streaming state file") &&
+ ex.getCause.getMessage.contains("does not exist"))
Review Comment:
Use `checkError`
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala:
##########
@@ -876,12 +889,17 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
assert(getLatestData(provider) === Set(("b", 0) -> 2))
// Trying to get newer versions should fail
- intercept[Exception] {
+ var e = intercept[SparkException] {
provider.getStore(2)
}
- intercept[Exception] {
+ assert(e.getCause.isInstanceOf[SparkException])
+ assert(e.getCause.getMessage.contains("does not exist"))
Review Comment:
Use `checkError`
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala:
##########
@@ -1178,10 +1201,13 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
}
assert(!StateStore.isLoaded(storeId)) // version -1 should not attempt to load the store
- intercept[IllegalStateException] {
+ val e = intercept[SparkException] {
StateStore.get(
storeId, keySchema, valueSchema, 0, 1, storeConf, hadoopConf)
}
+ assert(e.getCause.isInstanceOf[SparkException])
+ assert(e.getCause.getMessage.contains("Error reading delta file") &&
+ e.getCause.getMessage.contains("does not exist"))
Review Comment:
Use `checkError`
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala:
##########
@@ -2855,6 +2863,50 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
"enumString" -> enumString))
}
+ def unreleasedThreadError(loggingId: String, newAcquiredThreadInfo: String,
+ AcquiredThreadInfo: String, timeWaitedMs: Long,
+ stackTraceOutput: String): Throwable = {
Review Comment:
Fix the indentation; see https://github.com/databricks/scala-style-guide#indent
```suggestion
def unreleasedThreadError(
loggingId: String,
newAcquiredThreadInfo: String,
AcquiredThreadInfo: String,
timeWaitedMs: Long,
stackTraceOutput: String): Throwable = {
```
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -720,9 +742,12 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
// Another thread should not be able to load while current thread is using it
db.load(2)
- intercept[IllegalStateException] {
+ ex = intercept[SparkException] {
ThreadUtils.runInNewThread("concurrent-test-thread-2") { db.load(2) }
}
+ // Assert that the error message contains the stack trace
+ assert(ex.getMessage.contains("Thread holding the lock has trace:"))
+ assert(ex.getMessage.contains("runInNewThread"))
Review Comment:
Use `checkError`
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala:
##########
@@ -2855,6 +2863,50 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
"enumString" -> enumString))
}
+ def unreleasedThreadError(loggingId: String, newAcquiredThreadInfo: String,
+ AcquiredThreadInfo: String, timeWaitedMs: Long,
+ stackTraceOutput: String): Throwable = {
+ new SparkException (
+ errorClass = "CANNOT_LOAD_STATE_STORE.UNRELEASED_THREAD_ERROR",
+ messageParameters = Map(
+ "loggingId" -> loggingId,
+ "newAcquiredThreadInfo" -> newAcquiredThreadInfo,
+ "acquiredThreadInfo" -> AcquiredThreadInfo,
+ "timeWaitedMs" -> timeWaitedMs.toString,
+ "stackTraceOutput" -> stackTraceOutput
+ ),
+ cause = null)
+ }
+
+ def cannotReadCheckpoint(versionLine: String): Throwable = {
+ new SparkException (
+ errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_CHECKPOINT",
+ messageParameters = Map(
+ "versionLine" -> versionLine
+ ),
Review Comment:
```suggestion
messageParameters = Map("versionLine" -> versionLine),
```
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala:
##########
@@ -2855,6 +2863,50 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
"enumString" -> enumString))
}
+ def unreleasedThreadError(loggingId: String, newAcquiredThreadInfo: String,
+ AcquiredThreadInfo: String, timeWaitedMs: Long,
+ stackTraceOutput: String): Throwable = {
+ new SparkException (
+ errorClass = "CANNOT_LOAD_STATE_STORE.UNRELEASED_THREAD_ERROR",
+ messageParameters = Map(
+ "loggingId" -> loggingId,
+ "newAcquiredThreadInfo" -> newAcquiredThreadInfo,
+ "acquiredThreadInfo" -> AcquiredThreadInfo,
+ "timeWaitedMs" -> timeWaitedMs.toString,
+ "stackTraceOutput" -> stackTraceOutput
+ ),
+ cause = null)
+ }
+
+ def cannotReadCheckpoint(versionLine: String): Throwable = {
+ new SparkException (
+ errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_CHECKPOINT",
+ messageParameters = Map(
+ "versionLine" -> versionLine
+ ),
+ cause = null)
+ }
+
+ def unexpectedFileSize(dfsFile: Path, localFile: File, expectedSize: Long,
+ localFileSize: Long): Throwable = {
Review Comment:
Please fix the indentation here.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala:
##########
@@ -2855,6 +2863,50 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
"enumString" -> enumString))
}
+ def unreleasedThreadError(loggingId: String, newAcquiredThreadInfo: String,
+ AcquiredThreadInfo: String, timeWaitedMs: Long,
+ stackTraceOutput: String): Throwable = {
+ new SparkException (
+ errorClass = "CANNOT_LOAD_STATE_STORE.UNRELEASED_THREAD_ERROR",
+ messageParameters = Map(
+ "loggingId" -> loggingId,
+ "newAcquiredThreadInfo" -> newAcquiredThreadInfo,
+ "acquiredThreadInfo" -> AcquiredThreadInfo,
+ "timeWaitedMs" -> timeWaitedMs.toString,
+ "stackTraceOutput" -> stackTraceOutput
Review Comment:
Just wondering: is it a user-facing error? If so, are you going to show the stack trace to SQL users?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##########
@@ -218,12 +218,17 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
}
private def getLoadedMapForStore(version: Long): HDFSBackedStateStoreMap = synchronized {
- require(version >= 0, "Version cannot be less than 0")
- val newMap = HDFSBackedStateStoreMap.create(keySchema, numColsPrefixKey)
- if (version > 0) {
- newMap.putAll(loadMap(version))
+ try {
+ require(version >= 0, "Version cannot be less than 0")
Review Comment:
Can users observe the error, or is it an internal error?
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala:
##########
@@ -2461,26 +2461,34 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
s"but it's ${endSeconds.toString} now.")
}
- def failedToReadDeltaFileError(fileToRead: Path, clazz: String, keySize: Int): Throwable = {
+ def failedToReadDeltaFileError(fileToRead: Path, clazz: String, message: String): Throwable = {
new SparkException(
- errorClass = "_LEGACY_ERROR_TEMP_2258",
+ errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_DELTA_FILE",
messageParameters = Map(
"fileToRead" -> fileToRead.toString(),
"clazz" -> clazz,
- "keySize" -> keySize.toString()),
+ "message" -> message),
cause = null)
}
def failedToReadSnapshotFileError(fileToRead: Path, clazz: String, message: String): Throwable = {
new SparkException(
- errorClass = "_LEGACY_ERROR_TEMP_2259",
+ errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_SNAPSHOT_FILE",
messageParameters = Map(
"fileToRead" -> fileToRead.toString(),
"clazz" -> clazz,
"message" -> message),
Review Comment:
Please avoid arbitrary text in message parameters. As far as I can see, `message` can be:
1.
```scala
throw QueryExecutionErrors.failedToReadSnapshotFileError(
fileToRead, toString(), s"key size cannot be $keySize")
```
2.
```scala
throw QueryExecutionErrors.failedToReadSnapshotFileError(
fileToRead, toString(), s"value size cannot be $valueSize")
```
Please put the error message text in error-classes.json.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -190,15 +191,25 @@ private[sql] class RocksDBStateStoreProvider
override def stateStoreId: StateStoreId = stateStoreId_
override def getStore(version: Long): StateStore = {
- require(version >= 0, "Version cannot be less than 0")
- rocksDB.load(version)
- new RocksDBStateStore(version)
+ try {
+ require(version >= 0, "Version cannot be less than 0")
Review Comment:
Can users observe the version-mismatch error? If so, please add an error class.
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala:
##########
@@ -876,12 +889,17 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
assert(getLatestData(provider) === Set(("b", 0) -> 2))
// Trying to get newer versions should fail
- intercept[Exception] {
+ var e = intercept[SparkException] {
provider.getStore(2)
}
- intercept[Exception] {
+ assert(e.getCause.isInstanceOf[SparkException])
+ assert(e.getCause.getMessage.contains("does not exist"))
+
+ e = intercept[SparkException] {
getData(provider, 2)
}
+ assert(e.getCause.isInstanceOf[SparkException])
+ assert(e.getCause.getMessage.contains("does not exist"))
Review Comment:
Use `checkError`
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala:
##########
@@ -212,22 +212,35 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
// Corrupt snapshot file and verify that it throws error
assert(getData(provider, snapshotVersion) === Set(("a", 0) -> snapshotVersion))
corruptFile(provider, snapshotVersion, isSnapshot = true)
- intercept[Exception] {
+ var e = intercept[SparkException] {
getData(provider, snapshotVersion)
}
+ checkError(
+ e,
+ errorClass = "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED",
+ parameters = Map.empty
+ )
// Corrupt delta file and verify that it throws error
assert(getData(provider, snapshotVersion - 1) === Set(("a", 0) -> (snapshotVersion - 1)))
corruptFile(provider, snapshotVersion - 1, isSnapshot = false)
- intercept[Exception] {
+ e = intercept[SparkException] {
getData(provider, snapshotVersion - 1)
}
+ checkError(
+ e,
+ errorClass = "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED",
+ parameters = Map.empty
+ )
// Delete delta file and verify that it throws error
deleteFilesEarlierThanVersion(provider, snapshotVersion)
- intercept[Exception] {
+ e = intercept[SparkException] {
getData(provider, snapshotVersion - 1)
}
+ assert(e.getCause.isInstanceOf[SparkException])
+ assert(e.getCause.getMessage.contains("Error reading delta file") &&
+ e.getCause.getMessage.contains("does not exist"))
Review Comment:
Use `checkError`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org