You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/04/06 05:40:02 UTC

[GitHub] [spark] mridulm commented on a change in pull request #32007: [SPARK-33350][SHUFFLE] Add support to DiskBlockManager to create merge directory and to get the local shuffle merged data

mridulm commented on a change in pull request #32007:
URL: https://github.com/apache/spark/pull/32007#discussion_r607501885



##########
File path: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##########
@@ -343,6 +359,50 @@ private[spark] class IndexShuffleBlockResolver(
     }
   }
 
+  /**
+   * This is only used for reading local merged block data. In such cases, all chunks in the
+   * merged shuffle file need to be identified at once, so the ShuffleBlockFetcherIterator
+   * knows how to consume local merged shuffle file as multiple chunks.
+   */
+  override def getMergedBlockData(blockId: ShuffleBlockId): Seq[ManagedBuffer] = {
+    val indexFile = getMergedBlockIndexFile(conf.getAppId, blockId.shuffleId, blockId.reduceId)
+    val dataFile = getMergedBlockDataFile(conf.getAppId, blockId.shuffleId, blockId.reduceId)
+    // Load all the indexes in order to identify all chunks in the specified merged shuffle file.
+    val size = indexFile.length.toInt

Review comment:
       This assumption about length is fine as we are directly reading content into a `byte[]` and the number of chunks is reasonably bounded << `Int.MaxValue`.

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockId.scala
##########
@@ -87,6 +87,29 @@ case class ShufflePushBlockId(shuffleId: Int, mapIndex: Int, reduceId: Int) exte
   override def name: String = "shufflePush_" + shuffleId + "_" + mapIndex + "_" + reduceId
 }
 
+@DeveloperApi
+case class ShuffleMergedBlockId(appId: String, shuffleId: Int, reduceId: Int) extends BlockId {
+  override def name: String = "mergedShuffle_" + appId + "_" + shuffleId + "_" + reduceId + ".data"
+}
+
+@DeveloperApi
+case class ShuffleMergedIndexBlockId(
+  appId: String,
+  shuffleId: Int,
+  reduceId: Int) extends BlockId {
+  override def name: String =
+    "mergedShuffle_" + appId + "_" + shuffleId + "_" + reduceId + ".index"

Review comment:
       `appId` would not be sufficient - we can have multiple attempts for an application.
   (here and in other block id's introduced).

##########
File path: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##########
@@ -343,6 +359,50 @@ private[spark] class IndexShuffleBlockResolver(
     }
   }
 
+  /**
+   * This is only used for reading local merged block data. In such cases, all chunks in the
+   * merged shuffle file need to be identified at once, so the ShuffleBlockFetcherIterator
+   * knows how to consume local merged shuffle file as multiple chunks.
+   */
+  override def getMergedBlockData(blockId: ShuffleBlockId): Seq[ManagedBuffer] = {
+    val indexFile = getMergedBlockIndexFile(conf.getAppId, blockId.shuffleId, blockId.reduceId)
+    val dataFile = getMergedBlockDataFile(conf.getAppId, blockId.shuffleId, blockId.reduceId)
+    // Load all the indexes in order to identify all chunks in the specified merged shuffle file.
+    val size = indexFile.length.toInt
+    val buffer = ByteBuffer.allocate(size)
+    val offsets = buffer.asLongBuffer
+    val dis = new DataInputStream(Files.newInputStream(indexFile.toPath))

Review comment:
       Minor note: `Files.newInputStream` has had issue in past (see SPARK-21475, specifically revert by Shixiong Zhu).
   While not relevant to this change specifically, adding for context to reviewers.

##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -153,6 +193,58 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
     }
   }
 
+  /**
+   * Get the list of configured local dirs storing merged shuffle blocks created by external
+   * shuffle services if push based shuffle is enabled. Note that the files in this directory
+   * will be created by the external shuffle services. We only create the merge_manager directories
+   * here because currently the shuffle service doesn't have permission to create directories
+   * under application local directories.
+   */
+  private def createLocalDirsForMergedShuffleBlocks(conf: SparkConf): Unit = {
+    if (Utils.isPushBasedShuffleEnabled(conf)) {
+      // Will create the merge_manager directory only if it doesn't exist under any local dir.
+      val localDirs = Utils.getConfiguredLocalDirs(conf)
+      for (rootDir <- localDirs) {
+        val mergeDir = new File(rootDir, MERGE_MANAGER_DIR)
+        if (mergeDir.exists()) {
+          logDebug(s"Not creating $mergeDir as it already exists")
+          return
+        }
+      }
+      // Since this executor didn't see any merge_manager directories, it will start creating them.
+      // It's possible that the other executors launched at the same time may also reach here but
+      // we are working on the assumption that the executors launched around the same time will
+      // have the same set of application local directories.
+      localDirs.flatMap { rootDir =>
+        try {
+          val mergeDir = new File(rootDir, MERGE_MANAGER_DIR)
+          // Only one container will create this directory. The filesystem will handle any race
+          // conditions.
+          if (!mergeDir.exists()) {
+            for (dirNum <- 0 until subDirsPerLocalDir) {
+              val sudDir = new File(mergeDir, "%02x".format(dirNum))
+              Utils.createDirWith770(sudDir)
+            }
+          }
+          logInfo(s"Merge directory at $mergeDir")
+          Some(mergeDir)
+        } catch {
+          case e: IOException =>
+            logError(
+              s"Failed to create merge dir in $rootDir. Ignoring this directory.", e)
+            None
+        }

Review comment:
       Some and None are not required after change to `foreach`

##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -153,6 +193,58 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
     }
   }
 
+  /**
+   * Get the list of configured local dirs storing merged shuffle blocks created by external
+   * shuffle services if push based shuffle is enabled. Note that the files in this directory
+   * will be created by the external shuffle services. We only create the merge_manager directories
+   * here because currently the shuffle service doesn't have permission to create directories
+   * under application local directories.
+   */
+  private def createLocalDirsForMergedShuffleBlocks(conf: SparkConf): Unit = {
+    if (Utils.isPushBasedShuffleEnabled(conf)) {
+      // Will create the merge_manager directory only if it doesn't exist under any local dir.
+      val localDirs = Utils.getConfiguredLocalDirs(conf)
+      for (rootDir <- localDirs) {
+        val mergeDir = new File(rootDir, MERGE_MANAGER_DIR)
+        if (mergeDir.exists()) {
+          logDebug(s"Not creating $mergeDir as it already exists")
+          return
+        }
+      }
+      // Since this executor didn't see any merge_manager directories, it will start creating them.
+      // It's possible that the other executors launched at the same time may also reach here but
+      // we are working on the assumption that the executors launched around the same time will
+      // have the same set of application local directories.
+      localDirs.flatMap { rootDir =>
+        try {
+          val mergeDir = new File(rootDir, MERGE_MANAGER_DIR)
+          // Only one container will create this directory. The filesystem will handle any race
+          // conditions.
+          if (!mergeDir.exists()) {
+            for (dirNum <- 0 until subDirsPerLocalDir) {
+              val sudDir = new File(mergeDir, "%02x".format(dirNum))
+              Utils.createDirWith770(sudDir)
+            }
+          }
+          logInfo(s"Merge directory at $mergeDir")
+          Some(mergeDir)
+        } catch {
+          case e: IOException =>
+            logError(
+              s"Failed to create merge dir in $rootDir. Ignoring this directory.", e)
+            None
+        }
+      }
+      Utils.getConfiguredLocalDirs(conf).map(rootDir => new File(rootDir, MERGE_MANAGER_DIR))
+    }
+  }
+
+  private def findActiveMergedShuffleDirs(conf: SparkConf): Option[Array[File]] = {
+    Option(Utils.getConfiguredLocalDirs(conf).map(
+      rootDir => new File(rootDir, "merge_manager")).filter(mergeDir => mergeDir.exists()))

Review comment:
       `"merge_manager"` -> `MERGE_MANAGER_DIR`

##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -52,6 +53,14 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
   // of subDirs(i) is protected by the lock of subDirs(i)
   private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
 
+  /**
+   * Create merge directories
+   */
+  createLocalDirsForMergedShuffleBlocks(conf)
+
+  private[spark] lazy val activeMergedShuffleDirs: Option[Array[File]] =
+    findActiveMergedShuffleDirs(conf)
+

Review comment:
       Why not populate this as result of `createLocalDirsForMergedShuffleBlocks` ?

##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -153,6 +193,58 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
     }
   }
 
+  /**
+   * Get the list of configured local dirs storing merged shuffle blocks created by external
+   * shuffle services if push based shuffle is enabled. Note that the files in this directory
+   * will be created by the external shuffle services. We only create the merge_manager directories
+   * here because currently the shuffle service doesn't have permission to create directories
+   * under application local directories.
+   */
+  private def createLocalDirsForMergedShuffleBlocks(conf: SparkConf): Unit = {
+    if (Utils.isPushBasedShuffleEnabled(conf)) {
+      // Will create the merge_manager directory only if it doesn't exist under any local dir.
+      val localDirs = Utils.getConfiguredLocalDirs(conf)
+      for (rootDir <- localDirs) {
+        val mergeDir = new File(rootDir, MERGE_MANAGER_DIR)
+        if (mergeDir.exists()) {
+          logDebug(s"Not creating $mergeDir as it already exists")
+          return
+        }
+      }
+      // Since this executor didn't see any merge_manager directories, it will start creating them.
+      // It's possible that the other executors launched at the same time may also reach here but
+      // we are working on the assumption that the executors launched around the same time will
+      // have the same set of application local directories.
+      localDirs.flatMap { rootDir =>
+        try {
+          val mergeDir = new File(rootDir, MERGE_MANAGER_DIR)
+          // Only one container will create this directory. The filesystem will handle any race
+          // conditions.
+          if (!mergeDir.exists()) {
+            for (dirNum <- 0 until subDirsPerLocalDir) {
+              val sudDir = new File(mergeDir, "%02x".format(dirNum))
+              Utils.createDirWith770(sudDir)
+            }
+          }
+          logInfo(s"Merge directory at $mergeDir")
+          Some(mergeDir)
+        } catch {
+          case e: IOException =>
+            logError(
+              s"Failed to create merge dir in $rootDir. Ignoring this directory.", e)
+            None
+        }
+      }
+      Utils.getConfiguredLocalDirs(conf).map(rootDir => new File(rootDir, MERGE_MANAGER_DIR))

Review comment:
       Dead code ?

##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -83,6 +92,37 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
 
   def getFile(blockId: BlockId): File = getFile(blockId.name)
 
+  /**
+   * This should be in sync with
+   * org.apache.spark.network.shuffle.RemoteBlockPushResolver#getMergedShuffleFile
+   */
+  def getMergedShuffleFile(blockId: BlockId): File = {
+    blockId match {
+      case mergedBlockId: ShuffleMergedBlockId =>
+        getMergedShuffleFile(mergedBlockId.appId, mergedBlockId.name)
+      case mergedIndexBlockId: ShuffleMergedIndexBlockId =>
+        getMergedShuffleFile(mergedIndexBlockId.appId, mergedIndexBlockId.name)
+      case mergedMetaBlockId: ShuffleMergedMetaBlockId =>
+        getMergedShuffleFile(mergedMetaBlockId.appId, mergedMetaBlockId.name)
+      case _ =>
+        throw new RuntimeException(s"Only merged block ID is supported, but got ${blockId}")
+    }
+  }
+
+  private def getMergedShuffleFile(appId: String, filename: String): File = {
+    if (activeMergedShuffleDirs.isEmpty) {
+      throw new RuntimeException(

Review comment:
       `RuntimeException` -> `IllegalStateException` ?
   

##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -83,6 +92,37 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
 
   def getFile(blockId: BlockId): File = getFile(blockId.name)
 
+  /**
+   * This should be in sync with
+   * org.apache.spark.network.shuffle.RemoteBlockPushResolver#getMergedShuffleFile
+   */
+  def getMergedShuffleFile(blockId: BlockId): File = {
+    blockId match {
+      case mergedBlockId: ShuffleMergedBlockId =>
+        getMergedShuffleFile(mergedBlockId.appId, mergedBlockId.name)
+      case mergedIndexBlockId: ShuffleMergedIndexBlockId =>
+        getMergedShuffleFile(mergedIndexBlockId.appId, mergedIndexBlockId.name)
+      case mergedMetaBlockId: ShuffleMergedMetaBlockId =>
+        getMergedShuffleFile(mergedMetaBlockId.appId, mergedMetaBlockId.name)
+      case _ =>
+        throw new RuntimeException(s"Only merged block ID is supported, but got ${blockId}")

Review comment:
       `RuntimeException` -> `IllegalArgumentException` ?

##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -153,6 +193,58 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
     }
   }
 
+  /**
+   * Get the list of configured local dirs storing merged shuffle blocks created by external
+   * shuffle services if push based shuffle is enabled. Note that the files in this directory
+   * will be created by the external shuffle services. We only create the merge_manager directories
+   * here because currently the shuffle service doesn't have permission to create directories
+   * under application local directories.
+   */
+  private def createLocalDirsForMergedShuffleBlocks(conf: SparkConf): Unit = {
+    if (Utils.isPushBasedShuffleEnabled(conf)) {
+      // Will create the merge_manager directory only if it doesn't exist under any local dir.
+      val localDirs = Utils.getConfiguredLocalDirs(conf)
+      for (rootDir <- localDirs) {
+        val mergeDir = new File(rootDir, MERGE_MANAGER_DIR)
+        if (mergeDir.exists()) {
+          logDebug(s"Not creating $mergeDir as it already exists")
+          return
+        }
+      }
+      // Since this executor didn't see any merge_manager directories, it will start creating them.
+      // It's possible that the other executors launched at the same time may also reach here but
+      // we are working on the assumption that the executors launched around the same time will
+      // have the same set of application local directories.
+      localDirs.flatMap { rootDir =>

Review comment:
       `flatMap` -> `foreach` ?
   Note, this and the comment below are for the current method - when `createLocalDirsForMergedShuffleBlocks` is changed to return active merge managers, some of these code paths could become relevant again.

##########
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##########
@@ -315,6 +315,38 @@ private[spark] object Utils extends Logging {
     dir.getCanonicalFile
   }
 
+  /**
+   * Create a directory that is writable by the group.
+   * Grant 770 permission so the shuffle server can create subdirs/files within the merge folder.
+   */
+  def createDirWith770(dirToCreate: File): Unit = {
+    var attempts = 0
+    val maxAttempts = MAX_DIR_CREATION_ATTEMPTS
+    var created: File = null
+    while (created == null) {
+      attempts += 1
+      if (attempts > maxAttempts) {
+        throw new IOException(
+          s"Failed to create directory ${dirToCreate.getAbsolutePath} after " +
+            s"${maxAttempts} attempts!")
+      }
+      try {
+        val builder = new ProcessBuilder().command(
+          "mkdir", "-m770", dirToCreate.getAbsolutePath)

Review comment:
       Instead of invoking external program (`mkdir`), `PosixFilePermissions` wont work here ?
   Something like:
   
   ```
   val groupWritableAttribute = PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rwxrwx---"))
   val createdPath = Files.createDirectory(dirToCreate.getAbsolutePath, groupWritableAttribute)
   ```

##########
File path: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##########
@@ -343,6 +359,50 @@ private[spark] class IndexShuffleBlockResolver(
     }
   }
 
+  /**
+   * This is only used for reading local merged block data. In such cases, all chunks in the
+   * merged shuffle file need to be identified at once, so the ShuffleBlockFetcherIterator
+   * knows how to consume local merged shuffle file as multiple chunks.
+   */
+  override def getMergedBlockData(blockId: ShuffleBlockId): Seq[ManagedBuffer] = {
+    val indexFile = getMergedBlockIndexFile(conf.getAppId, blockId.shuffleId, blockId.reduceId)
+    val dataFile = getMergedBlockDataFile(conf.getAppId, blockId.shuffleId, blockId.reduceId)
+    // Load all the indexes in order to identify all chunks in the specified merged shuffle file.
+    val size = indexFile.length.toInt
+    val buffer = ByteBuffer.allocate(size)
+    val offsets = buffer.asLongBuffer
+    val dis = new DataInputStream(Files.newInputStream(indexFile.toPath))
+    try {
+      dis.readFully(buffer.array)
+    } finally {
+      dis.close()
+    }

Review comment:
       Use `tryWithResource` instead.
   Something like
   ```
   val offsets = Utils.tryWithResource {
     new DataInputStream(Files.newInputStream(indexFile.toPath))
   } { dis =>
     val buffer = ByteBuffer.allocate(size)
     dis.readFully(buffer.array)
     buffer.asLongBuffer
   }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org