Posted to commits@spark.apache.org by tg...@apache.org on 2022/03/25 18:01:10 UTC

[spark] branch master updated: [SPARK-37618][CORE] Remove shuffle blocks using the shuffle service for released executors

This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a7596e  [SPARK-37618][CORE] Remove shuffle blocks using the shuffle service for released executors
9a7596e is described below

commit 9a7596e1dde0f1dd596aa6d3b2efbcb5d1ef70ea
Author: Adam Binford <ad...@gmail.com>
AuthorDate: Fri Mar 25 13:00:17 2022 -0500

    [SPARK-37618][CORE] Remove shuffle blocks using the shuffle service for released executors
    
    ### What changes were proposed in this pull request?
    
    Add support for removing shuffle files on released executors via the external shuffle service. The shuffle service already supports removing the RDD blocks it serves from its cache, so this change reuses that mechanism to remove shuffle blocks as well, avoiding the need to update the shuffle service itself.
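
    As a rough illustration of that reuse, here is a minimal sketch (not the exact `BlockManagerMasterEndpoint` code; the grouping of shuffle block ids per block manager location is assumed to have been computed already, e.g. from the map output tracker) of handing shuffle block names for deallocated executors to the existing `removeBlocks` RPC of the external shuffle service:

    ```scala
    import java.util.concurrent.TimeUnit

    import org.apache.spark.network.shuffle.ExternalBlockStoreClient
    import org.apache.spark.storage.{BlockId, BlockManagerId}

    // Sketch: ask the shuffle service on each host to delete the given shuffle blocks,
    // which it resolves to files in the (now released) executor's local directories.
    def removeBlocksViaShuffleService(
        shuffleClient: ExternalBlockStoreClient,
        blocksByLocation: Map[BlockManagerId, Seq[BlockId]],
        timeoutSecs: Long): Unit = {
      blocksByLocation.foreach { case (bmId, blockIds) =>
        val numRemoved = shuffleClient.removeBlocks(
          bmId.host, bmId.port, bmId.executorId, blockIds.map(_.toString).toArray)
        // Block until the service reports how many blocks it removed.
        numRemoved.get(timeoutSecs, TimeUnit.SECONDS)
      }
    }
    ```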
    
    To make this change work in a secure Yarn environment, I updated permissions on some of the block manager directories and files. Specifically:
    - Block manager sub directories have the group-write POSIX permission added to them. This gives the shuffle service permission to delete files within these directories.
    - Shuffle files have the world-readable POSIX permission added to them. This is because when the sub directories are marked group writable, they lose the setgid bit that gets set in a secure Yarn environment. Without this, the permissions on the files would be `rw-r-----`, and since the group running Yarn (and therefore the shuffle service) is no longer the group owner of the file, it does not have access to read the file. The sub directories still do not have world-execute permission, so this does not actually expose the files to anyone other than the running user and the shuffle service.
    
    Both of these changes are applied after the directory or file is created, so that umasks do not affect the resulting permissions. A minimal sketch of the two permission adjustments follows.
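
    The sketch below mirrors the `DiskBlockManager` changes in this commit; the helper object and method `makeGroupWritable` are illustrative names, not identifiers from the commit:

    ```scala
    import java.io.File
    import java.nio.file.Files
    import java.nio.file.attribute.PosixFilePermission

    object BlockFilePermissions {
      // Make a block manager sub directory group writable so the shuffle service,
      // which shares the group in a secure Yarn setup, can delete files inside it.
      def makeGroupWritable(dir: File): Unit = {
        val perms = Files.getPosixFilePermissions(dir.toPath)
        perms.add(PosixFilePermission.GROUP_WRITE)
        Files.setPosixFilePermissions(dir.toPath, perms)
      }

      // Create a shuffle file as world readable to compensate for the lost setgid bit.
      // The parent directories remain non world-executable, so other users still
      // cannot reach the file.
      def createWorldReadableFile(file: File): Unit = {
        Files.createFile(file.toPath)
        val perms = Files.getPosixFilePermissions(file.toPath)
        perms.add(PosixFilePermission.OTHERS_READ)
        Files.setPosixFilePermissions(file.toPath, perms)
      }
    }
    ```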
    
    ### Why are the changes needed?
    
    External shuffle services are very useful for long-running jobs and for dynamic allocation. However, if an executor is removed (either through dynamic deallocation or through some error), the shuffle files created by that executor currently remain on disk until the application finishes. This causes local disks to slowly fill up over time, eventually creating problems for long-running applications.
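
    For reference, a minimal `SparkConf` sketch for opting into this behavior; `spark.dynamicAllocation.enabled` is shown only as a typical companion setting and is not part of this commit:

    ```scala
    import org.apache.spark.SparkConf

    // spark.shuffle.service.removeShuffle is added by this commit (3.3.0, default false).
    val conf = new SparkConf()
      .set("spark.shuffle.service.enabled", "true")
      .set("spark.shuffle.service.removeShuffle", "true")
      .set("spark.dynamicAllocation.enabled", "true") // typical companion setting, not required
    ```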
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New unit test. I am not sure whether there is a better way to test that the files are actually deleted, or whether there are other tests I should add.
    
    Closes #35085 from Kimahriman/shuffle-service-remove-shuffle-blocks.
    
    Authored-by: Adam Binford <ad...@gmail.com>
    Signed-off-by: Thomas Graves <tg...@apache.org>
---
 .../network/shuffle/ExternalBlockStoreClient.java  |   4 +-
 .../sort/io/LocalDiskShuffleMapOutputWriter.java   |   3 +-
 .../scala/org/apache/spark/ContextCleaner.scala    |   4 +-
 .../src/main/scala/org/apache/spark/SparkEnv.scala |   6 +-
 .../org/apache/spark/internal/config/package.scala |  10 ++
 .../spark/shuffle/IndexShuffleBlockResolver.scala  |  18 ++-
 .../spark/shuffle/ShuffleBlockResolver.scala       |   8 ++
 .../spark/storage/BlockManagerMasterEndpoint.scala |  89 +++++++++++----
 .../apache/spark/storage/DiskBlockManager.scala    |  61 +++++++++-
 .../scala/org/apache/spark/storage/DiskStore.scala |  10 ++
 .../shuffle/sort/UnsafeShuffleWriterSuite.java     |   8 ++
 .../apache/spark/ExternalShuffleServiceSuite.scala | 127 ++++++++++++++++++++-
 .../sort/BypassMergeSortShuffleWriterSuite.scala   |  11 ++
 .../sort/IndexShuffleBlockResolverSuite.scala      |   5 +
 .../io/LocalDiskShuffleMapOutputWriterSuite.scala  |   5 +
 .../storage/BlockManagerReplicationSuite.scala     |   3 +-
 .../apache/spark/storage/BlockManagerSuite.scala   |   3 +-
 .../spark/storage/DiskBlockManagerSuite.scala      |  26 ++++-
 docs/configuration.md                              |  11 ++
 .../streaming/ReceivedBlockHandlerSuite.scala      |   3 +-
 20 files changed, 372 insertions(+), 43 deletions(-)

diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
index d2df776..b066d99 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
@@ -299,7 +299,7 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
           BlockTransferMessage msgObj = BlockTransferMessage.Decoder.fromByteBuffer(response);
           numRemovedBlocksFuture.complete(((BlocksRemoved) msgObj).numRemovedBlocks);
         } catch (Throwable t) {
-          logger.warn("Error trying to remove RDD blocks " + Arrays.toString(blockIds) +
+          logger.warn("Error trying to remove blocks " + Arrays.toString(blockIds) +
             " via external shuffle service from executor: " + execId, t);
           numRemovedBlocksFuture.complete(0);
         }
@@ -307,7 +307,7 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
 
       @Override
       public void onFailure(Throwable e) {
-        logger.warn("Error trying to remove RDD blocks " + Arrays.toString(blockIds) +
+        logger.warn("Error trying to remove blocks " + Arrays.toString(blockIds) +
           " via external shuffle service from executor: " + execId, e);
         numRemovedBlocksFuture.complete(0);
       }
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
index 6c5025d..efe508d 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
@@ -36,7 +36,6 @@ import org.apache.spark.shuffle.api.WritableByteChannelWrapper;
 import org.apache.spark.internal.config.package$;
 import org.apache.spark.shuffle.IndexShuffleBlockResolver;
 import org.apache.spark.shuffle.api.metadata.MapOutputCommitMessage;
-import org.apache.spark.util.Utils;
 
 /**
  * Implementation of {@link ShuffleMapOutputWriter} that replicates the functionality of shuffle
@@ -87,7 +86,7 @@ public class LocalDiskShuffleMapOutputWriter implements ShuffleMapOutputWriter {
     }
     lastPartitionId = reducePartitionId;
     if (outputTempFile == null) {
-      outputTempFile = Utils.tempFileWith(outputFile);
+      outputTempFile = blockResolver.createTempFile(outputFile);
     }
     if (outputFileChannel != null) {
       currChannelPosition = outputFileChannel.position();
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 091b5e1..a6fa28b 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -235,8 +235,10 @@ private[spark] class ContextCleaner(
     try {
       if (mapOutputTrackerMaster.containsShuffle(shuffleId)) {
         logDebug("Cleaning shuffle " + shuffleId)
-        mapOutputTrackerMaster.unregisterShuffle(shuffleId)
+        // Shuffle must be removed before it's unregistered from the output tracker
+        // to find blocks served by the shuffle service on deallocated executors
         shuffleDriverComponents.removeShuffle(shuffleId, blocking)
+        mapOutputTrackerMaster.unregisterShuffle(shuffleId)
         listeners.asScala.foreach(_.shuffleCleaned(shuffleId))
         logDebug("Cleaned shuffle " + shuffleId)
       } else {
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index d07614a..19467e7 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -343,12 +343,14 @@ object SparkEnv extends Logging {
           isLocal,
           conf,
           listenerBus,
-          if (conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)) {
+          if (conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
             externalShuffleClient
           } else {
             None
           }, blockManagerInfo,
-          mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], isDriver)),
+          mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
+          shuffleManager,
+          isDriver)),
       registerOrLookupEndpoint(
         BlockManagerMaster.DRIVER_HEARTBEAT_ENDPOINT_NAME,
         new BlockManagerMasterHeartbeatEndpoint(rpcEnv, isLocal, blockManagerInfo)),
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index fa048f5..aa8f63e 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -686,6 +686,16 @@ package object config {
       .booleanConf
       .createWithDefault(false)
 
+  private[spark] val SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED =
+    ConfigBuilder("spark.shuffle.service.removeShuffle")
+      .doc("Whether to use the ExternalShuffleService for deleting shuffle blocks for " +
+        "deallocated executors when the shuffle is no longer needed. Without this enabled, " +
+        "shuffle data on executors that are deallocated will remain on disk until the " +
+        "application ends.")
+      .version("3.3.0")
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val SHUFFLE_SERVICE_FETCH_RDD_ENABLED =
     ConfigBuilder(Constants.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)
       .doc("Whether to use the ExternalShuffleService for fetching disk persisted RDD blocks. " +
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
index f1485ec..ba54555 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
@@ -84,6 +84,11 @@ private[spark] class IndexShuffleBlockResolver(
     shuffleFiles.map(_.length()).sum
   }
 
+  /** Create a temporary file that will be renamed to the final resulting file */
+  def createTempFile(file: File): File = {
+    blockManager.diskBlockManager.createTempFileWith(file)
+  }
+
   /**
    * Get the shuffle data file.
    *
@@ -234,7 +239,7 @@ private[spark] class IndexShuffleBlockResolver(
         throw new IllegalStateException(s"Unexpected shuffle block transfer ${blockId} as " +
           s"${blockId.getClass().getSimpleName()}")
     }
-    val fileTmp = Utils.tempFileWith(file)
+    val fileTmp = createTempFile(file)
     val channel = Channels.newChannel(
       serializerManager.wrapStream(blockId,
         new FileOutputStream(fileTmp)))
@@ -335,7 +340,7 @@ private[spark] class IndexShuffleBlockResolver(
       checksums: Array[Long],
       dataTmp: File): Unit = {
     val indexFile = getIndexFile(shuffleId, mapId)
-    val indexTmp = Utils.tempFileWith(indexFile)
+    val indexTmp = createTempFile(indexFile)
 
     val checksumEnabled = checksums.nonEmpty
     val (checksumFileOpt, checksumTmpOpt) = if (checksumEnabled) {
@@ -343,7 +348,7 @@ private[spark] class IndexShuffleBlockResolver(
         "The size of partition lengths and checksums should be equal")
       val checksumFile =
         getChecksumFile(shuffleId, mapId, conf.get(config.SHUFFLE_CHECKSUM_ALGORITHM))
-      (Some(checksumFile), Some(Utils.tempFileWith(checksumFile)))
+      (Some(checksumFile), Some(createTempFile(checksumFile)))
     } else {
       (None, None)
     }
@@ -597,6 +602,13 @@ private[spark] class IndexShuffleBlockResolver(
     }
   }
 
+  override def getBlocksForShuffle(shuffleId: Int, mapId: Long): Seq[BlockId] = {
+    Seq(
+      ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID),
+      ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID)
+    )
+  }
+
   override def stop(): Unit = {}
 }
 
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala
index 0f35f8c..c8fde8d 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala
@@ -42,6 +42,14 @@ trait ShuffleBlockResolver {
   def getBlockData(blockId: BlockId, dirs: Option[Array[String]] = None): ManagedBuffer
 
   /**
+   * Retrieve a list of BlockIds for a given shuffle map. Used to delete shuffle files
+   * from the external shuffle service after the associated executor has been removed.
+   */
+  def getBlocksForShuffle(shuffleId: Int, mapId: Long): Seq[BlockId] = {
+    Seq.empty
+  }
+
+  /**
    * Retrieve the data for the specified merged shuffle block as multiple chunks.
    */
   def getMergedBlockData(
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index b96befc..4d8ba9b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -36,6 +36,7 @@ import org.apache.spark.network.shuffle.ExternalBlockStoreClient
 import org.apache.spark.rpc.{IsolatedRpcEndpoint, RpcCallContext, RpcEndpointRef, RpcEnv}
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.{CoarseGrainedClusterMessages, CoarseGrainedSchedulerBackend}
+import org.apache.spark.shuffle.ShuffleManager
 import org.apache.spark.storage.BlockManagerMessages._
 import org.apache.spark.util.{RpcUtils, ThreadUtils, Utils}
 
@@ -52,6 +53,7 @@ class BlockManagerMasterEndpoint(
     externalBlockStoreClient: Option[ExternalBlockStoreClient],
     blockManagerInfo: mutable.Map[BlockManagerId, BlockManagerInfo],
     mapOutputTracker: MapOutputTrackerMaster,
+    shuffleManager: ShuffleManager,
     isDriver: Boolean)
   extends IsolatedRpcEndpoint with Logging {
 
@@ -104,9 +106,11 @@ class BlockManagerMasterEndpoint(
   private val pushBasedShuffleEnabled = Utils.isPushBasedShuffleEnabled(conf, isDriver)
 
   logInfo("BlockManagerMasterEndpoint up")
-  // same as `conf.get(config.SHUFFLE_SERVICE_ENABLED)
-  //   && conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)`
-  private val externalShuffleServiceRddFetchEnabled: Boolean = externalBlockStoreClient.isDefined
+
+  private val externalShuffleServiceRemoveShuffleEnabled: Boolean =
+    externalBlockStoreClient.isDefined && conf.get(config.SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED)
+  private val externalShuffleServiceRddFetchEnabled: Boolean =
+    externalBlockStoreClient.isDefined && conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)
   private val externalShuffleServicePort: Int = StorageUtils.externalShuffleServicePort(conf)
 
   private lazy val driverEndpoint =
@@ -294,33 +298,74 @@ class BlockManagerMasterEndpoint(
       }
     }.toSeq
 
-    val removeRddBlockViaExtShuffleServiceFutures = externalBlockStoreClient.map { shuffleClient =>
-      blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
-        Future[Int] {
-          val numRemovedBlocks = shuffleClient.removeBlocks(
-            bmId.host,
-            bmId.port,
-            bmId.executorId,
-            blockIds.map(_.toString).toArray)
-          numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds, TimeUnit.SECONDS)
+    val removeRddBlockViaExtShuffleServiceFutures = if (externalShuffleServiceRddFetchEnabled) {
+      externalBlockStoreClient.map { shuffleClient =>
+        blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
+          Future[Int] {
+            val numRemovedBlocks = shuffleClient.removeBlocks(
+              bmId.host,
+              bmId.port,
+              bmId.executorId,
+              blockIds.map(_.toString).toArray)
+            numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds, TimeUnit.SECONDS)
+          }
         }
-      }
-    }.getOrElse(Seq.empty)
+      }.getOrElse(Seq.empty)
+    } else {
+      Seq.empty
+    }
 
     Future.sequence(removeRddFromExecutorsFutures ++ removeRddBlockViaExtShuffleServiceFutures)
   }
 
   private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
-    // Nothing to do in the BlockManagerMasterEndpoint data structures
     val removeMsg = RemoveShuffle(shuffleId)
-    Future.sequence(
-      blockManagerInfo.values.map { bm =>
-        bm.storageEndpoint.ask[Boolean](removeMsg).recover {
-          // use false as default value means no shuffle data were removed
-          handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
+    val removeShuffleFromExecutorsFutures = blockManagerInfo.values.map { bm =>
+      bm.storageEndpoint.ask[Boolean](removeMsg).recover {
+        // use false as default value means no shuffle data were removed
+        handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
+      }
+    }.toSeq
+
+    // Find all shuffle blocks on executors that are no longer running
+    val blocksToDeleteByShuffleService =
+      new mutable.HashMap[BlockManagerId, mutable.HashSet[BlockId]]
+    if (externalShuffleServiceRemoveShuffleEnabled) {
+      mapOutputTracker.shuffleStatuses.get(shuffleId).foreach { shuffleStatus =>
+        shuffleStatus.withMapStatuses { mapStatuses =>
+          mapStatuses.foreach { mapStatus =>
+            // Check if the executor has been deallocated
+            if (!blockManagerIdByExecutor.contains(mapStatus.location.executorId)) {
+              val blocksToDel =
+                shuffleManager.shuffleBlockResolver.getBlocksForShuffle(shuffleId, mapStatus.mapId)
+              if (blocksToDel.nonEmpty) {
+                val blocks = blocksToDeleteByShuffleService.getOrElseUpdate(mapStatus.location,
+                  new mutable.HashSet[BlockId])
+                blocks ++= blocksToDel
+              }
+            }
+          }
         }
-      }.toSeq
-    )
+      }
+    }
+
+    val removeShuffleFromShuffleServicesFutures =
+      externalBlockStoreClient.map { shuffleClient =>
+        blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
+          Future[Boolean] {
+            val numRemovedBlocks = shuffleClient.removeBlocks(
+              bmId.host,
+              bmId.port,
+              bmId.executorId,
+              blockIds.map(_.toString).toArray)
+            numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds,
+              TimeUnit.SECONDS) == blockIds.size
+          }
+        }
+      }.getOrElse(Seq.empty)
+
+    Future.sequence(removeShuffleFromExecutorsFutures ++
+      removeShuffleFromShuffleServicesFutures)
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index c6a2297..e29f3fc 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -19,6 +19,7 @@ package org.apache.spark.storage
 
 import java.io.{File, IOException}
 import java.nio.file.Files
+import java.nio.file.attribute.PosixFilePermission
 import java.util.UUID
 
 import scala.collection.mutable.HashMap
@@ -77,6 +78,15 @@ private[spark] class DiskBlockManager(
 
   private val shutdownHook = addShutdownHook()
 
+  // If either of these features is enabled, we must change permissions on block manager
+  // directories and files to accommodate the shuffle service deleting files in a secure environment.
+  // Parent directories are assumed to be restrictive to prevent unauthorized users from accessing
+  // or modifying world readable files.
+  private val permissionChangingRequired = conf.get(config.SHUFFLE_SERVICE_ENABLED) && (
+    conf.get(config.SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED) ||
+    conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)
+  )
+
   /** Looks up a file by hashing it into one of our local subdirectories. */
   // This method should be kept in sync with
   // org.apache.spark.network.shuffle.ExecutorDiskUtils#getFilePath().
@@ -94,7 +104,16 @@ private[spark] class DiskBlockManager(
       } else {
         val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
         if (!newDir.exists()) {
-          Files.createDirectory(newDir.toPath)
+          val path = newDir.toPath
+          Files.createDirectory(path)
+          if (permissionChangingRequired) {
+            // SPARK-37618: Create dir as group writable so files within can be deleted by the
+            // shuffle service in a secure setup. This will remove the setgid bit so files created
+            // within won't be created with the parent folder group.
+            val currentPerms = Files.getPosixFilePermissions(path)
+            currentPerms.add(PosixFilePermission.GROUP_WRITE)
+            Files.setPosixFilePermissions(path, currentPerms)
+          }
         }
         subDirs(dirId)(subDirId) = newDir
         newDir
@@ -166,6 +185,37 @@ private[spark] class DiskBlockManager(
     }
   }
 
+  /**
+   * SPARK-37618: Makes sure that the file is created as world readable. This is to get
+   * around the fact that making the block manager sub dirs group writable removes
+   * the setgid bit in secure Yarn environments, which prevents the shuffle service
+   * from being able to read shuffle files. The outer directories will still not be
+   * world executable, so this doesn't allow access to these files except for the
+   * running user and shuffle service.
+   */
+  def createWorldReadableFile(file: File): Unit = {
+    val path = file.toPath
+    Files.createFile(path)
+    val currentPerms = Files.getPosixFilePermissions(path)
+    currentPerms.add(PosixFilePermission.OTHERS_READ)
+    Files.setPosixFilePermissions(path, currentPerms)
+  }
+
+  /**
+   * Creates a temporary version of the given file with world readable permissions (if required).
+   * Used to create block files that will be renamed to the final version of the file.
+   */
+  def createTempFileWith(file: File): File = {
+    val tmpFile = Utils.tempFileWith(file)
+    if (permissionChangingRequired) {
+      // SPARK-37618: we need to make the file world readable because the parent will
+      // lose the setgid bit when making it group writable. Without this the shuffle
+      // service can't read the shuffle files in a secure setup.
+      createWorldReadableFile(tmpFile)
+    }
+    tmpFile
+  }
+
   /** Produces a unique block id and File suitable for storing local intermediate results. */
   def createTempLocalBlock(): (TempLocalBlockId, File) = {
     var blockId = new TempLocalBlockId(UUID.randomUUID())
@@ -181,7 +231,14 @@ private[spark] class DiskBlockManager(
     while (getFile(blockId).exists()) {
       blockId = new TempShuffleBlockId(UUID.randomUUID())
     }
-    (blockId, getFile(blockId))
+    val tmpFile = getFile(blockId)
+    if (permissionChangingRequired) {
+      // SPARK-37618: we need to make the file world readable because the parent will
+      // lose the setgid bit when making it group writable. Without this the shuffle
+      // service can't read the shuffle files in a secure setup.
+      createWorldReadableFile(tmpFile)
+    }
+    (blockId, tmpFile)
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index f0334c5..d45947d 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -50,6 +50,9 @@ private[spark] class DiskStore(
   private val maxMemoryMapBytes = conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS)
   private val blockSizes = new ConcurrentHashMap[BlockId, Long]()
 
+  private val shuffleServiceFetchRddEnabled = conf.get(config.SHUFFLE_SERVICE_ENABLED) &&
+    conf.get(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED)
+
   def getSize(blockId: BlockId): Long = blockSizes.get(blockId)
 
   /**
@@ -71,6 +74,13 @@ private[spark] class DiskStore(
     logDebug(s"Attempting to put block $blockId")
     val startTimeNs = System.nanoTime()
     val file = diskManager.getFile(blockId)
+
+    // SPARK-37618: If fetching cached RDDs from the shuffle service is enabled, we must make
+    // the file world readable, as it will not be owned by the group running the shuffle service
+    // in a secure environment. This is due to changing directory permissions to allow deletion.
+    if (shuffleServiceFetchRddEnabled) {
+      diskManager.createWorldReadableFile(file)
+    }
     val out = new CountingWritableChannel(openForWrite(file))
     var threwException: Boolean = true
     try {
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
index f4e09b7..8a3df5a 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
@@ -127,6 +127,10 @@ public class UnsafeShuffleWriterSuite implements ShuffleChecksumTestHelper {
       });
 
     when(shuffleBlockResolver.getDataFile(anyInt(), anyLong())).thenReturn(mergedOutputFile);
+    when(shuffleBlockResolver.createTempFile(any(File.class))).thenAnswer(invocationOnMock -> {
+      File file = (File) invocationOnMock.getArguments()[0];
+      return Utils.tempFileWith(file);
+    });
 
     Answer<?> renameTempAnswer = invocationOnMock -> {
       partitionSizesInMergedFile = (long[]) invocationOnMock.getArguments()[2];
@@ -158,6 +162,10 @@ public class UnsafeShuffleWriterSuite implements ShuffleChecksumTestHelper {
       spillFilesCreated.add(file);
       return Tuple2$.MODULE$.apply(blockId, file);
     });
+    when(diskBlockManager.createTempFileWith(any(File.class))).thenAnswer(invocationOnMock -> {
+      File file = (File) invocationOnMock.getArguments()[0];
+      return Utils.tempFileWith(file);
+    });
 
     when(taskContext.taskMetrics()).thenReturn(taskMetrics);
     when(shuffleDep.serializer()).thenReturn(serializer);
diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
index 48c1cc5..dd3d90f3 100644
--- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
@@ -17,6 +17,13 @@
 
 package org.apache.spark
 
+import java.io.File
+import java.nio.file.Files
+import java.nio.file.attribute.PosixFilePermission
+
+import scala.concurrent.Promise
+import scala.concurrent.duration.Duration
+
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.concurrent.Eventually
 import org.scalatest.matchers.should.Matchers._
@@ -26,9 +33,9 @@ import org.apache.spark.internal.config
 import org.apache.spark.network.TransportContext
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.server.TransportServer
-import org.apache.spark.network.shuffle.{ExternalBlockHandler, ExternalBlockStoreClient}
-import org.apache.spark.storage.{RDDBlockId, StorageLevel}
-import org.apache.spark.util.Utils
+import org.apache.spark.network.shuffle.{ExecutorDiskUtils, ExternalBlockHandler, ExternalBlockStoreClient}
+import org.apache.spark.storage.{RDDBlockId, ShuffleBlockId, ShuffleDataBlockId, ShuffleIndexBlockId, StorageLevel}
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 /**
  * This suite creates an external shuffle server and routes all shuffle fetches through it.
@@ -101,7 +108,9 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll wi
   }
 
   test("SPARK-25888: using external shuffle service fetching disk persisted blocks") {
-    val confWithRddFetchEnabled = conf.clone.set(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, true)
+    val confWithRddFetchEnabled = conf.clone
+      .set(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED, true)
+      .set(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, true)
     sc = new SparkContext("local-cluster[1,1,1024]", "test", confWithRddFetchEnabled)
     sc.env.blockManager.externalShuffleServiceEnabled should equal(true)
     sc.env.blockManager.blockStoreClient.getClass should equal(classOf[ExternalBlockStoreClient])
@@ -113,13 +122,42 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll wi
       rdd.count()
 
       val blockId = RDDBlockId(rdd.id, 0)
-      eventually(timeout(2.seconds), interval(100.milliseconds)) {
+      val bms = eventually(timeout(2.seconds), interval(100.milliseconds)) {
         val locations = sc.env.blockManager.master.getLocations(blockId)
         assert(locations.size === 2)
         assert(locations.map(_.port).contains(server.getPort),
           "external shuffle service port should be contained")
+        locations
       }
 
+      val dirManager = sc.env.blockManager.hostLocalDirManager
+          .getOrElse(fail("No host local dir manager"))
+
+      val promises = bms.map { case bmid =>
+          val promise = Promise[File]()
+          dirManager.getHostLocalDirs(bmid.host, bmid.port, Seq(bmid.executorId).toArray) {
+            case scala.util.Success(res) => res.foreach { case (eid, dirs) =>
+              val file = new File(ExecutorDiskUtils.getFilePath(dirs,
+                sc.env.blockManager.subDirsPerLocalDir, blockId.name))
+              promise.success(file)
+            }
+            case scala.util.Failure(error) => promise.failure(error)
+          }
+          promise.future
+        }
+      val filesToCheck = promises.map(p => ThreadUtils.awaitResult(p, Duration(2, "sec")))
+
+      filesToCheck.foreach(f => {
+        val parentPerms = Files.getPosixFilePermissions(f.getParentFile.toPath)
+        assert(parentPerms.contains(PosixFilePermission.GROUP_WRITE))
+
+        // On most operating systems the default umask will make this test pass
+        // even if the permission isn't changed. To properly test this, run the
+        // test with a umask of 0027
+        val perms = Files.getPosixFilePermissions(f.toPath)
+        assert(perms.contains(PosixFilePermission.OTHERS_READ))
+      })
+
       sc.killExecutors(sc.getExecutorIds())
 
       eventually(timeout(2.seconds), interval(100.milliseconds)) {
@@ -138,4 +176,83 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll wi
       rpcHandler.applicationRemoved(sc.conf.getAppId, true)
     }
   }
+
+  test("SPARK-37618: external shuffle service removes shuffle blocks from deallocated executors") {
+    for (enabled <- Seq(true, false)) {
+      // Use local disk reading to get location of shuffle files on disk
+      val confWithLocalDiskReading = conf.clone
+        .set(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED, true)
+        .set(config.SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED, enabled)
+      sc = new SparkContext("local-cluster[1,1,1024]", "test", confWithLocalDiskReading)
+      sc.env.blockManager.externalShuffleServiceEnabled should equal(true)
+      sc.env.blockManager.blockStoreClient.getClass should equal(classOf[ExternalBlockStoreClient])
+      try {
+        val rdd = sc.parallelize(0 until 100, 2)
+          .map { i => (i, 1) }
+          .repartition(1)
+
+        rdd.count()
+
+        val mapOutputs = sc.env.mapOutputTracker.getMapSizesByExecutorId(0, 0).toSeq
+
+        val dirManager = sc.env.blockManager.hostLocalDirManager
+          .getOrElse(fail("No host local dir manager"))
+
+        val promises = mapOutputs.map { case (bmid, blocks) =>
+          val promise = Promise[Seq[File]]()
+          dirManager.getHostLocalDirs(bmid.host, bmid.port, Seq(bmid.executorId).toArray) {
+            case scala.util.Success(res) => res.foreach { case (eid, dirs) =>
+              val files = blocks.flatMap { case (blockId, _, _) =>
+                val shuffleBlockId = blockId.asInstanceOf[ShuffleBlockId]
+                Seq(
+                  ShuffleDataBlockId(shuffleBlockId.shuffleId, shuffleBlockId.mapId,
+                    shuffleBlockId.reduceId).name,
+                  ShuffleIndexBlockId(shuffleBlockId.shuffleId, shuffleBlockId.mapId,
+                    shuffleBlockId.reduceId).name
+                ).map { blockId =>
+                  new File(ExecutorDiskUtils.getFilePath(dirs,
+                    sc.env.blockManager.subDirsPerLocalDir, blockId))
+                }
+              }
+              promise.success(files)
+            }
+            case scala.util.Failure(error) => promise.failure(error)
+          }
+          promise.future
+        }
+        val filesToCheck = promises.flatMap(p => ThreadUtils.awaitResult(p, Duration(2, "sec")))
+        assert(filesToCheck.length == 4)
+        assert(filesToCheck.forall(_.exists()))
+
+        if (enabled) {
+          filesToCheck.foreach(f => {
+            val parentPerms = Files.getPosixFilePermissions(f.getParentFile.toPath)
+            assert(parentPerms.contains(PosixFilePermission.GROUP_WRITE))
+
+            // On most operating systems the default umask will make this test pass
+            // even if the permission isn't changed. To properly test this, run the
+            // test with a umask of 0027
+            val perms = Files.getPosixFilePermissions(f.toPath)
+            assert(perms.contains(PosixFilePermission.OTHERS_READ))
+          })
+        }
+
+        sc.killExecutors(sc.getExecutorIds())
+        eventually(timeout(2.seconds), interval(100.milliseconds)) {
+          assert(sc.env.blockManager.master.getExecutorEndpointRef("0").isEmpty)
+        }
+
+        sc.cleaner.foreach(_.doCleanupShuffle(0, true))
+
+        if (enabled) {
+          assert(filesToCheck.forall(!_.exists()))
+        } else {
+          assert(filesToCheck.forall(_.exists()))
+        }
+      } finally {
+        rpcHandler.applicationRemoved(sc.conf.getAppId, true)
+        sc.stop()
+      }
+    }
+  }
 }
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
index 38ed702..83bd3b0 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
@@ -111,6 +111,12 @@ class BypassMergeSortShuffleWriterSuite
           blockId = args(0).asInstanceOf[BlockId])
       }
 
+    when(blockResolver.createTempFile(any(classOf[File])))
+      .thenAnswer { invocationOnMock =>
+        val file = invocationOnMock.getArguments()(0).asInstanceOf[File]
+        Utils.tempFileWith(file)
+      }
+
     when(diskBlockManager.createTempShuffleBlock())
       .thenAnswer { _ =>
         val blockId = new TempShuffleBlockId(UUID.randomUUID)
@@ -266,6 +272,11 @@ class BypassMergeSortShuffleWriterSuite
         temporaryFilesCreated += file
         (blockId, file)
       }
+    when(diskBlockManager.createTempFileWith(any(classOf[File])))
+      .thenAnswer { invocationOnMock =>
+        val file = invocationOnMock.getArguments()(0).asInstanceOf[File]
+        Utils.tempFileWith(file)
+      }
 
     val numPartition = shuffleHandle.dependency.partitioner.numPartitions
     val writer = new BypassMergeSortShuffleWriter[Int, Int](
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
index 21704b1..de12f68 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/IndexShuffleBlockResolverSuite.scala
@@ -56,6 +56,11 @@ class IndexShuffleBlockResolverSuite extends SparkFunSuite with BeforeAndAfterEa
       any[BlockId], any[Option[Array[String]]])).thenAnswer(
       (invocation: InvocationOnMock) => new File(tempDir, invocation.getArguments.head.toString))
     when(diskBlockManager.localDirs).thenReturn(Array(tempDir))
+    when(diskBlockManager.createTempFileWith(any(classOf[File])))
+      .thenAnswer { invocationOnMock =>
+        val file = invocationOnMock.getArguments()(0).asInstanceOf[File]
+        Utils.tempFileWith(file)
+      }
     conf.set("spark.app.id", appId)
   }
 
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriterSuite.scala
index 35d9b4a..6c9ec8b 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriterSuite.scala
@@ -74,6 +74,11 @@ class LocalDiskShuffleMapOutputWriterSuite extends SparkFunSuite with BeforeAndA
       .set("spark.app.id", "example.spark.app")
       .set("spark.shuffle.unsafe.file.output.buffer", "16k")
     when(blockResolver.getDataFile(anyInt, anyLong)).thenReturn(mergedOutputFile)
+    when(blockResolver.createTempFile(any(classOf[File])))
+      .thenAnswer { invocationOnMock =>
+        val file = invocationOnMock.getArguments()(0).asInstanceOf[File]
+        Utils.tempFileWith(file)
+      }
     when(blockResolver.writeMetadataFileAndCommit(
       anyInt, anyLong, any(classOf[Array[Long]]), any(classOf[Array[Long]]), any(classOf[File])))
       .thenAnswer { invocationOnMock =>
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index fc7b7a4..14e1ee5 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -102,7 +102,8 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
     val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]()
     master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
       new BlockManagerMasterEndpoint(rpcEnv, true, conf,
-        new LiveListenerBus(conf), None, blockManagerInfo, mapOutputTracker, isDriver = true)),
+        new LiveListenerBus(conf), None, blockManagerInfo, mapOutputTracker, sc.env.shuffleManager,
+        isDriver = true)),
       rpcEnv.setupEndpoint("blockmanagerHeartbeat",
       new BlockManagerMasterHeartbeatEndpoint(rpcEnv, true, blockManagerInfo)), conf, true)
     allStores.clear()
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 0f99ea8..45e05b2 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -188,7 +188,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     liveListenerBus = spy(new LiveListenerBus(conf))
     master = spy(new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
       new BlockManagerMasterEndpoint(rpcEnv, true, conf,
-        liveListenerBus, None, blockManagerInfo, mapOutputTracker, isDriver = true)),
+        liveListenerBus, None, blockManagerInfo, mapOutputTracker, shuffleManager,
+        isDriver = true)),
       rpcEnv.setupEndpoint("blockmanagerHeartbeat",
       new BlockManagerMasterHeartbeatEndpoint(rpcEnv, true, blockManagerInfo)), conf, true))
   }
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index b36eeb7..58fe40f 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.storage
 
 import java.io.{File, FileWriter}
 import java.nio.file.{Files, Paths}
-import java.nio.file.attribute.PosixFilePermissions
+import java.nio.file.attribute.{PosixFilePermission, PosixFilePermissions}
 import java.util.HashMap
 
 import com.fasterxml.jackson.core.`type`.TypeReference
@@ -141,6 +141,30 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
     assert(attemptId.equals("1"))
   }
 
+  test("SPARK-37618: Sub dirs are group writable when removing from shuffle service enabled") {
+    val conf = testConf.clone
+    conf.set("spark.local.dir", rootDirs)
+    conf.set("spark.shuffle.service.enabled", "true")
+    conf.set("spark.shuffle.service.removeShuffle", "false")
+    val diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = true, isDriver = false)
+    val blockId = new TestBlockId("test")
+    val newFile = diskBlockManager.getFile(blockId)
+    val parentDir = newFile.getParentFile()
+    assert(parentDir.exists && parentDir.isDirectory)
+    val permission = Files.getPosixFilePermissions(parentDir.toPath)
+    assert(!permission.contains(PosixFilePermission.GROUP_WRITE))
+
+    assert(parentDir.delete())
+
+    conf.set("spark.shuffle.service.removeShuffle", "true")
+    val diskBlockManager2 = new DiskBlockManager(conf, deleteFilesOnStop = true, isDriver = false)
+    val newFile2 = diskBlockManager2.getFile(blockId)
+    val parentDir2 = newFile2.getParentFile()
+    assert(parentDir2.exists && parentDir2.isDirectory)
+    val permission2 = Files.getPosixFilePermissions(parentDir2.toPath)
+    assert(permission2.contains(PosixFilePermission.GROUP_WRITE))
+  }
+
   def writeToFile(file: File, numBytes: Int): Unit = {
     val writer = new FileWriter(file, true)
     for (i <- 0 until numBytes) writer.write(i)
diff --git a/docs/configuration.md b/docs/configuration.md
index a2cf233..4fa3779 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -997,6 +997,17 @@ Apart from these, the following properties are also available, and may be useful
   <td>2.3.0</td>
 </tr>
 <tr>
+  <td><code>spark.shuffle.service.removeShuffle</code></td>
+  <td>false</td>
+  <td>
+    Whether to use the ExternalShuffleService for deleting shuffle blocks for
+    deallocated executors when the shuffle is no longer needed. Without this enabled,
+    shuffle data on executors that are deallocated will remain on disk until the
+    application ends.
+  </td>
+  <td>3.3.0</td>
+</tr>
+<tr>
   <td><code>spark.shuffle.maxChunksBeingTransferred</code></td>
   <td>Long.MAX_VALUE</td>
   <td>
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index a3b5b38..dcf82d5 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -93,7 +93,8 @@ abstract class BaseReceivedBlockHandlerSuite(enableEncryption: Boolean)
     val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]()
     blockManagerMaster = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
       new BlockManagerMasterEndpoint(rpcEnv, true, conf,
-        new LiveListenerBus(conf), None, blockManagerInfo, mapOutputTracker, isDriver = true)),
+        new LiveListenerBus(conf), None, blockManagerInfo, mapOutputTracker, shuffleManager,
+        isDriver = true)),
       rpcEnv.setupEndpoint("blockmanagerHeartbeat",
       new BlockManagerMasterHeartbeatEndpoint(rpcEnv, true, blockManagerInfo)), conf, true)
 
