Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/04/29 08:49:03 UTC

[GitHub] [spark] Ngone51 commented on a change in pull request #32007: [SPARK-33350][SHUFFLE] Add support to DiskBlockManager to create merge directory and to get the local shuffle merged data

Ngone51 commented on a change in pull request #32007:
URL: https://github.com/apache/spark/pull/32007#discussion_r622794341



##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -153,6 +189,59 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
     }
   }
 
+  /**
+   * Get the list of configured local dirs storing merged shuffle blocks created by executors
+   * if push based shuffle is enabled. Note that the files in this directory will be created
+   * by the external shuffle services. We only create the merge_manager directories and
+   * subdirectories here because currently the shuffle service doesn't have permission to
+   * create directories under application local directories.
+   */
+  private def createLocalDirsForMergedShuffleBlocks(conf: SparkConf): Option[Array[File]] = {
+    if (Utils.isPushBasedShuffleEnabled(conf)) {
+      // Will create the merge_manager directory only if it doesn't exist under any local dir.
+      val localDirs = Utils.getConfiguredLocalDirs(conf)
+      var mergeDirCreated = false;

Review comment:
       nit:  unnecessary ";" 

##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -153,6 +189,59 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
     }
   }
 
+  /**
+   * Get the list of configured local dirs storing merged shuffle blocks created by executors
+   * if push based shuffle is enabled. Note that the files in this directory will be created
+   * by the external shuffle services. We only create the merge_manager directories and
+   * subdirectories here because currently the shuffle service doesn't have permission to
+   * create directories under application local directories.
+   */
+  private def createLocalDirsForMergedShuffleBlocks(conf: SparkConf): Option[Array[File]] = {
+    if (Utils.isPushBasedShuffleEnabled(conf)) {
+      // Will create the merge_manager directory only if it doesn't exist under any local dir.
+      val localDirs = Utils.getConfiguredLocalDirs(conf)
+      var mergeDirCreated = false;
+      for (rootDir <- localDirs) {
+        val mergeDir = new File(rootDir, MERGE_MANAGER_DIR)
+        if (mergeDir.exists()) {
+          logDebug(s"Not creating $mergeDir as it already exists")
+          mergeDirCreated = true
+        }
+      }
+      if (!mergeDirCreated) {
+        // This executor didn't see any merge_manager directories, it will start creating them.
+        // It's possible that the other executors launched at the same time may also reach here but
+        // we are working on the assumption that the executors launched around the same time will
+        // have the same set of application local directories.
+        localDirs.foreach { rootDir =>
+          try {
+            val mergeDir = new File(rootDir, MERGE_MANAGER_DIR)
+            // Only one container will create this directory. The filesystem will handle any race
+            // conditions.
+            if (!mergeDir.exists()) {
+              Utils.createDirWith770(mergeDir)
+              for (dirNum <- 0 until subDirsPerLocalDir) {
+                val sudDir = new File(mergeDir, "%02x".format(dirNum))
+                Utils.createDirWith770(sudDir)
+              }
+            }
+            logInfo(s"Merge directory at $mergeDir")
+          } catch {
+            case e: IOException =>
+              logError(
+                s"Failed to create merge dir in $rootDir. Ignoring this directory.", e)
+          }
+        }
+      }
+    }
+    findActiveMergedShuffleDirs(conf)

Review comment:
       I think we can do:
   
   ```scala
    if (Utils.isPushBasedShuffleEnabled(conf)) {
    ....
   } else {
    Array.empty[File]
   }
   ```
   
   we don't need the `Option` here.
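
    i.e. roughly (a sketch only, assuming `findActiveMergedShuffleDirs` is also changed to return `Array[File]`):

    ```scala
    private def createLocalDirsForMergedShuffleBlocks(conf: SparkConf): Array[File] = {
      if (Utils.isPushBasedShuffleEnabled(conf)) {
        // ... create the merge_manager directories as above ...
        findActiveMergedShuffleDirs(conf)
      } else {
        Array.empty[File]
      }
    }
    ```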

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -728,6 +729,24 @@ private[spark] class BlockManager(
     }
   }
 
+  /**
+   * Get the local merged shuffle block data for the given block ID as multiple chunks.
+   * A merged shuffle file is divided into multiple chunks according to the index file.
+   * Instead of reading the entire file as a single block, we split it into smaller chunks
+   * which will be memory efficient when performing certain operations.
+   */
+  override def getMergedBlockData(blockId: ShuffleBlockId): Seq[ManagedBuffer] = {
+    shuffleManager.shuffleBlockResolver.getMergedBlockData(blockId)
+  }
+
+  /**
+   * Get the local merged shuffle block metada data for the given block ID.
+   */
+  def getMergedBlockMeta(blockId: ShuffleBlockId): MergedBlockMeta = {
+    shuffleManager.shuffleBlockResolver.getMergedBlockMeta(blockId)
+  }
+
+

Review comment:
       nit: redundant blank line.

##########
File path: core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
##########
@@ -1448,6 +1445,17 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
     conf.set(SHUFFLE_SERVICE_ENABLED, true)
     assert(Utils.isPushBasedShuffleEnabled(conf) === true)
   }
+
+  test("Test create dir with 770") {
+    val testDir = new File("target/testDir");
+    FileUtils.deleteQuietly(testDir)
+    Utils.createDirWith770(testDir)

Review comment:
       Shall we check the permission of the created dir?
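
    e.g. a rough sketch (untested, assuming the test runs on a POSIX filesystem, and that 770 maps to "rwxrwx---"):

    ```scala
    import java.nio.file.Files
    import java.nio.file.attribute.PosixFilePermissions

    val permissions = Files.getPosixFilePermissions(testDir.toPath)
    assert(PosixFilePermissions.toString(permissions) === "rwxrwx---")
    ```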

##########
File path: core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
##########
@@ -30,14 +29,12 @@ import java.util.zip.GZIPOutputStream
 
 import scala.collection.mutable.ListBuffer
 import scala.util.Random
-
 import com.google.common.io.Files
-import org.apache.commons.io.IOUtils
+import org.apache.commons.io.{FileUtils, IOUtils}
 import org.apache.commons.lang3.{JavaVersion, SystemUtils}
 import org.apache.commons.math3.stat.inference.ChiSquareTest
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-

Review comment:
       ditto

##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -153,6 +189,59 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
     }
   }
 
+  /**
+   * Get the list of configured local dirs storing merged shuffle blocks created by executors
+   * if push based shuffle is enabled. Note that the files in this directory will be created
+   * by the external shuffle services. We only create the merge_manager directories and
+   * subdirectories here because currently the shuffle service doesn't have permission to
+   * create directories under application local directories.
+   */
+  private def createLocalDirsForMergedShuffleBlocks(conf: SparkConf): Option[Array[File]] = {
+    if (Utils.isPushBasedShuffleEnabled(conf)) {
+      // Will create the merge_manager directory only if it doesn't exist under any local dir.
+      val localDirs = Utils.getConfiguredLocalDirs(conf)
+      var mergeDirCreated = false;
+      for (rootDir <- localDirs) {
+        val mergeDir = new File(rootDir, MERGE_MANAGER_DIR)
+        if (mergeDir.exists()) {
+          logDebug(s"Not creating $mergeDir as it already exists")
+          mergeDirCreated = true
+        }
+      }
+      if (!mergeDirCreated) {
+        // This executor didn't see any merge_manager directories, it will start creating them.
+        // It's possible that the other executors launched at the same time may also reach here but
+        // we are working on the assumption that the executors launched around the same time will
+        // have the same set of application local directories.
+        localDirs.foreach { rootDir =>
+          try {
+            val mergeDir = new File(rootDir, MERGE_MANAGER_DIR)
+            // Only one container will create this directory. The filesystem will handle any race
+            // conditions.
+            if (!mergeDir.exists()) {
+              Utils.createDirWith770(mergeDir)
+              for (dirNum <- 0 until subDirsPerLocalDir) {
+                val sudDir = new File(mergeDir, "%02x".format(dirNum))
+                Utils.createDirWith770(sudDir)
+              }
+            }
+            logInfo(s"Merge directory at $mergeDir")
+          } catch {
+            case e: IOException =>
+              logError(
+                s"Failed to create merge dir in $rootDir. Ignoring this directory.", e)
+          }
+        }
+      }
+    }
+    findActiveMergedShuffleDirs(conf)
+  }
+
+  private def findActiveMergedShuffleDirs(conf: SparkConf): Option[Array[File]] = {

Review comment:
       Shall we inline this function? It's only called once.

##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -153,6 +189,59 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
     }
   }
 
+  /**
+   * Get the list of configured local dirs storing merged shuffle blocks created by executors
+   * if push based shuffle is enabled. Note that the files in this directory will be created
+   * by the external shuffle services. We only create the merge_manager directories and
+   * subdirectories here because currently the shuffle service doesn't have permission to
+   * create directories under application local directories.
+   */
+  private def createLocalDirsForMergedShuffleBlocks(conf: SparkConf): Option[Array[File]] = {
+    if (Utils.isPushBasedShuffleEnabled(conf)) {
+      // Will create the merge_manager directory only if it doesn't exist under any local dir.
+      val localDirs = Utils.getConfiguredLocalDirs(conf)
+      var mergeDirCreated = false;
+      for (rootDir <- localDirs) {
+        val mergeDir = new File(rootDir, MERGE_MANAGER_DIR)
+        if (mergeDir.exists()) {
+          logDebug(s"Not creating $mergeDir as it already exists")
+          mergeDirCreated = true
+        }
+      }

Review comment:
       Is this necessary? If you want to skip creating the directories below, I think you can do:
   
    ```scala
    localDirs.forall { rootDir =>
      val mergeDir = new File(rootDir, MERGE_MANAGER_DIR)
      if (mergeDir.exists()) {
        false
      } else {
        Utils.createDirWith770(mergeDir)
        for (dirNum <- 0 until subDirsPerLocalDir) {
          val sudDir = new File(mergeDir, "%02x".format(dirNum))
          Utils.createDirWith770(sudDir)
        }
        true
      }
    }
    ```

##########
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##########
@@ -315,6 +315,37 @@ private[spark] object Utils extends Logging {
     dir.getCanonicalFile
   }
 
+  /**
+   * Create a directory that is writable by the group.
+   * Grant 770 permission so the shuffle server can create subdirs/files within the merge folder.
+   */
+  def createDirWith770(dirToCreate: File): Unit = {

Review comment:
       Shall we add the permission as a parameter and make this function more general considering it's a util function?
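
    e.g. a rough sketch (the name and signature here are just placeholders, not the PR's API):

    ```scala
    import java.io.{File, IOException}
    import java.nio.file.Files
    import java.nio.file.attribute.PosixFilePermissions

    // Create `dirToCreate` with the given POSIX permission string, e.g. "rwxrwx---" for 770.
    def createDirWithPermission(dirToCreate: File, permission: String): Unit = {
      if (!dirToCreate.exists() && !dirToCreate.mkdirs()) {
        throw new IOException(s"Failed to create directory ${dirToCreate.getPath}")
      }
      Files.setPosixFilePermissions(
        dirToCreate.toPath, PosixFilePermissions.fromString(permission))
    }
    ```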

##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -83,6 +91,34 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
 
   def getFile(blockId: BlockId): File = getFile(blockId.name)
 
+  /**
+   * This should be in sync with
+   * org.apache.spark.network.shuffle.RemoteBlockPushResolver#getMergedShuffleFile
+   */
+  def getMergedShuffleFile(blockId: BlockId): File = {
+    blockId match {
+      case mergedBlockId: ShuffleMergedBlockId =>
+        getMergedShuffleFile(mergedBlockId.appId, mergedBlockId.name)
+      case mergedIndexBlockId: ShuffleMergedIndexBlockId =>
+        getMergedShuffleFile(mergedIndexBlockId.appId, mergedIndexBlockId.name)
+      case mergedMetaBlockId: ShuffleMergedMetaBlockId =>
+        getMergedShuffleFile(mergedMetaBlockId.appId, mergedMetaBlockId.name)
+      case _ =>
+        throw new IllegalArgumentException(
+          s"Only merged block ID is supported, but got ${blockId}")
+    }
+  }
+
+  private def getMergedShuffleFile(appId: String, filename: String): File = {

Review comment:
       `appId` not used?

##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -83,6 +91,34 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
 
   def getFile(blockId: BlockId): File = getFile(blockId.name)
 
+  /**
+   * This should be in sync with
+   * org.apache.spark.network.shuffle.RemoteBlockPushResolver#getMergedShuffleFile

Review comment:
       nit: we can wrap this with `[[...]]` so the class can be linked.
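
    e.g.:

    ```scala
    /**
     * This should be in sync with
     * [[org.apache.spark.network.shuffle.RemoteBlockPushResolver#getMergedShuffleFile]]
     */
    ```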

##########
File path: core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
##########
@@ -71,4 +71,9 @@ trait BlockDataManager {
    * Release locks acquired by [[putBlockData()]] and [[getLocalBlockData()]].
    */
   def releaseLock(blockId: BlockId, taskContext: Option[TaskContext]): Unit
+
+  /**
+   * Get the local merged shuffle block data
+   */
+  def getMergedBlockData(blockId: ShuffleBlockId): Seq[ManagedBuffer]

Review comment:
       Shall we rename it to `getLocalMergedBlockData` if it's local only?

##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -153,6 +189,59 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
     }
   }
 
+  /**
+   * Get the list of configured local dirs storing merged shuffle blocks created by executors
+   * if push based shuffle is enabled. Note that the files in this directory will be created
+   * by the external shuffle services. We only create the merge_manager directories and
+   * subdirectories here because currently the shuffle service doesn't have permission to
+   * create directories under application local directories.
+   */
+  private def createLocalDirsForMergedShuffleBlocks(conf: SparkConf): Option[Array[File]] = {
+    if (Utils.isPushBasedShuffleEnabled(conf)) {
+      // Will create the merge_manager directory only if it doesn't exist under any local dir.
+      val localDirs = Utils.getConfiguredLocalDirs(conf)

Review comment:
       Shall we make this a class field so we don't call it multiple times? (e.g., `createLocalDirs`, `findActiveMergedShuffleDirs` also call it) 
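
    e.g. (sketch; the field name is just a placeholder):

    ```scala
    // Computed once and reused by createLocalDirs and the merge-dir logic.
    private val configuredLocalDirs = Utils.getConfiguredLocalDirs(conf)
    ```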

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockId.scala
##########
@@ -87,6 +87,32 @@ case class ShufflePushBlockId(shuffleId: Int, mapIndex: Int, reduceId: Int) exte
   override def name: String = "shufflePush_" + shuffleId + "_" + mapIndex + "_" + reduceId
 }
 
+@Since("3.2.0")
+@DeveloperApi
+case class ShuffleMergedBlockId(appId: String, shuffleId: Int, reduceId: Int) extends BlockId {
+  override def name: String = "mergedShuffle_" + appId + "_" + shuffleId + "_" + reduceId + ".data"

Review comment:
       Shall we use `shuffleMerged` instead? It's more consistent with the others, e.g., `shufflePush`, and would probably also be useful wherever we need to filter shuffle-related blocks by checking the prefix "shuffle".
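
    i.e. something like:

    ```scala
    override def name: String = "shuffleMerged_" + appId + "_" + shuffleId + "_" + reduceId + ".data"
    ```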

##########
File path: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##########
@@ -343,6 +358,48 @@ private[spark] class IndexShuffleBlockResolver(
     }
   }
 
+  /**
+   * This is only used for reading local merged block data. In such cases, all chunks in the
+   * merged shuffle file need to be identified at once, so the ShuffleBlockFetcherIterator
+   * knows how to consume local merged shuffle file as multiple chunks.
+   */
+  override def getMergedBlockData(blockId: ShuffleBlockId): Seq[ManagedBuffer] = {
+    val indexFile = getMergedBlockIndexFile(conf.getAppId, blockId.shuffleId, blockId.reduceId)
+    val dataFile = getMergedBlockDataFile(conf.getAppId, blockId.shuffleId, blockId.reduceId)
+    // Load all the indexes in order to identify all chunks in the specified merged shuffle file.
+    val size = indexFile.length.toInt
+    val offsets = Utils.tryWithResource {
+      new DataInputStream(Files.newInputStream(indexFile.toPath))
+    } { dis =>
+      val buffer = ByteBuffer.allocate(size)
+      dis.readFully(buffer.array)
+      buffer.asLongBuffer
+    }
+    // Number of chunks is number of indexes - 1
+    val numChunks = size / 8 - 1
+    val chunkSizes = new Array[Long](numChunks)
+    for (index <- 0 until numChunks) {
+      chunkSizes(index) = offsets.get(index + 1) - offsets.get(index)
+    }
+    chunkSizes.indices.map {
+      index =>
+        new FileSegmentManagedBuffer(transportConf, dataFile,
+          offsets.get(index), chunkSizes(index))
+    }

Review comment:
       nit:
   
   ```suggestion
        for (index <- 0 until numChunks) yield {
          new FileSegmentManagedBuffer(transportConf, dataFile,
            offsets.get(index),
            offsets.get(index + 1) - offsets.get(index))
        }
    ```
   
    (adding `yield` so the expression still evaluates to a `Seq[ManagedBuffer]`)

##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -83,6 +91,34 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
 
   def getFile(blockId: BlockId): File = getFile(blockId.name)
 
+  /**
+   * This should be in sync with
+   * org.apache.spark.network.shuffle.RemoteBlockPushResolver#getMergedShuffleFile
+   */
+  def getMergedShuffleFile(blockId: BlockId): File = {
+    blockId match {
+      case mergedBlockId: ShuffleMergedBlockId =>
+        getMergedShuffleFile(mergedBlockId.appId, mergedBlockId.name)
+      case mergedIndexBlockId: ShuffleMergedIndexBlockId =>
+        getMergedShuffleFile(mergedIndexBlockId.appId, mergedIndexBlockId.name)
+      case mergedMetaBlockId: ShuffleMergedMetaBlockId =>
+        getMergedShuffleFile(mergedMetaBlockId.appId, mergedMetaBlockId.name)
+      case _ =>
+        throw new IllegalArgumentException(
+          s"Only merged block ID is supported, but got ${blockId}")

Review comment:
       nit: unnecessary "{}"

##########
File path: core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
##########
@@ -30,14 +29,12 @@ import java.util.zip.GZIPOutputStream
 
 import scala.collection.mutable.ListBuffer
 import scala.util.Random
-

Review comment:
       nit: revert the removed blank line.

##########
File path: core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
##########
@@ -17,8 +17,7 @@
 
 package org.apache.spark.util
 
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataOutput, DataOutputStream, File,
-  FileOutputStream, PrintStream, SequenceInputStream}
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataOutput, DataOutputStream, File, FileOutputStream, PrintStream, SequenceInputStream}

Review comment:
       nit: `import java.io._`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org