Posted to reviews@spark.apache.org by "MaxGekk (via GitHub)" <gi...@apache.org> on 2023/07/12 09:38:23 UTC

[GitHub] [spark] MaxGekk commented on a diff in pull request #41705: [SPARK-44252][SS] Define a new error class and apply for the case where loading state from DFS fails

MaxGekk commented on code in PR #41705:
URL: https://github.com/apache/spark/pull/41705#discussion_r1260886244


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala:
##########
@@ -2461,26 +2461,34 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
         s"but it's ${endSeconds.toString} now.")
   }
 
-  def failedToReadDeltaFileError(fileToRead: Path, clazz: String, keySize: Int): Throwable = {
+  def failedToReadDeltaFileError(fileToRead: Path, clazz: String, message: String): Throwable = {
     new SparkException(
-      errorClass = "_LEGACY_ERROR_TEMP_2258",
+      errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_DELTA_FILE",
       messageParameters = Map(
         "fileToRead" -> fileToRead.toString(),
         "clazz" -> clazz,
-        "keySize" -> keySize.toString()),
+        "message" -> message),
       cause = null)
   }
 
   def failedToReadSnapshotFileError(fileToRead: Path, clazz: String, message: String): Throwable = {
     new SparkException(
-      errorClass = "_LEGACY_ERROR_TEMP_2259",
+      errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_SNAPSHOT_FILE",
       messageParameters = Map(
         "fileToRead" -> fileToRead.toString(),
         "clazz" -> clazz,
         "message" -> message),
       cause = null)
   }
 
+  def failedToReadStreamingStateFileError(fileToRead: Path, f: Throwable): Throwable = {
+    new SparkException(
+      errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_STREAMING_STATE_FILE",
+      messageParameters = Map(
+        "fileToRead" -> fileToRead.toString()),

Review Comment:
   nit:
   ```suggestion
         messageParameters = Map("fileToRead" -> fileToRead.toString()),
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala:
##########
@@ -2461,26 +2461,34 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
         s"but it's ${endSeconds.toString} now.")
   }
 
-  def failedToReadDeltaFileError(fileToRead: Path, clazz: String, keySize: Int): Throwable = {
+  def failedToReadDeltaFileError(fileToRead: Path, clazz: String, message: String): Throwable = {
     new SparkException(
-      errorClass = "_LEGACY_ERROR_TEMP_2258",
+      errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_DELTA_FILE",
       messageParameters = Map(
         "fileToRead" -> fileToRead.toString(),
         "clazz" -> clazz,
-        "keySize" -> keySize.toString()),
+        "message" -> message),

Review Comment:
   Please put the error message text into error-classes.json. As far as I can see, `failedToReadDeltaFileError` is invoked twice with a free-form `message`:
   1.
   ```scala
           throw QueryExecutionErrors.failedToReadDeltaFileError(
             fileToRead, toString(), s"$fileToRead does not exist"
           )
   ```
   
   2.
   ```scala
             throw QueryExecutionErrors.failedToReadDeltaFileError(
               fileToRead, toString(), s"key size cannot be ${keySize}"
             )
   ```
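   
   One possible shape (just a sketch; the sub-class and method names below are placeholders, not final):
   ```scala
   // Hypothetical split: one error method per failure mode; the wording lives
   // in error-classes.json and only values are passed as parameters.
   def failedToReadDeltaFileNotExistsError(fileToRead: Path, clazz: String): Throwable = {
     new SparkException(
       errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_DELTA_FILE_NOT_EXISTS",
       messageParameters = Map("fileToRead" -> fileToRead.toString(), "clazz" -> clazz),
       cause = null)
   }
   
   def failedToReadDeltaFileKeySizeError(fileToRead: Path, clazz: String, keySize: Int): Throwable = {
     new SparkException(
       errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_DELTA_FILE_KEY_SIZE",
       messageParameters = Map(
         "fileToRead" -> fileToRead.toString(),
         "clazz" -> clazz,
         "keySize" -> keySize.toString),
       cause = null)
   }
   ```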



##########
common/utils/src/main/resources/error/error-classes.json:
##########
@@ -143,6 +143,50 @@
       "Could not load Protobuf class with name <protobufClassName>. <explanation>."
     ]
   },
+  "CANNOT_LOAD_STATE_STORE" : {
+    "message" : [
+      "An error occurred during loading state."
+    ],
+    "subClass" : {
+      "CANNOT_READ_CHECKPOINT" : {
+        "message" : [
+          "Cannot read RocksDB checkpoint metadata of version <versionLine>"

Review Comment:
   Can you provide some hints to users about what we expect, e.g. mention the expected version string in the message? From the source code:
   ```scala
         if (versionLine != s"v$VERSION") {
           throw QueryExecutionErrors.cannotReadCheckpoint(versionLine)
         }
   ```



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -123,12 +124,33 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
   }
 
   test("RocksDB: load version that doesn't exist") {
+    val provider = new RocksDBStateStoreProvider()
+    var ex = intercept[SparkException] {
+      provider.getStore(-1)
+    }
+    checkError(
+      ex,
+      errorClass = "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED",
+      parameters = Map.empty
+    )
+    ex = intercept[SparkException] {
+      provider.getReadStore(-1)
+    }
+    checkError(
+      ex,
+      errorClass = "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED",
+      parameters = Map.empty
+    )
+
     val remoteDir = Utils.createTempDir().toString
     new File(remoteDir).delete()  // to make sure that the directory gets created
     withDB(remoteDir) { db =>
-      intercept[IllegalStateException] {
+      ex = intercept[SparkException] {
         db.load(1)
       }
+      assert(ex.isInstanceOf[SparkException])
+      assert(ex.getMessage.contains("Error reading streaming state file") &&
+        ex.getCause.getMessage.contains("does not exist"))

Review Comment:
   Use `checkError`
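   
   For example, something along these lines (a sketch only; the expected parameter value is a placeholder and depends on the file the load actually touches):
   ```scala
   checkError(
     ex,
     errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_STREAMING_STATE_FILE",
     // expectedFilePath is a placeholder for the changelog/delta file under remoteDir
     parameters = Map("fileToRead" -> expectedFilePath))
   ```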



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala:
##########
@@ -876,12 +889,17 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
       assert(getLatestData(provider) === Set(("b", 0) -> 2))
 
       // Trying to get newer versions should fail
-      intercept[Exception] {
+      var e = intercept[SparkException] {
         provider.getStore(2)
       }
-      intercept[Exception] {
+      assert(e.getCause.isInstanceOf[SparkException])
+      assert(e.getCause.getMessage.contains("does not exist"))

Review Comment:
   Use `checkError`
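   
   The wrapped cause can be verified with `checkError` too, e.g. (a sketch; the cast and the parameter values are assumptions):
   ```scala
   // Verify the cause with checkError instead of string matching.
   checkError(
     e.getCause.asInstanceOf[SparkThrowable],
     errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_DELTA_FILE",
     // placeholder values; fill in the real delta file path
     parameters = Map(
       "fileToRead" -> expectedDeltaFile,
       "clazz" -> provider.toString,
       "message" -> s"$expectedDeltaFile does not exist"))
   ```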



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala:
##########
@@ -1178,10 +1201,13 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
           }
           assert(!StateStore.isLoaded(storeId)) // version -1 should not attempt to load the store
 
-          intercept[IllegalStateException] {
+          val e = intercept[SparkException] {
             StateStore.get(
               storeId, keySchema, valueSchema, 0, 1, storeConf, hadoopConf)
           }
+          assert(e.getCause.isInstanceOf[SparkException])
+          assert(e.getCause.getMessage.contains("Error reading delta file") &&
+            e.getCause.getMessage.contains("does not exist"))

Review Comment:
   Use `checkError`



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala:
##########
@@ -2855,6 +2863,50 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
         "enumString" -> enumString))
   }
 
+  def unreleasedThreadError(loggingId: String, newAcquiredThreadInfo: String,
+                            AcquiredThreadInfo: String, timeWaitedMs: Long,
+                            stackTraceOutput: String): Throwable = {

Review Comment:
   Fix the indentation; see https://github.com/databricks/scala-style-guide#indent
   ```suggestion
     def unreleasedThreadError(
         loggingId: String,
         newAcquiredThreadInfo: String,
         AcquiredThreadInfo: String,
         timeWaitedMs: Long,
         stackTraceOutput: String): Throwable = {
   ```



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -720,9 +742,12 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
 
         // Another thread should not be able to load while current thread is using it
         db.load(2)
-        intercept[IllegalStateException] {
+        ex = intercept[SparkException] {
           ThreadUtils.runInNewThread("concurrent-test-thread-2") { db.load(2) }
         }
+        // Assert that the error message contains the stack trace
+        assert(ex.getMessage.contains("Thread holding the lock has trace:"))
+        assert(ex.getMessage.contains("runInNewThread"))

Review Comment:
   Use `checkError`
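   
   Since the thread info and `stackTraceOutput` are nondeterministic, regex parameter matching could help, e.g. (a sketch, assuming `checkError`'s `matchPVals = true` treats the expected parameter values as regexes):
   ```scala
   checkError(
     ex,
     errorClass = "CANNOT_LOAD_STATE_STORE.UNRELEASED_THREAD_ERROR",
     parameters = Map(
       "loggingId" -> "(?s).*",
       "newAcquiredThreadInfo" -> "(?s).*",
       "acquiredThreadInfo" -> "(?s).*",
       "timeWaitedMs" -> "\\d+",
       // the stack trace should mention the thread helper used by the test
       "stackTraceOutput" -> "(?s).*runInNewThread.*"),
     matchPVals = true)
   ```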



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala:
##########
@@ -2855,6 +2863,50 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
         "enumString" -> enumString))
   }
 
+  def unreleasedThreadError(loggingId: String, newAcquiredThreadInfo: String,
+                            AcquiredThreadInfo: String, timeWaitedMs: Long,
+                            stackTraceOutput: String): Throwable = {
+    new SparkException (
+      errorClass = "CANNOT_LOAD_STATE_STORE.UNRELEASED_THREAD_ERROR",
+      messageParameters = Map(
+        "loggingId" -> loggingId,
+        "newAcquiredThreadInfo" -> newAcquiredThreadInfo,
+        "acquiredThreadInfo" -> AcquiredThreadInfo,
+        "timeWaitedMs" -> timeWaitedMs.toString,
+        "stackTraceOutput" -> stackTraceOutput
+      ),
+      cause = null)
+  }
+
+  def cannotReadCheckpoint(versionLine: String): Throwable = {
+    new SparkException (
+      errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_CHECKPOINT",
+      messageParameters = Map(
+        "versionLine" -> versionLine
+      ),

Review Comment:
   ```suggestion
         messageParameters = Map("versionLine" -> versionLine),
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala:
##########
@@ -2855,6 +2863,50 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
         "enumString" -> enumString))
   }
 
+  def unreleasedThreadError(loggingId: String, newAcquiredThreadInfo: String,
+                            AcquiredThreadInfo: String, timeWaitedMs: Long,
+                            stackTraceOutput: String): Throwable = {
+    new SparkException (
+      errorClass = "CANNOT_LOAD_STATE_STORE.UNRELEASED_THREAD_ERROR",
+      messageParameters = Map(
+        "loggingId" -> loggingId,
+        "newAcquiredThreadInfo" -> newAcquiredThreadInfo,
+        "acquiredThreadInfo" -> AcquiredThreadInfo,
+        "timeWaitedMs" -> timeWaitedMs.toString,
+        "stackTraceOutput" -> stackTraceOutput
+      ),
+      cause = null)
+  }
+
+  def cannotReadCheckpoint(versionLine: String): Throwable = {
+    new SparkException (
+      errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_CHECKPOINT",
+      messageParameters = Map(
+        "versionLine" -> versionLine
+      ),
+      cause = null)
+  }
+
+  def unexpectedFileSize(dfsFile: Path, localFile: File, expectedSize: Long,
+                         localFileSize: Long): Throwable = {

Review Comment:
   Please fix the indentation here as well.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala:
##########
@@ -2855,6 +2863,50 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
         "enumString" -> enumString))
   }
 
+  def unreleasedThreadError(loggingId: String, newAcquiredThreadInfo: String,
+                            AcquiredThreadInfo: String, timeWaitedMs: Long,
+                            stackTraceOutput: String): Throwable = {
+    new SparkException (
+      errorClass = "CANNOT_LOAD_STATE_STORE.UNRELEASED_THREAD_ERROR",
+      messageParameters = Map(
+        "loggingId" -> loggingId,
+        "newAcquiredThreadInfo" -> newAcquiredThreadInfo,
+        "acquiredThreadInfo" -> AcquiredThreadInfo,
+        "timeWaitedMs" -> timeWaitedMs.toString,
+        "stackTraceOutput" -> stackTraceOutput

Review Comment:
   Just wondering: is this a user-facing error? If so, are you going to show the stack trace to SQL users?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##########
@@ -218,12 +218,17 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
   }
 
   private def getLoadedMapForStore(version: Long): HDFSBackedStateStoreMap = synchronized {
-    require(version >= 0, "Version cannot be less than 0")
-    val newMap = HDFSBackedStateStoreMap.create(keySchema, numColsPrefixKey)
-    if (version > 0) {
-      newMap.putAll(loadMap(version))
+    try {
+      require(version >= 0, "Version cannot be less than 0")

Review Comment:
   Can users observe this error, or is it an internal one?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala:
##########
@@ -2461,26 +2461,34 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
         s"but it's ${endSeconds.toString} now.")
   }
 
-  def failedToReadDeltaFileError(fileToRead: Path, clazz: String, keySize: Int): Throwable = {
+  def failedToReadDeltaFileError(fileToRead: Path, clazz: String, message: String): Throwable = {
     new SparkException(
-      errorClass = "_LEGACY_ERROR_TEMP_2258",
+      errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_DELTA_FILE",
       messageParameters = Map(
         "fileToRead" -> fileToRead.toString(),
         "clazz" -> clazz,
-        "keySize" -> keySize.toString()),
+        "message" -> message),
       cause = null)
   }
 
   def failedToReadSnapshotFileError(fileToRead: Path, clazz: String, message: String): Throwable = {
     new SparkException(
-      errorClass = "_LEGACY_ERROR_TEMP_2259",
+      errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_SNAPSHOT_FILE",
       messageParameters = Map(
         "fileToRead" -> fileToRead.toString(),
         "clazz" -> clazz,
         "message" -> message),

Review Comment:
   Please avoid arbitrary text in message parameters. As far as I can see, `message` can be:
   1.
   ```scala
             throw QueryExecutionErrors.failedToReadSnapshotFileError(
               fileToRead, toString(), s"key size cannot be $keySize")
   ```
   
   2.
   ```scala
               throw QueryExecutionErrors.failedToReadSnapshotFileError(
                 fileToRead, toString(), s"value size cannot be $valueSize")
   ```
   
   Please put the error message text into error-classes.json.
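   
   Similar to the delta-file case above, e.g. (placeholder names):
   ```scala
   // Hypothetical: pass the raw size as a parameter and keep the wording in
   // error-classes.json; a twin method would cover the value-size case.
   def failedToReadSnapshotFileKeySizeError(fileToRead: Path, clazz: String, keySize: Int): Throwable = {
     new SparkException(
       errorClass = "CANNOT_LOAD_STATE_STORE.CANNOT_READ_SNAPSHOT_FILE_KEY_SIZE",
       messageParameters = Map(
         "fileToRead" -> fileToRead.toString(),
         "clazz" -> clazz,
         "keySize" -> keySize.toString),
       cause = null)
   }
   ```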
   



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -190,15 +191,25 @@ private[sql] class RocksDBStateStoreProvider
   override def stateStoreId: StateStoreId = stateStoreId_
 
   override def getStore(version: Long): StateStore = {
-    require(version >= 0, "Version cannot be less than 0")
-    rocksDB.load(version)
-    new RocksDBStateStore(version)
+    try {
+      require(version >= 0, "Version cannot be less than 0")

Review Comment:
   Can users observe the version-mismatch error? If so, please add an error class.
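   
   E.g. something like (the class and method names are hypothetical):
   ```scala
   // Hypothetical replacement for the bare require(version >= 0, ...)
   def stateStoreVersionCannotBeNegativeError(version: Long): Throwable = {
     new SparkException(
       errorClass = "CANNOT_LOAD_STATE_STORE.NEGATIVE_VERSION",  // placeholder name
       messageParameters = Map("version" -> version.toString),
       cause = null)
   }
   ```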



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala:
##########
@@ -876,12 +889,17 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
       assert(getLatestData(provider) === Set(("b", 0) -> 2))
 
       // Trying to get newer versions should fail
-      intercept[Exception] {
+      var e = intercept[SparkException] {
         provider.getStore(2)
       }
-      intercept[Exception] {
+      assert(e.getCause.isInstanceOf[SparkException])
+      assert(e.getCause.getMessage.contains("does not exist"))
+
+      e = intercept[SparkException] {
         getData(provider, 2)
       }
+      assert(e.getCause.isInstanceOf[SparkException])
+      assert(e.getCause.getMessage.contains("does not exist"))

Review Comment:
   Use `checkError`



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala:
##########
@@ -212,22 +212,35 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
       // Corrupt snapshot file and verify that it throws error
       assert(getData(provider, snapshotVersion) === Set(("a", 0) -> snapshotVersion))
       corruptFile(provider, snapshotVersion, isSnapshot = true)
-      intercept[Exception] {
+      var e = intercept[SparkException] {
         getData(provider, snapshotVersion)
       }
+      checkError(
+        e,
+        errorClass = "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED",
+        parameters = Map.empty
+      )
 
       // Corrupt delta file and verify that it throws error
       assert(getData(provider, snapshotVersion - 1) === Set(("a", 0) -> (snapshotVersion - 1)))
       corruptFile(provider, snapshotVersion - 1, isSnapshot = false)
-      intercept[Exception] {
+      e = intercept[SparkException] {
         getData(provider, snapshotVersion - 1)
       }
+      checkError(
+        e,
+        errorClass = "CANNOT_LOAD_STATE_STORE.UNCATEGORIZED",
+        parameters = Map.empty
+      )
 
       // Delete delta file and verify that it throws error
       deleteFilesEarlierThanVersion(provider, snapshotVersion)
-      intercept[Exception] {
+      e = intercept[SparkException] {
         getData(provider, snapshotVersion - 1)
       }
+      assert(e.getCause.isInstanceOf[SparkException])
+      assert(e.getCause.getMessage.contains("Error reading delta file") &&
+        e.getCause.getMessage.contains("does not exist"))

Review Comment:
   Use `checkError`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org