Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/12/21 22:38:50 UTC

[GitHub] [spark] erenavsarogullari opened a new pull request #34980: SPARK-37710 - Add clear error message for java.io.IOException: Input/output error

erenavsarogullari opened a new pull request #34980:
URL: https://github.com/apache/spark/pull/34980


   ### What changes were proposed in this pull request?
    `java.io.IOException: Input/output error` usually points to environmental issues such as disk read/write failures due to disk corruption, network access failures, etc. This PR aims to add a clear message for this kind of environmental case occurring on `BlockManager` and to log the `BlockManager hostname`, `blockId` and `blockPath`.
   
   ### Why are the changes needed?
    These problems are usually environmental, and a clear error message can help the analysis and save root-cause analysis (RCA) time.
   
    The following stack-trace occurred on disk corruption:
   ```
   com.esotericsoftware.kryo.KryoException: java.io.IOException: Input/output error
   Serialization trace:
   buffers (org.apache.spark.sql.execution.columnar.DefaultCachedBatch)
       at com.esotericsoftware.kryo.io.Input.fill(Input.java:166)
       at com.esotericsoftware.kryo.io.Input.require(Input.java:196)
       at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:346)
       at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:326)
       at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:55)
       at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:38)
       at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:789)
       at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:381)
       at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:302)
       at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:789)
       at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:132)
       at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
       at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:816)
       at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:296)
       at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:168)
       at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
       at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
       at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
       at org.apache.spark.storage.BlockManager.maybeCacheDiskValuesInMemory(BlockManager.scala:1569)
       at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:877)
       at org.apache.spark.storage.BlockManager.get(BlockManager.scala:1163)
   ...
   Caused by: java.io.IOException: Input/output error
       at java.io.FileInputStream.readBytes(Native Method)
       at java.io.FileInputStream.read(FileInputStream.java:255)
       at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
       at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
       at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
       at net.jpountz.lz4.LZ4BlockInputStream.tryReadFully(LZ4BlockInputStream.java:269)
       at net.jpountz.lz4.LZ4BlockInputStream.readFully(LZ4BlockInputStream.java:280)
       at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:243)
       at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:157)
       at com.esotericsoftware.kryo.io.Input.fill(Input.java:164)
       ... 87 more 
   ```
   
   Proposed Error Message:
   ```
   java.io.IOException: Input/output error usually occurs due to environmental problems 
   (e.g: disk corruption, network failure etc) so please check env status if healthy. 
   BlockManagerId(driver, localhost, 54937, None) - blockName: test_my-block-id - blockDiskPath: /private/var/folders/kj/mccyycwn6mjdwnglw9g3k6pm0000gq/T/blockmgr-e86d8f67-a993-407f-ad3b-3cfb667b4ad4/11/test_my-block-id
   ```
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
    Added 2 new unit tests that reproduce the issue and verify the new error message when getting data blocks from disk.
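    
    For illustration, a minimal, self-contained sketch of how this failure shape can be reproduced with a failing stream stub (this is an assumption about the test approach, not the PR's actual test code; `DiskCorruptedInputStream` and `KryoIoFailureSketch` are hypothetical names):
    ```scala
    import java.io.{IOException, InputStream}

    import com.esotericsoftware.kryo.KryoException
    import com.esotericsoftware.kryo.io.Input

    object KryoIoFailureSketch {
      // InputStream stub that always fails, simulating a corrupted disk block.
      private class DiskCorruptedInputStream extends InputStream {
        override def read(): Int = throw new IOException("Input/output error")
      }

      def main(args: Array[String]): Unit = {
        val input = new Input(new DiskCorruptedInputStream, 4096)
        try {
          input.readBytes(16) // Kryo's Input.fill wraps the IOException in a KryoException
          assert(false, "expected a KryoException")
        } catch {
          case ex: KryoException =>
            // This is the failure shape the patch matches on.
            assert(ex.getCause.isInstanceOf[IOException])
        }
      }
    }
    ```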
   




[GitHub] [spark] erenavsarogullari commented on a change in pull request #34980: [SPARK-37710][CORE] Add detailed error message for java.io.IOException occurring on Kryo flow

Posted by GitBox <gi...@apache.org>.
erenavsarogullari commented on a change in pull request #34980:
URL: https://github.com/apache/spark/pull/34980#discussion_r776123609



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -926,30 +933,48 @@ private[spark] class BlockManager(
           })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
-          val diskData = diskStore.getBytes(blockId)
-          val iterToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              val diskValues = serializerManager.dataDeserializeStream(
-                blockId,
-                diskData.toInputStream())(info.classTag)
-              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-            } else {
-              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
-                .map { _.toInputStream(dispose = false) }
-                .getOrElse { diskData.toInputStream() }
-              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+          try {
+            val diskData = diskStore.getBytes(blockId)
+            val iterToReturn: Iterator[Any] = {
+              if (level.deserialized) {
+                val diskValues = serializerManager.dataDeserializeStream(
+                  blockId,
+                  diskData.toInputStream())(info.classTag)
+                maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+              } else {
+                val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
+                  .map { _.toInputStream(dispose = false) }
+                  .getOrElse { diskData.toInputStream() }
+                serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+              }
             }
+            val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
+              releaseLockAndDispose(blockId, diskData, taskContext)
+            })
+            Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+          } catch {
+            case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
+              // We need to have clear error message to catch environmental problems easily.
+              // Further details: https://issues.apache.org/jira/browse/SPARK-37710
+              processKryoException(ex, blockId)
+              throw ex
           }
-          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
-            releaseLockAndDispose(blockId, diskData, taskContext)
-          })
-          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
         } else {
           handleLocalReadFailure(blockId)
         }
     }
   }
 
+  private def processKryoException(ex: KryoException, blockId: BlockId): Unit = {
+    var errorMessage = s"${ex.getMessage}. Please check if environment status is healthy for " +
+        s"disk corruption, network failure (etc). ${blockManagerId.toString} - blockName: $blockId"
+    if (diskBlockManager.containsBlock(blockId)) {
+      val file = diskBlockManager.getFile(blockId)
+      errorMessage = errorMessage + s" - blockDiskPath: ${file.getAbsolutePath}"
+    }
+    logError(errorMessage, ex)

Review comment:
       The patch's main goal is to provide a hint by logging `BlockManager`, `blockId` and `blockPath` for potential disk problems occurring on the `Kryo deserialization` flow. I think we just need to log the problematic `BlockManager` and `block` info, which is why the `error` log-level is used. (`ex` can be removed from `logError` since it is already logged at `Executor$TaskRunner`.)
    On the other hand, the exception is re-thrown to avoid breaking the current execution flow (e.g. `BlockManager.getLocalValues` is called by `TorrentBroadcast.readBroadcastBlock`, which also catches `IOException`/`NonFatal`, logs, and re-throws; a minimal sketch of this log-and-rethrow shape follows the flow below). The current execution flow is as follows:
   ```
   FileInputStream => Compression API calls => Kryo Deserialization Flow => BlockManager => RDD => MapPartitionsRDD => ResultTask => Task => Executor$TaskRunner...
   ```
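    
    A minimal, self-contained sketch of that log-and-rethrow shape (`readBlock` is a hypothetical stand-in, not the actual `TorrentBroadcast` code):
    ```scala
    import java.io.IOException

    object LogAndRethrowSketch {
      // Hypothetical stand-in for a disk read that fails with an I/O error.
      private def readBlock(): Array[Byte] = throw new IOException("Input/output error")

      def main(args: Array[String]): Unit = {
        try {
          readBlock()
        } catch {
          case e: IOException =>
            // Log the supporting context, then re-throw so upstream callers
            // (e.g. Executor$TaskRunner) keep their existing failure handling.
            System.err.println(s"Read failed: ${e.getMessage}")
            throw e
        }
      }
    }
    ```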






[GitHub] [spark] erenavsarogullari commented on a change in pull request #34980: [SPARK-37710][CORE] Add detailed error message for java.io.IOException occurring on Kryo flow

Posted by GitBox <gi...@apache.org>.
erenavsarogullari commented on a change in pull request #34980:
URL: https://github.com/apache/spark/pull/34980#discussion_r776122976



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -926,30 +933,48 @@ private[spark] class BlockManager(
           })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
-          val diskData = diskStore.getBytes(blockId)
-          val iterToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              val diskValues = serializerManager.dataDeserializeStream(
-                blockId,
-                diskData.toInputStream())(info.classTag)
-              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-            } else {
-              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
-                .map { _.toInputStream(dispose = false) }
-                .getOrElse { diskData.toInputStream() }
-              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+          try {
+            val diskData = diskStore.getBytes(blockId)
+            val iterToReturn: Iterator[Any] = {
+              if (level.deserialized) {
+                val diskValues = serializerManager.dataDeserializeStream(
+                  blockId,
+                  diskData.toInputStream())(info.classTag)
+                maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+              } else {
+                val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
+                  .map { _.toInputStream(dispose = false) }
+                  .getOrElse { diskData.toInputStream() }
+                serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+              }
             }
+            val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
+              releaseLockAndDispose(blockId, diskData, taskContext)
+            })
+            Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+          } catch {
+            case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
+              // We need to have clear error message to catch environmental problems easily.
+              // Further details: https://issues.apache.org/jira/browse/SPARK-37710
+              processKryoException(ex, blockId)
+              throw ex
           }
-          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
-            releaseLockAndDispose(blockId, diskData, taskContext)
-          })
-          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
         } else {
           handleLocalReadFailure(blockId)
         }
     }
   }
 
+  private def processKryoException(ex: KryoException, blockId: BlockId): Unit = {
+    var errorMessage = s"${ex.getMessage}. Please check if environment status is healthy for " +
+        s"disk corruption, network failure (etc). ${blockManagerId.toString} - blockName: $blockId"

Review comment:
       In this case, the `IOException` is wrapped by a `KryoException`. This means it occurs at the `Kryo deserialization` level when accessing I/O via disk/network. In our use-case, this occurred due to `disk corruption` (with error message: `java.io.IOException: Input/output error`). So I would like to highlight `disk corruption` here as a real-life example for future reference.
    
    The `etc` in the log already signals that there may be different error cases for `IOException`. This kind of environmental problem is hard to catch and requires correlation with infrastructure alerts (e.g. alerts reporting disk corruption) if the pipeline has them, so providing as many hints as possible will help future RCAs.






[GitHub] [spark] srowen commented on a change in pull request #34980: [SPARK-37710][CORE] Add detailed error message for java.io.IOException occurring on Kryo flow

Posted by GitBox <gi...@apache.org>.
srowen commented on a change in pull request #34980:
URL: https://github.com/apache/spark/pull/34980#discussion_r779092355



##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
##########
@@ -2123,6 +2125,80 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     store.putSingle(broadcast0BlockId, a, StorageLevel.DISK_ONLY)
   }
 
+  test("check KryoException when getting disk blocks and 'Input/output error' is occurred") {
+    val kyoSerializerWithDiskCorruptedInputStream

Review comment:
       Total nit: kyo 

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -926,30 +933,47 @@ private[spark] class BlockManager(
           })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
-          val diskData = diskStore.getBytes(blockId)
-          val iterToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              val diskValues = serializerManager.dataDeserializeStream(
-                blockId,
-                diskData.toInputStream())(info.classTag)
-              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-            } else {
-              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
-                .map { _.toInputStream(dispose = false) }
-                .getOrElse { diskData.toInputStream() }
-              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+          try {
+            val diskData = diskStore.getBytes(blockId)
+            val iterToReturn: Iterator[Any] = {
+              if (level.deserialized) {
+                val diskValues = serializerManager.dataDeserializeStream(
+                  blockId,
+                  diskData.toInputStream())(info.classTag)
+                maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+              } else {
+                val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
+                  .map { _.toInputStream(dispose = false) }
+                  .getOrElse { diskData.toInputStream() }
+                serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+              }
             }
+            val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
+              releaseLockAndDispose(blockId, diskData, taskContext)
+            })
+            Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+          } catch {
+            case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
+              // We need to have clear error message to catch environmental problems easily.
+              // Further details: https://issues.apache.org/jira/browse/SPARK-37710
+              processKryoException(ex, blockId)
+              throw ex
           }
-          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
-            releaseLockAndDispose(blockId, diskData, taskContext)
-          })
-          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
         } else {
           handleLocalReadFailure(blockId)
         }
     }
   }
 
+  private def processKryoException(ex: KryoException, blockId: BlockId): Unit = {
+    var warningMessage = s"${ex.getMessage}. ${blockManagerId.toString} - blockName: $blockId"
+    val file = diskBlockManager.getFile(blockId)
+    if (file.exists()) {
+      warningMessage = warningMessage + s" - blockDiskPath: ${file.getAbsolutePath}"

Review comment:
       Total nit: warningMessage can be in the format string
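    
    For illustration, a sketch of that nit applied to the method from the diff above, assuming the surrounding `BlockManager` context (the `Option`-based suffix is illustrative, not the merged code):
    ```scala
    private def processKryoException(ex: KryoException, blockId: BlockId): Unit = {
      // Build the message in a single format string instead of a mutable var;
      // append the disk path only when the block file actually exists.
      val diskPathSuffix = Option(diskBlockManager.getFile(blockId))
        .filter(_.exists())
        .map(f => s" - blockDiskPath: ${f.getAbsolutePath}")
        .getOrElse("")
      logWarning(s"${ex.getMessage}. $blockManagerId - blockName: $blockId$diskPathSuffix")
    }
    ```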






[GitHub] [spark] erenavsarogullari commented on a change in pull request #34980: [SPARK-37710][CORE] Add clear error message for java.io.IOException: Input/output error

Posted by GitBox <gi...@apache.org>.
erenavsarogullari commented on a change in pull request #34980:
URL: https://github.com/apache/spark/pull/34980#discussion_r775643254



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -926,30 +936,62 @@ private[spark] class BlockManager(
           })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
-          val diskData = diskStore.getBytes(blockId)
-          val iterToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              val diskValues = serializerManager.dataDeserializeStream(
-                blockId,
-                diskData.toInputStream())(info.classTag)
-              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-            } else {
-              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
-                .map { _.toInputStream(dispose = false) }
-                .getOrElse { diskData.toInputStream() }
-              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
-            }
-          }
-          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
-            releaseLockAndDispose(blockId, diskData, taskContext)
-          })
-          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+          getLocalValuesFromDisk(blockId, info, taskContext)
         } else {
           handleLocalReadFailure(blockId)
         }
     }
   }
 
+  private def getLocalValuesFromDisk(blockId: BlockId, info: BlockInfo,
+    taskContext: Option[TaskContext]): Option[BlockResult] = {
+    try {
+      val level = info.level
+      val diskData = diskStore.getBytes(blockId)
+      val iterToReturn: Iterator[Any] = {
+        if (level.deserialized) {
+          val diskValues = serializerManager.dataDeserializeStream(
+            blockId,
+            diskData.toInputStream())(info.classTag)
+          maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+        } else {
+          val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
+            .map { _.toInputStream(dispose = false) }
+            .getOrElse { diskData.toInputStream() }
+          serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+        }
+      }
+      val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
+        releaseLockAndDispose(blockId, diskData, taskContext)
+      })
+      Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+    } catch {
+      case ex: KryoException
+        // We need to have clear error message to catch environmental problems easily.
+        // Further details: https://jira2.workday.com/browse/PRISM-102331
+        if ex.getMessage.toLowerCase(Locale.ROOT)
+          .contains("java.io.ioexception: input/output error") => {
+        processKryoException(ex, blockId)
+        throw ex
+      }
+    }
+  }
+
+  private def processKryoException(ex: KryoException, blockId: BlockId): Unit = {
+    val errorMessage =
+      new StringBuffer(s"${ex.getMessage} usually occurs due to environmental problems " +

Review comment:
       In light of the `IOException` match, I also updated the error message. In my use-case, 3 attempts of the same task had failed on the same host's corrupted disk with `java.io.IOException: Input/output error`, so as much as possible I would like to cover disk corruption as an example of potential root causes when a `KryoException` is thrown with an `IOException` cause (if that makes sense).
   **New error message:**
   ```
   java.io.IOException: Input/output error. Please check if environment status is healthy 
   for disk corruption, network failure (etc). BlockManagerId(driver, localhost, 56816, None)
    - blockName: test_my-block-id - blockDiskPath: /private/var/folders/kj/mccyycwn6mjdwnglw9g3k6pm0000gq/T
   /blockmgr-89221407-59ca-453b-aa7f-82117a460174/11/test_my-block-id
   ```







[GitHub] [spark] AmplabJenkins removed a comment on pull request #34980: SPARK-37710 - Add clear error message for java.io.IOException: Input/output error

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34980:
URL: https://github.com/apache/spark/pull/34980#issuecomment-999147250


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/146454/
   




[GitHub] [spark] SparkQA commented on pull request #34980: SPARK-37710 - Add clear error message for java.io.IOException: Input/output error

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34980:
URL: https://github.com/apache/spark/pull/34980#issuecomment-999184858


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50929/
   




[GitHub] [spark] erenavsarogullari commented on pull request #34980: [SPARK-37710][CORE] Add clear error message for java.io.IOException: Input/output error

Posted by GitBox <gi...@apache.org>.
erenavsarogullari commented on pull request #34980:
URL: https://github.com/apache/spark/pull/34980#issuecomment-999191989


   ok to test




[GitHub] [spark] mridulm commented on a change in pull request #34980: [SPARK-37710][CORE] Add detailed error message for java.io.IOException occurring on Kryo flow

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34980:
URL: https://github.com/apache/spark/pull/34980#discussion_r776644543



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -926,30 +933,49 @@ private[spark] class BlockManager(
           })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
-          val diskData = diskStore.getBytes(blockId)
-          val iterToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              val diskValues = serializerManager.dataDeserializeStream(
-                blockId,
-                diskData.toInputStream())(info.classTag)
-              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-            } else {
-              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
-                .map { _.toInputStream(dispose = false) }
-                .getOrElse { diskData.toInputStream() }
-              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+          try {
+            val diskData = diskStore.getBytes(blockId)
+            val iterToReturn: Iterator[Any] = {
+              if (level.deserialized) {
+                val diskValues = serializerManager.dataDeserializeStream(
+                  blockId,
+                  diskData.toInputStream())(info.classTag)
+                maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+              } else {
+                val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
+                  .map { _.toInputStream(dispose = false) }
+                  .getOrElse { diskData.toInputStream() }
+                serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+              }
             }
+            val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
+              releaseLockAndDispose(blockId, diskData, taskContext)
+            })
+            Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+          } catch {
+            case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
+              // We need to have clear error message to catch environmental problems easily.
+              // Further details: https://issues.apache.org/jira/browse/SPARK-37710
+              processKryoException(ex, blockId)
+              throw ex
           }
-          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
-            releaseLockAndDispose(blockId, diskData, taskContext)
-          })
-          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
         } else {
           handleLocalReadFailure(blockId)
         }
     }
   }
 
+  private def processKryoException(ex: KryoException, blockId: BlockId): Unit = {
+    var errorMessage = s"${ex.getMessage}. Please check if environment status is healthy " +
+      s"(e.g: disk corruption, network failure (etc)). " +

Review comment:
       I don't particularly like this part of the message (`Please check if environment status is healthy (e.g: disk corruption, network failure (etc)).`) - we will end up adding to this list for different types of `IOException` causes as time goes by - this is better captured by `ex.getMessage` itself.
    
    Can we omit this?






[GitHub] [spark] mridulm commented on a change in pull request #34980: [SPARK-37710][CORE] Add detailed error message for java.io.IOException occurring on Kryo flow

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34980:
URL: https://github.com/apache/spark/pull/34980#discussion_r779650264



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -926,30 +933,49 @@ private[spark] class BlockManager(
           })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
-          val diskData = diskStore.getBytes(blockId)
-          val iterToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              val diskValues = serializerManager.dataDeserializeStream(
-                blockId,
-                diskData.toInputStream())(info.classTag)
-              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-            } else {
-              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
-                .map { _.toInputStream(dispose = false) }
-                .getOrElse { diskData.toInputStream() }
-              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+          try {
+            val diskData = diskStore.getBytes(blockId)
+            val iterToReturn: Iterator[Any] = {
+              if (level.deserialized) {
+                val diskValues = serializerManager.dataDeserializeStream(
+                  blockId,
+                  diskData.toInputStream())(info.classTag)
+                maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+              } else {
+                val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
+                  .map { _.toInputStream(dispose = false) }
+                  .getOrElse { diskData.toInputStream() }
+                serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+              }
             }
+            val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
+              releaseLockAndDispose(blockId, diskData, taskContext)
+            })
+            Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+          } catch {
+            case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
+              // We need to have clear error message to catch environmental problems easily.
+              // Further details: https://issues.apache.org/jira/browse/SPARK-37710
+              processKryoException(ex, blockId)
+              throw ex
           }
-          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
-            releaseLockAndDispose(blockId, diskData, taskContext)
-          })
-          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
         } else {
           handleLocalReadFailure(blockId)
         }
     }
   }
 
+  private def processKryoException(ex: KryoException, blockId: BlockId): Unit = {
+    var errorMessage = s"${ex.getMessage}. Please check if environment status is healthy " +
+      s"(e.g: disk corruption, network failure (etc)). " +
+      s"${blockManagerId.toString} - blockName: $blockId"
+    if (diskBlockManager.containsBlock(blockId)) {
+      val file = diskBlockManager.getFile(blockId)
+      errorMessage = errorMessage + s" - blockDiskPath: ${file.getAbsolutePath}"
+    }
+    logError(errorMessage)

Review comment:
       I gave an example of why it is unactionable - when executors shut down, we end up with files getting deleted and `IOException`s getting thrown in this path.






[GitHub] [spark] mridulm commented on a change in pull request #34980: [SPARK-37710][CORE] Add detailed error message for java.io.IOException occurring on Kryo flow

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34980:
URL: https://github.com/apache/spark/pull/34980#discussion_r779278699



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -926,30 +933,49 @@ private[spark] class BlockManager(
           })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
-          val diskData = diskStore.getBytes(blockId)
-          val iterToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              val diskValues = serializerManager.dataDeserializeStream(
-                blockId,
-                diskData.toInputStream())(info.classTag)
-              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-            } else {
-              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
-                .map { _.toInputStream(dispose = false) }
-                .getOrElse { diskData.toInputStream() }
-              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+          try {
+            val diskData = diskStore.getBytes(blockId)
+            val iterToReturn: Iterator[Any] = {
+              if (level.deserialized) {
+                val diskValues = serializerManager.dataDeserializeStream(
+                  blockId,
+                  diskData.toInputStream())(info.classTag)
+                maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+              } else {
+                val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
+                  .map { _.toInputStream(dispose = false) }
+                  .getOrElse { diskData.toInputStream() }
+                serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+              }
             }
+            val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
+              releaseLockAndDispose(blockId, diskData, taskContext)
+            })
+            Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+          } catch {
+            case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
+              // We need to have clear error message to catch environmental problems easily.
+              // Further details: https://issues.apache.org/jira/browse/SPARK-37710
+              processKryoException(ex, blockId)
+              throw ex
           }
-          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
-            releaseLockAndDispose(blockId, diskData, taskContext)
-          })
-          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
         } else {
           handleLocalReadFailure(blockId)
         }
     }
   }
 
+  private def processKryoException(ex: KryoException, blockId: BlockId): Unit = {
+    var errorMessage = s"${ex.getMessage}. Please check if environment status is healthy " +
+      s"(e.g: disk corruption, network failure (etc)). " +
+      s"${blockManagerId.toString} - blockName: $blockId"
+    if (diskBlockManager.containsBlock(blockId)) {
+      val file = diskBlockManager.getFile(blockId)
+      errorMessage = errorMessage + s" - blockDiskPath: ${file.getAbsolutePath}"
+    }
+    logError(errorMessage)

Review comment:
       I am not in favor of making this ERROR or WARN - it is not actionable for end users, it ends up surfacing for a larger number of cases (not just unrecoverable errors), and the actual message is handled elsewhere, while this PR adds supporting information to help triage an issue.
    
    I understand that there are some specific corner cases where this information would be useful (like disk corruption, etc.) - which is why I am fine with leaving it at INFO instead of DEBUG.






[GitHub] [spark] SparkQA commented on pull request #34980: SPARK-37710 - Add clear error message for java.io.IOException: Input/output error

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34980:
URL: https://github.com/apache/spark/pull/34980#issuecomment-999163301


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50929/
   




[GitHub] [spark] erenavsarogullari commented on a change in pull request #34980: [SPARK-37710][CORE] Add clear error message for java.io.IOException: Input/output error

Posted by GitBox <gi...@apache.org>.
erenavsarogullari commented on a change in pull request #34980:
URL: https://github.com/apache/spark/pull/34980#discussion_r775665864



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -926,30 +933,55 @@ private[spark] class BlockManager(
           })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
-          val diskData = diskStore.getBytes(blockId)
-          val iterToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              val diskValues = serializerManager.dataDeserializeStream(
-                blockId,
-                diskData.toInputStream())(info.classTag)
-              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-            } else {
-              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
-                .map { _.toInputStream(dispose = false) }
-                .getOrElse { diskData.toInputStream() }
-              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
-            }
-          }
-          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
-            releaseLockAndDispose(blockId, diskData, taskContext)
-          })
-          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+          getLocalValuesFromDisk(blockId, info, taskContext)
         } else {
           handleLocalReadFailure(blockId)
         }
     }
   }
 
+  private def getLocalValuesFromDisk(blockId: BlockId, info: BlockInfo,
+    taskContext: Option[TaskContext]): Option[BlockResult] = {
+    try {
+      val level = info.level
+      val diskData = diskStore.getBytes(blockId)
+      val iterToReturn: Iterator[Any] = {
+        if (level.deserialized) {
+          val diskValues = serializerManager.dataDeserializeStream(
+            blockId,
+            diskData.toInputStream())(info.classTag)
+          maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+        } else {
+          val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
+            .map { _.toInputStream(dispose = false) }
+            .getOrElse { diskData.toInputStream() }
+          serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+        }
+      }
+      val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
+        releaseLockAndDispose(blockId, diskData, taskContext)
+      })
+      Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+    } catch {
+      case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
+        // We need to have clear error message to catch environmental problems easily.
+        // Further details: https://issues.apache.org/jira/browse/SPARK-37710
+        processKryoException(ex, blockId)
+        throw ex
+    }
+  }
+
+  private def processKryoException(ex: KryoException, blockId: BlockId): Unit = {
+    val errorMessage =
+      new StringBuffer(s"${ex.getMessage}. Please check if environment status is healthy for " +

Review comment:
       Addressed

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -926,30 +936,62 @@ private[spark] class BlockManager(
           })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
-          val diskData = diskStore.getBytes(blockId)
-          val iterToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              val diskValues = serializerManager.dataDeserializeStream(
-                blockId,
-                diskData.toInputStream())(info.classTag)
-              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-            } else {
-              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
-                .map { _.toInputStream(dispose = false) }
-                .getOrElse { diskData.toInputStream() }
-              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
-            }
-          }
-          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
-            releaseLockAndDispose(blockId, diskData, taskContext)
-          })
-          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+          getLocalValuesFromDisk(blockId, info, taskContext)
         } else {
           handleLocalReadFailure(blockId)
         }
     }
   }
 
+  private def getLocalValuesFromDisk(blockId: BlockId, info: BlockInfo,

Review comment:
       Addressed






[GitHub] [spark] erenavsarogullari commented on a change in pull request #34980: [SPARK-37710][CORE] Add detailed error message for java.io.IOException occurring on Kryo flow

Posted by GitBox <gi...@apache.org>.
erenavsarogullari commented on a change in pull request #34980:
URL: https://github.com/apache/spark/pull/34980#discussion_r779052285



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -926,30 +933,49 @@ private[spark] class BlockManager(
           })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
-          val diskData = diskStore.getBytes(blockId)
-          val iterToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              val diskValues = serializerManager.dataDeserializeStream(
-                blockId,
-                diskData.toInputStream())(info.classTag)
-              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-            } else {
-              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
-                .map { _.toInputStream(dispose = false) }
-                .getOrElse { diskData.toInputStream() }
-              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+          try {
+            val diskData = diskStore.getBytes(blockId)
+            val iterToReturn: Iterator[Any] = {
+              if (level.deserialized) {
+                val diskValues = serializerManager.dataDeserializeStream(
+                  blockId,
+                  diskData.toInputStream())(info.classTag)
+                maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+              } else {
+                val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
+                  .map { _.toInputStream(dispose = false) }
+                  .getOrElse { diskData.toInputStream() }
+                serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+              }
             }
+            val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
+              releaseLockAndDispose(blockId, diskData, taskContext)
+            })
+            Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+          } catch {
+            case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
+              // We need to have clear error message to catch environmental problems easily.
+              // Further details: https://issues.apache.org/jira/browse/SPARK-37710
+              processKryoException(ex, blockId)
+              throw ex
           }
-          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
-            releaseLockAndDispose(blockId, diskData, taskContext)
-          })
-          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
         } else {
           handleLocalReadFailure(blockId)
         }
     }
   }
 
+  private def processKryoException(ex: KryoException, blockId: BlockId): Unit = {
+    var errorMessage = s"${ex.getMessage}. Please check if environment status is healthy " +
+      s"(e.g: disk corruption, network failure (etc)). " +
+      s"${blockManagerId.toString} - blockName: $blockId"
+    if (diskBlockManager.containsBlock(blockId)) {
+      val file = diskBlockManager.getFile(blockId)
+      errorMessage = errorMessage + s" - blockDiskPath: ${file.getAbsolutePath}"
+    }
+    logError(errorMessage)

Review comment:
       I think the main goal of this patch is to provide the problematic `BlockManager`, `blockId` and `blockPath` (in addition to the `error` message) when `disk problems` in particular occur on `prod` systems (which usually run at the `INFO` log level).
    For example, our use-case was `disk corruption` as a real-life example, and if the problematic `BlockManager`, `blockId` and `blockPath` are logged at DEBUG level while prod systems mostly run at the INFO log-level, these hints will be missed in prod logs. On the other hand, the root cause of this kind of environmental problem is hard to catch on prod systems, and also hard to reproduce on test environments (unless an additional test pipeline is built to simulate network conditions, e.g. `toxiproxy`).
    
    I updated this log message level from ERROR to WARN, and I think it is useful for it to be logged explicitly on prod systems when environmental problems (e.g. disk corruption) occur on the Kryo flow.







[GitHub] [spark] erenavsarogullari commented on a change in pull request #34980: [SPARK-37710][CORE] Add detailed error message for java.io.IOException occurring on Kryo flow

Posted by GitBox <gi...@apache.org>.
erenavsarogullari commented on a change in pull request #34980:
URL: https://github.com/apache/spark/pull/34980#discussion_r779052285



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -926,30 +933,49 @@ private[spark] class BlockManager(
           })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
-          val diskData = diskStore.getBytes(blockId)
-          val iterToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              val diskValues = serializerManager.dataDeserializeStream(
-                blockId,
-                diskData.toInputStream())(info.classTag)
-              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-            } else {
-              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
-                .map { _.toInputStream(dispose = false) }
-                .getOrElse { diskData.toInputStream() }
-              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+          try {
+            val diskData = diskStore.getBytes(blockId)
+            val iterToReturn: Iterator[Any] = {
+              if (level.deserialized) {
+                val diskValues = serializerManager.dataDeserializeStream(
+                  blockId,
+                  diskData.toInputStream())(info.classTag)
+                maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+              } else {
+                val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
+                  .map { _.toInputStream(dispose = false) }
+                  .getOrElse { diskData.toInputStream() }
+                serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+              }
             }
+            val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
+              releaseLockAndDispose(blockId, diskData, taskContext)
+            })
+            Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+          } catch {
+            case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
+              // We need to have clear error message to catch environmental problems easily.
+              // Further details: https://issues.apache.org/jira/browse/SPARK-37710
+              processKryoException(ex, blockId)
+              throw ex
           }
-          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
-            releaseLockAndDispose(blockId, diskData, taskContext)
-          })
-          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
         } else {
           handleLocalReadFailure(blockId)
         }
     }
   }
 
+  private def processKryoException(ex: KryoException, blockId: BlockId): Unit = {
+    var errorMessage = s"${ex.getMessage}. Please check if environment status is healthy " +
+      s"(e.g: disk corruption, network failure (etc)). " +
+      s"${blockManagerId.toString} - blockName: $blockId"
+    if (diskBlockManager.containsBlock(blockId)) {
+      val file = diskBlockManager.getFile(blockId)
+      errorMessage = errorMessage + s" - blockDiskPath: ${file.getAbsolutePath}"
+    }
+    logError(errorMessage)

Review comment:
       I think the main goal of this patch is to provide `problematic BlockManager`, `blockId` and `blockPath` (in addition `error` message) when specially `disk problems` occurred on `prod` systems (using `INFO` log level).
   For example, our use-case was `disk corruption` as a real-life example and if `problematic BlockManager`, `blockId` and `blockPath` are logged at DEBUG level where prod systems mostly run at INFO log-level, these hints will be missed on prod logs. On the other hand, it is hard to catch this kind of environmental problems on prod systems and also, hard to reproduce on test environments (unless additional test pipeline is built to simulate network conditions, e.g: `toxiproxy`).
   
   I updated this log message level from ERROR to WARN and i think it is useful to be logged explicitly on prod systems if environmental problems (e.g: disk corruption) are occurred.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34980: [SPARK-37710][CORE] Add detailed error message for java.io.IOException occurring on Kryo flow

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34980:
URL: https://github.com/apache/spark/pull/34980#discussion_r779650264



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -926,30 +933,49 @@ private[spark] class BlockManager(
           })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
-          val diskData = diskStore.getBytes(blockId)
-          val iterToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              val diskValues = serializerManager.dataDeserializeStream(
-                blockId,
-                diskData.toInputStream())(info.classTag)
-              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-            } else {
-              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
-                .map { _.toInputStream(dispose = false) }
-                .getOrElse { diskData.toInputStream() }
-              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+          try {
+            val diskData = diskStore.getBytes(blockId)
+            val iterToReturn: Iterator[Any] = {
+              if (level.deserialized) {
+                val diskValues = serializerManager.dataDeserializeStream(
+                  blockId,
+                  diskData.toInputStream())(info.classTag)
+                maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+              } else {
+                val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
+                  .map { _.toInputStream(dispose = false) }
+                  .getOrElse { diskData.toInputStream() }
+                serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+              }
             }
+            val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
+              releaseLockAndDispose(blockId, diskData, taskContext)
+            })
+            Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+          } catch {
+            case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
+              // We need to have clear error message to catch environmental problems easily.
+              // Further details: https://issues.apache.org/jira/browse/SPARK-37710
+              processKryoException(ex, blockId)
+              throw ex
           }
-          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
-            releaseLockAndDispose(blockId, diskData, taskContext)
-          })
-          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
         } else {
           handleLocalReadFailure(blockId)
         }
     }
   }
 
+  private def processKryoException(ex: KryoException, blockId: BlockId): Unit = {
+    var errorMessage = s"${ex.getMessage}. Please check if environment status is healthy " +
+      s"(e.g: disk corruption, network failure (etc)). " +
+      s"${blockManagerId.toString} - blockName: $blockId"
+    if (diskBlockManager.containsBlock(blockId)) {
+      val file = diskBlockManager.getFile(blockId)
+      errorMessage = errorMessage + s" - blockDiskPath: ${file.getAbsolutePath}"
+    }
+    logError(errorMessage)

Review comment:
       I gave an example of why this can be unactionable - when executors shut down, we end up with files getting deleted and IOExceptions getting thrown on this path.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #34980: SPARK-37710 - Add clear error message for java.io.IOException: Input/output error

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34980:
URL: https://github.com/apache/spark/pull/34980#issuecomment-999147250


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/146454/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] srowen commented on a change in pull request #34980: [SPARK-37710][CORE] Add detailed error message for java.io.IOException occurring on Kryo flow

Posted by GitBox <gi...@apache.org>.
srowen commented on a change in pull request #34980:
URL: https://github.com/apache/spark/pull/34980#discussion_r776886586



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -926,30 +933,49 @@ private[spark] class BlockManager(
           })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
-          val diskData = diskStore.getBytes(blockId)
-          val iterToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              val diskValues = serializerManager.dataDeserializeStream(
-                blockId,
-                diskData.toInputStream())(info.classTag)
-              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-            } else {
-              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
-                .map { _.toInputStream(dispose = false) }
-                .getOrElse { diskData.toInputStream() }
-              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+          try {
+            val diskData = diskStore.getBytes(blockId)
+            val iterToReturn: Iterator[Any] = {
+              if (level.deserialized) {
+                val diskValues = serializerManager.dataDeserializeStream(
+                  blockId,
+                  diskData.toInputStream())(info.classTag)
+                maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+              } else {
+                val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
+                  .map { _.toInputStream(dispose = false) }
+                  .getOrElse { diskData.toInputStream() }
+                serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+              }
             }
+            val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
+              releaseLockAndDispose(blockId, diskData, taskContext)
+            })
+            Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+          } catch {
+            case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
+              // We need to have clear error message to catch environmental problems easily.
+              // Further details: https://issues.apache.org/jira/browse/SPARK-37710
+              processKryoException(ex, blockId)
+              throw ex
           }
-          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
-            releaseLockAndDispose(blockId, diskData, taskContext)
-          })
-          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
         } else {
           handleLocalReadFailure(blockId)
         }
     }
   }
 
+  private def processKryoException(ex: KryoException, blockId: BlockId): Unit = {
+    var errorMessage = s"${ex.getMessage}. Please check if environment status is healthy " +
+      s"(e.g: disk corruption, network failure (etc)). " +
+      s"${blockManagerId.toString} - blockName: $blockId"
+    if (diskBlockManager.containsBlock(blockId)) {
+      val file = diskBlockManager.getFile(blockId)
+      errorMessage = errorMessage + s" - blockDiskPath: ${file.getAbsolutePath}"
+    }
+    logError(errorMessage)

Review comment:
       At least WARN seems appropriate. If it's possibly recoverable, WARN is fine; if not, ERROR.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34980: [SPARK-37710][CORE] Add detailed error message for java.io.IOException occurring on Kryo flow

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34980:
URL: https://github.com/apache/spark/pull/34980#discussion_r776912611



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -926,30 +933,49 @@ private[spark] class BlockManager(
           })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
-          val diskData = diskStore.getBytes(blockId)
-          val iterToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              val diskValues = serializerManager.dataDeserializeStream(
-                blockId,
-                diskData.toInputStream())(info.classTag)
-              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-            } else {
-              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
-                .map { _.toInputStream(dispose = false) }
-                .getOrElse { diskData.toInputStream() }
-              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+          try {
+            val diskData = diskStore.getBytes(blockId)
+            val iterToReturn: Iterator[Any] = {
+              if (level.deserialized) {
+                val diskValues = serializerManager.dataDeserializeStream(
+                  blockId,
+                  diskData.toInputStream())(info.classTag)
+                maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+              } else {
+                val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
+                  .map { _.toInputStream(dispose = false) }
+                  .getOrElse { diskData.toInputStream() }
+                serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+              }
             }
+            val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
+              releaseLockAndDispose(blockId, diskData, taskContext)
+            })
+            Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+          } catch {
+            case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
+              // We need to have clear error message to catch environmental problems easily.
+              // Further details: https://issues.apache.org/jira/browse/SPARK-37710
+              processKryoException(ex, blockId)
+              throw ex
           }
-          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
-            releaseLockAndDispose(blockId, diskData, taskContext)
-          })
-          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
         } else {
           handleLocalReadFailure(blockId)
         }
     }
   }
 
+  private def processKryoException(ex: KryoException, blockId: BlockId): Unit = {
+    var errorMessage = s"${ex.getMessage}. Please check if environment status is healthy " +
+      s"(e.g: disk corruption, network failure (etc)). " +
+      s"${blockManagerId.toString} - blockName: $blockId"
+    if (diskBlockManager.containsBlock(blockId)) {
+      val file = diskBlockManager.getFile(blockId)
+      errorMessage = errorMessage + s" - blockDiskPath: ${file.getAbsolutePath}"
+    }
+    logError(errorMessage)

Review comment:
       The actual failure is logged/handled where the `KryoException` is caught - this log message is to give additional information to help debug, in case someone wants to understand more about why the `KryoException` is thrown.
   As such, this is informational at best, and debug if it becomes very verbose - erring on the side of caution, I think it makes sense to make this INFO.
   
   For example, we have had very verbose error/warn messages in the past during executor shutdown (deletion of directories/files, which causes a bunch of IOExceptions), which end up polluting the logs.
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on pull request #34980: [SPARK-37710][CORE] Add detailed log message for java.io.IOException occurring on Kryo flow

Posted by GitBox <gi...@apache.org>.
mridulm commented on pull request #34980:
URL: https://github.com/apache/spark/pull/34980#issuecomment-1006999323


   Merging to master


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34980: [SPARK-37710][CORE] Add detailed error message for java.io.IOException occurring on Kryo flow

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34980:
URL: https://github.com/apache/spark/pull/34980#discussion_r775762199



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -926,30 +933,48 @@ private[spark] class BlockManager(
           })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
-          val diskData = diskStore.getBytes(blockId)
-          val iterToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              val diskValues = serializerManager.dataDeserializeStream(
-                blockId,
-                diskData.toInputStream())(info.classTag)
-              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-            } else {
-              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
-                .map { _.toInputStream(dispose = false) }
-                .getOrElse { diskData.toInputStream() }
-              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+          try {
+            val diskData = diskStore.getBytes(blockId)
+            val iterToReturn: Iterator[Any] = {
+              if (level.deserialized) {
+                val diskValues = serializerManager.dataDeserializeStream(
+                  blockId,
+                  diskData.toInputStream())(info.classTag)
+                maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+              } else {
+                val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
+                  .map { _.toInputStream(dispose = false) }
+                  .getOrElse { diskData.toInputStream() }
+                serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+              }
             }
+            val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
+              releaseLockAndDispose(blockId, diskData, taskContext)
+            })
+            Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+          } catch {
+            case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
+              // We need to have clear error message to catch environmental problems easily.
+              // Further details: https://issues.apache.org/jira/browse/SPARK-37710
+              processKryoException(ex, blockId)
+              throw ex
           }
-          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
-            releaseLockAndDispose(blockId, diskData, taskContext)
-          })
-          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
         } else {
           handleLocalReadFailure(blockId)
         }
     }
   }
 
+  private def processKryoException(ex: KryoException, blockId: BlockId): Unit = {
+    var errorMessage = s"${ex.getMessage}. Please check if environment status is healthy for " +
+        s"disk corruption, network failure (etc). ${blockManagerId.toString} - blockName: $blockId"

Review comment:
       `IOException` can occur for a variety of reasons. Specifically calling out a few would be confusing: avoid naming them.

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -926,30 +933,48 @@ private[spark] class BlockManager(
           })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
-          val diskData = diskStore.getBytes(blockId)
-          val iterToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              val diskValues = serializerManager.dataDeserializeStream(
-                blockId,
-                diskData.toInputStream())(info.classTag)
-              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-            } else {
-              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
-                .map { _.toInputStream(dispose = false) }
-                .getOrElse { diskData.toInputStream() }
-              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+          try {
+            val diskData = diskStore.getBytes(blockId)
+            val iterToReturn: Iterator[Any] = {
+              if (level.deserialized) {
+                val diskValues = serializerManager.dataDeserializeStream(
+                  blockId,
+                  diskData.toInputStream())(info.classTag)
+                maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+              } else {
+                val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
+                  .map { _.toInputStream(dispose = false) }
+                  .getOrElse { diskData.toInputStream() }
+                serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+              }
             }
+            val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
+              releaseLockAndDispose(blockId, diskData, taskContext)
+            })
+            Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+          } catch {
+            case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
+              // We need to have clear error message to catch environmental problems easily.
+              // Further details: https://issues.apache.org/jira/browse/SPARK-37710
+              processKryoException(ex, blockId)
+              throw ex
           }
-          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
-            releaseLockAndDispose(blockId, diskData, taskContext)
-          })
-          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
         } else {
           handleLocalReadFailure(blockId)
         }
     }
   }
 
+  private def processKryoException(ex: KryoException, blockId: BlockId): Unit = {
+    var errorMessage = s"${ex.getMessage}. Please check if environment status is healthy for " +
+        s"disk corruption, network failure (etc). ${blockManagerId.toString} - blockName: $blockId"
+    if (diskBlockManager.containsBlock(blockId)) {
+      val file = diskBlockManager.getFile(blockId)
+      errorMessage = errorMessage + s" - blockDiskPath: ${file.getAbsolutePath}"
+    }
+    logError(errorMessage, ex)

Review comment:
       The log-and-rethrow pattern is an anti-pattern - I am fine with a `logDebug` or `logInfo` giving additional info (blockManagerId, blockId, file name) in case this context is lost; the stack trace is already getting logged elsewhere.
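   To make the duplication concrete, here is a toy sketch of log-and-rethrow (made-up names, plain stderr instead of a real logger; not the Spark code): a single failure produces two log entries, one at the rethrow site and one at the final handler.
   
   ```scala
   import java.io.IOException
   
   object LogAndRethrowSketch {
     private def log(msg: String): Unit = Console.err.println(msg)
   
     private def read(): String = throw new IOException("Input/output error")
   
     // Anti-pattern: the failure is logged here AND again by the final handler
     // below, so one failure pollutes the logs twice.
     private def readLoggingAndRethrowing(): String =
       try read()
       catch { case e: IOException => log(s"read failed: $e"); throw e }
   
     def main(args: Array[String]): Unit =
       try readLoggingAndRethrowing()
       catch { case e: IOException => log(s"final handler: $e") } // second entry
   }
   ```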




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] erenavsarogullari commented on a change in pull request #34980: [SPARK-37710][CORE] Add clear error message for java.io.IOException: Input/output error

Posted by GitBox <gi...@apache.org>.
erenavsarogullari commented on a change in pull request #34980:
URL: https://github.com/apache/spark/pull/34980#discussion_r775640779



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -926,30 +936,62 @@ private[spark] class BlockManager(
           })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
-          val diskData = diskStore.getBytes(blockId)
-          val iterToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              val diskValues = serializerManager.dataDeserializeStream(
-                blockId,
-                diskData.toInputStream())(info.classTag)
-              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-            } else {
-              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
-                .map { _.toInputStream(dispose = false) }
-                .getOrElse { diskData.toInputStream() }
-              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
-            }
-          }
-          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
-            releaseLockAndDispose(blockId, diskData, taskContext)
-          })
-          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+          getLocalValuesFromDisk(blockId, info, taskContext)
         } else {
           handleLocalReadFailure(blockId)
         }
     }
   }
 
+  private def getLocalValuesFromDisk(blockId: BlockId, info: BlockInfo,

Review comment:
       The `BlockManager.getLocalValues` function is already `47 lines` long by default, and it grows further with my patch, violating the `Rule of 30`.
   https://github.com/databricks/scala-style-guide#rule_of_30
   It is up to us - we can still preserve the legacy code block. However, I think the `Rule of 30` improves readability.
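   As a toy illustration of the extraction style (made-up names, not the `BlockManager` code), the long disk-read branch moves into a small private helper so each method stays well under ~30 lines:
   
   ```scala
   import java.nio.charset.StandardCharsets
   
   object RuleOf30Sketch {
     // The caller stays short: lookup in memory first, then fall back to the
     // extracted helper with a descriptive name.
     def getLocalValue(
         key: String,
         memory: Map[String, String],
         disk: Map[String, Array[Byte]]): Option[String] =
       memory.get(key).orElse(readFromDisk(key, disk))
   
     // Extracted helper: the "disk branch" lives in one named place.
     private def readFromDisk(key: String, disk: Map[String, Array[Byte]]): Option[String] =
       disk.get(key).map(bytes => new String(bytes, StandardCharsets.UTF_8))
   
     def main(args: Array[String]): Unit = {
       val disk = Map("rdd_2_3" -> "cached".getBytes(StandardCharsets.UTF_8))
       assert(getLocalValue("rdd_2_3", Map.empty, disk).contains("cached"))
     }
   }
   ```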




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] erenavsarogullari commented on a change in pull request #34980: [SPARK-37710][CORE] Add detailed error message for java.io.IOException occurring on Kryo flow

Posted by GitBox <gi...@apache.org>.
erenavsarogullari commented on a change in pull request #34980:
URL: https://github.com/apache/spark/pull/34980#discussion_r776122976



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -926,30 +933,48 @@ private[spark] class BlockManager(
           })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
-          val diskData = diskStore.getBytes(blockId)
-          val iterToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              val diskValues = serializerManager.dataDeserializeStream(
-                blockId,
-                diskData.toInputStream())(info.classTag)
-              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-            } else {
-              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
-                .map { _.toInputStream(dispose = false) }
-                .getOrElse { diskData.toInputStream() }
-              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+          try {
+            val diskData = diskStore.getBytes(blockId)
+            val iterToReturn: Iterator[Any] = {
+              if (level.deserialized) {
+                val diskValues = serializerManager.dataDeserializeStream(
+                  blockId,
+                  diskData.toInputStream())(info.classTag)
+                maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+              } else {
+                val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
+                  .map { _.toInputStream(dispose = false) }
+                  .getOrElse { diskData.toInputStream() }
+                serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+              }
             }
+            val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
+              releaseLockAndDispose(blockId, diskData, taskContext)
+            })
+            Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+          } catch {
+            case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
+              // We need to have clear error message to catch environmental problems easily.
+              // Further details: https://issues.apache.org/jira/browse/SPARK-37710
+              processKryoException(ex, blockId)
+              throw ex
           }
-          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
-            releaseLockAndDispose(blockId, diskData, taskContext)
-          })
-          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
         } else {
           handleLocalReadFailure(blockId)
         }
     }
   }
 
+  private def processKryoException(ex: KryoException, blockId: BlockId): Unit = {
+    var errorMessage = s"${ex.getMessage}. Please check if environment status is healthy for " +
+        s"disk corruption, network failure (etc). ${blockManagerId.toString} - blockName: $blockId"

Review comment:
       In this case, the `IOException` is wrapped by a `KryoException`. This means it occurs at the `Kryo deserialization` level when accessing I/O via disk/network. In our use-case, this occurred due to `disk corruption` (with the error message: `java.io.IOException: Input/output error`). So I would like to highlight `disk corruption` here as a real-life example for future reference.
   
   `etc` (in the log) already signals that there may be other error cases for `IOException`. This kind of environmental problem is hard to catch and requires correlation with infrastructure alerts (e.g. alerts reporting disk corruption), if the pipeline has them. So providing as many hints as possible will help future RCAs.
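   A small self-contained sketch of the detection idea (illustrative only, assuming Kryo is on the classpath). Note that in Scala `null.isInstanceOf[IOException]` is simply `false`, so the `ex.getCause` guard is null-safe even when no cause is attached; a hypothetical chain-walking helper is even more defensive:
   
   ```scala
   import java.io.IOException
   import com.esotericsoftware.kryo.KryoException
   
   object CauseCheckSketch {
     // Defensive variant: walk the whole cause chain rather than only the
     // immediate cause (hypothetical helper, not part of the PR itself).
     @annotation.tailrec
     def hasIoCause(t: Throwable): Boolean = t match {
       case null           => false
       case _: IOException => true
       case other          => hasIoCause(other.getCause)
     }
   
     def main(args: Array[String]): Unit = {
       val ex = new KryoException(new IOException("Input/output error"))
       assert(ex.getCause.isInstanceOf[IOException]) // the guard used in the PR
       assert(hasIoCause(ex))                        // chain-walking variant
     }
   }
   ```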




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34980: [SPARK-37710][CORE] Add detailed error message for java.io.IOException occurring on Kryo flow

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34980:
URL: https://github.com/apache/spark/pull/34980#discussion_r776644768



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -926,30 +933,49 @@ private[spark] class BlockManager(
           })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
-          val diskData = diskStore.getBytes(blockId)
-          val iterToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              val diskValues = serializerManager.dataDeserializeStream(
-                blockId,
-                diskData.toInputStream())(info.classTag)
-              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-            } else {
-              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
-                .map { _.toInputStream(dispose = false) }
-                .getOrElse { diskData.toInputStream() }
-              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+          try {
+            val diskData = diskStore.getBytes(blockId)
+            val iterToReturn: Iterator[Any] = {
+              if (level.deserialized) {
+                val diskValues = serializerManager.dataDeserializeStream(
+                  blockId,
+                  diskData.toInputStream())(info.classTag)
+                maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+              } else {
+                val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
+                  .map { _.toInputStream(dispose = false) }
+                  .getOrElse { diskData.toInputStream() }
+                serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+              }
             }
+            val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
+              releaseLockAndDispose(blockId, diskData, taskContext)
+            })
+            Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+          } catch {
+            case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
+              // We need to have clear error message to catch environmental problems easily.
+              // Further details: https://issues.apache.org/jira/browse/SPARK-37710
+              processKryoException(ex, blockId)
+              throw ex
           }
-          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
-            releaseLockAndDispose(blockId, diskData, taskContext)
-          })
-          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
         } else {
           handleLocalReadFailure(blockId)
         }
     }
   }
 
+  private def processKryoException(ex: KryoException, blockId: BlockId): Unit = {
+    var errorMessage = s"${ex.getMessage}. Please check if environment status is healthy " +
+      s"(e.g: disk corruption, network failure (etc)). " +
+      s"${blockManagerId.toString} - blockName: $blockId"
+    if (diskBlockManager.containsBlock(blockId)) {
+      val file = diskBlockManager.getFile(blockId)
+      errorMessage = errorMessage + s" - blockDiskPath: ${file.getAbsolutePath}"
+    }
+    logError(errorMessage)

Review comment:
       I probably mentioned this already - change this to `logDebug` or `logInfo`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] erenavsarogullari commented on a change in pull request #34980: [SPARK-37710][CORE] Add detailed error message for java.io.IOException occurring on Kryo flow

Posted by GitBox <gi...@apache.org>.
erenavsarogullari commented on a change in pull request #34980:
URL: https://github.com/apache/spark/pull/34980#discussion_r776895636



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -926,30 +933,49 @@ private[spark] class BlockManager(
           })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
-          val diskData = diskStore.getBytes(blockId)
-          val iterToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              val diskValues = serializerManager.dataDeserializeStream(
-                blockId,
-                diskData.toInputStream())(info.classTag)
-              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-            } else {
-              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
-                .map { _.toInputStream(dispose = false) }
-                .getOrElse { diskData.toInputStream() }
-              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+          try {
+            val diskData = diskStore.getBytes(blockId)
+            val iterToReturn: Iterator[Any] = {
+              if (level.deserialized) {
+                val diskValues = serializerManager.dataDeserializeStream(
+                  blockId,
+                  diskData.toInputStream())(info.classTag)
+                maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+              } else {
+                val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
+                  .map { _.toInputStream(dispose = false) }
+                  .getOrElse { diskData.toInputStream() }
+                serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+              }
             }
+            val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
+              releaseLockAndDispose(blockId, diskData, taskContext)
+            })
+            Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+          } catch {
+            case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
+              // We need to have clear error message to catch environmental problems easily.
+              // Further details: https://issues.apache.org/jira/browse/SPARK-37710
+              processKryoException(ex, blockId)
+              throw ex
           }
-          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
-            releaseLockAndDispose(blockId, diskData, taskContext)
-          })
-          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
         } else {
           handleLocalReadFailure(blockId)
         }
     }
   }
 
+  private def processKryoException(ex: KryoException, blockId: BlockId): Unit = {
+    var errorMessage = s"${ex.getMessage}. Please check if environment status is healthy " +
+      s"(e.g: disk corruption, network failure (etc)). " +
+      s"${blockManagerId.toString} - blockName: $blockId"
+    if (diskBlockManager.containsBlock(blockId)) {
+      val file = diskBlockManager.getFile(blockId)
+      errorMessage = errorMessage + s" - blockDiskPath: ${file.getAbsolutePath}"
+    }
+    logError(errorMessage)

Review comment:
       Exactly - we can use `error/warn` in this case, but not `info/debug`.
   
   This kind of problem is retried with `task-attempts` (3 attempts in our case).
   - If the I/O issue is `transient`, it can be recovered by the next task-attempt.
   - If it is not transient, it will fail the `task/stage/job/query` (after all attempts).
   
   I think we can also use `warn` given the current task-retry flow (even if the first task attempt fails, the flow will retry until the retry limit).
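   A hand-rolled sketch of the retry semantics described above (this is not Spark's scheduler, just an illustration of transient vs. persistent failures under an attempt limit):
   
   ```scala
   import java.io.IOException
   import scala.util.{Failure, Success, Try}
   
   object RetrySketch {
     // Runs `body` up to `maxAttempts` times, stopping at the first success.
     def withRetries[T](maxAttempts: Int)(body: Int => T): Try[T] = {
       var attempt = 1
       var result: Try[T] = Failure(new IllegalStateException("never ran"))
       while (attempt <= maxAttempts && result.isFailure) {
         result = Try(body(attempt))
         attempt += 1
       }
       result
     }
   
     def main(args: Array[String]): Unit = {
       // Transient: the first "task attempt" fails, the second succeeds.
       val transient = withRetries(3)(a =>
         if (a < 2) throw new IOException("I/O error") else "ok")
       assert(transient == Success("ok"))
   
       // Persistent: all attempts fail, so the caller sees the failure.
       val persistent = withRetries(3)(_ => throw new IOException("disk corrupt"))
       assert(persistent.isFailure)
     }
   }
   ```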




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34980: [SPARK-37710][CORE] Add detailed error message for java.io.IOException occurring on Kryo flow

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34980:
URL: https://github.com/apache/spark/pull/34980#discussion_r779278699



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -926,30 +933,49 @@ private[spark] class BlockManager(
           })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
-          val diskData = diskStore.getBytes(blockId)
-          val iterToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              val diskValues = serializerManager.dataDeserializeStream(
-                blockId,
-                diskData.toInputStream())(info.classTag)
-              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-            } else {
-              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
-                .map { _.toInputStream(dispose = false) }
-                .getOrElse { diskData.toInputStream() }
-              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+          try {
+            val diskData = diskStore.getBytes(blockId)
+            val iterToReturn: Iterator[Any] = {
+              if (level.deserialized) {
+                val diskValues = serializerManager.dataDeserializeStream(
+                  blockId,
+                  diskData.toInputStream())(info.classTag)
+                maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+              } else {
+                val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
+                  .map { _.toInputStream(dispose = false) }
+                  .getOrElse { diskData.toInputStream() }
+                serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+              }
             }
+            val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
+              releaseLockAndDispose(blockId, diskData, taskContext)
+            })
+            Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+          } catch {
+            case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
+              // We need to have clear error message to catch environmental problems easily.
+              // Further details: https://issues.apache.org/jira/browse/SPARK-37710
+              processKryoException(ex, blockId)
+              throw ex
           }
-          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
-            releaseLockAndDispose(blockId, diskData, taskContext)
-          })
-          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
         } else {
           handleLocalReadFailure(blockId)
         }
     }
   }
 
+  private def processKryoException(ex: KryoException, blockId: BlockId): Unit = {
+    var errorMessage = s"${ex.getMessage}. Please check if environment status is healthy " +
+      s"(e.g: disk corruption, network failure (etc)). " +
+      s"${blockManagerId.toString} - blockName: $blockId"
+    if (diskBlockManager.containsBlock(blockId)) {
+      val file = diskBlockManager.getFile(blockId)
+      errorMessage = errorMessage + s" - blockDiskPath: ${file.getAbsolutePath}"
+    }
+    logError(errorMessage)

Review comment:
       I am not in favor of making this ERROR or WARN - it is not actionable for end users, it ends up surfacing for a larger number of cases (not just unrecoverable system errors), and the actual exception is handled elsewhere while this PR adds supporting information to help triage an issue.
   
   I understand that there is a specific set of corner cases where this information would be useful (like disk corruption, etc.) - which is why I am fine with leaving it at INFO instead of DEBUG.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] srowen commented on a change in pull request #34980: [SPARK-37710][CORE] Add detailed error message for java.io.IOException occurring on Kryo flow

Posted by GitBox <gi...@apache.org>.
srowen commented on a change in pull request #34980:
URL: https://github.com/apache/spark/pull/34980#discussion_r779578894



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -926,30 +933,49 @@ private[spark] class BlockManager(
           })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
-          val diskData = diskStore.getBytes(blockId)
-          val iterToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              val diskValues = serializerManager.dataDeserializeStream(
-                blockId,
-                diskData.toInputStream())(info.classTag)
-              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-            } else {
-              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
-                .map { _.toInputStream(dispose = false) }
-                .getOrElse { diskData.toInputStream() }
-              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+          try {
+            val diskData = diskStore.getBytes(blockId)
+            val iterToReturn: Iterator[Any] = {
+              if (level.deserialized) {
+                val diskValues = serializerManager.dataDeserializeStream(
+                  blockId,
+                  diskData.toInputStream())(info.classTag)
+                maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+              } else {
+                val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
+                  .map { _.toInputStream(dispose = false) }
+                  .getOrElse { diskData.toInputStream() }
+                serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+              }
             }
+            val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
+              releaseLockAndDispose(blockId, diskData, taskContext)
+            })
+            Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+          } catch {
+            case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
+              // We need to have clear error message to catch environmental problems easily.
+              // Further details: https://issues.apache.org/jira/browse/SPARK-37710
+              processKryoException(ex, blockId)
+              throw ex
           }
-          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
-            releaseLockAndDispose(blockId, diskData, taskContext)
-          })
-          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
         } else {
           handleLocalReadFailure(blockId)
         }
     }
   }
 
+  private def processKryoException(ex: KryoException, blockId: BlockId): Unit = {
+    var errorMessage = s"${ex.getMessage}. Please check if environment status is healthy " +
+      s"(e.g: disk corruption, network failure (etc)). " +
+      s"${blockManagerId.toString} - blockName: $blockId"
+    if (diskBlockManager.containsBlock(blockId)) {
+      val file = diskBlockManager.getFile(blockId)
+      errorMessage = errorMessage + s" - blockDiskPath: ${file.getAbsolutePath}"
+    }
+    logError(errorMessage)

Review comment:
       I disagree that it's unactionable, and it seems important to surface; I don't think it's one of those things that can happen 'normally' and end up being log noise. But OK, let's put it at INFO to close this out.






[GitHub] [spark] erenavsarogullari commented on pull request #34980: [SPARK-37710][CORE] Add detailed error message for java.io.IOException occurring on Kryo flow

Posted by GitBox <gi...@apache.org>.
erenavsarogullari commented on pull request #34980:
URL: https://github.com/apache/spark/pull/34980#issuecomment-1006073924


   Happy new year to everyone, and thanks again to all for the reviews.
   @srowen, I also addressed the last comments.




[GitHub] [spark] asfgit closed pull request #34980: [SPARK-37710][CORE] Add detailed log message for java.io.IOException occurring on Kryo flow

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #34980:
URL: https://github.com/apache/spark/pull/34980


   




[GitHub] [spark] erenavsarogullari commented on a change in pull request #34980: [SPARK-37710][CORE] Add detailed error message for java.io.IOException occurring on Kryo flow

Posted by GitBox <gi...@apache.org>.
erenavsarogullari commented on a change in pull request #34980:
URL: https://github.com/apache/spark/pull/34980#discussion_r776895636



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -926,30 +933,49 @@ private[spark] class BlockManager(
           })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
-          val diskData = diskStore.getBytes(blockId)
-          val iterToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              val diskValues = serializerManager.dataDeserializeStream(
-                blockId,
-                diskData.toInputStream())(info.classTag)
-              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-            } else {
-              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
-                .map { _.toInputStream(dispose = false) }
-                .getOrElse { diskData.toInputStream() }
-              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+          try {
+            val diskData = diskStore.getBytes(blockId)
+            val iterToReturn: Iterator[Any] = {
+              if (level.deserialized) {
+                val diskValues = serializerManager.dataDeserializeStream(
+                  blockId,
+                  diskData.toInputStream())(info.classTag)
+                maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+              } else {
+                val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
+                  .map { _.toInputStream(dispose = false) }
+                  .getOrElse { diskData.toInputStream() }
+                serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+              }
             }
+            val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
+              releaseLockAndDispose(blockId, diskData, taskContext)
+            })
+            Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+          } catch {
+            case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
+              // We need to have clear error message to catch environmental problems easily.
+              // Further details: https://issues.apache.org/jira/browse/SPARK-37710
+              processKryoException(ex, blockId)
+              throw ex
           }
-          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
-            releaseLockAndDispose(blockId, diskData, taskContext)
-          })
-          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
         } else {
           handleLocalReadFailure(blockId)
         }
     }
   }
 
+  private def processKryoException(ex: KryoException, blockId: BlockId): Unit = {
+    var errorMessage = s"${ex.getMessage}. Please check if environment status is healthy " +
+      s"(e.g: disk corruption, network failure (etc)). " +
+      s"${blockManagerId.toString} - blockName: $blockId"
+    if (diskBlockManager.containsBlock(blockId)) {
+      val file = diskBlockManager.getFile(blockId)
+      errorMessage = errorMessage + s" - blockDiskPath: ${file.getAbsolutePath}"
+    }
+    logError(errorMessage)

Review comment:
       Exactly - we can use `error`/`warn` in this case, but not `info`/`debug`.
   
   This kind of problem is retried via task attempts (3 attempts in our case).
   - If the I/O issue is transient, it can be recovered by the next task attempt.
   - If it is not transient, it will fail the task/stage/job/query (after all attempts).
   I think we can also use `warn` given the current task-retry flow (even if the first task attempt fails, the flow will retry up to the retry limit, which introduces some recoverability).
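   
   A minimal illustration of the retry budget being discussed - `spark.task.maxFailures` is the real Spark setting (default 4); the value 3 and the local master are only for this sketch:
   
       import org.apache.spark.sql.SparkSession
   
       // On a cluster, a transient I/O error on one task attempt can be absorbed by
       // the retry flow: the task is rescheduled until spark.task.maxFailures
       // attempts are used up. In local mode, "local[N,M]" enables M task attempts.
       val spark = SparkSession.builder()
         .master("local[2,3]")                  // local master with 3 task attempts
         .config("spark.task.maxFailures", "3") // cluster-mode equivalent of the above
         .appName("retry-budget-sketch")
         .getOrCreate()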






[GitHub] [spark] mridulm commented on a change in pull request #34980: [SPARK-37710][CORE] Add detailed error message for java.io.IOException occurring on Kryo flow

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34980:
URL: https://github.com/apache/spark/pull/34980#discussion_r776644543



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -926,30 +933,49 @@ private[spark] class BlockManager(
           })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
-          val diskData = diskStore.getBytes(blockId)
-          val iterToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              val diskValues = serializerManager.dataDeserializeStream(
-                blockId,
-                diskData.toInputStream())(info.classTag)
-              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-            } else {
-              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
-                .map { _.toInputStream(dispose = false) }
-                .getOrElse { diskData.toInputStream() }
-              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+          try {
+            val diskData = diskStore.getBytes(blockId)
+            val iterToReturn: Iterator[Any] = {
+              if (level.deserialized) {
+                val diskValues = serializerManager.dataDeserializeStream(
+                  blockId,
+                  diskData.toInputStream())(info.classTag)
+                maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+              } else {
+                val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
+                  .map { _.toInputStream(dispose = false) }
+                  .getOrElse { diskData.toInputStream() }
+                serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+              }
             }
+            val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
+              releaseLockAndDispose(blockId, diskData, taskContext)
+            })
+            Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+          } catch {
+            case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
+              // We need to have clear error message to catch environmental problems easily.
+              // Further details: https://issues.apache.org/jira/browse/SPARK-37710
+              processKryoException(ex, blockId)
+              throw ex
           }
-          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
-            releaseLockAndDispose(blockId, diskData, taskContext)
-          })
-          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
         } else {
           handleLocalReadFailure(blockId)
         }
     }
   }
 
+  private def processKryoException(ex: KryoException, blockId: BlockId): Unit = {
+    var errorMessage = s"${ex.getMessage}. Please check if environment status is healthy " +
+      s"(e.g: disk corruption, network failure (etc)). " +

Review comment:
       I don't particularly like this part of the message (`Please check if environment status is healthy (e.g: disk corruption, network failure (etc)).`) - we will end up adding to this list for different types of `IOException` causes as time goes by, and it is better captured by `ex.getMessage` itself. It is speculative at best and does not add much value.
   
   Can we omit it?






[GitHub] [spark] mridulm commented on a change in pull request #34980: [SPARK-37710][CORE] Add detailed error message for java.io.IOException occurring on Kryo flow

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34980:
URL: https://github.com/apache/spark/pull/34980#discussion_r776912611



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -926,30 +933,49 @@ private[spark] class BlockManager(
           })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
-          val diskData = diskStore.getBytes(blockId)
-          val iterToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              val diskValues = serializerManager.dataDeserializeStream(
-                blockId,
-                diskData.toInputStream())(info.classTag)
-              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-            } else {
-              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
-                .map { _.toInputStream(dispose = false) }
-                .getOrElse { diskData.toInputStream() }
-              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+          try {
+            val diskData = diskStore.getBytes(blockId)
+            val iterToReturn: Iterator[Any] = {
+              if (level.deserialized) {
+                val diskValues = serializerManager.dataDeserializeStream(
+                  blockId,
+                  diskData.toInputStream())(info.classTag)
+                maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+              } else {
+                val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
+                  .map { _.toInputStream(dispose = false) }
+                  .getOrElse { diskData.toInputStream() }
+                serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+              }
             }
+            val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
+              releaseLockAndDispose(blockId, diskData, taskContext)
+            })
+            Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+          } catch {
+            case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
+              // We need to have clear error message to catch environmental problems easily.
+              // Further details: https://issues.apache.org/jira/browse/SPARK-37710
+              processKryoException(ex, blockId)
+              throw ex
           }
-          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
-            releaseLockAndDispose(blockId, diskData, taskContext)
-          })
-          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
         } else {
           handleLocalReadFailure(blockId)
         }
     }
   }
 
+  private def processKryoException(ex: KryoException, blockId: BlockId): Unit = {
+    var errorMessage = s"${ex.getMessage}. Please check if environment status is healthy " +
+      s"(e.g: disk corruption, network failure (etc)). " +
+      s"${blockManagerId.toString} - blockName: $blockId"
+    if (diskBlockManager.containsBlock(blockId)) {
+      val file = diskBlockManager.getFile(blockId)
+      errorMessage = errorMessage + s" - blockDiskPath: ${file.getAbsolutePath}"
+    }
+    logError(errorMessage)

Review comment:
       The actual failure is logged/handled where the `KryoException` is caught - this log message is there to give additional information to help debug, in case someone wants to understand more about why the `KryoException` was thrown.
   As such, this is informational at best, and debug level if it becomes very verbose - I am erring on the side of caution in saying it makes sense to keep this at INFO.
   
   For example, we have had very verbose error/warn messages in the past during executor shutdown (deletion of directories/files causing a bunch of IOExceptions), which ended up polluting the logs.
   
   








[GitHub] [spark] SparkQA commented on pull request #34980: SPARK-37710 - Add clear error message for java.io.IOException: Input/output error

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34980:
URL: https://github.com/apache/spark/pull/34980#issuecomment-999147234


   **[Test build #146454 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/146454/testReport)** for PR 34980 at commit [`2902183`](https://github.com/apache/spark/commit/2902183a41fa533dfece14f514384216299dc522).
    * This patch **fails Scala style tests**.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] SparkQA removed a comment on pull request #34980: SPARK-37710 - Add clear error message for java.io.IOException: Input/output error

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #34980:
URL: https://github.com/apache/spark/pull/34980#issuecomment-999146272


   **[Test build #146454 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/146454/testReport)** for PR 34980 at commit [`2902183`](https://github.com/apache/spark/commit/2902183a41fa533dfece14f514384216299dc522).




[GitHub] [spark] mridulm commented on a change in pull request #34980: [SPARK-37710][CORE] Add detailed error message for java.io.IOException occurring on Kryo flow

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34980:
URL: https://github.com/apache/spark/pull/34980#discussion_r775762438



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -926,30 +933,48 @@ private[spark] class BlockManager(
           })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
-          val diskData = diskStore.getBytes(blockId)
-          val iterToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              val diskValues = serializerManager.dataDeserializeStream(
-                blockId,
-                diskData.toInputStream())(info.classTag)
-              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-            } else {
-              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
-                .map { _.toInputStream(dispose = false) }
-                .getOrElse { diskData.toInputStream() }
-              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+          try {
+            val diskData = diskStore.getBytes(blockId)
+            val iterToReturn: Iterator[Any] = {
+              if (level.deserialized) {
+                val diskValues = serializerManager.dataDeserializeStream(
+                  blockId,
+                  diskData.toInputStream())(info.classTag)
+                maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+              } else {
+                val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
+                  .map { _.toInputStream(dispose = false) }
+                  .getOrElse { diskData.toInputStream() }
+                serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+              }
             }
+            val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
+              releaseLockAndDispose(blockId, diskData, taskContext)
+            })
+            Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+          } catch {
+            case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
+              // We need to have clear error message to catch environmental problems easily.
+              // Further details: https://issues.apache.org/jira/browse/SPARK-37710
+              processKryoException(ex, blockId)
+              throw ex
           }
-          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
-            releaseLockAndDispose(blockId, diskData, taskContext)
-          })
-          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
         } else {
           handleLocalReadFailure(blockId)
         }
     }
   }
 
+  private def processKryoException(ex: KryoException, blockId: BlockId): Unit = {
+    var errorMessage = s"${ex.getMessage}. Please check if environment status is healthy for " +
+        s"disk corruption, network failure (etc). ${blockManagerId.toString} - blockName: $blockId"
+    if (diskBlockManager.containsBlock(blockId)) {
+      val file = diskBlockManager.getFile(blockId)
+      errorMessage = errorMessage + s" - blockDiskPath: ${file.getAbsolutePath}"
+    }
+    logError(errorMessage, ex)

Review comment:
       Log-and-rethrow is an anti-pattern - I am fine with a `logDebug` or `logInfo` giving additional info (blockManagerId, blockId, file name) in case this context is lost. The message & stack trace are already getting logged elsewhere, so drop `ex` and `ex.getMessage`.








[GitHub] [spark] mridulm commented on a change in pull request #34980: [SPARK-37710][CORE] Add detailed error message for java.io.IOException occurring on Kryo flow

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34980:
URL: https://github.com/apache/spark/pull/34980#discussion_r776647028



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -926,30 +933,48 @@ private[spark] class BlockManager(
           })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
-          val diskData = diskStore.getBytes(blockId)
-          val iterToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              val diskValues = serializerManager.dataDeserializeStream(
-                blockId,
-                diskData.toInputStream())(info.classTag)
-              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-            } else {
-              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
-                .map { _.toInputStream(dispose = false) }
-                .getOrElse { diskData.toInputStream() }
-              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+          try {
+            val diskData = diskStore.getBytes(blockId)
+            val iterToReturn: Iterator[Any] = {
+              if (level.deserialized) {
+                val diskValues = serializerManager.dataDeserializeStream(
+                  blockId,
+                  diskData.toInputStream())(info.classTag)
+                maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+              } else {
+                val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
+                  .map { _.toInputStream(dispose = false) }
+                  .getOrElse { diskData.toInputStream() }
+                serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+              }
             }
+            val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
+              releaseLockAndDispose(blockId, diskData, taskContext)
+            })
+            Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+          } catch {
+            case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
+              // We need to have clear error message to catch environmental problems easily.
+              // Further details: https://issues.apache.org/jira/browse/SPARK-37710
+              processKryoException(ex, blockId)
+              throw ex
           }
-          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
-            releaseLockAndDispose(blockId, diskData, taskContext)
-          })
-          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
         } else {
           handleLocalReadFailure(blockId)
         }
     }
   }
 
+  private def processKryoException(ex: KryoException, blockId: BlockId): Unit = {
+    var errorMessage = s"${ex.getMessage}. Please check if environment status is healthy for " +
+        s"disk corruption, network failure (etc). ${blockManagerId.toString} - blockName: $blockId"
+    if (diskBlockManager.containsBlock(blockId)) {
+      val file = diskBlockManager.getFile(blockId)
+      errorMessage = errorMessage + s" - blockDiskPath: ${file.getAbsolutePath}"
+    }
+    logError(errorMessage, ex)

Review comment:
       To clarify - logging the exception + rethrowing is an anti-pattern.
   I am fine with either augmenting the thrown exception with additional details (typically the better way to handle this), or logging additional details (like blockId, etc) and throwing the original exception to be handled appropriately - avoiding duplicate stack traces across the log file.
   
   The latest change addresses this part, so resolving the conversation.
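   
   For illustration, a sketch of the "augment and rethrow" alternative - `KryoException` does have a `(message, cause)` constructor; the helper name and wrapping shown here are hypothetical, not the merged change:
   
       import java.io.IOException
       import com.esotericsoftware.kryo.KryoException
   
       // Fold the block context into a new exception's message and keep the
       // original as the cause: one stack trace carries all the information,
       // with no duplicate log entries.
       def augmentAndRethrow[T](blockContext: String)(body: => T): T =
         try body catch {
           case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
             throw new KryoException(s"${ex.getMessage} ($blockContext)", ex)
         }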






[GitHub] [spark] mridulm commented on pull request #34980: [SPARK-37710][CORE] Add detailed log message for java.io.IOException occurring on Kryo flow

Posted by GitBox <gi...@apache.org>.
mridulm commented on pull request #34980:
URL: https://github.com/apache/spark/pull/34980#issuecomment-1007002198


   Thanks for the improvement @erenavsarogullari !
   Thanks for the review @srowen :-)




[GitHub] [spark] SparkQA commented on pull request #34980: SPARK-37710 - Add clear error message for java.io.IOException: Input/output error

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34980:
URL: https://github.com/apache/spark/pull/34980#issuecomment-999146272


   **[Test build #146454 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/146454/testReport)** for PR 34980 at commit [`2902183`](https://github.com/apache/spark/commit/2902183a41fa533dfece14f514384216299dc522).




[GitHub] [spark] srowen commented on a change in pull request #34980: [SPARK-37710][CORE] Add clear error message for java.io.IOException: Input/output error

Posted by GitBox <gi...@apache.org>.
srowen commented on a change in pull request #34980:
URL: https://github.com/apache/spark/pull/34980#discussion_r774597380



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -926,30 +936,62 @@ private[spark] class BlockManager(
           })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
-          val diskData = diskStore.getBytes(blockId)
-          val iterToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              val diskValues = serializerManager.dataDeserializeStream(
-                blockId,
-                diskData.toInputStream())(info.classTag)
-              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-            } else {
-              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
-                .map { _.toInputStream(dispose = false) }
-                .getOrElse { diskData.toInputStream() }
-              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
-            }
-          }
-          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
-            releaseLockAndDispose(blockId, diskData, taskContext)
-          })
-          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+          getLocalValuesFromDisk(blockId, info, taskContext)
         } else {
           handleLocalReadFailure(blockId)
         }
     }
   }
 
+  private def getLocalValuesFromDisk(blockId: BlockId, info: BlockInfo,

Review comment:
       I missed why this needed to be refactored

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -342,6 +343,15 @@ private[spark] class BlockManager(
             iter.close()
             false
         }
+      } catch {
+        case ex: KryoException
+          // We need to have clear error message to catch environmental problems easily.
+          // Further details: https://issues.apache.org/jira/browse/SPARK-37710
+          if ex.getMessage.toLowerCase(Locale.ROOT)

Review comment:
       Can we match on the cause of the `KryoException` to detect `IOException`? Would that catch it more cleanly, or are only some IOExceptions of interest?
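   
   For contrast, the two detection styles under discussion - the cause-based guard suggested here versus the message match from the diff above (both sketches, with hypothetical helper names):
   
       import java.io.IOException
       import java.util.Locale
       import com.esotericsoftware.kryo.KryoException
   
       // Cause-based: robust to message wording, catches any wrapped IOException.
       def isIoFailure(ex: KryoException): Boolean =
         ex.getCause.isInstanceOf[IOException]
   
       // Message-based: only catches the exact "Input/output error" wording.
       def isIoFailureByMessage(ex: KryoException): Boolean =
         Option(ex.getMessage).exists(
           _.toLowerCase(Locale.ROOT).contains("java.io.ioexception: input/output error"))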

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -926,30 +936,62 @@ private[spark] class BlockManager(
           })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
-          val diskData = diskStore.getBytes(blockId)
-          val iterToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              val diskValues = serializerManager.dataDeserializeStream(
-                blockId,
-                diskData.toInputStream())(info.classTag)
-              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-            } else {
-              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
-                .map { _.toInputStream(dispose = false) }
-                .getOrElse { diskData.toInputStream() }
-              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
-            }
-          }
-          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
-            releaseLockAndDispose(blockId, diskData, taskContext)
-          })
-          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+          getLocalValuesFromDisk(blockId, info, taskContext)
         } else {
           handleLocalReadFailure(blockId)
         }
     }
   }
 
+  private def getLocalValuesFromDisk(blockId: BlockId, info: BlockInfo,
+    taskContext: Option[TaskContext]): Option[BlockResult] = {
+    try {
+      val level = info.level
+      val diskData = diskStore.getBytes(blockId)
+      val iterToReturn: Iterator[Any] = {
+        if (level.deserialized) {
+          val diskValues = serializerManager.dataDeserializeStream(
+            blockId,
+            diskData.toInputStream())(info.classTag)
+          maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+        } else {
+          val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
+            .map { _.toInputStream(dispose = false) }
+            .getOrElse { diskData.toInputStream() }
+          serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+        }
+      }
+      val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
+        releaseLockAndDispose(blockId, diskData, taskContext)
+      })
+      Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+    } catch {
+      case ex: KryoException
+        // We need to have clear error message to catch environmental problems easily.
+        // Further details: https://jira2.workday.com/browse/PRISM-102331
+        if ex.getMessage.toLowerCase(Locale.ROOT)
+          .contains("java.io.ioexception: input/output error") => {
+        processKryoException(ex, blockId)
+        throw ex
+      }
+    }
+  }
+
+  private def processKryoException(ex: KryoException, blockId: BlockId): Unit = {
+    val errorMessage =
+      new StringBuffer(s"${ex.getMessage} usually occurs due to environmental problems " +

Review comment:
       Does this really provide more info?






[GitHub] [spark] AmplabJenkins commented on pull request #34980: [SPARK-37710][CORE] Add clear error message for java.io.IOException: Input/output error

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34980:
URL: https://github.com/apache/spark/pull/34980#issuecomment-999194151


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/50929/
   




[GitHub] [spark] srowen commented on a change in pull request #34980: [SPARK-37710][CORE] Add clear error message for java.io.IOException: Input/output error

Posted by GitBox <gi...@apache.org>.
srowen commented on a change in pull request #34980:
URL: https://github.com/apache/spark/pull/34980#discussion_r775652957



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -926,30 +933,55 @@ private[spark] class BlockManager(
           })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
-          val diskData = diskStore.getBytes(blockId)
-          val iterToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              val diskValues = serializerManager.dataDeserializeStream(
-                blockId,
-                diskData.toInputStream())(info.classTag)
-              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-            } else {
-              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
-                .map { _.toInputStream(dispose = false) }
-                .getOrElse { diskData.toInputStream() }
-              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
-            }
-          }
-          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
-            releaseLockAndDispose(blockId, diskData, taskContext)
-          })
-          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+          getLocalValuesFromDisk(blockId, info, taskContext)
         } else {
           handleLocalReadFailure(blockId)
         }
     }
   }
 
+  private def getLocalValuesFromDisk(blockId: BlockId, info: BlockInfo,
+    taskContext: Option[TaskContext]): Option[BlockResult] = {
+    try {
+      val level = info.level
+      val diskData = diskStore.getBytes(blockId)
+      val iterToReturn: Iterator[Any] = {
+        if (level.deserialized) {
+          val diskValues = serializerManager.dataDeserializeStream(
+            blockId,
+            diskData.toInputStream())(info.classTag)
+          maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+        } else {
+          val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
+            .map { _.toInputStream(dispose = false) }
+            .getOrElse { diskData.toInputStream() }
+          serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+        }
+      }
+      val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
+        releaseLockAndDispose(blockId, diskData, taskContext)
+      })
+      Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+    } catch {
+      case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
+        // We need to have clear error message to catch environmental problems easily.
+        // Further details: https://issues.apache.org/jira/browse/SPARK-37710
+        processKryoException(ex, blockId)
+        throw ex
+    }
+  }
+
+  private def processKryoException(ex: KryoException, blockId: BlockId): Unit = {
+    val errorMessage =
+      new StringBuffer(s"${ex.getMessage}. Please check if environment status is healthy for " +

Review comment:
       Just use a string and concat, no need for the extra complexity
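   
   A sketch of the suggested simplification - a plain String with concatenation instead of a StringBuffer, since there are at most two appends (`buildMessage` is a hypothetical stand-in for the patch's helper):
   
       // Plain var + string interpolation; no StringBuffer needed for two appends.
       def buildMessage(base: String, diskPath: Option[String]): String = {
         var msg = base
         diskPath.foreach(p => msg += s" - blockDiskPath: $p")
         msg
       }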

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -926,30 +936,62 @@ private[spark] class BlockManager(
           })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
-          val diskData = diskStore.getBytes(blockId)
-          val iterToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              val diskValues = serializerManager.dataDeserializeStream(
-                blockId,
-                diskData.toInputStream())(info.classTag)
-              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-            } else {
-              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
-                .map { _.toInputStream(dispose = false) }
-                .getOrElse { diskData.toInputStream() }
-              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
-            }
-          }
-          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
-            releaseLockAndDispose(blockId, diskData, taskContext)
-          })
-          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+          getLocalValuesFromDisk(blockId, info, taskContext)
         } else {
           handleLocalReadFailure(blockId)
         }
     }
   }
 
+  private def getLocalValuesFromDisk(blockId: BlockId, info: BlockInfo,

Review comment:
       OK, maybe, but it seems unrelated and yet is about half the change here. Maybe better to do it in another place, if at all.








[GitHub] [spark] erenavsarogullari commented on a change in pull request #34980: [SPARK-37710][CORE] Add detailed error message for java.io.IOException occurring on Kryo flow

Posted by GitBox <gi...@apache.org>.
erenavsarogullari commented on a change in pull request #34980:
URL: https://github.com/apache/spark/pull/34980#discussion_r776869729



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -926,30 +933,49 @@ private[spark] class BlockManager(
           })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
-          val diskData = diskStore.getBytes(blockId)
-          val iterToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              val diskValues = serializerManager.dataDeserializeStream(
-                blockId,
-                diskData.toInputStream())(info.classTag)
-              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-            } else {
-              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
-                .map { _.toInputStream(dispose = false) }
-                .getOrElse { diskData.toInputStream() }
-              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+          try {
+            val diskData = diskStore.getBytes(blockId)
+            val iterToReturn: Iterator[Any] = {
+              if (level.deserialized) {
+                val diskValues = serializerManager.dataDeserializeStream(
+                  blockId,
+                  diskData.toInputStream())(info.classTag)
+                maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+              } else {
+                val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
+                  .map { _.toInputStream(dispose = false) }
+                  .getOrElse { diskData.toInputStream() }
+                serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+              }
             }
+            val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
+              releaseLockAndDispose(blockId, diskData, taskContext)
+            })
+            Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+          } catch {
+            case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
+              // We need to have clear error message to catch environmental problems easily.
+              // Further details: https://issues.apache.org/jira/browse/SPARK-37710
+              processKryoException(ex, blockId)
+              throw ex
           }
-          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
-            releaseLockAndDispose(blockId, diskData, taskContext)
-          })
-          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
         } else {
           handleLocalReadFailure(blockId)
         }
     }
   }
 
+  private def processKryoException(ex: KryoException, blockId: BlockId): Unit = {
+    var errorMessage = s"${ex.getMessage}. Please check if environment status is healthy " +
+      s"(e.g: disk corruption, network failure (etc)). " +
+      s"${blockManagerId.toString} - blockName: $blockId"
+    if (diskBlockManager.containsBlock(blockId)) {
+      val file = diskBlockManager.getFile(blockId)
+      errorMessage = errorMessage + s" - blockDiskPath: ${file.getAbsolutePath}"
+    }
+    logError(errorMessage)

Review comment:
       `logDebug`: the `INFO` log level is what most production systems run at, so in production, if disk corruption occurs at the Kryo deserialization level, the problematic `BlockManager`, `blockId` and `blockPath` information would be missed at the `debug` log level.
   `logInfo`: this log message covers `ex.getMessage` and is printed only in the `error` case, carrying the problematic `BlockManager`, `blockId` and `blockPath` information.
   
   In light of these, I could not see a good reason to use the `info` or `debug` level for this use-case.
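
   For illustration only (an assumed, typical setup rather than anything in this PR), a production `log4j.properties` pinned at `INFO` silently drops `logDebug` output, which is the scenario described above:

   ```
   # Illustrative log4j configuration assumed for a production cluster: with the
   # root logger at INFO, BlockManager's logDebug output never reaches the logs,
   # so the blockId/blockPath triage context would be lost on disk corruption.
   log4j.rootCategory=INFO, console
   log4j.appender.console=org.apache.log4j.ConsoleAppender
   log4j.appender.console.layout=org.apache.log4j.PatternLayout
   log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
   ```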





[GitHub] [spark] mridulm commented on a change in pull request #34980: [SPARK-37710][CORE] Add detailed error message for java.io.IOException occurring on Kryo flow

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34980:
URL: https://github.com/apache/spark/pull/34980#discussion_r775762438



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -926,30 +933,48 @@ private[spark] class BlockManager(
           })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
-          val diskData = diskStore.getBytes(blockId)
-          val iterToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              val diskValues = serializerManager.dataDeserializeStream(
-                blockId,
-                diskData.toInputStream())(info.classTag)
-              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-            } else {
-              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
-                .map { _.toInputStream(dispose = false) }
-                .getOrElse { diskData.toInputStream() }
-              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+          try {
+            val diskData = diskStore.getBytes(blockId)
+            val iterToReturn: Iterator[Any] = {
+              if (level.deserialized) {
+                val diskValues = serializerManager.dataDeserializeStream(
+                  blockId,
+                  diskData.toInputStream())(info.classTag)
+                maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+              } else {
+                val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
+                  .map { _.toInputStream(dispose = false) }
+                  .getOrElse { diskData.toInputStream() }
+                serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+              }
             }
+            val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
+              releaseLockAndDispose(blockId, diskData, taskContext)
+            })
+            Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+          } catch {
+            case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
+              // We need to have clear error message to catch environmental problems easily.
+              // Further details: https://issues.apache.org/jira/browse/SPARK-37710
+              processKryoException(ex, blockId)
+              throw ex
           }
-          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
-            releaseLockAndDispose(blockId, diskData, taskContext)
-          })
-          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
         } else {
           handleLocalReadFailure(blockId)
         }
     }
   }
 
+  private def processKryoException(ex: KryoException, blockId: BlockId): Unit = {
+    var errorMessage = s"${ex.getMessage}. Please check if environment status is healthy for " +
+        s"disk corruption, network failure (etc). ${blockManagerId.toString} - blockName: $blockId"
+    if (diskBlockManager.containsBlock(blockId)) {
+      val file = diskBlockManager.getFile(blockId)
+      errorMessage = errorMessage + s" - blockDiskPath: ${file.getAbsolutePath}"
+    }
+    logError(errorMessage, ex)

Review comment:
       Log and rethrow is an anti-pattern - I am fine with a `logDebug` or `logInfo` giving additional info (`blockManagerId`, `blockId`, file name) in case this context is lost; the stack trace is already getting logged elsewhere, so drop `ex`.





[GitHub] [spark] erenavsarogullari commented on pull request #34980: [SPARK-37710][CORE] Add detailed error message for java.io.IOException occurring on Kryo flow

Posted by GitBox <gi...@apache.org>.
erenavsarogullari commented on pull request #34980:
URL: https://github.com/apache/spark/pull/34980#issuecomment-1001866941


   Thanks @srowen for the quick review and follow-up.




[GitHub] [spark] erenavsarogullari commented on a change in pull request #34980: [SPARK-37710][CORE] Add clear error message for java.io.IOException: Input/output error

Posted by GitBox <gi...@apache.org>.
erenavsarogullari commented on a change in pull request #34980:
URL: https://github.com/apache/spark/pull/34980#discussion_r775641280



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -342,6 +343,15 @@ private[spark] class BlockManager(
             iter.close()
             false
         }
+      } catch {
+        case ex: KryoException
+          // We need to have clear error message to catch environmental problems easily.
+          // Further details: https://issues.apache.org/jira/browse/SPARK-37710
+          if ex.getMessage.toLowerCase(Locale.ROOT)

Review comment:
       Yes, we can check for `IOException` at the `Kryo` deserialization level to get a more generic error message. Updated.
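
   For illustration, a minimal sketch of the two guards being compared (`deserializeBlock` is a hypothetical stand-in, and the string check is an assumed completion of the guard elided in the diff above):

   ```
   import java.io.IOException
   import java.util.Locale

   import com.esotericsoftware.kryo.KryoException

   try {
     deserializeBlock() // hypothetical stand-in for the Kryo read path
   } catch {
     // Before: brittle matching on the exception message text (assumed form).
     case ex: KryoException
         if ex.getMessage.toLowerCase(Locale.ROOT).contains("input/output error") =>
       throw ex
     // After: Kryo wraps the underlying java.io.IOException as its cause, so a
     // type check on getCause covers environmental I/O failures generically.
     case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
       throw ex
   }
   ```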





[GitHub] [spark] mridulm commented on a change in pull request #34980: [SPARK-37710][CORE] Add detailed error message for java.io.IOException occurring on Kryo flow

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34980:
URL: https://github.com/apache/spark/pull/34980#discussion_r779278699



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -926,30 +933,49 @@ private[spark] class BlockManager(
           })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
-          val diskData = diskStore.getBytes(blockId)
-          val iterToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              val diskValues = serializerManager.dataDeserializeStream(
-                blockId,
-                diskData.toInputStream())(info.classTag)
-              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-            } else {
-              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
-                .map { _.toInputStream(dispose = false) }
-                .getOrElse { diskData.toInputStream() }
-              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+          try {
+            val diskData = diskStore.getBytes(blockId)
+            val iterToReturn: Iterator[Any] = {
+              if (level.deserialized) {
+                val diskValues = serializerManager.dataDeserializeStream(
+                  blockId,
+                  diskData.toInputStream())(info.classTag)
+                maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+              } else {
+                val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
+                  .map { _.toInputStream(dispose = false) }
+                  .getOrElse { diskData.toInputStream() }
+                serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+              }
             }
+            val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
+              releaseLockAndDispose(blockId, diskData, taskContext)
+            })
+            Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+          } catch {
+            case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
+              // We need to have clear error message to catch environmental problems easily.
+              // Further details: https://issues.apache.org/jira/browse/SPARK-37710
+              processKryoException(ex, blockId)
+              throw ex
           }
-          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
-            releaseLockAndDispose(blockId, diskData, taskContext)
-          })
-          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
         } else {
           handleLocalReadFailure(blockId)
         }
     }
   }
 
+  private def processKryoException(ex: KryoException, blockId: BlockId): Unit = {
+    var errorMessage = s"${ex.getMessage}. Please check if environment status is healthy " +
+      s"(e.g: disk corruption, network failure (etc)). " +
+      s"${blockManagerId.toString} - blockName: $blockId"
+    if (diskBlockManager.containsBlock(blockId)) {
+      val file = diskBlockManager.getFile(blockId)
+      errorMessage = errorMessage + s" - blockDiskPath: ${file.getAbsolutePath}"
+    }
+    logError(errorMessage)

Review comment:
       I am not in favor of making this ERROR or WARN - it is not actionable for end users, it ends up surfacing for a larger number of cases (not just unrecoverable system errors), and the actual exception is handled elsewhere, while this PR is giving supporting information to help triage an issue.
   
   I understand that there is a specific set of corner cases where this information would be useful (like disk corruption, etc.) - which is why I am fine with leaving it at INFO instead of DEBUG.




