Posted to commits@spark.apache.org by wu...@apache.org on 2022/06/21 04:07:26 UTC

[spark] branch master updated: [SPARK-39152][CORE] Deregistering disk persisted local blocks in case of IO related errors

This is an automated email from the ASF dual-hosted git repository.

wuyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 282c7ae7b5a [SPARK-39152][CORE] Deregistering disk persisted local blocks in case of IO related errors
282c7ae7b5a is described below

commit 282c7ae7b5adbd88466681bc986a7d914080f08a
Author: attilapiros <pi...@gmail.com>
AuthorDate: Tue Jun 21 12:06:56 2022 +0800

    [SPARK-39152][CORE] Deregistering disk persisted local blocks in case of IO related errors
    
    ### What changes were proposed in this pull request?
    
    Deregistering disk persisted local blocks from the block manager in case of IO related errors.
    
    ### Why are the changes needed?
    
    In case of a disk corruption, a disk-persisted block will lead to job failure, as the block registration always points to the same corrupted file. So even when the task is rescheduled on a different executor, the job will fail.
    
    Example:
    
    First failure (the block is locally available):
    ```
    22/04/25 07:15:28 ERROR executor.Executor: Exception in task 17024.0 in stage 12.0 (TID 51853)
    java.io.StreamCorruptedException: invalid stream header: 00000000
      at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:943)
      at java.io.ObjectInputStream.<init>(ObjectInputStream.java:401)
      at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63)
      at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63)
      at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122)
      at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:209)
      at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:617)
      at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:897)
      at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
      at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    ```
    
    Then the task might be rescheduled on a different executor, but as the block is still registered to the first block manager, the error will be the same:
    ```
    java.io.StreamCorruptedException: invalid stream header: 00000000
      at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:943)
      at java.io.ObjectInputStream.<init>(ObjectInputStream.java:401)
      at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63)
      at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63)
      at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122)
      at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:209)
      at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:698)
      at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:696)
      at scala.Option.map(Option.scala:146)
      at org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:696)
      at org.apache.spark.storage.BlockManager.get(BlockManager.scala:831)
      at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:886)
      at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
    ```
    
    My idea is to deregister the block when the IO error occurs and let the following task recompute it.
    
    This PR targets only local blocks. In a follow-up PR, `getRemoteValues` can be extended with block removal as well.
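
    To make the idea concrete, here is a minimal, self-contained sketch of the pattern (hypothetical stand-ins such as `readBlockFromDisk`, not the actual `BlockManager` code; the real change is in the diff below):
    ```
    import java.io.IOException

    object DeregisterOnIOErrorSketch {
      type BlockId = String

      // IO errors can surface directly or wrapped (e.g. inside a
      // KryoException), so the cause is checked as well.
      def isIORelatedException(t: Throwable): Boolean =
        t.isInstanceOf[IOException] ||
          (t.getCause != null && t.getCause.isInstanceOf[IOException])

      // Stand-in for reading a disk-persisted block; simulates corruption.
      def readBlockFromDisk(blockId: BlockId): Iterator[Any] =
        throw new IOException(s"simulated disk corruption for $blockId")

      // Stand-in for BlockManager.removeBlock: deregistering the block
      // reports its unavailability to the driver.
      def removeBlock(blockId: BlockId): Unit =
        println(s"deregistering $blockId")

      def getLocalValues(blockId: BlockId): Iterator[Any] =
        try {
          readBlockFromDisk(blockId)
        } catch {
          case t: Throwable =>
            if (isIORelatedException(t)) {
              // The rescheduled task recomputes the block instead of
              // re-reading the same corrupted file.
              removeBlock(blockId)
            }
            throw t
        }
    }
    ```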
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    1) An existing unit test was extended.
    2) Manually.
    
    #### Manual testing
    
    Start Spark:
    ```
    $ ./bin/spark-shell --master "local-cluster[3,1,1200]" --conf spark.serializer=org.apache.spark.serializer.JavaSerializer
    ```
    
    Create a persisted RDD (here via a DF):
    ```
    scala> val df = sc.parallelize(1 to 20, 4).toDF
    ...
    scala> df.persist(org.apache.spark.storage.StorageLevel.DISK_ONLY)
    ...
    scala> df.show()
    +-----+
    |value|
    +-----+
    |    1|
    |    2|
    |    3|
    |    4|
    |    5|
    |    6|
    |    7|
    |    8|
    |    9|
    |   10|
    |   11|
    |   12|
    |   13|
    |   14|
    |   15|
    |   16|
    |   17|
    |   18|
    |   19|
    |   20|
    +-----+
    ```
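
    The file corrupted below is `rdd_4_1`, i.e. partition 1 of the persisted RDD with id 4 (disk-persisted RDD blocks are stored in files named `rdd_<rddId>_<partitionId>`). To check which RDD ids are persisted in your own session, one way (in the same spark-shell) is:
    ```
    scala> sc.getPersistentRDDs.foreach { case (id, rdd) =>
         |   println(s"rdd id: $id, storage level: ${rdd.getStorageLevel.description}")
         | }
    ```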
    
    Now that the blocks are persisted, let's corrupt one of the files. For this we have to find the directory where the blocks are stored:
    ```
    $ grep "DiskBlockManager: Created local directory" work/app-20220511112820-0000/*/stdout
    work/app-20220511112820-0000/0/stdout:22/05/11 11:28:21 INFO DiskBlockManager: Created local directory at /private/var/folders/s4/qrgp74ds36l9t56tx_f6w54h0000gn/T/spark-df93a007-256e-4c16-ad23-ff7eae76c850/executor-0f4c0d32-8f12-447f-add3-5cfbd4a7c777/blockmgr-dde20b67-a824-4d92-9023-8fa902588a26
    work/app-20220511112820-0000/1/stdout:22/05/11 11:28:21 INFO DiskBlockManager: Created local directory at /private/var/folders/s4/qrgp74ds36l9t56tx_f6w54h0000gn/T/spark-df93a007-256e-4c16-ad23-ff7eae76c850/executor-05de3de7-60ca-4954-8baa-965da3c35ce5/blockmgr-71c559a6-f0e8-42a1-bf53-3bddb4a69618
    work/app-20220511112820-0000/2/stdout:22/05/11 11:28:21 INFO DiskBlockManager: Created local directory at /private/var/folders/s4/qrgp74ds36l9t56tx_f6w54h0000gn/T/spark-df93a007-256e-4c16-ad23-ff7eae76c850/executor-bf0ca6de-17c6-437f-82e9-33afb01dce7d/blockmgr-1f66ae2a-0a3a-4a99-8f75-b65b5d4e5e0c
    ```
    
    Let's write something into one of the RDD files:
    ```
    vim /private/var/folders/s4/qrgp74ds36l9t56tx_f6w54h0000gn/T/spark-df93a007-256e-4c16-ad23-ff7eae76c850/executor-bf0ca6de-17c6-437f-82e9-33afb01dce7d/blockmgr-1f66ae2a-0a3a-4a99-8f75-b65b5d4e5e0c/19/rdd_4_1
    ```
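
    Alternatively, the corruption can be scripted from the same spark-shell (this works here only because local-cluster executors run on the same machine; the path below is a placeholder, substitute the `rdd_4_1` file found via grep above). Overwriting the file clobbers the serialization stream header:
    ```
    scala> import java.nio.file.{Files, Paths}
    scala> // Placeholder path: use the rdd_4_1 block file located above.
    scala> Files.write(Paths.get("/.../blockmgr-1f66ae2a-0a3a-4a99-8f75-b65b5d4e5e0c/19/rdd_4_1"), "garbage".getBytes)
    ```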
    
    Use the DF/RDD one more time:
    ```
    scala> df.show()
    22/05/11 11:30:41 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 7) (192.168.1.65 executor 2): java.io.StreamCorruptedException: invalid stream header: 41ACED00
            at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:938)
            at java.io.ObjectInputStream.<init>(ObjectInputStream.java:396)
            at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:66)
            at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:66)
            at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:137)
            at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:212)
            at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:967)
            at org.apache.spark.storage.BlockManager.get(BlockManager.scala:1277)
            at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1344)
     ...
             at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:750)
    
    +-----+
    |value|
    +-----+
    |    1|
    |    2|
    |    3|
    |    4|
    |    5|
    |    6|
    |    7|
    |    8|
    |    9|
    |   10|
    |   11|
    |   12|
    |   13|
    |   14|
    |   15|
    |   16|
    |   17|
    |   18|
    |   19|
    |   20|
    +-----+
    ```
    
    Check the logs:
    ```
    $ cat work/app-20220511112820-0000/2/stdout
    ...
    22/05/11 11:30:41 INFO CoarseGrainedExecutorBackend: Got assigned task 7
    22/05/11 11:30:41 INFO Executor: Running task 0.0 in stage 3.0 (TID 7)
    ...
    22/05/11 11:30:41 INFO BlockManager: invalid stream header: 41ACED00. BlockManagerId(2, 192.168.1.65, 63368, None) - blockId: rdd_4_1 retryCount: 0 - blockDiskPath: /private/var/folders/s4/qrgp74ds36l9t56tx_f6w54h0000gn/T/spark-df93a007-256e-4c16-ad23-ff7eae76c850/executor-bf0ca6de-17c6-437f-82e9-33afb01dce7d/blockmgr-1f66ae2a-0a3a-4a99-8f75-b65b5d4e5e0c/19/rdd_4_1
    22/05/11 11:30:41 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 7)
    java.io.StreamCorruptedException: invalid stream header: 41ACED00
            at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:938) ~[?:1.8.0_322]
            at java.io.ObjectInputStream.<init>(ObjectInputStream.java:396) ~[?:1.8.0_322]
            at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:66) ~[spark-core_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
    ...
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_322]
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_322]
            at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_322]
    22/05/11 11:30:41 INFO CoarseGrainedExecutorBackend: Got assigned task 8
    22/05/11 11:30:41 INFO Executor: Running task 0.1 in stage 3.0 (TID 8)
    22/05/11 11:30:41 INFO Executor: Finished task 0.1 in stage 3.0 (TID 8). 1623 bytes result sent to driver
    ```
    
    Closes #36512 from attilapiros/handleCorruptedCachedRdd.
    
    Authored-by: attilapiros <pi...@gmail.com>
    Signed-off-by: yi.wu <yi...@databricks.com>
---
 .../org/apache/spark/storage/BlockManager.scala    | 64 +++++++++++++---------
 .../apache/spark/storage/BlockManagerSuite.scala   |  2 +
 2 files changed, 40 insertions(+), 26 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 619b5e1edf7..f4adbc7ccb3 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -345,9 +345,7 @@ private[spark] class BlockManager(
         }
       } catch {
         case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
-          // We need to have detailed log message to catch environmental problems easily.
-          // Further details: https://issues.apache.org/jira/browse/SPARK-37710
-          processKryoException(ex, blockId)
+          logInfo(extendMessageWithBlockDetails(ex.getMessage, blockId))
           throw ex
       } finally {
         IOUtils.closeQuietly(inputStream)
@@ -905,6 +903,10 @@ private[spark] class BlockManager(
     throw SparkCoreErrors.readLockedBlockNotFoundError(blockId)
   }
 
+  private def isIORelatedException(t: Throwable): Boolean =
+    t.isInstanceOf[IOException] ||
+      (t.isInstanceOf[KryoException] && t.getCause.isInstanceOf[IOException])
+
   /**
    * Get block from local block manager as an iterator of Java objects.
    */
@@ -933,31 +935,37 @@ private[spark] class BlockManager(
           })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
+          var diskData: BlockData = null
           try {
-            val diskData = diskStore.getBytes(blockId)
-            val iterToReturn: Iterator[Any] = {
-              if (level.deserialized) {
-                val diskValues = serializerManager.dataDeserializeStream(
-                  blockId,
-                  diskData.toInputStream())(info.classTag)
-                maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-              } else {
-                val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
-                  .map { _.toInputStream(dispose = false) }
-                  .getOrElse { diskData.toInputStream() }
-                serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
-              }
+            diskData = diskStore.getBytes(blockId)
+            val iterToReturn = if (level.deserialized) {
+              val diskValues = serializerManager.dataDeserializeStream(
+                blockId,
+                diskData.toInputStream())(info.classTag)
+              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+            } else {
+              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
+                .map { _.toInputStream(dispose = false) }
+                .getOrElse { diskData.toInputStream() }
+              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
             }
             val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
               releaseLockAndDispose(blockId, diskData, taskContext)
             })
             Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
           } catch {
-            case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
-              // We need to have detailed log message to catch environmental problems easily.
-              // Further details: https://issues.apache.org/jira/browse/SPARK-37710
-              processKryoException(ex, blockId)
-              throw ex
+            case t: Throwable =>
+              if (diskData != null) {
+                diskData.dispose()
+                diskData = null
+              }
+              releaseLock(blockId, taskContext)
+              if (isIORelatedException(t)) {
+                logInfo(extendMessageWithBlockDetails(t.getMessage, blockId))
+                // Remove the block so that its unavailability is reported to the driver
+                removeBlock(blockId)
+              }
+              throw t
           }
         } else {
           handleLocalReadFailure(blockId)
@@ -965,14 +973,18 @@ private[spark] class BlockManager(
     }
   }
 
-  private def processKryoException(ex: KryoException, blockId: BlockId): Unit = {
-    var message =
-      "%s. %s - blockId: %s".format(ex.getMessage, blockManagerId.toString, blockId)
+  /**
+   *  We need to have detailed log message to catch environmental problems easily.
+   *  Further details: https://issues.apache.org/jira/browse/SPARK-37710
+   */
+   private def extendMessageWithBlockDetails(msg: String, blockId: BlockId): String = {
+    val message: String = "%s. %s - blockId: %s".format(msg, blockManagerId.toString, blockId)
     val file = diskBlockManager.getFile(blockId)
     if (file.exists()) {
-      message = "%s - blockDiskPath: %s".format(message, file.getAbsolutePath)
+      "%s - blockDiskPath: %s".format(message, file.getAbsolutePath)
+    } else {
+      message
     }
-    logInfo(message)
   }
 
   /**
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 45e05b2cc2d..545bbee6671 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -2149,6 +2149,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
         () => List(new Array[User](1)).iterator)
     }
     assert(kryoException.getMessage === "java.io.IOException: Input/output error")
+    assertUpdateBlockInfoReportedForRemovingBlock(store, "my-block-id",
+      removedFromMemory = false, removedFromDisk = true)
   }
 
   test("check KryoException when saving blocks into memory and 'Input/output error' is occurred") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org