Posted to commits@spark.apache.org by mr...@apache.org on 2022/01/06 23:05:37 UTC
[spark] branch master updated: [SPARK-37710][CORE] Add detailed log message for java.io.IOException occurring on Kryo flow
This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 5d7031c [SPARK-37710][CORE] Add detailed log message for java.io.IOException occurring on Kryo flow
5d7031c is described below
commit 5d7031ca01dd902ed60f180925b502d9919a7c01
Author: erenavsarogullari <er...@gmail.com>
AuthorDate: Thu Jan 6 17:04:58 2022 -0600
[SPARK-37710][CORE] Add detailed log message for java.io.IOException occurring on Kryo flow
### What changes were proposed in this pull request?
`java.io.IOException: Input/output error` usually points to environmental issues such as disk read/write failures due to disk corruption, network access failures, etc. This PR adds a clear error message to catch this kind of environmental case occurring in `BlockManager`, logging the `BlockManager hostname`, `blockId` and `blockPath`.
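In essence, the change catches a `KryoException` whose cause is an `IOException` on the `BlockManager` read path, logs the block's location details, and rethrows. A minimal standalone sketch of that pattern (the `logBlockContext` helper and its string parameters are illustrative stand-ins for the real `BlockManager` fields, not the exact committed code):
```
import java.io.IOException
import com.esotericsoftware.kryo.KryoException

// Illustrative helper: blockManagerId, blockId and blockPath stand in for
// the real BlockManager fields; the patch itself uses Spark's logInfo.
def logBlockContext(ex: KryoException, blockManagerId: String,
    blockId: String, blockPath: Option[String]): Unit = {
  val base = s"${ex.getMessage}. $blockManagerId - blockId: $blockId"
  // Append the disk path only when the block file actually exists on disk.
  println(blockPath.fold(base)(p => s"$base - blockDiskPath: $p"))
}

def readBlock[T](deserialize: => T, blockManagerId: String,
    blockId: String, blockPath: Option[String]): T = {
  try {
    deserialize
  } catch {
    case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
      // Environmental failures (disk corruption, network errors) surface
      // here; log the block's location details, then rethrow unchanged.
      logBlockContext(ex, blockManagerId, blockId, blockPath)
      throw ex
  }
}
```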
### Why are the changes needed?
These kinds of problems are usually environmental, and a clear error message helps the analysis and saves RCA time.
The following stack trace is from a `disk corruption` case:
```
com.esotericsoftware.kryo.KryoException: java.io.IOException: Input/output error
Serialization trace:
buffers (org.apache.spark.sql.execution.columnar.DefaultCachedBatch)
at com.esotericsoftware.kryo.io.Input.fill(Input.java:166)
at com.esotericsoftware.kryo.io.Input.require(Input.java:196)
at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:346)
at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:326)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:55)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:38)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:789)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:381)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:302)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:789)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:132)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:816)
at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:296)
at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:168)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
at org.apache.spark.storage.BlockManager.maybeCacheDiskValuesInMemory(BlockManager.scala:1569)
at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:877)
at org.apache.spark.storage.BlockManager.get(BlockManager.scala:1163)
...
Caused by: java.io.IOException: Input/output error
at java.io.FileInputStream.readBytes(Native Method)
at java.io.FileInputStream.read(FileInputStream.java:255)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
at net.jpountz.lz4.LZ4BlockInputStream.tryReadFully(LZ4BlockInputStream.java:269)
at net.jpountz.lz4.LZ4BlockInputStream.readFully(LZ4BlockInputStream.java:280)
at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:243)
at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:157)
at com.esotericsoftware.kryo.io.Input.fill(Input.java:164)
... 87 more
```
Proposed Error Message:
```
java.io.IOException: Input/output error. BlockManagerId(driver, localhost, 58677, None) - blockId: test_my-block-id
- blockDiskPath: /private/var/folders/kj/mccyycwn6mjdwnglw9g3k6pm0000gq/T/blockmgr-24325b3a-0045-483a-98b8-673a2e07bed1/11/test_my-block-id
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added 2 new unit tests that reproduce the issue with `TestDiskCorruptedInputStream` and log the new error message when reading data blocks from disk.
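The heart of the reproduction is an `InputStream` that fails on every read, simulating a corrupted disk; the full wiring into a custom `KryoSerializer` is in the diff below. A minimal sketch of the idea:
```
import java.io.{IOException, InputStream}

// Simulates disk corruption: every read fails with the same IOException
// seen in the field, which Kryo then wraps in a KryoException.
class CorruptedInputStream extends InputStream {
  override def read(): Int = throw new IOException("Input/output error")
}
```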
Closes #34980 from erenavsarogullari/SPARK-37710.
Authored-by: erenavsarogullari <er...@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
.../org/apache/spark/storage/BlockManager.scala | 57 ++++++++++-----
.../apache/spark/storage/BlockManagerSuite.scala | 80 +++++++++++++++++++++-
2 files changed, 119 insertions(+), 18 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 9ebf26b..ec4dc77 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -34,6 +34,7 @@ import scala.util.{Failure, Random, Success, Try}
import scala.util.control.NonFatal
import com.codahale.metrics.{MetricRegistry, MetricSet}
+import com.esotericsoftware.kryo.KryoException
import com.google.common.cache.CacheBuilder
import org.apache.commons.io.IOUtils
@@ -342,6 +343,12 @@ private[spark] class BlockManager(
iter.close()
false
}
+ } catch {
+ case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
+ // We need to have detailed log message to catch environmental problems easily.
+ // Further details: https://issues.apache.org/jira/browse/SPARK-37710
+ processKryoException(ex, blockId)
+ throw ex
} finally {
IOUtils.closeQuietly(inputStream)
}
@@ -926,30 +933,48 @@ private[spark] class BlockManager(
})
Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
} else if (level.useDisk && diskStore.contains(blockId)) {
- val diskData = diskStore.getBytes(blockId)
- val iterToReturn: Iterator[Any] = {
- if (level.deserialized) {
- val diskValues = serializerManager.dataDeserializeStream(
- blockId,
- diskData.toInputStream())(info.classTag)
- maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
- } else {
- val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
- .map { _.toInputStream(dispose = false) }
- .getOrElse { diskData.toInputStream() }
- serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+ try {
+ val diskData = diskStore.getBytes(blockId)
+ val iterToReturn: Iterator[Any] = {
+ if (level.deserialized) {
+ val diskValues = serializerManager.dataDeserializeStream(
+ blockId,
+ diskData.toInputStream())(info.classTag)
+ maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
+ } else {
+ val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
+ .map { _.toInputStream(dispose = false) }
+ .getOrElse { diskData.toInputStream() }
+ serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
+ }
}
+ val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
+ releaseLockAndDispose(blockId, diskData, taskContext)
+ })
+ Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+ } catch {
+ case ex: KryoException if ex.getCause.isInstanceOf[IOException] =>
+ // We need to have detailed log message to catch environmental problems easily.
+ // Further details: https://issues.apache.org/jira/browse/SPARK-37710
+ processKryoException(ex, blockId)
+ throw ex
}
- val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
- releaseLockAndDispose(blockId, diskData, taskContext)
- })
- Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
} else {
handleLocalReadFailure(blockId)
}
}
}
+ private def processKryoException(ex: KryoException, blockId: BlockId): Unit = {
+ var message =
+ "%s. %s - blockId: %s".format(ex.getMessage, blockManagerId.toString, blockId)
+ val file = diskBlockManager.getFile(blockId)
+ if (file.exists()) {
+ message = "%s - blockDiskPath: %s".format(message, file.getAbsolutePath)
+ }
+ logInfo(message)
+ }
+
/**
* Get block from the local block manager as serialized bytes.
*/
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 2cb281d..d1dc083 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.storage
-import java.io.File
+import java.io.{File, InputStream, IOException}
import java.nio.ByteBuffer
import java.nio.file.Files
@@ -29,6 +29,7 @@ import scala.concurrent.duration._
import scala.language.implicitConversions
import scala.reflect.ClassTag
+import com.esotericsoftware.kryo.KryoException
import org.apache.commons.lang3.RandomUtils
import org.mockito.{ArgumentCaptor, ArgumentMatchers => mc}
import org.mockito.Mockito.{doAnswer, mock, never, spy, times, verify, when}
@@ -43,6 +44,7 @@ import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.executor.DataReadMethod
import org.apache.spark.internal.config
import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Kryo.{KRYO_USE_POOL, KRYO_USE_UNSAFE}
import org.apache.spark.internal.config.Tests._
import org.apache.spark.memory.{MemoryMode, UnifiedMemoryManager}
import org.apache.spark.network.{BlockDataManager, BlockTransferService, TransportContext}
@@ -57,7 +59,7 @@ import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}
import org.apache.spark.scheduler.{LiveListenerBus, MapStatus, MergeStatus, SparkListenerBlockUpdated}
import org.apache.spark.scheduler.cluster.{CoarseGrainedClusterMessages, CoarseGrainedSchedulerBackend}
import org.apache.spark.security.{CryptoStreamUtils, EncryptionFunSuite}
-import org.apache.spark.serializer.{JavaSerializer, KryoSerializer, SerializerManager}
+import org.apache.spark.serializer.{DeserializationStream, JavaSerializer, KryoDeserializationStream, KryoSerializer, KryoSerializerInstance, SerializerInstance, SerializerManager}
import org.apache.spark.shuffle.{MigratableResolver, ShuffleBlockInfo, ShuffleBlockResolver, ShuffleManager}
import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.storage.BlockManagerMessages._
@@ -2123,6 +2125,80 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
store.putSingle(broadcast0BlockId, a, StorageLevel.DISK_ONLY)
}
+ test("check KryoException when getting disk blocks and 'Input/output error' is occurred") {
+ val kryoSerializerWithDiskCorruptedInputStream
+ = createKryoSerializerWithDiskCorruptedInputStream()
+
+ case class User(id: Long, name: String)
+
+ conf.set(TEST_MEMORY, 1200L)
+ val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
+ val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
+ val serializerManager = new SerializerManager(kryoSerializerWithDiskCorruptedInputStream, conf)
+ val store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,
+ serializerManager, conf, memoryManager, mapOutputTracker,
+ shuffleManager, transfer, securityMgr, None)
+ allStores += store
+ store.initialize("app-id")
+ store.putSingle("my-block-id", new Array[User](300), StorageLevel.MEMORY_AND_DISK)
+
+ val kryoException = intercept[KryoException] {
+ store.getOrElseUpdate("my-block-id", StorageLevel.MEMORY_AND_DISK, ClassTag.Object,
+ () => List(new Array[User](1)).iterator)
+ }
+ assert(kryoException.getMessage === "java.io.IOException: Input/output error")
+ }
+
+ test("check KryoException when saving blocks into memory and 'Input/output error' is occurred") {
+ val kryoSerializerWithDiskCorruptedInputStream
+ = createKryoSerializerWithDiskCorruptedInputStream()
+
+ conf.set(TEST_MEMORY, 1200L)
+ val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
+ val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
+ val serializerManager = new SerializerManager(kryoSerializerWithDiskCorruptedInputStream, conf)
+ val store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,
+ serializerManager, conf, memoryManager, mapOutputTracker,
+ shuffleManager, transfer, securityMgr, None)
+ allStores += store
+ store.initialize("app-id")
+
+ val blockId = RDDBlockId(0, 0)
+ val bytes = Array.tabulate[Byte](1000)(_.toByte)
+ val byteBuffer = new ChunkedByteBuffer(ByteBuffer.wrap(bytes))
+
+ val kryoException = intercept[KryoException] {
+ store.putBytes(blockId, byteBuffer, StorageLevel.MEMORY_AND_DISK)
+ }
+ assert(kryoException.getMessage === "java.io.IOException: Input/output error")
+ }
+
+ private def createKryoSerializerWithDiskCorruptedInputStream(): KryoSerializer = {
+ class TestDiskCorruptedInputStream extends InputStream {
+ override def read(): Int = throw new IOException("Input/output error")
+ }
+
+ class TestKryoDeserializationStream(serInstance: KryoSerializerInstance,
+ inStream: InputStream,
+ useUnsafe: Boolean)
+ extends KryoDeserializationStream(serInstance, inStream, useUnsafe)
+
+ class TestKryoSerializerInstance(ks: KryoSerializer, useUnsafe: Boolean, usePool: Boolean)
+ extends KryoSerializerInstance(ks, useUnsafe, usePool) {
+ override def deserializeStream(s: InputStream): DeserializationStream = {
+ new TestKryoDeserializationStream(this, new TestDiskCorruptedInputStream(), false)
+ }
+ }
+
+ class TestKryoSerializer(conf: SparkConf) extends KryoSerializer(conf) {
+ override def newInstance(): SerializerInstance = {
+ new TestKryoSerializerInstance(this, conf.get(KRYO_USE_UNSAFE), conf.get(KRYO_USE_POOL))
+ }
+ }
+
+ new TestKryoSerializer(conf)
+ }
+
class MockBlockTransferService(
val maxFailures: Int,
override val hostName: String = "MockBlockTransferServiceHost") extends BlockTransferService {