Posted to commits@spark.apache.org by do...@apache.org on 2022/09/23 11:27:57 UTC

[spark] branch branch-3.2 updated: [SPARK-39200][CORE] Make Fallback Storage readFully on content

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new f9a05dd8486 [SPARK-39200][CORE] Make Fallback Storage readFully on content
f9a05dd8486 is described below

commit f9a05dd8486a92ed4a2124bbd784bad0ee32a030
Author: Frank Yin <fr...@ziprecruiter.com>
AuthorDate: Fri Sep 23 04:23:26 2022 -0700

    [SPARK-39200][CORE] Make Fallback Storage readFully on content
    
    As described in the bug report, the fallback storage does not read the block fully, which can cause `org.apache.spark.shuffle.FetchFailedException: Decompression error: Corrupted block detected`. This fixes the issue by reading the underlying stream fully.
    
    Fix a bug documented in SPARK-39200
    
    No
    
    Wrote a unit test
    
    Closes #37960 from ukby1234/SPARK-39200.
    
    Authored-by: Frank Yin <fr...@ziprecruiter.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
    (cherry picked from commit 07061f1a07a96f59ae42c9df6110eb784d2f3dab)
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../org/apache/spark/storage/FallbackStorage.scala |  2 +-
 .../spark/storage/FallbackStorageSuite.scala       | 90 +++++++++++++++++++++-
 2 files changed, 90 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
index 76137133227..29a88e175ba 100644
--- a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
+++ b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
@@ -185,7 +185,7 @@ private[spark] object FallbackStorage extends Logging {
         val array = new Array[Byte](size.toInt)
         val startTimeNs = System.nanoTime()
         f.seek(offset)
-        f.read(array)
+        f.readFully(array)
         logDebug(s"Took ${(System.nanoTime() - startTimeNs) / (1000 * 1000)}ms")
         f.close()
         new NioManagedBuffer(ByteBuffer.wrap(array))
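For context on the one-line change above: `read(byte[])` is allowed to return fewer bytes than the buffer length, while `readFully` keeps reading until the buffer is completely filled (or throws on EOF). A minimal sketch of that contract, assuming a plain java.io.InputStream; the names here are illustrative and not part of the patch:

    import java.io.{EOFException, InputStream}

    object ReadFullyExample {
      // Keep reading until the buffer is filled; a single read() may legally return a short count.
      def readFully(in: InputStream, buf: Array[Byte]): Unit = {
        var pos = 0
        while (pos < buf.length) {
          val n = in.read(buf, pos, buf.length - pos)
          if (n < 0) throw new EOFException(s"reached EOF after $pos of ${buf.length} bytes")
          pos += n
        }
      }
    }

This is effectively what `DataInputStream.readFully` does, which is why switching the call is enough to avoid handing a partially filled array to the decompressor.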
diff --git a/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala b/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
index 7d648c979cd..444a217db3d 100644
--- a/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
@@ -16,13 +16,15 @@
  */
 package org.apache.spark.storage
 
-import java.io.{DataOutputStream, File, FileOutputStream, IOException}
+import java.io.{DataOutputStream, File, FileOutputStream, InputStream, IOException}
 import java.net.{InetAddress, UnknownHostException}
 import java.nio.file.Files
 
 import scala.concurrent.duration._
+import scala.util.Random
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FSDataInputStream, LocalFileSystem, Path, PositionedReadable, Seekable}
 import org.mockito.{ArgumentMatchers => mc}
 import org.mockito.Mockito.{mock, times, verify, when}
 import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
@@ -115,6 +117,49 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
     FallbackStorage.read(conf, ShuffleBlockId(1, 2L, 0))
   }
 
+  test("SPARK-39200: fallback storage APIs - readFully") {
+    val conf = new SparkConf(false)
+      .set("spark.app.id", "testId")
+      .set("spark.hadoop.fs.file.impl", classOf[ReadPartialFileSystem].getName)
+      .set(SHUFFLE_COMPRESS, false)
+      .set(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
+      .set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH,
+        "file://" + Files.createTempDirectory("tmp").toFile.getAbsolutePath + "/")
+    val fallbackStorage = new FallbackStorage(conf)
+    val bmm = new BlockManagerMaster(new NoopRpcEndpointRef(conf), null, conf, false)
+
+    val bm = mock(classOf[BlockManager])
+    val dbm = new DiskBlockManager(conf, deleteFilesOnStop = false, isDriver = false)
+    when(bm.diskBlockManager).thenReturn(dbm)
+    when(bm.master).thenReturn(bmm)
+    val resolver = new IndexShuffleBlockResolver(conf, bm)
+    when(bm.migratableResolver).thenReturn(resolver)
+
+    val length = 100000
+    val content = new Array[Byte](length)
+    Random.nextBytes(content)
+
+    val indexFile = resolver.getIndexFile(1, 2L)
+    tryWithResource(new FileOutputStream(indexFile)) { fos =>
+      val dos = new DataOutputStream(fos)
+      dos.writeLong(0)
+      dos.writeLong(length)
+    }
+
+    val dataFile = resolver.getDataFile(1, 2L)
+    tryWithResource(new FileOutputStream(dataFile)) { fos =>
+        fos.write(content)
+    }
+
+    fallbackStorage.copy(ShuffleBlockInfo(1, 2L), bm)
+
+    assert(fallbackStorage.exists(1, ShuffleIndexBlockId(1, 2L, NOOP_REDUCE_ID).name))
+    assert(fallbackStorage.exists(1, ShuffleDataBlockId(1, 2L, NOOP_REDUCE_ID).name))
+
+    val readResult = FallbackStorage.read(conf, ShuffleBlockId(1, 2L, 0))
+    assert(readResult.nioByteBuffer().array().sameElements(content))
+  }
+
   test("SPARK-34142: fallback storage API - cleanUp") {
     withTempDir { dir =>
       Seq(true, false).foreach { cleanUp =>
@@ -297,3 +342,46 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 }
+class ReadPartialInputStream(val in: FSDataInputStream) extends InputStream
+  with Seekable with PositionedReadable {
+  override def read: Int = in.read
+
+  override def read(b: Array[Byte], off: Int, len: Int): Int = {
+    if (len > 1) {
+      in.read(b, off, len - 1)
+    } else {
+      in.read(b, off, len)
+    }
+  }
+
+  override def seek(pos: Long): Unit = {
+    in.seek(pos)
+  }
+
+  override def getPos: Long = in.getPos
+
+  override def seekToNewSource(targetPos: Long): Boolean = in.seekToNewSource(targetPos)
+
+  override def read(position: Long, buffer: Array[Byte], offset: Int, length: Int): Int = {
+    if (length > 1) {
+      in.read(position, buffer, offset, length - 1)
+    } else {
+      in.read(position, buffer, offset, length)
+    }
+  }
+
+  override def readFully(position: Long, buffer: Array[Byte], offset: Int, length: Int): Unit = {
+    in.readFully(position, buffer, offset, length)
+  }
+
+  override def readFully(position: Long, buffer: Array[Byte]): Unit = {
+    in.readFully(position, buffer)
+  }
+}
+
+class ReadPartialFileSystem extends LocalFileSystem {
+  override def open(f: Path): FSDataInputStream = {
+    val stream = super.open(f)
+    new FSDataInputStream(new ReadPartialInputStream(stream))
+  }
+}

