Posted to reviews@spark.apache.org by mallman <gi...@git.apache.org> on 2017/01/07 21:38:13 UTC

[GitHub] spark pull request #16499: [SPARK-17204][CORE] Fix replicated off heap stora...

GitHub user mallman opened a pull request:

    https://github.com/apache/spark/pull/16499

    [SPARK-17204][CORE] Fix replicated off heap storage

    (Jira: https://issues.apache.org/jira/browse/SPARK-17204)
    
    ## What changes were proposed in this pull request?
    
    There are a couple of bugs in `BlockManager`'s support for replicated off-heap storage. First, the locally stored off-heap byte buffer is disposed of when it is replicated; it should not be. Second, replica byte buffers are stored as heap byte buffers instead of direct byte buffers even when the storage level's memory mode is off-heap. This PR addresses both problems.
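    The heap/direct distinction behind the second bug is plain `java.nio` behavior, not Spark code. As a minimal illustration (standalone example, not from this PR), an on-heap buffer is backed by a Java `byte[]`, while a direct buffer lives outside the Java heap, which is what an `OFF_HEAP` storage level expects replica bytes to be:

```java
import java.nio.ByteBuffer;

public class HeapVsDirect {
    public static void main(String[] args) {
        // A heap buffer is backed by a Java byte[] and managed by the GC.
        ByteBuffer onHeap = ByteBuffer.allocate(16);
        // A direct buffer's storage lives outside the Java heap.
        ByteBuffer offHeap = ByteBuffer.allocateDirect(16);

        System.out.println(onHeap.isDirect());   // false
        System.out.println(offHeap.isDirect());  // true
        System.out.println(onHeap.hasArray());   // true: accessible byte[] backing
        System.out.println(offHeap.hasArray());  // false
    }
}
```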
    
    ## How was this patch tested?
    
    `BlockManagerReplicationSuite` was enhanced to fill in the coverage gaps. It now fails if either of the bugs in this PR exist.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/VideoAmp/spark-public spark-17204-replicated_off_heap_storage

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/16499.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #16499
    
----
commit e49aeca23ff463fbd9a9cc4db99078c466bfbd56
Author: Michael Allman <mi...@videoamp.com>
Date:   2017-01-02T01:28:12Z

    Fix a couple of bugs in replicated off-heap storage

commit 40b6b97ca9013544702433dc5dc388c054daf41a
Author: Michael Allman <mi...@videoamp.com>
Date:   2017-01-07T21:17:33Z

    Shore-up BlockManagerReplicationSuite to identify a couple of bugs with
    off-heap storage replication

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16499: [SPARK-17204][CORE] Fix replicated off heap storage

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16499
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73226/
    Test PASSed.




[GitHub] spark issue #16499: [SPARK-17204][CORE] Fix replicated off heap storage

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16499
  
    **[Test build #74127 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74127/testReport)** for PR 16499 at commit [`45eb006`](https://github.com/apache/spark/commit/45eb0066dc0eb4003e371735855825bcd1c91524).




[GitHub] spark issue #16499: [SPARK-17204][CORE] Fix replicated off heap storage

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16499
  
    **[Test build #74127 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74127/testReport)** for PR 16499 at commit [`45eb006`](https://github.com/apache/spark/commit/45eb0066dc0eb4003e371735855825bcd1c91524).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request #16499: [SPARK-17204][CORE] Fix replicated off heap stora...

Posted by mallman <gi...@git.apache.org>.
Github user mallman commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16499#discussion_r101809099
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -813,7 +813,14 @@ private[spark] class BlockManager(
                   false
               }
             } else {
    -          memoryStore.putBytes(blockId, size, level.memoryMode, () => bytes)
    +          val memoryMode = level.memoryMode
    +          memoryStore.putBytes(blockId, size, memoryMode, () => {
    +            if (memoryMode == MemoryMode.OFF_HEAP) {
    --- End diff --
    
    I'm not sure what you're suggesting I do here.




[GitHub] spark pull request #16499: [SPARK-17204][CORE] Fix replicated off heap stora...

Posted by mallman <gi...@git.apache.org>.
Github user mallman commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16499#discussion_r106552361
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1048,7 +1065,7 @@ private[spark] class BlockManager(
               try {
                 replicate(blockId, bytesToReplicate, level, remoteClassTag)
               } finally {
    -            bytesToReplicate.dispose()
    +            bytesToReplicate.unmap()
    --- End diff --
    
    @cloud-fan I explored the approach of making the `MemoryStore` return a `ChunkedByteBuffer` that cannot be disposed, but I don't think there's a clean way to support that behavior safely. In essence, if the memory manager marks a buffer as non-disposable when it returns it to the block manager, then that buffer cannot be evicted later. The additional code needed to handle this case correctly was getting messy, so I abandoned the effort.
    
    At this point, I think that explicitly separating `unmap` and `dispose` methods is still the best way to resolve this issue.




[GitHub] spark pull request #16499: [SPARK-17204][CORE] Fix replicated off heap stora...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16499#discussion_r102377114
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -813,7 +813,14 @@ private[spark] class BlockManager(
                   false
               }
             } else {
    -          memoryStore.putBytes(blockId, size, level.memoryMode, () => bytes)
    +          val memoryMode = level.memoryMode
    +          memoryStore.putBytes(blockId, size, memoryMode, () => {
    +            if (memoryMode == MemoryMode.OFF_HEAP) {
    --- End diff --
    
    Oh. nvm. That `duplicate` doesn't actually copy buffer content...
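    This point is easy to verify with plain `java.nio` (standalone illustration, not Spark code): `duplicate()` creates a new buffer object with independent position/limit, but it shares the same backing content, so it is not a defensive copy.

```java
import java.nio.ByteBuffer;

public class DuplicateSharesContent {
    public static void main(String[] args) {
        ByteBuffer original = ByteBuffer.allocate(4);
        // duplicate() gives independent position/limit/mark, but shared content.
        ByteBuffer dup = original.duplicate();

        original.put(0, (byte) 42);      // a write through the original...
        System.out.println(dup.get(0));  // ...is visible through the duplicate: 42
    }
}
```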




[GitHub] spark pull request #16499: [SPARK-17204][CORE] Fix replicated off heap stora...

Posted by mallman <gi...@git.apache.org>.
Github user mallman commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16499#discussion_r101602604
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -813,7 +813,14 @@ private[spark] class BlockManager(
                   false
               }
             } else {
    -          memoryStore.putBytes(blockId, size, level.memoryMode, () => bytes)
    +          val memoryMode = level.memoryMode
    +          memoryStore.putBytes(blockId, size, memoryMode, () => {
    +            if (memoryMode == MemoryMode.OFF_HEAP) {
    --- End diff --
    
    NM about checking `memoryMode.useOffHeap`. I got that confused with `StorageLevel`. There are actually only two values of `MemoryMode`: `MemoryMode.OFF_HEAP` and `MemoryMode.ON_HEAP`.




[GitHub] spark pull request #16499: [SPARK-17204][CORE] Fix replicated off heap stora...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16499#discussion_r101883526
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1018,7 +1025,9 @@ private[spark] class BlockManager(
               try {
                 replicate(blockId, bytesToReplicate, level, remoteClassTag)
               } finally {
    -            bytesToReplicate.dispose()
    +            if (!level.useOffHeap) {
    --- End diff --
    
    > Allocating a direct byte buffer creates a java.nio.DirectByteBuffer, which is in turn a subclass of java.nio.MappedByteBuffer. So calling dispose() will dispose direct buffers, too.
    
    yeah, right.
    
    If `bytesToReplicate` comes from the disk store, it could be a memory-mapped byte buffer. Doesn't this change miss the chance to dispose of it?
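    The subclass relationship quoted above is an implementation detail of HotSpot/OpenJDK rather than something the `ByteBuffer` spec promises, but it is what makes a `MappedByteBuffer`-matching `dispose()` catch direct buffers too. A standalone check (not Spark code):

```java
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;

public class DirectIsMapped {
    public static void main(String[] args) {
        ByteBuffer direct = ByteBuffer.allocateDirect(8);
        ByteBuffer heap = ByteBuffer.allocate(8);

        // On OpenJDK, DirectByteBuffer extends MappedByteBuffer, so a dispose()
        // that matches on MappedByteBuffer also disposes direct buffers.
        System.out.println(direct instanceof MappedByteBuffer); // true (implementation detail)
        System.out.println(heap instanceof MappedByteBuffer);   // false
    }
}
```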




[GitHub] spark pull request #16499: [SPARK-17204][CORE] Fix replicated off heap stora...

Posted by sameeragarwal <gi...@git.apache.org>.
Github user sameeragarwal commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16499#discussion_r101674560
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1018,7 +1025,9 @@ private[spark] class BlockManager(
               try {
                 replicate(blockId, bytesToReplicate, level, remoteClassTag)
               } finally {
    -            bytesToReplicate.dispose()
    +            if (!level.useOffHeap) {
    --- End diff --
    
    Similarly, aren't we deciding whether we should dispose `bytesToReplicate` based on the 'target' `StorageLevel`? Shouldn't we make that decision based on whether `bytesToReplicate` were stored off-heap or not?




[GitHub] spark pull request #16499: [SPARK-17204][CORE] Fix replicated off heap stora...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16499#discussion_r102376897
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -813,7 +813,14 @@ private[spark] class BlockManager(
                   false
               }
             } else {
    -          memoryStore.putBytes(blockId, size, level.memoryMode, () => bytes)
    +          val memoryMode = level.memoryMode
    +          memoryStore.putBytes(blockId, size, memoryMode, () => {
    +            if (memoryMode == MemoryMode.OFF_HEAP) {
    --- End diff --
    
    No. I meant: do we actually need a defensive copy here? All uses of `putBytes` across Spark duplicate the byte buffer before passing it in. Is there a case we're missing where this defensive copy is needed?




[GitHub] spark issue #16499: [SPARK-17204][CORE] Fix replicated off heap storage

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16499
  
    **[Test build #73223 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73223/testReport)** for PR 16499 at commit [`320b548`](https://github.com/apache/spark/commit/320b54884fc086806f28bda15c10e091ed4edf72).




[GitHub] spark pull request #16499: [SPARK-17204][CORE] Fix replicated off heap stora...

Posted by mallman <gi...@git.apache.org>.
Github user mallman commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16499#discussion_r101592331
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1018,7 +1025,9 @@ private[spark] class BlockManager(
               try {
                 replicate(blockId, bytesToReplicate, level, remoteClassTag)
               } finally {
    -            bytesToReplicate.dispose()
    +            if (!level.useOffHeap) {
    --- End diff --
    
    I think the name of the `ChunkedByteBuffer.dispose()` method is confusing. It actually only attempts to dispose a so-called "memory mapped" buffer. On-heap buffers are not memory mapped, therefore this is a no-op for them.
    
    On the other hand, when the storage level uses off-heap memory in this context, `bytesToReplicate` is a reference to the actual off-heap memory buffer. Disposing of this buffer will erase it from the local memory store. Obviously, this is not the desired behavior. So we add the guard for off-heap memory buffers here.
    
    As far as I can tell, there is no storage level for which `bytesToReplicate.dispose()` would actually do anything. However, technically, if `bytesToReplicate` were memory-mapped but not direct, this call would dispose of that memory. Would we even want that behavior? Overall, this `finally` clause attempts to destroy the data we get from `doGetLocalBytes()`. That does not seem safe or correct, because we do not want to destroy the data stored locally. Therefore, we should consider removing this `finally` clause entirely. What do you think?




[GitHub] spark issue #16499: [SPARK-17204][CORE] Fix replicated off heap storage

Posted by mallman <gi...@git.apache.org>.
Github user mallman commented on the issue:

    https://github.com/apache/spark/pull/16499
  
    @rxin, can you recommend someone I reach out to for help reviewing this PR?




[GitHub] spark issue #16499: [SPARK-17204][CORE] Fix replicated off heap storage

Posted by mallman <gi...@git.apache.org>.
Github user mallman commented on the issue:

    https://github.com/apache/spark/pull/16499
  
    Josh, can you take a look at this when you have a chance?




[GitHub] spark pull request #16499: [SPARK-17204][CORE] Fix replicated off heap stora...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16499#discussion_r102378915
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -813,7 +813,14 @@ private[spark] class BlockManager(
                   false
               }
             } else {
    -          memoryStore.putBytes(blockId, size, level.memoryMode, () => bytes)
    +          val memoryMode = level.memoryMode
    +          memoryStore.putBytes(blockId, size, memoryMode, () => {
    +            if (memoryMode == MemoryMode.OFF_HEAP) {
    --- End diff --
    
    But I am still wondering whether we need a copy like this here. Right, it is defensive, but since `BlockManager` is internal to Spark, if no caller modifies or releases the byte buffer it passes in, wouldn't this defensive copy just cause a performance regression?
    





[GitHub] spark pull request #16499: [SPARK-17204][CORE] Fix replicated off heap stora...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16499#discussion_r101883328
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -813,7 +813,14 @@ private[spark] class BlockManager(
                   false
               }
             } else {
    -          memoryStore.putBytes(blockId, size, level.memoryMode, () => bytes)
    +          val memoryMode = level.memoryMode
    +          memoryStore.putBytes(blockId, size, memoryMode, () => {
    +            if (memoryMode == MemoryMode.OFF_HEAP) {
    --- End diff --
    
    I mean that if all `bytes` are copied before being passed in, there is no need to do another copy.




[GitHub] spark issue #16499: [SPARK-17204][CORE] Fix replicated off heap storage

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16499
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request #16499: [SPARK-17204][CORE] Fix replicated off heap stora...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16499#discussion_r103530816
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1018,7 +1025,9 @@ private[spark] class BlockManager(
               try {
                 replicate(blockId, bytesToReplicate, level, remoteClassTag)
               } finally {
    -            bytesToReplicate.dispose()
    +            if (!level.useOffHeap) {
    --- End diff --
    
    `StorageUtils.dispose` is a good place to put the fix.




[GitHub] spark pull request #16499: [SPARK-17204][CORE] Fix replicated off heap stora...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16499#discussion_r101682138
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1018,7 +1025,9 @@ private[spark] class BlockManager(
               try {
                 replicate(blockId, bytesToReplicate, level, remoteClassTag)
               } finally {
    -            bytesToReplicate.dispose()
    +            if (!level.useOffHeap) {
    --- End diff --
    
    As `StorageUtils.dispose` only cleans up a memory-mapped `ByteBuffer`, I don't think calling `bytesToReplicate.dispose()` here would be a problem.




[GitHub] spark pull request #16499: [SPARK-17204][CORE] Fix replicated off heap stora...

Posted by mallman <gi...@git.apache.org>.
Github user mallman commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16499#discussion_r101602014
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -813,7 +813,14 @@ private[spark] class BlockManager(
                   false
               }
             } else {
    -          memoryStore.putBytes(blockId, size, level.memoryMode, () => bytes)
    +          val memoryMode = level.memoryMode
    +          memoryStore.putBytes(blockId, size, memoryMode, () => {
    +            if (memoryMode == MemoryMode.OFF_HEAP) {
    --- End diff --
    
    (Actually, I think we need to check `memoryMode.useOffHeap` here.)
    
    Assume `memoryMode.useOffHeap` is true. We have two cases to consider:
    
    1. `bytes` is on-heap. In this case, we need to copy it into a new direct buffer, and that's what we're doing here.
    2. `bytes` is off-heap. In this case, we assume that the caller upstream is managing the memory underlying `bytes`, and `bytes.copy(Platform.allocateDirectBuffer)` becomes a defensive copy. If the caller is not managing this memory, I would call that a bug in the caller's behavior.
    
    In either case, I believe we should be calling `bytes.copy(Platform.allocateDirectBuffer)` when `memoryMode.useOffHeap` is true.
    
    BTW, in my experience tracing this code in the debugger, `bytes` has always been an on-heap buffer.
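    For case 1 above, the effect of the `bytes.copy(Platform.allocateDirectBuffer)` call can be sketched with plain `java.nio`. The `copyToDirect` helper below is a hypothetical stand-in of my own, not Spark's `ChunkedByteBuffer` API; it copies a (possibly on-heap) source buffer into a freshly allocated direct buffer without disturbing the source:

```java
import java.nio.ByteBuffer;

public class CopyToDirect {
    // Hypothetical stand-in for bytes.copy(Platform.allocateDirectBuffer):
    // copy the source's readable bytes into a new direct buffer.
    static ByteBuffer copyToDirect(ByteBuffer src) {
        ByteBuffer copy = ByteBuffer.allocateDirect(src.remaining());
        copy.put(src.duplicate()); // duplicate so the source's position is untouched
        copy.flip();               // make the copy readable from the start
        return copy;
    }

    public static void main(String[] args) {
        ByteBuffer onHeap = ByteBuffer.wrap(new byte[] {1, 2, 3});
        ByteBuffer direct = copyToDirect(onHeap);

        System.out.println(direct.isDirect());  // true
        System.out.println(direct.get(0));      // 1: contents were copied
        System.out.println(onHeap.remaining()); // 3: source left untouched
    }
}
```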




[GitHub] spark pull request #16499: [SPARK-17204][CORE] Fix replicated off heap stora...

Posted by mallman <gi...@git.apache.org>.
Github user mallman commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16499#discussion_r102272981
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -813,7 +813,14 @@ private[spark] class BlockManager(
                   false
               }
             } else {
    -          memoryStore.putBytes(blockId, size, level.memoryMode, () => bytes)
    +          val memoryMode = level.memoryMode
    +          memoryStore.putBytes(blockId, size, memoryMode, () => {
    +            if (memoryMode == MemoryMode.OFF_HEAP) {
    --- End diff --
    
    So do a copy if and only if `memoryMode == MemoryMode.OFF_HEAP` and `bytes` is not direct and `bytes` is not a memory mapped file?




[GitHub] spark pull request #16499: [SPARK-17204][CORE] Fix replicated off heap stora...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16499#discussion_r103599100
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1018,7 +1025,9 @@ private[spark] class BlockManager(
               try {
                 replicate(blockId, bytesToReplicate, level, remoteClassTag)
               } finally {
    -            bytesToReplicate.dispose()
    +            if (!level.useOffHeap) {
    --- End diff --
    
    To fix it in StorageUtils.dispose() sounds good.




[GitHub] spark issue #16499: [SPARK-17204][CORE] Fix replicated off heap storage

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16499
  
    **[Test build #73226 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73226/testReport)** for PR 16499 at commit [`25923f3`](https://github.com/apache/spark/commit/25923f36ce7a2e2b94ab6ae5a915ba8b1c05cd25).




[GitHub] spark issue #16499: [SPARK-17204][CORE] Fix replicated off heap storage

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16499
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/71022/
    Test PASSed.




[GitHub] spark issue #16499: [SPARK-17204][CORE] Fix replicated off heap storage

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16499
  
    **[Test build #71022 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/71022/testReport)** for PR 16499 at commit [`e49aeca`](https://github.com/apache/spark/commit/e49aeca23ff463fbd9a9cc4db99078c466bfbd56).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #16499: [SPARK-17204][CORE] Fix replicated off heap storage

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16499
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request #16499: [SPARK-17204][CORE] Fix replicated off heap stora...

Posted by mallman <gi...@git.apache.org>.
Github user mallman commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16499#discussion_r103602156
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1018,7 +1025,9 @@ private[spark] class BlockManager(
               try {
                 replicate(blockId, bytesToReplicate, level, remoteClassTag)
               } finally {
    -            bytesToReplicate.dispose()
    +            if (!level.useOffHeap) {
    --- End diff --
    
    Ok. I'll see what else is calling that method to validate that a fix there won't break something else, and I'll add a unit test to validate that calling `StorageUtils.dispose` on a direct byte buffer that isn't memory mapped doesn't actually dispose it.




[GitHub] spark pull request #16499: [SPARK-17204][CORE] Fix replicated off heap stora...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/16499




[GitHub] spark pull request #16499: [SPARK-17204][CORE] Fix replicated off heap stora...

Posted by mallman <gi...@git.apache.org>.
Github user mallman commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16499#discussion_r101809872
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1018,7 +1025,9 @@ private[spark] class BlockManager(
               try {
                 replicate(blockId, bytesToReplicate, level, remoteClassTag)
               } finally {
    -            bytesToReplicate.dispose()
    +            if (!level.useOffHeap) {
    --- End diff --
    
    Allocating a direct byte buffer creates a `java.nio.DirectByteBuffer`, which is in turn a subclass of `java.nio.MappedByteBuffer`. So calling `dispose()` will dispose direct buffers, too.
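The subclass relationship described in this comment is a JDK implementation detail, but it can be checked directly with plain `java.nio` calls. A minimal sketch (no Spark code involved; the `instanceof` result assumes a HotSpot-style JDK where `DirectByteBuffer` extends `MappedByteBuffer`):

```java
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;

public class DirectBufferCheck {
    public static void main(String[] args) {
        // Allocating a direct buffer yields a DirectByteBuffer on the JDK,
        // which extends MappedByteBuffer even though no file is mapped.
        ByteBuffer direct = ByteBuffer.allocateDirect(16);
        System.out.println("isDirect: " + direct.isDirect());
        System.out.println("instanceof MappedByteBuffer: "
                + (direct instanceof MappedByteBuffer));

        // A heap buffer, by contrast, is neither direct nor mapped.
        ByteBuffer heap = ByteBuffer.allocate(16);
        System.out.println("heap isDirect: " + heap.isDirect());
    }
}
```

This is why a `dispose()` that tests for `MappedByteBuffer` also catches plain direct buffers, as the comment points out.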




[GitHub] spark issue #16499: [SPARK-17204][CORE] Fix replicated off heap storage

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16499
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #16499: [SPARK-17204][CORE] Fix replicated off heap storage

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/16499
  
    You do not need to open a new JIRA. You can still use the same JIRA number.




[GitHub] spark issue #16499: [SPARK-17204][CORE] Fix replicated off heap storage

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16499
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/73223/
    Test PASSed.




[GitHub] spark pull request #16499: [SPARK-17204][CORE] Fix replicated off heap stora...

Posted by mallman <gi...@git.apache.org>.
Github user mallman commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16499#discussion_r95066452
  
    --- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala ---
    @@ -375,7 +375,8 @@ class BlockManagerReplicationSuite extends SparkFunSuite
           // Put the block into one of the stores
           val blockId = new TestBlockId(
             "block-with-" + storageLevel.description.replace(" ", "-").toLowerCase)
    -      stores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel)
    +      val testValue = Array.fill[Byte](blockSize)(1)
    --- End diff --
    
    Using an array of 1s instead of an array of 0s is my silly, paranoid, OCD way of adding a little extra entropy to the test. I think the chance that this change in test value will actually affect the outcome of this test is about 0%. I will revert to the original test value on request.




[GitHub] spark issue #16499: [SPARK-17204][CORE] Fix replicated off heap storage

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the issue:

    https://github.com/apache/spark/pull/16499
  
    Ideally I think @joshrosen is the person to take a look




[GitHub] spark pull request #16499: [SPARK-17204][CORE] Fix replicated off heap stora...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16499#discussion_r101448019
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -813,7 +813,14 @@ private[spark] class BlockManager(
                   false
               }
             } else {
    -          memoryStore.putBytes(blockId, size, level.memoryMode, () => bytes)
    +          val memoryMode = level.memoryMode
    +          memoryStore.putBytes(blockId, size, memoryMode, () => {
    +            if (memoryMode == MemoryMode.OFF_HEAP) {
    --- End diff --
    
    Do we need to check if `bytes` is already a direct buffer?




[GitHub] spark issue #16499: [SPARK-17204][CORE] Fix replicated off heap storage

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16499
  
    **[Test build #71022 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/71022/testReport)** for PR 16499 at commit [`e49aeca`](https://github.com/apache/spark/commit/e49aeca23ff463fbd9a9cc4db99078c466bfbd56).




[GitHub] spark issue #16499: [SPARK-17204][CORE] Fix replicated off heap storage

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the issue:

    https://github.com/apache/spark/pull/16499
  
    also cc @sameeragarwal 




[GitHub] spark pull request #16499: [SPARK-17204][CORE] Fix replicated off heap stora...

Posted by mallman <gi...@git.apache.org>.
Github user mallman commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16499#discussion_r101675576
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -813,7 +813,14 @@ private[spark] class BlockManager(
                   false
               }
             } else {
    -          memoryStore.putBytes(blockId, size, level.memoryMode, () => bytes)
    +          val memoryMode = level.memoryMode
    +          memoryStore.putBytes(blockId, size, memoryMode, () => {
    +            if (memoryMode == MemoryMode.OFF_HEAP) {
    --- End diff --
    
    Is it safe to store a ref to `bytes` if the memory is stored off-heap? If the caller changes the values in that memory or frees it, the buffer we put in the memory store will be affected. We don't want that kind of side-effect.
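The aliasing hazard raised here is generic `java.nio` behavior and can be seen without any Spark classes. A small sketch (names are illustrative only):

```java
import java.nio.ByteBuffer;

public class SharedBufferDemo {
    public static void main(String[] args) {
        ByteBuffer original = ByteBuffer.allocateDirect(4);
        original.put(0, (byte) 1);

        // duplicate() shares the underlying memory: a later write through
        // the original is visible through the "copy".
        ByteBuffer shared = original.duplicate();
        original.put(0, (byte) 42);
        System.out.println(shared.get(0)); // prints 42: same backing memory

        // A true defensive copy into fresh direct memory is unaffected
        // by subsequent writes through the original.
        ByteBuffer copy = ByteBuffer.allocateDirect(original.capacity());
        copy.put(original.duplicate());
        copy.flip();
        original.put(0, (byte) 7);
        System.out.println(copy.get(0)); // still 42
    }
}
```

Storing only a reference (or a `duplicate()`) is the first case; the defensive copy in the diff corresponds to the second.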




[GitHub] spark pull request #16499: [SPARK-17204][CORE] Fix replicated off heap stora...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16499#discussion_r101447383
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1018,7 +1025,9 @@ private[spark] class BlockManager(
               try {
                 replicate(blockId, bytesToReplicate, level, remoteClassTag)
               } finally {
    -            bytesToReplicate.dispose()
    +            if (!level.useOffHeap) {
    --- End diff --
    
    Do we need to call dispose on an on-heap byte buffer? I think only an off-heap byte buffer needs to be disposed?




[GitHub] spark pull request #16499: [SPARK-17204][CORE] Fix replicated off heap stora...

Posted by mallman <gi...@git.apache.org>.
Github user mallman commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16499#discussion_r102293219
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -843,7 +852,15 @@ private[spark] class BlockManager(
                   false
               }
             } else {
    -          memoryStore.putBytes(blockId, size, level.memoryMode, () => bytes)
    +          val memoryMode = level.memoryMode
    +          memoryStore.putBytes(blockId, size, memoryMode, () => {
    +            if (memoryMode == MemoryMode.OFF_HEAP &&
    +                bytes.chunks.exists(buffer => !buffer.isDirect)) {
    --- End diff --
    
    I've refined this check for copying `bytes` to skip copying when the underlying buffers are already direct.
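The refined check can be sketched outside Spark with a hypothetical helper. `ensureDirect` below is not Spark API; it only illustrates the "copy into direct memory only when some chunk is heap-backed" logic from the diff:

```java
import java.nio.ByteBuffer;
import java.util.List;

public class EnsureDirect {
    // Hypothetical helper mirroring the check in the diff: copy the chunks
    // into direct memory only when at least one chunk is heap-backed.
    static List<ByteBuffer> ensureDirect(List<ByteBuffer> chunks) {
        boolean anyHeap = chunks.stream().anyMatch(b -> !b.isDirect());
        if (!anyHeap) {
            return chunks; // already fully off-heap; skip the defensive copy
        }
        return chunks.stream().map(b -> {
            ByteBuffer copy = ByteBuffer.allocateDirect(b.remaining());
            copy.put(b.duplicate());
            copy.flip();
            return copy;
        }).collect(java.util.stream.Collectors.toList());
    }

    public static void main(String[] args) {
        List<ByteBuffer> heapChunks = List.of(ByteBuffer.wrap(new byte[]{1, 2}));
        List<ByteBuffer> out = ensureDirect(heapChunks);
        System.out.println(out.get(0).isDirect()); // true: copied off-heap
    }
}
```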




[GitHub] spark pull request #16499: [SPARK-17204][CORE] Fix replicated off heap stora...

Posted by mallman <gi...@git.apache.org>.
Github user mallman commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16499#discussion_r104770778
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1048,7 +1065,7 @@ private[spark] class BlockManager(
               try {
                 replicate(blockId, bytesToReplicate, level, remoteClassTag)
               } finally {
    -            bytesToReplicate.dispose()
    +            bytesToReplicate.unmap()
    --- End diff --
    
    I would like to explore this further to ensure this would really work well, but I like your idea with one caveat. I think we should avoid using the `disposed` var for this purpose. Using it this way would introduce ambiguity in its meaning when `disposed` is true. In addition to its current meaning, it could also mean "this buffer is not disposed but we don't want to dispose it".
    
    Instead, I suggest keeping the usage of `disposed` as-is and adding an additional var, e.g. `indisposable`, which makes the `dispose` method itself a no-op.




[GitHub] spark issue #16499: [SPARK-17204][CORE] Fix replicated off heap storage

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16499
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/74127/
    Test PASSed.




[GitHub] spark pull request #16499: [SPARK-17204][CORE] Fix replicated off heap stora...

Posted by mallman <gi...@git.apache.org>.
Github user mallman commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16499#discussion_r102271763
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1018,7 +1025,9 @@ private[spark] class BlockManager(
               try {
                 replicate(blockId, bytesToReplicate, level, remoteClassTag)
               } finally {
    -            bytesToReplicate.dispose()
    +            if (!level.useOffHeap) {
    --- End diff --
    
    There does not appear to be a robust way to check whether `bytesToReplicate` is backed by a memory-mapped file or not. Perhaps `doGetLocalBytes` should return a tuple `(ChunkedByteBuffer, boolean)` where the second element of the tuple is `true` if and only if the buffer is backed by a memory-mapped file. Thoughts?




[GitHub] spark pull request #16499: [SPARK-17204][CORE] Fix replicated off heap stora...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16499#discussion_r101680844
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -813,7 +813,14 @@ private[spark] class BlockManager(
                   false
               }
             } else {
    -          memoryStore.putBytes(blockId, size, level.memoryMode, () => bytes)
    +          val memoryMode = level.memoryMode
    +          memoryStore.putBytes(blockId, size, memoryMode, () => {
    +            if (memoryMode == MemoryMode.OFF_HEAP) {
    --- End diff --
    
    I did a search. Looks like the buffers passed to `putBytes` (the only caller of private `doPutBytes`) across Spark are all duplicated.




[GitHub] spark issue #16499: [SPARK-17204][CORE] Fix replicated off heap storage

Posted by mallman <gi...@git.apache.org>.
Github user mallman commented on the issue:

    https://github.com/apache/spark/pull/16499
  
    Backport PR is #17390 




[GitHub] spark issue #16499: [SPARK-17204][CORE] Fix replicated off heap storage

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16499
  
    **[Test build #73226 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73226/testReport)** for PR 16499 at commit [`25923f3`](https://github.com/apache/spark/commit/25923f36ce7a2e2b94ab6ae5a915ba8b1c05cd25).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request #16499: [SPARK-17204][CORE] Fix replicated off heap stora...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16499#discussion_r102851869
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1018,7 +1025,9 @@ private[spark] class BlockManager(
               try {
                 replicate(blockId, bytesToReplicate, level, remoteClassTag)
               } finally {
    -            bytesToReplicate.dispose()
    +            if (!level.useOffHeap) {
    --- End diff --
    
    +1




[GitHub] spark pull request #16499: [SPARK-17204][CORE] Fix replicated off heap stora...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16499#discussion_r106573530
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1048,7 +1065,7 @@ private[spark] class BlockManager(
               try {
                 replicate(blockId, bytesToReplicate, level, remoteClassTag)
               } finally {
    -            bytesToReplicate.dispose()
    +            bytesToReplicate.unmap()
    --- End diff --
    
    yea we should separate `unmap` and `dispose`. but instead of using a hack in https://github.com/apache/spark/pull/16499/files#diff-21027f5c826cd378daaae5f7c3eea2b5R240, shall we just use a flag `needUnmap` in `ChunkedByteBuffer`?
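A minimal sketch of the flag being proposed here. `TrackedBuffer`, `needUnmap`, and the no-op cleaner call are all hypothetical, not actual Spark or JDK API; the point is that an explicit "was this memory-mapped" flag keeps `unmap` separate from a general `dispose`:

```java
import java.nio.ByteBuffer;

public class TrackedBuffer {
    private final ByteBuffer buffer;
    private final boolean needUnmap; // true only for mmapped buffers
    private boolean disposed = false;

    public TrackedBuffer(ByteBuffer buffer, boolean needUnmap) {
        this.buffer = buffer;
        this.needUnmap = needUnmap;
    }

    // Frees mapped memory eagerly; a plain direct or heap buffer is
    // left to the garbage collector instead.
    public void unmap() {
        if (needUnmap && !disposed) {
            disposed = true;
            // real code would invoke the platform cleaner on buffer here
        }
    }

    public boolean isDisposed() { return disposed; }

    public static void main(String[] args) {
        TrackedBuffer direct =
                new TrackedBuffer(ByteBuffer.allocateDirect(8), false);
        direct.unmap();
        System.out.println(direct.isDisposed()); // false: not mmapped, left alone
    }
}
```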




[GitHub] spark pull request #16499: [SPARK-17204][CORE] Fix replicated off heap stora...

Posted by sameeragarwal <gi...@git.apache.org>.
Github user sameeragarwal commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16499#discussion_r101664624
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -813,7 +813,14 @@ private[spark] class BlockManager(
                   false
               }
             } else {
    -          memoryStore.putBytes(blockId, size, level.memoryMode, () => bytes)
    +          val memoryMode = level.memoryMode
    +          memoryStore.putBytes(blockId, size, memoryMode, () => {
    +            if (memoryMode == MemoryMode.OFF_HEAP) {
    --- End diff --
    
    This condition can just check for `level.useOffHeap` right? But more generally, I had the same question that @viirya has. Is there a way to check if `bytes` is already off-heap and avoid the defensive copy? Or will that never be the case?




[GitHub] spark issue #16499: [SPARK-17204][CORE] Fix replicated off heap storage

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/16499
  
    thanks, merging to master/2.1!
    
    @mallman can you send a new PR for 2.0? thanks!




[GitHub] spark issue #16499: [SPARK-17204][CORE] Fix replicated off heap storage

Posted by mallman <gi...@git.apache.org>.
Github user mallman commented on the issue:

    https://github.com/apache/spark/pull/16499
  
    I looked into simply cleaning up the `StorageUtils.dispose` method to only dispose memory-mapped buffers. However, I did find legitimate uses of that method to dispose of direct/non-memory-mapped buffers. So I kept the behavior of that method as-is. Instead, I added a new method, `unmap`, which will dispose of memory-mapped buffers *only*, and added calls to that method where appropriate. At the end of the day, I only found one case where we specifically wanted an "unmap" behavior instead of the other broader disposal behavior. (That case being the one that was causing corruption of replicated blocks in the first place.)
    
    I also found a new memory management bug in `BlockManager` introduced by the encryption support. In the original codebase, it disposes of a buffer unsafely. I think part of the problem is the documentation of the `ChunkedByteBuffer.toByteBuffer` method uses the word "copy" in describing what that method does. I expanded and made that method's documentation more precise to clarify that sometimes that method *does not* return a copy of the data. In those cases, it is not safe to dispose the returned buffer.
    
    I found that there were no uses of `ByteBufferInputStream` where automatic buffer disposal was called for. Therefore, I dropped that support from that class to guard against unsafe usage. If someone _really_ wants to actually use automatic buffer disposal in `ByteBufferInputStream` they can\u2014carefully\u2014re-add that support. I think that that's generally unsafe. And, like I said, nothing in the codebase was using it anyway except where it was used incorrectly.
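The aliasing hazard described above can be illustrated with a minimal sketch. This is not Spark's actual `ChunkedByteBuffer` (class and method names here are illustrative); it only demonstrates why a `toByteBuffer`-style method may not return a copy: with a single chunk it returns a duplicate view sharing the underlying memory, and only allocates fresh memory when merging several chunks, so disposing a single-chunk result would free memory the original still uses.

```java
import java.nio.ByteBuffer;

// Illustrative sketch (not Spark's ChunkedByteBuffer) of copy-vs-view semantics.
class ChunkedSketch {
    private final ByteBuffer[] chunks;

    ChunkedSketch(ByteBuffer... chunks) { this.chunks = chunks; }

    ByteBuffer toByteBuffer() {
        if (chunks.length == 1) {
            return chunks[0].duplicate(); // NOT a copy: shares the underlying memory
        }
        int total = 0;
        for (ByteBuffer c : chunks) total += c.remaining();
        ByteBuffer merged = ByteBuffer.allocate(total);
        for (ByteBuffer c : chunks) merged.put(c.duplicate());
        merged.flip();
        return merged; // a genuine copy: safe to release independently
    }
}
```

Mutating the single-chunk "copy" is visible through the original buffer, which is exactly why disposal of the returned buffer is unsafe in that case.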



[GitHub] spark pull request #16499: [SPARK-17204][CORE] Fix replicated off heap stora...

Posted by mallman <gi...@git.apache.org>.
Github user mallman commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16499#discussion_r107028767
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1048,7 +1065,7 @@ private[spark] class BlockManager(
               try {
                 replicate(blockId, bytesToReplicate, level, remoteClassTag)
               } finally {
    -            bytesToReplicate.dispose()
    +            bytesToReplicate.unmap()
    --- End diff --
    
    The best I think we can expect from such a flag is a hint. The constructor of a `ChunkedByteBuffer` will not always know whether the underlying byte buffers are memory-mapped. For example, see https://github.com/apache/spark/blob/bec6b16c1900fe93def89cc5eb51cbef498196cb/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L326.
    
    In this case, `data.nioByteBuffer()` might or might not be memory-mapped.
    
    I still think the current patch set is the best overall among the options we've considered. I can add a unit test for `StorageUtils.unmap` to ensure it works as expected (disposing only memory-mapped buffers). I can also add an `if` clause around the call to `bytesToReplicate.unmap()` so it is only called when the replication storage level is off-heap, ensuring the reflective access of the `fd` field occurs only for off-heap replication. Given that off-heap replication is currently broken, I doubt anyone will notice a performance degradation... Besides, I suspect that network and disk I/O will dominate the cost of the reflective call.
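A hedged sketch of the reflective `fd` check discussed here (names like `UnmapSketch` and `freedCount` are illustrative, not Spark's actual code): in the JDK, every direct buffer extends `MappedByteBuffer`, so an `instanceof` check alone cannot distinguish a truly memory-mapped buffer from a plain direct one; the private `fd` field must be consulted reflectively.

```java
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;

// Hypothetical sketch of an "unmap only what is truly memory-mapped" check.
class UnmapSketch {
    static int freedCount = 0; // test hook: counts buffers we would free

    // Reflective handle on MappedByteBuffer's private `fd` field; null when
    // the JVM forbids access (e.g. JDK 16+ without --add-opens).
    private static final Field FD_FIELD = lookupFdField();

    private static Field lookupFdField() {
        try {
            Field f = MappedByteBuffer.class.getDeclaredField("fd");
            f.setAccessible(true);
            return f;
        } catch (Throwable t) {
            return null;
        }
    }

    static void unmap(ByteBuffer buffer) {
        try {
            if (buffer instanceof MappedByteBuffer
                    && FD_FIELD != null
                    && FD_FIELD.get(buffer) != null) {
                freedCount++; // truly memory-mapped: would invoke the cleaner here
            }
        } catch (IllegalAccessException e) {
            // cannot inspect the buffer: leave it alone
        }
    }
}
```

Both a heap buffer and a direct-but-unmapped buffer pass through untouched; only a buffer mapped against an actual file descriptor would be released.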



[GitHub] spark pull request #16499: [SPARK-17204][CORE] Fix replicated off heap stora...

Posted by mallman <gi...@git.apache.org>.
Github user mallman commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16499#discussion_r102293681
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -317,6 +317,9 @@ private[spark] class BlockManager(
     
       /**
        * Put the block locally, using the given storage level.
    +   *
    +   * '''Important!''' Callers must not mutate or release the data buffer underlying `bytes`. Doing
    --- End diff --
    
    I've explicitly documented the fact that callers must not mutate the data buffers underlying `bytes`.



[GitHub] spark issue #16499: [SPARK-17204][CORE] Fix replicated off heap storage

Posted by mallman <gi...@git.apache.org>.
Github user mallman commented on the issue:

    https://github.com/apache/spark/pull/16499
  
    > @mallman can you send a new PR for 2.0? thanks!
    
    Will do. Do I need to open a new JIRA ticket for that?



[GitHub] spark pull request #16499: [SPARK-17204][CORE] Fix replicated off heap stora...

Posted by mallman <gi...@git.apache.org>.
Github user mallman commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16499#discussion_r103509038
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1018,7 +1025,9 @@ private[spark] class BlockManager(
               try {
                 replicate(blockId, bytesToReplicate, level, remoteClassTag)
               } finally {
    -            bytesToReplicate.dispose()
    +            if (!level.useOffHeap) {
    --- End diff --
    
    Hi guys. Sorry for the delay on the update. I started down the path that I proposed and it resulted in too many awkward changes in method signatures downstream. I don't think this is a viable step forward.
    
    As another option, we could dispose the buffer if and only if it has a non-null `fd` field. Since that field is private, we would have to access it by reflection. I'd also include a unit test validating that the field exists as expected, to guard against internal changes in future versions of Java.
    
    On a broader level, I wonder whether callers of the `ChunkedByteBuffer.dispose` method understand that it will dispose of non-memory-mapped direct buffers. The documentation of that method suggests it's only supposed to dispose of memory-mapped files in the strict sense (those actually memory-mapped against a file descriptor by the OS). If other methods are accidentally calling it on non-memory-mapped direct buffers, that suggests to me we need to push the fix into that method (or, rather, into `StorageUtils.dispose()`). What do you think?



[GitHub] spark issue #16499: [SPARK-17204][CORE] Fix replicated off heap storage

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16499
  
    **[Test build #73223 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73223/testReport)** for PR 16499 at commit [`320b548`](https://github.com/apache/spark/commit/320b54884fc086806f28bda15c10e091ed4edf72).
     * This patch passes all tests.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.



[GitHub] spark issue #16499: [SPARK-17204][CORE] Fix replicated off heap storage

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16499
  
    Build finished. Test PASSed.



[GitHub] spark pull request #16499: [SPARK-17204][CORE] Fix replicated off heap stora...

Posted by sameeragarwal <gi...@git.apache.org>.
Github user sameeragarwal commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16499#discussion_r102289626
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1018,7 +1025,9 @@ private[spark] class BlockManager(
               try {
                 replicate(blockId, bytesToReplicate, level, remoteClassTag)
               } finally {
    -            bytesToReplicate.dispose()
    +            if (!level.useOffHeap) {
    --- End diff --
    
    I agree, we can have `doGetLocalBytes` return an additional `boolean` that indicates whether the returned buffer is memory-mapped or not.



[GitHub] spark pull request #16499: [SPARK-17204][CORE] Fix replicated off heap stora...

Posted by mallman <gi...@git.apache.org>.
Github user mallman commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16499#discussion_r101675669
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1018,7 +1025,9 @@ private[spark] class BlockManager(
               try {
                 replicate(blockId, bytesToReplicate, level, remoteClassTag)
               } finally {
    -            bytesToReplicate.dispose()
    +            if (!level.useOffHeap) {
    --- End diff --
    
    So maybe use `putBlockStatus.storageLevel` instead?



[GitHub] spark pull request #16499: [SPARK-17204][CORE] Fix replicated off heap stora...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16499#discussion_r102376170
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1018,7 +1025,9 @@ private[spark] class BlockManager(
               try {
                 replicate(blockId, bytesToReplicate, level, remoteClassTag)
               } finally {
    -            bytesToReplicate.dispose()
    +            if (!level.useOffHeap) {
    --- End diff --
    
    Sounds good.



[GitHub] spark pull request #16499: [SPARK-17204][CORE] Fix replicated off heap stora...

Posted by mallman <gi...@git.apache.org>.
Github user mallman commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16499#discussion_r95066296
  
    --- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala ---
    @@ -387,12 +388,23 @@ class BlockManagerReplicationSuite extends SparkFunSuite
             testStore => blockLocations.contains(testStore.blockManagerId.executorId)
           }.foreach { testStore =>
             val testStoreName = testStore.blockManagerId.executorId
    -        assert(
    -          testStore.getLocalValues(blockId).isDefined, s"$blockId was not found in $testStoreName")
    -        testStore.releaseLock(blockId)
    --- End diff --
    
    N.B. We no longer need the `releaseLock` call because we exhaust the iterator returned by `getLocalValues`.



[GitHub] spark pull request #16499: [SPARK-17204][CORE] Fix replicated off heap stora...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16499#discussion_r104763900
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1048,7 +1065,7 @@ private[spark] class BlockManager(
               try {
                 replicate(blockId, bytesToReplicate, level, remoteClassTag)
               } finally {
    -            bytesToReplicate.dispose()
    +            bytesToReplicate.unmap()
    --- End diff --
    
    One new thought: ideally we only want to dispose bytes returned by `DiskStore`. `ChunkedByteBuffer` has a boolean flag, `disposed`; when it is true, `ChunkedByteBuffer.dispose` becomes a no-op.
    
    When `MemoryStore` returns a `ChunkedByteBuffer`, how about setting the `disposed` flag to true? Then we won't dispose it later.
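The `disposed`-flag idea can be sketched as follows. This is an illustrative toy, not Spark's actual `ChunkedByteBuffer` (the class name and `freeCount` hook are invented for the example): once the flag is set, `dispose()` is a no-op, so an owner that merely exposes a buffer it still needs can pre-set the flag to prevent a later caller from freeing it.

```java
// Illustrative sketch of a dispose-once flag that can also be pre-set by the owner.
class DisposableBufferSketch {
    private boolean disposed = false;
    int freeCount = 0; // test hook: counts actual releases

    void dispose() {
        if (!disposed) {
            disposed = true;
            freeCount++; // real code would release the underlying chunks here
        }
    }

    // e.g. called by a MemoryStore-like owner before handing the buffer out,
    // so downstream dispose() calls cannot free memory the owner still uses
    void markDisposed() { disposed = true; }
}
```

Repeated `dispose()` calls release at most once, and a pre-marked buffer is never released by callers.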

