You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by dafrista <gi...@git.apache.org> on 2016/05/28 18:34:50 UTC

[GitHub] spark pull request: [SPARK-5581][Core] When writing sorted map out...

GitHub user dafrista opened a pull request:

    https://github.com/apache/spark/pull/13382

    [SPARK-5581][Core] When writing sorted map output file, avoid open / …

    …close between each partition
    
    ## What changes were proposed in this pull request?
    
    Replace commitAndClose with separate commit and close to avoid opening and closing
    the file between partitions.
    
    ## How was this patch tested?
    
    Run existing unit tests, add a few unit tests regarding reverts.
    
    Observed a ~20% reduction in total time in tasks on stages with shuffle
    writes to many partitions.
    
    @JoshRosen 

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dafrista/spark separatecommit-master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/13382.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #13382
    
----
commit 360cc54047428b9ffe76b2e9005ea5a3bb35e610
Author: Brian Cho <bc...@fb.com>
Date:   2016-05-17T04:38:43Z

    [SPARK-5581][Core] When writing sorted map output file, avoid open / close between each partition
    
    ## What changes were proposed in this pull request?
    
    Replace commitAndClose with separate commit and close to avoid opening and closing
    the file between partitions.
    
    ## How was this patch tested?
    
    Run existing unit tests, add a few unit tests regarding reverts.
    
    Observed a ~20% reduction in total time in tasks on stages with shuffle
    writes to many partitions.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13382: [SPARK-5581][Core] When writing sorted map output file, ...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the issue:

    https://github.com/apache/spark/pull/13382
  
    LGTM as well, so I'm going to merge this into master for inclusion in Spark 2.1.0. Thanks @dafrista for writing this and to @ericl for helping with review.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13382: [SPARK-5581][Core] When writing sorted map output file, ...

Posted by dafrista <gi...@git.apache.org>.
Github user dafrista commented on the issue:

    https://github.com/apache/spark/pull/13382
  
    Thanks @ericl I've added that information to the class comment.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5581][Core] When writing sorted map output file, ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/13382
  
    **[Test build #59704 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59704/consoleFull)** for PR 13382 at commit [`6fc0fd0`](https://github.com/apache/spark/commit/6fc0fd002239a58f4462928bd511395c3145e9f2).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13382: [SPARK-5581][Core] When writing sorted map output file, ...

Posted by dafrista <gi...@git.apache.org>.
Github user dafrista commented on the issue:

    https://github.com/apache/spark/pull/13382
  
    @JoshRosen please let me know your thoughts on the changes


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13382: [SPARK-5581][Core] When writing sorted map output file, ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/13382
  
    **[Test build #62510 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/62510/consoleFull)** for PR 13382 at commit [`e19ec3d`](https://github.com/apache/spark/commit/e19ec3d2b145879e7ea73fa847761cfdeb7d5c95).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13382: [SPARK-5581][Core] When writing sorted map output...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13382#discussion_r71262947
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala ---
    @@ -46,102 +46,145 @@ private[spark] class DiskBlockObjectWriter(
       extends OutputStream
       with Logging {
     
    +  /**
    +   * Guards against close calls, e.g. from a wrapping stream.
    +   * Call manualClose to close the stream that was extended by this trait.
    +   */
    +  private trait ManualCloseOutputStream extends OutputStream {
    +    abstract override def close(): Unit = {
    +      flush()
    +    }
    +
    +    def manualClose(): Unit = {
    +      super.close()
    +    }
    +  }
    +
       /** The file channel, used for repositioning / truncating the file. */
       private var channel: FileChannel = null
    +  private var mcs: ManualCloseOutputStream = null
       private var bs: OutputStream = null
       private var fos: FileOutputStream = null
       private var ts: TimeTrackingOutputStream = null
       private var objOut: SerializationStream = null
       private var initialized = false
    +  private var streamOpen = false
       private var hasBeenClosed = false
    -  private var commitAndCloseHasBeenCalled = false
     
       /**
        * Cursors used to represent positions in the file.
        *
    -   * xxxxxxxx|--------|---       |
    -   *         ^        ^          ^
    -   *         |        |        finalPosition
    -   *         |      reportedPosition
    -   *       initialPosition
    +   * xxxxxxxx|--------|---|
    +   *           ^          ^
    +   *           |        committedPosition
    +   *         reportedPosition
        *
    -   * initialPosition: Offset in the file where we start writing. Immutable.
        * reportedPosition: Position at the time of the last update to the write metrics.
    -   * finalPosition: Offset where we stopped writing. Set on closeAndCommit() then never changed.
    +   * committedPosition: Offset after last committed write.
        * -----: Current writes to the underlying file.
        * xxxxx: Existing contents of the file.
        */
    -  private val initialPosition = file.length()
    -  private var finalPosition: Long = -1
    -  private var reportedPosition = initialPosition
    +  private var committedPosition = file.length()
    +  private var reportedPosition = committedPosition
     
       /**
        * Keep track of number of records written and also use this to periodically
        * output bytes written since the latter is expensive to do for each record.
        */
       private var numRecordsWritten = 0
     
    +  private def initialize(): Unit = {
    +    fos = new FileOutputStream(file, true)
    +    channel = fos.getChannel()
    +    ts = new TimeTrackingOutputStream(writeMetrics, fos)
    +    class ManualCloseBufferedOutputStream
    +      extends BufferedOutputStream(ts, bufferSize) with ManualCloseOutputStream
    +    mcs = new ManualCloseBufferedOutputStream
    +  }
    +
       def open(): DiskBlockObjectWriter = {
         if (hasBeenClosed) {
           throw new IllegalStateException("Writer already closed. Cannot be reopened.")
         }
    -    fos = new FileOutputStream(file, true)
    -    ts = new TimeTrackingOutputStream(writeMetrics, fos)
    -    channel = fos.getChannel()
    -    bs = compressStream(new BufferedOutputStream(ts, bufferSize))
    +    if (!initialized) {
    +      initialize()
    +      initialized = true
    +    }
    +    bs = compressStream(mcs)
         objOut = serializerInstance.serializeStream(bs)
    -    initialized = true
    +    streamOpen = true
         this
       }
     
    -  override def close() {
    +  /**
    +   * Close and cleanup all resources.
    +   * Should call after committing or reverting partial writes.
    +   */
    +  private def closeResources(): Unit = {
         if (initialized) {
    -      Utils.tryWithSafeFinally {
    -        if (syncWrites) {
    -          // Force outstanding writes to disk and track how long it takes
    -          objOut.flush()
    -          val start = System.nanoTime()
    -          fos.getFD.sync()
    -          writeMetrics.incWriteTime(System.nanoTime() - start)
    -        }
    -      } {
    -        objOut.close()
    -      }
    -
    +      mcs.manualClose()
           channel = null
    +      mcs = null
           bs = null
           fos = null
           ts = null
           objOut = null
           initialized = false
    +      streamOpen = false
           hasBeenClosed = true
         }
       }
     
    -  def isOpen: Boolean = objOut != null
    +  /**
    +   * Commits any remaining partial writes and closes resources.
    +   */
    +  override def close() {
    +    if (initialized) {
    +      Utils.tryWithSafeFinally {
    +        commit()
    +      } {
    +        closeResources()
    +      }
    +    }
    +  }
     
       /**
        * Flush the partial writes and commit them as a single atomic block.
    +   * A commit may write additional bytes to frame the atomic block.
    +   *
    +   * @return file segment with previous offset and length committed on this call.
        */
    -  def commitAndClose(): Unit = {
    -    if (initialized) {
    +  def commit(): FileSegment = {
    --- End diff --
    
    nit: commitAndGet()?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5581][Core] When writing sorted map output file, ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/13382
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5581][Core] When writing sorted map output file, ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/13382
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/59680/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13382: [SPARK-5581][Core] When writing sorted map output file, ...

Posted by dafrista <gi...@git.apache.org>.
Github user dafrista commented on the issue:

    https://github.com/apache/spark/pull/13382
  
    This change gives good performance gains for shuffles with many partitions. It would be great to get it merged. (Ping @JoshRosen)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13382: [SPARK-5581][Core] When writing sorted map output file, ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/13382
  
    **[Test build #62503 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/62503/consoleFull)** for PR 13382 at commit [`0fe4bc8`](https://github.com/apache/spark/commit/0fe4bc8a0232f9e6a4dcb6df76fc3f256b784803).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13382: [SPARK-5581][Core] When writing sorted map output file, ...

Posted by dafrista <gi...@git.apache.org>.
Github user dafrista commented on the issue:

    https://github.com/apache/spark/pull/13382
  
    Thanks @ericl. I pushed a commit addressing your comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5581][Core] When writing sorted map output file, ...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13382#discussion_r65266332
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala ---
    @@ -46,66 +46,82 @@ private[spark] class DiskBlockObjectWriter(
       extends OutputStream
       with Logging {
     
    +  /**
    +   * Guards against close calls, e.g. from a wrapping stream.
    +   * Call manualClose to close the stream that was extended by this trait.
    +   */
    +  private trait ManualCloseOutputStream extends OutputStream {
    +    abstract override def close(): Unit = {
    +      flush()
    +    }
    +
    +    def manualClose(): Unit = {
    +      super.close()
    +    }
    +  }
    +
       /** The file channel, used for repositioning / truncating the file. */
       private var channel: FileChannel = null
    +  private var mcs: ManualCloseOutputStream = null
       private var bs: OutputStream = null
       private var fos: FileOutputStream = null
       private var ts: TimeTrackingOutputStream = null
       private var objOut: SerializationStream = null
       private var initialized = false
    +  private var streamOpen = false
       private var hasBeenClosed = false
    -  private var commitAndCloseHasBeenCalled = false
     
       /**
        * Cursors used to represent positions in the file.
        *
    -   * xxxxxxxx|--------|---       |
    -   *         ^        ^          ^
    -   *         |        |        finalPosition
    -   *         |      reportedPosition
    -   *       initialPosition
    +   * xxxxxxxx|--------|---|
    +   *           ^          ^
    +   *           |        committedPosition
    +   *         reportedPosition
        *
    -   * initialPosition: Offset in the file where we start writing. Immutable.
        * reportedPosition: Position at the time of the last update to the write metrics.
    -   * finalPosition: Offset where we stopped writing. Set on closeAndCommit() then never changed.
    +   * committedPosition: Offset after last committed write.
        * -----: Current writes to the underlying file.
        * xxxxx: Existing contents of the file.
        */
    -  private val initialPosition = file.length()
    -  private var finalPosition: Long = -1
    -  private var reportedPosition = initialPosition
    +  private var committedPosition = file.length()
    +  private var reportedPosition = committedPosition
     
       /**
        * Keep track of number of records written and also use this to periodically
        * output bytes written since the latter is expensive to do for each record.
        */
       private var numRecordsWritten = 0
     
    +  private def initialize(): Unit = {
    +    fos = new FileOutputStream(file, true)
    +    channel = fos.getChannel()
    +    ts = new TimeTrackingOutputStream(writeMetrics, fos)
    +    class ManualCloseBufferedOutputStream
    +      extends BufferedOutputStream(ts, bufferSize) with ManualCloseOutputStream
    +    mcs = new ManualCloseBufferedOutputStream
    +  }
    +
       def open(): DiskBlockObjectWriter = {
         if (hasBeenClosed) {
           throw new IllegalStateException("Writer already closed. Cannot be reopened.")
         }
    -    fos = new FileOutputStream(file, true)
    -    ts = new TimeTrackingOutputStream(writeMetrics, fos)
    -    channel = fos.getChannel()
    -    bs = compressStream(new BufferedOutputStream(ts, bufferSize))
    +    if (!initialized) {
    +      initialize()
    +      initialized = true
    +    }
    +    bs = compressStream(mcs)
         objOut = serializerInstance.serializeStream(bs)
    -    initialized = true
    +    streamOpen = true
         this
       }
     
       override def close() {
         if (initialized) {
           Utils.tryWithSafeFinally {
    -        if (syncWrites) {
    -          // Force outstanding writes to disk and track how long it takes
    -          objOut.flush()
    -          val start = System.nanoTime()
    -          fos.getFD.sync()
    -          writeMetrics.incWriteTime(System.nanoTime() - start)
    -        }
    +        commit()
    --- End diff --
    
    Quick question regarding semantics: in the old code, just calling `close()` would have the effect of writing any buffered data out but would not update `finalPosition` (`committedPosition` in the new terminology adopted in this patch). As of this change, though, this call to `commit()` inside of `close()` will end up updating that variable. This is slightly confusing to reason about.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5581][Core] When writing sorted map output file, ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/13382
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/59704/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5581][Core] When writing sorted map output file, ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/13382
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13382: [SPARK-5581][Core] When writing sorted map output file, ...

Posted by dafrista <gi...@git.apache.org>.
Github user dafrista commented on the issue:

    https://github.com/apache/spark/pull/13382
  
    @JoshRosen I went ahead and implemented the change. My goal was to make `close()` behavior clearer while still behaving like a "normal" `OutputStream` implementation. Changes are ready for another review.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13382: [SPARK-5581][Core] When writing sorted map output file, ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/13382
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5581][Core] When writing sorted map output file, ...

Posted by dafrista <gi...@git.apache.org>.
Github user dafrista commented on the pull request:

    https://github.com/apache/spark/pull/13382
  
    Thanks @JoshRosen I've made changes based on your comments. I'll wait to hear your thoughts on `close()` before proceeding on that.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13382: [SPARK-5581][Core] When writing sorted map output file, ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/13382
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/62510/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13382: [SPARK-5581][Core] When writing sorted map output file, ...

Posted by dafrista <gi...@git.apache.org>.
Github user dafrista commented on the issue:

    https://github.com/apache/spark/pull/13382
  
    Great. Thanks @JoshRosen my JIRA username is chobrian.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13382: [SPARK-5581][Core] When writing sorted map output...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13382#discussion_r71262784
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala ---
    @@ -46,102 +46,145 @@ private[spark] class DiskBlockObjectWriter(
       extends OutputStream
       with Logging {
     
    +  /**
    +   * Guards against close calls, e.g. from a wrapping stream.
    +   * Call manualClose to close the stream that was extended by this trait.
    +   */
    +  private trait ManualCloseOutputStream extends OutputStream {
    +    abstract override def close(): Unit = {
    +      flush()
    +    }
    +
    +    def manualClose(): Unit = {
    +      super.close()
    +    }
    +  }
    +
       /** The file channel, used for repositioning / truncating the file. */
       private var channel: FileChannel = null
    +  private var mcs: ManualCloseOutputStream = null
       private var bs: OutputStream = null
       private var fos: FileOutputStream = null
       private var ts: TimeTrackingOutputStream = null
       private var objOut: SerializationStream = null
       private var initialized = false
    +  private var streamOpen = false
       private var hasBeenClosed = false
    -  private var commitAndCloseHasBeenCalled = false
     
       /**
        * Cursors used to represent positions in the file.
        *
    -   * xxxxxxxx|--------|---       |
    -   *         ^        ^          ^
    -   *         |        |        finalPosition
    -   *         |      reportedPosition
    -   *       initialPosition
    +   * xxxxxxxx|--------|---|
    --- End diff --
    
    Could you update the diagram? I think this is misleading since reportedPosition will always be ahead of committedPosition except during some internal processing.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13382: [SPARK-5581][Core] When writing sorted map output...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13382#discussion_r71262847
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala ---
    @@ -46,102 +46,145 @@ private[spark] class DiskBlockObjectWriter(
       extends OutputStream
       with Logging {
     
    +  /**
    +   * Guards against close calls, e.g. from a wrapping stream.
    +   * Call manualClose to close the stream that was extended by this trait.
    --- End diff --
    
    Also comment that this is needed to support resume writing after a commit().


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5581][Core] When writing sorted map output file, ...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13382#discussion_r65263926
  
    --- Diff: core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java ---
    @@ -48,7 +48,7 @@
       private final File file;
       private final BlockId blockId;
       private final int numRecordsToWrite;
    -  private DiskBlockObjectWriter writer;
    +  private final DiskBlockObjectWriter writer;
    --- End diff --
    
    I think that this was purposely non-final so that we can null it out in order to allow the writer to be garbage collected as soon as `close()` is called.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13382: [SPARK-5581][Core] When writing sorted map output file, ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/13382
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/62503/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13382: [SPARK-5581][Core] When writing sorted map output...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13382#discussion_r71266677
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala ---
    @@ -27,8 +27,8 @@ import org.apache.spark.util.Utils
     
     /**
      * A class for writing JVM objects directly to a file on disk. This class allows data to be appended
    - * to an existing block and can guarantee atomicity in the case of faults as it allows the caller to
    - * revert partial writes.
    + * to an existing block. Callers can write to the same file and commit these writes.
    + * In case of faults, callers should atomically revert the uncommitted partial writes.
    --- End diff --
    
    Perhaps elaborate a bit more, e.g. "For efficiency, this class retains the underlying file channel across multiple commits to a file. The channel is kept open until close() is called on DiskBlockObjectWriter."


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13382: [SPARK-5581][Core] When writing sorted map output file, ...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the issue:

    https://github.com/apache/spark/pull/13382
  
    @dafrista, what's your Apache JIRA username / id? I'd like to assign the https://issues.apache.org/jira/browse/SPARK-5581 JIRA ticket to you so that you're properly credited in the release notes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13382: [SPARK-5581][Core] When writing sorted map output file, ...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on the issue:

    https://github.com/apache/spark/pull/13382
  
    Cool, @JoshRosen I'll leave this for you to merge.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13382: [SPARK-5581][Core] When writing sorted map output...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13382#discussion_r71262912
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala ---
    @@ -46,102 +46,145 @@ private[spark] class DiskBlockObjectWriter(
       extends OutputStream
       with Logging {
     
    +  /**
    +   * Guards against close calls, e.g. from a wrapping stream.
    +   * Call manualClose to close the stream that was extended by this trait.
    --- End diff --
    
    Could you also update the class-level comment to note the commit-and-resume behavior?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5581][Core] When writing sorted map output file, ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/13382
  
    **[Test build #59680 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59680/consoleFull)** for PR 13382 at commit [`360cc54`](https://github.com/apache/spark/commit/360cc54047428b9ffe76b2e9005ea5a3bb35e610).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13382: [SPARK-5581][Core] When writing sorted map output file, ...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on the issue:

    https://github.com/apache/spark/pull/13382
  
    This LGTM with some minor comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13382: [SPARK-5581][Core] When writing sorted map output file, ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/13382
  
    **[Test build #62510 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/62510/consoleFull)** for PR 13382 at commit [`e19ec3d`](https://github.com/apache/spark/commit/e19ec3d2b145879e7ea73fa847761cfdeb7d5c95).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5581][Core] When writing sorted map out...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/13382#issuecomment-222323144
  
    Can one of the admins verify this patch?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5581][Core] When writing sorted map output file, ...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13382#discussion_r65264229
  
    --- Diff: core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java ---
    @@ -22,6 +22,7 @@
     import java.io.IOException;
     import java.util.LinkedList;
     
    +import org.apache.spark.storage.FileSegment;
    --- End diff --
    
    This import needs to be placed a few lines down with the other Spark imports on line 41; as-is, Scalastyle is going to complain about this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13382: [SPARK-5581][Core] When writing sorted map output...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/13382


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13382: [SPARK-5581][Core] When writing sorted map output file, ...

Posted by dafrista <gi...@git.apache.org>.
Github user dafrista commented on the issue:

    https://github.com/apache/spark/pull/13382
  
    Jenkins, test this please.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13382: [SPARK-5581][Core] When writing sorted map output file, ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/13382
  
    **[Test build #59878 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59878/consoleFull)** for PR 13382 at commit [`57f6c8b`](https://github.com/apache/spark/commit/57f6c8bdca5203be52deb3e0b50ef543d8904a1e).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13382: [SPARK-5581][Core] When writing sorted map output file, ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/13382
  
    **[Test build #59878 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59878/consoleFull)** for PR 13382 at commit [`57f6c8b`](https://github.com/apache/spark/commit/57f6c8bdca5203be52deb3e0b50ef543d8904a1e).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5581][Core] When writing sorted map output file, ...

Posted by dafrista <gi...@git.apache.org>.
Github user dafrista commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13382#discussion_r65287494
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala ---
    @@ -46,66 +46,82 @@ private[spark] class DiskBlockObjectWriter(
       extends OutputStream
       with Logging {
     
    +  /**
    +   * Guards against close calls, e.g. from a wrapping stream.
    +   * Call manualClose to close the stream that was extended by this trait.
    +   */
    +  private trait ManualCloseOutputStream extends OutputStream {
    +    abstract override def close(): Unit = {
    +      flush()
    +    }
    +
    +    def manualClose(): Unit = {
    +      super.close()
    +    }
    +  }
    +
       /** The file channel, used for repositioning / truncating the file. */
       private var channel: FileChannel = null
    +  private var mcs: ManualCloseOutputStream = null
       private var bs: OutputStream = null
       private var fos: FileOutputStream = null
       private var ts: TimeTrackingOutputStream = null
       private var objOut: SerializationStream = null
       private var initialized = false
    +  private var streamOpen = false
       private var hasBeenClosed = false
    -  private var commitAndCloseHasBeenCalled = false
     
       /**
        * Cursors used to represent positions in the file.
        *
    -   * xxxxxxxx|--------|---       |
    -   *         ^        ^          ^
    -   *         |        |        finalPosition
    -   *         |      reportedPosition
    -   *       initialPosition
    +   * xxxxxxxx|--------|---|
    +   *           ^          ^
    +   *           |        committedPosition
    +   *         reportedPosition
        *
    -   * initialPosition: Offset in the file where we start writing. Immutable.
        * reportedPosition: Position at the time of the last update to the write metrics.
    -   * finalPosition: Offset where we stopped writing. Set on closeAndCommit() then never changed.
    +   * committedPosition: Offset after last committed write.
        * -----: Current writes to the underlying file.
        * xxxxx: Existing contents of the file.
        */
    -  private val initialPosition = file.length()
    -  private var finalPosition: Long = -1
    -  private var reportedPosition = initialPosition
    +  private var committedPosition = file.length()
    +  private var reportedPosition = committedPosition
     
       /**
        * Keep track of number of records written and also use this to periodically
        * output bytes written since the latter is expensive to do for each record.
        */
       private var numRecordsWritten = 0
     
    +  private def initialize(): Unit = {
    +    fos = new FileOutputStream(file, true)
    +    channel = fos.getChannel()
    +    ts = new TimeTrackingOutputStream(writeMetrics, fos)
    +    class ManualCloseBufferedOutputStream
    +      extends BufferedOutputStream(ts, bufferSize) with ManualCloseOutputStream
    +    mcs = new ManualCloseBufferedOutputStream
    +  }
    +
       def open(): DiskBlockObjectWriter = {
         if (hasBeenClosed) {
           throw new IllegalStateException("Writer already closed. Cannot be reopened.")
         }
    -    fos = new FileOutputStream(file, true)
    -    ts = new TimeTrackingOutputStream(writeMetrics, fos)
    -    channel = fos.getChannel()
    -    bs = compressStream(new BufferedOutputStream(ts, bufferSize))
    +    if (!initialized) {
    +      initialize()
    +      initialized = true
    +    }
    +    bs = compressStream(mcs)
         objOut = serializerInstance.serializeStream(bs)
    -    initialized = true
    +    streamOpen = true
         this
       }
     
       override def close() {
         if (initialized) {
           Utils.tryWithSafeFinally {
    -        if (syncWrites) {
    -          // Force outstanding writes to disk and track how long it takes
    -          objOut.flush()
    -          val start = System.nanoTime()
    -          fos.getFD.sync()
    -          writeMetrics.incWriteTime(System.nanoTime() - start)
    -        }
    +        commit()
    --- End diff --
    
    This is interesting. Making `close()` a no-op breaks my understanding of `OutputStream`, where I expect (a) any buffered data to be written out and (b) cleaning up resources. But I don't see anywhere that definitely prescribes this either.
    
    Apart from that, right now having `revertPartialWritesAndClose()` call `close()` may be causing more confusion -- on this code path, the `commit()` called on `close()` ends up having no effect because `streamOpen` is set to false, but it's not so clear. How about introducing a private `closeResources()` that cleans up only the resources, and have `close()` call `commit()`, then `closeResources()`; and have `revertPartialWritesAndClose()` call `closeResources()` instead of `close()`?
    
    If the previous paragraph doesn't make much sense, I can push a commit with this change.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5581][Core] When writing sorted map output file, ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/13382
  
    **[Test build #59680 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59680/consoleFull)** for PR 13382 at commit [`360cc54`](https://github.com/apache/spark/commit/360cc54047428b9ffe76b2e9005ea5a3bb35e610).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13382: [SPARK-5581][Core] When writing sorted map output file, ...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the issue:

    https://github.com/apache/spark/pull/13382
  
    Assigned. Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13382: [SPARK-5581][Core] When writing sorted map output file, ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/13382
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5581][Core] When writing sorted map output file, ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/13382
  
    **[Test build #59704 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/59704/consoleFull)** for PR 13382 at commit [`6fc0fd0`](https://github.com/apache/spark/commit/6fc0fd002239a58f4462928bd511395c3145e9f2).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13382: [SPARK-5581][Core] When writing sorted map output file, ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/13382
  
    **[Test build #62503 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/62503/consoleFull)** for PR 13382 at commit [`0fe4bc8`](https://github.com/apache/spark/commit/0fe4bc8a0232f9e6a4dcb6df76fc3f256b784803).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5581][Core] When writing sorted map output file, ...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/13382
  
    Jenkins, this is ok to test.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-5581][Core] When writing sorted map output file, ...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13382#discussion_r65267465
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala ---
    @@ -46,66 +46,82 @@ private[spark] class DiskBlockObjectWriter(
       extends OutputStream
       with Logging {
     
    +  /**
    +   * Guards against close calls, e.g. from a wrapping stream.
    +   * Call manualClose to close the stream that was extended by this trait.
    +   */
    +  private trait ManualCloseOutputStream extends OutputStream {
    +    abstract override def close(): Unit = {
    +      flush()
    +    }
    +
    +    def manualClose(): Unit = {
    +      super.close()
    +    }
    +  }
    +
       /** The file channel, used for repositioning / truncating the file. */
       private var channel: FileChannel = null
    +  private var mcs: ManualCloseOutputStream = null
       private var bs: OutputStream = null
       private var fos: FileOutputStream = null
       private var ts: TimeTrackingOutputStream = null
       private var objOut: SerializationStream = null
       private var initialized = false
    +  private var streamOpen = false
       private var hasBeenClosed = false
    -  private var commitAndCloseHasBeenCalled = false
     
       /**
        * Cursors used to represent positions in the file.
        *
    -   * xxxxxxxx|--------|---       |
    -   *         ^        ^          ^
    -   *         |        |        finalPosition
    -   *         |      reportedPosition
    -   *       initialPosition
    +   * xxxxxxxx|--------|---|
    +   *           ^          ^
    +   *           |        committedPosition
    +   *         reportedPosition
        *
    -   * initialPosition: Offset in the file where we start writing. Immutable.
        * reportedPosition: Position at the time of the last update to the write metrics.
    -   * finalPosition: Offset where we stopped writing. Set on closeAndCommit() then never changed.
    +   * committedPosition: Offset after last committed write.
        * -----: Current writes to the underlying file.
        * xxxxx: Existing contents of the file.
        */
    -  private val initialPosition = file.length()
    -  private var finalPosition: Long = -1
    -  private var reportedPosition = initialPosition
    +  private var committedPosition = file.length()
    +  private var reportedPosition = committedPosition
     
       /**
        * Keep track of number of records written and also use this to periodically
        * output bytes written since the latter is expensive to do for each record.
        */
       private var numRecordsWritten = 0
     
    +  private def initialize(): Unit = {
    +    fos = new FileOutputStream(file, true)
    +    channel = fos.getChannel()
    +    ts = new TimeTrackingOutputStream(writeMetrics, fos)
    +    class ManualCloseBufferedOutputStream
    +      extends BufferedOutputStream(ts, bufferSize) with ManualCloseOutputStream
    +    mcs = new ManualCloseBufferedOutputStream
    +  }
    +
       def open(): DiskBlockObjectWriter = {
         if (hasBeenClosed) {
           throw new IllegalStateException("Writer already closed. Cannot be reopened.")
         }
    -    fos = new FileOutputStream(file, true)
    -    ts = new TimeTrackingOutputStream(writeMetrics, fos)
    -    channel = fos.getChannel()
    -    bs = compressStream(new BufferedOutputStream(ts, bufferSize))
    +    if (!initialized) {
    +      initialize()
    +      initialized = true
    +    }
    +    bs = compressStream(mcs)
         objOut = serializerInstance.serializeStream(bs)
    -    initialized = true
    +    streamOpen = true
         this
       }
     
       override def close() {
         if (initialized) {
           Utils.tryWithSafeFinally {
    -        if (syncWrites) {
    -          // Force outstanding writes to disk and track how long it takes
    -          objOut.flush()
    -          val start = System.nanoTime()
    -          fos.getFD.sync()
    -          writeMetrics.incWriteTime(System.nanoTime() - start)
    -        }
    +        commit()
    --- End diff --
    
    I guess the problem is that this class has a contract where we assume that the caller will call either `commitAndClose()` or `revertPartialWritesAndClose()` after they're done writing, so having a single `close()` method which makes sense for both cases is a bit confusing.
    
    Would it aid understandability to make `close()` itself into a no-op and always rely on the user to call one of our more specific close methods instead?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13382: [SPARK-5581][Core] When writing sorted map output file, ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/13382
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/59878/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13382: [SPARK-5581][Core] When writing sorted map output file, ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/13382
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org