Posted to commits@spark.apache.org by do...@apache.org on 2022/09/23 11:27:57 UTC
[spark] branch branch-3.2 updated: [SPARK-39200][CORE] Make Fallback Storage readFully on content
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new f9a05dd8486 [SPARK-39200][CORE] Make Fallback Storage readFully on content
f9a05dd8486 is described below
commit f9a05dd8486a92ed4a2124bbd784bad0ee32a030
Author: Frank Yin <fr...@ziprecruiter.com>
AuthorDate: Fri Sep 23 04:23:26 2022 -0700
[SPARK-39200][CORE] Make Fallback Storage readFully on content
### What changes were proposed in this pull request?

According to the bug description, the fallback storage does not read the block content fully, which can cause `org.apache.spark.shuffle.FetchFailedException: Decompression error: Corrupted block detected`. This patch fixes the issue by reading the underlying stream fully (`readFully` instead of a single `read`).
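For illustration only (not part of this patch), here is a minimal, self-contained sketch of the underlying contract: a single `InputStream.read(byte[])` call may return fewer bytes than requested, while `DataInputStream.readFully` keeps reading until the buffer is completely filled or throws `EOFException`. The `mkStream` helper below is hypothetical; it only simulates a source that serves at most 10 bytes per call, similar in spirit to the `ReadPartialInputStream` used in the test.

import java.io.{DataInputStream, InputStream}

// Hypothetical stream that never returns more than 10 bytes per read() call.
def mkStream(total: Int): InputStream = new InputStream {
  private var remaining = total
  override def read(): Int =
    if (remaining > 0) { remaining -= 1; 1 } else -1
  override def read(b: Array[Byte], off: Int, len: Int): Int = {
    if (remaining <= 0) return -1
    val n = math.min(math.min(len, 10), remaining)   // serve at most 10 bytes
    java.util.Arrays.fill(b, off, off + n, 1.toByte)
    remaining -= n
    n
  }
}

val buf = new Array[Byte](100)
val n = mkStream(100).read(buf)                    // n == 10; bytes 10..99 of buf stay untouched
new DataInputStream(mkStream(100)).readFully(buf)  // fills all 100 bytes before returning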
### Why are the changes needed?

Fix a bug documented in SPARK-39200.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Wrote a unit test.
Closes #37960 from ukby1234/SPARK-39200.
Authored-by: Frank Yin <fr...@ziprecruiter.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
(cherry picked from commit 07061f1a07a96f59ae42c9df6110eb784d2f3dab)
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../org/apache/spark/storage/FallbackStorage.scala | 2 +-
.../spark/storage/FallbackStorageSuite.scala | 90 +++++++++++++++++++++-
2 files changed, 90 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
index 76137133227..29a88e175ba 100644
--- a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
+++ b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
@@ -185,7 +185,7 @@ private[spark] object FallbackStorage extends Logging {
val array = new Array[Byte](size.toInt)
val startTimeNs = System.nanoTime()
f.seek(offset)
- f.read(array)
+ f.readFully(array)
logDebug(s"Took ${(System.nanoTime() - startTimeNs) / (1000 * 1000)}ms")
f.close()
new NioManagedBuffer(ByteBuffer.wrap(array))
diff --git a/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala b/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
index 7d648c979cd..444a217db3d 100644
--- a/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
@@ -16,13 +16,15 @@
*/
package org.apache.spark.storage
-import java.io.{DataOutputStream, File, FileOutputStream, IOException}
+import java.io.{DataOutputStream, File, FileOutputStream, InputStream, IOException}
import java.net.{InetAddress, UnknownHostException}
import java.nio.file.Files
import scala.concurrent.duration._
+import scala.util.Random
import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FSDataInputStream, LocalFileSystem, Path, PositionedReadable, Seekable}
import org.mockito.{ArgumentMatchers => mc}
import org.mockito.Mockito.{mock, times, verify, when}
import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
@@ -115,6 +117,49 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
FallbackStorage.read(conf, ShuffleBlockId(1, 2L, 0))
}
+ test("SPARK-39200: fallback storage APIs - readFully") {
+ val conf = new SparkConf(false)
+ .set("spark.app.id", "testId")
+ .set("spark.hadoop.fs.file.impl", classOf[ReadPartialFileSystem].getName)
+ .set(SHUFFLE_COMPRESS, false)
+ .set(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
+ .set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH,
+ "file://" + Files.createTempDirectory("tmp").toFile.getAbsolutePath + "/")
+ val fallbackStorage = new FallbackStorage(conf)
+ val bmm = new BlockManagerMaster(new NoopRpcEndpointRef(conf), null, conf, false)
+
+ val bm = mock(classOf[BlockManager])
+ val dbm = new DiskBlockManager(conf, deleteFilesOnStop = false, isDriver = false)
+ when(bm.diskBlockManager).thenReturn(dbm)
+ when(bm.master).thenReturn(bmm)
+ val resolver = new IndexShuffleBlockResolver(conf, bm)
+ when(bm.migratableResolver).thenReturn(resolver)
+
+ val length = 100000
+ val content = new Array[Byte](length)
+ Random.nextBytes(content)
+
+ val indexFile = resolver.getIndexFile(1, 2L)
+ tryWithResource(new FileOutputStream(indexFile)) { fos =>
+ val dos = new DataOutputStream(fos)
+ dos.writeLong(0)
+ dos.writeLong(length)
+ }
+
+ val dataFile = resolver.getDataFile(1, 2L)
+ tryWithResource(new FileOutputStream(dataFile)) { fos =>
+ fos.write(content)
+ }
+
+ fallbackStorage.copy(ShuffleBlockInfo(1, 2L), bm)
+
+ assert(fallbackStorage.exists(1, ShuffleIndexBlockId(1, 2L, NOOP_REDUCE_ID).name))
+ assert(fallbackStorage.exists(1, ShuffleDataBlockId(1, 2L, NOOP_REDUCE_ID).name))
+
+ val readResult = FallbackStorage.read(conf, ShuffleBlockId(1, 2L, 0))
+ assert(readResult.nioByteBuffer().array().sameElements(content))
+ }
+
test("SPARK-34142: fallback storage API - cleanUp") {
withTempDir { dir =>
Seq(true, false).foreach { cleanUp =>
@@ -297,3 +342,46 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
}
}
}
+class ReadPartialInputStream(val in: FSDataInputStream) extends InputStream
+ with Seekable with PositionedReadable {
+ override def read: Int = in.read
+
+ override def read(b: Array[Byte], off: Int, len: Int): Int = {
+ if (len > 1) {
+ in.read(b, off, len - 1)
+ } else {
+ in.read(b, off, len)
+ }
+ }
+
+ override def seek(pos: Long): Unit = {
+ in.seek(pos)
+ }
+
+ override def getPos: Long = in.getPos
+
+ override def seekToNewSource(targetPos: Long): Boolean = in.seekToNewSource(targetPos)
+
+ override def read(position: Long, buffer: Array[Byte], offset: Int, length: Int): Int = {
+ if (length > 1) {
+ in.read(position, buffer, offset, length - 1)
+ } else {
+ in.read(position, buffer, offset, length)
+ }
+ }
+
+ override def readFully(position: Long, buffer: Array[Byte], offset: Int, length: Int): Unit = {
+ in.readFully(position, buffer, offset, length)
+ }
+
+ override def readFully(position: Long, buffer: Array[Byte]): Unit = {
+ in.readFully(position, buffer)
+ }
+}
+
+class ReadPartialFileSystem extends LocalFileSystem {
+ override def open(f: Path): FSDataInputStream = {
+ val stream = super.open(f)
+ new FSDataInputStream(new ReadPartialInputStream(stream))
+ }
+}