Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/12/28 07:26:53 UTC

[GitHub] [spark] mridulm commented on a change in pull request #34980: [SPARK-37710][CORE] Add detailed error message for java.io.IOException occurring on Kryo flow

mridulm commented on a change in pull request #34980:
URL: https://github.com/apache/spark/pull/34980#discussion_r775762199



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -926,30 +933,48 @@ private[spark] class BlockManager(
           })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
-          val diskData = diskStore.getBytes(blockId)
-          val iterToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              val diskValues = serializerManager.dataDeserializeStream(
-                blockId,
-                diskData.toInputStream())(info.classTag)
-              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-            } else {
-              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
-                .map { _.toInputStream(dispose = false) }
-                .getOrElse { diskData.toInputStream() }
-              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+          try {
+            val diskData = diskStore.getBytes(blockId)
+            val iterToReturn: Iterator[Any] = {
+              if (level.deserialized) {
+                val diskValues = serializerManager.dataDeserializeStream(
+                  blockId,
+                  diskData.toInputStream())(info.classTag)
+                maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+              } else {
+                val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
+                  .map { _.toInputStream(dispose = false) }
+                  .getOrElse { diskData.toInputStream() }
+                serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+              }
             }
+            val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
+              releaseLockAndDispose(blockId, diskData, taskContext)
+            })
+            Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+          } catch {
+            case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
+              // We need a clear error message to catch environmental problems easily.
+              // Further details: https://issues.apache.org/jira/browse/SPARK-37710
+              processKryoException(ex, blockId)
+              throw ex
           }
-          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
-            releaseLockAndDispose(blockId, diskData, taskContext)
-          })
-          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
         } else {
           handleLocalReadFailure(blockId)
         }
     }
   }
 
+  private def processKryoException(ex: KryoException, blockId: BlockId): Unit = {
+    var errorMessage = s"${ex.getMessage}. Please check if environment status is healthy for " +
+        s"disk corruption, network failure (etc). ${blockManagerId.toString} - blockName: $blockId"

Review comment:
       `IOException` can occur for a variety of reasons. Specifically calling out a few of them would be confusing: avoid enumerating causes.
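
       As a hypothetical illustration of the more generic wording being suggested (the helper and parameter names below are stand-ins, not the PR's actual code):

    object KryoReadError {
      // Hypothetical helper: state what failed and where, without guessing
      // at specific root causes (disk corruption, network failure, ...).
      def message(
          exMessage: String,
          blockManagerId: String,
          blockId: String,
          blockDiskPath: Option[String]): String = {
        val base = s"$exMessage. Error occurred while reading block $blockId on $blockManagerId."
        blockDiskPath.fold(base)(p => s"$base blockDiskPath: $p")
      }
    }

       For example, `KryoReadError.message("Buffer underflow", "BlockManagerId(1, host, 7337)", "rdd_2_3", None)` reports what failed and where, and leaves root-cause analysis to the reader.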

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -926,30 +933,48 @@ private[spark] class BlockManager(
           })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
-          val diskData = diskStore.getBytes(blockId)
-          val iterToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              val diskValues = serializerManager.dataDeserializeStream(
-                blockId,
-                diskData.toInputStream())(info.classTag)
-              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-            } else {
-              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
-                .map { _.toInputStream(dispose = false) }
-                .getOrElse { diskData.toInputStream() }
-              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+          try {
+            val diskData = diskStore.getBytes(blockId)
+            val iterToReturn: Iterator[Any] = {
+              if (level.deserialized) {
+                val diskValues = serializerManager.dataDeserializeStream(
+                  blockId,
+                  diskData.toInputStream())(info.classTag)
+                maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+              } else {
+                val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
+                  .map { _.toInputStream(dispose = false) }
+                  .getOrElse { diskData.toInputStream() }
+                serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+              }
             }
+            val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
+              releaseLockAndDispose(blockId, diskData, taskContext)
+            })
+            Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+          } catch {
+            case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
+              // We need a clear error message to catch environmental problems easily.
+              // Further details: https://issues.apache.org/jira/browse/SPARK-37710
+              processKryoException(ex, blockId)
+              throw ex
           }
-          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
-            releaseLockAndDispose(blockId, diskData, taskContext)
-          })
-          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
         } else {
           handleLocalReadFailure(blockId)
         }
     }
   }
 
+  private def processKryoException(ex: KryoException, blockId: BlockId): Unit = {
+    var errorMessage = s"${ex.getMessage}. Please check if environment status is healthy for " +
+        s"disk corruption, network failure (etc). ${blockManagerId.toString} - blockName: $blockId"
+    if (diskBlockManager.containsBlock(blockId)) {
+      val file = diskBlockManager.getFile(blockId)
+      errorMessage = errorMessage + s" - blockDiskPath: ${file.getAbsolutePath}"
+    }
+    logError(errorMessage, ex)

Review comment:
       The log-and-rethrow pattern is an anti-pattern - I am fine with a `logDebug` or `logInfo` giving additional info (blockManagerId, blockId, file name) in case this context is lost; the stack trace is already getting logged elsewhere.
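
       A minimal, self-contained sketch of that suggestion - context logged at info level and the exception rethrown without `logError` (all names below are illustrative stand-ins for the real BlockManager members, not Spark's code):

    import java.io.IOException
    import com.esotericsoftware.kryo.KryoException

    object DiskReadSketch {
      // Stand-ins for the real BlockManager state and the Logging trait.
      val blockId = "rdd_0_0"
      val blockManagerId = "BlockManagerId(driver, localhost, 7077)"
      val blockDiskPath: Option[String] = None
      def logInfo(msg: String): Unit = println(s"INFO $msg")

      // Simulates the disk read + Kryo deserialization failing with an
      // IOException-caused KryoException.
      def readBlock(): Iterator[Any] =
        throw new KryoException("Buffer underflow.", new IOException("Input/output error"))

      def readWithContext(): Iterator[Any] =
        try {
          readBlock()
        } catch {
          case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
            // Log the extra context only; the stack trace is logged by the
            // caller that ultimately handles the exception.
            logInfo(s"Kryo read failed for block $blockId on $blockManagerId" +
              blockDiskPath.map(p => s" - blockDiskPath: $p").getOrElse(""))
            throw ex
        }
    }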




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org