Posted to commits@spark.apache.org by ho...@apache.org on 2021/02/09 18:22:23 UTC

[spark] branch master updated: [SPARK-34363][CORE] Add an option for limiting storage for migrated shuffle blocks

This is an automated email from the ASF dual-hosted git repository.

holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b51843  [SPARK-34363][CORE] Add an option for limiting storage for migrated shuffle blocks
2b51843 is described below

commit 2b51843ca41236f8cec29c406ea35ce1088364cf
Author: Holden Karau <hk...@apple.com>
AuthorDate: Tue Feb 9 10:21:56 2021 -0800

    [SPARK-34363][CORE] Add an option for limiting storage for migrated shuffle blocks
    
    ### What changes were proposed in this pull request?
    
    Allow users to configure a maximum amount of disk space to use for storing migrated shuffle blocks, and reject incoming remote shuffle blocks once that threshold is exceeded.
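    
    As a sketch of the intended usage (the config key comes from this patch; the companion decommission settings and the `10g` value are illustrative):
    
    ```scala
    import org.apache.spark.SparkConf
    
    // Cap migrated shuffle data at roughly 10 GiB per executor; incoming
    // migrated blocks are rejected once the cap is exceeded.
    val conf = new SparkConf()
      .set("spark.storage.decommission.enabled", "true")
      .set("spark.storage.decommission.shuffleBlocks.enabled", "true")
      .set("spark.storage.decommission.shuffleBlocks.maxDiskSize", "10g")
    ```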
    
    ### Why are the changes needed?
    
    In disk-constrained environments with large amounts of shuffle data, migrations can put excessive disk pressure on the surviving nodes. On Kubernetes nodes this can lead to cascading failures when combined with `emptyDir` volumes.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, it adds a new configuration parameter, `spark.storage.decommission.shuffleBlocks.maxDiskSize`.
    
    ### How was this patch tested?
    
    New unit tests.
    
    Closes #31493 from holdenk/SPARK-34337-reject-disk-blocks-when-under-disk-pressure.
    
    Lead-authored-by: Holden Karau <hk...@apple.com>
    Co-authored-by: Holden Karau <ho...@pigscanfly.ca>
    Signed-off-by: Holden Karau <hk...@apple.com>
---
 .../org/apache/spark/internal/config/package.scala | 11 ++++++
 .../spark/shuffle/IndexShuffleBlockResolver.scala  | 20 +++++++++-
 .../apache/spark/shuffle/MigratableResolver.scala  |  1 +
 .../apache/spark/storage/BlockManagerSuite.scala   | 46 ++++++++++++++++------
 4 files changed, 65 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 1afad30..7aeb51d 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -488,6 +488,17 @@ package object config {
       .booleanConf
       .createWithDefault(false)
 
+  private[spark] val STORAGE_DECOMMISSION_SHUFFLE_MAX_DISK_SIZE =
+    ConfigBuilder("spark.storage.decommission.shuffleBlocks.maxDiskSize")
+      .doc("Maximum disk space to use to store shuffle blocks before rejecting remote " +
+        "shuffle blocks. Rejecting remote shuffle blocks means that an executor will not receive " +
+        "any shuffle migrations, and if there are no other executors available for migration " +
+        "then shuffle blocks will be lost unless " +
+        s"${STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH.key} is configured.")
+      .version("3.2.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createOptional
+
   private[spark] val STORAGE_REPLICATION_TOPOLOGY_FILE =
     ConfigBuilder("spark.storage.replication.topologyFile")
       .version("2.1.0")
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index 5f0bb42..d30b73a 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -22,8 +22,8 @@ import java.nio.ByteBuffer
 import java.nio.channels.Channels
 import java.nio.file.Files
 
-import org.apache.spark.{SparkConf, SparkEnv}
-import org.apache.spark.internal.Logging
+import org.apache.spark.{SparkConf, SparkEnv, SparkException}
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.io.NioBufferedFileInputStream
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
 import org.apache.spark.network.client.StreamCallbackWithID
@@ -56,6 +56,8 @@ private[spark] class IndexShuffleBlockResolver(
 
   private val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle")
 
+  private val remoteShuffleMaxDisk: Option[Long] =
+    conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_MAX_DISK_SIZE)
 
   def getDataFile(shuffleId: Int, mapId: Long): File = getDataFile(shuffleId, mapId, None)
 
@@ -72,6 +74,13 @@ private[spark] class IndexShuffleBlockResolver(
     }
   }
 
+  private def getShuffleBytesStored(): Long = {
+    val shuffleFiles: Seq[File] = getStoredShuffles().map {
+      si => getDataFile(si.shuffleId, si.mapId)
+    }
+    shuffleFiles.map(_.length()).sum
+  }
+
   /**
    * Get the shuffle data file.
    *
@@ -173,6 +182,13 @@ private[spark] class IndexShuffleBlockResolver(
    */
   override def putShuffleBlockAsStream(blockId: BlockId, serializerManager: SerializerManager):
       StreamCallbackWithID = {
+    // Throw an exception if the shuffle data already stored exceeds the configured maximum
+    remoteShuffleMaxDisk.foreach { maxBytes =>
+      val bytesUsed = getShuffleBytesStored()
+      if (maxBytes < bytesUsed) {
+        throw new SparkException(s"Not storing remote shuffles: $bytesUsed exceeds limit of $maxBytes")
+      }
+    }
     val file = blockId match {
       case ShuffleIndexBlockId(shuffleId, mapId, _) =>
         getIndexFile(shuffleId, mapId)
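
The guard added above reduces to a small check: sum the sizes of the shuffle data files already stored locally and refuse the incoming stream when a configured limit is exceeded. A minimal self-contained sketch of that logic (the `MaxDiskCheck` object and its parameters are illustrative; only the comparison and the exception mirror the patch):

```scala
import java.io.File

import org.apache.spark.SparkException

object MaxDiskCheck {
  // Reject an incoming migrated block once the bytes already stored locally
  // exceed the optional limit; no limit configured means always accept.
  def rejectIfOverLimit(shuffleFiles: Seq[File], maxBytes: Option[Long]): Unit = {
    maxBytes.foreach { max =>
      val bytesUsed = shuffleFiles.map(_.length()).sum
      if (max < bytesUsed) {
        throw new SparkException(
          s"Not storing remote shuffles: $bytesUsed bytes exceeds limit $max")
      }
    }
  }
}
```
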
diff --git a/core/src/main/scala/org/apache/spark/shuffle/MigratableResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/MigratableResolver.scala
index 3851fa6..9908281 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/MigratableResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/MigratableResolver.scala
@@ -37,6 +37,7 @@ trait MigratableResolver {
 
   /**
    * Write a provided shuffle block as a stream. Used for block migrations.
+   * It is up to the implementation to honor STORAGE_DECOMMISSION_SHUFFLE_MAX_DISK_SIZE.
    */
   def putShuffleBlockAsStream(blockId: BlockId, serializerManager: SerializerManager):
       StreamCallbackWithID
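
Because the trait only documents the expectation, enforcement lives in each implementation. A hypothetical sketch of how a third-party resolver could honor the same limit (the `CustomResolverGuard` class and `bytesStored` helper are illustrative; `JavaUtils.byteStringAsBytes` is Spark's existing size-string parser):

```scala
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.network.util.JavaUtils

class CustomResolverGuard(conf: SparkConf) {
  // Read the limit the way a resolver outside core might: as a raw
  // size string, parsed to bytes.
  private val maxDisk: Option[Long] = conf
    .getOption("spark.storage.decommission.shuffleBlocks.maxDiskSize")
    .map(JavaUtils.byteStringAsBytes)

  // Illustrative placeholder; a real resolver would sum its shuffle file sizes.
  private def bytesStored(): Long = 0L

  // Call before accepting a migrated block stream.
  def checkBeforeAccepting(): Unit = maxDisk.foreach { max =>
    val used = bytesStored()
    if (used > max) {
      throw new SparkException(s"Rejecting migrated shuffle block: $used bytes exceeds $max")
    }
  }
}
```
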
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 09678c7..82d7abf 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -103,8 +103,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       .set(PUSH_BASED_SHUFFLE_ENABLED, true)
   }
 
-  private def makeSortShuffleManager(): SortShuffleManager = {
-    val newMgr = new SortShuffleManager(new SparkConf(false))
+  private def makeSortShuffleManager(conf: Option[SparkConf] = None): SortShuffleManager = {
+    val newMgr = new SortShuffleManager(conf.getOrElse(new SparkConf(false)))
     sortShuffleManagers += newMgr
     newMgr
   }
@@ -1932,22 +1932,29 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(master.getLocations(blockIdLarge) === Seq(store1.blockManagerId))
   }
 
-  test("test migration of shuffle blocks during decommissioning") {
-    val shuffleManager1 = makeSortShuffleManager()
+  private def testShuffleBlockDecommissioning(maxShuffleSize: Option[Int], willReject: Boolean) = {
+    maxShuffleSize.foreach { size =>
+      conf.set(STORAGE_DECOMMISSION_SHUFFLE_MAX_DISK_SIZE.key, s"${size}b")
+    }
+    val shuffleManager1 = makeSortShuffleManager(Some(conf))
     val bm1 = makeBlockManager(3500, "exec1", shuffleManager = shuffleManager1)
     shuffleManager1.shuffleBlockResolver._blockManager = bm1
 
-    val shuffleManager2 = makeSortShuffleManager()
+    val shuffleManager2 = makeSortShuffleManager(Some(conf))
     val bm2 = makeBlockManager(3500, "exec2", shuffleManager = shuffleManager2)
     shuffleManager2.shuffleBlockResolver._blockManager = bm2
 
     val blockSize = 5
     val shuffleDataBlockContent = Array[Byte](0, 1, 2, 3, 4)
     val shuffleData = ShuffleDataBlockId(0, 0, 0)
+    val shuffleData2 = ShuffleDataBlockId(1, 0, 0)
     Files.write(bm1.diskBlockManager.getFile(shuffleData).toPath(), shuffleDataBlockContent)
+    Files.write(bm2.diskBlockManager.getFile(shuffleData2).toPath(), shuffleDataBlockContent)
     val shuffleIndexBlockContent = Array[Byte](5, 6, 7, 8, 9)
     val shuffleIndex = ShuffleIndexBlockId(0, 0, 0)
+    val shuffleIndex2 = ShuffleIndexBlockId(1, 0, 0)
     Files.write(bm1.diskBlockManager.getFile(shuffleIndex).toPath(), shuffleIndexBlockContent)
+    Files.write(bm2.diskBlockManager.getFile(shuffleIndex2).toPath(), shuffleIndexBlockContent)
 
     mapOutputTracker.registerShuffle(0, 1)
     val decomManager = new BlockManagerDecommissioner(conf, bm1)
@@ -1961,13 +1968,18 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
       decomManager.refreshOffloadingShuffleBlocks()
 
-      eventually(timeout(1.second), interval(10.milliseconds)) {
-        assert(mapOutputTracker.shuffleStatuses(0).mapStatuses(0).location === bm2.blockManagerId)
+      if (willReject) {
+        eventually(timeout(1.second), interval(10.milliseconds)) {
+          assert(mapOutputTracker.shuffleStatuses(0).mapStatuses(0).location === bm2.blockManagerId)
+        }
+        assert(Files.readAllBytes(bm2.diskBlockManager.getFile(shuffleData).toPath())
+          === shuffleDataBlockContent)
+        assert(Files.readAllBytes(bm2.diskBlockManager.getFile(shuffleIndex).toPath())
+          === shuffleIndexBlockContent)
+      } else {
+        Thread.sleep(1000)
+        assert(mapOutputTracker.shuffleStatuses(0).mapStatuses(0).location === bm1.blockManagerId)
       }
-      assert(Files.readAllBytes(bm2.diskBlockManager.getFile(shuffleData).toPath())
-        === shuffleDataBlockContent)
-      assert(Files.readAllBytes(bm2.diskBlockManager.getFile(shuffleIndex).toPath())
-        === shuffleIndexBlockContent)
     } finally {
       mapOutputTracker.unregisterShuffle(0)
       // Avoid thread leak
@@ -1975,6 +1987,18 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     }
   }
 
+  test("test migration of shuffle blocks during decommissioning - no limit") {
+    testShuffleBlockDecommissioning(None, true)
+  }
+
+  test("test migration of shuffle blocks during decommissioning - larger limit") {
+    testShuffleBlockDecommissioning(Some(10000), true)
+  }
+
+  test("[SPARK-34363]test migration of shuffle blocks during decommissioning - small limit") {
+    testShuffleBlockDecommissioning(Some(1), false)
+  }
+
   test("SPARK-32919: Shuffle push merger locations should be bounded with in" +
     " spark.shuffle.push.retainedMergerLocations") {
     assert(master.getShufflePushMergerLocations(10, Set.empty).isEmpty)

