Posted to commits@spark.apache.org by gu...@apache.org on 2021/08/13 10:26:24 UTC
[spark] branch branch-3.2 updated: [SPARK-36500][CORE] Fix temp_shuffle file leaking when a task is interrupted
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 09a1ddb [SPARK-36500][CORE] Fix temp_shuffle file leaking when a task is interrupted
09a1ddb is described below
commit 09a1ddba41519f6aeb2241cae4d95a3bd194301d
Author: Xingbo Jiang <xi...@databricks.com>
AuthorDate: Fri Aug 13 19:25:20 2021 +0900
[SPARK-36500][CORE] Fix temp_shuffle file leaking when a task is interrupted
### What changes were proposed in this pull request?
When a task thread is interrupted, the underlying output stream referenced by `DiskBlockObjectWriter.mcs` may have already been closed, so flushing the buffered data throws an IOException. This breaks the assumption that `revertPartialWritesAndClose()` never throws exceptions.
To fix the issue, we can catch the IOException in `ManualCloseOutputStream.manualClose()`.
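To make the failure mode concrete, below is a minimal standalone repro of the underlying JDK behaviour (plain `java.io`, not Spark code): closing a `BufferedOutputStream` that still holds buffered data throws an IOException once the stream underneath it has already been closed.

```scala
import java.io.{BufferedOutputStream, File, FileOutputStream, IOException}

object FlushAfterCloseRepro {
  def main(args: Array[String]): Unit = {
    val file = File.createTempFile("temp_shuffle_repro", ".tmp")
    val fos = new FileOutputStream(file)
    val bos = new BufferedOutputStream(fos)
    bos.write(new Array[Byte](128)) // stays in the 8 KiB buffer, nothing hits disk yet
    fos.close()                     // underlying stream closed, as after a task interrupt
    try {
      bos.close()                   // close() flushes the buffer first -> IOException
    } catch {
      case e: IOException => println(s"close() threw: ${e.getMessage}")
    } finally {
      file.delete()
    }
  }
}
```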
### Why are the changes needed?
Previously the IOException was not caught, so `revertPartialWritesAndClose()` threw it to its caller. When that happened, `BypassMergeSortShuffleWriter.stop()` stopped deleting the temp_shuffle files tracked by `partitionWriters`, leading to temp_shuffle file leaks.
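The leak mechanism can be modelled with a hypothetical sketch (`PartitionWriter` below is an illustrative stand-in, not Spark's actual `DiskBlockObjectWriter`): when the revert step throws, the deletion loop aborts and every remaining temp file stays on disk.

```scala
import java.io.{File, IOException}

object CleanupAbortSketch {
  // Illustrative stand-in for a per-partition disk writer.
  final case class PartitionWriter(file: File, revertFails: Boolean) {
    def revert(): Unit =
      if (revertFails) throw new IOException("stream already closed")
  }

  // Simplified model of the stop()/cleanup path.
  def stop(writers: Seq[PartitionWriter]): Unit = {
    writers.foreach { w =>
      w.revert()       // before the fix this could throw...
      w.file.delete()  // ...so this and every later delete are skipped
    }
  }

  def main(args: Array[String]): Unit = {
    val files = Seq.fill(3)(File.createTempFile("temp_shuffle_", ".tmp"))
    val writers = files.zipWithIndex.map { case (f, i) =>
      PartitionWriter(f, revertFails = i == 0)
    }
    try stop(writers)
    catch { case e: IOException => println(s"stop() failed: $e") }
    println(s"files left on disk: ${files.count(_.exists())}") // 3, all leaked
    files.foreach(_.delete())
  }
}
```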
### Does this PR introduce _any_ user-facing change?
No, this is an internal bug fix.
### How was this patch tested?
Tested by running a longevity stress test. After the fix, there are no more leaked temp_shuffle files.
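For reference, a hedged sketch of one way such a leak check could be scripted (the scan root is an argument, and the `temp_shuffle_` file-name prefix matches the names Spark's `DiskBlockManager` gives temp shuffle blocks):

```scala
import java.io.File

object TempShuffleLeakScan {
  // Recursively count files whose name marks them as temp shuffle output.
  def countLeaks(dir: File): Int = {
    val children = Option(dir.listFiles()).getOrElse(Array.empty[File])
    children.count(f => f.isFile && f.getName.startsWith("temp_shuffle_")) +
      children.filter(_.isDirectory).map(countLeaks).sum
  }

  def main(args: Array[String]): Unit = {
    val root = new File(args.headOption.getOrElse("/tmp"))
    println(s"leaked temp_shuffle files under $root: ${countLeaks(root)}")
  }
}
```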
Closes #33731 from jiangxb1987/temp_shuffle.
Authored-by: Xingbo Jiang <xi...@databricks.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
(cherry picked from commit ec5f3a17e33f7afe03e48f8b7690a8b18ae0c058)
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
.../org/apache/spark/storage/DiskBlockObjectWriter.scala | 13 +++++++++++--
1 file changed, 11 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
index f5d8c02..662f63d 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.storage
 
-import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStream}
+import java.io.{BufferedOutputStream, File, FileOutputStream, IOException, OutputStream}
 import java.nio.channels.{ClosedByInterruptException, FileChannel}
 import java.util.zip.Checksum
 
@@ -64,7 +64,16 @@ private[spark] class DiskBlockObjectWriter(
     }
 
     def manualClose(): Unit = {
-      super.close()
+      try {
+        super.close()
+      } catch {
+        // The output stream may have been closed when the task thread is interrupted, then we
+        // get an IOException when flushing the buffered data. We should catch and log the
+        // exception to ensure the revertPartialWritesAndClose() function doesn't throw.
+        case e: IOException =>
+          logError("Exception occurred while manually closing the output stream to file "
+            + file + ", " + e.getMessage)
+      }
     }
   }
 
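For context, here is a standalone sketch of the wrapper pattern the hunk above modifies (an illustrative class, not Spark's actual `ManualCloseOutputStream` trait): `close()` is deliberately a no-op so wrappers layered on top cannot close the file out from under the writer, and `manualClose()` performs the real close while swallowing the IOException that flushing after an interrupt can raise.

```scala
import java.io.{FilterOutputStream, IOException, OutputStream}

class ManualCloseDemoStream(out: OutputStream) extends FilterOutputStream(out) {
  // Ignore regular close() calls; only manualClose() really closes the stream.
  override def close(): Unit = ()

  def manualClose(): Unit = {
    try {
      super.close() // flushes any buffered data, then closes `out`
    } catch {
      case e: IOException =>
        // The real class logs via logError; printing stands in for that here.
        Console.err.println(s"ignored while manually closing: ${e.getMessage}")
    }
  }
}
```

With this pattern, a cleanup path like `revertPartialWritesAndClose()` can call `manualClose()` unconditionally without risking a secondary exception.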