You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by mr...@apache.org on 2022/01/06 23:05:37 UTC

[spark] branch master updated: [SPARK-37710][CORE] Add detailed log message for java.io.IOException occurring on Kryo flow

This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d7031c  [SPARK-37710][CORE] Add detailed log message for java.io.IOException occurring on Kryo flow
5d7031c is described below

commit 5d7031ca01dd902ed60f180925b502d9919a7c01
Author: erenavsarogullari <er...@gmail.com>
AuthorDate: Thu Jan 6 17:04:58 2022 -0600

    [SPARK-37710][CORE] Add detailed log message for java.io.IOException occurring on Kryo flow
    
    ### What changes were proposed in this pull request?
    `java.io.IOException: Input/output error` usually points to environmental issues such as disk read/write failures due to disk corruption, network access failures etc. This PR aims to add a clear error message to catch these environmental cases occurring on `BlockManager`, and logs it with the `BlockManager hostname`, `blockId` and `blockPath`.
    
    ### Why are the changes needed?
    These kinds of problems are usually environmental, and a clear error message can help their analysis and save RCA (root cause analysis) time.
    
    The following stack trace is from a `disk corruption` case:
    ```
    com.esotericsoftware.kryo.KryoException: java.io.IOException: Input/output error
    Serialization trace:
    buffers (org.apache.spark.sql.execution.columnar.DefaultCachedBatch)
        at com.esotericsoftware.kryo.io.Input.fill(Input.java:166)
        at com.esotericsoftware.kryo.io.Input.require(Input.java:196)
        at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:346)
        at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:326)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:55)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:38)
        at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:789)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:381)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:302)
        at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:789)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:132)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:816)
        at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:296)
        at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:168)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
        at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
        at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
        at org.apache.spark.storage.BlockManager.maybeCacheDiskValuesInMemory(BlockManager.scala:1569)
        at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:877)
        at org.apache.spark.storage.BlockManager.get(BlockManager.scala:1163)
    ...
    Caused by: java.io.IOException: Input/output error
        at java.io.FileInputStream.readBytes(Native Method)
        at java.io.FileInputStream.read(FileInputStream.java:255)
        at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
        at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
        at net.jpountz.lz4.LZ4BlockInputStream.tryReadFully(LZ4BlockInputStream.java:269)
        at net.jpountz.lz4.LZ4BlockInputStream.readFully(LZ4BlockInputStream.java:280)
        at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:243)
        at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:157)
        at com.esotericsoftware.kryo.io.Input.fill(Input.java:164)
        ... 87 more
    ```
    
    Proposed Error Message:
    ```
    java.io.IOException: Input/output error. BlockManagerId(driver, localhost, 58677, None) - blockId: test_my-block-id
    - blockDiskPath: /private/var/folders/kj/mccyycwn6mjdwnglw9g3k6pm0000gq/T/
    blockmgr-24325b3a-0045-483a-98b8-673a2e07bed1/11/test_my-block-id
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added 2 new unit tests that reproduce the issue with `TestDiskCorruptedInputStream` and log the new error message when getting data blocks from disk.
    
    Closes #34980 from erenavsarogullari/SPARK-37710.
    
    Authored-by: erenavsarogullari <er...@gmail.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../org/apache/spark/storage/BlockManager.scala    | 57 ++++++++++-----
 .../apache/spark/storage/BlockManagerSuite.scala   | 80 +++++++++++++++++++++-
 2 files changed, 119 insertions(+), 18 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 9ebf26b..ec4dc77 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -34,6 +34,7 @@ import scala.util.{Failure, Random, Success, Try}
 import scala.util.control.NonFatal
 
 import com.codahale.metrics.{MetricRegistry, MetricSet}
+import com.esotericsoftware.kryo.KryoException
 import com.google.common.cache.CacheBuilder
 import org.apache.commons.io.IOUtils
 
@@ -342,6 +343,12 @@ private[spark] class BlockManager(
             iter.close()
             false
         }
+      } catch {
+        case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
+          // We need to have detailed log message to catch environmental problems easily.
+          // Further details: https://issues.apache.org/jira/browse/SPARK-37710
+          processKryoException(ex, blockId)
+          throw ex
       } finally {
         IOUtils.closeQuietly(inputStream)
       }
@@ -926,30 +933,48 @@ private[spark] class BlockManager(
           })
           Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
         } else if (level.useDisk && diskStore.contains(blockId)) {
-          val diskData = diskStore.getBytes(blockId)
-          val iterToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              val diskValues = serializerManager.dataDeserializeStream(
-                blockId,
-                diskData.toInputStream())(info.classTag)
-              maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
-            } else {
-              val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
-                .map { _.toInputStream(dispose = false) }
-                .getOrElse { diskData.toInputStream() }
-              serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+          try {
+            val diskData = diskStore.getBytes(blockId)
+            val iterToReturn: Iterator[Any] = {
+              if (level.deserialized) {
+                val diskValues = serializerManager.dataDeserializeStream(
+                  blockId,
+                  diskData.toInputStream())(info.classTag)
+                maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+              } else {
+                val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
+                  .map { _.toInputStream(dispose = false) }
+                  .getOrElse { diskData.toInputStream() }
+                serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+              }
             }
+            val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
+              releaseLockAndDispose(blockId, diskData, taskContext)
+            })
+            Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+          } catch {
+            case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
+              // We need to have detailed log message to catch environmental problems easily.
+              // Further details: https://issues.apache.org/jira/browse/SPARK-37710
+              processKryoException(ex, blockId)
+              throw ex
           }
-          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
-            releaseLockAndDispose(blockId, diskData, taskContext)
-          })
-          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
         } else {
           handleLocalReadFailure(blockId)
         }
     }
   }
 
+  // Logs (at INFO) the Kryo failure with the block manager id, block id and, if on disk, its path.
+  private def processKryoException(ex: KryoException, blockId: BlockId): Unit = {
+    val summary = "%s. %s - blockId: %s".format(ex.getMessage, blockManagerId.toString, blockId)
+    val blockFile = diskBlockManager.getFile(blockId)
+    val details =
+      if (!blockFile.exists()) summary
+      else "%s - blockDiskPath: %s".format(summary, blockFile.getAbsolutePath)
+    logInfo(details)
+  }
+
   /**
    * Get block from the local block manager as serialized bytes.
    */
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 2cb281d..d1dc083 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.storage
 
-import java.io.File
+import java.io.{File, InputStream, IOException}
 import java.nio.ByteBuffer
 import java.nio.file.Files
 
@@ -29,6 +29,7 @@ import scala.concurrent.duration._
 import scala.language.implicitConversions
 import scala.reflect.ClassTag
 
+import com.esotericsoftware.kryo.KryoException
 import org.apache.commons.lang3.RandomUtils
 import org.mockito.{ArgumentCaptor, ArgumentMatchers => mc}
 import org.mockito.Mockito.{doAnswer, mock, never, spy, times, verify, when}
@@ -43,6 +44,7 @@ import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.executor.DataReadMethod
 import org.apache.spark.internal.config
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Kryo.{KRYO_USE_POOL, KRYO_USE_UNSAFE}
 import org.apache.spark.internal.config.Tests._
 import org.apache.spark.memory.{MemoryMode, UnifiedMemoryManager}
 import org.apache.spark.network.{BlockDataManager, BlockTransferService, TransportContext}
@@ -57,7 +59,7 @@ import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}
 import org.apache.spark.scheduler.{LiveListenerBus, MapStatus, MergeStatus, SparkListenerBlockUpdated}
 import org.apache.spark.scheduler.cluster.{CoarseGrainedClusterMessages, CoarseGrainedSchedulerBackend}
 import org.apache.spark.security.{CryptoStreamUtils, EncryptionFunSuite}
-import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, SerializerManager}
+import org.apache.spark.serializer.{DeserializationStream, JavaSerializer, KryoDeserializationStream, KryoSerializer, KryoSerializerInstance, SerializerInstance, SerializerManager}
 import org.apache.spark.shuffle.{MigratableResolver, ShuffleBlockInfo, ShuffleBlockResolver, ShuffleManager}
 import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.storage.BlockManagerMessages._
@@ -2123,6 +2125,80 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     store.putSingle(broadcast0BlockId, a, StorageLevel.DISK_ONLY)
   }
 
+  test("check KryoException when getting disk blocks and 'Input/output error' is occurred") {
+    // Every stream deserialized through this serializer throws IOException on read.
+    val corruptedSerializer = createKryoSerializerWithDiskCorruptedInputStream()
+
+    case class User(id: Long, name: String)
+
+    conf.set(TEST_MEMORY, 1200L)
+    val xfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
+    val memManager = UnifiedMemoryManager(conf, numCores = 1)
+    val serManager = new SerializerManager(corruptedSerializer, conf)
+    val blockManager = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,
+      serManager, conf, memManager, mapOutputTracker,
+      shuffleManager, xfer, securityMgr, None)
+    allStores += blockManager
+    blockManager.initialize("app-id")
+    blockManager.putSingle("my-block-id", new Array[User](300), StorageLevel.MEMORY_AND_DISK)
+
+    val thrown = intercept[KryoException] {
+      blockManager.getOrElseUpdate("my-block-id", StorageLevel.MEMORY_AND_DISK, ClassTag.Object,
+        () => List(new Array[User](1)).iterator)
+    }
+    assert(thrown.getMessage === "java.io.IOException: Input/output error")
+  }
+
+  test("check KryoException when saving blocks into memory and 'Input/output error' is occurred") {
+    // Every stream deserialized through this serializer throws IOException on read.
+    val corruptedSerializer = createKryoSerializerWithDiskCorruptedInputStream()
+
+    conf.set(TEST_MEMORY, 1200L)
+    val xfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
+    val memManager = UnifiedMemoryManager(conf, numCores = 1)
+    val serManager = new SerializerManager(corruptedSerializer, conf)
+    val blockManager = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,
+      serManager, conf, memManager, mapOutputTracker,
+      shuffleManager, xfer, securityMgr, None)
+    allStores += blockManager
+    blockManager.initialize("app-id")
+
+    val rddBlockId = RDDBlockId(0, 0)
+    val payload = Array.tabulate[Byte](1000)(_.toByte)
+    val chunkedBuffer = new ChunkedByteBuffer(ByteBuffer.wrap(payload))
+
+    val thrown = intercept[KryoException] {
+      blockManager.putBytes(rddBlockId, chunkedBuffer, StorageLevel.MEMORY_AND_DISK)
+    }
+    assert(thrown.getMessage === "java.io.IOException: Input/output error")
+  }
+
+  // Builds a KryoSerializer whose deserialization streams ignore the stream they are given and
+  // read from one whose read() always throws IOException("Input/output error").
+  private def createKryoSerializerWithDiskCorruptedInputStream(): KryoSerializer = {
+    class TestDiskCorruptedInputStream extends InputStream {
+      override def read(): Int = throw new IOException("Input/output error")
+    }
+
+    class TestKryoDeserializationStream(
+        instance: KryoSerializerInstance, in: InputStream, unsafe: Boolean)
+      extends KryoDeserializationStream(instance, in, unsafe)
+
+    class TestKryoSerializerInstance(ks: KryoSerializer, useUnsafe: Boolean, usePool: Boolean)
+      extends KryoSerializerInstance(ks, useUnsafe, usePool) {
+      override def deserializeStream(s: InputStream): DeserializationStream =
+        new TestKryoDeserializationStream(this, new TestDiskCorruptedInputStream(), false)
+    }
+
+    class TestKryoSerializer(sparkConf: SparkConf) extends KryoSerializer(sparkConf) {
+      override def newInstance(): SerializerInstance =
+        new TestKryoSerializerInstance(
+          this, sparkConf.get(KRYO_USE_UNSAFE), sparkConf.get(KRYO_USE_POOL))
+    }
+
+    new TestKryoSerializer(conf)
+  }
+
   class MockBlockTransferService(
       val maxFailures: Int,
       override val hostName: String = "MockBlockTransferServiceHost") extends BlockTransferService {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org