Posted to reviews@spark.apache.org by sarutak <gi...@git.apache.org> on 2014/09/16 14:24:12 UTC

[GitHub] spark pull request: [SPARK-3546] InputStream of ManagedBuffer does...

GitHub user sarutak opened a pull request:

    https://github.com/apache/spark/pull/2408

    [SPARK-3546] InputStream of ManagedBuffer does not close and causes running out of file descriptor

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sarutak/spark resolve-resource-leak-issue

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/2408.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2408
    
----
commit bf29d4a1d8e332941caba337286f26f2238095d4
Author: Kousuke Saruta <sa...@oss.nttdata.co.jp>
Date:   2014-09-16T12:22:12Z

    Modified FileSegment to close channel
    
    Modified ShuffleBlockFetcherIterator to close block data file

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3546] InputStream of ManagedBuffer is n...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2408#discussion_r17598920
  
    --- Diff: core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala ---
    @@ -66,8 +67,13 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt
       override def size: Long = length
     
       override def nioByteBuffer(): ByteBuffer = {
    -    val channel = new RandomAccessFile(file, "r").getChannel
    -    channel.map(MapMode.READ_ONLY, offset, length)
    +    var channel: FileChannel = null
    +    try {
    +      channel = new RandomAccessFile(file, "r").getChannel
    +      channel.map(MapMode.READ_ONLY, offset, length)
    +    } finally {
    +      channel.close()
    --- End diff --
    
    This would throw an NPE if an error occurred in `new RandomAccessFile` or `getChannel`. I suppose you could move `new RandomAccessFile(file, "r").getChannel` before the `try` block. Until that call returns, no `FileChannel` has been successfully opened, so there is nothing that needs closing.
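
The suggestion above can be sketched roughly as follows. This is a minimal illustration, not the code that was merged: `MapSegmentSketch` and `mapSegment` are hypothetical names, and the method only mirrors the shape of `nioByteBuffer()` from the diff.

```scala
import java.io.{File, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode

object MapSegmentSketch {
  // Hypothetical helper: maps `length` bytes of `file` starting at `offset`.
  def mapSegment(file: File, offset: Long, length: Long): ByteBuffer = {
    // Open outside the `try`: if this throws, nothing was opened,
    // so there is nothing to close and no null to dereference.
    val channel = new RandomAccessFile(file, "r").getChannel
    try {
      channel.map(MapMode.READ_ONLY, offset, length)
    } finally {
      // Closing the channel does not invalidate an established mapping.
      channel.close()
    }
  }
}
```

With this shape the `finally` block only runs once a channel actually exists, so no null check is needed.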


[GitHub] spark pull request: [SPARK-3546] InputStream of ManagedBuffer is n...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/2408#issuecomment-55801829
  
    Thanks. Merging in master.


[GitHub] spark pull request: [SPARK-3546] InputStream of ManagedBuffer is n...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2408#discussion_r17598955
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
    @@ -111,13 +112,21 @@ final class ShuffleBlockFetcherIterator(
         blockTransferService.fetchBlocks(req.address.host, req.address.port, blockIds,
           new BlockFetchingListener {
             override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
    -          results.put(new FetchResult(BlockId(blockId), sizeMap(blockId),
    -            () => serializer.newInstance().deserializeStream(
    -              blockManager.wrapForCompression(BlockId(blockId), data.inputStream())).asIterator
    -          ))
    -          shuffleMetrics.remoteBytesRead += data.size
    -          shuffleMetrics.remoteBlocksFetched += 1
    -          logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
    +          var is: InputStream = null
    +          try {
    +            is = data.inputStream()
    +            results.put(new FetchResult(BlockId(blockId), sizeMap(blockId),
    +              () => serializer.newInstance().deserializeStream(
    +                blockManager.wrapForCompression(BlockId(blockId), is)).asIterator
    +            ))
    +            shuffleMetrics.remoteBytesRead += data.size
    --- End diff --
    
    Can these three lines follow the `finally` block? I think the stream can be closed at this point.


[GitHub] spark pull request: [SPARK-3546] InputStream of ManagedBuffer is n...

Posted by sarutak <gi...@git.apache.org>.
Github user sarutak commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2408#discussion_r17600021
  
    --- Diff: core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala ---
    @@ -66,8 +67,13 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt
       override def size: Long = length
     
       override def nioByteBuffer(): ByteBuffer = {
    -    val channel = new RandomAccessFile(file, "r").getChannel
    -    channel.map(MapMode.READ_ONLY, offset, length)
    +    var channel: FileChannel = null
    +    try {
    +      channel = new RandomAccessFile(file, "r").getChannel
    +      channel.map(MapMode.READ_ONLY, offset, length)
    +    } finally {
    +      channel.close()
    --- End diff --
    
    Originally I was going to check whether `channel` is null, but I forgot to in the previous PR.
    I've fixed it now.
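
For comparison, the null-check variant described here might look like the sketch below. It assumes the same hypothetical `mapSegment` shape as the diff's `nioByteBuffer()`; `NullGuardSketch` is an illustrative name, not Spark code.

```scala
import java.io.{File, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.channels.FileChannel.MapMode

object NullGuardSketch {
  // Hypothetical helper: same behavior as opening before the `try`,
  // but keeps the open call inside it and guards the close instead.
  def mapSegment(file: File, offset: Long, length: Long): ByteBuffer = {
    var channel: FileChannel = null
    try {
      channel = new RandomAccessFile(file, "r").getChannel
      channel.map(MapMode.READ_ONLY, offset, length)
    } finally {
      // `channel` is still null if opening the file threw, so check first.
      if (channel != null) {
        channel.close()
      }
    }
  }
}
```

Either form lets the original exception (for example a `FileNotFoundException`) propagate instead of being masked by a `NullPointerException` from the `finally` block.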


[GitHub] spark pull request: [SPARK-3546] InputStream of ManagedBuffer is n...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/2408#issuecomment-55788663
  
    Jenkins, test this please.


[GitHub] spark pull request: [SPARK-3546] InputStream of ManagedBuffer is n...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2408#issuecomment-55801517
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20409/consoleFull) for   PR 2408 at commit [`074781d`](https://github.com/apache/spark/commit/074781d220f37fa3edaa22ecce7312d0ca22596a).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-3546] InputStream of ManagedBuffer is n...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2408#issuecomment-55742800
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20383/consoleFull) for   PR 2408 at commit [`bf29d4a`](https://github.com/apache/spark/commit/bf29d4a1d8e332941caba337286f26f2238095d4).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-3546] InputStream of ManagedBuffer is n...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2408#discussion_r17617628
  
    --- Diff: core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala ---
    @@ -66,8 +67,15 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt
       override def size: Long = length
     
       override def nioByteBuffer(): ByteBuffer = {
    -    val channel = new RandomAccessFile(file, "r").getChannel
    -    channel.map(MapMode.READ_ONLY, offset, length)
    +    var channel: FileChannel = null
    +    try {
    --- End diff --
    
    this part looks good to me


[GitHub] spark pull request: [SPARK-3546] InputStream of ManagedBuffer is n...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2408#issuecomment-55769853
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20391/consoleFull) for   PR 2408 at commit [`5f63f67`](https://github.com/apache/spark/commit/5f63f67fb8e1dc85788436581f49adc2cb8b32bc).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-3546] InputStream of ManagedBuffer does...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2408#issuecomment-55734763
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20383/consoleFull) for   PR 2408 at commit [`bf29d4a`](https://github.com/apache/spark/commit/bf29d4a1d8e332941caba337286f26f2238095d4).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-3546] InputStream of ManagedBuffer is n...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2408#issuecomment-55751983
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20384/consoleFull) for   PR 2408 at commit [`b37231a`](https://github.com/apache/spark/commit/b37231a7285836d540a21b263f812953e7c6d800).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `  class ArrayConstructor extends net.razorvine.pickle.objects.ArrayConstructor `
      * `class SCCallSiteSync(object):`



[GitHub] spark pull request: [SPARK-3546] InputStream of ManagedBuffer is n...

Posted by sarutak <gi...@git.apache.org>.
Github user sarutak commented on the pull request:

    https://github.com/apache/spark/pull/2408#issuecomment-55790211
  
    test this please.


[GitHub] spark pull request: [SPARK-3546] InputStream of ManagedBuffer is n...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/2408


[GitHub] spark pull request: [SPARK-3546] InputStream of ManagedBuffer is n...

Posted by sarutak <gi...@git.apache.org>.
Github user sarutak commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2408#discussion_r17601336
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
    @@ -111,13 +112,21 @@ final class ShuffleBlockFetcherIterator(
         blockTransferService.fetchBlocks(req.address.host, req.address.port, blockIds,
           new BlockFetchingListener {
             override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
    -          results.put(new FetchResult(BlockId(blockId), sizeMap(blockId),
    -            () => serializer.newInstance().deserializeStream(
    -              blockManager.wrapForCompression(BlockId(blockId), data.inputStream())).asIterator
    -          ))
    -          shuffleMetrics.remoteBytesRead += data.size
    -          shuffleMetrics.remoteBlocksFetched += 1
    -          logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
    +          var is: InputStream = null
    +          try {
    +            is = data.inputStream()
    +            results.put(new FetchResult(BlockId(blockId), sizeMap(blockId),
    +              () => serializer.newInstance().deserializeStream(
    +                blockManager.wrapForCompression(BlockId(blockId), is)).asIterator
    +            ))
    +            shuffleMetrics.remoteBytesRead += data.size
    --- End diff --
    
    Ah, do you mean lines 122-124 should be outside the `try` block? I agree with that.


[GitHub] spark pull request: [SPARK-3546] InputStream of ManagedBuffer is n...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2408#discussion_r17600258
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
    @@ -111,13 +112,21 @@ final class ShuffleBlockFetcherIterator(
         blockTransferService.fetchBlocks(req.address.host, req.address.port, blockIds,
           new BlockFetchingListener {
             override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
    -          results.put(new FetchResult(BlockId(blockId), sizeMap(blockId),
    -            () => serializer.newInstance().deserializeStream(
    -              blockManager.wrapForCompression(BlockId(blockId), data.inputStream())).asIterator
    -          ))
    -          shuffleMetrics.remoteBytesRead += data.size
    -          shuffleMetrics.remoteBlocksFetched += 1
    -          logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
    +          var is: InputStream = null
    +          try {
    +            is = data.inputStream()
    +            results.put(new FetchResult(BlockId(blockId), sizeMap(blockId),
    +              () => serializer.newInstance().deserializeStream(
    +                blockManager.wrapForCompression(BlockId(blockId), is)).asIterator
    +            ))
    +            shuffleMetrics.remoteBytesRead += data.size
    --- End diff --
    
    These three lines do not need to be within the `try` block, right? I figure it's best to complete the `try` block and have `is` be closed before moving on to further operations.


[GitHub] spark pull request: [SPARK-3546] InputStream of ManagedBuffer is n...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2408#issuecomment-55758516
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20391/consoleFull) for   PR 2408 at commit [`5f63f67`](https://github.com/apache/spark/commit/5f63f67fb8e1dc85788436581f49adc2cb8b32bc).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-3546] InputStream of ManagedBuffer is n...

Posted by sarutak <gi...@git.apache.org>.
Github user sarutak commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2408#discussion_r17619322
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
    @@ -111,10 +112,18 @@ final class ShuffleBlockFetcherIterator(
         blockTransferService.fetchBlocks(req.address.host, req.address.port, blockIds,
           new BlockFetchingListener {
             override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
    -          results.put(new FetchResult(BlockId(blockId), sizeMap(blockId),
    -            () => serializer.newInstance().deserializeStream(
    -              blockManager.wrapForCompression(BlockId(blockId), data.inputStream())).asIterator
    -          ))
    +          var is: InputStream = null
    +          try {
    +            is = data.inputStream()
    +            results.put(new FetchResult(BlockId(blockId), sizeMap(blockId),
    +              () => serializer.newInstance().deserializeStream(
    +                blockManager.wrapForCompression(BlockId(blockId), is)).asIterator
    +            ))
    +          } finally {
    +            if (is != null) {
    +              is.close()
    --- End diff --
    
    @rxin Exactly, it doesn't make sense, and I noticed that the InputStream is closed via NextIterator#close.
    So I'll revert this part of the change.
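
The idea of letting the consuming iterator close the stream can be illustrated with the sketch below. `ByteIterator` is a hypothetical, much-simplified stand-in written for this example; it is not Spark's `NextIterator` and does not reproduce its API.

```scala
import java.io.InputStream

object ClosingIteratorSketch {
  // Hypothetical iterator: yields bytes until end of stream,
  // then closes the underlying stream exactly once.
  final class ByteIterator(in: InputStream) extends Iterator[Int] {
    private var nextByte: Int = -2 // -2 = not fetched yet, -1 = end of stream
    private var closed = false

    private def closeIfNeeded(): Unit = {
      if (!closed) {
        closed = true
        in.close()
      }
    }

    override def hasNext: Boolean = {
      if (nextByte == -2) {
        nextByte = in.read()
        if (nextByte == -1) closeIfNeeded()
      }
      nextByte != -1
    }

    override def next(): Int = {
      if (!hasNext) throw new NoSuchElementException("end of stream")
      val b = nextByte
      nextByte = -2
      b
    }
  }
}
```

Because the close happens when the consumer reaches the end of the data, the producer of the stream does not need its own `finally` block, which is why an additional close at the call site would be redundant at best.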


[GitHub] spark pull request: [SPARK-3546] InputStream of ManagedBuffer is n...

Posted by sarutak <gi...@git.apache.org>.
Github user sarutak commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2408#discussion_r17600041
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
    @@ -111,13 +112,21 @@ final class ShuffleBlockFetcherIterator(
         blockTransferService.fetchBlocks(req.address.host, req.address.port, blockIds,
           new BlockFetchingListener {
             override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
    -          results.put(new FetchResult(BlockId(blockId), sizeMap(blockId),
    -            () => serializer.newInstance().deserializeStream(
    -              blockManager.wrapForCompression(BlockId(blockId), data.inputStream())).asIterator
    -          ))
    -          shuffleMetrics.remoteBytesRead += data.size
    -          shuffleMetrics.remoteBlocksFetched += 1
    -          logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
    +          var is: InputStream = null
    +          try {
    +            is = data.inputStream()
    +            results.put(new FetchResult(BlockId(blockId), sizeMap(blockId),
    +              () => serializer.newInstance().deserializeStream(
    +                blockManager.wrapForCompression(BlockId(blockId), is)).asIterator
    +            ))
    +            shuffleMetrics.remoteBytesRead += data.size
    --- End diff --
    
    Sorry, what do you mean? I didn't get it.


[GitHub] spark pull request: [SPARK-3546] InputStream of ManagedBuffer is n...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2408#issuecomment-55741767
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20384/consoleFull) for   PR 2408 at commit [`b37231a`](https://github.com/apache/spark/commit/b37231a7285836d540a21b263f812953e7c6d800).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-3546] InputStream of ManagedBuffer is n...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2408#issuecomment-55791275
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20409/consoleFull) for   PR 2408 at commit [`074781d`](https://github.com/apache/spark/commit/074781d220f37fa3edaa22ecce7312d0ca22596a).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-3546] InputStream of ManagedBuffer is n...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2408#discussion_r17617004
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---
    @@ -111,10 +112,18 @@ final class ShuffleBlockFetcherIterator(
         blockTransferService.fetchBlocks(req.address.host, req.address.port, blockIds,
           new BlockFetchingListener {
             override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
    -          results.put(new FetchResult(BlockId(blockId), sizeMap(blockId),
    -            () => serializer.newInstance().deserializeStream(
    -              blockManager.wrapForCompression(BlockId(blockId), data.inputStream())).asIterator
    -          ))
    +          var is: InputStream = null
    +          try {
    +            is = data.inputStream()
    +            results.put(new FetchResult(BlockId(blockId), sizeMap(blockId),
    +              () => serializer.newInstance().deserializeStream(
    +                blockManager.wrapForCompression(BlockId(blockId), is)).asIterator
    +            ))
    +          } finally {
    +            if (is != null) {
    +              is.close()
    --- End diff --
    
    Doesn't this close the input stream prematurely? Note that the 3rd argument of the `FetchResult` put into `results` is passed in as a closure, so it is lazy.
    
    BTW in my new refactoring of this, there is a place where we should explicitly close the streams:
    
    https://github.com/apache/spark/pull/2330/files#diff-27109eb30a77542d377c936e0d134420R295
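
The premature-close concern can be reproduced in isolation with a small sketch. Everything here is hypothetical and written for illustration: `TrackedStream` stands in for a file-backed stream that rejects reads after `close()`, and `deferredRead` mimics the try/finally shape from the diff with a closure that is only evaluated later.

```scala
import java.io.{ByteArrayInputStream, IOException, InputStream}

object LazyCloseSketch {
  // Hypothetical stream that refuses reads once it has been closed.
  final class TrackedStream(bytes: Array[Byte]) extends InputStream {
    private val in = new ByteArrayInputStream(bytes)
    private var closed = false

    override def read(): Int = {
      if (closed) throw new IOException("stream closed")
      in.read()
    }

    override def close(): Unit = {
      closed = true
    }
  }

  // Returns a thunk that reads from `stream` later, but closes the
  // stream in `finally` before the thunk has ever been called.
  def deferredRead(stream: InputStream): () => Int = {
    try {
      () => stream.read() // only builds the closure; nothing is read yet
    } finally {
      stream.close() // runs immediately, before any consumer calls the thunk
    }
  }
}
```

Calling the returned thunk fails with an `IOException` in this sketch, because the `finally` block has already run by the time the closure body executes.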
    
    


