Posted to reviews@spark.apache.org by jerryshao <gi...@git.apache.org> on 2017/10/12 02:08:55 UTC

[GitHub] spark pull request #19476: [SPARK-22062][CORE] Spill large block to disk in ...

GitHub user jerryshao opened a pull request:

    https://github.com/apache/spark/pull/19476

    [SPARK-22062][CORE] Spill large block to disk in BlockManager's remote fetch to avoid OOM

    ## What changes were proposed in this pull request?
    
    Currently, BlockManager's `getRemoteBytes` calls `BlockTransferService#fetchBlockSync` to get a remote block. Inside `fetchBlockSync`, Spark allocates a temporary `ByteBuffer` to hold the whole fetched block. This can lead to OOM if the block is too large or if several blocks are fetched simultaneously in the same executor.
    
    This change borrows the idea from shuffle fetch and spills a large block to local disk before it is consumed by upstream code. The behavior is controlled by a newly added configuration: if the block size is smaller than the threshold, the block is kept in memory; otherwise it is first spilled to disk and then read back from the disk file.
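
As a rough illustration of the threshold rule described above, here is a minimal sketch. The names `FetchTarget`, `ToMemory`, `ToDisk` and `FetchPolicy.chooseTarget` are hypothetical and are not part of Spark. Treating a block whose size equals the threshold as an in-memory fetch is an assumption taken from the config doc wording ("above this threshold").

```scala
// Hypothetical types for illustration only; not Spark's API.
sealed trait FetchTarget
case object ToMemory extends FetchTarget
case object ToDisk extends FetchTarget

object FetchPolicy {
  /**
   * Picks where a remote block should land.
   * Assumption: only blocks strictly larger than the threshold go to disk.
   */
  def chooseTarget(blockSizeBytes: Long, maxSizeToMemBytes: Long): FetchTarget =
    if (blockSizeBytes > maxSizeToMemBytes) ToDisk else ToMemory
}
```

With a threshold of `Long.MaxValue` (the default shown in the quoted diff for the existing shuffle config), every block would stay in memory under this rule, so the spill path is effectively off until a smaller value is set.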
    
    To achieve this, I made the following changes:
    
    1. Rename `TempShuffleFileManager` to `TempFileManager`, since it is no longer used only by shuffle.
    2. Add a new `TempFileManager` to manage the files of fetched remote blocks. The files are tracked by weak references and are deleted once they are no longer referenced.
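
The weak-reference tracking in item 2 can be sketched roughly as follows. This is not the code from the pull request: `WeakTempFileTracker`, `register` and `drain` are hypothetical names, and the sketch drains the reference queue on demand, whereas a real implementation would more likely poll it from a background thread. Only standard `java.lang.ref` and `java.util.concurrent` classes are used.

```scala
import java.io.File
import java.lang.ref.{Reference, ReferenceQueue, WeakReference}
import java.util.concurrent.ConcurrentHashMap

// Hypothetical sketch; names are illustrative, not Spark's.
class WeakTempFileTracker {
  private val queue = new ReferenceQueue[File]

  // The map holds the reference objects strongly so that they survive until the
  // GC enqueues them. Only the path string is stored: holding the File itself
  // here would keep it strongly reachable and defeat the weak reference.
  private val paths = new ConcurrentHashMap[Reference[_ <: File], String]()

  /** Starts tracking a temp file; it becomes eligible for deletion once `file` is collected. */
  def register(file: File): Unit =
    paths.put(new WeakReference[File](file, queue), file.getAbsolutePath)

  /** Deletes files whose File objects have been collected; returns how many were deleted. */
  def drain(): Int = {
    var deleted = 0
    var ref = queue.poll()
    while (ref != null) {
      val path = paths.remove(ref)
      if (path != null && new File(path).delete()) deleted += 1
      ref = queue.poll()
    }
    deleted
  }
}
```

Because cleanup depends on when the garbage collector runs, deletion is not prompt. That matches the manual GC step mentioned in the testing notes of this description.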
    
    ## How was this patch tested?
    
    This was tested by adding unit tests, and by manually triggering GC in a local test to verify that the files are cleaned up.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jerryshao/apache-spark SPARK-22062

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19476.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #19476
    
----
commit f50a7b75c303bd2cf261dfb1b4fe74fa5498ca4b
Author: jerryshao <ss...@hortonworks.com>
Date:   2017-10-12T01:47:35Z

    Spill large blocks to disk during remote fetches in BlockManager

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19476: [SPARK-22062][CORE] Spill large block to disk in BlockMa...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/19476
  
    thanks, merging to master!


---



[GitHub] spark pull request #19476: [SPARK-22062][CORE] Spill large block to disk in ...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19476#discussion_r144775481
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1552,4 +1582,65 @@ private[spark] object BlockManager {
         override val metricRegistry = new MetricRegistry
         metricRegistry.registerAll(metricSet)
       }
    +
    +  class RemoteBlockTempFileManager(blockManager: BlockManager)
    +      extends TempFileManager with Logging {
    +
    +    private class ReferenceWithCleanup(file: File, referenceQueue: JReferenceQueue[File])
    +        extends WeakReference[File](file, referenceQueue) {
    +      private val filePath = file.getAbsolutePath
    +
    +      def cleanUp(): Unit = {
    +        logDebug(s"Clean up file $filePath")
    +
    +        if (!new File(filePath).delete()) {
    +          logDebug(s"Fail to delete file $filePath")
    +        }
    +      }
    +    }
    --- End diff --
    
    My concern is this: for the shuffle part, there is already an explicit API to `cleanup` temp files, so tracking them again with weak references is not really necessary. Also, weak-reference cleanup is triggered by GC, and shuffle operations are usually much more frequent and heavier, so using weak references to track temp shuffle files would probably increase GC overhead. By comparison, remote blocks are fetched only occasionally, when a block is not cached locally, so using weak references there should not add much overhead.


---



[GitHub] spark issue #19476: [SPARK-22062][CORE] Spill large block to disk in BlockMa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19476
  
    **[Test build #82653 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82653/testReport)** for PR 19476 at commit [`f50a7b7`](https://github.com/apache/spark/commit/f50a7b75c303bd2cf261dfb1b4fe74fa5498ca4b).


---



[GitHub] spark issue #19476: [SPARK-22062][CORE] Spill large block to disk in BlockMa...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the issue:

    https://github.com/apache/spark/pull/19476
  
    @cloud-fan @jiangxb1987 @jinxing64 could you please help review this when you have time? Thanks!


---



[GitHub] spark pull request #19476: [SPARK-22062][CORE] Spill large block to disk in ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19476#discussion_r144871260
  
    --- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
    @@ -509,11 +508,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
         val bmId1 = BlockManagerId("id1", localHost, 1)
         val bmId2 = BlockManagerId("id2", localHost, 2)
         val bmId3 = BlockManagerId("id3", otherHost, 3)
    -    when(bmMaster.getLocations(mc.any[BlockId])).thenReturn(Seq(bmId1, bmId2, bmId3))
    --- End diff --
    
    Why remove it? `BlockManager#getLocations` still exists.


---



[GitHub] spark pull request #19476: [SPARK-22062][CORE] Spill large block to disk in ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19476#discussion_r144867101
  
    --- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala ---
    @@ -662,7 +662,9 @@ private[spark] object SparkConf extends Logging {
         "spark.yarn.jars" -> Seq(
           AlternateConfig("spark.yarn.jar", "2.0")),
         "spark.yarn.access.hadoopFileSystems" -> Seq(
    -      AlternateConfig("spark.yarn.access.namenodes", "2.2"))
    +      AlternateConfig("spark.yarn.access.namenodes", "2.2")),
    +    "spark.maxRemoteBlockSizeFetchToMem" -> Seq(
    --- End diff --
    
    do we still need this?


---



[GitHub] spark issue #19476: [SPARK-22062][CORE] Spill large block to disk in BlockMa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19476
  
    **[Test build #82727 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82727/testReport)** for PR 19476 at commit [`2d00fd2`](https://github.com/apache/spark/commit/2d00fd25043c40ab271f4da1dc5b4dd7a407fc6f).


---



[GitHub] spark pull request #19476: [SPARK-22062][CORE] Spill large block to disk in ...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19476#discussion_r144456817
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -426,4 +426,11 @@ package object config {
         .toSequence
         .createOptional
     
    +  private[spark] val MAX_REMOTE_BLOCK_SIZE_TO_MEM =
    --- End diff --
    
    I would prefer to use `spark.storage.maxRemoteBlockSizeFetchToMemory`, since the driver-side block manager will also use this feature.


---



[GitHub] spark issue #19476: [SPARK-22062][CORE] Spill large block to disk in BlockMa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19476
  
    **[Test build #82833 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82833/testReport)** for PR 19476 at commit [`bba8ea1`](https://github.com/apache/spark/commit/bba8ea121576987319311e28f43aaa2bcae0cac5).


---



[GitHub] spark pull request #19476: [SPARK-22062][CORE] Spill large block to disk in ...

Posted by jiangxb1987 <gi...@git.apache.org>.
Github user jiangxb1987 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19476#discussion_r144859135
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -684,7 +713,7 @@ private[spark] class BlockManager(
               // take a significant amount of time. To get rid of these stale entries
               // we refresh the block locations after a certain number of fetch failures
               if (runningFailureCount >= maxFailuresBeforeLocationRefresh) {
    -            locationIterator = getLocations(blockId).iterator
    +            locationIterator = sortLocations(master.getLocationsAndStatus(blockId)._1).iterator
    --- End diff --
    
    It looks like `getLocations` is still useful for this line of code.
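
For readers without the surrounding source, the refresh-after-failures logic in the quoted diff can be sketched as a standalone loop. This is a simplified illustration, not `BlockManager`'s actual code: `FetchWithRefresh`, `lookupLocations`, `tryFetch` and `maxTotalFailures` are hypothetical, and resetting the running failure count after a refresh is an assumption.

```scala
// Hypothetical sketch of "refresh the location list after N consecutive failures".
object FetchWithRefresh {
  def fetch[L, B](
      lookupLocations: () => Seq[L],   // stands in for a getLocations-style call
      tryFetch: L => Option[B],        // None models a failed fetch from one location
      maxFailuresBeforeRefresh: Int,
      maxTotalFailures: Int): Option[B] = {
    var locations = lookupLocations().iterator
    var runningFailures = 0
    var totalFailures = 0
    while (locations.hasNext) {
      tryFetch(locations.next()) match {
        case Some(block) => return Some(block)
        case None =>
          runningFailures += 1
          totalFailures += 1
          // Give up entirely once the overall failure budget is spent.
          if (totalFailures >= maxTotalFailures) return None
          // Stale entries may be the cause, so ask for a fresh location list.
          if (runningFailures >= maxFailuresBeforeRefresh) {
            locations = lookupLocations().iterator
            runningFailures = 0
          }
      }
    }
    None
  }
}
```

The refresh step only needs the locations, which is the point of the review comment: a plain location lookup is enough here, without the block status.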


---



[GitHub] spark pull request #19476: [SPARK-22062][CORE] Spill large block to disk in ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19476#discussion_r144456522
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -426,4 +426,11 @@ package object config {
         .toSequence
         .createOptional
     
    +  private[spark] val MAX_REMOTE_BLOCK_SIZE_TO_MEM =
    --- End diff --
    
    how about `MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM` and `spark.executor.maxRemoteBlockSizeFetchToMemory`?


---



[GitHub] spark pull request #19476: [SPARK-22062][CORE] Spill large block to disk in ...

Posted by jinxing64 <gi...@git.apache.org>.
Github user jinxing64 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19476#discussion_r144586111
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -355,11 +355,21 @@ package object config {
           .doc("The blocks of a shuffle request will be fetched to disk when size of the request is " +
             "above this threshold. This is to avoid a giant request takes too much memory. We can " +
             "enable this config by setting a specific value(e.g. 200m). Note that this config can " +
    -        "be enabled only when the shuffle shuffle service is newer than Spark-2.2 or the shuffle" +
    +        "be enabled only when the shuffle service is newer than Spark-2.2 or the shuffle" +
             " service is disabled.")
           .bytesConf(ByteUnit.BYTE)
           .createWithDefault(Long.MaxValue)
     
    +  private[spark] val MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM =
    +    ConfigBuilder("spark.maxRemoteBlockSizeFetchToMem")
    +      .doc("Remote block will be fetched to disk when size of the block is " +
    +        "above this threshold. This is to avoid a giant request takes too much memory. We can " +
    +        "enable this config by setting a specific value(e.g. 200m). Note this configuration will " +
    +        "affect both shuffle fetch and block manager remote block fetch. For users who " +
    +        "enabled external shuffle service, this feature can only be worked when external shuffle" +
    +        " service is newer than Spark 2.2.")
    +      .fallbackConf(REDUCER_MAX_REQ_SIZE_SHUFFLE_TO_MEM)
    +
    --- End diff --
    
    Is it possible to merge `MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM` and `REDUCER_MAX_REQ_SIZE_SHUFFLE_TO_MEM` into one config?


---



[GitHub] spark pull request #19476: [SPARK-22062][CORE] Spill large block to disk in ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19476#discussion_r144458322
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -426,4 +426,11 @@ package object config {
         .toSequence
         .createOptional
     
    +  private[spark] val MAX_REMOTE_BLOCK_SIZE_TO_MEM =
    --- End diff --
    
    The name `spark.storage.maxRemoteBlockSizeFetchToMemory` does not make it clear that it applies to shuffle too...


---



[GitHub] spark pull request #19476: [SPARK-22062][CORE] Spill large block to disk in ...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19476#discussion_r145009567
  
    --- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala ---
    @@ -662,7 +662,9 @@ private[spark] object SparkConf extends Logging {
         "spark.yarn.jars" -> Seq(
           AlternateConfig("spark.yarn.jar", "2.0")),
         "spark.yarn.access.hadoopFileSystems" -> Seq(
    -      AlternateConfig("spark.yarn.access.namenodes", "2.2"))
    +      AlternateConfig("spark.yarn.access.namenodes", "2.2")),
    +    "spark.maxRemoteBlockSizeFetchToMem" -> Seq(
    --- End diff --
    
    Yes, I think so. `SparkConf` will print a warning log if we add it here.
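
The alternate-key behaviour discussed here can be sketched without any Spark classes. Everything below is hypothetical: `AlternateKeySketch`, `alternates` and `lookup` are illustrative names, the older key string `spark.reducer.maxReqSizeShuffleToMem` is an assumption (it is not spelled out in this thread), and the warning text is made up.

```scala
// Hypothetical sketch of "new key first, then older alternates, with a warning".
object AlternateKeySketch {
  // New key -> older keys that are still honoured. The old key name is assumed.
  val alternates: Map[String, Seq[String]] = Map(
    "spark.maxRemoteBlockSizeFetchToMem" -> Seq("spark.reducer.maxReqSizeShuffleToMem"))

  /** Returns the resolved value, plus a warning when an older key supplied it. */
  def lookup(settings: Map[String, String], key: String): (Option[String], Option[String]) = {
    val direct = settings.get(key)
    if (direct.isDefined) {
      (direct, None)
    } else {
      // First older key that the user actually set, if any.
      val usedOld: Option[String] =
        alternates.getOrElse(key, Seq.empty[String]).find(old => settings.contains(old))
      (usedOld.flatMap(settings.get), usedOld.map(old => s"'$old' is deprecated; use '$key' instead."))
    }
  }
}
```

Under this scheme, a user who still sets only the older key keeps the same behaviour and is told about the rename, which is the effect described in the comment above.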


---



[GitHub] spark issue #19476: [SPARK-22062][CORE] Spill large block to disk in BlockMa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19476
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82653/


---



[GitHub] spark issue #19476: [SPARK-22062][CORE] Spill large block to disk in BlockMa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19476
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #19476: [SPARK-22062][CORE] Spill large block to disk in ...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19476#discussion_r145011775
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -684,7 +713,7 @@ private[spark] class BlockManager(
               // take a significant amount of time. To get rid of these stale entries
               // we refresh the block locations after a certain number of fetch failures
               if (runningFailureCount >= maxFailuresBeforeLocationRefresh) {
    -            locationIterator = getLocations(blockId).iterator
    +            locationIterator = sortLocations(master.getLocationsAndStatus(blockId)._1).iterator
    --- End diff --
    
    Agreed, will change it.


---



[GitHub] spark pull request #19476: [SPARK-22062][CORE] Spill large block to disk in ...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19476#discussion_r144769226
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1552,4 +1582,65 @@ private[spark] object BlockManager {
         override val metricRegistry = new MetricRegistry
         metricRegistry.registerAll(metricSet)
       }
    +
    +  class RemoteBlockTempFileManager(blockManager: BlockManager)
    +      extends TempFileManager with Logging {
    +
    +    private class ReferenceWithCleanup(file: File, referenceQueue: JReferenceQueue[File])
    +        extends WeakReference[File](file, referenceQueue) {
    +      private val filePath = file.getAbsolutePath
    +
    +      def cleanUp(): Unit = {
    +        logDebug(s"Clean up file $filePath")
    +
    +        if (!new File(filePath).delete()) {
    +          logDebug(s"Fail to delete file $filePath")
    +        }
    +      }
    +    }
    --- End diff --
    
    But in `ShuffleBlockFetcherIterator#registerTempFileToClean`, the caller deletes the file if it returns false. Is there still a file leak problem?


---



[GitHub] spark pull request #19476: [SPARK-22062][CORE] Spill large block to disk in ...

Posted by jinxing64 <gi...@git.apache.org>.
Github user jinxing64 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19476#discussion_r144577910
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -355,11 +355,21 @@ package object config {
           .doc("The blocks of a shuffle request will be fetched to disk when size of the request is " +
             "above this threshold. This is to avoid a giant request takes too much memory. We can " +
             "enable this config by setting a specific value(e.g. 200m). Note that this config can " +
    -        "be enabled only when the shuffle shuffle service is newer than Spark-2.2 or the shuffle" +
    +        "be enabled only when the shuffle service is newer than Spark-2.2 or the shuffle" +
    --- End diff --
    
    Thanks for the fix :)


---



[GitHub] spark issue #19476: [SPARK-22062][CORE] Spill large block to disk in BlockMa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19476
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82673/


---



[GitHub] spark pull request #19476: [SPARK-22062][CORE] Spill large block to disk in ...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19476#discussion_r144770999
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1552,4 +1582,65 @@ private[spark] object BlockManager {
         override val metricRegistry = new MetricRegistry
         metricRegistry.registerAll(metricSet)
       }
    +
    +  class RemoteBlockTempFileManager(blockManager: BlockManager)
    +      extends TempFileManager with Logging {
    +
    +    private class ReferenceWithCleanup(file: File, referenceQueue: JReferenceQueue[File])
    +        extends WeakReference[File](file, referenceQueue) {
    +      private val filePath = file.getAbsolutePath
    +
    +      def cleanUp(): Unit = {
    +        logDebug(s"Clean up file $filePath")
    +
    +        if (!new File(filePath).delete()) {
    +          logDebug(s"Fail to delete file $filePath")
    +        }
    +      }
    +    }
    --- End diff --
    
    Yes, that's what I mean. Whether it returns false (the caller deletes the file) or true (`ShuffleBlockFetcherIterator` deletes it), the file will be deleted. So my question is: why would there still be a file leak issue?
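
The register-or-delete contract described in this comment can be sketched as follows. The trait and classes are hypothetical stand-ins, not Spark's `TempFileManager` or `ShuffleBlockFetcherIterator`. In particular, returning `false` once the registry has been cleaned up is an assumption about when registration is refused.

```scala
import java.io.File
import scala.collection.mutable

// Hypothetical trait mirroring the contract: true means "I will delete it later".
trait TempFileRegistry {
  def registerTempFileToClean(file: File): Boolean
}

// Hypothetical registry that refuses new files after cleanup() has run.
class ClosableRegistry extends TempFileRegistry {
  private val tracked = mutable.Set.empty[File]
  private var closed = false

  override def registerTempFileToClean(file: File): Boolean = synchronized {
    if (closed) {
      false
    } else {
      tracked += file
      true
    }
  }

  /** Deletes every tracked file and refuses registrations from now on. */
  def cleanup(): Unit = synchronized {
    closed = true
    tracked.foreach(_.delete())
    tracked.clear()
  }
}

object TempFileCaller {
  /** Caller side of the contract: delete the file immediately if registration is refused. */
  def finishDownload(registry: TempFileRegistry, file: File): Unit =
    if (!registry.registerTempFileToClean(file)) file.delete()
}
```

In this sketch a file is deleted on either branch, which is the behaviour the question above relies on. Whether any other path could still leave a file behind is exactly what the thread is asking and is not settled by this example.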


---



[GitHub] spark issue #19476: [SPARK-22062][CORE] Spill large block to disk in BlockMa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19476
  
    **[Test build #82833 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82833/testReport)** for PR 19476 at commit [`bba8ea1`](https://github.com/apache/spark/commit/bba8ea121576987319311e28f43aaa2bcae0cac5).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `  case class BlockLocationsAndStatus(locations: Seq[BlockManagerId], status: BlockStatus) `


---



[GitHub] spark pull request #19476: [SPARK-22062][CORE] Spill large block to disk in ...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19476#discussion_r144764761
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -355,11 +355,21 @@ package object config {
           .doc("The blocks of a shuffle request will be fetched to disk when size of the request is " +
             "above this threshold. This is to avoid a giant request takes too much memory. We can " +
             "enable this config by setting a specific value(e.g. 200m). Note that this config can " +
    -        "be enabled only when the shuffle shuffle service is newer than Spark-2.2 or the shuffle" +
    +        "be enabled only when the shuffle service is newer than Spark-2.2 or the shuffle" +
             " service is disabled.")
           .bytesConf(ByteUnit.BYTE)
           .createWithDefault(Long.MaxValue)
     
    +  private[spark] val MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM =
    +    ConfigBuilder("spark.maxRemoteBlockSizeFetchToMem")
    +      .doc("Remote block will be fetched to disk when size of the block is " +
    +        "above this threshold. This is to avoid a giant request takes too much memory. We can " +
    +        "enable this config by setting a specific value(e.g. 200m). Note this configuration will " +
    +        "affect both shuffle fetch and block manager remote block fetch. For users who " +
    +        "enabled external shuffle service, this feature can only be worked when external shuffle" +
    +        " service is newer than Spark 2.2.")
    +      .fallbackConf(REDUCER_MAX_REQ_SIZE_SHUFFLE_TO_MEM)
    --- End diff --
    
    Thanks, let me check it.


---



[GitHub] spark pull request #19476: [SPARK-22062][CORE] Spill large block to disk in ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19476#discussion_r144764554
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -355,11 +355,21 @@ package object config {
           .doc("The blocks of a shuffle request will be fetched to disk when size of the request is " +
             "above this threshold. This is to avoid a giant request takes too much memory. We can " +
             "enable this config by setting a specific value(e.g. 200m). Note that this config can " +
    -        "be enabled only when the shuffle shuffle service is newer than Spark-2.2 or the shuffle" +
    +        "be enabled only when the shuffle service is newer than Spark-2.2 or the shuffle" +
             " service is disabled.")
           .bytesConf(ByteUnit.BYTE)
           .createWithDefault(Long.MaxValue)
     
    +  private[spark] val MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM =
    +    ConfigBuilder("spark.maxRemoteBlockSizeFetchToMem")
    +      .doc("Remote block will be fetched to disk when size of the block is " +
    +        "above this threshold. This is to avoid a giant request takes too much memory. We can " +
    +        "enable this config by setting a specific value(e.g. 200m). Note this configuration will " +
    +        "affect both shuffle fetch and block manager remote block fetch. For users who " +
    +        "enabled external shuffle service, this feature can only be worked when external shuffle" +
    +        " service is newer than Spark 2.2.")
    +      .fallbackConf(REDUCER_MAX_REQ_SIZE_SHUFFLE_TO_MEM)
    --- End diff --
    
    how about `ConfigBuilder.withAlternative`?


---



[GitHub] spark pull request #19476: [SPARK-22062][CORE] Spill large block to disk in ...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19476#discussion_r144765076
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1552,4 +1582,65 @@ private[spark] object BlockManager {
         override val metricRegistry = new MetricRegistry
         metricRegistry.registerAll(metricSet)
       }
    +
    +  class RemoteBlockTempFileManager(blockManager: BlockManager)
    +      extends TempFileManager with Logging {
    +
    +    private class ReferenceWithCleanup(file: File, referenceQueue: JReferenceQueue[File])
    +        extends WeakReference[File](file, referenceQueue) {
    +      private val filePath = file.getAbsolutePath
    +
    +      def cleanUp(): Unit = {
    +        logDebug(s"Clean up file $filePath")
    +
    +        if (!new File(filePath).delete()) {
    +          logDebug(s"Fail to delete file $filePath")
    +        }
    +      }
    +    }
    --- End diff --
    
    I think the overhead is not big, but I'm not sure why there would be a file leak issue in `ShuffleBlockFetcherIterator` (the implementation of `TempFileManager`). From the code, all the temp files are tracked in `shuffleFilesSet` and are deleted during `cleanup`. Could you please elaborate?


---



[GitHub] spark pull request #19476: [SPARK-22062][CORE] Spill large block to disk in ...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19476#discussion_r145011923
  
    --- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
    @@ -509,11 +508,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
         val bmId1 = BlockManagerId("id1", localHost, 1)
         val bmId2 = BlockManagerId("id2", localHost, 2)
         val bmId3 = BlockManagerId("id3", otherHost, 3)
    -    when(bmMaster.getLocations(mc.any[BlockId])).thenReturn(Seq(bmId1, bmId2, bmId3))
    --- End diff --
    
    Agreed, will revert it.


---



[GitHub] spark pull request #19476: [SPARK-22062][CORE] Spill large block to disk in ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19476#discussion_r144453129
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -426,4 +426,11 @@ package object config {
         .toSequence
         .createOptional
     
    +  private[spark] val MAX_REMOTE_BLOCK_SIZE_TO_MEM =
    --- End diff --
    
    Shall we consolidate this config with `REDUCER_MAX_REQ_SIZE_SHUFFLE_TO_MEM`? Shuffle blocks and remote RDD blocks are all remote blocks.


---



[GitHub] spark issue #19476: [SPARK-22062][CORE] Spill large block to disk in BlockMa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19476
  
    **[Test build #82792 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82792/testReport)** for PR 19476 at commit [`5e32c3f`](https://github.com/apache/spark/commit/5e32c3feef47fff925d914006f91cbba09f6f330).


---



[GitHub] spark pull request #19476: [SPARK-22062][CORE] Spill large block to disk in ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/19476


---



[GitHub] spark issue #19476: [SPARK-22062][CORE] Spill large block to disk in BlockMa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19476
  
    **[Test build #82727 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82727/testReport)** for PR 19476 at commit [`2d00fd2`](https://github.com/apache/spark/commit/2d00fd25043c40ab271f4da1dc5b4dd7a407fc6f).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19476: [SPARK-22062][CORE] Spill large block to disk in BlockMa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19476
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #19476: [SPARK-22062][CORE] Spill large block to disk in BlockMa...

Posted by jinxing64 <gi...@git.apache.org>.
Github user jinxing64 commented on the issue:

    https://github.com/apache/spark/pull/19476
  
    @jerryshao 
    Thanks a lot for the ping. I left comments based on my understanding. Not sure if they're helpful :)


---



[GitHub] spark issue #19476: [SPARK-22062][CORE] Spill large block to disk in BlockMa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19476
  
    **[Test build #82673 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82673/testReport)** for PR 19476 at commit [`a77359d`](https://github.com/apache/spark/commit/a77359da7549d1d5be9731f07f26d76d2154af40).


---



[GitHub] spark issue #19476: [SPARK-22062][CORE] Spill large block to disk in BlockMa...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the issue:

    https://github.com/apache/spark/pull/19476
  
    Jenkins, retest this please.


---



[GitHub] spark pull request #19476: [SPARK-22062][CORE] Spill large block to disk in ...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19476#discussion_r145013312
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -653,15 +663,34 @@ private[spark] class BlockManager(
         require(blockId != null, "BlockId is null")
         var runningFailureCount = 0
         var totalFailureCount = 0
    -    val locations = getLocations(blockId)
    +
    +    // Because all the remote blocks are registered in driver, so it is not necessary to ask
    +    // all the slave executors to get block status.
    +    val locationAndStatus = master.getLocationsAndStatus(blockId)
    +
    +    val blockSize = locationAndStatus._2.map { status =>
    +      // Disk size and mem size cannot co-exist, so it's ok to sum them together to get block size.
    +      status.diskSize + status.memSize
    --- End diff --
    
    I get your point. I'm also thinking of using `Math.max` instead.
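
To make the difference concrete, here is a minimal sketch comparing the two ways of deriving a block size. `BlockStatus` below is a simplified stand-in for Spark's class, and `BlockSizeEstimate` is a hypothetical helper, not code from this patch.

```scala
// Simplified stand-in for Spark's BlockStatus (assumption for illustration only).
case class BlockStatus(memSize: Long, diskSize: Long)

// Hypothetical helper comparing the two approaches discussed in this thread.
object BlockSizeEstimate {
  // Summing is only correct if at most one of the two sizes is non-zero.
  def bySum(status: BlockStatus): Long = status.diskSize + status.memSize

  // Taking the larger value avoids double counting when both sizes are set.
  def byMax(status: BlockStatus): Long = Math.max(status.diskSize, status.memSize)
}
```

When only one of the two sizes is non-zero, both methods agree. They differ only when both sizes are set, which is the case the reviewer is worried about.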


---



[GitHub] spark pull request #19476: [SPARK-22062][CORE] Spill large block to disk in ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19476#discussion_r144869423
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ---
    @@ -422,6 +425,13 @@ class BlockManagerMasterEndpoint(
         if (blockLocations.containsKey(blockId)) blockLocations.get(blockId).toSeq else Seq.empty
       }
     
    +  private def getLocationsAndStatus(
    +      blockId: BlockId): (Seq[BlockManagerId], Option[BlockStatus]) = {
    --- End diff --
    
    Shall we create a class for the locations and status? e.g.
    ```
    case class BlockLocationsAndStatus(locations: Seq[BlockManagerId], status: BlockStatus) {
      assert(locations.nonEmpty)
    }
    ```
    Then this method can return `Option[BlockLocationsAndStatus]`
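
The suggestion above could be sketched roughly as follows. This is only an illustration: `BlockManagerId` and `BlockStatus` are simplified stand-ins for Spark's classes, block IDs are plain strings, and `LocationRegistry` is a hypothetical name rather than the endpoint changed in this pull request.

```scala
// Simplified stand-ins for Spark's types (assumptions for illustration only).
case class BlockManagerId(executorId: String, host: String, port: Int)
case class BlockStatus(memSize: Long, diskSize: Long)

// The wrapper proposed in the review comment.
case class BlockLocationsAndStatus(locations: Seq[BlockManagerId], status: BlockStatus) {
  assert(locations.nonEmpty)
}

// Hypothetical registry showing how a lookup could return an Option.
class LocationRegistry(
    locations: Map[String, Seq[BlockManagerId]],
    statuses: Map[String, BlockStatus]) {

  // Returns None when the block is unknown or has no registered location,
  // so the wrapper's non-empty assertion always holds.
  def getLocationsAndStatus(blockId: String): Option[BlockLocationsAndStatus] = {
    for {
      locs <- locations.get(blockId) if locs.nonEmpty
      status <- statuses.get(blockId)
    } yield BlockLocationsAndStatus(locs, status)
  }
}
```

With this shape, callers pattern match on a single `Option` instead of unpacking a tuple with `_1` and `_2`.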


---



[GitHub] spark pull request #19476: [SPARK-22062][CORE] Spill large block to disk in ...

Posted by jinxing64 <gi...@git.apache.org>.
Github user jinxing64 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19476#discussion_r144772767
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1552,4 +1582,65 @@ private[spark] object BlockManager {
         override val metricRegistry = new MetricRegistry
         metricRegistry.registerAll(metricSet)
       }
    +
    +  class RemoteBlockTempFileManager(blockManager: BlockManager)
    +      extends TempFileManager with Logging {
    +
    +    private class ReferenceWithCleanup(file: File, referenceQueue: JReferenceQueue[File])
    +        extends WeakReference[File](file, referenceQueue) {
    +      private val filePath = file.getAbsolutePath
    +
    +      def cleanUp(): Unit = {
    +        logDebug(s"Clean up file $filePath")
    +
    +        if (!new File(filePath).delete()) {
    +          logDebug(s"Fail to delete file $filePath")
    +        }
    +      }
    +    }
    --- End diff --
    
    Ah, I just think the caller has to be very careful about file leaks. Maybe we can delete files by weak reference, so that the caller only calls `registerTempFileToClean`, doesn't have to clean up the file itself, and need not worry about leaks.
    I think the implementation in this patch is a good way to avoid potential file leaks.


---



[GitHub] spark pull request #19476: [SPARK-22062][CORE] Spill large block to disk in ...

Posted by jiangxb1987 <gi...@git.apache.org>.
Github user jiangxb1987 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19476#discussion_r144857068
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -653,15 +663,34 @@ private[spark] class BlockManager(
         require(blockId != null, "BlockId is null")
         var runningFailureCount = 0
         var totalFailureCount = 0
    -    val locations = getLocations(blockId)
    +
    +    // Because all the remote blocks are registered in driver, so it is not necessary to ask
    +    // all the slave executors to get block status.
    +    val locationAndStatus = master.getLocationsAndStatus(blockId)
    +
    +    val blockSize = locationAndStatus._2.map { status =>
    +      // Disk size and mem size cannot co-exist, so it's ok to sum them together to get block size.
    +      status.diskSize + status.memSize
    --- End diff --
    
    I think it's dangerous, because you assume the default value of diskSize/memSize is always 0, but this is not guaranteed.


---



[GitHub] spark issue #19476: [SPARK-22062][CORE] Spill large block to disk in BlockMa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19476
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82792/
    Test PASSed.


---



[GitHub] spark issue #19476: [SPARK-22062][CORE] Spill large block to disk in BlockMa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19476
  
    **[Test build #82834 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82834/testReport)** for PR 19476 at commit [`bba8ea1`](https://github.com/apache/spark/commit/bba8ea121576987319311e28f43aaa2bcae0cac5).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `  case class BlockLocationsAndStatus(locations: Seq[BlockManagerId], status: BlockStatus) `


---



[GitHub] spark issue #19476: [SPARK-22062][CORE] Spill large block to disk in BlockMa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19476
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82679/
    Test PASSed.


---



[GitHub] spark issue #19476: [SPARK-22062][CORE] Spill large block to disk in BlockMa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19476
  
    **[Test build #82679 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82679/testReport)** for PR 19476 at commit [`70e84d7`](https://github.com/apache/spark/commit/70e84d78c30bf5b93d733cfcab94e449480921a8).


---



[GitHub] spark issue #19476: [SPARK-22062][CORE] Spill large block to disk in BlockMa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19476
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82834/
    Test PASSed.


---



[GitHub] spark pull request #19476: [SPARK-22062][CORE] Spill large block to disk in ...

Posted by jiangxb1987 <gi...@git.apache.org>.
Github user jiangxb1987 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19476#discussion_r144855916
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -653,15 +663,34 @@ private[spark] class BlockManager(
         require(blockId != null, "BlockId is null")
         var runningFailureCount = 0
         var totalFailureCount = 0
    -    val locations = getLocations(blockId)
    +
    +    // Because all the remote blocks are registered in driver, so it is not necessary to ask
    --- End diff --
    
    nit: `so` is not needed.


---



[GitHub] spark issue #19476: [SPARK-22062][CORE] Spill large block to disk in BlockMa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19476
  
    **[Test build #82653 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82653/testReport)** for PR 19476 at commit [`f50a7b7`](https://github.com/apache/spark/commit/f50a7b75c303bd2cf261dfb1b4fe74fa5498ca4b).
     * This patch **fails from timeout after a configured wait of \`250m\`**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #19476: [SPARK-22062][CORE] Spill large block to disk in ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19476#discussion_r144871889
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -684,7 +713,7 @@ private[spark] class BlockManager(
               // take a significant amount of time. To get rid of these stale entries
               // we refresh the block locations after a certain number of fetch failures
               if (runningFailureCount >= maxFailuresBeforeLocationRefresh) {
    -            locationIterator = getLocations(blockId).iterator
    +            locationIterator = sortLocations(master.getLocationsAndStatus(blockId)._1).iterator
    --- End diff --
    
    `master.getLocationsAndStatus` -> `master.getLocations`


---



[GitHub] spark pull request #19476: [SPARK-22062][CORE] Spill large block to disk in ...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19476#discussion_r145009167
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -653,15 +663,34 @@ private[spark] class BlockManager(
         require(blockId != null, "BlockId is null")
         var runningFailureCount = 0
         var totalFailureCount = 0
    -    val locations = getLocations(blockId)
    +
    +    // Because all the remote blocks are registered in driver, so it is not necessary to ask
    +    // all the slave executors to get block status.
    +    val locationAndStatus = master.getLocationsAndStatus(blockId)
    +
    +    val blockSize = locationAndStatus._2.map { status =>
    +      // Disk size and mem size cannot co-exist, so it's ok to sum them together to get block size.
    +      status.diskSize + status.memSize
    --- End diff --
    
    @jiangxb1987 would you please explain more? I'm not quite following your comment. Are you referring to the below line `  }.getOrElse(0L)`?


---



[GitHub] spark issue #19476: [SPARK-22062][CORE] Spill large block to disk in BlockMa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19476
  
    **[Test build #82679 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82679/testReport)** for PR 19476 at commit [`70e84d7`](https://github.com/apache/spark/commit/70e84d78c30bf5b93d733cfcab94e449480921a8).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19476: [SPARK-22062][CORE] Spill large block to disk in BlockMa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19476
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #19476: [SPARK-22062][CORE] Spill large block to disk in ...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19476#discussion_r145010440
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -653,15 +663,34 @@ private[spark] class BlockManager(
         require(blockId != null, "BlockId is null")
         var runningFailureCount = 0
         var totalFailureCount = 0
    -    val locations = getLocations(blockId)
    +
    +    // Because all the remote blocks are registered in driver, so it is not necessary to ask
    +    // all the slave executors to get block status.
    +    val locationAndStatus = master.getLocationsAndStatus(blockId)
    +
    +    val blockSize = locationAndStatus._2.map { status =>
    +      // Disk size and mem size cannot co-exist, so it's ok to sum them together to get block size.
    +      status.diskSize + status.memSize
    --- End diff --
    
    Are you saying we need to check `StorageLevel` to decide whether to use diskSize or memSize as block size?


---



[GitHub] spark pull request #19476: [SPARK-22062][CORE] Spill large block to disk in ...

Posted by jinxing64 <gi...@git.apache.org>.
Github user jinxing64 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19476#discussion_r144768355
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1552,4 +1582,65 @@ private[spark] object BlockManager {
         override val metricRegistry = new MetricRegistry
         metricRegistry.registerAll(metricSet)
       }
    +
    +  class RemoteBlockTempFileManager(blockManager: BlockManager)
    +      extends TempFileManager with Logging {
    +
    +    private class ReferenceWithCleanup(file: File, referenceQueue: JReferenceQueue[File])
    +        extends WeakReference[File](file, referenceQueue) {
    +      private val filePath = file.getAbsolutePath
    +
    +      def cleanUp(): Unit = {
    +        logDebug(s"Clean up file $filePath")
    +
    +        if (!new File(filePath).delete()) {
    +          logDebug(s"Fail to delete file $filePath")
    +        }
    +      }
    +    }
    --- End diff --
    
    Yeah, the thing is that if we call `registerTempFileToClean` after `cleanup`, the tmp file will never be cleaned. In #18565, if `registerTempFileToClean` returns false, it means the `TempFileManager` has already stopped, i.e. `cleanup` has already finished and will not be called again. We cannot rely on `TempFileManager` to clean the tmp file.


---



[GitHub] spark issue #19476: [SPARK-22062][CORE] Spill large block to disk in BlockMa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19476
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #19476: [SPARK-22062][CORE] Spill large block to disk in BlockMa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19476
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82833/
    Test FAILed.


---



[GitHub] spark pull request #19476: [SPARK-22062][CORE] Spill large block to disk in ...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19476#discussion_r144763884
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -355,11 +355,21 @@ package object config {
           .doc("The blocks of a shuffle request will be fetched to disk when size of the request is " +
             "above this threshold. This is to avoid a giant request takes too much memory. We can " +
             "enable this config by setting a specific value(e.g. 200m). Note that this config can " +
    -        "be enabled only when the shuffle shuffle service is newer than Spark-2.2 or the shuffle" +
    +        "be enabled only when the shuffle service is newer than Spark-2.2 or the shuffle" +
             " service is disabled.")
           .bytesConf(ByteUnit.BYTE)
           .createWithDefault(Long.MaxValue)
     
    +  private[spark] val MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM =
    +    ConfigBuilder("spark.maxRemoteBlockSizeFetchToMem")
    +      .doc("Remote block will be fetched to disk when size of the block is " +
    +        "above this threshold. This is to avoid a giant request takes too much memory. We can " +
    +        "enable this config by setting a specific value(e.g. 200m). Note this configuration will " +
    +        "affect both shuffle fetch and block manager remote block fetch. For users who " +
    +        "enabled external shuffle service, this feature can only be worked when external shuffle" +
    +        " service is newer than Spark 2.2.")
    +      .fallbackConf(REDUCER_MAX_REQ_SIZE_SHUFFLE_TO_MEM)
    --- End diff --
    
    From my understanding of the current code, it will not fall back to the deprecated config if we're using the API `SparkConf#get[T](entry: ConfigEntry[T])`, unless we specifically add a `fallbackConf` definition. This is different from `SparkConf#getOption(key: String)`.
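
The fallback behaviour described above can be illustrated with a small, self-contained model. Everything in it is hypothetical: `Entry`, `PlainEntry`, `FallbackEntry`, and the `demo.*` keys are invented for this sketch and are not Spark's `ConfigEntry` classes or real Spark keys.

```scala
// Hypothetical mini model of typed config entries; not Spark's ConfigEntry classes.
sealed trait Entry {
  def key: String
  def readFrom(settings: Map[String, String]): Long
}

// An entry that only looks at its own key before using its default.
case class PlainEntry(key: String, default: Long) extends Entry {
  def readFrom(settings: Map[String, String]): Long =
    settings.get(key).map(_.toLong).getOrElse(default)
}

// An entry that defers to another entry when its own key is unset.
case class FallbackEntry(key: String, fallback: Entry) extends Entry {
  def readFrom(settings: Map[String, String]): Long =
    settings.get(key).map(_.toLong).getOrElse(fallback.readFrom(settings))
}

object ConfigDemo {
  // Illustrative keys only: an older entry and a newer one that falls back to it.
  val oldEntry: Entry = PlainEntry("demo.old.size", Long.MaxValue)
  val newEntry: Entry = FallbackEntry("demo.new.size", oldEntry)
}
```

In this model, reading `newEntry` from settings that only define `demo.old.size` yields the old value, while a plain entry under the new key would ignore it. That is the reason given above for declaring the fallback explicitly.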
    



---



[GitHub] spark issue #19476: [SPARK-22062][CORE] Spill large block to disk in BlockMa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19476
  
    **[Test build #82673 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82673/testReport)** for PR 19476 at commit [`a77359d`](https://github.com/apache/spark/commit/a77359da7549d1d5be9731f07f26d76d2154af40).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #19476: [SPARK-22062][CORE] Spill large block to disk in ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19476#discussion_r144871310
  
    --- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---
    @@ -529,14 +527,13 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
         val bmId3 = BlockManagerId("id3", otherHost, 3, Some(otherRack))
         val bmId4 = BlockManagerId("id4", otherHost, 4, Some(otherRack))
         val bmId5 = BlockManagerId("id5", otherHost, 5, Some(localRack))
    -    when(bmMaster.getLocations(mc.any[BlockId]))
    --- End diff --
    
    ditto


---



[GitHub] spark pull request #19476: [SPARK-22062][CORE] Spill large block to disk in ...

Posted by jinxing64 <gi...@git.apache.org>.
Github user jinxing64 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19476#discussion_r144776222
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1552,4 +1582,65 @@ private[spark] object BlockManager {
         override val metricRegistry = new MetricRegistry
         metricRegistry.registerAll(metricSet)
       }
    +
    +  class RemoteBlockTempFileManager(blockManager: BlockManager)
    +      extends TempFileManager with Logging {
    +
    +    private class ReferenceWithCleanup(file: File, referenceQueue: JReferenceQueue[File])
    +        extends WeakReference[File](file, referenceQueue) {
    +      private val filePath = file.getAbsolutePath
    +
    +      def cleanUp(): Unit = {
    +        logDebug(s"Clean up file $filePath")
    +
    +        if (!new File(filePath).delete()) {
    +          logDebug(s"Fail to delete file $filePath")
    +        }
    +      }
    +    }
    --- End diff --
    
    I get it, thanks a lot for explanation :)


---



[GitHub] spark pull request #19476: [SPARK-22062][CORE] Spill large block to disk in ...

Posted by jinxing64 <gi...@git.apache.org>.
Github user jinxing64 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19476#discussion_r144585860
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1552,4 +1582,65 @@ private[spark] object BlockManager {
         override val metricRegistry = new MetricRegistry
         metricRegistry.registerAll(metricSet)
       }
    +
    +  class RemoteBlockTempFileManager(blockManager: BlockManager)
    +      extends TempFileManager with Logging {
    +
    +    private class ReferenceWithCleanup(file: File, referenceQueue: JReferenceQueue[File])
    +        extends WeakReference[File](file, referenceQueue) {
    +      private val filePath = file.getAbsolutePath
    +
    +      def cleanUp(): Unit = {
    +        logDebug(s"Clean up file $filePath")
    +
    +        if (!new File(filePath).delete()) {
    +          logDebug(s"Fail to delete file $filePath")
    +        }
    +      }
    +    }
    --- End diff --
    
    Yes, it's a good idea to delete the tmp files by `WeakReference`. Is it possible to make this the default behavior of `TempFileManager`, and change the method to `override def registerTempFileToClean(file: File): Unit`?
    In https://github.com/apache/spark/pull/18565, I was struggling with file leaks, because `TempFileManager` is not responsible for deleting tmp files.
    Additionally, files are deleted by the `cleaningThread`; I hope the cost is negligible.
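
For readers unfamiliar with the technique, weak-reference-based cleanup can be sketched as below. This is a simplified illustration built on `java.lang.ref.WeakReference` and `ReferenceQueue`. `WeakFileCleaner`, `FileRef`, `register`, and `drain` are hypothetical names, and the sketch omits the background cleaning thread that the patch uses.

```scala
import java.io.File
import java.lang.ref.{ReferenceQueue, WeakReference}
import java.util.concurrent.ConcurrentHashMap

// Hypothetical sketch of weak-reference-based temp file cleanup; names are illustrative.
class WeakFileCleaner {
  private val queue = new ReferenceQueue[File]
  // Strong references to the reference objects themselves, so that they stay alive
  // until the File objects they track have been collected.
  private val refs = ConcurrentHashMap.newKeySet[FileRef]()

  // Remembers the path up front, because get() returns null once the File is collected.
  final class FileRef(file: File) extends WeakReference[File](file, queue) {
    val path: String = file.getAbsolutePath
    def cleanUp(): Boolean = new File(path).delete()
  }

  def register(file: File): FileRef = {
    val ref = new FileRef(file)
    refs.add(ref)
    ref
  }

  // Deletes files whose File objects the GC has already enqueued and returns how many
  // deletions succeeded. A real manager would call this from a background thread.
  def drain(): Int = {
    var deleted = 0
    var ref = queue.poll()
    while (ref != null) {
      ref match {
        case r: FileRef =>
          refs.remove(r)
          if (r.cleanUp()) deleted += 1
        case _ =>
      }
      ref = queue.poll()
    }
    deleted
  }
}
```

The trade-off is that deletion timing depends on garbage collection, so files may linger on disk until the collector notices that the `File` object is unreachable.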


---



[GitHub] spark issue #19476: [SPARK-22062][CORE] Spill large block to disk in BlockMa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19476
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #19476: [SPARK-22062][CORE] Spill large block to disk in BlockMa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19476
  
    **[Test build #82834 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82834/testReport)** for PR 19476 at commit [`bba8ea1`](https://github.com/apache/spark/commit/bba8ea121576987319311e28f43aaa2bcae0cac5).


---



[GitHub] spark pull request #19476: [SPARK-22062][CORE] Spill large block to disk in ...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19476#discussion_r144453507
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -426,4 +426,11 @@ package object config {
         .toSequence
         .createOptional
     
    +  private[spark] val MAX_REMOTE_BLOCK_SIZE_TO_MEM =
    --- End diff --
    
    I was thinking about this, but the configuration name `REDUCER_MAX_REQ_SIZE_SHUFFLE_TO_MEM` seems too shuffle-specific; maybe we should rename it.


---



[GitHub] spark pull request #19476: [SPARK-22062][CORE] Spill large block to disk in ...

Posted by jinxing64 <gi...@git.apache.org>.
Github user jinxing64 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19476#discussion_r144770017
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1552,4 +1582,65 @@ private[spark] object BlockManager {
         override val metricRegistry = new MetricRegistry
         metricRegistry.registerAll(metricSet)
       }
    +
    +  class RemoteBlockTempFileManager(blockManager: BlockManager)
    +      extends TempFileManager with Logging {
    +
    +    private class ReferenceWithCleanup(file: File, referenceQueue: JReferenceQueue[File])
    +        extends WeakReference[File](file, referenceQueue) {
    +      private val filePath = file.getAbsolutePath
    +
    +      def cleanUp(): Unit = {
    +        logDebug(s"Clean up file $filePath")
    +
    +        if (!new File(filePath).delete()) {
    +          logDebug(s"Fail to delete file $filePath")
    +        }
    +      }
    +    }
    --- End diff --
    
    I don't think so:
    if `false`, `ShuffleBlockFetcherIterator` is stopped and the caller removes the file;
    if `true`, `ShuffleBlockFetcherIterator` removes the file.
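
For readers following the diff quoted above, here is a minimal sketch of the weak-reference cleanup pattern that `ReferenceWithCleanup` appears to rely on. It is not the PR's code: `FileRef` and `WeakFileTracker` are hypothetical names, and the sketch assumes that someone calls `drain()` periodically (for example from a background thread).

```scala
import java.io.File
import java.lang.ref.{ReferenceQueue, WeakReference}
import java.util.Collections
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for the PR's ReferenceWithCleanup; not Spark's actual class.
final class FileRef(file: File, queue: ReferenceQueue[File])
    extends WeakReference[File](file, queue) {
  // Keep only the path: holding the File itself would keep it strongly reachable.
  private val path = file.getAbsolutePath

  // Returns true if the file was deleted by this call.
  def cleanUp(): Boolean = new File(path).delete()
}

// Hypothetical tracker that deletes files once their File objects have been collected.
final class WeakFileTracker {
  private val queue = new ReferenceQueue[File]
  // The reference objects must stay strongly reachable until they are processed.
  private val refs =
    Collections.newSetFromMap[FileRef](new ConcurrentHashMap[FileRef, java.lang.Boolean])

  def register(file: File): Unit = {
    refs.add(new FileRef(file, queue))
    ()
  }

  // Deletes files whose File objects were garbage collected; returns how many were deleted.
  def drain(): Int = {
    var deleted = 0
    var ref = queue.poll()
    while (ref != null) {
      ref match {
        case r: FileRef =>
          refs.remove(r)
          if (r.cleanUp()) deleted += 1
        case _ => // not one of ours; ignore
      }
      ref = queue.poll()
    }
    deleted
  }
}
```

Because garbage collection timing is not deterministic, a file tracked this way may stay on disk for some time after its last use. An explicit delete by whoever owns the file, as in the `true`/`false` contract described above, does not have that delay.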


---



[GitHub] spark issue #19476: [SPARK-22062][CORE] Spill large block to disk in BlockMa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19476
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #19476: [SPARK-22062][CORE] Spill large block to disk in BlockMa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19476
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #19476: [SPARK-22062][CORE] Spill large block to disk in ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19476#discussion_r144458346
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -426,4 +426,11 @@ package object config {
         .toSequence
         .createOptional
     
    +  private[spark] val MAX_REMOTE_BLOCK_SIZE_TO_MEM =
    --- End diff --
    
    maybe just `spark.maxRemoteBlockSizeFetchToMemory`?


---



[GitHub] spark issue #19476: [SPARK-22062][CORE] Spill large block to disk in BlockMa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19476
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82727/


---



[GitHub] spark pull request #19476: [SPARK-22062][CORE] Spill large block to disk in ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19476#discussion_r144573081
  
    --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
    @@ -355,11 +355,21 @@ package object config {
           .doc("The blocks of a shuffle request will be fetched to disk when size of the request is " +
             "above this threshold. This is to avoid a giant request takes too much memory. We can " +
             "enable this config by setting a specific value(e.g. 200m). Note that this config can " +
    -        "be enabled only when the shuffle shuffle service is newer than Spark-2.2 or the shuffle" +
    +        "be enabled only when the shuffle service is newer than Spark-2.2 or the shuffle" +
             " service is disabled.")
           .bytesConf(ByteUnit.BYTE)
           .createWithDefault(Long.MaxValue)
     
    +  private[spark] val MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM =
    +    ConfigBuilder("spark.maxRemoteBlockSizeFetchToMem")
    +      .doc("Remote block will be fetched to disk when size of the block is " +
    +        "above this threshold. This is to avoid a giant request takes too much memory. We can " +
    +        "enable this config by setting a specific value(e.g. 200m). Note this configuration will " +
    +        "affect both shuffle fetch and block manager remote block fetch. For users who " +
    +        "enabled external shuffle service, this feature can only be worked when external shuffle" +
    +        " service is newer than Spark 2.2.")
    +      .fallbackConf(REDUCER_MAX_REQ_SIZE_SHUFFLE_TO_MEM)
    --- End diff --
    
    I think we can just remove `REDUCER_MAX_REQ_SIZE_SHUFFLE_TO_MEM`; we already fall back to it via https://github.com/apache/spark/pull/19476/files#diff-529fc5c06b9731c1fbda6f3db60b16aaR666
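
The fallback behaviour under discussion can be illustrated with a small sketch. It does not use Spark's internal `ConfigBuilder`. Settings are modelled as a plain `Map`, `FallbackConfSketch` is a hypothetical name, and the old key string is an assumption because the thread shows only the constant name, not its key.

```scala
object FallbackConfSketch {
  // Key taken from the diff above.
  val NewKey = "spark.maxRemoteBlockSizeFetchToMem"
  // Assumed key behind REDUCER_MAX_REQ_SIZE_SHUFFLE_TO_MEM; the thread does not show it.
  val OldKey = "spark.reducer.maxReqSizeShuffleToMem"

  // Resolves the threshold in bytes: new key first, then the old key, then the default
  // (Long.MaxValue, matching createWithDefault in the diff, i.e. never fetch to disk).
  def maxFetchToMem(settings: Map[String, Long]): Long =
    settings.get(NewKey).orElse(settings.get(OldKey)).getOrElse(Long.MaxValue)
}
```

With this lookup order, a job that only sets the old key keeps its behaviour, while a job that sets both keys gets the value of the new one.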


---



[GitHub] spark pull request #19476: [SPARK-22062][CORE] Spill large block to disk in ...

Posted by jiangxb1987 <gi...@git.apache.org>.
Github user jiangxb1987 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19476#discussion_r145013046
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -653,15 +663,34 @@ private[spark] class BlockManager(
         require(blockId != null, "BlockId is null")
         var runningFailureCount = 0
         var totalFailureCount = 0
    -    val locations = getLocations(blockId)
    +
    +    // Because all the remote blocks are registered in driver, so it is not necessary to ask
    +    // all the slave executors to get block status.
    +    val locationAndStatus = master.getLocationsAndStatus(blockId)
    +
    +    val blockSize = locationAndStatus._2.map { status =>
    +      // Disk size and mem size cannot co-exist, so it's ok to sum them together to get block size.
    +      status.diskSize + status.memSize
    --- End diff --
    
    The default value of `diskSize` or `memSize` can be 0, -1, or even some other number; it is not guaranteed. Maybe we can use `Math.max(status.diskSize, status.memSize)` instead?
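
A small sketch of the difference between the two approaches, assuming a block stored only on disk whose unused memory field holds -1. `SizeStatus` is a hypothetical stand-in that models only the two size fields, not Spark's actual status class.

```scala
object BlockSizeSketch {
  // Hypothetical stand-in for the status object in the diff.
  final case class SizeStatus(memSize: Long, diskSize: Long)

  // Approach in the diff: add the two sizes together.
  def sizeBySum(s: SizeStatus): Long = s.memSize + s.diskSize

  // Approach suggested in the comment: take the larger of the two.
  def sizeByMax(s: SizeStatus): Long = math.max(s.memSize, s.diskSize)

  // Assumed scenario: 1024 bytes on disk, memory size left at a -1 placeholder.
  val onDiskOnly: SizeStatus = SizeStatus(memSize = -1L, diskSize = 1024L)
}
```

Here `sizeBySum(onDiskOnly)` yields 1023 while `sizeByMax(onDiskOnly)` yields 1024, so the sum is off by the placeholder value. When the unused field is 0, both approaches agree.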


---



[GitHub] spark issue #19476: [SPARK-22062][CORE] Spill large block to disk in BlockMa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19476
  
    **[Test build #82792 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82792/testReport)** for PR 19476 at commit [`5e32c3f`](https://github.com/apache/spark/commit/5e32c3feef47fff925d914006f91cbba09f6f330).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
