Posted to reviews@spark.apache.org by JoshRosen <gi...@git.apache.org> on 2015/12/07 05:10:19 UTC

[GitHub] spark pull request: [SPARK-12165] Fix bug in eviction of storage m...

GitHub user JoshRosen opened a pull request:

    https://github.com/apache/spark/pull/10170

    [SPARK-12165] Fix bug in eviction of storage memory by execution

    This patch fixes a bug in the eviction of storage memory by execution.
    
    ## The bug:
    
    In general, execution should be able to evict storage memory when the total storage memory usage is greater than `maxMemory * spark.memory.storageFraction`. Due to a bug, however, Spark might wind up evicting no storage memory in certain cases where the storage memory usage was between `maxMemory * spark.memory.storageFraction` and `maxMemory`. For example, here is a regression test which illustrates the bug:
    
    ```scala
        val maxMemory = 1000L
        val taskAttemptId = 0L
        val (mm, ms) = makeThings(maxMemory)
        // Since we used the default storage fraction (0.5), we should be able to allocate 500 bytes
        // of storage memory which are immune to eviction by execution memory pressure.
    
        // Acquire enough storage memory to exceed the storage region size
        assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
        assertEvictBlocksToFreeSpaceNotCalled(ms)
        assert(mm.executionMemoryUsed === 0L)
        assert(mm.storageMemoryUsed === 750L)
     
        // At this point, storage is using 250 more bytes of memory than it is guaranteed, so execution
        // should be able to reclaim up to 250 bytes of storage memory.
        // Therefore, execution should now be able to acquire up to 500 bytes of memory:
        assert(mm.acquireExecutionMemory(500L, taskAttemptId, MemoryMode.ON_HEAP) === 500L) // <--- fails by only returning 250L
        assert(mm.storageMemoryUsed === 500L)
        assert(mm.executionMemoryUsed === 500L)
        assertEvictBlocksToFreeSpaceCalled(ms, 250L)
    ```
    
    The problem relates to the control flow / interaction between `StorageMemoryPool.shrinkPoolToReclaimSpace()` and `MemoryStore.ensureFreeSpace()`. While trying to allocate the 500 bytes of execution memory, the `UnifiedMemoryManager` discovers that it will need to reclaim 250 bytes of memory from storage, so it calls `StorageMemoryPool.shrinkPoolToReclaimSpace(250L)`. This method, in turn, calls `MemoryStore.ensureFreeSpace(250L)`. However, `ensureFreeSpace()` first checks whether the requested space is less than `maxStorageMemory - storageMemoryUsed` and, if it is, returns without evicting anything. That condition will be true if there is any free execution memory, because it turns out that `MemoryStore.maxStorageMemory = (maxMemory - onHeapExecutionMemoryPool.memoryUsed)` when the `UnifiedMemoryManager` is used.
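    
    To make the arithmetic concrete, here is a minimal sketch of the check described above. It is a simplified model, not Spark's real classes: `Pools`, `bytesEvictedByEnsureFreeSpace` and `bytesEvictedByEvictBlocks` are hypothetical names, and the exact comparison (`<=`) is an assumption.
    
    ```scala
    // Hypothetical, simplified model of the bookkeeping described above.
    object EvictionBugSketch {
      final case class Pools(maxMemory: Long, executionUsed: Long, storageUsed: Long) {
        // Per the description: maxStorageMemory = maxMemory - execution memory in use.
        def maxStorageMemory: Long = maxMemory - executionUsed
      }

      /** Old-style check: evict nothing if the request fits in the apparent free space. */
      def bytesEvictedByEnsureFreeSpace(p: Pools, space: Long): Long = {
        val free = p.maxStorageMemory - p.storageUsed
        if (space <= free) 0L // early return, no eviction
        else math.min(space - free, p.storageUsed) // evict only the shortfall
      }

      /** New-style call: the caller already decided how much to reclaim, so always evict. */
      def bytesEvictedByEvictBlocks(p: Pools, space: Long): Long =
        math.min(space, p.storageUsed)
    }
    ```
    
    With `Pools(1000L, 0L, 750L)` and a request to reclaim 250 bytes, the old-style check sees 250 bytes of apparent free space and evicts nothing, while the new-style call evicts 250 bytes.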
    
    The control flow here is somewhat confusing: it grew messy over time as several components were merged and refactored. In the pre-Spark 1.6 code, `ensureFreeSpace` was called directly by the `MemoryStore` itself, whereas in 1.6 it's involved in a confusing callback flow where `MemoryStore` calls `MemoryManager.acquireStorageMemory`, which then calls back into `MemoryStore.ensureFreeSpace`, which, in turn, calls `MemoryManager.freeStorageMemory`.
    
    ## The solution:
    
    The solution implemented in this patch is to remove the confusing circular control flow between `MemoryManager` and `MemoryStore`, making the storage memory acquisition process much more linear / straightforward. The key changes:
    
    - Remove a layer of inheritance which made the memory manager code harder to understand (53841174760a24a0df3eb1562af1f33dbe340eb9).
    - Move some bounds checks earlier in the call chain (13ba7ada77f87ef1ec362aec35c89a924e6987cb).
    - Refactor `ensureFreeSpace()` so that the part which evicts blocks can be called independently from the part which checks whether there is enough free space to avoid eviction (7c68ca09cb1b12f157400866983f753ac863380e).
    - Realize that this lets us remove a layer of overloads from `ensureFreeSpace` (eec4f6c87423d5e482b710e098486b3bbc4daf06).
    - Realize that `ensureFreeSpace()` can simply be replaced with an `evictBlocksToFreeSpace()` method which is called [after we've already figured out](https://github.com/apache/spark/blob/2dc842aea82c8895125d46a00aa43dfb0d121de9/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala#L88) how much memory needs to be reclaimed via eviction (2dc842aea82c8895125d46a00aa43dfb0d121de9).
    
    Along the way, I fixed some problems with the mocks in `MemoryManagerSuite`: the old mocks would [unconditionally](https://github.com/apache/spark/blob/80a824d36eec9d9a9f092ee1741453851218ec73/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala#L84) report that a block had been evicted even if there was enough space in the storage pool such that eviction would be avoided.
    
    I also fixed a problem where `StorageMemoryPool._memoryUsed` might become negative due to freed memory being double-counted when execution evicts storage. The problem was that `StorageMemoryPool.shrinkPoolToFreeSpace` would [decrement `_memoryUsed`](https://github.com/apache/spark/commit/7c68ca09cb1b12f157400866983f753ac863380e#diff-935c68a9803be144ed7bafdd2f756a0fL133) even though `StorageMemoryPool.freeMemory` had already decremented it as each evicted block was freed.
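    
    The double-counting can be illustrated with a small sketch. This is an assumed, stripped-down model rather than the real `StorageMemoryPool`: `Pool`, `shrink` and the `decrementAgain` flag are hypothetical and exist only to show both behaviors side by side.
    
    ```scala
    object DoubleCountSketch {
      // Hypothetical stand-in for a storage pool's usage counter.
      final class Pool(var memoryUsed: Long) {
        def freeMemory(bytes: Long): Unit = memoryUsed -= bytes
      }

      /** Evict blocks one at a time; each eviction already calls freeMemory. */
      def shrink(pool: Pool, blockSizes: Seq[Long], decrementAgain: Boolean): Long = {
        val freed = blockSizes.map { size => pool.freeMemory(size); size }.sum
        // Subtracting `freed` a second time is the double-counting step.
        if (decrementAgain) pool.memoryUsed -= freed
        pool.memoryUsed
      }
    }
    ```
    
    Starting from 300 bytes used and evicting two 100-byte blocks, the counter ends at 100 when decremented once and at -100 when decremented twice.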
    
    ## TODOs
    
    - [ ] Add stronger assertions or a dedicated regression test for the `_memoryUsed < 0` bug, which was uncovered while testing this patch and a related fix for SPARK-12155.
    - [ ] See whether we can now remove the confusing `MemoryStore.maxMemory` method.
    
    /cc @andrewor14 @zsxwing @yhuai @rxin 

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/JoshRosen/spark SPARK-12165

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/10170.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #10170
    
----
commit 012cb4ba9ba1e32337f2f7bb612abb53fe4070be
Author: Josh Rosen <jo...@databricks.com>
Date:   2015-12-06T20:57:45Z

    Reset ensureFreeSpaceCalled before each test.
    
    This reduces coupling between failed tests.

commit b519fe628a9a2b8238dfedbfd9b74bdd2ddc0de4
Author: Josh Rosen <jo...@databricks.com>
Date:   2015-12-07T00:00:23Z

    Add regression test for storage eviction bug.

commit 7c68ca09cb1b12f157400866983f753ac863380e
Author: Josh Rosen <jo...@databricks.com>
Date:   2015-12-06T22:40:07Z

    Add MemoryStore.freeSpaceForExecution() method, which forces blocks to be dropped.
    
    Previously, ensureFreeSpace() might end up not dropping blocks if the total
    storage memory pool usage was less than the maximum possible storage pool usage.

commit 53841174760a24a0df3eb1562af1f33dbe340eb9
Author: Josh Rosen <jo...@databricks.com>
Date:   2015-12-07T01:47:29Z

    Remove a layer of confusing inheritance.

commit 13ba7ada77f87ef1ec362aec35c89a924e6987cb
Author: Josh Rosen <jo...@databricks.com>
Date:   2015-12-07T01:53:45Z

    Put fail-fast for non-fitting blocks earlier in call chain.

commit eec4f6c87423d5e482b710e098486b3bbc4daf06
Author: Josh Rosen <jo...@databricks.com>
Date:   2015-12-07T01:56:48Z

    Collapse ensureFreeSpace overloads

commit 2dc842aea82c8895125d46a00aa43dfb0d121de9
Author: Josh Rosen <jo...@databricks.com>
Date:   2015-12-07T02:56:11Z

    Replace ensureFreeSpace() with evictBlocksToFreeSpace().

commit 0eac7da041326cfbec2c1db2f279cb655744f90e
Author: Josh Rosen <jo...@databricks.com>
Date:   2015-12-07T03:39:37Z

    Update JIRA in test case.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10170#discussion_r47035741
  
    --- Diff: core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala ---
    @@ -143,7 +148,25 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
         assert(mm.acquireExecutionMemory(400L, taskAttemptId, MemoryMode.ON_HEAP) === 300L)
         assert(mm.executionMemoryUsed === 600L)
         assert(mm.storageMemoryUsed === 400L)
    -    assertEnsureFreeSpaceNotCalled(ms)
    +    assertEvictBlocksToFreeSpaceNotCalled(ms)
    +    assert(evictedBlocks.isEmpty)
    +  }
    +
    +  test("execution can evict storage blocks when storage memory is below max mem (SPARK-12165)") {
    +    val maxMemory = 1000L
    +    val taskAttemptId = 0L
    +    val (mm, ms) = makeThings(maxMemory)
    +    // Acquire enough storage memory to exceed the storage region size
    +    assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
    +    assertEvictBlocksToFreeSpaceNotCalled(ms)
    +    assert(mm.executionMemoryUsed === 0L)
    +    assert(mm.storageMemoryUsed === 750L)
    +    // Should now be able to require up to 500 bytes of memory
    +    assert(mm.acquireExecutionMemory(500L, taskAttemptId, MemoryMode.ON_HEAP) === 500L)
    --- End diff --
    
    > The problem isn't that "storage memory is below than max memory". Even in the existing test "eviction evicts storage", storage memory (750) is below the max memory (1000).
    
    It's true that this was a little imprecise, but I think that "execution memory requests smaller than free memory should evict storage" is also slightly imprecise because this essentially sounds like a synonym for "execution evicts storage when there is not enough free memory." I guess a slightly more accurate description of SPARK-12165 would be something like "execution evicts storage if free memory is insufficient and free memory > 0", since not _all_ situations where there is insufficient free memory will trigger the first part of SPARK-12165. This precision doesn't really matter, though, so I'm happy to adopt your test name if you find it to be clearer.




[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10170#issuecomment-163092608
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/47382/
    Test FAILed.




[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10170#discussion_r47016581
  
    --- Diff: core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala ---
    @@ -85,33 +82,33 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
         val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem)
         assert(mm.storageMemoryUsed === 0L)
         assert(mm.acquireStorageMemory(dummyBlock, 10L, evictedBlocks))
    -    // `ensureFreeSpace` should be called with the number of bytes requested
    -    assertEnsureFreeSpaceCalled(ms, 10L)
    +    assertEvictBlocksToFreeSpaceNotCalled(ms)
         assert(mm.storageMemoryUsed === 10L)
    +
         assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks))
    -    assertEnsureFreeSpaceCalled(ms, 100L)
    +    assertEvictBlocksToFreeSpaceNotCalled(ms)
         assert(mm.storageMemoryUsed === 110L)
         // Acquire more than the max, not granted
         assert(!mm.acquireStorageMemory(dummyBlock, maxStorageMem + 1L, evictedBlocks))
    -    assertEnsureFreeSpaceCalled(ms, maxStorageMem + 1L)
    +    assertEvictBlocksToFreeSpaceNotCalled(ms)
         assert(mm.storageMemoryUsed === 110L)
         // Acquire up to the max, requests after this are still granted due to LRU eviction
         assert(mm.acquireStorageMemory(dummyBlock, maxStorageMem, evictedBlocks))
    -    assertEnsureFreeSpaceCalled(ms, 1000L)
    +    assertEvictBlocksToFreeSpaceCalled(ms, 110L)
         assert(mm.storageMemoryUsed === 1000L)
         assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
    -    assertEnsureFreeSpaceCalled(ms, 1L)
    +    assertEvictBlocksToFreeSpaceCalled(ms, 1L)
         assert(mm.storageMemoryUsed === 1000L)
    --- End diff --
    
    It took me a while to understand why this one is still `1000L`. The situation is as follows:
    
    1. Put a 1000-byte block
    2. Put a 1-byte block; this should have evicted the first block
    3. Storage memory used is still 1000 bytes somehow (I expected 1 byte)
    
    Because of the way the test is set up, however, as of step (2) we actually treat the 1000-byte block inserted in step (1) as 1000 1-byte blocks. When we call `evictBlocksToFreeSpace` here we actually evicted a 1-byte block to put another 1-byte block, so the storage memory usage remains at `1000L`.
    
    Since this deviates from actual block manager behavior I think this line warrants a two-line comment, maybe something like:
    ```
    // Note: We evicted 1 byte to put another 1-byte block in, so the storage memory used remains
    // at 1000 bytes. This is different from real behavior, where the 1-byte block would have evicted
    // the 1000-byte block entirely. This is set up differently so we can write finer-grained tests.
    ```
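    
    For readers following along, the difference between the two behaviors can be sketched as below. This is an illustration under assumptions: both functions are hypothetical, the first mimics the byte-granular release done by the test mock, and the second assumes whole blocks are dropped in list order (oldest first) until the new block fits.
    
    ```scala
    object EvictionGranularitySketch {
      /** Mock-style eviction: release exactly as many bytes as are needed. */
      def usedAfterMockPut(used: Long, max: Long, newBlock: Long): Long = {
        val toFree = math.max(0L, used + newBlock - max)
        used - toFree + newBlock
      }

      /** Block-granular eviction: drop whole blocks from the head until the new block fits. */
      def usedAfterBlockPut(blocks: List[Long], max: Long, newBlock: Long): Long = {
        var remaining = blocks
        while (remaining.nonEmpty && remaining.sum + newBlock > max) {
          remaining = remaining.tail
        }
        remaining.sum + newBlock
      }
    }
    ```
    
    With a 1000-byte limit that is already full, putting a 1-byte block leaves 1000 bytes used in the mock-style model and 1 byte used in the block-granular model.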




[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10170#issuecomment-163085038
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/47378/
    Test FAILed.




[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10170#discussion_r47038991
  
    --- Diff: core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala ---
    @@ -154,32 +177,34 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
         assert(mm.acquireExecutionMemory(800L, taskAttemptId, MemoryMode.ON_HEAP) === 800L)
         assert(mm.executionMemoryUsed === 800L)
         assert(mm.storageMemoryUsed === 0L)
    -    assertEnsureFreeSpaceNotCalled(ms)
    +    assertEvictBlocksToFreeSpaceNotCalled(ms)
         // Storage should not be able to evict execution
         assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks))
         assert(mm.executionMemoryUsed === 800L)
         assert(mm.storageMemoryUsed === 100L)
    -    assertEnsureFreeSpaceCalled(ms, 100L)
    +    assertEvictBlocksToFreeSpaceNotCalled(ms)
         assert(!mm.acquireStorageMemory(dummyBlock, 250L, evictedBlocks))
         assert(mm.executionMemoryUsed === 800L)
         assert(mm.storageMemoryUsed === 100L)
    -    assertEnsureFreeSpaceCalled(ms, 250L)
    +    assertEvictBlocksToFreeSpaceCalled(ms, 150L) // try to evict blocks ...
    +    assert(evictedBlocks.isEmpty) // ... but don't evict since evicting will not be sufficient
         mm.releaseExecutionMemory(maxMemory, taskAttemptId, MemoryMode.ON_HEAP)
         mm.releaseStorageMemory(maxMemory)
         // Acquire some execution memory again, but this time keep it within the execution region
         assert(mm.acquireExecutionMemory(200L, taskAttemptId, MemoryMode.ON_HEAP) === 200L)
         assert(mm.executionMemoryUsed === 200L)
         assert(mm.storageMemoryUsed === 0L)
    -    assertEnsureFreeSpaceNotCalled(ms)
    +    assertEvictBlocksToFreeSpaceNotCalled(ms)
         // Storage should still not be able to evict execution
         assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
         assert(mm.executionMemoryUsed === 200L)
         assert(mm.storageMemoryUsed === 750L)
    -    assertEnsureFreeSpaceCalled(ms, 750L)
    +    assertEvictBlocksToFreeSpaceNotCalled(ms) // since there was 800 bytes free
         assert(!mm.acquireStorageMemory(dummyBlock, 850L, evictedBlocks))
         assert(mm.executionMemoryUsed === 200L)
         assert(mm.storageMemoryUsed === 750L)
    -    assertEnsureFreeSpaceCalled(ms, 850L)
    +    assertEvictBlocksToFreeSpaceCalled(ms, 800L) // try to evict blocks...
    --- End diff --
    
    Yep, this was addressed by changing `maxMemory` to `maxStorageMemory` in the check at the start of `acquireStorageMemory`.




[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10170#issuecomment-163367683
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10170#discussion_r46997970
  
    --- Diff: core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala ---
    @@ -100,7 +100,7 @@ private[spark] class UnifiedMemoryManager private[memory] (
           case MemoryMode.OFF_HEAP =>
             // For now, we only support on-heap caching of data, so we do not need to interact with
             // the storage pool when allocating off-heap memory. This will change in the future, though.
    -        super.acquireExecutionMemory(numBytes, taskAttemptId, memoryMode)
    +        offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
    --- End diff --
    
    +1 to this change. It's much less brittle!




[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10170#discussion_r47037149
  
    --- Diff: core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala ---
    @@ -36,105 +37,98 @@ import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, StorageLevel
     /**
      * Helper trait for sharing code among [[MemoryManager]] tests.
      */
    -private[memory] trait MemoryManagerSuite extends SparkFunSuite {
    +private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAfterEach {
     
    -  import MemoryManagerSuite.DEFAULT_ENSURE_FREE_SPACE_CALLED
    +  protected val evictedBlocks = new mutable.ArrayBuffer[(BlockId, BlockStatus)]
    +
    +  import MemoryManagerSuite.DEFAULT_EVICT_BLOCKS_TO_FREE_SPACE_CALLED
     
       // Note: Mockito's verify mechanism does not provide a way to reset method call counts
       // without also resetting stubbed methods. Since our test code relies on the latter,
    -  // we need to use our own variable to track invocations of `ensureFreeSpace`.
    +  // we need to use our own variable to track invocations of `evictBlocksToFreeSpace`.
     
       /**
    -   * The amount of free space requested in the last call to [[MemoryStore.ensureFreeSpace]]
    +   * The amount of space requested in the last call to [[MemoryStore.evictBlocksToFreeSpace]].
        *
    -   * This set whenever [[MemoryStore.ensureFreeSpace]] is called, and cleared when the test
    -   * code makes explicit assertions on this variable through [[assertEnsureFreeSpaceCalled]].
    +   * This set whenever [[MemoryStore.evictBlocksToFreeSpace]] is called, and cleared when the test
    +   * code makes explicit assertions on this variable through
    +   * [[assertEvictBlocksToFreeSpaceCalled]].
        */
    -  private val ensureFreeSpaceCalled = new AtomicLong(DEFAULT_ENSURE_FREE_SPACE_CALLED)
    +  private val evictBlocksToFreeSpaceCalled = new AtomicLong(0)
    +
    +  override def beforeEach(): Unit = {
    +    super.beforeEach()
    +    evictedBlocks.clear()
    +    evictBlocksToFreeSpaceCalled.set(DEFAULT_EVICT_BLOCKS_TO_FREE_SPACE_CALLED)
    +  }
     
       /**
    -   * Make a mocked [[MemoryStore]] whose [[MemoryStore.ensureFreeSpace]] method is stubbed.
    +   * Make a mocked [[MemoryStore]] whose [[MemoryStore.evictBlocksToFreeSpace]] method is stubbed.
        *
    -   * This allows our test code to release storage memory when [[MemoryStore.ensureFreeSpace]]
    -   * is called without relying on [[org.apache.spark.storage.BlockManager]] and all of its
    -   * dependencies.
    +   * This allows our test code to release storage memory when these methods are called
    +   * without relying on [[org.apache.spark.storage.BlockManager]] and all of its dependencies.
        */
       protected def makeMemoryStore(mm: MemoryManager): MemoryStore = {
    -    val ms = mock(classOf[MemoryStore])
    -    when(ms.ensureFreeSpace(anyLong(), any())).thenAnswer(ensureFreeSpaceAnswer(mm, 0))
    -    when(ms.ensureFreeSpace(any(), anyLong(), any())).thenAnswer(ensureFreeSpaceAnswer(mm, 1))
    +    val ms = mock(classOf[MemoryStore], RETURNS_SMART_NULLS)
    +    when(ms.evictBlocksToFreeSpace(any(), anyLong(), any()))
    +      .thenAnswer(evictBlocksToFreeSpaceAnswer(mm))
         mm.setMemoryStore(ms)
         ms
       }
     
       /**
    -   * Make an [[Answer]] that stubs [[MemoryStore.ensureFreeSpace]] with the right arguments.
    -   */
    -  private def ensureFreeSpaceAnswer(mm: MemoryManager, numBytesPos: Int): Answer[Boolean] = {
    +    * Simulate the part of [[MemoryStore.evictBlocksToFreeSpace]] that releases storage memory.
    +    *
    +    * This is a significant simplification of the real method, which actually drops existing
    +    * blocks based on the size of each block. Instead, here we simply release as many bytes
    +    * as needed to ensure the requested amount of free space. This allows us to set up the
    +    * test without relying on the [[org.apache.spark.storage.BlockManager]], which brings in
    +    * many other dependencies.
    +    *
    +    * Every call to this method will set a global variable, [[evictBlocksToFreeSpaceCalled]], that
    +    * records the number of bytes this is called with. This variable is expected to be cleared
    +    * by the test code later through [[assertEvictBlocksToFreeSpaceCalled]].
    +    */
    +  private def evictBlocksToFreeSpaceAnswer(mm: MemoryManager): Answer[Boolean] = {
         new Answer[Boolean] {
           override def answer(invocation: InvocationOnMock): Boolean = {
             val args = invocation.getArguments
    -        require(args.size > numBytesPos, s"bad test: expected >$numBytesPos arguments " +
    -          s"in ensureFreeSpace, found ${args.size}")
    -        require(args(numBytesPos).isInstanceOf[Long], s"bad test: expected ensureFreeSpace " +
    -          s"argument at index $numBytesPos to be a Long: ${args.mkString(", ")}")
    -        val numBytes = args(numBytesPos).asInstanceOf[Long]
    -        val success = mockEnsureFreeSpace(mm, numBytes)
    -        if (success) {
    +        val numBytesToFree = args(1).asInstanceOf[Long]
    +        assert(numBytesToFree > 0)
    +        require(evictBlocksToFreeSpaceCalled.get() === DEFAULT_EVICT_BLOCKS_TO_FREE_SPACE_CALLED,
    +          "bad test: evictBlocksToFreeSpace() variable was not reset")
    +        evictBlocksToFreeSpaceCalled.set(numBytesToFree)
    +        if (numBytesToFree <= mm.storageMemoryUsed) {
    +          // We can evict enough blocks to fulfill the request for space
    +          mm.releaseStorageMemory(numBytesToFree)
               args.last.asInstanceOf[mutable.Buffer[(BlockId, BlockStatus)]].append(
    -            (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytes, 0L, 0L)))
    +            (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L, 0L)))
    +          evictedBlocks.append(
    +            (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L, 0L)))
    --- End diff --
    
    If you just add it to the one in the invocation args, then you won't be able to assert that execution memory requests resulted in the eviction of blocks, since the execution request doesn't pass in the `evictedBlocks` from the suite.
    
    A simple fix is to only add if `!(args.last eq evictedBlocks)`.
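    
    A rough sketch of that guard, assuming a simplified buffer of byte counts instead of `(BlockId, BlockStatus)` pairs; `recordEviction` is a hypothetical helper, not part of the suite:
    
    ```scala
    import scala.collection.mutable

    object EvictedBlocksSketch {
      // Hypothetical stand-in for the suite-level buffer.
      val evictedBlocks = new mutable.ArrayBuffer[Long]

      /** Record in the caller's buffer, and in the suite buffer unless it is the same object. */
      def recordEviction(callerBuffer: mutable.Buffer[Long], numBytes: Long): Unit = {
        callerBuffer += numBytes
        // `eq` compares references, so this avoids appending twice to one buffer.
        if (!(callerBuffer eq evictedBlocks)) evictedBlocks += numBytes
      }
    }
    ```
    
    Passing the suite buffer itself records the eviction once, while passing any other buffer records it in both.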




[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/10170#issuecomment-163086853
  
    Note to self RE: unroll fraction (just to be precise):
    
    At a high level, in the StaticMemoryManager a request for unroll memory will be able to obtain more memory if:
    
    - there is free storage memory, or
    - `currentUnrollMemory / maxStorageMemory` < `spark.storage.unrollFraction`.
    
    An unroll request can only evict blocks if `finalUnrollMemory / maxStorageMemory` < `spark.storage.unrollFraction`, where `finalUnrollMemory` is the amount of unroll memory that would be used after the eviction was performed.
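    
    Written as predicates, the two conditions above might look like the following sketch. The function names and the `unrollFraction` parameter are hypothetical, and "free storage memory" is assumed to mean a strictly positive number of free bytes.
    
    ```scala
    object UnrollSketch {
      /** Can an unroll request obtain more memory at all? */
      def canObtainMore(
          freeStorage: Long,
          currentUnroll: Long,
          maxStorage: Long,
          unrollFraction: Double): Boolean =
        freeStorage > 0 || currentUnroll.toDouble / maxStorage < unrollFraction

      /** May the request evict blocks, given the unroll memory used after eviction? */
      def mayEvict(finalUnroll: Long, maxStorage: Long, unrollFraction: Double): Boolean =
        finalUnroll.toDouble / maxStorage < unrollFraction
    }
    ```
    
    For example, with 1000 bytes of maximum storage memory and an illustrative fraction of 0.2, a request that would end at 150 bytes of unroll memory may evict blocks, while one that would end at 250 bytes may not.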




[GitHub] spark pull request: [SPARK-12165] Fix bug in eviction of storage m...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10170#issuecomment-162417990
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10170#issuecomment-163092604
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10170#discussion_r47021235
  
    --- Diff: core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala ---
    @@ -78,33 +75,37 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
         val (mm, ms) = makeThings(maxMemory)
         assert(mm.storageMemoryUsed === 0L)
         assert(mm.acquireStorageMemory(dummyBlock, 10L, evictedBlocks))
    -    // `ensureFreeSpace` should be called with the number of bytes requested
    -    assertEnsureFreeSpaceCalled(ms, 10L)
    +    assertEvictBlocksToFreeSpaceNotCalled(ms)
         assert(mm.storageMemoryUsed === 10L)
    +
         assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks))
    -    assertEnsureFreeSpaceCalled(ms, 100L)
    +    assertEvictBlocksToFreeSpaceNotCalled(ms)
         assert(mm.storageMemoryUsed === 110L)
    +    assert(evictedBlocks.isEmpty)
         // Acquire more than the max, not granted
         assert(!mm.acquireStorageMemory(dummyBlock, maxMemory + 1L, evictedBlocks))
    -    assertEnsureFreeSpaceCalled(ms, maxMemory + 1L)
    +    assertEvictBlocksToFreeSpaceNotCalled(ms)
         assert(mm.storageMemoryUsed === 110L)
    +    assert(evictedBlocks.isEmpty)
         // Acquire up to the max, requests after this are still granted due to LRU eviction
         assert(mm.acquireStorageMemory(dummyBlock, maxMemory, evictedBlocks))
    -    assertEnsureFreeSpaceCalled(ms, 1000L)
    +    assertEvictBlocksToFreeSpaceCalled(ms, 110L)
         assert(mm.storageMemoryUsed === 1000L)
    +    assert(evictedBlocks.nonEmpty)
    --- End diff --
    
    OK to keep, but I don't think there's a lot to gain from asserting this


[GitHub] spark pull request: [SPARK-12165] Fix bug in eviction of storage m...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/10170#issuecomment-162441738
  
    I think that the test failures in `BlockManagerSuite` are related to the semantics of `maxNumBytesToFree` in `StaticMemoryManager.acquireUnrollMemory`.
    
    [Previously](https://github.com/apache/spark/pull/10170/files#diff-935c68a9803be144ed7bafdd2f756a0fL87), it looks like `maxNumBytesToFree` was the amount of memory that we'd try to free in the `memoryStore.ensureFreeSpace()` call. Due to the bug fixed in this patch, however, I believe that we'd end up in the case where `maxNumBytesToFree` is less than the amount of free storage memory, so `ensureFreeSpace()` wouldn't evict anything even though we actually need `numBytesToAcquire > maxNumBytesToFree` bytes of memory.
    
    After the fixes implemented here, we'll first claim as much free storage memory as possible, then subtract that from our memory goal and request the remaining memory via spilling. As a result, we are more prone to evict, which might be throwing off the original test case (it's a little tricky to say due to size estimation; I'll try to see if I can decouple that via mocking in order to make the test easier to reason about).
    
    One minor question of semantics: up until now (and still) it looks like `maxNumBytesToFree` has been "the maximum amount of memory to attempt to free up via spilling" and not "an upper bound on the amount of memory that will be spilled in response to a request". I believe that this might be the right interpretation, since the latter interpretation would mean that large cached blocks could never be evicted by small unroll requests.
    
    @andrewor14, is the original idea behind `spark.storage.unrollFraction` that it places an upper limit on the amount of total storage memory that can be used for unrolling? If that's the case, we can simplify `StorageMemoryPool.acquireMemory` to only accept a single `numBytesToAcquire` and cap the size of requests at `min(spark.storage.unrollFraction * maxStorageMemory - currentUnrollMemory, requestedUnrollMemory)`.
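    
    To make the proposed cap concrete, here is a minimal sketch of the `min(...)` expression above. It is only an illustration under assumptions: `UnrollRequestCap` and `cappedUnrollRequest` are hypothetical names, and clamping the remaining budget at zero is an extra assumption that the expression itself does not state.
    
    ```scala
    object UnrollRequestCap {
      // Hypothetical helper, not part of Spark. Caps an unroll request so that the
      // total unroll memory stays within unrollFraction * maxStorageMemory.
      def cappedUnrollRequest(
          requestedUnrollMemory: Long,
          currentUnrollMemory: Long,
          maxStorageMemory: Long,
          unrollFraction: Double): Long = {
        val maxUnrollMemory = (maxStorageMemory * unrollFraction).toLong
        // Assumption: never return a negative grant if unroll memory is already over budget.
        val remaining = math.max(0L, maxUnrollMemory - currentUnrollMemory)
        math.min(remaining, requestedUnrollMemory)
      }
    }
    ```
    
    For example, with `maxStorageMemory = 1000`, a fraction of 0.4, and 60 bytes already used for unrolling, a 500-byte request would be capped at 400 - 60 = 340 bytes.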


[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10170#issuecomment-163367435
  
    **[Test build #47433 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47433/consoleFull)** for PR 10170 at commit [`7e9e191`](https://github.com/apache/spark/commit/7e9e191c76992813d9a0bdbb1de590776c4875a1).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10170#discussion_r47024437
  
    --- Diff: core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala ---
    @@ -143,7 +148,25 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
         assert(mm.acquireExecutionMemory(400L, taskAttemptId, MemoryMode.ON_HEAP) === 300L)
         assert(mm.executionMemoryUsed === 600L)
         assert(mm.storageMemoryUsed === 400L)
    -    assertEnsureFreeSpaceNotCalled(ms)
    +    assertEvictBlocksToFreeSpaceNotCalled(ms)
    +    assert(evictedBlocks.isEmpty)
    +  }
    +
    +  test("execution can evict storage blocks when storage memory is below max mem (SPARK-12165)") {
    +    val maxMemory = 1000L
    +    val taskAttemptId = 0L
    +    val (mm, ms) = makeThings(maxMemory)
    +    // Acquire enough storage memory to exceed the storage region size
    +    assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
    +    assertEvictBlocksToFreeSpaceNotCalled(ms)
    +    assert(mm.executionMemoryUsed === 0L)
    +    assert(mm.storageMemoryUsed === 750L)
    +    // Should now be able to require up to 500 bytes of memory
    +    assert(mm.acquireExecutionMemory(500L, taskAttemptId, MemoryMode.ON_HEAP) === 500L)
    --- End diff --
    
    Should we briefly explain SPARK-12165 since it's not at all obvious? Also I would change the numbers a little so it's easier to explain. Maybe something like:
    ```
    ...
    ...
    assert(mm.storageMemoryUsed === 700L)
    // SPARK-12165: previously, MemoryStore would not evict anything because it would
    // mistakenly think that the 300 bytes of free space is still available even after
    // using it to expand the execution pool. Consequently, no blocks were dropped
    // and the following call used to grant only 300 bytes to execution.
    assert(mm.acquireExecutionMemory(500L, taskAttemptId, MemoryMode.ON_HEAP) === 500L)
    assertEvictBlocksToFreeSpaceCalled(ms, 200L)
    ```
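    
    The arithmetic behind these numbers can be checked with a toy model. This is a sketch only: `Spark12165Arithmetic`, `Grant`, and `grantExecution` are hypothetical names that do not mirror Spark's classes, and the model ignores per-task limits on execution memory. It takes free memory first and then evicts only the storage that sits above the protected storage region.
    
    ```scala
    object Spark12165Arithmetic {
      // Toy model only; hypothetical names, not Spark's API.
      final case class Grant(fromFreeMemory: Long, fromEviction: Long) {
        def total: Long = fromFreeMemory + fromEviction
      }
    
      def grantExecution(
          requested: Long,
          maxMemory: Long,
          storageRegionSize: Long,
          storageUsed: Long,
          executionUsed: Long): Grant = {
        // Step 1: claim whatever memory is currently free.
        val free = math.max(0L, maxMemory - storageUsed - executionUsed)
        val fromFree = math.min(requested, free)
        // Step 2: evict storage, but only the part above its protected region.
        val evictable = math.max(0L, storageUsed - storageRegionSize)
        val fromEviction = math.min(requested - fromFree, evictable)
        Grant(fromFree, fromEviction)
      }
    }
    ```
    
    With `maxMemory = 1000`, a 500-byte storage region, and 700 bytes of storage in use, a 500-byte execution request takes 300 bytes of free memory and evicts 200 bytes, which matches the `assertEvictBlocksToFreeSpaceCalled(ms, 200L)` line in the suggestion.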


[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10170#discussion_r47012495
  
    --- Diff: core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala ---
    @@ -36,105 +37,98 @@ import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, StorageLevel
     /**
      * Helper trait for sharing code among [[MemoryManager]] tests.
      */
    -private[memory] trait MemoryManagerSuite extends SparkFunSuite {
    +private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAfterEach {
     
    -  import MemoryManagerSuite.DEFAULT_ENSURE_FREE_SPACE_CALLED
    +  protected val evictedBlocks = new mutable.ArrayBuffer[(BlockId, BlockStatus)]
    +
    +  import MemoryManagerSuite.DEFAULT_EVICT_BLOCKS_TO_FREE_SPACE_CALLED
     
       // Note: Mockito's verify mechanism does not provide a way to reset method call counts
       // without also resetting stubbed methods. Since our test code relies on the latter,
    -  // we need to use our own variable to track invocations of `ensureFreeSpace`.
    +  // we need to use our own variable to track invocations of `evictBlocksToFreeSpace`.
     
       /**
    -   * The amount of free space requested in the last call to [[MemoryStore.ensureFreeSpace]]
    +   * The amount of space requested in the last call to [[MemoryStore.evictBlocksToFreeSpace]].
        *
    -   * This set whenever [[MemoryStore.ensureFreeSpace]] is called, and cleared when the test
    -   * code makes explicit assertions on this variable through [[assertEnsureFreeSpaceCalled]].
    +   * This set whenever [[MemoryStore.evictBlocksToFreeSpace]] is called, and cleared when the test
    +   * code makes explicit assertions on this variable through
    +   * [[assertEvictBlocksToFreeSpaceCalled]].
        */
    -  private val ensureFreeSpaceCalled = new AtomicLong(DEFAULT_ENSURE_FREE_SPACE_CALLED)
    +  private val evictBlocksToFreeSpaceCalled = new AtomicLong(0)
    --- End diff --
    
    should this start at the `DEFAULT`?
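    
    For context on why the initial value matters, here is a minimal sketch of the tracking pattern. It rests on assumptions: `EvictionCallTracker` is a hypothetical class, and the sentinel `-1L` is an illustrative choice, since the actual value of the `DEFAULT` constant is not shown in this diff.
    
    ```scala
    import java.util.concurrent.atomic.AtomicLong
    
    object EvictionCallTracker {
      // Hypothetical sentinel meaning "not called since the last assertion".
      val NotCalled: Long = -1L
    }
    
    class EvictionCallTracker {
      import EvictionCallTracker.NotCalled
    
      // Starts at the sentinel so "never called" differs from "called with 0 bytes".
      private val lastRequested = new AtomicLong(NotCalled)
    
      // Called by the stubbed method to record the number of bytes requested.
      def record(numBytes: Long): Unit = lastRequested.set(numBytes)
    
      // Returns the last recorded value, if any, and resets the tracker to the sentinel.
      def consume(): Option[Long] = {
        val value = lastRequested.getAndSet(NotCalled)
        if (value == NotCalled) None else Some(value)
      }
    }
    ```
    
    If the tracker started at `0` instead, a test could not tell a call that requested 0 bytes apart from no call at all.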


[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10170#issuecomment-163095804
  
    Merged build finished. Test FAILed.


[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10170#discussion_r47021855
  
    --- Diff: core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala ---
    @@ -117,25 +118,29 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
         val (mm, ms) = makeThings(maxMemory)
         // Acquire enough storage memory to exceed the storage region
         assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
    -    assertEnsureFreeSpaceCalled(ms, 750L)
    +    assertEvictBlocksToFreeSpaceNotCalled(ms)
         assert(mm.executionMemoryUsed === 0L)
         assert(mm.storageMemoryUsed === 750L)
    +    assert(evictedBlocks.isEmpty)
         // Execution needs to request 250 bytes to evict storage memory
         assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) === 100L)
         assert(mm.executionMemoryUsed === 100L)
         assert(mm.storageMemoryUsed === 750L)
    -    assertEnsureFreeSpaceNotCalled(ms)
    +    assertEvictBlocksToFreeSpaceNotCalled(ms)
    +    assert(evictedBlocks.isEmpty)
         // Execution wants 200 bytes but only 150 are free, so storage is evicted
         assert(mm.acquireExecutionMemory(200L, taskAttemptId, MemoryMode.ON_HEAP) === 200L)
         assert(mm.executionMemoryUsed === 300L)
    -    assertEnsureFreeSpaceCalled(ms, 50L)
    +    assertEvictBlocksToFreeSpaceCalled(ms, 50L)
         assert(mm.executionMemoryUsed === 300L)
    --- End diff --
    
    duplicated assert? This should check `mm.storageMemoryUsed === 700L` instead


[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10170#issuecomment-163095758
  
    **[Test build #47383 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47383/consoleFull)** for PR 10170 at commit [`a43ed34`](https://github.com/apache/spark/commit/a43ed343f81dc2001718dcd1812ed85a1d849d72).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10170#discussion_r47000736
  
    --- Diff: core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala ---
    @@ -53,6 +53,20 @@ private[spark] class StaticMemoryManager(
         (maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
       }
     
    +  override def acquireStorageMemory(
    +      blockId: BlockId,
    +      numBytes: Long,
    +      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
    +    if (numBytes > storageMemoryPool.poolSize) {
    --- End diff --
    
    if you make the other one (in UMM) `numBytes > maxStorageMemory` then it would make sense to do the same here, so it's more consistent (small readability improvement)


[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10170#discussion_r47000196
  
    --- Diff: core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala ---
    @@ -110,6 +110,12 @@ private[spark] class UnifiedMemoryManager private[memory] (
           evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
         assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
         assert(numBytes >= 0)
    +    if (numBytes > maxMemory) {
    +      // Fail fast if the block simply won't fit
    +      logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
    +        s"memory limit ($maxMemory bytes)")
    --- End diff --
    
    this check is actually duplicated now that you've removed it from `ensureFreeSpace`, but I can see that it stops us from even resizing the memory pools early so it's OK to keep here


[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10170#discussion_r47017514
  
    --- Diff: core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala ---
    @@ -159,13 +156,13 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
         assert(mm.acquireUnrollMemory(dummyBlock, 500L, evictedBlocks))
         // `spark.storage.unrollFraction` is 0.4, so the max unroll space is 400 bytes.
         // Since we already occupy 60 bytes, we will try to ensure only 400 - 60 = 340 bytes.
    --- End diff --
    
    outdated comment


[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10170#issuecomment-163367685
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/47433/
    Test PASSed.


[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10170#discussion_r47021444
  
    --- Diff: core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala ---
    @@ -78,33 +75,37 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
         val (mm, ms) = makeThings(maxMemory)
         assert(mm.storageMemoryUsed === 0L)
         assert(mm.acquireStorageMemory(dummyBlock, 10L, evictedBlocks))
    -    // `ensureFreeSpace` should be called with the number of bytes requested
    -    assertEnsureFreeSpaceCalled(ms, 10L)
    +    assertEvictBlocksToFreeSpaceNotCalled(ms)
         assert(mm.storageMemoryUsed === 10L)
    +
         assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks))
    -    assertEnsureFreeSpaceCalled(ms, 100L)
    +    assertEvictBlocksToFreeSpaceNotCalled(ms)
         assert(mm.storageMemoryUsed === 110L)
    +    assert(evictedBlocks.isEmpty)
         // Acquire more than the max, not granted
         assert(!mm.acquireStorageMemory(dummyBlock, maxMemory + 1L, evictedBlocks))
    -    assertEnsureFreeSpaceCalled(ms, maxMemory + 1L)
    +    assertEvictBlocksToFreeSpaceNotCalled(ms)
         assert(mm.storageMemoryUsed === 110L)
    +    assert(evictedBlocks.isEmpty)
         // Acquire up to the max, requests after this are still granted due to LRU eviction
         assert(mm.acquireStorageMemory(dummyBlock, maxMemory, evictedBlocks))
    -    assertEnsureFreeSpaceCalled(ms, 1000L)
    +    assertEvictBlocksToFreeSpaceCalled(ms, 110L)
         assert(mm.storageMemoryUsed === 1000L)
    +    assert(evictedBlocks.nonEmpty)
    +    evictedBlocks.clear()
         assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
    -    assertEnsureFreeSpaceCalled(ms, 1L)
    +    assertEvictBlocksToFreeSpaceCalled(ms, 1L)
         assert(mm.storageMemoryUsed === 1000L)
    --- End diff --
    
    same here as in the `StaticMemoryManagerSuite` case, we should add a comment to explain why this is still 1000


[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10170#issuecomment-163068719
  
    **[Test build #47378 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47378/consoleFull)** for PR 10170 at commit [`d182bdc`](https://github.com/apache/spark/commit/d182bdcf4649d2cf9d961177a8ea8764ba69e23e).


[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10170#discussion_r47000515
  
    --- Diff: core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala ---
    @@ -110,6 +110,12 @@ private[spark] class UnifiedMemoryManager private[memory] (
           evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
         assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
         assert(numBytes >= 0)
    +    if (numBytes > maxMemory) {
    +      // Fail fast if the block simply won't fit
    +      logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
    +        s"memory limit ($maxMemory bytes)")
    --- End diff --
    
    should this check be `numBytes > maxStorageMemory`? I don't think it makes much of a difference in practice but that would seem more correct


[GitHub] spark pull request: [SPARK-12165] Fix bug in eviction of storage m...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10170#issuecomment-162417956
  
    **[Test build #47251 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47251/consoleFull)** for PR 10170 at commit [`0eac7da`](https://github.com/apache/spark/commit/0eac7da041326cfbec2c1db2f279cb655744f90e).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
       * `public abstract static class PrefixComputer`


[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10170#discussion_r47029505
  
    --- Diff: core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala ---
    @@ -154,32 +177,34 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
         assert(mm.acquireExecutionMemory(800L, taskAttemptId, MemoryMode.ON_HEAP) === 800L)
         assert(mm.executionMemoryUsed === 800L)
         assert(mm.storageMemoryUsed === 0L)
    -    assertEnsureFreeSpaceNotCalled(ms)
    +    assertEvictBlocksToFreeSpaceNotCalled(ms)
         // Storage should not be able to evict execution
         assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks))
         assert(mm.executionMemoryUsed === 800L)
         assert(mm.storageMemoryUsed === 100L)
    -    assertEnsureFreeSpaceCalled(ms, 100L)
    +    assertEvictBlocksToFreeSpaceNotCalled(ms)
         assert(!mm.acquireStorageMemory(dummyBlock, 250L, evictedBlocks))
         assert(mm.executionMemoryUsed === 800L)
         assert(mm.storageMemoryUsed === 100L)
    -    assertEnsureFreeSpaceCalled(ms, 250L)
    +    assertEvictBlocksToFreeSpaceCalled(ms, 150L) // try to evict blocks ...
    +    assert(evictedBlocks.isEmpty) // ... but don't evict since evicting will not be sufficient
         mm.releaseExecutionMemory(maxMemory, taskAttemptId, MemoryMode.ON_HEAP)
         mm.releaseStorageMemory(maxMemory)
         // Acquire some execution memory again, but this time keep it within the execution region
         assert(mm.acquireExecutionMemory(200L, taskAttemptId, MemoryMode.ON_HEAP) === 200L)
         assert(mm.executionMemoryUsed === 200L)
         assert(mm.storageMemoryUsed === 0L)
    -    assertEnsureFreeSpaceNotCalled(ms)
    +    assertEvictBlocksToFreeSpaceNotCalled(ms)
         // Storage should still not be able to evict execution
         assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
         assert(mm.executionMemoryUsed === 200L)
         assert(mm.storageMemoryUsed === 750L)
    -    assertEnsureFreeSpaceCalled(ms, 750L)
    +    assertEvictBlocksToFreeSpaceNotCalled(ms) // since there was 800 bytes free
         assert(!mm.acquireStorageMemory(dummyBlock, 850L, evictedBlocks))
         assert(mm.executionMemoryUsed === 200L)
         assert(mm.storageMemoryUsed === 750L)
    -    assertEnsureFreeSpaceCalled(ms, 850L)
    +    assertEvictBlocksToFreeSpaceCalled(ms, 800L) // try to evict blocks...
    --- End diff --
    
    same here, if you take the suggestion then this will not happen


[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10170#issuecomment-163072368
  
    **[Test build #47380 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47380/consoleFull)** for PR 10170 at commit [`f6fb406`](https://github.com/apache/spark/commit/f6fb4066e52baa6bf6c3680db6cdc967330a110c).


[GitHub] spark pull request: [SPARK-12165] Fix bug in eviction of storage m...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10170#issuecomment-162405970
  
    **[Test build #47251 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47251/consoleFull)** for PR 10170 at commit [`0eac7da`](https://github.com/apache/spark/commit/0eac7da041326cfbec2c1db2f279cb655744f90e).




[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10170#discussion_r46997361
  
    --- Diff: core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala ---
    @@ -128,9 +132,11 @@ class StorageMemoryPool(lock: Object) extends MemoryPool(lock) with Logging {
         } else {
           // If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
           val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
    -      memoryStore.ensureFreeSpace(spaceToFree - spaceFreedByReleasingUnusedMemory, evictedBlocks)
    +      memoryStore.evictBlocksToFreeSpace(
    +        None, spaceToFree - spaceFreedByReleasingUnusedMemory, evictedBlocks)
    --- End diff --
    
    Maybe a more readable way to write this is:
    ```
    val spaceFreedByReleasingUnusedMemory = math.min(...)
    decrementPoolSize(spaceFreedByReleasingUnusedMemory)
    val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
    if (remainingSpaceToFree > 0) {
      // If reclaiming free memory did not ...
    } else {
      spaceFreedByReleasingUnusedMemory
    }
    ```
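
A minimal, self-contained sketch of the control flow suggested above, under stated assumptions: `ToyStoragePool` and its `evict` callback are hypothetical stand-ins made up for illustration, not Spark's `StorageMemoryPool` or `MemoryStore`. The point is only that computing `remainingSpaceToFree` first lets a single `if` decide whether eviction is needed at all.

```scala
// Toy stand-in for a storage pool; NOT Spark's StorageMemoryPool.
// `evict` is a hypothetical callback that returns how many bytes it freed.
final class ToyStoragePool(var poolSize: Long, var memoryUsed: Long, evict: Long => Long) {

  def memoryFree: Long = poolSize - memoryUsed

  /** Shrinks the pool by up to `spaceToFree` bytes and returns the bytes actually removed. */
  def shrinkPoolToFreeSpace(spaceToFree: Long): Long = {
    // First, give back memory that is free anyway.
    val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree)
    poolSize -= spaceFreedByReleasingUnusedMemory
    val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
    if (remainingSpaceToFree > 0) {
      // Free memory was not enough, so ask for eviction of the remainder only.
      val spaceFreedByEviction = evict(remainingSpaceToFree)
      memoryUsed -= spaceFreedByEviction
      poolSize -= spaceFreedByEviction
      spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
    } else {
      spaceFreedByReleasingUnusedMemory
    }
  }
}
```

With a pool of 1000 bytes of which 750 are used, shrinking by 100 bytes touches only free memory, while a further shrink by 400 bytes releases the remaining 150 free bytes and asks the callback for 250.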




[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10170#discussion_r47028774
  
    --- Diff: core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala ---
    @@ -73,27 +73,31 @@ class StorageMemoryPool(lock: Object) extends MemoryPool(lock) with Logging {
        *
        * @param blockId the ID of the block we are acquiring storage memory for
        * @param numBytesToAcquire the size of this block
    -   * @param numBytesToFree the size of space to be freed through evicting blocks
    +   * @param maxNumBytesToFree the maximum amount of space to be freed through evicting blocks
        * @return whether all N bytes were successfully granted.
        */
       def acquireMemory(
           blockId: BlockId,
           numBytesToAcquire: Long,
    -      numBytesToFree: Long,
    +      maxNumBytesToFree: Long,
           evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = lock.synchronized {
         assert(numBytesToAcquire >= 0)
    -    assert(numBytesToFree >= 0)
    +    assert(maxNumBytesToFree >= 0)
         assert(memoryUsed <= poolSize)
    -    memoryStore.ensureFreeSpace(blockId, numBytesToFree, evictedBlocks)
    -    // Register evicted blocks, if any, with the active task metrics
    -    Option(TaskContext.get()).foreach { tc =>
    -      val metrics = tc.taskMetrics()
    -      val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
    -      metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq)
    +    if (numBytesToAcquire > memoryFree && maxNumBytesToFree > 0) {
    +      val additionalMemoryRequired = numBytesToAcquire - memoryFree
    +      memoryStore.evictBlocksToFreeSpace(
    --- End diff --
    
    (I should add that I think the existing code is correct, but rewriting it would make the semantics clearer and the tests easier to follow)




[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10170#issuecomment-163082947
  
    **[Test build #47383 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47383/consoleFull)** for PR 10170 at commit [`a43ed34`](https://github.com/apache/spark/commit/a43ed343f81dc2001718dcd1812ed85a1d849d72).




[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/10170




[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10170#discussion_r47033739
  
    --- Diff: core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala ---
    @@ -143,7 +148,25 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
         assert(mm.acquireExecutionMemory(400L, taskAttemptId, MemoryMode.ON_HEAP) === 300L)
         assert(mm.executionMemoryUsed === 600L)
         assert(mm.storageMemoryUsed === 400L)
    -    assertEnsureFreeSpaceNotCalled(ms)
    +    assertEvictBlocksToFreeSpaceNotCalled(ms)
    +    assert(evictedBlocks.isEmpty)
    +  }
    +
    +  test("execution can evict storage blocks when storage memory is below max mem (SPARK-12165)") {
    +    val maxMemory = 1000L
    +    val taskAttemptId = 0L
    +    val (mm, ms) = makeThings(maxMemory)
    +    // Acquire enough storage memory to exceed the storage region size
    +    assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
    +    assertEvictBlocksToFreeSpaceNotCalled(ms)
    +    assert(mm.executionMemoryUsed === 0L)
    +    assert(mm.storageMemoryUsed === 750L)
    +    // Should now be able to require up to 500 bytes of memory
    +    assert(mm.acquireExecutionMemory(500L, taskAttemptId, MemoryMode.ON_HEAP) === 500L)
    --- End diff --
    
    By the way I included these changes as part of my PR: https://github.com/JoshRosen/spark/pull/8
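
As a sanity check on the arithmetic in the test quoted above, here is a rough model of how much memory an execution request can be granted. It is a sketch under assumptions, not the `UnifiedMemoryManager` logic: `ExecutionReclaimSketch` and `executionGrant` are hypothetical names, and the model ignores per-task limits entirely. It assumes execution can take all free memory plus whatever storage uses beyond its protected region (`maxMemory * storageFraction`).

```scala
// Hypothetical model for checking test arithmetic by hand; not Spark code.
object ExecutionReclaimSketch {

  /** Bytes granted to an execution request, ignoring per-task caps. */
  def executionGrant(
      maxMemory: Long,
      storageFraction: Double,
      storageUsed: Long,
      executionUsed: Long,
      requested: Long): Long = {
    // Storage below this size is immune to eviction by execution.
    val storageRegionSize = (maxMemory * storageFraction).toLong
    val freeMemory = maxMemory - storageUsed - executionUsed
    // Only the part of storage above its protected region can be reclaimed.
    val reclaimableFromStorage = math.max(0L, storageUsed - storageRegionSize)
    math.min(requested, freeMemory + reclaimableFromStorage)
  }
}
```

For the SPARK-12165 case (1000 bytes total, fraction 0.5, 750 bytes of storage in use), the model yields 250 free bytes plus 250 reclaimable bytes, so a 500-byte request is granted in full, which is what the new test asserts.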




[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10170#issuecomment-163080867
  
    **[Test build #47382 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47382/consoleFull)** for PR 10170 at commit [`e2090d1`](https://github.com/apache/spark/commit/e2090d1e11109b8bfd232d3a33d9acbf775e0872).




[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/10170#issuecomment-163363703
  
    Updated PR description to remove the following text from the end (which will be incorporated into separate PRs, most likely):
    
    
    ## TODOs
    
    - [ ] Add stronger assertions or a dedicated regression test for the `_memoryUsed < 0` bug, which was uncovered while testing this patch and a related fix for SPARK-12155.
    - [ ] See whether we can now remove the confusing `MemoryStore.maxMemory` method.
    
    /cc @andrewor14 @zsxwing @yhuai @rxin 




[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10170#discussion_r47012533
  
    --- Diff: core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala ---
    @@ -36,105 +37,98 @@ import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, StorageLevel
     /**
      * Helper trait for sharing code among [[MemoryManager]] tests.
      */
    -private[memory] trait MemoryManagerSuite extends SparkFunSuite {
    +private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAfterEach {
     
    -  import MemoryManagerSuite.DEFAULT_ENSURE_FREE_SPACE_CALLED
    +  protected val evictedBlocks = new mutable.ArrayBuffer[(BlockId, BlockStatus)]
    +
    +  import MemoryManagerSuite.DEFAULT_EVICT_BLOCKS_TO_FREE_SPACE_CALLED
     
       // Note: Mockito's verify mechanism does not provide a way to reset method call counts
       // without also resetting stubbed methods. Since our test code relies on the latter,
    -  // we need to use our own variable to track invocations of `ensureFreeSpace`.
    +  // we need to use our own variable to track invocations of `evictBlocksToFreeSpace`.
     
       /**
    -   * The amount of free space requested in the last call to [[MemoryStore.ensureFreeSpace]]
    +   * The amount of space requested in the last call to [[MemoryStore.evictBlocksToFreeSpace]].
        *
    -   * This set whenever [[MemoryStore.ensureFreeSpace]] is called, and cleared when the test
    -   * code makes explicit assertions on this variable through [[assertEnsureFreeSpaceCalled]].
    +   * This set whenever [[MemoryStore.evictBlocksToFreeSpace]] is called, and cleared when the test
    +   * code makes explicit assertions on this variable through
    +   * [[assertEvictBlocksToFreeSpaceCalled]].
        */
    -  private val ensureFreeSpaceCalled = new AtomicLong(DEFAULT_ENSURE_FREE_SPACE_CALLED)
    +  private val evictBlocksToFreeSpaceCalled = new AtomicLong(0)
    --- End diff --
    
    I see, you do this in `beforeEach` anyway
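
For readers unfamiliar with the pattern in this diff, a minimal sketch of a hand-rolled call tracker that can be reset between tests without touching any stubs. Everything here is hypothetical (`EvictionCallTracker`, the `-1L` sentinel, the method names); the suite in the PR uses its own field and constant.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical helper for illustration; not the actual MemoryManagerSuite code.
final class EvictionCallTracker {
  // Hypothetical sentinel meaning "no call recorded since the last check".
  private val NotCalled = -1L
  private val lastRequested = new AtomicLong(NotCalled)

  /** Invoked by the stubbed eviction method with the number of bytes requested. */
  def record(numBytes: Long): Unit = lastRequested.set(numBytes)

  /** Asserts that the last call asked for `numBytes`, then clears the record. */
  def assertCalled(numBytes: Long): Unit = {
    assert(lastRequested.get() == numBytes, s"expected a call requesting $numBytes bytes")
    lastRequested.set(NotCalled)
  }

  /** Asserts that no call was recorded since the last check or reset. */
  def assertNotCalled(): Unit =
    assert(lastRequested.get() == NotCalled, "eviction was called unexpectedly")

  /** Intended to run before each test, for example from a `beforeEach` hook. */
  def reset(): Unit = lastRequested.set(NotCalled)
}
```

Resetting in a per-test hook means the initial value chosen at construction matters little, which seems to be the observation in the comment above.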




[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10170#issuecomment-163086561
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/47380/




[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/10170#issuecomment-163368267
  
    Merging into master and 1.6!!




[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10170#issuecomment-163337431
  
    **[Test build #47433 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47433/consoleFull)** for PR 10170 at commit [`7e9e191`](https://github.com/apache/spark/commit/7e9e191c76992813d9a0bdbb1de590776c4875a1).




[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10170#issuecomment-163086560
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10170#issuecomment-163085035
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10170#discussion_r46994074
  
    --- Diff: core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala ---
    @@ -73,27 +73,31 @@ class StorageMemoryPool(lock: Object) extends MemoryPool(lock) with Logging {
        *
        * @param blockId the ID of the block we are acquiring storage memory for
        * @param numBytesToAcquire the size of this block
    -   * @param numBytesToFree the size of space to be freed through evicting blocks
    +   * @param maxNumBytesToFree the maximum amount of space to be freed through evicting blocks
        * @return whether all N bytes were successfully granted.
        */
       def acquireMemory(
           blockId: BlockId,
           numBytesToAcquire: Long,
    -      numBytesToFree: Long,
    +      maxNumBytesToFree: Long,
           evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = lock.synchronized {
         assert(numBytesToAcquire >= 0)
    -    assert(numBytesToFree >= 0)
    +    assert(maxNumBytesToFree >= 0)
         assert(memoryUsed <= poolSize)
    -    memoryStore.ensureFreeSpace(blockId, numBytesToFree, evictedBlocks)
    -    // Register evicted blocks, if any, with the active task metrics
    -    Option(TaskContext.get()).foreach { tc =>
    -      val metrics = tc.taskMetrics()
    -      val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
    -      metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq)
    +    if (numBytesToAcquire > memoryFree && maxNumBytesToFree > 0) {
    +      val additionalMemoryRequired = numBytesToAcquire - memoryFree
    +      memoryStore.evictBlocksToFreeSpace(
    +        Some(blockId), Math.min(maxNumBytesToFree, additionalMemoryRequired), evictedBlocks)
    --- End diff --
    
    Can we use `math.min`, as we do in other places?




[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10170#issuecomment-163084981
  
    **[Test build #47378 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47378/consoleFull)** for PR 10170 at commit [`d182bdc`](https://github.com/apache/spark/commit/d182bdcf4649d2cf9d961177a8ea8764ba69e23e).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/10170#issuecomment-163052886
  
    @JoshRosen, despite the number of comments I left, I think this patch looks good. I did a close review to verify its correctness and to confirm that the two bugs were real issues. In parallel, I will investigate the test failures and hopefully get this merged soon.




[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/10170#issuecomment-163362662
  
    LGTM, I'll merge this once tests pass.




[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10170#discussion_r47028671
  
    --- Diff: core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala ---
    @@ -73,27 +73,31 @@ class StorageMemoryPool(lock: Object) extends MemoryPool(lock) with Logging {
        *
        * @param blockId the ID of the block we are acquiring storage memory for
        * @param numBytesToAcquire the size of this block
    -   * @param numBytesToFree the size of space to be freed through evicting blocks
    +   * @param maxNumBytesToFree the maximum amount of space to be freed through evicting blocks
        * @return whether all N bytes were successfully granted.
        */
       def acquireMemory(
           blockId: BlockId,
           numBytesToAcquire: Long,
    -      numBytesToFree: Long,
    +      maxNumBytesToFree: Long,
           evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = lock.synchronized {
         assert(numBytesToAcquire >= 0)
    -    assert(numBytesToFree >= 0)
    +    assert(maxNumBytesToFree >= 0)
         assert(memoryUsed <= poolSize)
    -    memoryStore.ensureFreeSpace(blockId, numBytesToFree, evictedBlocks)
    -    // Register evicted blocks, if any, with the active task metrics
    -    Option(TaskContext.get()).foreach { tc =>
    -      val metrics = tc.taskMetrics()
    -      val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
    -      metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq)
    +    if (numBytesToAcquire > memoryFree && maxNumBytesToFree > 0) {
    +      val additionalMemoryRequired = numBytesToAcquire - memoryFree
    +      memoryStore.evictBlocksToFreeSpace(
    --- End diff --
    
    After looking at the test case, I think we should add a check here so that we evict only if there's a chance that evicting can free enough memory, i.e.
    ```
    val additionalMemoryRequired = math.max(0, numBytesToAcquire - memoryFree)
    val numBytesToFree = math.min(maxNumBytesToFree, additionalMemoryRequired)
    // Only evict blocks if there is a chance that doing so will grant us what we want
    if (numBytesToFree > 0 && numBytesToFree <= memoryUsed) {
      memoryStore.evictBlocksToFreeSpace(...)
      ...
    }
    ```
    If we do this, we can no longer hit the case where we try to free 150B by evicting blocks when there are only 100B worth of blocks.
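
A runnable sketch of the guard proposed above, pulled out as a pure function so the 150B / 100B case can be checked directly. `EvictionGuardSketch` and `bytesToEvict` are hypothetical names introduced for illustration; the parameters mirror the names in the snippet, and a return value of 0 is this sketch's own convention for "skip eviction".

```scala
// Hypothetical extraction of the proposed guard; not code from the patch.
object EvictionGuardSketch {

  /** Bytes to request from eviction, or 0 if eviction should be skipped. */
  def bytesToEvict(
      numBytesToAcquire: Long,
      memoryFree: Long,
      memoryUsed: Long,
      maxNumBytesToFree: Long): Long = {
    // How much is missing once free memory has been counted.
    val additionalMemoryRequired = math.max(0L, numBytesToAcquire - memoryFree)
    val numBytesToFree = math.min(maxNumBytesToFree, additionalMemoryRequired)
    // Only evict if the stored blocks could possibly cover the shortfall.
    if (numBytesToFree > 0 && numBytesToFree <= memoryUsed) numBytesToFree else 0L
  }
}
```

With 100 bytes free and 100 bytes of blocks stored, a 250-byte request has a 150-byte shortfall that eviction cannot cover, so the function returns 0 and no blocks would be evicted.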




[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10170#discussion_r47021623
  
    --- Diff: core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala ---
    @@ -117,25 +118,29 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
         val (mm, ms) = makeThings(maxMemory)
         // Acquire enough storage memory to exceed the storage region
         assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
    -    assertEnsureFreeSpaceCalled(ms, 750L)
    +    assertEvictBlocksToFreeSpaceNotCalled(ms)
         assert(mm.executionMemoryUsed === 0L)
         assert(mm.storageMemoryUsed === 750L)
    +    assert(evictedBlocks.isEmpty)
         // Execution needs to request 250 bytes to evict storage memory
         assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) === 100L)
         assert(mm.executionMemoryUsed === 100L)
         assert(mm.storageMemoryUsed === 750L)
    -    assertEnsureFreeSpaceNotCalled(ms)
    +    assertEvictBlocksToFreeSpaceNotCalled(ms)
    +    assert(evictedBlocks.isEmpty)
    --- End diff --
    
    Should this assert just go into the body of the previous one, i.e. should `assertEvictBlocksToFreeSpaceNotCalled` internally also check that `evictedBlocks` is empty? I notice we have these two lines in quite a few places.




[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10170#issuecomment-163086522
  
    **[Test build #47380 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47380/consoleFull)** for PR 10170 at commit [`f6fb406`](https://github.com/apache/spark/commit/f6fb4066e52baa6bf6c3680db6cdc967330a110c).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10170#discussion_r47037467
  
    --- Diff: core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala ---
    @@ -110,6 +110,12 @@ private[spark] class UnifiedMemoryManager private[memory] (
           evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
         assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
         assert(numBytes >= 0)
    +    if (numBytes > maxMemory) {
    +      // Fail fast if the block simply won't fit
    +      logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
    +        s"memory limit ($maxMemory bytes)")
    --- End diff --
    
    Yep, good catch: here in UMM, `maxStorageMemory` is `maxMemory - onHeapExecutionMemoryPool.memoryUsed`, reflecting the fact that storage cannot evict execution.
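
To make the distinction concrete, a small sketch of a fail-fast check against the storage limit described above rather than against `maxMemory`. This is an illustration under assumptions, not the code that was merged: `StorageLimitSketch`, `maxStorageMemory` as a free function, and `cannotFit` are hypothetical names.

```scala
// Hypothetical illustration of the limit discussed above; not the merged code.
object StorageLimitSketch {

  /** Storage can grow into everything execution is not currently using. */
  def maxStorageMemory(maxMemory: Long, executionMemoryUsed: Long): Long =
    maxMemory - executionMemoryUsed

  /** True if a block of `numBytes` cannot fit even after evicting every stored block. */
  def cannotFit(numBytes: Long, maxMemory: Long, executionMemoryUsed: Long): Boolean =
    numBytes > maxStorageMemory(maxMemory, executionMemoryUsed)
}
```

With `maxMemory` at 1000 bytes and execution holding 200 bytes, an 850-byte block passes a `numBytes > maxMemory` check yet can never be stored, because storage cannot evict execution; comparing against the 800-byte storage limit rejects it up front.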


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10170#discussion_r47012724
  
    --- Diff: core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala ---
    @@ -36,105 +37,98 @@ import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, StorageLevel
     /**
      * Helper trait for sharing code among [[MemoryManager]] tests.
      */
    -private[memory] trait MemoryManagerSuite extends SparkFunSuite {
    +private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAfterEach {
     
    -  import MemoryManagerSuite.DEFAULT_ENSURE_FREE_SPACE_CALLED
    +  protected val evictedBlocks = new mutable.ArrayBuffer[(BlockId, BlockStatus)]
    +
    +  import MemoryManagerSuite.DEFAULT_EVICT_BLOCKS_TO_FREE_SPACE_CALLED
     
       // Note: Mockito's verify mechanism does not provide a way to reset method call counts
       // without also resetting stubbed methods. Since our test code relies on the latter,
    -  // we need to use our own variable to track invocations of `ensureFreeSpace`.
    +  // we need to use our own variable to track invocations of `evictBlocksToFreeSpace`.
     
       /**
    -   * The amount of free space requested in the last call to [[MemoryStore.ensureFreeSpace]]
    +   * The amount of space requested in the last call to [[MemoryStore.evictBlocksToFreeSpace]].
        *
    -   * This set whenever [[MemoryStore.ensureFreeSpace]] is called, and cleared when the test
    -   * code makes explicit assertions on this variable through [[assertEnsureFreeSpaceCalled]].
    +   * This set whenever [[MemoryStore.evictBlocksToFreeSpace]] is called, and cleared when the test
    +   * code makes explicit assertions on this variable through
    +   * [[assertEvictBlocksToFreeSpaceCalled]].
        */
    -  private val ensureFreeSpaceCalled = new AtomicLong(DEFAULT_ENSURE_FREE_SPACE_CALLED)
    +  private val evictBlocksToFreeSpaceCalled = new AtomicLong(0)
    +
    +  override def beforeEach(): Unit = {
    +    super.beforeEach()
    +    evictedBlocks.clear()
    +    evictBlocksToFreeSpaceCalled.set(DEFAULT_EVICT_BLOCKS_TO_FREE_SPACE_CALLED)
    +  }
     
       /**
    -   * Make a mocked [[MemoryStore]] whose [[MemoryStore.ensureFreeSpace]] method is stubbed.
    +   * Make a mocked [[MemoryStore]] whose [[MemoryStore.evictBlocksToFreeSpace]] method is stubbed.
        *
    -   * This allows our test code to release storage memory when [[MemoryStore.ensureFreeSpace]]
    -   * is called without relying on [[org.apache.spark.storage.BlockManager]] and all of its
    -   * dependencies.
    +   * This allows our test code to release storage memory when these methods are called
    +   * without relying on [[org.apache.spark.storage.BlockManager]] and all of its dependencies.
        */
       protected def makeMemoryStore(mm: MemoryManager): MemoryStore = {
    -    val ms = mock(classOf[MemoryStore])
    -    when(ms.ensureFreeSpace(anyLong(), any())).thenAnswer(ensureFreeSpaceAnswer(mm, 0))
    -    when(ms.ensureFreeSpace(any(), anyLong(), any())).thenAnswer(ensureFreeSpaceAnswer(mm, 1))
    +    val ms = mock(classOf[MemoryStore], RETURNS_SMART_NULLS)
    +    when(ms.evictBlocksToFreeSpace(any(), anyLong(), any()))
    +      .thenAnswer(evictBlocksToFreeSpaceAnswer(mm))
         mm.setMemoryStore(ms)
         ms
       }
     
       /**
    -   * Make an [[Answer]] that stubs [[MemoryStore.ensureFreeSpace]] with the right arguments.
    -   */
    -  private def ensureFreeSpaceAnswer(mm: MemoryManager, numBytesPos: Int): Answer[Boolean] = {
    +    * Simulate the part of [[MemoryStore.evictBlocksToFreeSpace]] that releases storage memory.
    +    *
    +    * This is a significant simplification of the real method, which actually drops existing
    +    * blocks based on the size of each block. Instead, here we simply release as many bytes
    +    * as needed to ensure the requested amount of free space. This allows us to set up the
    +    * test without relying on the [[org.apache.spark.storage.BlockManager]], which brings in
    +    * many other dependencies.
    +    *
    +    * Every call to this method will set a global variable, [[evictBlocksToFreeSpaceCalled]], that
    +    * records the number of bytes this is called with. This variable is expected to be cleared
    +    * by the test code later through [[assertEvictBlocksToFreeSpaceCalled]].
    +    */
    +  private def evictBlocksToFreeSpaceAnswer(mm: MemoryManager): Answer[Boolean] = {
         new Answer[Boolean] {
           override def answer(invocation: InvocationOnMock): Boolean = {
             val args = invocation.getArguments
    -        require(args.size > numBytesPos, s"bad test: expected >$numBytesPos arguments " +
    -          s"in ensureFreeSpace, found ${args.size}")
    -        require(args(numBytesPos).isInstanceOf[Long], s"bad test: expected ensureFreeSpace " +
    -          s"argument at index $numBytesPos to be a Long: ${args.mkString(", ")}")
    -        val numBytes = args(numBytesPos).asInstanceOf[Long]
    -        val success = mockEnsureFreeSpace(mm, numBytes)
    -        if (success) {
    +        val numBytesToFree = args(1).asInstanceOf[Long]
    +        assert(numBytesToFree > 0)
    +        require(evictBlocksToFreeSpaceCalled.get() === DEFAULT_EVICT_BLOCKS_TO_FREE_SPACE_CALLED,
    +          "bad test: evictBlocksToFreeSpace() variable was not reset")
    +        evictBlocksToFreeSpaceCalled.set(numBytesToFree)
    +        if (numBytesToFree <= mm.storageMemoryUsed) {
    +          // We can evict enough blocks to fulfill the request for space
    +          mm.releaseStorageMemory(numBytesToFree)
               args.last.asInstanceOf[mutable.Buffer[(BlockId, BlockStatus)]].append(
    -            (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytes, 0L, 0L)))
    +            (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L, 0L)))
    +          evictedBlocks.append(
    +            (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L, 0L)))
    --- End diff --
    
    as mentioned offline, this will likely add the same block to `evictedBlocks` twice
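    
    A small sketch of how the duplication can arise, assuming the test passes the suite-level `evictedBlocks` buffer as the last argument of the stubbed call. `DoubleAppendSketch`, `recordEviction` and the `(String, Long)` pair standing in for `(BlockId, BlockStatus)` are hypothetical:
    
    ```scala
    import scala.collection.mutable
    
    object DoubleAppendSketch {
      // Hypothetical stand-in for (BlockId, BlockStatus): a label and a size.
      type Evicted = (String, Long)
    
      // Mimics the stubbed answer in the diff: it appends one entry to the buffer
      // supplied by the caller and one entry to a suite-level buffer.
      def recordEviction(
          callerBuffer: mutable.Buffer[Evicted],
          suiteBuffer: mutable.Buffer[Evicted],
          numBytes: Long): Unit = {
        callerBuffer += (("evicted", numBytes))
        suiteBuffer += (("evicted", numBytes))
      }
    }
    ```
    
    When both parameters refer to the same `ArrayBuffer`, one simulated eviction leaves two entries in it. With two distinct buffers, each receives exactly one.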


[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10170#discussion_r47020540
  
    --- Diff: core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala ---
    @@ -159,13 +156,13 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
         assert(mm.acquireUnrollMemory(dummyBlock, 500L, evictedBlocks))
         // `spark.storage.unrollFraction` is 0.4, so the max unroll space is 400 bytes.
         // Since we already occupy 60 bytes, we will try to ensure only 400 - 60 = 340 bytes.
    --- End diff --
    
    actually, this whole test should be rewritten to trigger the case where unroll evicts part of existing storage. I have some changes here https://github.com/andrewor14/spark/commit/78cd2be5efd14dcad4fcd648f6815d741c09694d that are passing tests. I'll submit a PR against your branch shortly.


[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/10170#issuecomment-163092566
  
    **[Test build #47382 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47382/consoleFull)** for PR 10170 at commit [`e2090d1`](https://github.com/apache/spark/commit/e2090d1e11109b8bfd232d3a33d9acbf775e0872).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10170#discussion_r47032863
  
    --- Diff: core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala ---
    @@ -143,7 +148,25 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
         assert(mm.acquireExecutionMemory(400L, taskAttemptId, MemoryMode.ON_HEAP) === 300L)
         assert(mm.executionMemoryUsed === 600L)
         assert(mm.storageMemoryUsed === 400L)
    -    assertEnsureFreeSpaceNotCalled(ms)
    +    assertEvictBlocksToFreeSpaceNotCalled(ms)
    +    assert(evictedBlocks.isEmpty)
    +  }
    +
    +  test("execution can evict storage blocks when storage memory is below max mem (SPARK-12165)") {
    +    val maxMemory = 1000L
    +    val taskAttemptId = 0L
    +    val (mm, ms) = makeThings(maxMemory)
    +    // Acquire enough storage memory to exceed the storage region size
    +    assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
    +    assertEvictBlocksToFreeSpaceNotCalled(ms)
    +    assert(mm.executionMemoryUsed === 0L)
    +    assert(mm.storageMemoryUsed === 750L)
    +    // Should now be able to require up to 500 bytes of memory
    +    assert(mm.acquireExecutionMemory(500L, taskAttemptId, MemoryMode.ON_HEAP) === 500L)
    --- End diff --
    
    also, I think the test name should be something like
    ```
    requesting execution memory less than free memory should evict storage (SPARK-12165)
    ```
    The problem isn't that "storage memory is below max memory". Even in the existing test "eviction evicts storage", storage memory (750) is below the max memory (1000).
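    
    The arithmetic behind the regression test can be sketched as follows. This is a simplified model that assumes a single task and a 500-byte protected storage region (the default storage fraction of 0.5 applied to `maxMemory = 1000`); it ignores any per-task limits the real execution pool may apply. `grantableExecutionMemory` is a hypothetical helper, not a Spark method:
    
    ```scala
    object EvictionArithmeticSketch {
      // Under this model, execution may take all free memory plus whatever
      // storage currently holds beyond its protected region.
      def grantableExecutionMemory(
          maxMemory: Long,
          storageRegionSize: Long,
          storageUsed: Long,
          executionUsed: Long,
          requested: Long): Long = {
        val free = maxMemory - storageUsed - executionUsed
        val reclaimableFromStorage = math.max(0L, storageUsed - storageRegionSize)
        math.min(requested, free + reclaimableFromStorage)
      }
    }
    ```
    
    For the SPARK-12165 case (750 bytes of storage, no execution, a 500-byte request) this yields 250 free bytes plus 250 reclaimable bytes, so the full 500 bytes should be granted. For the preceding assertions in the diff (400 bytes of storage, 300 bytes of execution, a 400-byte request) it yields 300 with nothing to evict.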


[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10170#discussion_r47029205
  
    --- Diff: core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala ---
    @@ -154,32 +177,34 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
         assert(mm.acquireExecutionMemory(800L, taskAttemptId, MemoryMode.ON_HEAP) === 800L)
         assert(mm.executionMemoryUsed === 800L)
         assert(mm.storageMemoryUsed === 0L)
    -    assertEnsureFreeSpaceNotCalled(ms)
    +    assertEvictBlocksToFreeSpaceNotCalled(ms)
         // Storage should not be able to evict execution
         assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks))
         assert(mm.executionMemoryUsed === 800L)
         assert(mm.storageMemoryUsed === 100L)
    -    assertEnsureFreeSpaceCalled(ms, 100L)
    +    assertEvictBlocksToFreeSpaceNotCalled(ms)
         assert(!mm.acquireStorageMemory(dummyBlock, 250L, evictedBlocks))
         assert(mm.executionMemoryUsed === 800L)
         assert(mm.storageMemoryUsed === 100L)
    -    assertEnsureFreeSpaceCalled(ms, 250L)
    +    assertEvictBlocksToFreeSpaceCalled(ms, 150L) // try to evict blocks ...
    +    assert(evictedBlocks.isEmpty) // ... but don't evict since evicting will not be sufficient
         mm.releaseExecutionMemory(maxMemory, taskAttemptId, MemoryMode.ON_HEAP)
         mm.releaseStorageMemory(maxMemory)
         // Acquire some execution memory again, but this time keep it within the execution region
         assert(mm.acquireExecutionMemory(200L, taskAttemptId, MemoryMode.ON_HEAP) === 200L)
         assert(mm.executionMemoryUsed === 200L)
         assert(mm.storageMemoryUsed === 0L)
    -    assertEnsureFreeSpaceNotCalled(ms)
    +    assertEvictBlocksToFreeSpaceNotCalled(ms)
         // Storage should still not be able to evict execution
         assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
         assert(mm.executionMemoryUsed === 200L)
         assert(mm.storageMemoryUsed === 750L)
    -    assertEnsureFreeSpaceCalled(ms, 750L)
    +    assertEvictBlocksToFreeSpaceNotCalled(ms) // since there was 800 bytes free
    --- End diff --
    
    was -> were


[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10170#issuecomment-163095805
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/47383/


[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10170#discussion_r47038892
  
    --- Diff: core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala ---
    @@ -73,27 +73,31 @@ class StorageMemoryPool(lock: Object) extends MemoryPool(lock) with Logging {
        *
        * @param blockId the ID of the block we are acquiring storage memory for
        * @param numBytesToAcquire the size of this block
    -   * @param numBytesToFree the size of space to be freed through evicting blocks
    +   * @param maxNumBytesToFree the maximum amount of space to be freed through evicting blocks
        * @return whether all N bytes were successfully granted.
        */
       def acquireMemory(
           blockId: BlockId,
           numBytesToAcquire: Long,
    -      numBytesToFree: Long,
    +      maxNumBytesToFree: Long,
           evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = lock.synchronized {
         assert(numBytesToAcquire >= 0)
    -    assert(numBytesToFree >= 0)
    +    assert(maxNumBytesToFree >= 0)
         assert(memoryUsed <= poolSize)
    -    memoryStore.ensureFreeSpace(blockId, numBytesToFree, evictedBlocks)
    -    // Register evicted blocks, if any, with the active task metrics
    -    Option(TaskContext.get()).foreach { tc =>
    -      val metrics = tc.taskMetrics()
    -      val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
    -      metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq)
    +    if (numBytesToAcquire > memoryFree && maxNumBytesToFree > 0) {
    +      val additionalMemoryRequired = numBytesToAcquire - memoryFree
    +      memoryStore.evictBlocksToFreeSpace(
    --- End diff --
    
    `evictBlocksToFreeSpace` already has a form of this check internally. Even if we add the logic here, we can't drop the checks inside of `evictBlocksToFreeSpace` because when storage is evicting storage we need to handle the fact that an RDD cannot evict blocks belonging to the same RDD.
    
    It wouldn't hurt to add the check here, I suppose, since it would save us the hassle of having to iterate over all of the blocks in the memory store.


[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10170#discussion_r47028931
  
    --- Diff: core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala ---
    @@ -154,32 +177,34 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
         assert(mm.acquireExecutionMemory(800L, taskAttemptId, MemoryMode.ON_HEAP) === 800L)
         assert(mm.executionMemoryUsed === 800L)
         assert(mm.storageMemoryUsed === 0L)
    -    assertEnsureFreeSpaceNotCalled(ms)
    +    assertEvictBlocksToFreeSpaceNotCalled(ms)
         // Storage should not be able to evict execution
         assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks))
         assert(mm.executionMemoryUsed === 800L)
         assert(mm.storageMemoryUsed === 100L)
    -    assertEnsureFreeSpaceCalled(ms, 100L)
    +    assertEvictBlocksToFreeSpaceNotCalled(ms)
         assert(!mm.acquireStorageMemory(dummyBlock, 250L, evictedBlocks))
         assert(mm.executionMemoryUsed === 800L)
         assert(mm.storageMemoryUsed === 100L)
    -    assertEnsureFreeSpaceCalled(ms, 250L)
    +    assertEvictBlocksToFreeSpaceCalled(ms, 150L) // try to evict blocks ...
    --- End diff --
    
    If you take my suggestion from above about adding the preemptive check, then this will not be called (i.e., we know there are only 100 bytes of blocks, so we don't bother trying to free 150 bytes from them).
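    
    One possible shape for such a preemptive check, as a rough sketch only: the exact form of the suggested check is not shown in this part of the thread, and `shouldAttemptEviction` is a hypothetical helper rather than code from the PR.
    
    ```scala
    object PreemptiveCheckSketch {
      // Skip the eviction attempt when storage does not hold enough bytes to
      // satisfy the request even if every cached block were dropped.
      def shouldAttemptEviction(numBytesToFree: Long, storageMemoryUsed: Long): Boolean =
        numBytesToFree > 0 && numBytesToFree <= storageMemoryUsed
    }
    ```
    
    In the scenario above (100 bytes of cached blocks, 150 bytes to free) this returns `false`, so `evictBlocksToFreeSpace` would not be invoked. A guard like this would sit in front of the checks inside `evictBlocksToFreeSpace` rather than replace them, since that method still has to handle an RDD not being able to evict its own blocks.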


[GitHub] spark pull request: [SPARK-12165] Fix bug in eviction of storage m...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/10170#issuecomment-162417991
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/47251/


[GitHub] spark pull request: [SPARK-12165][SPARK-12189] Fix bugs in evictio...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10170#discussion_r46994235
  
    --- Diff: core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala ---
    @@ -73,27 +73,31 @@ class StorageMemoryPool(lock: Object) extends MemoryPool(lock) with Logging {
        *
        * @param blockId the ID of the block we are acquiring storage memory for
        * @param numBytesToAcquire the size of this block
    -   * @param numBytesToFree the size of space to be freed through evicting blocks
    +   * @param maxNumBytesToFree the maximum amount of space to be freed through evicting blocks
        * @return whether all N bytes were successfully granted.
        */
       def acquireMemory(
           blockId: BlockId,
           numBytesToAcquire: Long,
    -      numBytesToFree: Long,
    +      maxNumBytesToFree: Long,
           evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = lock.synchronized {
         assert(numBytesToAcquire >= 0)
    -    assert(numBytesToFree >= 0)
    +    assert(maxNumBytesToFree >= 0)
         assert(memoryUsed <= poolSize)
    -    memoryStore.ensureFreeSpace(blockId, numBytesToFree, evictedBlocks)
    -    // Register evicted blocks, if any, with the active task metrics
    -    Option(TaskContext.get()).foreach { tc =>
    -      val metrics = tc.taskMetrics()
    -      val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
    -      metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq)
    +    if (numBytesToAcquire > memoryFree && maxNumBytesToFree > 0) {
    +      val additionalMemoryRequired = numBytesToAcquire - memoryFree
    +      memoryStore.evictBlocksToFreeSpace(
    +        Some(blockId), Math.min(maxNumBytesToFree, additionalMemoryRequired), evictedBlocks)
    --- End diff --
    
    actually, to improve readability a little:
    ```
    val numBytesToFree = math.min(maxNumBytesToFree, additionalMemoryRequired)
    memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, evictedBlocks)
    ```

