Posted to reviews@spark.apache.org by yucai <gi...@git.apache.org> on 2015/12/09 14:04:02 UTC

[GitHub] spark pull request: [SPARK-12196][Core] Store blocks in storage de...

GitHub user yucai opened a pull request:

    https://github.com/apache/spark/pull/10225

    [SPARK-12196][Core] Store blocks in storage devices with hierarchy way

    https://issues.apache.org/jira/browse/SPARK-12196

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/yucai/spark hs

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/10225.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #10225
    
----
commit 547973570070528648b7f8f8eb7ba3ca6efdb640
Author: yucai <yu...@intel.com>
Date:   2015-12-09T06:07:06Z

    [SPARK-12196][Core] Store blocks in storage devices with hierarchy way

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #10225: [SPARK-12196][Core] Store/retrieve blocks in diff...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10225#discussion_r76946878
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
    @@ -50,35 +50,98 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
     
       private val shutdownHook = addShutdownHook()
     
    +  private abstract class FileAllocationStrategy {
    +    def apply(filename: String): File
    +
    +    protected def getFile(filename: String, storageDirs: Array[File]): File = {
    +      require(storageDirs.nonEmpty, "could not find file when the directories are empty")
    +
    +      // Figure out which local directory it hashes to, and which subdirectory in that
    +      val hash = Utils.nonNegativeHash(filename)
    +      val dirId = localDirs.indexOf(storageDirs(hash % storageDirs.length))
    +      val subDirId = (hash / storageDirs.length) % subDirsPerLocalDir
    +
    +      // Create the subdirectory if it doesn't already exist
    +      val subDir = subDirs(dirId).synchronized {
    +        val old = subDirs(dirId)(subDirId)
    +        if (old != null) {
    +          old
    +        } else {
    +          val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
    +          if (!newDir.exists() && !newDir.mkdir()) {
    --- End diff --
    
    @yucai It creates the subdir here.




[GitHub] spark pull request: [SPARK-12196][Core] Store blocks in storage de...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10225#discussion_r47098583
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
    @@ -80,6 +77,65 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
         new File(subDir, filename)
       }
     
    +  def getFileDefault(filename: String): File = {
    +    _getFile(filename, localDirs)
    +  }
    +  var getFile = getFileDefault _
    --- End diff --
    
    OK, after double-checking the code, I think we'd better still keep `getFile` as `def getFile(file: String)` and leave the status checking inside the function, for contention reasons.
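    
    A minimal sketch of the shape being suggested, reusing names from this patch (`getFileDefault` and `getFileHierarchy` both appear in the diffs; the wiring here is only an assumption, not the final code):
    ```scala
    // Sketch: getFile stays a plain def, and the strategy choice happens inside it
    // instead of rebinding a `var getFile` to a function value.
    def getFile(filename: String): File = {
      if (hierarchyStore.isDefined) {
        getFileHierarchy(filename)  // hierarchy-aware lookup, defined later in the patch
      } else {
        getFileDefault(filename)    // hash-based lookup over localDirs
      }
    }
    ```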




[GitHub] spark pull request: [SPARK-12196][Core] Store/retrieve blocks in d...

Posted by yucai <gi...@git.apache.org>.
Github user yucai commented on the pull request:

    https://github.com/apache/spark/pull/10225#issuecomment-172698659
  
    @rxin @JoshRosen, the PR is ready now, could you kindly help review it?




[GitHub] spark pull request: [WIP][SPARK-12196][Core] Store/retrieve blocks...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10225#discussion_r48699576
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
    @@ -53,35 +53,98 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
     
       private val shutdownHook = addShutdownHook()
     
    +  private abstract class FileAllocationStrategy {
    +    def apply(filename: String): File
    +
    +    protected def getFile(filename: String, storageDirs: Array[File]): File = {
    +      require(storageDirs.nonEmpty, "could not find file when the directories are empty")
    +
    +      // Figure out which local directory it hashes to, and which subdirectory in that
    +      val hash = Utils.nonNegativeHash(filename)
    +      val dirId = localDirs.indexOf(storageDirs(hash % storageDirs.length))
    +      val subDirId = (hash / storageDirs.length) % subDirsPerLocalDir
    +
    +      // Create the subdirectory if it doesn't already exist
    +      val subDir = subDirs(dirId).synchronized {
    +        val old = subDirs(dirId)(subDirId)
    +        if (old != null) {
    +          old
    +        } else {
    +          val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
    +          if (!newDir.exists() && !newDir.mkdir()) {
    +            throw new IOException(s"Failed to create local dir in $newDir.")
    +          }
    +          subDirs(dirId)(subDirId) = newDir
    +          newDir
    +        }
    +      }
    +
    +      new File(subDir, filename)
    +    }
    +  }
    +
       /** Looks up a file by hashing it into one of our local subdirectories. */
       // This method should be kept in sync with
       // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile().
    -  def getFile(filename: String): File = {
    -    // Figure out which local directory it hashes to, and which subdirectory in that
    -    val hash = Utils.nonNegativeHash(filename)
    -    val dirId = hash % localDirs.length
    -    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
    -
    -    // Create the subdirectory if it doesn't already exist
    -    val subDir = subDirs(dirId).synchronized {
    -      val old = subDirs(dirId)(subDirId)
    -      if (old != null) {
    -        old
    -      } else {
    -        val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
    -        if (!newDir.exists() && !newDir.mkdir()) {
    -          throw new IOException(s"Failed to create local dir in $newDir.")
    -        }
    -        subDirs(dirId)(subDirId) = newDir
    -        newDir
    +  private object hashAllocator extends FileAllocationStrategy {
    +    def apply(filename: String): File = getFile(filename, localDirs)
    +  }
    +
    +  /** Looks up a file by hierarchy way in different speed storage devices. */
    +  private val hierarchyStore = conf.getOption("spark.storage.hierarchyStore")
    +  private class HierarchyAllocator extends FileAllocationStrategy {
    +    case class LayerInfo(key: String, threshold: Long, dirs: Array[File])
    +    val hsSpecs: Array[(String, Long)] =
    +      // e.g.: hierarchyStore = "ssd 200GB, hdd 100GB"
    +      hierarchyStore.get.trim.split(",").map {
    +        s => val x = s.trim.split(" +")
    +          (x(0).toLowerCase, Utils.byteStringAsGb(x(1)))
           }
    +    val hsLayers: Array[LayerInfo] = hsSpecs.map(
    +      s => LayerInfo(s._1, s._2, localDirs.filter(_.getPath.toLowerCase.containsSlice(s._1)))
    +    )
    +    val lastLayerDirs = localDirs.filter(dir => !hsLayers.exists(_.dirs.contains(dir)))
    +    val allLayers: Array[LayerInfo] = hsLayers :+
    +      LayerInfo("Last Storage", 10.toLong, lastLayerDirs)
    +    val finalLayers: Array[LayerInfo] = allLayers.filter(_.dirs.nonEmpty)
    +    logInfo("Hierarchy store info:")
    +    for (layer <- finalLayers) {
    +      logInfo("Layer: %s, Threshold: %dGB".format(layer.key, layer.threshold))
    +      layer.dirs.foreach { dir => logInfo("\t%s".format(dir.getCanonicalPath)) }
         }
     
    -    new File(subDir, filename)
    +    def apply(filename: String): File = {
    +      var availableFile: File = null
    +      for (layer <- finalLayers) {
    +        val file = getFile(filename, layer.dirs)
    +        if (file.exists()) return file
    +
    +        if (availableFile == null && file.getParentFile.getUsableSpace>>30 >= layer.threshold) {
    --- End diff --
    
    There are some issues here:
    1. `file.getParentFile` can return null; the following is what I get when trying it under Windows:
    ```scala
    scala> val f = new java.io.File("c:/")
    f: java.io.File = c:\
    scala> f.getParentFile
    res7: java.io.File = null
    ```
    2. `getUsableSpace` returns the available space in bytes; if we store the threshold in bytes as well, we don't need to convert the available space from bytes to GB.
    3. `availableFile == null`: I think what we want to do here is exit the loop once we have found the proper directory, isn't it?
    4. `file.getParentFile.getUsableSpace>>30` ==> `file.getParentFile.getUsableSpace >> 30`, i.e. spaces before/after `>>` (points 1, 2, and 4 are combined in the sketch below).
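    
    A minimal sketch combining points 1, 2, and 4, assuming the threshold has already been parsed with `Utils.byteStringAsBytes` so that it is stored in bytes:
    ```scala
    // Guard against a null parent (point 1) and compare bytes to bytes (points 2 and 4),
    // so no `>> 30` shift is needed.
    val parent = Option(file.getParentFile).getOrElse(file)
    if (availableFile == null && parent.getUsableSpace >= layer.threshold) {
      availableFile = file
    }
    ```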




[GitHub] spark pull request: [WIP][SPARK-12196][Core] Store blocks in stora...

Posted by yucai <gi...@git.apache.org>.
Github user yucai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10225#discussion_r47182102
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
    @@ -80,6 +77,65 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
         new File(subDir, filename)
       }
     
    +  def getFileDefault(filename: String): File = {
    +    _getFile(filename, localDirs)
    +  }
    +  var getFile = getFileDefault _
    +
    +  private val hierarchyStore = conf.getOption("spark.storage.hierarchyStore")
    +  if (hierarchyStore.isDefined) {
    +    val HSLayers: Array[(String, Long)] =
    +      hierarchyStore.get.trim.split(",").map {
    +        s => val x = s.trim.split(" +")
    +          (x(0), Utils.byteStringAsGb(x(1)))
    +      }
    +    val HSLayerDirs: Array[Array[File]] = HSLayers.map(
    +      s => localDirs.filter(_.getPath().toLowerCase().containsSlice(s._1))
    +    )
    +    val lastLayer = localDirs.filter {
    +      dir => var found = false
    +        for(dirs <- HSLayerDirs if !found)
    +          if (dirs.contains(dir)) found = true
    +        !found
    +    }
    +    val (finalHSLayers, finalHSLayerDirs) = if (!lastLayer.isEmpty) {
    +      (HSLayers :+ ("Backup Storage", 0.toLong), HSLayerDirs :+ lastLayer)
    +    } else {
    +      val len = HSLayers.length
    +      (HSLayers.take(len-1) :+ (HSLayers(len-1)._1, -1.toLong), HSLayerDirs)
    +    }
    +    for (i <- 0 until finalHSLayers.length) {
    +      val s = finalHSLayers(i)
    +      logInfo("Layer: %s, Threshold: %dGB".format(s._1, s._2))
    +      finalHSLayerDirs(i).foreach {
    +        x => logInfo("\t%s".format(x.getPath()))
    +      }
    +    }
    +
    +    def getFileHierarchy(filename: String): File = {
    +      // search existing file in each layer
    --- End diff --
    
    Got it, thanks!




[GitHub] spark pull request #10225: [SPARK-12196][Core] Store/retrieve blocks in diff...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10225#discussion_r76923680
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
    @@ -50,35 +50,98 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
     
       private val shutdownHook = addShutdownHook()
     
    +  private abstract class FileAllocationStrategy {
    +    def apply(filename: String): File
    +
    +    protected def getFile(filename: String, storageDirs: Array[File]): File = {
    +      require(storageDirs.nonEmpty, "could not find file when the directories are empty")
    +
    +      // Figure out which local directory it hashes to, and which subdirectory in that
    +      val hash = Utils.nonNegativeHash(filename)
    +      val dirId = localDirs.indexOf(storageDirs(hash % storageDirs.length))
    +      val subDirId = (hash / storageDirs.length) % subDirsPerLocalDir
    +
    +      // Create the subdirectory if it doesn't already exist
    +      val subDir = subDirs(dirId).synchronized {
    +        val old = subDirs(dirId)(subDirId)
    +        if (old != null) {
    +          old
    +        } else {
    +          val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
    +          if (!newDir.exists() && !newDir.mkdir()) {
    +            throw new IOException(s"Failed to create local dir in $newDir.")
    +          }
    +          subDirs(dirId)(subDirId) = newDir
    +          newDir
    +        }
    +      }
    +
    +      new File(subDir, filename)
    +    }
    +  }
    +
       /** Looks up a file by hashing it into one of our local subdirectories. */
       // This method should be kept in sync with
       // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile().
    -  def getFile(filename: String): File = {
    -    // Figure out which local directory it hashes to, and which subdirectory in that
    -    val hash = Utils.nonNegativeHash(filename)
    -    val dirId = hash % localDirs.length
    -    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
    -
    -    // Create the subdirectory if it doesn't already exist
    -    val subDir = subDirs(dirId).synchronized {
    -      val old = subDirs(dirId)(subDirId)
    -      if (old != null) {
    -        old
    -      } else {
    -        val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
    -        if (!newDir.exists() && !newDir.mkdir()) {
    -          throw new IOException(s"Failed to create local dir in $newDir.")
    -        }
    -        subDirs(dirId)(subDirId) = newDir
    -        newDir
    +  private object hashAllocator extends FileAllocationStrategy {
    +    def apply(filename: String): File = getFile(filename, localDirs)
    +  }
    +
    +  /** Looks up a file by hierarchy way in different speed storage devices. */
    +  private val hierarchyStore = conf.getOption("spark.storage.hierarchyStore")
    +  private class HierarchyAllocator extends FileAllocationStrategy {
    +    case class LayerInfo(key: String, threshold: Long, dirs: Array[File])
    +    val hsSpecs: Array[(String, Long)] =
    +      // e.g.: hierarchyStore = "ssd 200GB, hdd 100GB"
    +      hierarchyStore.get.trim.split(",").map {
    +        s => val x = s.trim.split(" +")
    +          (x(0).toLowerCase, Utils.byteStringAsBytes(x(1)))
           }
    +    val hsLayers: Array[LayerInfo] = hsSpecs.map(
    +      s => LayerInfo(s._1, s._2, localDirs.filter(_.getPath.toLowerCase.containsSlice(s._1)))
    +    )
    +    val lastLayerDirs = localDirs.filter(dir => !hsLayers.exists(_.dirs.contains(dir)))
    +    val allLayers: Array[LayerInfo] = hsLayers :+
    +      LayerInfo("Last Storage", 10.toLong, lastLayerDirs)
    +    val finalLayers: Array[LayerInfo] = allLayers.filter(_.dirs.nonEmpty)
    +    logInfo("Hierarchy store info:")
    +    for (layer <- finalLayers) {
    +      logInfo("Layer: %s, Threshold: %s".format(layer.key, Utils.bytesToString(layer.threshold)))
    +      layer.dirs.foreach { dir => logInfo("\t%s".format(dir.getCanonicalPath)) }
         }
     
    -    new File(subDir, filename)
    +    def apply(filename: String): File = {
    +      var availableFile: File = null
    +      for (layer <- finalLayers) {
    --- End diff --
    
    Once you get `availableFile`, you can stop this loop early to prevent creating useless subdirs.
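    
    Roughly what that suggestion could look like, as a sketch only (using `scala.util.control.Breaks`); as the later replies in this thread note, an existing block might live in a slower layer, so breaking here would skip those existence checks:
    ```scala
    import scala.util.control.Breaks._
    
    var availableFile: File = null
    breakable {
      for (layer <- finalLayers) {
        val file = getFile(filename, layer.dirs)
        if (file.exists()) { availableFile = file; break() }
        if (file.getParentFile.getUsableSpace >= layer.threshold) {
          availableFile = file
          break()  // stop early: no further subdirs get created in slower layers
        }
      }
    }
    ```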




[GitHub] spark pull request #10225: [SPARK-12196][Core] Store/retrieve blocks from di...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10225#discussion_r77479165
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
    @@ -50,33 +50,98 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
     
       private val shutdownHook = addShutdownHook()
     
    +  private abstract class FileAllocationStrategy {
    +    def apply(filename: String): File
    +
    +    protected def getFile(filename: String, storageDirs: Array[File]): File = {
    +      require(storageDirs.nonEmpty, "could not find file when the directories are empty")
    +
    +      // Figure out which local directory it hashes to, and which subdirectory in that
    +      val hash = Utils.nonNegativeHash(filename)
    +      val dirId = localDirs.indexOf(storageDirs(hash % storageDirs.length))
    +      val subDirId = (hash / storageDirs.length) % subDirsPerLocalDir
    +
    +      // Create the subdirectory if it doesn't already exist
    +      val subDir = subDirs(dirId).synchronized {
    +        val old = subDirs(dirId)(subDirId)
    +        if (old != null) {
    +          old
    +        } else {
    +          val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
    +          if (!newDir.exists() && !newDir.mkdir()) {
    +            throw new IOException(s"Failed to create local dir in $newDir.")
    +          }
    +          subDirs(dirId)(subDirId) = newDir
    +          newDir
    +        }
    +      }
    +
    +      new File(subDir, filename)
    +    }
    +  }
    +
       /** Looks up a file by hashing it into one of our local subdirectories. */
       // This method should be kept in sync with
       // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile().
    -  def getFile(filename: String): File = {
    -    // Figure out which local directory it hashes to, and which subdirectory in that
    -    val hash = Utils.nonNegativeHash(filename)
    -    val dirId = hash % localDirs.length
    -    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
    -
    -    // Create the subdirectory if it doesn't already exist
    -    val subDir = subDirs(dirId).synchronized {
    -      val old = subDirs(dirId)(subDirId)
    -      if (old != null) {
    -        old
    -      } else {
    -        val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
    -        if (!newDir.exists() && !newDir.mkdir()) {
    -          throw new IOException(s"Failed to create local dir in $newDir.")
    -        }
    -        subDirs(dirId)(subDirId) = newDir
    -        newDir
    +  private object hashAllocator extends FileAllocationStrategy {
    +    def apply(filename: String): File = getFile(filename, localDirs)
    +  }
    +
    +  /** Looks up a file by hierarchy way in different speed storage devices. */
    +  private val hierarchy = conf.getOption("spark.diskStore.hierarchy")
    +  private class HierarchyAllocator extends FileAllocationStrategy {
    +    case class Level(key: String, threshold: Long, dirs: Array[File])
    +    val hsDescs: Array[(String, Long)] =
    +      // e.g.: hierarchy = "ram_disk 1GB, ssd 20GB"
    +      hierarchy.get.trim.split(",").map {
    +        s => val x = s.trim.split(" +")
    +          val storage = x(0).toLowerCase
    +          val threshold = s.replaceFirst(x(0), "").trim
    +          (storage, Utils.byteStringAsBytes(threshold))
    --- End diff --
    
    ok. looks good.




[GitHub] spark pull request: [WIP][SPARK-12196][Core] Store/retrieve blocks...

Posted by yucai <gi...@git.apache.org>.
Github user yucai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10225#discussion_r48708933
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
    @@ -53,35 +53,98 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
     
       private val shutdownHook = addShutdownHook()
     
    +  private abstract class FileAllocationStrategy {
    +    def apply(filename: String): File
    +
    +    protected def getFile(filename: String, storageDirs: Array[File]): File = {
    +      require(storageDirs.nonEmpty, "could not find file when the directories are empty")
    +
    +      // Figure out which local directory it hashes to, and which subdirectory in that
    +      val hash = Utils.nonNegativeHash(filename)
    +      val dirId = localDirs.indexOf(storageDirs(hash % storageDirs.length))
    +      val subDirId = (hash / storageDirs.length) % subDirsPerLocalDir
    +
    +      // Create the subdirectory if it doesn't already exist
    +      val subDir = subDirs(dirId).synchronized {
    +        val old = subDirs(dirId)(subDirId)
    +        if (old != null) {
    +          old
    +        } else {
    +          val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
    +          if (!newDir.exists() && !newDir.mkdir()) {
    +            throw new IOException(s"Failed to create local dir in $newDir.")
    +          }
    +          subDirs(dirId)(subDirId) = newDir
    +          newDir
    +        }
    +      }
    +
    +      new File(subDir, filename)
    +    }
    +  }
    +
       /** Looks up a file by hashing it into one of our local subdirectories. */
       // This method should be kept in sync with
       // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile().
    -  def getFile(filename: String): File = {
    -    // Figure out which local directory it hashes to, and which subdirectory in that
    -    val hash = Utils.nonNegativeHash(filename)
    -    val dirId = hash % localDirs.length
    -    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
    -
    -    // Create the subdirectory if it doesn't already exist
    -    val subDir = subDirs(dirId).synchronized {
    -      val old = subDirs(dirId)(subDirId)
    -      if (old != null) {
    -        old
    -      } else {
    -        val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
    -        if (!newDir.exists() && !newDir.mkdir()) {
    -          throw new IOException(s"Failed to create local dir in $newDir.")
    -        }
    -        subDirs(dirId)(subDirId) = newDir
    -        newDir
    +  private object hashAllocator extends FileAllocationStrategy {
    +    def apply(filename: String): File = getFile(filename, localDirs)
    +  }
    +
    +  /** Looks up a file by hierarchy way in different speed storage devices. */
    +  private val hierarchyStore = conf.getOption("spark.storage.hierarchyStore")
    +  private class HierarchyAllocator extends FileAllocationStrategy {
    +    case class LayerInfo(key: String, threshold: Long, dirs: Array[File])
    +    val hsSpecs: Array[(String, Long)] =
    +      // e.g.: hierarchyStore = "ssd 200GB, hdd 100GB"
    +      hierarchyStore.get.trim.split(",").map {
    +        s => val x = s.trim.split(" +")
    +          (x(0).toLowerCase, Utils.byteStringAsGb(x(1)))
           }
    +    val hsLayers: Array[LayerInfo] = hsSpecs.map(
    +      s => LayerInfo(s._1, s._2, localDirs.filter(_.getPath.toLowerCase.containsSlice(s._1)))
    +    )
    +    val lastLayerDirs = localDirs.filter(dir => !hsLayers.exists(_.dirs.contains(dir)))
    +    val allLayers: Array[LayerInfo] = hsLayers :+
    +      LayerInfo("Last Storage", 10.toLong, lastLayerDirs)
    +    val finalLayers: Array[LayerInfo] = allLayers.filter(_.dirs.nonEmpty)
    +    logInfo("Hierarchy store info:")
    +    for (layer <- finalLayers) {
    +      logInfo("Layer: %s, Threshold: %dGB".format(layer.key, layer.threshold))
    +      layer.dirs.foreach { dir => logInfo("\t%s".format(dir.getCanonicalPath)) }
         }
     
    -    new File(subDir, filename)
    +    def apply(filename: String): File = {
    +      var availableFile: File = null
    +      for (layer <- finalLayers) {
    +        val file = getFile(filename, layer.dirs)
    +        if (file.exists()) return file
    +
    +        if (availableFile == null && file.getParentFile.getUsableSpace>>30 >= layer.threshold) {
    --- End diff --
    
    1. The ```file``` returned from FileAllocationStrategy.getFile definitely has a parent directory, so file.getParentFile cannot be null.
    2. ```availableFile != null``` means we have found an available file location with enough disk space, but we cannot return at that point, because the file may already exist in a slower layer, so we have to keep checking each layer in finalLayers.
    3. The other points have been updated, thanks!




[GitHub] spark issue #10225: [SPARK-12196][Core] Store/retrieve blocks from different...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/10225
  
    Can one of the admins verify this patch?





[GitHub] spark pull request #10225: [SPARK-12196][Core] Store/retrieve blocks in diff...

Posted by yucai <gi...@git.apache.org>.
Github user yucai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10225#discussion_r76972643
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
    @@ -50,35 +50,98 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
     
       private val shutdownHook = addShutdownHook()
     
    +  private abstract class FileAllocationStrategy {
    +    def apply(filename: String): File
    +
    +    protected def getFile(filename: String, storageDirs: Array[File]): File = {
    +      require(storageDirs.nonEmpty, "could not find file when the directories are empty")
    +
    +      // Figure out which local directory it hashes to, and which subdirectory in that
    +      val hash = Utils.nonNegativeHash(filename)
    +      val dirId = localDirs.indexOf(storageDirs(hash % storageDirs.length))
    +      val subDirId = (hash / storageDirs.length) % subDirsPerLocalDir
    +
    +      // Create the subdirectory if it doesn't already exist
    +      val subDir = subDirs(dirId).synchronized {
    +        val old = subDirs(dirId)(subDirId)
    +        if (old != null) {
    +          old
    +        } else {
    +          val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
    +          if (!newDir.exists() && !newDir.mkdir()) {
    --- End diff --
    
    Oh, I see your point now, thanks for explaining!
    But I cannot stop the loop even after I get ```availableFile``` from the code below.
    ```
            if (availableFile == null && file.getParentFile.getUsableSpace >= layer.threshold) {
              availableFile = file
            }
    ```
    This is because the file (block) could exist in any of the hierarchy layers, so I have to go through all of them.
    Any way to improve this?
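    
    One possible direction, sketched with a hypothetical `probeFile` helper that resolves the candidate path without creating the subdirectory, so scanning every layer for an existing block has no mkdir side effects; this is an assumption about how it could be restructured, not code from the patch:
    ```scala
    // Hypothetical: compute the would-be location without any mkdir side effects.
    private def probeFile(filename: String, storageDirs: Array[File]): File = {
      val hash = Utils.nonNegativeHash(filename)
      val dirId = localDirs.indexOf(storageDirs(hash % storageDirs.length))
      val subDirId = (hash / storageDirs.length) % subDirsPerLocalDir
      new File(new File(localDirs(dirId), "%02x".format(subDirId)), filename)
    }
    
    def apply(filename: String): File = {
      // Pass 1: look for an existing block in any layer, fastest first, without mkdir.
      for (layer <- finalLayers) {
        val existing = probeFile(filename, layer.dirs)
        if (existing.exists()) return existing
      }
      // Pass 2: allocate in the fastest layer whose chosen root dir has enough space;
      // only this call creates the subdirectory.
      for (layer <- finalLayers) {
        val hash = Utils.nonNegativeHash(filename)
        if (layer.dirs(hash % layer.dirs.length).getUsableSpace >= layer.threshold) {
          return getFile(filename, layer.dirs)
        }
      }
      throw new IOException("Not enough disk space.")
    }
    ```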




[GitHub] spark pull request: [WIP][SPARK-12196][Core] Store/retrieve blocks...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10225#discussion_r48699527
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
    @@ -53,35 +53,98 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
     
       private val shutdownHook = addShutdownHook()
     
    +  private abstract class FileAllocationStrategy {
    +    def apply(filename: String): File
    +
    +    protected def getFile(filename: String, storageDirs: Array[File]): File = {
    +      require(storageDirs.nonEmpty, "could not find file when the directories are empty")
    +
    +      // Figure out which local directory it hashes to, and which subdirectory in that
    +      val hash = Utils.nonNegativeHash(filename)
    +      val dirId = localDirs.indexOf(storageDirs(hash % storageDirs.length))
    +      val subDirId = (hash / storageDirs.length) % subDirsPerLocalDir
    +
    +      // Create the subdirectory if it doesn't already exist
    +      val subDir = subDirs(dirId).synchronized {
    +        val old = subDirs(dirId)(subDirId)
    +        if (old != null) {
    +          old
    +        } else {
    +          val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
    +          if (!newDir.exists() && !newDir.mkdir()) {
    +            throw new IOException(s"Failed to create local dir in $newDir.")
    +          }
    +          subDirs(dirId)(subDirId) = newDir
    +          newDir
    +        }
    +      }
    +
    +      new File(subDir, filename)
    +    }
    +  }
    +
       /** Looks up a file by hashing it into one of our local subdirectories. */
       // This method should be kept in sync with
       // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile().
    -  def getFile(filename: String): File = {
    -    // Figure out which local directory it hashes to, and which subdirectory in that
    -    val hash = Utils.nonNegativeHash(filename)
    -    val dirId = hash % localDirs.length
    -    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
    -
    -    // Create the subdirectory if it doesn't already exist
    -    val subDir = subDirs(dirId).synchronized {
    -      val old = subDirs(dirId)(subDirId)
    -      if (old != null) {
    -        old
    -      } else {
    -        val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
    -        if (!newDir.exists() && !newDir.mkdir()) {
    -          throw new IOException(s"Failed to create local dir in $newDir.")
    -        }
    -        subDirs(dirId)(subDirId) = newDir
    -        newDir
    +  private object hashAllocator extends FileAllocationStrategy {
    +    def apply(filename: String): File = getFile(filename, localDirs)
    +  }
    +
    +  /** Looks up a file by hierarchy way in different speed storage devices. */
    +  private val hierarchyStore = conf.getOption("spark.storage.hierarchyStore")
    +  private class HierarchyAllocator extends FileAllocationStrategy {
    +    case class LayerInfo(key: String, threshold: Long, dirs: Array[File])
    +    val hsSpecs: Array[(String, Long)] =
    +      // e.g.: hierarchyStore = "ssd 200GB, hdd 100GB"
    +      hierarchyStore.get.trim.split(",").map {
    +        s => val x = s.trim.split(" +")
    +          (x(0).toLowerCase, Utils.byteStringAsGb(x(1)))
    --- End diff --
    
    `Utils.byteStringAsBytes` instead?
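    
    For reference, a small sketch of the difference between the two helpers (my understanding of `org.apache.spark.util.Utils`: one truncates to whole gibibytes, the other keeps the value in bytes):
    ```scala
    import org.apache.spark.util.Utils
    
    val asGb    = Utils.byteStringAsGb("200GB")     // 200
    val asBytes = Utils.byteStringAsBytes("200GB")  // 214748364800 (200 * 1024^3)
    // Thresholds kept in bytes can be compared against File.getUsableSpace() directly.
    ```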




[GitHub] spark pull request: [SPARK-12196][Core] Store blocks in storage de...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10225#discussion_r47096059
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
    @@ -80,6 +77,65 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
         new File(subDir, filename)
       }
     
    +  def getFileDefault(filename: String): File = {
    +    _getFile(filename, localDirs)
    +  }
    +  var getFile = getFileDefault _
    +
    +  private val hierarchyStore = conf.getOption("spark.storage.hierarchyStore")
    +  if (hierarchyStore.isDefined) {
    +    val HSLayers: Array[(String, Long)] =
    +      hierarchyStore.get.trim.split(",").map {
    +        s => val x = s.trim.split(" +")
    +          (x(0), Utils.byteStringAsGb(x(1)))
    +      }
    +    val HSLayerDirs: Array[Array[File]] = HSLayers.map(
    --- End diff --
    
    `spark.storage.hierarchyStore` is optional, and the devices are listed from fastest to slowest, or alternatively each device is given a weight.
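    
    For illustration, a hedged sketch of such a configuration (the directory paths are made up; per the quoted diff, local dirs are assigned to a layer by substring match on the device keyword):
    ```scala
    import org.apache.spark.SparkConf
    
    val conf = new SparkConf()
      // Fastest device first, each with a capacity threshold.
      .set("spark.storage.hierarchyStore", "ssd 200GB, hdd 100GB")
      // Dirs whose paths contain "ssd" or "hdd" fall into the matching layer.
      .set("spark.local.dir", "/mnt/ssd0/spark,/mnt/hdd0/spark,/mnt/hdd1/spark")
    ```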




[GitHub] spark issue #10225: [SPARK-12196][Core] Store/retrieve blocks from different...

Posted by yucai <gi...@git.apache.org>.
Github user yucai commented on the issue:

    https://github.com/apache/spark/pull/10225
  
    @JoshRosen We have just refreshed this PR, could you kindly take a look?




[GitHub] spark pull request #10225: [SPARK-12196][Core] Store/retrieve blocks from di...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10225#discussion_r77479386
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
    @@ -50,33 +50,98 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
     
       private val shutdownHook = addShutdownHook()
     
    +  private abstract class FileAllocationStrategy {
    +    def apply(filename: String): File
    +
    +    protected def getFile(filename: String, storageDirs: Array[File]): File = {
    +      require(storageDirs.nonEmpty, "could not find file when the directories are empty")
    +
    +      // Figure out which local directory it hashes to, and which subdirectory in that
    +      val hash = Utils.nonNegativeHash(filename)
    +      val dirId = localDirs.indexOf(storageDirs(hash % storageDirs.length))
    +      val subDirId = (hash / storageDirs.length) % subDirsPerLocalDir
    +
    +      // Create the subdirectory if it doesn't already exist
    +      val subDir = subDirs(dirId).synchronized {
    +        val old = subDirs(dirId)(subDirId)
    +        if (old != null) {
    +          old
    +        } else {
    +          val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
    +          if (!newDir.exists() && !newDir.mkdir()) {
    +            throw new IOException(s"Failed to create local dir in $newDir.")
    +          }
    +          subDirs(dirId)(subDirId) = newDir
    +          newDir
    +        }
    +      }
    +
    +      new File(subDir, filename)
    +    }
    +  }
    +
       /** Looks up a file by hashing it into one of our local subdirectories. */
       // This method should be kept in sync with
       // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile().
    -  def getFile(filename: String): File = {
    -    // Figure out which local directory it hashes to, and which subdirectory in that
    -    val hash = Utils.nonNegativeHash(filename)
    -    val dirId = hash % localDirs.length
    -    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
    -
    -    // Create the subdirectory if it doesn't already exist
    -    val subDir = subDirs(dirId).synchronized {
    -      val old = subDirs(dirId)(subDirId)
    -      if (old != null) {
    -        old
    -      } else {
    -        val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
    -        if (!newDir.exists() && !newDir.mkdir()) {
    -          throw new IOException(s"Failed to create local dir in $newDir.")
    -        }
    -        subDirs(dirId)(subDirId) = newDir
    -        newDir
    +  private object hashAllocator extends FileAllocationStrategy {
    +    def apply(filename: String): File = getFile(filename, localDirs)
    +  }
    +
    +  /** Looks up a file by hierarchy way in different speed storage devices. */
    +  private val hierarchy = conf.getOption("spark.diskStore.hierarchy")
    +  private class HierarchyAllocator extends FileAllocationStrategy {
    +    case class Level(key: String, threshold: Long, dirs: Array[File])
    +    val hsDescs: Array[(String, Long)] =
    +      // e.g.: hierarchy = "ram_disk 1GB, ssd 20GB"
    +      hierarchy.get.trim.split(",").map {
    +        s => val x = s.trim.split(" +")
    +          val storage = x(0).toLowerCase
    +          val threshold = s.replaceFirst(x(0), "").trim
    +          (storage, Utils.byteStringAsBytes(threshold))
    --- End diff --
    
    style:
    
        // e.g.: hierarchy = "ram_disk 1GB, ssd 20GB"
        val hsDescs: Array[(String, Long)] = hierarchy.get.trim.split(",").map { s =>
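    
    A possible completion of that snippet under the suggested style, reusing the parsing body from the quoted diff (sketch only):
    ```scala
    // e.g.: hierarchy = "ram_disk 1GB, ssd 20GB"
    val hsDescs: Array[(String, Long)] = hierarchy.get.trim.split(",").map { s =>
      val x = s.trim.split(" +")
      val storage = x(0).toLowerCase
      val threshold = s.replaceFirst(x(0), "").trim
      (storage, Utils.byteStringAsBytes(threshold))
    }
    ```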




[GitHub] spark pull request: [WIP][SPARK-12196][Core] Store/retrieve blocks...

Posted by yucai <gi...@git.apache.org>.
Github user yucai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10225#discussion_r48709004
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
    @@ -53,35 +53,98 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
     
       private val shutdownHook = addShutdownHook()
     
    +  private abstract class FileAllocationStrategy {
    +    def apply(filename: String): File
    +
    +    protected def getFile(filename: String, storageDirs: Array[File]): File = {
    +      require(storageDirs.nonEmpty, "could not find file when the directories are empty")
    +
    +      // Figure out which local directory it hashes to, and which subdirectory in that
    +      val hash = Utils.nonNegativeHash(filename)
    +      val dirId = localDirs.indexOf(storageDirs(hash % storageDirs.length))
    +      val subDirId = (hash / storageDirs.length) % subDirsPerLocalDir
    +
    +      // Create the subdirectory if it doesn't already exist
    +      val subDir = subDirs(dirId).synchronized {
    +        val old = subDirs(dirId)(subDirId)
    +        if (old != null) {
    +          old
    +        } else {
    +          val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
    +          if (!newDir.exists() && !newDir.mkdir()) {
    +            throw new IOException(s"Failed to create local dir in $newDir.")
    +          }
    +          subDirs(dirId)(subDirId) = newDir
    +          newDir
    +        }
    +      }
    +
    +      new File(subDir, filename)
    +    }
    +  }
    +
       /** Looks up a file by hashing it into one of our local subdirectories. */
       // This method should be kept in sync with
       // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile().
    -  def getFile(filename: String): File = {
    -    // Figure out which local directory it hashes to, and which subdirectory in that
    -    val hash = Utils.nonNegativeHash(filename)
    -    val dirId = hash % localDirs.length
    -    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
    -
    -    // Create the subdirectory if it doesn't already exist
    -    val subDir = subDirs(dirId).synchronized {
    -      val old = subDirs(dirId)(subDirId)
    -      if (old != null) {
    -        old
    -      } else {
    -        val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
    -        if (!newDir.exists() && !newDir.mkdir()) {
    -          throw new IOException(s"Failed to create local dir in $newDir.")
    -        }
    -        subDirs(dirId)(subDirId) = newDir
    -        newDir
    +  private object hashAllocator extends FileAllocationStrategy {
    +    def apply(filename: String): File = getFile(filename, localDirs)
    +  }
    +
    +  /** Looks up a file by hierarchy way in different speed storage devices. */
    +  private val hierarchyStore = conf.getOption("spark.storage.hierarchyStore")
    +  private class HierarchyAllocator extends FileAllocationStrategy {
    +    case class LayerInfo(key: String, threshold: Long, dirs: Array[File])
    +    val hsSpecs: Array[(String, Long)] =
    +      // e.g.: hierarchyStore = "ssd 200GB, hdd 100GB"
    +      hierarchyStore.get.trim.split(",").map {
    +        s => val x = s.trim.split(" +")
    +          (x(0).toLowerCase, Utils.byteStringAsGb(x(1)))
           }
    +    val hsLayers: Array[LayerInfo] = hsSpecs.map(
    +      s => LayerInfo(s._1, s._2, localDirs.filter(_.getPath.toLowerCase.containsSlice(s._1)))
    +    )
    +    val lastLayerDirs = localDirs.filter(dir => !hsLayers.exists(_.dirs.contains(dir)))
    +    val allLayers: Array[LayerInfo] = hsLayers :+
    +      LayerInfo("Last Storage", 10.toLong, lastLayerDirs)
    +    val finalLayers: Array[LayerInfo] = allLayers.filter(_.dirs.nonEmpty)
    +    logInfo("Hierarchy store info:")
    +    for (layer <- finalLayers) {
    +      logInfo("Layer: %s, Threshold: %dGB".format(layer.key, layer.threshold))
    +      layer.dirs.foreach { dir => logInfo("\t%s".format(dir.getCanonicalPath)) }
         }
     
    -    new File(subDir, filename)
    +    def apply(filename: String): File = {
    +      var availableFile: File = null
    +      for (layer <- finalLayers) {
    +        val file = getFile(filename, layer.dirs)
    +        if (file.exists()) return file
    +
    +        if (availableFile == null && file.getParentFile.getUsableSpace>>30 >= layer.threshold) {
    +          availableFile = file
    +        }
    +      }
    +
    +      if (availableFile == null) {
    +        throw new IOException(s"No enough disk space.")
    +      }
    +      availableFile
    +    }
       }
     
    -  def getFile(blockId: BlockId): File = getFile(blockId.name)
    +  private val fileAllocator: FileAllocationStrategy =
    +    if (hierarchyStore.isDefined && !conf.getBoolean("spark.shuffle.service.enabled", false)) {
    --- End diff --
    
    Yes, I would like to open another PR to enable a similar hierarchy strategy for the external shuffle service.




[GitHub] spark pull request: [SPARK-12196][Core] Store/retrieve blocks in d...

Posted by yucai <gi...@git.apache.org>.
Github user yucai commented on the pull request:

    https://github.com/apache/spark/pull/10225#issuecomment-208193697
  
    @JoshRosen @rxin 
    Dear committers:
    In China, most companies are still using HDDs as external storage, so the IO bottleneck in shuffle is quite obvious. This solution uses one PCIe SSD as a cache, which completely eliminates the IO bottleneck at very low cost.
    
    In Baidu's real production case, Spark SQL improves by 1.7x with this patch.
    In Youku's machine learning case, the application improves by 1.8x with this patch.
    And because only one SSD is added, the cost is very attractive to customers.
    
    This PR is really important to Chinese internet and big data companies; more and more companies are showing interest and evaluating it in their environments. We are quite sure it will benefit others as well, so kindly help review.
    
    Much thanks!




[GitHub] spark pull request: [WIP][SPARK-12196][Core] Store/retrieve blocks...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10225#discussion_r48699647
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
    @@ -53,35 +53,98 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
     
       private val shutdownHook = addShutdownHook()
     
    +  private abstract class FileAllocationStrategy {
    +    def apply(filename: String): File
    +
    +    protected def getFile(filename: String, storageDirs: Array[File]): File = {
    +      require(storageDirs.nonEmpty, "could not find file when the directories are empty")
    +
    +      // Figure out which local directory it hashes to, and which subdirectory in that
    +      val hash = Utils.nonNegativeHash(filename)
    +      val dirId = localDirs.indexOf(storageDirs(hash % storageDirs.length))
    +      val subDirId = (hash / storageDirs.length) % subDirsPerLocalDir
    +
    +      // Create the subdirectory if it doesn't already exist
    +      val subDir = subDirs(dirId).synchronized {
    +        val old = subDirs(dirId)(subDirId)
    +        if (old != null) {
    +          old
    +        } else {
    +          val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
    +          if (!newDir.exists() && !newDir.mkdir()) {
    +            throw new IOException(s"Failed to create local dir in $newDir.")
    +          }
    +          subDirs(dirId)(subDirId) = newDir
    +          newDir
    +        }
    +      }
    +
    +      new File(subDir, filename)
    +    }
    +  }
    +
       /** Looks up a file by hashing it into one of our local subdirectories. */
       // This method should be kept in sync with
       // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile().
    -  def getFile(filename: String): File = {
    -    // Figure out which local directory it hashes to, and which subdirectory in that
    -    val hash = Utils.nonNegativeHash(filename)
    -    val dirId = hash % localDirs.length
    -    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
    -
    -    // Create the subdirectory if it doesn't already exist
    -    val subDir = subDirs(dirId).synchronized {
    -      val old = subDirs(dirId)(subDirId)
    -      if (old != null) {
    -        old
    -      } else {
    -        val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
    -        if (!newDir.exists() && !newDir.mkdir()) {
    -          throw new IOException(s"Failed to create local dir in $newDir.")
    -        }
    -        subDirs(dirId)(subDirId) = newDir
    -        newDir
    +  private object hashAllocator extends FileAllocationStrategy {
    +    def apply(filename: String): File = getFile(filename, localDirs)
    +  }
    +
    +  /** Looks up a file by hierarchy way in different speed storage devices. */
    +  private val hierarchyStore = conf.getOption("spark.storage.hierarchyStore")
    +  private class HierarchyAllocator extends FileAllocationStrategy {
    +    case class LayerInfo(key: String, threshold: Long, dirs: Array[File])
    +    val hsSpecs: Array[(String, Long)] =
    +      // e.g.: hierarchyStore = "ssd 200GB, hdd 100GB"
    +      hierarchyStore.get.trim.split(",").map {
    +        s => val x = s.trim.split(" +")
    +          (x(0).toLowerCase, Utils.byteStringAsGb(x(1)))
           }
    +    val hsLayers: Array[LayerInfo] = hsSpecs.map(
    +      s => LayerInfo(s._1, s._2, localDirs.filter(_.getPath.toLowerCase.containsSlice(s._1)))
    +    )
    +    val lastLayerDirs = localDirs.filter(dir => !hsLayers.exists(_.dirs.contains(dir)))
    +    val allLayers: Array[LayerInfo] = hsLayers :+
    +      LayerInfo("Last Storage", 10.toLong, lastLayerDirs)
    +    val finalLayers: Array[LayerInfo] = allLayers.filter(_.dirs.nonEmpty)
    +    logInfo("Hierarchy store info:")
    +    for (layer <- finalLayers) {
    +      logInfo("Layer: %s, Threshold: %dGB".format(layer.key, layer.threshold))
    +      layer.dirs.foreach { dir => logInfo("\t%s".format(dir.getCanonicalPath)) }
         }
     
    -    new File(subDir, filename)
    +    def apply(filename: String): File = {
    +      var availableFile: File = null
    +      for (layer <- finalLayers) {
    +        val file = getFile(filename, layer.dirs)
    +        if (file.exists()) return file
    +
    +        if (availableFile == null && file.getParentFile.getUsableSpace>>30 >= layer.threshold) {
    +          availableFile = file
    +        }
    +      }
    +
    +      if (availableFile == null) {
    +        throw new IOException(s"No enough disk space.")
    +      }
    +      availableFile
    +    }
       }
     
    -  def getFile(blockId: BlockId): File = getFile(blockId.name)
    +  private val fileAllocator: FileAllocationStrategy =
    +    if (hierarchyStore.isDefined && !conf.getBoolean("spark.shuffle.service.enabled", false)) {
    --- End diff --
    
    Hmm, this is a good question: are those changes still applicable if we enable the shuffle service and dynamic allocation?
    Having the shuffle service and dynamic allocation enabled together is probably very common in production environments.
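    
    For context, a sketch of the combination in question (standard Spark settings; the hierarchy value is just the example from the diff): with these settings the quoted check would fall back to the hash allocator, so the hierarchy store would not take effect.
    ```scala
    import org.apache.spark.SparkConf
    
    val conf = new SparkConf()
      .set("spark.shuffle.service.enabled", "true")    // external shuffle service on
      .set("spark.dynamicAllocation.enabled", "true")  // dynamic allocation on
      .set("spark.storage.hierarchyStore", "ssd 200GB, hdd 100GB")  // bypassed per the quoted check
    ```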


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12196][Core] Store blocks in storage de...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10225#discussion_r47096378
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
    @@ -80,6 +77,65 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
         new File(subDir, filename)
       }
     
    +  def getFileDefault(filename: String): File = {
    +    _getFile(filename, localDirs)
    +  }
    +  var getFile = getFileDefault _
    +
    +  private val hierarchyStore = conf.getOption("spark.storage.hierarchyStore")
    +  if (hierarchyStore.isDefined) {
    +    val HSLayers: Array[(String, Long)] =
    +      hierarchyStore.get.trim.split(",").map {
    +        s => val x = s.trim.split(" +")
    +          (x(0), Utils.byteStringAsGb(x(1)))
    +      }
    +    val HSLayerDirs: Array[Array[File]] = HSLayers.map(
    --- End diff --
    
    In general, I think we'd better give users less chance to make mistakes in the configuration.
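    For example (just a sketch with assumed names), the layer string could be validated eagerly so a malformed entry fails with a precise message instead of an obscure error later:
    ```scala
    // Hypothetical fail-fast parser for a value like "ssd 200GB, hdd 100GB".
    // `byteStringAsGb` stands in for Utils.byteStringAsGb.
    object HierarchyConfSketch {
      final case class LayerSpec(name: String, thresholdGb: Long)

      def parseLayers(value: String, byteStringAsGb: String => Long): Seq[LayerSpec] =
        value.trim.split(",").toSeq.map { entry =>
          val parts = entry.trim.split("\\s+")
          require(parts.length == 2,
            s"Invalid hierarchy layer '$entry'; expected '<name> <size>', e.g. 'ssd 200GB'")
          LayerSpec(parts(0).toLowerCase, byteStringAsGb(parts(1)))
        }
    }
    ```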


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #10225: [SPARK-12196][Core] Store/retrieve blocks in diff...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10225#discussion_r77109895
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
    @@ -50,35 +50,98 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
     
       private val shutdownHook = addShutdownHook()
     
    +  private abstract class FileAllocationStrategy {
    +    def apply(filename: String): File
    +
    +    protected def getFile(filename: String, storageDirs: Array[File]): File = {
    +      require(storageDirs.nonEmpty, "could not find file when the directories are empty")
    +
    +      // Figure out which local directory it hashes to, and which subdirectory in that
    +      val hash = Utils.nonNegativeHash(filename)
    +      val dirId = localDirs.indexOf(storageDirs(hash % storageDirs.length))
    +      val subDirId = (hash / storageDirs.length) % subDirsPerLocalDir
    +
    +      // Create the subdirectory if it doesn't already exist
    +      val subDir = subDirs(dirId).synchronized {
    +        val old = subDirs(dirId)(subDirId)
    +        if (old != null) {
    +          old
    +        } else {
    +          val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
    +          if (!newDir.exists() && !newDir.mkdir()) {
    --- End diff --
    
    I see. This may not be an important issue.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #10225: [SPARK-12196][Core] Store/retrieve blocks from different...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/10225
  
    **[Test build #97761 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97761/testReport)** for PR 10225 at commit [`507c506`](https://github.com/apache/spark/commit/507c506d1d8cdc1674bece4e4d149334b4ccf8b0).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #10225: [SPARK-12196][Core] Store/retrieve blocks from different...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/10225
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/97723/
    Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12196][Core] Store blocks in storage de...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10225#discussion_r47097655
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
    @@ -80,6 +77,65 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
         new File(subDir, filename)
       }
     
    +  def getFileDefault(filename: String): File = {
    +    _getFile(filename, localDirs)
    +  }
    +  var getFile = getFileDefault _
    +
    +  private val hierarchyStore = conf.getOption("spark.storage.hierarchyStore")
    +  if (hierarchyStore.isDefined) {
    +    val HSLayers: Array[(String, Long)] =
    +      hierarchyStore.get.trim.split(",").map {
    +        s => val x = s.trim.split(" +")
    +          (x(0), Utils.byteStringAsGb(x(1)))
    +      }
    +    val HSLayerDirs: Array[Array[File]] = HSLayers.map(
    +      s => localDirs.filter(_.getPath().toLowerCase().containsSlice(s._1))
    +    )
    +    val lastLayer = localDirs.filter {
    +      dir => var found = false
    +        for(dirs <- HSLayerDirs if !found)
    +          if (dirs.contains(dir)) found = true
    +        !found
    +    }
    +    val (finalHSLayers, finalHSLayerDirs) = if (!lastLayer.isEmpty) {
    +      (HSLayers :+ ("Backup Storage", 0.toLong), HSLayerDirs :+ lastLayer)
    +    } else {
    +      val len = HSLayers.length
    +      (HSLayers.take(len-1) :+ (HSLayers(len-1)._1, -1.toLong), HSLayerDirs)
    +    }
    +    for (i <- 0 until finalHSLayers.length) {
    +      val s = finalHSLayers(i)
    +      logInfo("Layer: %s, Threshold: %dGB".format(s._1, s._2))
    +      finalHSLayerDirs(i).foreach {
    +        x => logInfo("\t%s".format(x.getPath()))
    +      }
    +    }
    +
    +    def getFileHierarchy(filename: String): File = {
    +      // serach existing file in each layer
    +      var candidateFiles = new scala.collection.immutable.HashMap[Int, File]
    +      for ((dirs, i) <- finalHSLayerDirs.zipWithIndex if !dirs.isEmpty) {
    +        val file = _getFile(filename, dirs)
    +        if (file.exists())
    +          return file
    +        candidateFiles += (i -> file)
    +      }
    +
    +      // if not found, return file from the upper layer with enough usable space 
    +      for ((dirs, i) <- finalHSLayerDirs.zipWithIndex if !dirs.isEmpty) {
    +        val dir = dirs(Utils.nonNegativeHash(filename) % dirs.length)
    +        val usableSpace = dir.getUsableSpace() >> 30
    +        if (usableSpace >= finalHSLayers(i)._2)
    +          return candidateFiles(i)
    +      }
    +
    +      null
    --- End diff --
    
    throw exception with meaningful msg?
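    For example (a hypothetical helper, not from the patch), the error could name the file and the configured thresholds:
    ```scala
    import java.io.IOException

    // Hypothetical replacement for the `null` return: describe which file could not
    // be placed and which thresholds were in effect.
    def noSpaceError(filename: String, layers: Seq[(String, Long)]): Nothing =
      throw new IOException(
        s"No storage layer has enough usable space for '$filename'; " +
        s"thresholds (GB): ${layers.map { case (n, gb) => s"$n=$gb" }.mkString(", ")}")
    ```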


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12196][Core] Store blocks in storage de...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10225#discussion_r47095833
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
    @@ -80,6 +77,65 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
         new File(subDir, filename)
       }
     
    +  def getFileDefault(filename: String): File = {
    +    _getFile(filename, localDirs)
    +  }
    +  var getFile = getFileDefault _
    +
    +  private val hierarchyStore = conf.getOption("spark.storage.hierarchyStore")
    +  if (hierarchyStore.isDefined) {
    +    val HSLayers: Array[(String, Long)] =
    +      hierarchyStore.get.trim.split(",").map {
    +        s => val x = s.trim.split(" +")
    +          (x(0), Utils.byteStringAsGb(x(1)))
    +      }
    +    val HSLayerDirs: Array[Array[File]] = HSLayers.map(
    --- End diff --
    
    I am wondering if we can split the faster devices and the normal devices into separate settings, for backward compatibility.
    e.g.
    ```
    spark.storage.hierarchyStore=/mnt/nvm1,/mnt/nvm2,/mnt/ssd1,/mnt/ssd2
    spark.local.dir=/mnt/sata1/,/mnt/sata2,/mnt/stat3
    ```
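    A rough sketch of how such a split could be read (hypothetical, only to make the idea concrete):
    ```scala
    import java.io.File

    // Hypothetical sketch: read fast dirs and ordinary dirs from two separate keys,
    // so existing spark.local.dir setups keep working unchanged.
    def layerDirs(hierarchyStore: Option[String], localDir: String): (Array[File], Array[File]) = {
      val fastDirs  = hierarchyStore.map(_.split(",").map(p => new File(p.trim))).getOrElse(Array.empty[File])
      val plainDirs = localDir.split(",").map(p => new File(p.trim))
      (fastDirs, plainDirs)  // try the fast dirs first, fall back to the plain dirs
    }
    ```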



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12196][Core] Store/retrieve blocks in d...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on the pull request:

    https://github.com/apache/spark/pull/10225#issuecomment-205712538
  
    @JoshRosen I am not sure whether this is still part of your refactorings, or whether we can bring this PR up again. It is a quite critical performance improvement when mixing PCI-E SSD / HDD, particularly for scenarios that shuffle a large amount of data.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [WIP][SPARK-12196][Core] Store/retrieve blocks...

Posted by yucai <gi...@git.apache.org>.
Github user yucai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10225#discussion_r48708534
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
    @@ -53,35 +53,98 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
     
       private val shutdownHook = addShutdownHook()
     
    +  private abstract class FileAllocationStrategy {
    +    def apply(filename: String): File
    +
    +    protected def getFile(filename: String, storageDirs: Array[File]): File = {
    +      require(storageDirs.nonEmpty, "could not find file when the directories are empty")
    +
    +      // Figure out which local directory it hashes to, and which subdirectory in that
    +      val hash = Utils.nonNegativeHash(filename)
    +      val dirId = localDirs.indexOf(storageDirs(hash % storageDirs.length))
    +      val subDirId = (hash / storageDirs.length) % subDirsPerLocalDir
    +
    +      // Create the subdirectory if it doesn't already exist
    +      val subDir = subDirs(dirId).synchronized {
    +        val old = subDirs(dirId)(subDirId)
    +        if (old != null) {
    +          old
    +        } else {
    +          val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
    +          if (!newDir.exists() && !newDir.mkdir()) {
    +            throw new IOException(s"Failed to create local dir in $newDir.")
    +          }
    +          subDirs(dirId)(subDirId) = newDir
    +          newDir
    +        }
    +      }
    +
    +      new File(subDir, filename)
    +    }
    +  }
    +
       /** Looks up a file by hashing it into one of our local subdirectories. */
       // This method should be kept in sync with
       // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile().
    -  def getFile(filename: String): File = {
    -    // Figure out which local directory it hashes to, and which subdirectory in that
    -    val hash = Utils.nonNegativeHash(filename)
    -    val dirId = hash % localDirs.length
    -    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
    -
    -    // Create the subdirectory if it doesn't already exist
    -    val subDir = subDirs(dirId).synchronized {
    -      val old = subDirs(dirId)(subDirId)
    -      if (old != null) {
    -        old
    -      } else {
    -        val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
    -        if (!newDir.exists() && !newDir.mkdir()) {
    -          throw new IOException(s"Failed to create local dir in $newDir.")
    -        }
    -        subDirs(dirId)(subDirId) = newDir
    -        newDir
    +  private object hashAllocator extends FileAllocationStrategy {
    +    def apply(filename: String): File = getFile(filename, localDirs)
    +  }
    +
    +  /** Looks up a file by hierarchy way in different speed storage devices. */
    +  private val hierarchyStore = conf.getOption("spark.storage.hierarchyStore")
    +  private class HierarchyAllocator extends FileAllocationStrategy {
    +    case class LayerInfo(key: String, threshold: Long, dirs: Array[File])
    +    val hsSpecs: Array[(String, Long)] =
    +      // e.g.: hierarchyStore = "ssd 200GB, hdd 100GB"
    +      hierarchyStore.get.trim.split(",").map {
    +        s => val x = s.trim.split(" +")
    +          (x(0).toLowerCase, Utils.byteStringAsGb(x(1)))
    --- End diff --
    
    OK, this way we can even eliminate the `file.getParentFile.getUsableSpace >> 30` operation in apply(), thanks!
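    In other words (a small sketch with assumed names), the thresholds can be converted to bytes once at parse time, so apply() compares raw getUsableSpace values directly:
    ```scala
    import java.io.File

    // Hypothetical: keep each layer's threshold in bytes (Utils.byteStringAsBytes),
    // so the per-call ">> 30" conversion disappears from the allocation path.
    final case class Layer(name: String, thresholdBytes: Long, dirs: Array[File])

    def hasRoom(dir: File, layer: Layer): Boolean =
      dir.getUsableSpace >= layer.thresholdBytes
    ```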


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #10225: [SPARK-12196][Core] Store/retrieve blocks from different...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/10225
  
    **[Test build #97723 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97723/testReport)** for PR 10225 at commit [`507c506`](https://github.com/apache/spark/commit/507c506d1d8cdc1674bece4e4d149334b4ccf8b0).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #10225: [SPARK-12196][Core] Store/retrieve blocks from di...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10225#discussion_r77748327
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
    @@ -136,7 +136,9 @@ private[spark] class IndexShuffleBlockResolver(
           shuffleId: Int,
           mapId: Int,
           lengths: Array[Long],
    -      dataTmp: File): Unit = {
    --- End diff --
    
    Do we have to change the code in this function?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #10225: [SPARK-12196][Core] Store/retrieve blocks in different s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/10225
  
    Can one of the admins verify this patch?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #10225: [SPARK-12196][Core] Store/retrieve blocks from different...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/10225
  
    **[Test build #97790 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97790/testReport)** for PR 10225 at commit [`507c506`](https://github.com/apache/spark/commit/507c506d1d8cdc1674bece4e4d149334b4ccf8b0).
     * This patch **fails build dependency tests**.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #10225: [SPARK-12196][Core] Store/retrieve blocks in diff...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10225#discussion_r76941111
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
    @@ -50,35 +50,98 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
     
       private val shutdownHook = addShutdownHook()
     
    +  private abstract class FileAllocationStrategy {
    +    def apply(filename: String): File
    +
    +    protected def getFile(filename: String, storageDirs: Array[File]): File = {
    +      require(storageDirs.nonEmpty, "could not find file when the directories are empty")
    +
    +      // Figure out which local directory it hashes to, and which subdirectory in that
    +      val hash = Utils.nonNegativeHash(filename)
    +      val dirId = localDirs.indexOf(storageDirs(hash % storageDirs.length))
    +      val subDirId = (hash / storageDirs.length) % subDirsPerLocalDir
    +
    +      // Create the subdirectory if it doesn't already exist
    +      val subDir = subDirs(dirId).synchronized {
    +        val old = subDirs(dirId)(subDirId)
    +        if (old != null) {
    +          old
    +        } else {
    +          val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
    +          if (!newDir.exists() && !newDir.mkdir()) {
    +            throw new IOException(s"Failed to create local dir in $newDir.")
    +          }
    +          subDirs(dirId)(subDirId) = newDir
    +          newDir
    +        }
    +      }
    +
    +      new File(subDir, filename)
    +    }
    +  }
    +
       /** Looks up a file by hashing it into one of our local subdirectories. */
       // This method should be kept in sync with
       // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile().
    -  def getFile(filename: String): File = {
    -    // Figure out which local directory it hashes to, and which subdirectory in that
    -    val hash = Utils.nonNegativeHash(filename)
    -    val dirId = hash % localDirs.length
    -    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
    -
    -    // Create the subdirectory if it doesn't already exist
    -    val subDir = subDirs(dirId).synchronized {
    -      val old = subDirs(dirId)(subDirId)
    -      if (old != null) {
    -        old
    -      } else {
    -        val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
    -        if (!newDir.exists() && !newDir.mkdir()) {
    -          throw new IOException(s"Failed to create local dir in $newDir.")
    -        }
    -        subDirs(dirId)(subDirId) = newDir
    -        newDir
    +  private object hashAllocator extends FileAllocationStrategy {
    +    def apply(filename: String): File = getFile(filename, localDirs)
    +  }
    +
    +  /** Looks up a file by hierarchy way in different speed storage devices. */
    +  private val hierarchyStore = conf.getOption("spark.storage.hierarchyStore")
    +  private class HierarchyAllocator extends FileAllocationStrategy {
    +    case class LayerInfo(key: String, threshold: Long, dirs: Array[File])
    +    val hsSpecs: Array[(String, Long)] =
    +      // e.g.: hierarchyStore = "ssd 200GB, hdd 100GB"
    +      hierarchyStore.get.trim.split(",").map {
    +        s => val x = s.trim.split(" +")
    +          (x(0).toLowerCase, Utils.byteStringAsBytes(x(1)))
           }
    +    val hsLayers: Array[LayerInfo] = hsSpecs.map(
    +      s => LayerInfo(s._1, s._2, localDirs.filter(_.getPath.toLowerCase.containsSlice(s._1)))
    +    )
    +    val lastLayerDirs = localDirs.filter(dir => !hsLayers.exists(_.dirs.contains(dir)))
    +    val allLayers: Array[LayerInfo] = hsLayers :+
    +      LayerInfo("Last Storage", 10.toLong, lastLayerDirs)
    +    val finalLayers: Array[LayerInfo] = allLayers.filter(_.dirs.nonEmpty)
    +    logInfo("Hierarchy store info:")
    +    for (layer <- finalLayers) {
    +      logInfo("Layer: %s, Threshold: %s".format(layer.key, Utils.bytesToString(layer.threshold)))
    +      layer.dirs.foreach { dir => logInfo("\t%s".format(dir.getCanonicalPath)) }
         }
     
    -    new File(subDir, filename)
    +    def apply(filename: String): File = {
    +      var availableFile: File = null
    +      for (layer <- finalLayers) {
    --- End diff --
    
    The `getFile` call below will create the local dir even if the file doesn't exist. So you will create useless local dirs for the other layers even when you already have an `availableFile`.
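    One possible way around that (a sketch, not the patch itself) is to compute the would-be location from the same hash layout while probing, and only create the subdirectory for the layer that is finally chosen:
    ```scala
    import java.io.File

    // Hypothetical probe: derive the candidate path without creating any directories,
    // so merely checking a slower layer does not leave empty subdirs behind.
    def probeExisting(
        filename: String,
        dirs: Array[File],
        subDirsPerLocalDir: Int,
        nonNegativeHash: String => Int): Option[File] = {
      if (dirs.isEmpty) return None
      val hash = nonNegativeHash(filename)
      val dir = dirs(hash % dirs.length)
      val subDirId = (hash / dirs.length) % subDirsPerLocalDir
      val candidate = new File(new File(dir, "%02x".format(subDirId)), filename)
      if (candidate.exists()) Some(candidate) else None
    }
    ```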


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #10225: [SPARK-12196][Core] Store/retrieve blocks from different...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/10225
  
    Can one of the admins verify this patch?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12196][Core] Store blocks in storage de...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10225#discussion_r47095121
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
    @@ -80,6 +77,65 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
         new File(subDir, filename)
       }
     
    +  def getFileDefault(filename: String): File = {
    +    _getFile(filename, localDirs)
    +  }
    +  var getFile = getFileDefault _
    +
    +  private val hierarchyStore = conf.getOption("spark.storage.hierarchyStore")
    +  if (hierarchyStore.isDefined) {
    +    val HSLayers: Array[(String, Long)] =
    +      hierarchyStore.get.trim.split(",").map {
    +        s => val x = s.trim.split(" +")
    +          (x(0), Utils.byteStringAsGb(x(1)))
    --- End diff --
    
    I am wondering if we can get the free space via an API rather than from the configuration, e.g.
    ```scala
    val file = new File(xxx)
    file.getFreeSpace()
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12196][Core] Store blocks in storage de...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10225#discussion_r47094903
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
    @@ -80,6 +77,65 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
         new File(subDir, filename)
       }
     
    +  def getFileDefault(filename: String): File = {
    +    _getFile(filename, localDirs)
    +  }
    +  var getFile = getFileDefault _
    --- End diff --
    
    val getFile: String => File = conf.getOption("spark.storage.hierarchyStore") match {
      case Some(fastDevices) => ...
        getFileHierarchy _
      case None => getFileDefault _
    }



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #10225: [SPARK-12196][Core] Store/retrieve blocks from different...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/10225
  
    Can one of the admins verify this patch?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #10225: [SPARK-12196][Core] Store/retrieve blocks from di...

Posted by yucai <gi...@git.apache.org>.
Github user yucai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10225#discussion_r77480256
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
    @@ -50,33 +50,98 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
     
       private val shutdownHook = addShutdownHook()
     
    +  private abstract class FileAllocationStrategy {
    +    def apply(filename: String): File
    +
    +    protected def getFile(filename: String, storageDirs: Array[File]): File = {
    +      require(storageDirs.nonEmpty, "could not find file when the directories are empty")
    +
    +      // Figure out which local directory it hashes to, and which subdirectory in that
    +      val hash = Utils.nonNegativeHash(filename)
    +      val dirId = localDirs.indexOf(storageDirs(hash % storageDirs.length))
    +      val subDirId = (hash / storageDirs.length) % subDirsPerLocalDir
    +
    +      // Create the subdirectory if it doesn't already exist
    +      val subDir = subDirs(dirId).synchronized {
    +        val old = subDirs(dirId)(subDirId)
    +        if (old != null) {
    +          old
    +        } else {
    +          val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
    +          if (!newDir.exists() && !newDir.mkdir()) {
    +            throw new IOException(s"Failed to create local dir in $newDir.")
    +          }
    +          subDirs(dirId)(subDirId) = newDir
    +          newDir
    +        }
    +      }
    +
    +      new File(subDir, filename)
    +    }
    +  }
    +
       /** Looks up a file by hashing it into one of our local subdirectories. */
       // This method should be kept in sync with
       // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile().
    -  def getFile(filename: String): File = {
    -    // Figure out which local directory it hashes to, and which subdirectory in that
    -    val hash = Utils.nonNegativeHash(filename)
    -    val dirId = hash % localDirs.length
    -    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
    -
    -    // Create the subdirectory if it doesn't already exist
    -    val subDir = subDirs(dirId).synchronized {
    -      val old = subDirs(dirId)(subDirId)
    -      if (old != null) {
    -        old
    -      } else {
    -        val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
    -        if (!newDir.exists() && !newDir.mkdir()) {
    -          throw new IOException(s"Failed to create local dir in $newDir.")
    -        }
    -        subDirs(dirId)(subDirId) = newDir
    -        newDir
    +  private object hashAllocator extends FileAllocationStrategy {
    +    def apply(filename: String): File = getFile(filename, localDirs)
    +  }
    +
    +  /** Looks up a file by hierarchy way in different speed storage devices. */
    +  private val hierarchy = conf.getOption("spark.diskStore.hierarchy")
    +  private class HierarchyAllocator extends FileAllocationStrategy {
    +    case class Level(key: String, threshold: Long, dirs: Array[File])
    +    val hsDescs: Array[(String, Long)] =
    +      // e.g.: hierarchy = "ram_disk 1GB, ssd 20GB"
    +      hierarchy.get.trim.split(",").map {
    +        s => val x = s.trim.split(" +")
    +          val storage = x(0).toLowerCase
    +          val threshold = s.replaceFirst(x(0), "").trim
    +          (storage, Utils.byteStringAsBytes(threshold))
    --- End diff --
    
    Ok, thanks, I have updated.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [WIP][SPARK-12196][Core] Store/retrieve blocks...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10225#discussion_r48699693
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
    @@ -53,35 +53,98 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
     
       private val shutdownHook = addShutdownHook()
     
    +  private abstract class FileAllocationStrategy {
    +    def apply(filename: String): File
    +
    +    protected def getFile(filename: String, storageDirs: Array[File]): File = {
    +      require(storageDirs.nonEmpty, "could not find file when the directories are empty")
    +
    +      // Figure out which local directory it hashes to, and which subdirectory in that
    +      val hash = Utils.nonNegativeHash(filename)
    +      val dirId = localDirs.indexOf(storageDirs(hash % storageDirs.length))
    +      val subDirId = (hash / storageDirs.length) % subDirsPerLocalDir
    +
    +      // Create the subdirectory if it doesn't already exist
    +      val subDir = subDirs(dirId).synchronized {
    +        val old = subDirs(dirId)(subDirId)
    +        if (old != null) {
    +          old
    +        } else {
    +          val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
    +          if (!newDir.exists() && !newDir.mkdir()) {
    +            throw new IOException(s"Failed to create local dir in $newDir.")
    +          }
    +          subDirs(dirId)(subDirId) = newDir
    +          newDir
    +        }
    +      }
    +
    +      new File(subDir, filename)
    +    }
    +  }
    +
       /** Looks up a file by hashing it into one of our local subdirectories. */
       // This method should be kept in sync with
       // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile().
    --- End diff --
    
    We need to keep `org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile()` in sync, so this feature can also be used with the external shuffle service under dynamic allocation on YARN.
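    For reference, the mapping both sides currently share is roughly this (a sketch using the names from the diff above); whatever the hierarchy allocator does, the executor and the external shuffle service have to land on the same subdirectory for the same filename:
    ```scala
    // Hypothetical shared mapping: same localDirs order and subDirsPerLocalDir on
    // both sides means both compute the same (dirId, subDirId) for a filename.
    def locate(
        filename: String,
        localDirs: Array[String],
        subDirsPerLocalDir: Int,
        nonNegativeHash: String => Int): (Int, Int) = {
      val hash = nonNegativeHash(filename)
      val dirId = hash % localDirs.length
      val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
      (dirId, subDirId)
    }
    ```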


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #10225: [SPARK-12196][Core] Store/retrieve blocks in diff...

Posted by yucai <gi...@git.apache.org>.
Github user yucai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10225#discussion_r76935911
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
    @@ -50,35 +50,98 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
     
       private val shutdownHook = addShutdownHook()
     
    +  private abstract class FileAllocationStrategy {
    +    def apply(filename: String): File
    +
    +    protected def getFile(filename: String, storageDirs: Array[File]): File = {
    +      require(storageDirs.nonEmpty, "could not find file when the directories are empty")
    +
    +      // Figure out which local directory it hashes to, and which subdirectory in that
    +      val hash = Utils.nonNegativeHash(filename)
    +      val dirId = localDirs.indexOf(storageDirs(hash % storageDirs.length))
    +      val subDirId = (hash / storageDirs.length) % subDirsPerLocalDir
    +
    +      // Create the subdirectory if it doesn't already exist
    +      val subDir = subDirs(dirId).synchronized {
    +        val old = subDirs(dirId)(subDirId)
    +        if (old != null) {
    +          old
    +        } else {
    +          val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
    +          if (!newDir.exists() && !newDir.mkdir()) {
    +            throw new IOException(s"Failed to create local dir in $newDir.")
    +          }
    +          subDirs(dirId)(subDirId) = newDir
    +          newDir
    +        }
    +      }
    +
    +      new File(subDir, filename)
    +    }
    +  }
    +
       /** Looks up a file by hashing it into one of our local subdirectories. */
       // This method should be kept in sync with
       // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile().
    -  def getFile(filename: String): File = {
    -    // Figure out which local directory it hashes to, and which subdirectory in that
    -    val hash = Utils.nonNegativeHash(filename)
    -    val dirId = hash % localDirs.length
    -    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
    -
    -    // Create the subdirectory if it doesn't already exist
    -    val subDir = subDirs(dirId).synchronized {
    -      val old = subDirs(dirId)(subDirId)
    -      if (old != null) {
    -        old
    -      } else {
    -        val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
    -        if (!newDir.exists() && !newDir.mkdir()) {
    -          throw new IOException(s"Failed to create local dir in $newDir.")
    -        }
    -        subDirs(dirId)(subDirId) = newDir
    -        newDir
    +  private object hashAllocator extends FileAllocationStrategy {
    +    def apply(filename: String): File = getFile(filename, localDirs)
    +  }
    +
    +  /** Looks up a file by hierarchy way in different speed storage devices. */
    +  private val hierarchyStore = conf.getOption("spark.storage.hierarchyStore")
    +  private class HierarchyAllocator extends FileAllocationStrategy {
    +    case class LayerInfo(key: String, threshold: Long, dirs: Array[File])
    +    val hsSpecs: Array[(String, Long)] =
    +      // e.g.: hierarchyStore = "ssd 200GB, hdd 100GB"
    +      hierarchyStore.get.trim.split(",").map {
    +        s => val x = s.trim.split(" +")
    +          (x(0).toLowerCase, Utils.byteStringAsBytes(x(1)))
           }
    +    val hsLayers: Array[LayerInfo] = hsSpecs.map(
    +      s => LayerInfo(s._1, s._2, localDirs.filter(_.getPath.toLowerCase.containsSlice(s._1)))
    +    )
    +    val lastLayerDirs = localDirs.filter(dir => !hsLayers.exists(_.dirs.contains(dir)))
    +    val allLayers: Array[LayerInfo] = hsLayers :+
    +      LayerInfo("Last Storage", 10.toLong, lastLayerDirs)
    +    val finalLayers: Array[LayerInfo] = allLayers.filter(_.dirs.nonEmpty)
    +    logInfo("Hierarchy store info:")
    +    for (layer <- finalLayers) {
    +      logInfo("Layer: %s, Threshold: %s".format(layer.key, Utils.bytesToString(layer.threshold)))
    +      layer.dirs.foreach { dir => logInfo("\t%s".format(dir.getCanonicalPath)) }
         }
     
    -    new File(subDir, filename)
    +    def apply(filename: String): File = {
    +      var availableFile: File = null
    +      for (layer <- finalLayers) {
    --- End diff --
    
    @viirya, many thanks for the review!
    But I do return immediately once the file already exists in some hierarchy layer (i.e. it was created there before):
    ```
            if (file.exists()) return file
    ```
    And if the file has not been created yet, I need the for-loop to go through all hierarchy layers and return a location in the fastest layer that still has enough space, like below:
    ```
            if (availableFile == null && file.getParentFile.getUsableSpace >= layer.threshold) {
              availableFile = file
            }
    ```
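    Put together, the lookup is roughly this (a simplified sketch with assumed helper names, not the exact code):
    ```scala
    import java.io.{File, IOException}

    // Hypothetical two-phase lookup: (1) return the file if it already exists in any
    // layer; (2) otherwise return a slot in the fastest layer whose device still has
    // at least `thresholdBytes` of usable space.
    def allocate(
        filename: String,
        layers: Seq[(Long, Array[File])],              // (thresholdBytes, dirs), fastest first
        fileIn: (String, Array[File]) => File): File = {
      var available: File = null
      for ((thresholdBytes, dirs) <- layers) {
        val file = fileIn(filename, dirs)
        if (file.exists()) return file
        if (available == null && file.getParentFile.getUsableSpace >= thresholdBytes) {
          available = file
        }
      }
      if (available == null) throw new IOException("No layer has enough disk space")
      available
    }
    ```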
    What do you think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #10225: [SPARK-12196][Core] Store/retrieve blocks in diff...

Posted by yucai <gi...@git.apache.org>.
Github user yucai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10225#discussion_r77473963
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
    @@ -50,33 +50,98 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
     
       private val shutdownHook = addShutdownHook()
     
    +  private abstract class FileAllocationStrategy {
    +    def apply(filename: String): File
    +
    +    protected def getFile(filename: String, storageDirs: Array[File]): File = {
    +      require(storageDirs.nonEmpty, "could not find file when the directories are empty")
    +
    +      // Figure out which local directory it hashes to, and which subdirectory in that
    +      val hash = Utils.nonNegativeHash(filename)
    +      val dirId = localDirs.indexOf(storageDirs(hash % storageDirs.length))
    +      val subDirId = (hash / storageDirs.length) % subDirsPerLocalDir
    +
    +      // Create the subdirectory if it doesn't already exist
    +      val subDir = subDirs(dirId).synchronized {
    +        val old = subDirs(dirId)(subDirId)
    +        if (old != null) {
    +          old
    +        } else {
    +          val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
    +          if (!newDir.exists() && !newDir.mkdir()) {
    +            throw new IOException(s"Failed to create local dir in $newDir.")
    +          }
    +          subDirs(dirId)(subDirId) = newDir
    +          newDir
    +        }
    +      }
    +
    +      new File(subDir, filename)
    +    }
    +  }
    +
       /** Looks up a file by hashing it into one of our local subdirectories. */
       // This method should be kept in sync with
       // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile().
    -  def getFile(filename: String): File = {
    -    // Figure out which local directory it hashes to, and which subdirectory in that
    -    val hash = Utils.nonNegativeHash(filename)
    -    val dirId = hash % localDirs.length
    -    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
    -
    -    // Create the subdirectory if it doesn't already exist
    -    val subDir = subDirs(dirId).synchronized {
    -      val old = subDirs(dirId)(subDirId)
    -      if (old != null) {
    -        old
    -      } else {
    -        val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
    -        if (!newDir.exists() && !newDir.mkdir()) {
    -          throw new IOException(s"Failed to create local dir in $newDir.")
    -        }
    -        subDirs(dirId)(subDirId) = newDir
    -        newDir
    +  private object hashAllocator extends FileAllocationStrategy {
    +    def apply(filename: String): File = getFile(filename, localDirs)
    +  }
    +
    +  /** Looks up a file by hierarchy way in different speed storage devices. */
    +  private val hierarchy = conf.getOption("spark.diskStore.hierarchy")
    +  private class HierarchyAllocator extends FileAllocationStrategy {
    +    case class Level(key: String, threshold: Long, dirs: Array[File])
    +    val hsDescs: Array[(String, Long)] =
    +      // e.g.: hierarchy = "ram_disk 1GB, ssd 20GB"
    +      hierarchy.get.trim.split(",").map {
    +        s => val x = s.trim.split(" +")
    +          val storage = x(0).toLowerCase
    +          val threshold = s.replaceFirst(x(0), "").trim
    +          (storage, Utils.byteStringAsBytes(threshold))
    --- End diff --
    
    @viirya I updated the code here; now if "spark.diskStore.hierarchy" is not specified correctly, a runtime exception will fail Spark's startup directly. Does that make sense to you?
    
    This is the same as the behavior of other settings, like "spark.executor.memory", whose exception Spark does not handle either:
    ```
        if (conf.contains("spark.executor.memory")) {
          val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
          if (executorMemory < MIN_MEMORY_BYTES) {
            throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
              s"$MIN_MEMORY_BYTES. Please increase executor memory using the " +
              s"--executor-memory option or spark.executor.memory in Spark configuration.")
          }
        }
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12196][Core] Store blocks in storage de...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10225#discussion_r47096208
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
    @@ -80,6 +77,65 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
         new File(subDir, filename)
       }
     
    +  def getFileDefault(filename: String): File = {
    +    _getFile(filename, localDirs)
    +  }
    +  var getFile = getFileDefault _
    +
    +  private val hierarchyStore = conf.getOption("spark.storage.hierarchyStore")
    +  if (hierarchyStore.isDefined) {
    +    val HSLayers: Array[(String, Long)] =
    +      hierarchyStore.get.trim.split(",").map {
    +        s => val x = s.trim.split(" +")
    +          (x(0), Utils.byteStringAsGb(x(1)))
    +      }
    +    val HSLayerDirs: Array[Array[File]] = HSLayers.map(
    +      s => localDirs.filter(_.getPath().toLowerCase().containsSlice(s._1))
    +    )
    +    val lastLayer = localDirs.filter {
    +      dir => var found = false
    +        for(dirs <- HSLayerDirs if !found)
    +          if (dirs.contains(dir)) found = true
    +        !found
    +    }
    +    val (finalHSLayers, finalHSLayerDirs) = if (!lastLayer.isEmpty) {
    +      (HSLayers :+ ("Backup Storage", 0.toLong), HSLayerDirs :+ lastLayer)
    +    } else {
    +      val len = HSLayers.length
    +      (HSLayers.take(len-1) :+ (HSLayers(len-1)._1, -1.toLong), HSLayerDirs)
    +    }
    +    for (i <- 0 until finalHSLayers.length) {
    +      val s = finalHSLayers(i)
    +      logInfo("Layer: %s, Threshold: %dGB".format(s._1, s._2))
    +      finalHSLayerDirs(i).foreach {
    +        x => logInfo("\t%s".format(x.getPath()))
    +      }
    +    }
    +
    +    def getFileHierarchy(filename: String): File = {
    +      // serach existing file in each layer
    --- End diff --
    
    typo `search`, not `serach`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [WIP][SPARK-12196][Core] Store blocks in stora...

Posted by yucai <gi...@git.apache.org>.
Github user yucai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10225#discussion_r47181555
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
    @@ -80,6 +77,65 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
         new File(subDir, filename)
       }
     
    +  def getFileDefault(filename: String): File = {
    +    _getFile(filename, localDirs)
    +  }
    +  var getFile = getFileDefault _
    +
    +  private val hierarchyStore = conf.getOption("spark.storage.hierarchyStore")
    +  if (hierarchyStore.isDefined) {
    +    val HSLayers: Array[(String, Long)] =
    +      hierarchyStore.get.trim.split(",").map {
    +        s => val x = s.trim.split(" +")
    +          (x(0), Utils.byteStringAsGb(x(1)))
    --- End diff --
    
    This number is the *threshold* before dropping down to the next layer.
    We do get the free space via the getFreeSpace method, as in getFileHierarchy.
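    In other words (a tiny sketch of that semantics): a block stays in a layer only while its device still reports at least the configured number of gigabytes free, otherwise it falls through to the next layer.
    ```scala
    // Hypothetical reading of the threshold: free space below the per-layer
    // threshold (in GB here) means the block drops to the next, slower layer.
    def stayInLayer(dir: java.io.File, thresholdGb: Long): Boolean =
      (dir.getFreeSpace >> 30) >= thresholdGb
    ```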


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12196][Core] Store blocks in storage de...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10225#discussion_r47095159
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
    @@ -80,6 +77,65 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
         new File(subDir, filename)
       }
     
    +  def getFileDefault(filename: String): File = {
    +    _getFile(filename, localDirs)
    +  }
    +  var getFile = getFileDefault _
    +
    +  private val hierarchyStore = conf.getOption("spark.storage.hierarchyStore")
    +  if (hierarchyStore.isDefined) {
    +    val HSLayers: Array[(String, Long)] =
    --- End diff --
    
    `HSLayers` => `hsLayers`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12196][Core] Store blocks in storage de...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10225#issuecomment-163224930
  
    Can one of the admins verify this patch?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [WIP][SPARK-12196][Core] Store/retrieve blocks...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10225#discussion_r48699717
  
    --- Diff: docs/configuration.md ---
    @@ -926,6 +926,21 @@ Apart from these, the following properties are also available, and may be useful
       </td>
     </tr>
     <tr>
    +  <td><code>spark.storage.hierarchyStore</code></td>
    +  <td>(none)</td>
    +  <td>
    +     Store blocks in different speed storage devices by hierarchy way.<br />
    +     For example:<br />
    +     <code>nvm 40GB, ssd 20GB</code><br />
    +     It means the fastest is nvm(threshold 40GB), lower layer is ssd(threshold 20GB), all the rest are the final layer.
    +     The threshold means when the device's usable space is less than it, dropping into the next layer.<br />
    +     And then, You need configure "nvm" and "ssd" location in local dirs.<br />
    +     For example in Standalone, Mesos: <br />
    +     <code>spark.local.dir=/mnt/nvm1,/mnt/ssd1,/mnt/ssd2,/mnt/others</code><br />
    +     In Yarn, refer to the its document to configure yarn.nodemanager.local-dirs<br />
    --- End diff --
    
    Probably we should emphasize that the option `spark.storage.hierarchyStore` also needs to go into `yarn-site.xml` in YARN mode.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12196][Core] Store blocks in storage de...

Posted by chenghao-intel <gi...@git.apache.org>.
Github user chenghao-intel commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10225#discussion_r47096750
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
    @@ -80,6 +77,65 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
         new File(subDir, filename)
       }
     
    +  def getFileDefault(filename: String): File = {
    +    _getFile(filename, localDirs)
    +  }
    +  var getFile = getFileDefault _
    +
    +  private val hierarchyStore = conf.getOption("spark.storage.hierarchyStore")
    +  if (hierarchyStore.isDefined) {
    +    val HSLayers: Array[(String, Long)] =
    +      hierarchyStore.get.trim.split(",").map {
    +        s => val x = s.trim.split(" +")
    +          (x(0), Utils.byteStringAsGb(x(1)))
    +      }
    +    val HSLayerDirs: Array[Array[File]] = HSLayers.map(
    +      s => localDirs.filter(_.getPath().toLowerCase().containsSlice(s._1))
    +    )
    +    val lastLayer = localDirs.filter {
    +      dir => var found = false
    +        for(dirs <- HSLayerDirs if !found)
    +          if (dirs.contains(dir)) found = true
    +        !found
    +    }
    +    val (finalHSLayers, finalHSLayerDirs) = if (!lastLayer.isEmpty) {
    +      (HSLayers :+ ("Backup Storage", 0.toLong), HSLayerDirs :+ lastLayer)
    +    } else {
    +      val len = HSLayers.length
    +      (HSLayers.take(len-1) :+ (HSLayers(len-1)._1, -1.toLong), HSLayerDirs)
    --- End diff --
    
    Nit: `len-1` => `len - 1`




[GitHub] spark pull request #10225: [SPARK-12196][Core] Store/retrieve blocks in diff...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10225#discussion_r76922041
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
    @@ -50,35 +50,98 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
     
       private val shutdownHook = addShutdownHook()
     
    +  private abstract class FileAllocationStrategy {
    +    def apply(filename: String): File
    +
    +    protected def getFile(filename: String, storageDirs: Array[File]): File = {
    +      require(storageDirs.nonEmpty, "could not find file when the directories are empty")
    +
    +      // Figure out which local directory it hashes to, and which subdirectory in that
    +      val hash = Utils.nonNegativeHash(filename)
    +      val dirId = localDirs.indexOf(storageDirs(hash % storageDirs.length))
    +      val subDirId = (hash / storageDirs.length) % subDirsPerLocalDir
    +
    +      // Create the subdirectory if it doesn't already exist
    +      val subDir = subDirs(dirId).synchronized {
    +        val old = subDirs(dirId)(subDirId)
    +        if (old != null) {
    +          old
    +        } else {
    +          val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
    +          if (!newDir.exists() && !newDir.mkdir()) {
    +            throw new IOException(s"Failed to create local dir in $newDir.")
    +          }
    +          subDirs(dirId)(subDirId) = newDir
    +          newDir
    +        }
    +      }
    +
    +      new File(subDir, filename)
    +    }
    +  }
    +
       /** Looks up a file by hashing it into one of our local subdirectories. */
       // This method should be kept in sync with
       // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile().
    -  def getFile(filename: String): File = {
    -    // Figure out which local directory it hashes to, and which subdirectory in that
    -    val hash = Utils.nonNegativeHash(filename)
    -    val dirId = hash % localDirs.length
    -    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
    -
    -    // Create the subdirectory if it doesn't already exist
    -    val subDir = subDirs(dirId).synchronized {
    -      val old = subDirs(dirId)(subDirId)
    -      if (old != null) {
    -        old
    -      } else {
    -        val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
    -        if (!newDir.exists() && !newDir.mkdir()) {
    -          throw new IOException(s"Failed to create local dir in $newDir.")
    -        }
    -        subDirs(dirId)(subDirId) = newDir
    -        newDir
    +  private object hashAllocator extends FileAllocationStrategy {
    +    def apply(filename: String): File = getFile(filename, localDirs)
    +  }
    +
    +  /** Looks up a file by hierarchy way in different speed storage devices. */
    +  private val hierarchyStore = conf.getOption("spark.storage.hierarchyStore")
    +  private class HierarchyAllocator extends FileAllocationStrategy {
    +    case class LayerInfo(key: String, threshold: Long, dirs: Array[File])
    +    val hsSpecs: Array[(String, Long)] =
    +      // e.g.: hierarchyStore = "ssd 200GB, hdd 100GB"
    +      hierarchyStore.get.trim.split(",").map {
    +        s => val x = s.trim.split(" +")
    +          (x(0).toLowerCase, Utils.byteStringAsBytes(x(1)))
    --- End diff --
    
    It is better to add error handling here to prevent wrong format.
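    For instance, a minimal fail-fast sketch (not the patch's actual code) that validates each "name size" pair before calling ```Utils.byteStringAsBytes```; the ```SparkException``` messages are made up for illustration, and this assumes it lives inside ```DiskBlockManager``` where ```Utils``` is already visible:
    ```
    import org.apache.spark.SparkException

    /** Parses e.g. "ssd 200GB, hdd 100GB" into (name, bytes) pairs, failing fast on bad input. */
    def parseHierarchy(spec: String): Array[(String, Long)] =
      spec.trim.split(",").map { s =>
        val parts = s.trim.split(" +")
        if (parts.length != 2) {
          throw new SparkException(
            s"Bad spark.storage.hierarchyStore entry '$s', expected '<name> <size>', e.g. 'ssd 200GB'")
        }
        try {
          (parts(0).toLowerCase, Utils.byteStringAsBytes(parts(1)))
        } catch {
          case e: NumberFormatException =>
            throw new SparkException(
              s"Bad size '${parts(1)}' in spark.storage.hierarchyStore entry '$s'", e)
        }
      }
    ```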




[GitHub] spark pull request #10225: [SPARK-12196][Core] Store/retrieve blocks in diff...

Posted by yucai <gi...@git.apache.org>.
Github user yucai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10225#discussion_r76942560
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
    @@ -50,35 +50,98 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
     
       private val shutdownHook = addShutdownHook()
     
    +  private abstract class FileAllocationStrategy {
    +    def apply(filename: String): File
    +
    +    protected def getFile(filename: String, storageDirs: Array[File]): File = {
    +      require(storageDirs.nonEmpty, "could not find file when the directories are empty")
    +
    +      // Figure out which local directory it hashes to, and which subdirectory in that
    +      val hash = Utils.nonNegativeHash(filename)
    +      val dirId = localDirs.indexOf(storageDirs(hash % storageDirs.length))
    +      val subDirId = (hash / storageDirs.length) % subDirsPerLocalDir
    +
    +      // Create the subdirectory if it doesn't already exist
    +      val subDir = subDirs(dirId).synchronized {
    +        val old = subDirs(dirId)(subDirId)
    +        if (old != null) {
    +          old
    +        } else {
    +          val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
    +          if (!newDir.exists() && !newDir.mkdir()) {
    +            throw new IOException(s"Failed to create local dir in $newDir.")
    +          }
    +          subDirs(dirId)(subDirId) = newDir
    +          newDir
    +        }
    +      }
    +
    +      new File(subDir, filename)
    +    }
    +  }
    +
       /** Looks up a file by hashing it into one of our local subdirectories. */
       // This method should be kept in sync with
       // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile().
    -  def getFile(filename: String): File = {
    -    // Figure out which local directory it hashes to, and which subdirectory in that
    -    val hash = Utils.nonNegativeHash(filename)
    -    val dirId = hash % localDirs.length
    -    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
    -
    -    // Create the subdirectory if it doesn't already exist
    -    val subDir = subDirs(dirId).synchronized {
    -      val old = subDirs(dirId)(subDirId)
    -      if (old != null) {
    -        old
    -      } else {
    -        val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
    -        if (!newDir.exists() && !newDir.mkdir()) {
    -          throw new IOException(s"Failed to create local dir in $newDir.")
    -        }
    -        subDirs(dirId)(subDirId) = newDir
    -        newDir
    +  private object hashAllocator extends FileAllocationStrategy {
    +    def apply(filename: String): File = getFile(filename, localDirs)
    +  }
    +
    +  /** Looks up a file by hierarchy way in different speed storage devices. */
    +  private val hierarchyStore = conf.getOption("spark.storage.hierarchyStore")
    +  private class HierarchyAllocator extends FileAllocationStrategy {
    +    case class LayerInfo(key: String, threshold: Long, dirs: Array[File])
    +    val hsSpecs: Array[(String, Long)] =
    +      // e.g.: hierarchyStore = "ssd 200GB, hdd 100GB"
    +      hierarchyStore.get.trim.split(",").map {
    +        s => val x = s.trim.split(" +")
    +          (x(0).toLowerCase, Utils.byteStringAsBytes(x(1)))
           }
    +    val hsLayers: Array[LayerInfo] = hsSpecs.map(
    +      s => LayerInfo(s._1, s._2, localDirs.filter(_.getPath.toLowerCase.containsSlice(s._1)))
    +    )
    +    val lastLayerDirs = localDirs.filter(dir => !hsLayers.exists(_.dirs.contains(dir)))
    +    val allLayers: Array[LayerInfo] = hsLayers :+
    +      LayerInfo("Last Storage", 10.toLong, lastLayerDirs)
    +    val finalLayers: Array[LayerInfo] = allLayers.filter(_.dirs.nonEmpty)
    +    logInfo("Hierarchy store info:")
    +    for (layer <- finalLayers) {
    +      logInfo("Layer: %s, Threshold: %s".format(layer.key, Utils.bytesToString(layer.threshold)))
    +      layer.dirs.foreach { dir => logInfo("\t%s".format(dir.getCanonicalPath)) }
         }
     
    -    new File(subDir, filename)
    +    def apply(filename: String): File = {
    +      var availableFile: File = null
    +      for (layer <- finalLayers) {
    --- End diff --
    
    ```getFile``` only returns a File object instead of creating the local dir, so I need to use ```file.exists()``` to verify whether the file actually exists:
    ```
            val file = getFile(filename, layer.dirs)
            if (file.exists()) return file
    ```
    Otherwise, it would always return in the 1st iteration of the loop.
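    For reference, a rough sketch of that two-pass fall-through, assuming ```finalLayers```, ```getFile``` and ```hashAllocator``` as defined in the diff above; the usable-space check via ```getUsableSpace``` is illustrative, not a verbatim quote of the patch:
    ```
    def apply(filename: String): File = {
      // First pass: reuse a file that already exists in some layer
      // (it may have been written earlier, when that layer still had room).
      for (layer <- finalLayers) {
        val file = getFile(filename, layer.dirs)
        if (file.exists()) return file
      }
      // Second pass: allocate in the first layer whose usable space is still
      // above its configured threshold.
      for (layer <- finalLayers) {
        val file = getFile(filename, layer.dirs)
        if (file.getParentFile.getUsableSpace > layer.threshold) return file
      }
      // Fall back to plain hashing across all local dirs.
      hashAllocator(filename)
    }
    ```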




[GitHub] spark pull request #10225: [SPARK-12196][Core] Store/retrieve blocks from di...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10225#discussion_r77479267
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
    @@ -50,33 +50,98 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
     
       private val shutdownHook = addShutdownHook()
     
    +  private abstract class FileAllocationStrategy {
    +    def apply(filename: String): File
    +
    +    protected def getFile(filename: String, storageDirs: Array[File]): File = {
    +      require(storageDirs.nonEmpty, "could not find file when the directories are empty")
    +
    +      // Figure out which local directory it hashes to, and which subdirectory in that
    +      val hash = Utils.nonNegativeHash(filename)
    +      val dirId = localDirs.indexOf(storageDirs(hash % storageDirs.length))
    +      val subDirId = (hash / storageDirs.length) % subDirsPerLocalDir
    +
    +      // Create the subdirectory if it doesn't already exist
    +      val subDir = subDirs(dirId).synchronized {
    +        val old = subDirs(dirId)(subDirId)
    +        if (old != null) {
    +          old
    +        } else {
    +          val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
    +          if (!newDir.exists() && !newDir.mkdir()) {
    +            throw new IOException(s"Failed to create local dir in $newDir.")
    +          }
    +          subDirs(dirId)(subDirId) = newDir
    +          newDir
    +        }
    +      }
    +
    +      new File(subDir, filename)
    +    }
    +  }
    +
       /** Looks up a file by hashing it into one of our local subdirectories. */
       // This method should be kept in sync with
       // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile().
    -  def getFile(filename: String): File = {
    -    // Figure out which local directory it hashes to, and which subdirectory in that
    -    val hash = Utils.nonNegativeHash(filename)
    -    val dirId = hash % localDirs.length
    -    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
    -
    -    // Create the subdirectory if it doesn't already exist
    -    val subDir = subDirs(dirId).synchronized {
    -      val old = subDirs(dirId)(subDirId)
    -      if (old != null) {
    -        old
    -      } else {
    -        val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
    -        if (!newDir.exists() && !newDir.mkdir()) {
    -          throw new IOException(s"Failed to create local dir in $newDir.")
    -        }
    -        subDirs(dirId)(subDirId) = newDir
    -        newDir
    +  private object hashAllocator extends FileAllocationStrategy {
    +    def apply(filename: String): File = getFile(filename, localDirs)
    +  }
    +
    +  /** Looks up a file by hierarchy way in different speed storage devices. */
    +  private val hierarchy = conf.getOption("spark.diskStore.hierarchy")
    +  private class HierarchyAllocator extends FileAllocationStrategy {
    +    case class Level(key: String, threshold: Long, dirs: Array[File])
    +    val hsDescs: Array[(String, Long)] =
    +      // e.g.: hierarchy = "ram_disk 1GB, ssd 20GB"
    +      hierarchy.get.trim.split(",").map {
    +        s => val x = s.trim.split(" +")
    +          val storage = x(0).toLowerCase
    +          val threshold = s.replaceFirst(x(0), "").trim
    +          (storage, Utils.byteStringAsBytes(threshold))
    --- End diff --
    
    style:
    
        hierarchy.get.trim.split(",").map { s =>
          val x = s.trim.split(" +")
          val storage = x(0).toLowerCase
        




[GitHub] spark issue #10225: [SPARK-12196][Core] Store/retrieve blocks from different...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/10225
  
    **[Test build #97761 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/97761/testReport)** for PR 10225 at commit [`507c506`](https://github.com/apache/spark/commit/507c506d1d8cdc1674bece4e4d149334b4ccf8b0).
     * This patch **fails Spark unit tests**.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.




[GitHub] spark issue #10225: [SPARK-12196][Core] Store/retrieve blocks from different...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/10225
  
    Can one of the admins verify this patch?




[GitHub] spark pull request #10225: [SPARK-12196][Core] Store/retrieve blocks from di...

Posted by yucai <gi...@git.apache.org>.
Github user yucai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10225#discussion_r77478689
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
    @@ -50,33 +50,98 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
     
       private val shutdownHook = addShutdownHook()
     
    +  private abstract class FileAllocationStrategy {
    +    def apply(filename: String): File
    +
    +    protected def getFile(filename: String, storageDirs: Array[File]): File = {
    +      require(storageDirs.nonEmpty, "could not find file when the directories are empty")
    +
    +      // Figure out which local directory it hashes to, and which subdirectory in that
    +      val hash = Utils.nonNegativeHash(filename)
    +      val dirId = localDirs.indexOf(storageDirs(hash % storageDirs.length))
    +      val subDirId = (hash / storageDirs.length) % subDirsPerLocalDir
    +
    +      // Create the subdirectory if it doesn't already exist
    +      val subDir = subDirs(dirId).synchronized {
    +        val old = subDirs(dirId)(subDirId)
    +        if (old != null) {
    +          old
    +        } else {
    +          val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
    +          if (!newDir.exists() && !newDir.mkdir()) {
    +            throw new IOException(s"Failed to create local dir in $newDir.")
    +          }
    +          subDirs(dirId)(subDirId) = newDir
    +          newDir
    +        }
    +      }
    +
    +      new File(subDir, filename)
    +    }
    +  }
    +
       /** Looks up a file by hashing it into one of our local subdirectories. */
       // This method should be kept in sync with
       // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile().
    -  def getFile(filename: String): File = {
    -    // Figure out which local directory it hashes to, and which subdirectory in that
    -    val hash = Utils.nonNegativeHash(filename)
    -    val dirId = hash % localDirs.length
    -    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
    -
    -    // Create the subdirectory if it doesn't already exist
    -    val subDir = subDirs(dirId).synchronized {
    -      val old = subDirs(dirId)(subDirId)
    -      if (old != null) {
    -        old
    -      } else {
    -        val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
    -        if (!newDir.exists() && !newDir.mkdir()) {
    -          throw new IOException(s"Failed to create local dir in $newDir.")
    -        }
    -        subDirs(dirId)(subDirId) = newDir
    -        newDir
    +  private object hashAllocator extends FileAllocationStrategy {
    +    def apply(filename: String): File = getFile(filename, localDirs)
    +  }
    +
    +  /** Looks up a file by hierarchy way in different speed storage devices. */
    +  private val hierarchy = conf.getOption("spark.diskStore.hierarchy")
    +  private class HierarchyAllocator extends FileAllocationStrategy {
    +    case class Level(key: String, threshold: Long, dirs: Array[File])
    +    val hsDescs: Array[(String, Long)] =
    +      // e.g.: hierarchy = "ram_disk 1GB, ssd 20GB"
    +      hierarchy.get.trim.split(",").map {
    +        s => val x = s.trim.split(" +")
    +          val storage = x(0).toLowerCase
    +          val threshold = s.replaceFirst(x(0), "").trim
    +          (storage, Utils.byteStringAsBytes(threshold))
    --- End diff --
    
    If the user does not specify spark.diskStore.hierarchy correctly, a runtime exception will be thrown by Utils.byteStringAsBytes. Is that acceptable?




[GitHub] spark pull request: [WIP][SPARK-12196][Core] Store blocks in stora...

Posted by yucai <gi...@git.apache.org>.
Github user yucai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10225#discussion_r47182068
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
    @@ -80,6 +77,65 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
         new File(subDir, filename)
       }
     
    +  def getFileDefault(filename: String): File = {
    +    _getFile(filename, localDirs)
    +  }
    +  var getFile = getFileDefault _
    +
    +  private val hierarchyStore = conf.getOption("spark.storage.hierarchyStore")
    +  if (hierarchyStore.isDefined) {
    +    val HSLayers: Array[(String, Long)] =
    +      hierarchyStore.get.trim.split(",").map {
    +        s => val x = s.trim.split(" +")
    +          (x(0), Utils.byteStringAsGb(x(1)))
    +      }
    +    val HSLayerDirs: Array[Array[File]] = HSLayers.map(
    --- End diff --
    
    Sorry about the misunderstanding! The devices in spark.storage.hierarchyStore are listed from fastest to slowest.
    Generally, the user needs two steps to configure the hierarchy store.
    1. Set up the priority and threshold for each layer.
    ```
    spark.storage.hierarchyStore='nvm 50GB,ssd 80GB'
    ```
    This builds a 3-layer hierarchy store: the 1st layer is "nvm", the 2nd is "ssd", and all the rest form the last layer.
    2. Configure each layer's location; the user just needs to put the keywords specified in step 1, such as "nvm" and "ssd", into the directory paths.
    ```
    spark.local.dir=/mnt/nvm1,/mnt/ssd1,/mnt/ssd2,/mnt/ssd3,/mnt/disk1,/mnt/disk2,/mnt/disk3,/mnt/disk4,/mnt/others
    ```
    After that, restart your Spark application and it will allocate blocks from nvm first.
    When nvm's usable space is less than 50GB, it starts allocating from ssd.
    When ssd's usable space is less than 80GB, it starts allocating from the last layer.
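    To make the resulting layout concrete, here is a tiny standalone sketch (illustrative only, not the patch's code) of how those keywords would bucket the directories above:
    ```
    val localDirs = Seq("/mnt/nvm1", "/mnt/ssd1", "/mnt/ssd2", "/mnt/ssd3",
      "/mnt/disk1", "/mnt/disk2", "/mnt/disk3", "/mnt/disk4", "/mnt/others")
    val layers = Seq(("nvm", "50GB"), ("ssd", "80GB"))

    // Each configured layer claims the dirs whose path contains its keyword.
    val layerDirs = layers.map { case (key, threshold) =>
      (key, threshold, localDirs.filter(_.toLowerCase.contains(key)))
    }
    // Whatever is left over forms the last layer, which has no threshold.
    val lastLayer = localDirs.filterNot(d => layerDirs.exists(_._3.contains(d)))

    layerDirs.foreach { case (k, t, dirs) => println(s"$k ($t): ${dirs.mkString(", ")}") }
    println(s"last layer: ${lastLayer.mkString(", ")}")
    // nvm (50GB): /mnt/nvm1
    // ssd (80GB): /mnt/ssd1, /mnt/ssd2, /mnt/ssd3
    // last layer: /mnt/disk1, /mnt/disk2, /mnt/disk3, /mnt/disk4, /mnt/others
    ```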
    
    More details in https://issues.apache.org/jira/browse/SPARK-12196
    
    Is it still too confusing?




[GitHub] spark pull request #10225: [SPARK-12196][Core] Store/retrieve blocks in diff...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10225#discussion_r77475503
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
    @@ -50,33 +50,98 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
     
       private val shutdownHook = addShutdownHook()
     
    +  private abstract class FileAllocationStrategy {
    +    def apply(filename: String): File
    +
    +    protected def getFile(filename: String, storageDirs: Array[File]): File = {
    +      require(storageDirs.nonEmpty, "could not find file when the directories are empty")
    +
    +      // Figure out which local directory it hashes to, and which subdirectory in that
    +      val hash = Utils.nonNegativeHash(filename)
    +      val dirId = localDirs.indexOf(storageDirs(hash % storageDirs.length))
    +      val subDirId = (hash / storageDirs.length) % subDirsPerLocalDir
    +
    +      // Create the subdirectory if it doesn't already exist
    +      val subDir = subDirs(dirId).synchronized {
    +        val old = subDirs(dirId)(subDirId)
    +        if (old != null) {
    +          old
    +        } else {
    +          val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
    +          if (!newDir.exists() && !newDir.mkdir()) {
    +            throw new IOException(s"Failed to create local dir in $newDir.")
    +          }
    +          subDirs(dirId)(subDirId) = newDir
    +          newDir
    +        }
    +      }
    +
    +      new File(subDir, filename)
    +    }
    +  }
    +
       /** Looks up a file by hashing it into one of our local subdirectories. */
       // This method should be kept in sync with
       // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile().
    -  def getFile(filename: String): File = {
    -    // Figure out which local directory it hashes to, and which subdirectory in that
    -    val hash = Utils.nonNegativeHash(filename)
    -    val dirId = hash % localDirs.length
    -    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
    -
    -    // Create the subdirectory if it doesn't already exist
    -    val subDir = subDirs(dirId).synchronized {
    -      val old = subDirs(dirId)(subDirId)
    -      if (old != null) {
    -        old
    -      } else {
    -        val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
    -        if (!newDir.exists() && !newDir.mkdir()) {
    -          throw new IOException(s"Failed to create local dir in $newDir.")
    -        }
    -        subDirs(dirId)(subDirId) = newDir
    -        newDir
    +  private object hashAllocator extends FileAllocationStrategy {
    +    def apply(filename: String): File = getFile(filename, localDirs)
    +  }
    +
    +  /** Looks up a file by hierarchy way in different speed storage devices. */
    +  private val hierarchy = conf.getOption("spark.diskStore.hierarchy")
    +  private class HierarchyAllocator extends FileAllocationStrategy {
    +    case class Level(key: String, threshold: Long, dirs: Array[File])
    +    val hsDescs: Array[(String, Long)] =
    +      // e.g.: hierarchy = "ram_disk 1GB, ssd 20GB"
    +      hierarchy.get.trim.split(",").map {
    +        s => val x = s.trim.split(" +")
    +          val storage = x(0).toLowerCase
    +          val threshold = s.replaceFirst(x(0), "").trim
    +          (storage, Utils.byteStringAsBytes(threshold))
    --- End diff --
    
    Have you updated it? I don't see the code that throws the exception.




[GitHub] spark pull request: [SPARK-12196][Core] Store/retrieve blocks in d...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/10225#issuecomment-172720130
  
    Sorry we are a bit short on review bandwidth and this part of the code is going through some big refactorings to simplify things (see @JoshRosen's prs, and more to come), so this one has to wait.





[GitHub] spark pull request: [WIP][SPARK-12196][Core] Store/retrieve blocks...

Posted by yucai <gi...@git.apache.org>.
Github user yucai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10225#discussion_r48709207
  
    --- Diff: docs/configuration.md ---
    @@ -926,6 +926,21 @@ Apart from these, the following properties are also available, and may be useful
       </td>
     </tr>
     <tr>
    +  <td><code>spark.storage.hierarchyStore</code></td>
    +  <td>(none)</td>
    +  <td>
    +     Store blocks in storage devices of different speeds, arranged as a hierarchy.<br />
    +     For example:<br />
    +     <code>nvm 40GB, ssd 20GB</code><br />
    +     This means the fastest layer is nvm (threshold 40GB), the next layer is ssd (threshold 20GB), and all remaining directories form the final layer.
    +     The threshold means that when a device's usable space drops below it, new blocks fall through to the next layer.<br />
    +     You then need to include the "nvm" and "ssd" keywords in the corresponding local dirs.<br />
    +     For example, in Standalone or Mesos mode: <br />
    +     <code>spark.local.dir=/mnt/nvm1,/mnt/ssd1,/mnt/ssd2,/mnt/others</code><br />
    +     On Yarn, refer to its documentation to configure yarn.nodemanager.local-dirs instead.<br />
    --- End diff --
    
    Users do not need to put ```spark.storage.hierarchyStore``` in ```yarn-site.xml```; they only need to configure ```yarn.nodemanager.local-dirs``` there.
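    For example, a hypothetical yarn-site.xml fragment on each NodeManager (the paths are made up; only the "nvm"/"ssd" keywords in them matter for the hierarchy matching):
    ```
    <property>
      <name>yarn.nodemanager.local-dirs</name>
      <value>/mnt/nvm1/yarn,/mnt/ssd1/yarn,/mnt/ssd2/yarn,/mnt/others/yarn</value>
    </property>
    ```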




[GitHub] spark pull request: [WIP][SPARK-12196][Core] Store/retrieve blocks...

Posted by yucai <gi...@git.apache.org>.
Github user yucai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10225#discussion_r48709047
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
    @@ -53,35 +53,98 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
     
       private val shutdownHook = addShutdownHook()
     
    +  private abstract class FileAllocationStrategy {
    +    def apply(filename: String): File
    +
    +    protected def getFile(filename: String, storageDirs: Array[File]): File = {
    +      require(storageDirs.nonEmpty, "could not find file when the directories are empty")
    +
    +      // Figure out which local directory it hashes to, and which subdirectory in that
    +      val hash = Utils.nonNegativeHash(filename)
    +      val dirId = localDirs.indexOf(storageDirs(hash % storageDirs.length))
    +      val subDirId = (hash / storageDirs.length) % subDirsPerLocalDir
    +
    +      // Create the subdirectory if it doesn't already exist
    +      val subDir = subDirs(dirId).synchronized {
    +        val old = subDirs(dirId)(subDirId)
    +        if (old != null) {
    +          old
    +        } else {
    +          val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
    +          if (!newDir.exists() && !newDir.mkdir()) {
    +            throw new IOException(s"Failed to create local dir in $newDir.")
    +          }
    +          subDirs(dirId)(subDirId) = newDir
    +          newDir
    +        }
    +      }
    +
    +      new File(subDir, filename)
    +    }
    +  }
    +
       /** Looks up a file by hashing it into one of our local subdirectories. */
       // This method should be kept in sync with
       // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile().
    --- End diff --
    
    Yes, we can enable the same behavior in the external shuffle service as well.




[GitHub] spark issue #10225: [SPARK-12196][Core] Store/retrieve blocks from different...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/10225
  
    Build finished. Test FAILed.




[GitHub] spark issue #10225: [SPARK-12196][Core] Store/retrieve blocks from different...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/10225
  
    Can one of the admins verify this patch?

