Posted to commits@spark.apache.org by wu...@apache.org on 2022/06/21 04:07:26 UTC
[spark] branch master updated: [SPARK-39152][CORE] Deregistering disk persisted local blocks in case of IO related errors
This is an automated email from the ASF dual-hosted git repository.
wuyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 282c7ae7b5a [SPARK-39152][CORE] Deregistering disk persisted local blocks in case of IO related errors
282c7ae7b5a is described below
commit 282c7ae7b5adbd88466681bc986a7d914080f08a
Author: attilapiros <pi...@gmail.com>
AuthorDate: Tue Jun 21 12:06:56 2022 +0800
[SPARK-39152][CORE] Deregistering disk persisted local blocks in case of IO related errors
### What changes were proposed in this pull request?
Deregistering disk persisted local blocks from the block manager in case of IO related errors.
### Why are the changes needed?
In case of disk corruption, a disk-persisted block will lead to job failure, as the block registration always points to the same corrupted file. So even when the task is rescheduled on a different executor, the job will fail.
Example:
First failure (the block is locally available):
```
22/04/25 07:15:28 ERROR executor.Executor: Exception in task 17024.0 in stage 12.0 (TID 51853)
java.io.StreamCorruptedException: invalid stream header: 00000000
at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:943)
at java.io.ObjectInputStream.<init>(ObjectInputStream.java:401)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63)
at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63)
at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122)
at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:209)
at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:617)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:897)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
```
Then the task might be rescheduled on a different executor, but as the block is still registered to the first block manager, the error will be the same:
```
java.io.StreamCorruptedException: invalid stream header: 00000000
at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:943)
at java.io.ObjectInputStream.<init>(ObjectInputStream.java:401)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63)
at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63)
at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122)
at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:209)
at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:698)
at org.apache.spark.storage.BlockManager$$anonfun$getRemoteValues$1.apply(BlockManager.scala:696)
at scala.Option.map(Option.scala:146)
at org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:696)
at org.apache.spark.storage.BlockManager.get(BlockManager.scala:831)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:886)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
```
My idea is to deregister the block when the IO error occurs and let the rescheduled task recompute it.
This PR targets only local blocks. In a follow-up PR, `getRemoteValues` can be extended with the same block removal.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
1) An existing unit test was extended.
2) Manually.
#### Manual testing
Start Spark:
```
$ ./bin/spark-shell --master "local-cluster[3,1,1200]" --conf spark.serializer=org.apache.spark.serializer.JavaSerializer
```
Create a persisted RDD (here via a DF):
```
scala> val df = sc.parallelize(1 to 20, 4).toDF
...
scala> df.persist(org.apache.spark.storage.StorageLevel.DISK_ONLY)
...
scala> df.show()
+-----+
|value|
+-----+
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
| 20|
+-----+
```
Now that the blocks are persisted, let's corrupt one of the files. For this we have to find the directory where the blocks are stored:
```
$ grep "DiskBlockManager: Created local directory" work/app-20220511112820-0000/*/stdout
work/app-20220511112820-0000/0/stdout:22/05/11 11:28:21 INFO DiskBlockManager: Created local directory at /private/var/folders/s4/qrgp74ds36l9t56tx_f6w54h0000gn/T/spark-df93a007-256e-4c16-ad23-ff7eae76c850/executor-0f4c0d32-8f12-447f-add3-5cfbd4a7c777/blockmgr-dde20b67-a824-4d92-9023-8fa902588a26
work/app-20220511112820-0000/1/stdout:22/05/11 11:28:21 INFO DiskBlockManager: Created local directory at /private/var/folders/s4/qrgp74ds36l9t56tx_f6w54h0000gn/T/spark-df93a007-256e-4c16-ad23-ff7eae76c850/executor-05de3de7-60ca-4954-8baa-965da3c35ce5/blockmgr-71c559a6-f0e8-42a1-bf53-3bddb4a69618
work/app-20220511112820-0000/2/stdout:22/05/11 11:28:21 INFO DiskBlockManager: Created local directory at /private/var/folders/s4/qrgp74ds36l9t56tx_f6w54h0000gn/T/spark-df93a007-256e-4c16-ad23-ff7eae76c850/executor-bf0ca6de-17c6-437f-82e9-33afb01dce7d/blockmgr-1f66ae2a-0a3a-4a99-8f75-b65b5d4e5e0c
```
Let's write something into one of the RDD block files:
```
vim /private/var/folders/s4/qrgp74ds36l9t56tx_f6w54h0000gn/T/spark-df93a007-256e-4c16-ad23-ff7eae76c850/executor-bf0ca6de-17c6-437f-82e9-33afb01dce7d/blockmgr-1f66ae2a-0a3a-4a99-8f75-b65b5d4e5e0c/19/rdd_4_1
```
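If you prefer a non-interactive way to corrupt the stream header instead of editing the file in vim, overwriting the first few bytes with `dd` works too. A minimal sketch; `BLOCK_FILE` stands in for the `rdd_4_1` path found via the grep above (here a scratch file is used so the sketch runs on its own):

```shell
# Corrupt the 4-byte serialization stream header by overwriting it with zeros.
# In the real scenario BLOCK_FILE would be the block file path found above.
BLOCK_FILE=$(mktemp)                                      # substitute your real block file
printf 'ACED0005-rest-of-serialized-data' > "$BLOCK_FILE" # stand-in content
printf '\x00\x00\x00\x00' | dd of="$BLOCK_FILE" bs=1 count=4 conv=notrunc 2>/dev/null
head -c 8 "$BLOCK_FILE" | od -An -tx1                     # first 4 bytes are now 00
```

`conv=notrunc` keeps the rest of the file intact, so only the header is damaged, which reproduces the `invalid stream header` failure on the next read.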
Use the DF/RDD one more time:
```
scala> df.show()
22/05/11 11:30:41 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 7) (192.168.1.65 executor 2): java.io.StreamCorruptedException: invalid stream header: 41ACED00
at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:938)
at java.io.ObjectInputStream.<init>(ObjectInputStream.java:396)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:66)
at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:66)
at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:137)
at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:212)
at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:967)
at org.apache.spark.storage.BlockManager.get(BlockManager.scala:1277)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1344)
...
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
+-----+
|value|
+-----+
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
| 20|
+-----+
```
Check the logs:
```
$ cat work/app-20220511112820-0000/2/stdout
...
22/05/11 11:30:41 INFO CoarseGrainedExecutorBackend: Got assigned task 7
22/05/11 11:30:41 INFO Executor: Running task 0.0 in stage 3.0 (TID 7)
...
22/05/11 11:30:41 INFO BlockManager: invalid stream header: 41ACED00. BlockManagerId(2, 192.168.1.65, 63368, None) - blockId: rdd_4_1 retryCount: 0 - blockDiskPath: /private/var/folders/s4/qrgp74ds36l9t56tx_f6w54h0000gn/T/spark-df93a007-256e-4c16-ad23-ff7eae76c850/executor-bf0ca6de-17c6-437f-82e9-33afb01dce7d/blockmgr-1f66ae2a-0a3a-4a99-8f75-b65b5d4e5e0c/19/rdd_4_1
22/05/11 11:30:41 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 7)
java.io.StreamCorruptedException: invalid stream header: 41ACED00
at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:938) ~[?:1.8.0_322]
at java.io.ObjectInputStream.<init>(ObjectInputStream.java:396) ~[?:1.8.0_322]
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:66) ~[spark-core_2.12-3.4.0-SNAPSHOT.jar:3.4.0-SNAPSHOT]
...
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_322]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_322]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_322]
22/05/11 11:30:41 INFO CoarseGrainedExecutorBackend: Got assigned task 8
22/05/11 11:30:41 INFO Executor: Running task 0.1 in stage 3.0 (TID 8)
22/05/11 11:30:41 INFO Executor: Finished task 0.1 in stage 3.0 (TID 8). 1623 bytes result sent to driver
```
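The two log lines that matter above can also be checked with a grep rather than by reading the whole stdout. A self-contained sketch; a stand-in log file replaces `work/app-<appId>/<executorId>/stdout`, whose path depends on your run:

```shell
# Sketch: confirm both the detailed IO-error log line (with blockDiskPath)
# and the successful rescheduled attempt appear in an executor's stdout.
LOG=$(mktemp)   # in a real run: work/app-<appId>/<executorId>/stdout
cat > "$LOG" <<'EOF'
22/05/11 11:30:41 INFO BlockManager: invalid stream header: 41ACED00. BlockManagerId(2, 192.168.1.65, 63368, None) - blockId: rdd_4_1 retryCount: 0 - blockDiskPath: /tmp/blockmgr-example/19/rdd_4_1
22/05/11 11:30:41 INFO Executor: Finished task 0.1 in stage 3.0 (TID 8). 1623 bytes result sent to driver
EOF
grep -c 'blockDiskPath' "$LOG"       # the deregistration log line is present
grep -c 'Finished task 0.1' "$LOG"   # the rescheduled attempt succeeded
```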
Closes #36512 from attilapiros/handleCorruptedCachedRdd.
Authored-by: attilapiros <pi...@gmail.com>
Signed-off-by: yi.wu <yi...@databricks.com>
---
.../org/apache/spark/storage/BlockManager.scala | 64 +++++++++++++---------
.../apache/spark/storage/BlockManagerSuite.scala | 2 +
2 files changed, 40 insertions(+), 26 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 619b5e1edf7..f4adbc7ccb3 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -345,9 +345,7 @@ private[spark] class BlockManager(
}
} catch {
case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
- // We need to have detailed log message to catch environmental problems easily.
- // Further details: https://issues.apache.org/jira/browse/SPARK-37710
- processKryoException(ex, blockId)
+ logInfo(extendMessageWithBlockDetails(ex.getMessage, blockId))
throw ex
} finally {
IOUtils.closeQuietly(inputStream)
@@ -905,6 +903,10 @@ private[spark] class BlockManager(
throw SparkCoreErrors.readLockedBlockNotFoundError(blockId)
}
+ private def isIORelatedException(t: Throwable): Boolean =
+ t.isInstanceOf[IOException] ||
+ (t.isInstanceOf[KryoException] && t.getCause.isInstanceOf[IOException])
+
/**
* Get block from local block manager as an iterator of Java objects.
*/
@@ -933,31 +935,37 @@ private[spark] class BlockManager(
})
Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
} else if (level.useDisk && diskStore.contains(blockId)) {
+ var diskData: BlockData = null
try {
- val diskData = diskStore.getBytes(blockId)
- val iterToReturn: Iterator[Any] = {
- if (level.deserialized) {
- val diskValues = serializerManager.dataDeserializeStream(
- blockId,
- diskData.toInputStream())(info.classTag)
- maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
- } else {
- val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
- .map { _.toInputStream(dispose = false) }
- .getOrElse { diskData.toInputStream() }
- serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
- }
+ diskData = diskStore.getBytes(blockId)
+ val iterToReturn = if (level.deserialized) {
+ val diskValues = serializerManager.dataDeserializeStream(
+ blockId,
+ diskData.toInputStream())(info.classTag)
+ maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+ } else {
+ val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
+ .map { _.toInputStream(dispose = false) }
+ .getOrElse { diskData.toInputStream() }
+ serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
}
val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
releaseLockAndDispose(blockId, diskData, taskContext)
})
Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
} catch {
- case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
- // We need to have detailed log message to catch environmental problems easily.
- // Further details: https://issues.apache.org/jira/browse/SPARK-37710
- processKryoException(ex, blockId)
- throw ex
+ case t: Throwable =>
+ if (diskData != null) {
+ diskData.dispose()
+ diskData = null
+ }
+ releaseLock(blockId, taskContext)
+ if (isIORelatedException(t)) {
+ logInfo(extendMessageWithBlockDetails(t.getMessage, blockId))
+ // Remove the block so that its unavailability is reported to the driver
+ removeBlock(blockId)
+ }
+ throw t
}
} else {
handleLocalReadFailure(blockId)
@@ -965,14 +973,18 @@ private[spark] class BlockManager(
}
}
- private def processKryoException(ex: KryoException, blockId: BlockId): Unit = {
- var message =
- "%s. %s - blockId: %s".format(ex.getMessage, blockManagerId.toString, blockId)
+ /**
+ * We need to have detailed log message to catch environmental problems easily.
+ * Further details: https://issues.apache.org/jira/browse/SPARK-37710
+ */
+ private def extendMessageWithBlockDetails(msg: String, blockId: BlockId): String = {
+ val message: String = "%s. %s - blockId: %s".format(msg, blockManagerId.toString, blockId)
val file = diskBlockManager.getFile(blockId)
if (file.exists()) {
- message = "%s - blockDiskPath: %s".format(message, file.getAbsolutePath)
+ "%s - blockDiskPath: %s".format(message, file.getAbsolutePath)
+ } else {
+ message
}
- logInfo(message)
}
/**
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 45e05b2cc2d..545bbee6671 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -2149,6 +2149,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
() => List(new Array[User](1)).iterator)
}
assert(kryoException.getMessage === "java.io.IOException: Input/output error")
+ assertUpdateBlockInfoReportedForRemovingBlock(store, "my-block-id",
+ removedFromMemory = false, removedFromDisk = true)
}
test("check KryoException when saving blocks into memory and 'Input/output error' is occurred") {