Posted to commits@spark.apache.org by va...@apache.org on 2018/09/26 16:30:09 UTC

spark git commit: [SPARK-25318] Add exception handling when wrapping the input stream during the fetch or stage retry in response to a corrupted block

Repository: spark
Updated Branches:
  refs/heads/master a2ac5a72c -> bd2ae857d


[SPARK-25318] Add exception handling when wrapping the input stream during the fetch or stage retry in response to a corrupted block

SPARK-4105 addressed the block corruption issue by retrying the fetch or the stage. That solution includes a step that wraps the input stream with compression and/or encryption. This step is prone to exceptions, but the current code has no exception handling around it, which has confused users: after SPARK-4105 a user expects to see either a fetchFailed exception or a warning about a corrupted block, yet an exception thrown while wrapping can fail the job with neither. This change adds exception handling to the wrapping step and also retries the fetch when a corruption is hit there. The retry is worthwhile because the same failure usually does not recur on a rerun, so it is reasonable to fetch and wrap one more time instead of failing the job outright.
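
For illustration, the retry-on-corruption pattern looks roughly like the following. This is a minimal, self-contained sketch, not the actual Spark code: fetchBlock, openBlock, and the use of gzip as the stream wrapper are hypothetical stand-ins, and the real change lives in ShuffleBlockFetcherIterator (see the diff below).

----------------------------------------------------------------------
import java.io.{ByteArrayInputStream, IOException, InputStream}
import java.util.zip.GZIPInputStream

object WrapWithRetrySketch {

  // Hypothetical fetch of a block's raw (compressed) bytes. The bogus
  // two-byte header stands in for a corrupted remote block.
  def fetchBlock(blockId: String): InputStream =
    new ByteArrayInputStream(Array[Byte](0x1f, 0x77))

  def openBlock(blockId: String, retriesLeft: Int = 1): InputStream = {
    val raw = fetchBlock(blockId)
    try {
      // Wrapping (here: decompression) can itself throw on a corrupt
      // block; before this change such an exception failed the job.
      new GZIPInputStream(raw)
    } catch {
      case e: IOException if retriesLeft > 0 =>
        // First corruption seen for this block: fetch it once more,
        // mirroring the warn-and-retry branch in the patch.
        raw.close()
        println(s"got a corrupted block $blockId, fetching again: $e")
        openBlock(blockId, retriesLeft - 1)
      case e: IOException =>
        // Still corrupt after a retry: give up, analogous to
        // throwFetchFailedException in the patch.
        raw.close()
        throw new IOException(s"block $blockId is still corrupt after retrying", e)
    }
  }

  def main(args: Array[String]): Unit = {
    // With a permanently corrupt source this retries once, then fails.
    try openBlock("shuffle_0_0_0")
    catch { case e: IOException => println(s"fetch failed: ${e.getMessage}") }
  }
}
----------------------------------------------------------------------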

Closes #22325 from rezasafi/localcorruption.

Authored-by: Reza Safi <re...@cloudera.com>
Signed-off-by: Marcelo Vanzin <va...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bd2ae857
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bd2ae857
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bd2ae857

Branch: refs/heads/master
Commit: bd2ae857d1c5f251056de38a7a40540986756b94
Parents: a2ac5a7
Author: Reza Safi <re...@cloudera.com>
Authored: Wed Sep 26 09:29:58 2018 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Wed Sep 26 09:29:58 2018 -0700

----------------------------------------------------------------------
 .../storage/ShuffleBlockFetcherIterator.scala   | 50 ++++++++++----------
 1 file changed, 25 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bd2ae857/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index e534c74..aecc228 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -448,35 +448,35 @@ final class ShuffleBlockFetcherIterator(
               buf.release()
               throwFetchFailedException(blockId, address, e)
           }
-
-          input = streamWrapper(blockId, in)
-          // Only copy the stream if it's wrapped by compression or encryption, also the size of
-          // block is small (the decompressed block is smaller than maxBytesInFlight)
-          if (detectCorrupt && !input.eq(in) && size < maxBytesInFlight / 3) {
-            val originalInput = input
-            val out = new ChunkedByteBufferOutputStream(64 * 1024, ByteBuffer.allocate)
-            try {
+          var isStreamCopied: Boolean = false
+          try {
+            input = streamWrapper(blockId, in)
+            // Only copy the stream if it's wrapped by compression or encryption, also the size of
+            // block is small (the decompressed block is smaller than maxBytesInFlight)
+            if (detectCorrupt && !input.eq(in) && size < maxBytesInFlight / 3) {
+              isStreamCopied = true
+              val out = new ChunkedByteBufferOutputStream(64 * 1024, ByteBuffer.allocate)
               // Decompress the whole block at once to detect any corruption, which could increase
               // the memory usage and potentially increase the chance of OOM.
               // TODO: manage the memory used here, and spill it into disk in case of OOM.
-              Utils.copyStream(input, out)
-              out.close()
+              Utils.copyStream(input, out, closeStreams = true)
               input = out.toChunkedByteBuffer.toInputStream(dispose = true)
-            } catch {
-              case e: IOException =>
-                buf.release()
-                if (buf.isInstanceOf[FileSegmentManagedBuffer]
-                  || corruptedBlocks.contains(blockId)) {
-                  throwFetchFailedException(blockId, address, e)
-                } else {
-                  logWarning(s"got a corrupted block $blockId from $address, fetch again", e)
-                  corruptedBlocks += blockId
-                  fetchRequests += FetchRequest(address, Array((blockId, size)))
-                  result = null
-                }
-            } finally {
-              // TODO: release the buf here to free memory earlier
-              originalInput.close()
+            }
+          } catch {
+            case e: IOException =>
+              buf.release()
+              if (buf.isInstanceOf[FileSegmentManagedBuffer]
+                || corruptedBlocks.contains(blockId)) {
+                throwFetchFailedException(blockId, address, e)
+              } else {
+                logWarning(s"got a corrupted block $blockId from $address, fetch again", e)
+                corruptedBlocks += blockId
+                fetchRequests += FetchRequest(address, Array((blockId, size)))
+                result = null
+              }
+          } finally {
+            // TODO: release the buf here to free memory earlier
+            if (isStreamCopied) {
               in.close()
             }
           }
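
Net effect of the hunk above: the try block now opens before streamWrapper(blockId, in), so an IOException thrown while wrapping receives the same treatment as one thrown during the eager copy. FileSegmentManagedBuffer blocks and blocks already recorded in corruptedBlocks go straight to throwFetchFailedException, while a first-time corruption logs a warning, records the block, and re-enqueues a FetchRequest. The finally block closes the raw stream "in" only when it was fully copied into the ChunkedByteBuffer (isStreamCopied); otherwise the caller still reads through it.

The eager copy that the condition guards is itself the corruption detector. A rough sketch of that idea, assuming a plain ByteArrayOutputStream in place of Spark's ChunkedByteBufferOutputStream and with materialize as a hypothetical name:

----------------------------------------------------------------------
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream}

object EagerCopySketch {
  // Drain the wrapped (decompressing/decrypting) stream now, so any
  // corruption surfaces as an IOException while the fetch can still be
  // retried, then hand the caller an in-memory stream over the verified
  // bytes. This trades memory for early detection, which is why the
  // real code only does it for blocks smaller than maxBytesInFlight / 3.
  def materialize(wrapped: InputStream): InputStream = {
    val out = new ByteArrayOutputStream()
    try {
      val buf = new Array[Byte](64 * 1024)
      var n = wrapped.read(buf)   // a corrupt block throws during reads
      while (n != -1) {
        out.write(buf, 0, n)
        n = wrapped.read(buf)
      }
    } finally {
      wrapped.close()             // like Utils.copyStream(..., closeStreams = true)
    }
    new ByteArrayInputStream(out.toByteArray)
  }
}
----------------------------------------------------------------------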

