Posted to reviews@spark.apache.org by JeetKunDoug <gi...@git.apache.org> on 2018/05/14 16:32:09 UTC

[GitHub] spark pull request #21322: [SPARK-24225] Support closing AutoClosable object...

GitHub user JeetKunDoug opened a pull request:

    https://github.com/apache/spark/pull/21322

    [SPARK-24225] Support closing AutoClosable objects in MemoryStore

    This allows broadcast variables to be released properly.
    
    ## What changes were proposed in this pull request?
    
    Broadcast variables, while usually used to broadcast data to executors, can also be used to control the scope and lifecycle of shared resources (e.g. connection pools). When creating and destroying those resources within a task is expensive, using a broadcast variable to keep them deserialized in memory for multiple tasks to share can make a huge difference in the efficiency of a Spark job.
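
    For illustration, a sketch of that usage pattern (not code from this PR; `ConnectionPool` is a hypothetical stand-in for any expensive, shareable resource, and the holder below is only one way to wire it up):

    ```scala
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    // Hypothetical stand-in for an expensive shared resource such as a JDBC connection pool.
    class ConnectionPool(url: String) extends AutoCloseable {
      def lookup(key: String): String = s"$url -> $key"  // placeholder work
      override def close(): Unit = ()                    // would release sockets, threads, etc.
    }

    // Serializable holder: the pool is built lazily on each executor and released via close().
    class PoolHolder(url: String) extends AutoCloseable with Serializable {
      @transient private var pool: ConnectionPool = _
      def get: ConnectionPool = synchronized {
        if (pool == null) pool = new ConnectionPool(url)
        pool
      }
      override def close(): Unit = synchronized {
        if (pool != null) { pool.close(); pool = null }
      }
    }

    object BroadcastPoolExample {
      def enrich(sc: SparkContext, rows: RDD[String]): RDD[String] = {
        val poolBc = sc.broadcast(new PoolHolder("jdbc:postgresql://db:5432/app"))
        rows.mapPartitions { it =>
          val pool = poolBc.value.get      // built once per executor, shared by its tasks
          it.map(row => pool.lookup(row))
        }
        // After the stage that needs the pool has run, the driver calls poolBc.destroy();
        // with this change, the deserialized holder in MemoryStore has close() called on removal.
      }
    }
    ```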
    
    In `MemoryStore`, check whether any entries in a `DeserializedMemoryEntry` implement `AutoCloseable` and, if so, call `close` on those resources. This happens in two places:
    - `remove` of an individual block
    - `clear` of the `MemoryStore`
    
    ## How was this patch tested?
    
    Added tests to `MemoryStoreSuite` to check that resources are closed properly and that exceptions thrown from `close()` are handled gracefully.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/JeetKunDoug/spark handle-autoclosable-objects

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21322.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21322
    
----
commit f254f94fdc5e2648d7c1104bf5ec2355de7c6055
Author: Doug Rohrer <dr...@...>
Date:   2018-05-14T16:24:00Z

    [SPARK-24225] Support closing AutoClosable objects in MemoryStore so Broadcast Variables can be released properly

----


---



[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

Posted by JeetKunDoug <gi...@git.apache.org>.
Github user JeetKunDoug commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    @cloud-fan yes, essentially... the only thing I'd add is that it's bound to "whatever scope the driver sees fit to bind it to", if that makes sense. In my particular case, it happens to be a stage, but it's possible that others may find there are longer-lived cases, all of which should be supportable... As this is my side-project for work, it may take me a bit to get something more together, but I'll see what I can sketch out. Would you prefer some sort of design doc, or is an update to this PR (or a new one) appropriate?


---



[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90796/
    Test FAILed.


---



[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90652/
    Test PASSed.


---



[GitHub] spark pull request #21322: [SPARK-24225][CORE] Support closing AutoClosable ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21322#discussion_r188128177
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -384,15 +385,36 @@ private[spark] class MemoryStore(
         }
       }
     
    +  private def maybeReleaseResources(entry: MemoryEntry[_]): Unit = {
    +    entry match {
    +      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
    +      case DeserializedMemoryEntry(objs: Array[Any], _, _) => maybeCloseValues(objs)
    --- End diff --
    
    As far as I know, broadcasted variables can be serialized to disk too (`BlockManager.doPutIterator`). In that case, it seems `AutoCloseable` broadcasted variables won't hit this release logic.


---



[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    retest this please


---



[GitHub] spark pull request #21322: [SPARK-24225][CORE] Support closing AutoClosable ...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21322#discussion_r224875111
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -384,15 +385,30 @@ private[spark] class MemoryStore(
         }
       }
     
    +  private def maybeReleaseResources(resource: (BlockId, MemoryEntry[_])): Unit = {
    +    maybeReleaseResources(resource._1, resource._2)
    +  }
    +
    +  private def maybeReleaseResources(blockId: BlockId, entry: MemoryEntry[_]): Unit = {
    +    entry match {
    +      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
    --- End diff --
    
    Why not just make these case classes `Closeable`? Then you can close them consistently.
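
    A rough sketch of that suggestion (illustration only, not this PR's code; the field shapes mirror the existing `MemoryEntry` classes in `MemoryStore.scala`, with `AutoCloseable` layered on top):

    ```scala
    import scala.reflect.ClassTag

    import org.apache.spark.memory.MemoryMode
    import org.apache.spark.util.io.ChunkedByteBuffer

    sealed trait MemoryEntry[T] extends AutoCloseable {
      def size: Long
      def memoryMode: MemoryMode
      def classTag: ClassTag[T]
    }

    case class SerializedMemoryEntry[T](
        buffer: ChunkedByteBuffer,
        memoryMode: MemoryMode,
        classTag: ClassTag[T]) extends MemoryEntry[T] {
      def size: Long = buffer.size
      override def close(): Unit = buffer.dispose()
    }

    case class DeserializedMemoryEntry[T](
        value: Array[T],
        size: Long,
        classTag: ClassTag[T]) extends MemoryEntry[T] {
      val memoryMode: MemoryMode = MemoryMode.ON_HEAP
      // Best-effort: close any AutoCloseable values; failures are swallowed here for brevity.
      override def close(): Unit = value.foreach {
        case c: AutoCloseable => try c.close() catch { case _: Exception => }
        case _ =>
      }
    }

    // Call sites could then release any entry uniformly, e.g.
    //   Option(entries.remove(blockId)).foreach(_.close())
    ```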


---



[GitHub] spark pull request #21322: [SPARK-24225][CORE] Support closing AutoClosable ...

Posted by JeetKunDoug <gi...@git.apache.org>.
Github user JeetKunDoug commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21322#discussion_r188325608
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -384,15 +385,36 @@ private[spark] class MemoryStore(
         }
       }
     
    +  private def maybeReleaseResources(entry: MemoryEntry[_]): Unit = {
    +    entry match {
    +      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
    +      case DeserializedMemoryEntry(objs: Array[Any], _, _) => maybeCloseValues(objs)
    --- End diff --
    
    Actually, digging further, there are other places where we may deserialize an object from the disk store and never put it into the memory store. It seems like punting on a guarantee that your AutoCloseable object is closed, and making this a best-effort thing when calling `BlockManager.removeBroadcast` (which is how we used it in the case that caused us to put together this PR), may make the most sense. It'll still be better than depending on GC and a finalizer to get the resources cleaned up when the driver calls `Broadcast#destroy`, but we can document it as a best practice to keep a finalizer just in case the close() call doesn't happen due to edge cases.


---



[GitHub] spark issue #21322: [SPARK-24225] Support closing AutoClosable objects in Me...

Posted by JeetKunDoug <gi...@git.apache.org>.
Github user JeetKunDoug commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    @dbtsai Here's the PR we discussed earlier.


---



[GitHub] spark pull request #21322: [SPARK-24225][CORE] Support closing AutoClosable ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21322#discussion_r188128515
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -384,15 +385,36 @@ private[spark] class MemoryStore(
         }
       }
     
    +  private def maybeReleaseResources(entry: MemoryEntry[_]): Unit = {
    +    entry match {
    +      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
    +      case DeserializedMemoryEntry(objs: Array[Any], _, _) => maybeCloseValues(objs)
    +      case _ =>
    +    }
    +  }
    +
    +  private def maybeCloseValues(objs: Array[Any]): Unit = {
    +    objs.foreach {
    +        case closable: AutoCloseable =>
    --- End diff --
    
    indent style: two spaces.


---



[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

Posted by JeetKunDoug <gi...@git.apache.org>.
Github user JeetKunDoug commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    @cloud-fan I added the above-mentioned check for `isBroadcast` and only release resources if it's a broadcast ID. This will affect the MapOutputTracker as well, but I think it's a pretty limited change and should work well. If you're not crazy about that, I can start at `Broadcast.destroy` and pass down a flag to control the behavior, but it's a much more invasive (but doable) change.


---



[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    **[Test build #90651 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90651/testReport)** for PR 21322 at commit [`62d46d3`](https://github.com/apache/spark/commit/62d46d3bf49ef0393a916d3cafaae4947f374f36).


---



[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    **[Test build #90651 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90651/testReport)** for PR 21322 at commit [`62d46d3`](https://github.com/apache/spark/commit/62d46d3bf49ef0393a916d3cafaae4947f374f36).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21322: [SPARK-24225] Support closing AutoClosable objects in Me...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    Can one of the admins verify this patch?


---



[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    **[Test build #90599 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90599/testReport)** for PR 21322 at commit [`f254f94`](https://github.com/apache/spark/commit/f254f94fdc5e2648d7c1104bf5ec2355de7c6055).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21322: [SPARK-24225][CORE] Support closing AutoClosable ...

Posted by dbtsai <gi...@git.apache.org>.
Github user dbtsai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21322#discussion_r188035443
  
    --- Diff: core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala ---
    @@ -526,4 +526,84 @@ class MemoryStoreSuite
           }
         }
       }
    +
    +  test("[SPARK-24225]: remove should close AutoCloseable object") {
    +
    +    val (store, _) = makeMemoryStore(12000)
    +
    +    val id = BroadcastBlockId(0)
    +    val tracker = new CloseTracker()
    +    store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
    +    assert(store.contains(id))
    +    store.remove(id)
    +    assert(tracker.getClosed())
    +  }
    +
    +  test("[SPARK-24225]: remove should close AutoCloseable objects even if they throw exceptions") {
    +
    +    val (store, _) = makeMemoryStore(12000)
    +
    +    val id = BroadcastBlockId(0)
    +    val tracker = new CloseTracker(true)
    +    store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
    +    assert(store.contains(id))
    +    store.remove(id)
    +    assert(tracker.getClosed())
    +  }
    +
    +  test("[SPARK-24225]: clear should close AutoCloseable objects") {
    +
    +    val (store, _) = makeMemoryStore(12000)
    +
    +    val id = BroadcastBlockId(0)
    +    val tracker = new CloseTracker
    +    store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
    +    assert(store.contains(id))
    +    store.clear()
    +    assert(tracker.getClosed())
    +  }
    +
    +  test("[SPARK-24225]: clear should close all AutoCloseable objects put together in an iterator") {
    --- End diff --
    
    Can you check that, if you have multiple AutoCloseable objects in an iterator and only one of them is removed, the rest of the objects will not be closed?
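
    One way to read that request, sketched in the style of the other SPARK-24225 tests in this diff (illustration only, reusing the `makeMemoryStore`/`CloseTracker` helpers shown elsewhere in the patch):

    ```scala
    test("[SPARK-24225]: remove should not close objects belonging to other blocks") {
      val (store, _) = makeMemoryStore(12000)

      val id1 = BroadcastBlockId(1)
      val id2 = BroadcastBlockId(2)
      val tracker1 = new CloseTracker
      val tracker2 = new CloseTracker
      store.putIteratorAsValues(id1, Iterator(tracker1), ClassTag.Any)
      store.putIteratorAsValues(id2, Iterator(tracker2), ClassTag.Any)

      store.remove(id1)

      assert(tracker1.getClosed())   // the removed block's object is closed
      assert(!tracker2.getClosed())  // objects in blocks that are still present stay open
    }
    ```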


---



[GitHub] spark issue #21322: [SPARK-24225] Support closing AutoClosable objects in Me...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    Can one of the admins verify this patch?


---



[GitHub] spark pull request #21322: [SPARK-24225][CORE] Support closing AutoClosable ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21322#discussion_r189210100
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -384,14 +385,37 @@ private[spark] class MemoryStore(
         }
       }
     
    +  private def maybeReleaseResources(entry: MemoryEntry[_]): Unit = {
    +    entry match {
    +      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
    +      case DeserializedMemoryEntry(values: Array[Any], _, _) => maybeCloseValues(values)
    +      case _ =>
    +    }
    +  }
    +
    +  private def maybeCloseValues(values: Array[Any]): Unit = {
    +    values.foreach {
    +      case closable: AutoCloseable =>
    +        safelyCloseValue(closable)
    +      case _ =>
    +    }
    +  }
    +
    +  private def safelyCloseValue(closable: AutoCloseable): Unit = {
    +    try {
    +      closable.close()
    +    } catch {
    +      case ex: Exception => logWarning(s"Failed to close AutoClosable $closable", ex)
    +    }
    +  }
    +
       def remove(blockId: BlockId): Boolean = memoryManager.synchronized {
    --- End diff --
    
    If we do it in `remove`, I don't think we can avoid the issue I mentioned before. If you have a deserialized value in the broadcast cache, it can be cleaned up by GC when that broadcasted value isn't stored as a deserialized entry in `MemoryStore`.
    
    If the object has already claimed some resources we want to release via the `AutoCloseable` interface, we don't properly release them when it is cleaned up by GC. That happens before `remove` is called.
    



---



[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

Posted by JeetKunDoug <gi...@git.apache.org>.
Github user JeetKunDoug commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    @cloud-fan So your suggestion makes sense - it seems like the best path forward is to check the `isBroadcast` flag on the BlockId passed in to `MemoryStore.remove` and release resources only if it's a broadcast ID - does that make sense to you?


---



[GitHub] spark pull request #21322: [SPARK-24225][CORE] Support closing AutoClosable ...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21322#discussion_r224874828
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -384,15 +385,30 @@ private[spark] class MemoryStore(
         }
       }
     
    +  private def maybeReleaseResources(resource: (BlockId, MemoryEntry[_])): Unit = {
    +    maybeReleaseResources(resource._1, resource._2)
    +  }
    +
    +  private def maybeReleaseResources(blockId: BlockId, entry: MemoryEntry[_]): Unit = {
    +    entry match {
    +      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
    +      case DeserializedMemoryEntry(values: Array[Any], _, _) => maybeCloseValues(values, blockId)
    +      case _ =>
    +    }
    +  }
    +
    +  private def maybeCloseValues(values: Array[Any], blockId: BlockId): Unit = {
    +    if (blockId.isBroadcast) {
    +      values.foreach(value => Utils.tryClose(value))
    --- End diff --
    
    Just a style thing, but could be `values.foreach(Utils.tryClose)`


---



[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

Posted by JeetKunDoug <gi...@git.apache.org>.
Github user JeetKunDoug commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    @viirya from my admittedly cursory look at where we use the `cachedValues` reference map, it seems we should be OK in this case - if there's a deserialized version of the variable (the only case we really care about), it'll have a hard reference in an entry in MemoryStore, and when it is removed (which occurs if `BlockManager` drops it from memory), we can do the cleanup there explicitly. Perhaps I missed a way to have a broadcast variable in deserialized form in the cache but _not_ in `MemoryStore.entries`?


---



[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90599/
    Test PASSed.


---



[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    **[Test build #90824 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90824/testReport)** for PR 21322 at commit [`790c906`](https://github.com/apache/spark/commit/790c906062fc55f553a5cecb61147f801875d4b2).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21322: [SPARK-24225][CORE] Support closing AutoClosable ...

Posted by dbtsai <gi...@git.apache.org>.
Github user dbtsai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21322#discussion_r189187759
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -404,6 +428,7 @@ private[spark] class MemoryStore(
     
       def clear(): Unit = memoryManager.synchronized {
         entries.synchronized {
    +      entries.values().asScala.foreach(maybeReleaseResources)
    --- End diff --
    
    Should we check if the keys are `blockId.isBroadcast == true`? 
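
    Sketched, the guarded loop in `clear()` might look something like this (illustration only, not the PR's final code; it assumes the `JavaConverters` import already used in this file):

    ```scala
    entries.synchronized {
      entries.entrySet().asScala.foreach { kv =>
        val (blockId, entry) = (kv.getKey, kv.getValue)
        if (blockId.isBroadcast) {
          // Broadcast blocks: dispose buffers and close any AutoCloseable deserialized values.
          maybeReleaseResources(entry)
        } else {
          // Non-broadcast blocks: keep the existing behaviour of only disposing serialized buffers.
          entry match {
            case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
            case _ =>
          }
        }
      }
      entries.clear()
    }
    ```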


---



[GitHub] spark pull request #21322: [SPARK-24225][CORE] Support closing AutoClosable ...

Posted by JeetKunDoug <gi...@git.apache.org>.
Github user JeetKunDoug commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21322#discussion_r188314886
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -384,15 +385,36 @@ private[spark] class MemoryStore(
         }
       }
     
    +  private def maybeReleaseResources(entry: MemoryEntry[_]): Unit = {
    +    entry match {
    +      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
    +      case DeserializedMemoryEntry(objs: Array[Any], _, _) => maybeCloseValues(objs)
    --- End diff --
    
    Ah - OK, I see where the issue is. In this case you may have a deserialized instance, but the memory store is full, so the put fails. Now we've got a live, deserialized object that is not in the MemoryStore. Thanks for catching this. It looks like this case could be handled in `MemoryStore.putIteratorAsValues` when the `putIterator` call fails, which would cover several cases in `BlockManager` where we try (and fail) to put deserialized values, but it means checking for potential `AutoCloseable` values every time a put into `MemoryStore` fails, and I'm not sure of the performance impact of that.
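
    A sketch of that idea (a hypothetical helper, not code in this PR; `logWarning` assumes the surrounding class mixes in `Logging`, as `MemoryStore` does):

    ```scala
    // Best-effort: if unrolling a broadcast block as values fails, close any AutoCloseable
    // values that were already materialized, so they are not silently leaked.
    private def closeValuesAfterFailedPut(blockId: BlockId, unrolled: Iterable[Any]): Unit = {
      if (blockId.isBroadcast) {
        unrolled.foreach {
          case c: AutoCloseable =>
            try c.close() catch {
              case e: Exception => logWarning(s"Failed to close $c after failed put", e)
            }
          case _ =>
        }
      }
    }
    ```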


---



[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    Can one of the admins verify this patch?


---



[GitHub] spark issue #21322: [SPARK-24225] Support closing AutoClosable objects in Me...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    **[Test build #90599 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90599/testReport)** for PR 21322 at commit [`f254f94`](https://github.com/apache/spark/commit/f254f94fdc5e2648d7c1104bf5ec2355de7c6055).


---



[GitHub] spark pull request #21322: [SPARK-24225][CORE] Support closing AutoClosable ...

Posted by dbtsai <gi...@git.apache.org>.
Github user dbtsai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21322#discussion_r188034118
  
    --- Diff: core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala ---
    @@ -526,4 +526,84 @@ class MemoryStoreSuite
           }
         }
       }
    +
    +  test("[SPARK-24225]: remove should close AutoCloseable object") {
    +
    +    val (store, _) = makeMemoryStore(12000)
    +
    +    val id = BroadcastBlockId(0)
    +    val tracker = new CloseTracker()
    +    store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
    +    assert(store.contains(id))
    +    store.remove(id)
    +    assert(tracker.getClosed())
    +  }
    +
    +  test("[SPARK-24225]: remove should close AutoCloseable objects even if they throw exceptions") {
    +
    +    val (store, _) = makeMemoryStore(12000)
    +
    +    val id = BroadcastBlockId(0)
    +    val tracker = new CloseTracker(true)
    +    store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
    +    assert(store.contains(id))
    +    store.remove(id)
    +    assert(tracker.getClosed())
    +  }
    +
    +  test("[SPARK-24225]: clear should close AutoCloseable objects") {
    +
    +    val (store, _) = makeMemoryStore(12000)
    +
    +    val id = BroadcastBlockId(0)
    +    val tracker = new CloseTracker
    +    store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
    +    assert(store.contains(id))
    +    store.clear()
    +    assert(tracker.getClosed())
    +  }
    +
    +  test("[SPARK-24225]: clear should close all AutoCloseable objects put together in an iterator") {
    +
    +    val (store, _) = makeMemoryStore(12000)
    +
    +    val id1 = BroadcastBlockId(1)
    +    val tracker2 = new CloseTracker
    +    val tracker1 = new CloseTracker
    +    store.putIteratorAsValues(id1, Iterator(tracker1, tracker2), ClassTag.Any)
    +    assert(store.contains(id1))
    +    store.clear()
    +    assert(tracker1.getClosed())
    +    assert(tracker2.getClosed())
    +  }
    +
    +  test("[SPARK-24225]: clear should close AutoCloseable objects even if they throw exceptions") {
    +
    +    val (store, _) = makeMemoryStore(12000)
    +
    +    val id1 = BroadcastBlockId(1)
    +    val id2 = BroadcastBlockId(2)
    +    val tracker2 = new CloseTracker(true)
    +    val tracker1 = new CloseTracker(true)
    +    store.putIteratorAsValues(id1, Iterator(tracker1), ClassTag.Any)
    +    store.putIteratorAsValues(id2, Iterator(tracker2), ClassTag.Any)
    +    assert(store.contains(id1))
    +    assert(store.contains(id2))
    +    store.clear()
    +    assert(tracker1.getClosed())
    +    assert(tracker2.getClosed())
    +  }
    +}
    +
    +private class CloseTracker (val throwsOnClosed: Boolean = false) extends AutoCloseable {
    +  var closed = false
    +  override def close(): Unit = {
    +    closed = true
    +    if (throwsOnClosed) {
    +      throw new RuntimeException("Throwing")
    +    }
    +  }
    +  def getClosed(): Boolean = {
    +    closed
    --- End diff --
    
    since `closed` is public, you might use it directly. Or you can make `closed` private. 


---



[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    Shall we leave this PR closed and start over from a design doc? Let me suggest closing this for now while I am looking through old PRs.
    
    @JeetKunDoug, please feel free to create a clone of this PR if there's any reason to keep this open that I missed. No objection.
    
    Thanks.


---



[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

Posted by JeetKunDoug <gi...@git.apache.org>.
Github user JeetKunDoug commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    @cloud-fan as to eviction, it ends up calling `BlockManager.dropFromMemory`, which will in fact call `MemoryStore.remove` and do the cleanup, so that case should already be handled appropriately.
    
    I think you're right that `TorrentBroadcast.readBroadcastBlock` will only succeed if it can, in fact, push the unblockified object to the MemoryStore, but I think @viirya is right that there's still a possibility the block has been spilled to disk (and appropriately closed), is deserialized again in `BlockManager.getLocalValues`, and then fails to be written back to the MemoryStore _there_ - in which case we can return the value to the client without ever adding it to the MemoryStore. In that case, a finalize method can handle the cleanup. However, I wonder if we should always store it back to the `blockManager` in the Some() case when we read from `getLocalValues`, just to ensure it's there? Since we refuse to allow it to be used if we can't save it when it is read from another node, why would we allow it here?
    
    Regardless, I'm pushing some fix-ups for the other issues @dbtsai raised, which I think will handle all of the cases we'd need to consider this a "best-effort" cleanup while still requiring a finalizer. Please let me know what your thoughts are on forcing the save to the memory store in `readBroadcastBlock`'s Some block where it manages to read from local values (before TorrentBroadcast.scala:218 in my branch).


---



[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    Can one of the admins verify this patch?


---



[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    Introducing a new concept which is similar to broadcast seems like overkill. We can just update broadcast to allow it to be memory-only.
    
    However, there might be simpler solutions that fit your case without broadcast, e.g.:
    ```
    val myObj = ...
    rdd.mapPartitions { it =>
      try {
        // process data
      } finally {
        myObj.close()
      }
    }
    ```
    
    I think we need to clearly define the use case and think about whether we need a new API or not.
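
    Another broadcast-free variant along the same lines (a sketch; `ConnectionPool` is a hypothetical resource, `rdd` is as in the snippet above, and the pool here is only released when the executor JVM exits):

    ```scala
    // Hypothetical stand-in for the expensive shared resource.
    class ConnectionPool(url: String) extends AutoCloseable {
      def lookup(key: String): String = s"$url -> $key"
      override def close(): Unit = ()
    }

    // Per-executor singleton: initialized lazily the first time any task on the executor touches it.
    object SharedPool {
      lazy val pool: ConnectionPool = new ConnectionPool("jdbc:postgresql://db:5432/app")
    }

    rdd.mapPartitions { it =>
      val pool = SharedPool.pool        // one instance per executor JVM, reused across tasks
      it.map(row => pool.lookup(row))
    }
    ```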


---



[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    Ah, I see. It's kind of a broadcast, but with a much smaller scope, and its lifecycle is bound to a stage. I'm looking forward to a full design for it, thanks!


---



[GitHub] spark pull request #21322: [SPARK-24225][CORE] Support closing AutoClosable ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/21322


---



[GitHub] spark pull request #21322: [SPARK-24225][CORE] Support closing AutoClosable ...

Posted by dbtsai <gi...@git.apache.org>.
Github user dbtsai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21322#discussion_r188033806
  
    --- Diff: core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala ---
    @@ -526,4 +526,84 @@ class MemoryStoreSuite
           }
         }
       }
    +
    +  test("[SPARK-24225]: remove should close AutoCloseable object") {
    +
    +    val (store, _) = makeMemoryStore(12000)
    +
    +    val id = BroadcastBlockId(0)
    +    val tracker = new CloseTracker()
    +    store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
    +    assert(store.contains(id))
    +    store.remove(id)
    +    assert(tracker.getClosed())
    +  }
    +
    +  test("[SPARK-24225]: remove should close AutoCloseable objects even if they throw exceptions") {
    +
    +    val (store, _) = makeMemoryStore(12000)
    +
    +    val id = BroadcastBlockId(0)
    +    val tracker = new CloseTracker(true)
    +    store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
    +    assert(store.contains(id))
    +    store.remove(id)
    +    assert(tracker.getClosed())
    +  }
    +
    +  test("[SPARK-24225]: clear should close AutoCloseable objects") {
    +
    +    val (store, _) = makeMemoryStore(12000)
    +
    +    val id = BroadcastBlockId(0)
    +    val tracker = new CloseTracker
    +    store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
    +    assert(store.contains(id))
    +    store.clear()
    +    assert(tracker.getClosed())
    +  }
    +
    +  test("[SPARK-24225]: clear should close all AutoCloseable objects put together in an iterator") {
    +
    +    val (store, _) = makeMemoryStore(12000)
    +
    +    val id1 = BroadcastBlockId(1)
    +    val tracker2 = new CloseTracker
    +    val tracker1 = new CloseTracker
    +    store.putIteratorAsValues(id1, Iterator(tracker1, tracker2), ClassTag.Any)
    +    assert(store.contains(id1))
    +    store.clear()
    +    assert(tracker1.getClosed())
    +    assert(tracker2.getClosed())
    +  }
    +
    +  test("[SPARK-24225]: clear should close AutoCloseable objects even if they throw exceptions") {
    +
    +    val (store, _) = makeMemoryStore(12000)
    +
    +    val id1 = BroadcastBlockId(1)
    +    val id2 = BroadcastBlockId(2)
    +    val tracker2 = new CloseTracker(true)
    +    val tracker1 = new CloseTracker(true)
    +    store.putIteratorAsValues(id1, Iterator(tracker1), ClassTag.Any)
    +    store.putIteratorAsValues(id2, Iterator(tracker2), ClassTag.Any)
    +    assert(store.contains(id1))
    +    assert(store.contains(id2))
    +    store.clear()
    +    assert(tracker1.getClosed())
    +    assert(tracker2.getClosed())
    +  }
    +}
    +
    +private class CloseTracker (val throwsOnClosed: Boolean = false) extends AutoCloseable {
    +  var closed = false
    +  override def close(): Unit = {
    +    closed = true
    +    if (throwsOnClosed) {
    +      throw new RuntimeException("Throwing")
    --- End diff --
    
    Could you add `var isExceptionThrown = false`, and check in the test whether the exception is thrown?
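
    For example (a sketch of the requested change to the `CloseTracker` test helper):

    ```scala
    private class CloseTracker(val throwsOnClosed: Boolean = false) extends AutoCloseable {
      var closed = false
      var isExceptionThrown = false
      override def close(): Unit = {
        closed = true
        if (throwsOnClosed) {
          isExceptionThrown = true           // lets tests assert the exception path really ran
          throw new RuntimeException("Throwing")
        }
      }
    }
    ```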


---



[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

Posted by JeetKunDoug <gi...@git.apache.org>.
Github user JeetKunDoug commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    @cloud-fan The use-case we have in mind (and are currently using Broadcast + finalizers for) is the case where you have, for example, a connection pool that is, for one reason or another, scoped to a particular stage of the job. In this case the pool itself is expensive to create and can be shared across tasks, which makes closing the object in a try/finally for a single task, or even a single partition, a poor fit: you'd end up potentially closing the resource early and having to rebuild it several times. The fundamental trick is to figure out a way to allow the driver to define the scope of the shared resource (like a broadcast variable) and ensure it's really memory-only, so if there's a better way to use the existing broadcast variable infrastructure to do this, and prevent this kind of broadcast variable from being purged from the MemoryStore, then I'm all for it.


---



[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

Posted by JeetKunDoug <gi...@git.apache.org>.
Github user JeetKunDoug commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    @cloud-fan So I see what's been messing with me - it's the whole broadcast variable cache/weak reference thing. I originally wrote this internally for Spark 1.6.3 (we still support some older Spark installations), which didn't have this concept. I'll take another look at how we may be able to handle the executor-wide cache in a more coherent way (pun intended) and get back to you. You may be right that we will have to depend on a combination of a finalizer + an AutoCloseable object handled more directly in `Broadcast#destroy` for the happy-path case where memory pressure on the MemoryStore isn't causing the variable to churn... I just hate having to depend on a finalizer for this if there's a way around it.


---



[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    **[Test build #90652 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90652/testReport)** for PR 21322 at commit [`6a08c43`](https://github.com/apache/spark/commit/6a08c434cf967b939b8065bb23d64d0715e38a2c).


---



[GitHub] spark pull request #21322: [SPARK-24225][CORE] Support closing AutoClosable ...

Posted by dbtsai <gi...@git.apache.org>.
Github user dbtsai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21322#discussion_r188032854
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -384,15 +385,36 @@ private[spark] class MemoryStore(
         }
       }
     
    +  private def maybeReleaseResources(entry: MemoryEntry[_]): Unit = {
    +    entry match {
    +      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
    +      case DeserializedMemoryEntry(objs: Array[Any], _, _) => maybeCloseValues(objs)
    +      case _ =>
    +    }
    +  }
    +
    +  private def maybeCloseValues(objs: Array[Any]): Unit = {
    --- End diff --
    
    `values: Array[Any]`


---



[GitHub] spark pull request #21322: [SPARK-24225][CORE] Support closing AutoClosable ...

Posted by JeetKunDoug <gi...@git.apache.org>.
Github user JeetKunDoug commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21322#discussion_r189276054
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -384,14 +385,37 @@ private[spark] class MemoryStore(
         }
       }
     
    +  private def maybeReleaseResources(entry: MemoryEntry[_]): Unit = {
    +    entry match {
    +      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
    +      case DeserializedMemoryEntry(values: Array[Any], _, _) => maybeCloseValues(values)
    +      case _ =>
    +    }
    +  }
    +
    +  private def maybeCloseValues(values: Array[Any]): Unit = {
    +    values.foreach {
    +      case closable: AutoCloseable =>
    +        safelyCloseValue(closable)
    +      case _ =>
    +    }
    +  }
    +
    +  private def safelyCloseValue(closable: AutoCloseable): Unit = {
    +    try {
    +      closable.close()
    +    } catch {
    +      case ex: Exception => logWarning(s"Failed to close AutoClosable $closable", ex)
    +    }
    +  }
    +
       def remove(blockId: BlockId): Boolean = memoryManager.synchronized {
         val entry = entries.synchronized {
           entries.remove(blockId)
         }
         if (entry != null) {
    -      entry match {
    -        case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
    -        case _ =>
    +      if (blockId.isBroadcast) {
    +        maybeReleaseResources(entry)
    --- End diff --
    
    @dbtsai thanks - That whole "my day job vs. OSS" rush to fix. Will take care of it correctly and push an update.


---



[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    IIUC it's impossible to clean up the broadcasted object in `MemoryStore`. The life cycle of the broadcasted object is:
    1. The first task that tries to get the broadcasted object reads bytes from the block manager and deserializes them into an object. The object is put into the executor-wide cache.
    2. Other tasks in this executor that try to get the broadcasted object read it from the cache if it's there.
    3. Other tasks in this executor that try to get the broadcasted object redo step 1 if it has been evicted from the cache.
    4. The job finishes; remove the value from the block manager and the cache.
    
    If we do cleanup in `MemoryStore`, we would just rebuild the object from bytes and call its `close`. This doesn't help, as we need to do cleanup for all the objects that have been created during the job.


---



[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

Posted by JeetKunDoug <gi...@git.apache.org>.
Github user JeetKunDoug commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    @cloud-fan (Also left on the JIRA ticket) - Sorry this has dropped off my radar for so long - work + life took me away from it for a while. Looking back at the PR review comments with a better understanding of broadcast variable behavior (and some of the changes that took place in the 2.x series), it seems like simply trying to close broadcast variables won't work as intended. However, I believe the underlying concept (driver-scoped shared variables, where the variable lives until the job is done or the driver removes it) is still worth pursuing: being able to scope shared resources (like DB connection pools, which may need to change per phase of a job, or be disposed of early in a process) is something static variables can't give us. Given that, I'd like to propose we add a new concept, similar to broadcast variables, called, perhaps, Scoped Variables. The intent would be for these to be scoped by the driver, to be relatively small from a memory-consumption perspective (unlike broadcast variables, which can be much larger), and to be held in memory until explicitly removed by the driver. Most of the infrastructure work for broadcast variables supports this use-case, but we'd need either a "non-purgable" entry type in the MemoryStore, or some other store specific to these new scoped variables, to prevent them from being evicted the way cached items are.
    
    Thoughts on this? I'll start working on updating the PR to support something like this sometime today, but it might still take a while to get something workable put together, so I'd appreciate any feedback when someone has the time.
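
    Purely as a strawman, such an API might look something like this (every name below is hypothetical; none of it exists in Spark):

    ```scala
    // Driver-scoped, memory-only shared resource: pinned on executors until the driver removes it.
    trait ScopedVariable[T <: AutoCloseable] extends Serializable {
      def value: T          // deserialized at most once per executor; never evicted while live
      def destroy(): Unit   // driver-side: close the value on every executor, then release it
    }

    // Hypothetical driver usage:
    //   val pool: ScopedVariable[ConnectionPool] = sc.scopedVariable(new ConnectionPool(url))
    //   rdd.mapPartitions { it => val p = pool.value; it.map(row => p.lookup(row)) }
    //   pool.destroy()    // once the phase of the job that needs the pool has finished
    ```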


---



[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    **[Test build #90652 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90652/testReport)** for PR 21322 at commit [`6a08c43`](https://github.com/apache/spark/commit/6a08c434cf967b939b8065bb23d64d0715e38a2c).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    **[Test build #90796 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90796/testReport)** for PR 21322 at commit [`790c906`](https://github.com/apache/spark/commit/790c906062fc55f553a5cecb61147f801875d4b2).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    According to `TorrentBroadcast.writeBlocks` and `TorrentBroadcast.readBroadcastBlock`, broadcasted data is always written into the block manager with `MEMORY_AND_DISK_SER` mode. When a task wants to get the broadcasted value, it needs to fetch the serialized bytes from the block manager (maybe memory, maybe disk), deserialize them into an object, put the object into a cache, and return the object to the user function.
    
    That said, on the executor side we may create the broadcasted object multiple times: e.g. if it's evicted from the cache and GCed, we read and deserialize it from the block manager again. A simple way to do executor-side cleanup is to implement a `finalize` method in the class you want to broadcast.
    
    Alternatively, we can improve the cache and watch for eviction events. If eviction happens, do the cleanup.
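
    For instance, a sketch of that finalizer-as-safety-net approach (`ConnectionPool` is a hypothetical resource; `finalize` runs only if and when the GC collects an un-closed holder):

    ```scala
    // Hypothetical stand-in for the expensive shared resource.
    class ConnectionPool(url: String) extends AutoCloseable {
      override def close(): Unit = ()   // would release sockets, threads, etc.
    }

    class FinalizingPoolHolder(url: String) extends AutoCloseable with Serializable {
      @transient private var pool: ConnectionPool = _
      def get: ConnectionPool = synchronized {
        if (pool == null) pool = new ConnectionPool(url)
        pool
      }
      override def close(): Unit = synchronized {
        if (pool != null) { pool.close(); pool = null }
      }
      // Last-resort safety net if nothing ever calls close() on this executor.
      override protected def finalize(): Unit = close()
    }
    ```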


---



[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    I think a design doc is better, to make sure we are on the same page before the actual coding, which saves time.


---



[GitHub] spark pull request #21322: [SPARK-24225][CORE] Support closing AutoClosable ...

Posted by dbtsai <gi...@git.apache.org>.
Github user dbtsai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21322#discussion_r189187178
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -384,14 +385,37 @@ private[spark] class MemoryStore(
         }
       }
     
    +  private def maybeReleaseResources(entry: MemoryEntry[_]): Unit = {
    +    entry match {
    +      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
    +      case DeserializedMemoryEntry(values: Array[Any], _, _) => maybeCloseValues(values)
    +      case _ =>
    +    }
    +  }
    +
    +  private def maybeCloseValues(values: Array[Any]): Unit = {
    +    values.foreach {
    +      case closable: AutoCloseable =>
    +        safelyCloseValue(closable)
    +      case _ =>
    +    }
    +  }
    +
    +  private def safelyCloseValue(closable: AutoCloseable): Unit = {
    +    try {
    +      closable.close()
    +    } catch {
    +      case ex: Exception => logWarning(s"Failed to close AutoClosable $closable", ex)
    +    }
    +  }
    +
       def remove(blockId: BlockId): Boolean = memoryManager.synchronized {
         val entry = entries.synchronized {
           entries.remove(blockId)
         }
         if (entry != null) {
    -      entry match {
    -        case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
    -        case _ =>
    +      if (blockId.isBroadcast) {
    +        maybeReleaseResources(entry)
    --- End diff --
    
    In this case, what happens when the blockId is not a broadcast? The existing cleanup (`buffer.dispose()` for serialized entries) will not be called.


---



[GitHub] spark issue #21322: [SPARK-24225] Support closing AutoClosable objects in Me...

Posted by dbtsai <gi...@git.apache.org>.
Github user dbtsai commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    Could you change the title to `[SPARK-24225][CORE] Support closing AutoClosable objects...`? Thanks.


---



[GitHub] spark pull request #21322: [SPARK-24225][CORE] Support closing AutoClosable ...

Posted by dbtsai <gi...@git.apache.org>.
Github user dbtsai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21322#discussion_r188032698
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -384,15 +385,36 @@ private[spark] class MemoryStore(
         }
       }
     
    +  private def maybeReleaseResources(entry: MemoryEntry[_]): Unit = {
    +    entry match {
    +      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
    +      case DeserializedMemoryEntry(objs: Array[Any], _, _) => maybeCloseValues(objs)
    --- End diff --
    
    `DeserializedMemoryEntry(values, _, _)` to match the rest of code style. 


---



[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    I'd like to recommend http://apache-spark-developers-list.1001551.n3.nabble.com/SPIP-SPARK-25728-Structured-Intermediate-Representation-Tungsten-IR-for-generating-Java-code-td25370.html 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    One concern I have is that we now keep broadcast variables in `BroadcastManager.cachedValues` using weak references. So if a broadcast variable holding an `AutoCloseable` is garbage-collected before we call `Broadcast#destroy`, we still can't properly release its resources.
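    
    A small illustration of the concern (plain Scala, not Spark code; `PooledResource` is a made-up stand-in for a broadcast value that owns an external resource):
    
    ```scala
    import java.lang.ref.WeakReference
    
    class PooledResource extends AutoCloseable {
      override def close(): Unit = println("closed")
    }
    
    // Like BroadcastManager.cachedValues, only a weak reference keeps the value alive.
    val ref = new WeakReference(new PooledResource)
    
    System.gc()
    // After a collection the referent is likely gone, and close() was never called.
    println(Option(ref.get))   // typically prints None
    ```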
    



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21322: [SPARK-24225][CORE] Support closing AutoClosable ...

Posted by JeetKunDoug <gi...@git.apache.org>.
Github user JeetKunDoug commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21322#discussion_r188295537
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -384,15 +385,36 @@ private[spark] class MemoryStore(
         }
       }
     
    +  private def maybeReleaseResources(entry: MemoryEntry[_]): Unit = {
    +    entry match {
    +      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
    +      case DeserializedMemoryEntry(objs: Array[Any], _, _) => maybeCloseValues(objs)
    --- End diff --
    
    I wouldn't expect a never-deserialized memory entry to need closing, as the value was never really instantiated to begin with - so if it _only_ lands on disk, I think that's reasonable (the variable in question would never have had a chance to allocate anything either).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    I think users are responsible for calling `Broadcast#destroy`, which unpersists broadcast blocks from the block manager and runs user-defined driver-side cleanup.
    
    It is a valid use case to allow users to define some executor-side cleanup via `AutoCloseable`. However, I don't think we should always detect `AutoCloseable` when removing a block, as it may break existing programs and cause a performance regression. We should only do it for broadcast blocks.
    
    A good place to do it seems to be `BlockManager.removeBroadcast`.
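    
    A rough sketch of what that could look like (a guess at the intent, not the PR's code; `Utils.tryClose` is the best-effort helper discussed further down, used here as a placeholder):
    
    ```scala
    def removeBroadcast(broadcastId: Long, tellMaster: Boolean): Int = {
      logDebug(s"Removing broadcast $broadcastId")
      val blocksToRemove = blockInfoManager.entries.map(_._1).collect {
        case bid @ BroadcastBlockId(`broadcastId`, _) => bid
      }.toSeq
      blocksToRemove.foreach { blockId =>
        // Close any AutoCloseable values before dropping the block, so the hook only
        // ever runs for broadcast blocks and never on the normal block-removal path.
        memoryStore.getValues(blockId).foreach(_.foreach {
          case closeable: AutoCloseable => Utils.tryClose(closeable)
          case _ =>
        })
        removeBlock(blockId, tellMaster)
      }
      blocksToRemove.size
    }
    ```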


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    Merged build finished. Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90651/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21322: [SPARK-24225][CORE] Support closing AutoClosable ...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21322#discussion_r188306362
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -384,15 +385,36 @@ private[spark] class MemoryStore(
         }
       }
     
    +  private def maybeReleaseResources(entry: MemoryEntry[_]): Unit = {
    +    entry match {
    +      case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
    +      case DeserializedMemoryEntry(objs: Array[Any], _, _) => maybeCloseValues(objs)
    --- End diff --
    
    In theory, you can have a working broadcast object that is not in `MemoryStore` at the same time.
    
    When the merged object is stored into `BlockManager` via `putSingle`, it can end up in the disk store.
    
    Once the object is going to be used, if we can't find it in the cache, we call `BlockManager.getLocalValues` to retrieve it back from the disk store. Although that will try to store it into `MemoryStore`, it may not succeed.
    
    I think the point here is that the change assumes that if there is a deserialized broadcast object, it is definitely in `MemoryStore`. But if I read the code correctly, that is not the case. You can have the serialized bytes of the object in the disk store and be using a deserialized object at the same time.
    
    



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90824/
    Test PASSed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21322: [SPARK-24225] Support closing AutoClosable objects in Me...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    cc @jiangxb1987 @cloud-fan 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21322: [SPARK-24225][CORE] Support closing AutoClosable ...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21322#discussion_r224875899
  
    --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
    @@ -1930,6 +1930,18 @@ private[spark] object Utils extends Logging {
         }
       }
     
    +  def tryClose(value: Any): Unit = {
    --- End diff --
    
    This should accept `AnyRef` at most, not `Any`. It doesn't really seem like we need a new global utility method for this. It's a little unusual to try closing things that aren't `Closeable`, and we can try to rationalize that in the callers above if possible.
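    
    For reference, a minimal sketch of such a helper with the `AnyRef` parameter suggested above. The diff only shows the signature, so the body here is a guess at the intent rather than the PR's code; it assumes the method lives in an object extending `Logging` (for `logWarning`) and that `scala.util.control.NonFatal` is imported:
    
    ```scala
    def tryClose(value: AnyRef): Unit = value match {
      case closeable: AutoCloseable =>
        try {
          closeable.close()
        } catch {
          // Best-effort: log and continue so one failing resource does not
          // abort the rest of the cleanup.
          case NonFatal(e) => logWarning(s"Failed to close $closeable", e)
        }
      case _ => // not an AutoCloseable; nothing to close
    }
    ```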


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21322: [SPARK-24225][CORE] Support closing AutoClosable ...

Posted by JeetKunDoug <gi...@git.apache.org>.
Github user JeetKunDoug commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21322#discussion_r188074445
  
    --- Diff: core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala ---
    @@ -526,4 +526,84 @@ class MemoryStoreSuite
           }
         }
       }
    +
    +  test("[SPARK-24225]: remove should close AutoCloseable object") {
    +
    +    val (store, _) = makeMemoryStore(12000)
    +
    +    val id = BroadcastBlockId(0)
    +    val tracker = new CloseTracker()
    +    store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
    +    assert(store.contains(id))
    +    store.remove(id)
    +    assert(tracker.getClosed())
    +  }
    +
    +  test("[SPARK-24225]: remove should close AutoCloseable objects even if they throw exceptions") {
    +
    +    val (store, _) = makeMemoryStore(12000)
    +
    +    val id = BroadcastBlockId(0)
    +    val tracker = new CloseTracker(true)
    +    store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
    +    assert(store.contains(id))
    +    store.remove(id)
    +    assert(tracker.getClosed())
    +  }
    +
    +  test("[SPARK-24225]: clear should close AutoCloseable objects") {
    +
    +    val (store, _) = makeMemoryStore(12000)
    +
    +    val id = BroadcastBlockId(0)
    +    val tracker = new CloseTracker
    +    store.putIteratorAsValues(id, Iterator(tracker), ClassTag.Any)
    +    assert(store.contains(id))
    +    store.clear()
    +    assert(tracker.getClosed())
    +  }
    +
    +  test("[SPARK-24225]: clear should close all AutoCloseable objects put together in an iterator") {
    --- End diff --
    
    So if I understand the API correctly, there is no way to remove a single item that was put as part of a call to `putIterator`: because operations are keyed by `blockId`, you can only remove the whole group of entries, not an individual element of the iterator.
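    
    Concretely, the grouping the tests above rely on looks roughly like this (`makeMemoryStore` and `CloseTracker` are the suite's own helpers):
    
    ```scala
    val (store, _) = makeMemoryStore(12000)
    val id = BroadcastBlockId(0)
    val trackers = Seq(new CloseTracker, new CloseTracker)
    
    // Every value supplied in one putIteratorAsValues call is stored under a single blockId...
    store.putIteratorAsValues(id, trackers.iterator, ClassTag.Any)
    
    // ...so the whole group can only be removed together, and removal closes each value.
    store.remove(id)
    assert(trackers.forall(_.getClosed()))
    ```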


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    **[Test build #90796 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90796/testReport)** for PR 21322 at commit [`790c906`](https://github.com/apache/spark/commit/790c906062fc55f553a5cecb61147f801875d4b2).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21322: [SPARK-24225] Support closing AutoClosable objects in Me...

Posted by dbtsai <gi...@git.apache.org>.
Github user dbtsai commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    test this please


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    @JeetKunDoug The same issue we discussed above: even when there is a deserialized version of the variable in use, it may not be in `MemoryStore`; only its serialized bytes may exist in the disk store.
    
    The reason is that we use `getLocalValues` to retrieve the deserialized object. If it was stored in the disk store by `putSingle`, we read it back and call `maybeCacheDiskValuesInMemory` to try to cache it in `MemoryStore`, but that is not guaranteed to succeed. When it fails, you still have only the serialized value in the disk store.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

Posted by JeetKunDoug <gi...@git.apache.org>.
Github user JeetKunDoug commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    @HyukjinKwon makes sense - I'll try to get a design doc put together as soon as I can, but my "day job" is preventing me from working on it right now.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

Posted by JeetKunDoug <gi...@git.apache.org>.
Github user JeetKunDoug commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    Sounds good - it may take me a while to get the time to work this up, but I'll get something put together and attached to the underlying issue as soon as I can. If possible, can you point me at what you would consider a "good" design document, so I have a starting point to work with?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    Merged build finished. Test FAILed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21322: [SPARK-24225][CORE] Support closing AutoClosable objects...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    **[Test build #90824 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90824/testReport)** for PR 21322 at commit [`790c906`](https://github.com/apache/spark/commit/790c906062fc55f553a5cecb61147f801875d4b2).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21322: [SPARK-24225] Support closing AutoClosable objects in Me...

Posted by dbtsai <gi...@git.apache.org>.
Github user dbtsai commented on the issue:

    https://github.com/apache/spark/pull/21322
  
    Jenkins, add to whitelist.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org