Posted to reviews@spark.apache.org by davies <gi...@git.apache.org> on 2015/11/11 01:10:37 UTC

[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

GitHub user davies opened a pull request:

    https://github.com/apache/spark/pull/9610

    [SPARK-8029] Robust shuffle writer

    Currently, all shuffle writers write directly to the target path, so the file can be corrupted by another attempt of the same partition on the same executor. They should write to a temporary file and then rename it to the target path, as we do in the output committer. To make the rename atomic, the temporary file should be created in the same local directory (FileSystem).
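
As a rough sketch of the write-then-rename idea (not the code in this PR): `writeViaTemp` is a hypothetical helper, and the temporary file naming (a random UUID suffix next to the target) is an assumption made for illustration.

```scala
import java.io.{File, FileOutputStream, IOException}
import java.util.UUID

object TempRenameSketch {
  // Hypothetical helper: write `bytes` to a sibling temporary file, then rename it
  // onto `target`. The temp file is created in the same directory as the target so
  // the rename stays within one file system.
  def writeViaTemp(target: File, bytes: Array[Byte]): Unit = {
    val tmp = new File(target.getAbsolutePath + "." + UUID.randomUUID())
    val out = new FileOutputStream(tmp)
    try {
      out.write(bytes)
    } finally {
      out.close()
    }
    if (!tmp.renameTo(target)) {
      // Clean up our own temporary file before reporting the failure.
      tmp.delete()
      throw new IOException(s"fail to rename $tmp to $target")
    }
  }
}
```

Readers never see a half-written target this way: the target path either does not exist yet or holds a complete file. What happens when the target already exists is platform-dependent for `File.renameTo`, which is part of what the review below discusses.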

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/davies/spark safe_shuffle

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/9610.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #9610
    
----
commit fbfdda993b7b4d7bb43ebc3baeb472bde1f5b0be
Author: Davies Liu <da...@databricks.com>
Date:   2015-11-10T23:50:06Z

    use temporary file and rename to avoid concurrent write

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9610#discussion_r44748721
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala ---
    @@ -65,11 +66,11 @@ private[spark] class SortShuffleWriter[K, V, C](
         // Don't bother including the time to open the merged output file in the shuffle write time,
         // because it just opens a single file, so is typically too fast to measure accurately
         // (see SPARK-3570).
    -    val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
    +    val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
    +    val tmp = Utils.tempFileWith(output)
    --- End diff --
    
    There is only one file, `output`, here, so I think it's obvious.




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-155609402
  
    Merged build started.




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9610#discussion_r44744021
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala ---
    @@ -65,11 +66,11 @@ private[spark] class SortShuffleWriter[K, V, C](
         // Don't bother including the time to open the merged output file in the shuffle write time,
         // because it just opens a single file, so is typically too fast to measure accurately
         // (see SPARK-3570).
    -    val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
    +    val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
    +    val tmp = Utils.tempFileWith(output)
    --- End diff --
    
    can you call these `outputTmp` or something so it's slightly easier to follow? (here and other places)




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-155678143
  
    **[Test build #45600 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45600/consoleFull)** for PR 9610 at commit [`55485a9`](https://github.com/apache/spark/commit/55485a9a9b125a75fa45cf39938918a21beac291).




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-155615247
  
    Merged build started.




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9610#discussion_r44744327
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
    @@ -93,6 +154,31 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
         } {
           out.close()
         }
    +
    +    val dataFile = getDataFile(shuffleId, mapId)
    +    // Note: there is only one IndexShuffleBlockResolver per executor
    +    synchronized {
    --- End diff --
    
    is this synchronized for atomic renames? If so, can you mention in a comment?




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9610#discussion_r44497573
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala ---
    @@ -106,6 +108,19 @@ private[spark] class HashShuffleWriter[K, V](
           writer.commitAndClose()
           writer.fileSegment().length
         }
    +    // rename all shuffle files to final paths
    +    shuffle.writers.zip(sizes).foreach { case (writer: DiskBlockObjectWriter, size: Long) =>
    +      if (size > 0) {
    +        val output = blockManager.diskBlockManager.getFile(writer.blockId)
    +        if (output.exists()) {
    +          writer.file.delete()
    +        } else {
    +          if (!writer.file.renameTo(output)) {
    +            throw new IOException(s"fail to rename ${writer.file} to $output")
    --- End diff --
    
    same problem here on partially existing shuffle output.  Also, the `if (size > 0)` check will lead to inconsistencies if you have non-deterministic shuffle output -- it might be true for different partitions in different attempts.  I think it can be OK in any case, as long as the MapStatus is consistent.




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-156336014
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-156284952
  
    **[Test build #2053 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2053/consoleFull)** for PR 9610 at commit [`d0b937f`](https://github.com/apache/spark/commit/d0b937f24b6f7e4d19867419fdf0ac6a6ce38265).




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-156288669
  
    @squito Thanks for reviewing this. I have included your regression test here and also added tests for the resolver.




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-156289053
  
    Merged build started.




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9610#discussion_r44715168
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
    @@ -93,6 +93,29 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
         } {
           out.close()
         }
    +
    +    val dataFile = getDataFile(shuffleId, mapId)
    +    synchronized {
    +      if (dataFile.exists() && indexFile.exists()) {
    +        if (dataTmp != null && dataTmp.exists()) {
    +          dataTmp.delete()
    +        }
    +        indexTmp.delete()
    +      } else {
    +        if (indexFile.exists()) {
    +          indexFile.delete()
    +        }
    +        if (!indexTmp.renameTo(indexFile)) {
    +          throw new IOException("fail to rename data file " + indexTmp + " to " + indexFile)
    +        }
    +        if (dataFile.exists()) {
    +          dataFile.delete()
    +        }
    +        if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
    +          throw new IOException("fail to rename data file " + dataTmp + " to " + dataFile)
    --- End diff --
    
    I don't think there is a particular flaw here, but it's a bit hard to follow since it's a mix of first-attempt-wins and last-attempt-wins: the first attempt wins if there is both a data file and an index file; the last attempt wins if there is only an index file. The problem with last-attempt-wins is that this delete will fail on Windows if the file is open for reading, I believe. We can't get around that entirely, because SPARK-4085 always requires us to delete some files that might be open, in which case we hope that we don't run into this race again on the next retry. It would be nice to minimize that case, though. We'd be closer to first-attempt-wins if we always wrote a dataFile, even if it's empty when dataTmp == null.
    
    There is also an issue with MapStatus and non-deterministic data. It might not matter which output you get, but the MapStatus should be consistent with the data that is read. If attempt 1 writes non-empty outputs a,b,c, and attempt 2 writes non-empty outputs d,e,f (which are not committed), the reduce tasks might get the MapStatus for attempt 2, look for outputs d,e,f, and get nothing but empty blocks. Matei had suggested writing the MapStatus to a file, so that subsequent attempts always return the MapStatus corresponding to the first successful attempt.
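
A minimal sketch of what a pure first-attempt-wins commit could look like, assuming every attempt always writes both an index file and a (possibly empty) data file. `FirstAttemptWinsSketch`, `commit`, and the private lock are hypothetical names for illustration, not the code in this PR.

```scala
import java.io.{File, IOException}

object FirstAttemptWinsSketch {
  // One lock per process, standing in for the single resolver per executor.
  private val lock = new Object

  // Returns true if this attempt's files were committed,
  // false if an earlier attempt had already committed a complete output.
  def commit(indexTmp: File, dataTmp: File, indexFile: File, dataFile: File): Boolean =
    lock.synchronized {
      if (indexFile.exists() && dataFile.exists()) {
        // An earlier attempt won: discard our temporary files.
        indexTmp.delete()
        dataTmp.delete()
        false
      } else {
        // Clear any partial output left behind by a failed attempt before renaming.
        if (indexFile.exists()) indexFile.delete()
        if (dataFile.exists()) dataFile.delete()
        if (!indexTmp.renameTo(indexFile)) {
          throw new IOException(s"fail to rename index file $indexTmp to $indexFile")
        }
        if (!dataTmp.renameTo(dataFile)) {
          throw new IOException(s"fail to rename data file $dataTmp to $dataFile")
        }
        true
      }
    }
}
```

A caller that gets `false` back would still need to report the sizes of the committed output rather than its own, which is the MapStatus consistency point raised above.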




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-156318315
  
    Merged build started.




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9610#discussion_r44744413
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala ---
    @@ -106,6 +108,28 @@ private[spark] class HashShuffleWriter[K, V](
           writer.commitAndClose()
           writer.fileSegment().length
         }
    +    // rename all shuffle files to final paths
    +    // Note: there is only one ShuffleBlockResolver in executor
    +    shuffleBlockResolver.synchronized {
    +      shuffle.writers.zipWithIndex.foreach { case (writer, i) =>
    +        val output = blockManager.diskBlockManager.getFile(writer.blockId)
    +        if (sizes(i) > 0) {
    +          if (output.exists()) {
    +            // update the size of output for MapStatus
    --- End diff --
    
    or
    ```
    // Use length of existing file and delete our own temporary one
    ```




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9610#discussion_r44496929
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
    @@ -93,6 +95,10 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
         } {
           out.close()
         }
    +    indexFile.deleteOnExit()
    +    if (!tmp.renameTo(indexFile)) {
    +      throw new IOException(s"fail to rename index file $tmp to $indexFile")
    --- End diff --
    
    This will just kill the task, right? Both tasks are actually just fine, and in fact the overall job should continue if one of them succeeds. But instead this will lead to the task getting retried, and potentially continuing to fail up to 4 times, though it has actually finished successfully from another taskset? You could handle this in the scheduler, but that would add some complexity.




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-155850864
  
    **[Test build #2040 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2040/consoleFull)** for PR 9610 at commit [`6deccff`](https://github.com/apache/spark/commit/6deccff9d322b92538a470329581c8abeb8f7e6a).




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9610#discussion_r44499655
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala ---
    @@ -106,6 +108,19 @@ private[spark] class HashShuffleWriter[K, V](
           writer.commitAndClose()
           writer.fileSegment().length
         }
    +    // rename all shuffle files to final paths
    +    shuffle.writers.zip(sizes).foreach { case (writer: DiskBlockObjectWriter, size: Long) =>
    +      if (size > 0) {
    +        val output = blockManager.diskBlockManager.getFile(writer.blockId)
    +        if (output.exists()) {
    +          writer.file.delete()
    +        } else {
    +          if (!writer.file.renameTo(output)) {
    +            throw new IOException(s"fail to rename ${writer.file} to $output")
    --- End diff --
    
    The `size` here is used to make sure that a new file is generated (we could try to delete the existing file if size is 0).




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-156275688
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/45787/
    Test FAILed.




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-156273250
  
    Merged build started.




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9610#discussion_r44715331
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
    @@ -93,6 +93,29 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
         } {
           out.close()
         }
    +
    +    val dataFile = getDataFile(shuffleId, mapId)
    +    synchronized {
    --- End diff --
    
    Can you add a big comment explaining what is going on here?  Also worth noting that there is only one `IndexShuffleBlockResolver` per executor.




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-156373251
  
    **[Test build #2054 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2054/consoleFull)** for PR 9610 at commit [`71b12bf`](https://github.com/apache/spark/commit/71b12bfa81120d229b333b7bf4541e7ee23ec733).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9610#discussion_r44742596
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
    @@ -40,10 +39,17 @@ import IndexShuffleBlockResolver.NOOP_REDUCE_ID
      */
     // Note: Changes to the format in this file should be kept in sync with
     // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getSortBasedShuffleBlockData().
    -private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleBlockResolver
    +private[spark] class IndexShuffleBlockResolver(
    +    conf: SparkConf,
    +    _blockManager: BlockManager = null)
    +  extends ShuffleBlockResolver
       with Logging {
     
    -  private lazy val blockManager = SparkEnv.get.blockManager
    +  private lazy val blockManager = if (_blockManager == null) {
    +    SparkEnv.get.blockManager
    +  } else {
    +    _blockManager
    +  }
    --- End diff --
    
    or
    ```
    Option(_blockManager).getOrElse(SparkEnv.get.blockManager)
    ```
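
For readers less familiar with the idiom: `Option(x)` evaluates to `None` when `x` is null, so `getOrElse` supplies the fallback. A small standalone sketch, where `Service` and `defaultService` are hypothetical stand-ins for the block manager and the globally available default:

```scala
object NullFallbackSketch {
  final case class Service(name: String)

  // Stand-in for a globally available default instance (an assumption for illustration).
  val defaultService: Service = Service("default")

  // Option(null) is None, so getOrElse falls back to the default;
  // a non-null argument is returned unchanged.
  def resolve(injected: Service): Service =
    Option(injected).getOrElse(defaultService)
}
```

Because `getOrElse` takes its argument by name, the fallback expression is only evaluated when the injected value is null, which matters if the default is not available in a test environment.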




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9610#discussion_r44743433
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
    @@ -93,6 +154,31 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
         } {
           out.close()
         }
    +
    +    val dataFile = getDataFile(shuffleId, mapId)
    +    // Note: there is only one IndexShuffleBlockResolver per executor
    +    synchronized {
    +      val existedLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
    +      if (existedLengths != null) {
    +        // Use the lengths of existed output for MapStatus
    +        System.arraycopy(existedLengths, 0, lengths, 0, lengths.length)
    +        dataTmp.delete()
    +        indexTmp.delete()
    +      } else {
    +        if (indexFile.exists()) {
    +          indexFile.delete()
    +        }
    +        if (dataFile.exists()) {
    +          dataFile.delete()
    +        }
    +        if (!indexTmp.renameTo(indexFile)) {
    +          throw new IOException("fail to rename data file " + indexTmp + " to " + indexFile)
    +        }
    +        if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
    +          throw new IOException("fail to rename data file " + dataTmp + " to " + dataFile)
    +        }
    --- End diff --
    
    should we check
    ```
    checkIndexAndDataFile(indexTmp, dataTmp, lengths.length) != null
    ```
    what happens if this fails? Should we throw an exception?
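
To make the kind of check being discussed concrete, here is a hedged sketch of a consistency check between an index file and a data file. It assumes the index file stores `blocks + 1` cumulative offsets as 8-byte longs, starting at 0; that layout, and the name `checkIndexAndData`, are assumptions for illustration rather than the implementation in this PR.

```scala
import java.io.{DataInputStream, File, FileInputStream, IOException}

object IndexCheckSketch {
  // Returns the per-block lengths if the index and data files agree, or null otherwise
  // (null mirrors the `!= null` style of check in the comment above).
  def checkIndexAndData(index: File, data: File, blocks: Int): Array[Long] = {
    // The index must hold exactly blocks + 1 offsets of 8 bytes each.
    // A missing index file has length 0 and is rejected here.
    if (index.length() != (blocks + 1) * 8L) {
      null
    } else {
      val lengths = new Array[Long](blocks)
      val in = new DataInputStream(new FileInputStream(index))
      val consistent = try {
        var offset = in.readLong()
        var ok = offset == 0L // the first offset must be zero
        var i = 0
        while (ok && i < blocks) {
          val next = in.readLong()
          lengths(i) = next - offset
          ok = lengths(i) >= 0L // offsets must not decrease
          offset = next
          i += 1
        }
        // The last offset must match the size of the data file.
        ok && data.length() == offset
      } catch {
        case _: IOException => false
      } finally {
        in.close()
      }
      if (consistent) lengths else null
    }
  }
}
```

Running the same check on the temporary files before renaming would catch a truncated write from the current attempt, at the cost of one extra read of the index file.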




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-155615235
  
     Merged build triggered.




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-156275686
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-156341479
  
    The failed test is unrelated. I'm merging this into master and will create another PR for the other branches.




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9610#discussion_r44695273
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
    @@ -93,6 +95,10 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
         } {
           out.close()
         }
    +    indexFile.deleteOnExit()
    +    if (!tmp.renameTo(indexFile)) {
    +      throw new IOException(s"fail to rename index file $tmp to $indexFile")
    --- End diff --
    
    On Thu, Nov 12, 2015 at 8:30 AM, Matei Zaharia <no...@github.com>
    wrote:
    
    > In
    > core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
    > <https://github.com/apache/spark/pull/9610#discussion_r44678796>:
    >
    > > @@ -93,6 +95,10 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
    > >      } {
    > >        out.close()
    > >      }
    > > +    indexFile.deleteOnExit()
    > > +    if (!tmp.renameTo(indexFile)) {
    > > +      throw new IOException(s"fail to rename index file $tmp to $indexFile")
    >
    > Can you test for this? I think the worry was about different TaskSets
    > attempting the same map stage. Imagine that attempt 1 of the stage
    > successfully completes a task, and sends back a map output status, but that
    > status gets ignored because that stage attempt got cancelled. Attempt 2
    > might then fail to send a new status for it.
    >
    > There seem to be two ways to fix it if this problem can actually occur --
    > either add MapOutputStatuses even from failed task sets or mark this new
    > task as successful if a file exists.
    >
    After this PR, the second attempt of the same task will return SUCCESS with
    a new MapOutputStatus, which could differ from the previous attempt's
    (different partition sizes). Since we do not use the exact sizes anyway
    (they may be lossily compressed), I think that's fine.
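
For readers wondering why slightly different partition sizes between attempts are tolerable, here is a minimal sketch of a lossy size encoding. It assumes a logarithmic scheme with base 1.1 that packs each size into one byte; the object name, method names, and the base are assumptions for illustration and are not taken from this thread.

```scala
object SizeCompressionSketch {
  // Assumed base: each one-byte bucket covers roughly a 10% range of sizes.
  private val LogBase = 1.1

  /** Encode a block size into a single byte (lossy). */
  def compressSize(size: Long): Byte = {
    if (size == 0) 0
    else if (size <= 1L) 1
    else math.min(255, math.ceil(math.log(size.toDouble) / math.log(LogBase)).toInt).toByte
  }

  /** Decode the byte back into an approximate size. */
  def decompressSize(compressed: Byte): Long = {
    if (compressed == 0) 0L
    else math.pow(LogBase, (compressed & 0xFF).toDouble).toLong
  }
}
```

Under this scheme, sizes of 1000 and 1040 bytes land in the same bucket, so a consumer of the status could not tell the two attempts apart by size alone.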
    
    > —
    > Reply to this email directly or view it on GitHub
    > <https://github.com/apache/spark/pull/9610/files#r44678796>.
    >
    
    
    
    -- 
     - Davies





[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9610#discussion_r44743357
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
    @@ -93,6 +154,31 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
         } {
           out.close()
         }
    +
    +    val dataFile = getDataFile(shuffleId, mapId)
    +    // Note: there is only one IndexShuffleBlockResolver per executor
    +    synchronized {
    +      val existedLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
    +      if (existedLengths != null) {
    +        // Use the lengths of existed output for MapStatus
    +        System.arraycopy(existedLengths, 0, lengths, 0, lengths.length)
    +        dataTmp.delete()
    +        indexTmp.delete()
    +      } else {
    +        if (indexFile.exists()) {
    --- End diff --
    
    ```
    // This is the first successful attempt in writing the map outputs for this task,
    // so override any existing index and data files with the ones we wrote.
    ```




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9610#discussion_r44748592
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
    @@ -93,6 +154,31 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
         } {
           out.close()
         }
    +
    +    val dataFile = getDataFile(shuffleId, mapId)
    +    // Note: there is only one IndexShuffleBlockResolver per executor
    +    synchronized {
    +      val existedLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
    +      if (existedLengths != null) {
    +        // Use the lengths of existed output for MapStatus
    +        System.arraycopy(existedLengths, 0, lengths, 0, lengths.length)
    +        dataTmp.delete()
    +        indexTmp.delete()
    +      } else {
    +        if (indexFile.exists()) {
    +          indexFile.delete()
    +        }
    +        if (dataFile.exists()) {
    +          dataFile.delete()
    +        }
    +        if (!indexTmp.renameTo(indexFile)) {
    +          throw new IOException("fail to rename data file " + indexTmp + " to " + indexFile)
    +        }
    +        if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
    +          throw new IOException("fail to rename data file " + dataTmp + " to " + dataFile)
    +        }
    --- End diff --
    
    This would slow down the normal path; I don't think it's needed.




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-155614480
  
    **[Test build #2031 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2031/consoleFull)** for PR 9610 at commit [`7cccfcc`](https://github.com/apache/spark/commit/7cccfcc671064cb68c7a9fc6862bc67af77ef07c).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by Prasidhdh <gi...@git.apache.org>.
Github user Prasidhdh commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-220781153
  
    @davies: If we want to merge two data files (one from the first action on an RDD and another from a second action on the same RDD), how can I do that? And do I need to do anything with the index file?
    Please help me understand this problem.
    Thank you




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9610#discussion_r44742346
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
    @@ -75,13 +81,68 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
       }
     
       /**
    +   * Check whether there are index file and data file also they are matched with each other, returns
    +   * the lengths of each block in data file, if there are matched, or return null.
    +   */
    +  private def checkIndexAndDataFile(index: File, data: File, blocks: Int): Array[Long] = {
    +    val lengths = new Array[Long](blocks)
    +    if (index.length() == (blocks + 1) * 8) {
    --- End diff --
    
    Instead of the nested `if-else`s, it might be clearer to write this as:
    ```
    // can you add a comment here to explain this check
    if (index.length() != (blocks + 1) * 8) {
      return null
    }
    ```
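
To make the early-return style concrete, here is a minimal sketch of the whole check. The method body is not shown in the quoted diff, so this assumes the index file holds `blocks + 1` cumulative offsets stored as 8-byte longs, starting at 0, and that the data file's length must equal the sum of the block lengths. The enclosing object name is hypothetical.

```scala
import java.io.{DataInputStream, File, FileInputStream, IOException}

object IndexCheckSketch {
  /** Returns the per-block lengths if index and data files match, otherwise null. */
  def checkIndexAndDataFile(index: File, data: File, blocks: Int): Array[Long] = {
    // The index file should contain (blocks + 1) offsets, each an 8-byte long.
    if (index.length() != (blocks + 1) * 8L) {
      return null
    }
    val lengths = new Array[Long](blocks)
    val in = try {
      new DataInputStream(new FileInputStream(index))
    } catch {
      case _: IOException => return null
    }
    try {
      // Assumed layout: the first offset is always 0.
      var offset = in.readLong()
      if (offset != 0L) {
        return null
      }
      var i = 0
      while (i < blocks) {
        val next = in.readLong()
        lengths(i) = next - offset
        offset = next
        i += 1
      }
    } catch {
      case _: IOException => return null
    } finally {
      in.close()
    }
    // The data file must be exactly as long as all blocks combined.
    if (data.length() == lengths.sum) lengths else null
  }
}
```

Each failed precondition exits immediately, so the happy path reads top to bottom without nesting.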




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-155927235
  
    **[Test build #2043 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2043/consoleFull)** for PR 9610 at commit [`6deccff`](https://github.com/apache/spark/commit/6deccff9d322b92538a470329581c8abeb8f7e6a).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-155616320
  
    **[Test build #2032 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2032/consoleFull)** for PR 9610 at commit [`9f0d2f9`](https://github.com/apache/spark/commit/9f0d2f97d36d432af7dbde93367885423daaa711).




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-156274865
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9610#discussion_r44743137
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
    @@ -93,6 +154,31 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
         } {
           out.close()
         }
    +
    +    val dataFile = getDataFile(shuffleId, mapId)
    +    // Note: there is only one IndexShuffleBlockResolver per executor
    +    synchronized {
    +      val existedLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
    +      if (existedLengths != null) {
    +        // Use the lengths of existed output for MapStatus
    --- End diff --
    
    ```
    // A previous attempt has already written the map outputs for this task,
    // so just use them and delete the ones we've written
    ```




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-155754853
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-156273236
  
     Merged build triggered.




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-155615326
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-156274866
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/45786/




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-155611691
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/45568/




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-156312301
  
    **[Test build #45800 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45800/consoleFull)** for PR 9610 at commit [`35bd469`](https://github.com/apache/spark/commit/35bd4691c9f1c4f1d9bb271515c8a28793c2f10d).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-155615330
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/45571/




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9610#discussion_r44744968
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala ---
    @@ -106,6 +108,28 @@ private[spark] class HashShuffleWriter[K, V](
           writer.commitAndClose()
           writer.fileSegment().length
         }
    +    // rename all shuffle files to final paths
    +    // Note: there is only one ShuffleBlockResolver in executor
    +    shuffleBlockResolver.synchronized {
    +      shuffle.writers.zipWithIndex.foreach { case (writer, i) =>
    +        val output = blockManager.diskBlockManager.getFile(writer.blockId)
    +        if (sizes(i) > 0) {
    +          if (output.exists()) {
    +            // update the size of output for MapStatus
    +            sizes(i) = output.length()
    +            writer.file.delete()
    +          } else {
    +            if (!writer.file.renameTo(output)) {
    --- End diff --
    
    `// Commit by renaming our temporary file to something the shuffle reader expects`
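
The commit step in the diff above follows a "first attempt wins" rule. A minimal standalone sketch of that rule, assuming a single shared lock object stands in for the one-per-executor resolver, might look like this (`CommitSketch` and `commitFirstWins` are hypothetical names):

```scala
import java.io.{File, IOException}

object CommitSketch {
  // Stand-in for synchronizing on the single ShuffleBlockResolver in an executor.
  private val lock = new Object

  /**
   * Returns true if our temporary file became the final output, or false if
   * an earlier attempt had already committed, in which case ours is discarded.
   */
  def commitFirstWins(tmp: File, target: File): Boolean = lock.synchronized {
    if (target.exists()) {
      // A previous attempt already committed; keep its output and drop ours.
      tmp.delete()
      false
    } else {
      // Commit by renaming our temporary file to the path readers expect.
      if (!tmp.renameTo(target)) {
        throw new IOException(s"fail to rename file $tmp to $target")
      }
      true
    }
  }
}
```

The existence check and the rename happen under the same lock, so two attempts on the same executor cannot both decide that they are first.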




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-155648765
  
    **[Test build #2032 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2032/consoleFull)** for PR 9610 at commit [`9f0d2f9`](https://github.com/apache/spark/commit/9f0d2f97d36d432af7dbde93367885423daaa711).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9610#discussion_r44742822
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
    @@ -75,13 +81,68 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
       }
     
       /**
    +   * Check whether there are index file and data file also they are matched with each other, returns
    +   * the lengths of each block in data file, if there are matched, or return null.
    --- End diff --
    
    ```
    /**
     * Check whether the given index and data files match each other.
     * If so, return the length of each block in the data file, otherwise return null.
     */
    ```




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by squito <gi...@git.apache.org>.
Github user squito commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-156244415
  
    you can take this test case if you like: https://github.com/squito/spark/blob/SPARK-8029_first_wins/core/src/test/scala/org/apache/spark/ShuffleSuite.scala#L351
    
    I'd also add more test cases to cover the various paths through the output commit code. And I think that `ShuffleWriter.write` should at least include a comment explaining why shuffle writers need to write to a tmp file and atomically move it to its final place, since it's not obvious why that would be necessary, and I guess there isn't any other good place to put the explanation.
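
As a rough illustration of the pattern such a comment would describe, here is a minimal sketch of writing to a temporary file in the target's own directory and then moving it into place. It uses `java.nio.file.Files.move` with `ATOMIC_MOVE` rather than the `File.renameTo` call used in the PR, and `AtomicWriteSketch` and `writeAtomically` are hypothetical names.

```scala
import java.io.File
import java.nio.file.{Files, StandardCopyOption}

object AtomicWriteSketch {
  def writeAtomically(target: File, bytes: Array[Byte]): Unit = {
    // Create the temp file next to the target so the move stays on one file system.
    val dir = target.getAbsoluteFile.toPath.getParent
    val tmp = Files.createTempFile(dir, target.getName, ".tmp")
    try {
      Files.write(tmp, bytes)
      // Readers see either no file or the complete file, never a partial write.
      // Whether an existing target is replaced is implementation specific.
      Files.move(tmp, target.toPath, StandardCopyOption.ATOMIC_MOVE)
    } finally {
      // No-op after a successful move; cleans up if the write or move failed.
      Files.deleteIfExists(tmp)
    }
  }
}
```

Writing directly to the final path would let a concurrent attempt for the same partition interleave its bytes with ours, which is the corruption this PR sets out to prevent.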




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-155932017
  
    cc @mateiz @rxin 




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-155611690
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-156335873
  
    **[Test build #45824 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45824/consoleFull)** for PR 9610 at commit [`71b12bf`](https://github.com/apache/spark/commit/71b12bfa81120d229b333b7bf4541e7ee23ec733).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-155699719
  
    **[Test build #45600 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45600/consoleFull)** for PR 9610 at commit [`55485a9`](https://github.com/apache/spark/commit/55485a9a9b125a75fa45cf39938918a21beac291).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-155609388
  
     Merged build triggered.




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-156341753
  
    **[Test build #2054 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2054/consoleFull)** for PR 9610 at commit [`71b12bf`](https://github.com/apache/spark/commit/71b12bfa81120d229b333b7bf4541e7ee23ec733).




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9610#discussion_r44741973
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
    @@ -93,6 +154,31 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
         } {
           out.close()
         }
    +
    +    val dataFile = getDataFile(shuffleId, mapId)
    +    // Note: there is only one IndexShuffleBlockResolver per executor
    +    synchronized {
    +      val existedLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
    --- End diff --
    
    `existingLengths`




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9610#discussion_r44742645
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
    @@ -75,13 +81,68 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
       }
     
       /**
    +   * Check whether there are index file and data file also they are matched with each other, returns
    +   * the lengths of each block in data file, if there are matched, or return null.
    +   */
    +  private def checkIndexAndDataFile(index: File, data: File, blocks: Int): Array[Long] = {
    +    val lengths = new Array[Long](blocks)
    +    if (index.length() == (blocks + 1) * 8) {
    +      // Read the lengths of blocks
    +      val f = try {
    +        new FileInputStream(index)
    +      } catch {
    +        case e: IOException =>
    +          return null
    +      }
    +      val in = new DataInputStream(new BufferedInputStream(f))
    +      try {
    +        // Convert the offsets into lengths of each block
    +        var offset = in.readLong()
    +        if (offset != 0L) {
    +          return null
    --- End diff --
    
    ```
    // first offset should be 0
    return null
    ```
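For readers following the diff, here is a minimal Java sketch of the index-file layout that this check relies on, as it appears in the quoted code: `blocks + 1` eight-byte cumulative offsets, the first of which must be 0, with block lengths recovered as successive differences. The class and method names (`IndexFileSketch`, `writeIndex`, `readLengths`) are hypothetical and are not part of the patch.

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

// Hypothetical helper, for illustration only.
final class IndexFileSketch {
  /** Writes cumulative offsets, starting at 0, for the given block lengths. */
  static void writeIndex(File index, long[] lengths) throws IOException {
    try (DataOutputStream out = new DataOutputStream(
        new BufferedOutputStream(new FileOutputStream(index)))) {
      long offset = 0L;
      out.writeLong(offset);
      for (long len : lengths) {
        offset += len;
        out.writeLong(offset);
      }
    }
  }

  /** Returns the block lengths, or null when the index does not look valid. */
  static long[] readLengths(File index, int blocks) throws IOException {
    // A valid index holds exactly (blocks + 1) longs of 8 bytes each.
    if (index.length() != (blocks + 1) * 8L) {
      return null;
    }
    try (DataInputStream in = new DataInputStream(
        new BufferedInputStream(new FileInputStream(index)))) {
      long offset = in.readLong();
      if (offset != 0L) {
        // first offset should be 0
        return null;
      }
      long[] lengths = new long[blocks];
      for (int i = 0; i < blocks; i++) {
        long next = in.readLong();
        lengths[i] = next - offset; // length = difference of adjacent offsets
        offset = next;
      }
      return lengths;
    }
  }
}
```

The sketch only validates the index; the quoted diff additionally compares the sum of the lengths with the size of the data file.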




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-155619605
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/45573/
    Test FAILed.




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9610#discussion_r44742689
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
    @@ -75,13 +81,68 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
       }
     
       /**
    +   * Check whether there are index file and data file also they are matched with each other, returns
    +   * the lengths of each block in data file, if there are matched, or return null.
    +   */
    +  private def checkIndexAndDataFile(index: File, data: File, blocks: Int): Array[Long] = {
    +    val lengths = new Array[Long](blocks)
    +    if (index.length() == (blocks + 1) * 8) {
    +      // Read the lengths of blocks
    +      val f = try {
    +        new FileInputStream(index)
    +      } catch {
    +        case e: IOException =>
    +          return null
    +      }
    +      val in = new DataInputStream(new BufferedInputStream(f))
    +      try {
    +        // Convert the offsets into lengths of each block
    +        var offset = in.readLong()
    +        if (offset != 0L) {
    +          return null
    +        }
    +        var i = 0
    +        while (i < blocks) {
    +          val off = in.readLong()
    +          lengths(i) = off - offset
    +          offset = off
    +          i += 1
    +        }
    +      } catch {
    +        case e: IOException =>
    +          return null
    +      } finally {
    +        in.close()
    +      }
    +
    +      val size = lengths.reduce(_ + _)
    +      // `length` returns 0 if it not exists.
    +      if (data.length() == size) {
    --- End diff --
    
    or just `if (data.length() == lengths.sum)`




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-156290332
  
    **[Test build #45800 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45800/consoleFull)** for PR 9610 at commit [`35bd469`](https://github.com/apache/spark/commit/35bd4691c9f1c4f1d9bb271515c8a28793c2f10d).




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9610#discussion_r44717955
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala ---
    @@ -106,6 +108,19 @@ private[spark] class HashShuffleWriter[K, V](
           writer.commitAndClose()
           writer.fileSegment().length
         }
    +    // rename all shuffle files to final paths
    +    shuffle.writers.zip(sizes).foreach { case (writer: DiskBlockObjectWriter, size: Long) =>
    +      if (size > 0) {
    +        val output = blockManager.diskBlockManager.getFile(writer.blockId)
    +        if (output.exists()) {
    +          writer.file.delete()
    +        } else {
    +          if (!writer.file.renameTo(output)) {
    +            throw new IOException(s"fail to rename ${writer.file} to $output")
    --- End diff --
    
    Yeah, I suppose it all depends on what the model is for non-deterministic data. The reduce tasks can read data from a mix of attempts, but I guess that is OK (we can't completely prevent it in any case). There is also the problem of returning the right MapStatus here, but it doesn't matter as much in this case: you will at least return some set of non-empty blocks that is consistent with the shuffle data on disk, even if the sizes can be arbitrarily wrong.
    
    Also, I know it's super rare, but there _is_ a race between `output.exists` and `renameTo(output)`, so we might as well protect against that.
    
    I also find it a bit weird that this is neither first-attempt-wins nor last-attempt-wins: the first attempt to get to each output file wins, so the result can be a mix of attempts. Again, I'd include a comment explaining the logic.
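One way to close the check-then-rename window described in this comment is to hold a lock across both steps, which is roughly what the later `IndexShuffleBlockResolver` diff does with `synchronized`. The Java sketch below is an illustration under assumptions: `CommitSketch`, `commit`, and `LOCK` are hypothetical names, and the lock only serializes attempts running inside the same JVM (the same executor).

```java
import java.io.File;
import java.io.IOException;

// Hypothetical helper, for illustration only.
final class CommitSketch {
  // Assumption: all competing attempts run in one JVM and share this lock.
  private static final Object LOCK = new Object();

  /**
   * Moves tmp to output unless an earlier attempt already committed.
   * Returns true if this attempt's file became the output.
   */
  static boolean commit(File tmp, File output) throws IOException {
    synchronized (LOCK) {
      // The existence check and the rename happen under the same lock,
      // so no other attempt can create output between the two steps.
      if (output.exists()) {
        tmp.delete();
        return false;
      }
      if (!tmp.renameTo(output)) {
        throw new IOException("fail to rename " + tmp + " to " + output);
      }
      return true;
    }
  }
}
```

With this shape the first attempt to commit a given file wins, and a later attempt discards its temporary file instead of failing.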




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-156308437
  
    @davies I took a pass and I find the approach correct and simple. I did a close review and confirmed that all four `ShuffleWriter`s write temporary files and rename them correctly on commit. Most of my comments are minor.




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-156336017
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/45824/
    Test FAILed.




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-156272489
  
    Merged build started.




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9610#discussion_r44499442
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
    @@ -93,6 +95,10 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
         } {
           out.close()
         }
    +    indexFile.deleteOnExit()
    +    if (!tmp.renameTo(indexFile)) {
    +      throw new IOException(s"fail to rename index file $tmp to $indexFile")
    --- End diff --
    
    There is very little chance that two concurrent tasks will call `renameTo` at the same time. Even if they do, one of them will succeed, the scheduler will mark the partition as successful, and the failure of the other will be ignored (not retried).




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-155699800
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9610#discussion_r44499346
  
    --- Diff: core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java ---
    @@ -155,9 +156,20 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
           writer.commitAndClose();
         }
     
    -    partitionLengths =
    -      writePartitionedFile(shuffleBlockResolver.getDataFile(shuffleId, mapId));
    -    shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths);
    +    File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
    +    final File tmp = new File(output.getAbsolutePath() + "." + UUID.randomUUID());
    +    partitionLengths = writePartitionedFile(tmp);
    +    if (!output.exists()) {
    --- End diff --
    
    Good point, we should check both.




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-155702971
  
     Merged build triggered.




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-155612416
  
    Merged build started.




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-156272468
  
     Merged build triggered.




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-155702992
  
    Merged build started.




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-155619603
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-156318210
  
    @andrewor14 Thanks, this looks much better now.




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-156312360
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/45800/
    Test FAILed.




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-155612397
  
     Merged build triggered.




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9610#discussion_r44743772
  
    --- Diff: core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java ---
    @@ -155,9 +155,10 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
           writer.commitAndClose();
         }
     
    -    partitionLengths =
    -      writePartitionedFile(shuffleBlockResolver.getDataFile(shuffleId, mapId));
    -    shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths);
    +    File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
    --- End diff --
    
    `// Write to a temporary location to avoid potential corruption issues (SPARK-8029)`
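The pattern this suggested comment refers to can be sketched in isolation: write to a uniquely named temporary file next to the target, then rename it into place. The Java below is a simplified illustration, not the patch itself; `TempThenRename` and `writeViaTemp` are hypothetical names. The temporary file lives in the same directory as the target so the rename stays on one file system, but note that `File.renameTo` is documented as platform-dependent and is not guaranteed to be atomic everywhere.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.UUID;

// Hypothetical helper, for illustration only.
final class TempThenRename {
  /** Writes data to a sibling temp file, then renames it to output. */
  static void writeViaTemp(File output, byte[] data) throws IOException {
    // Same directory as the target, with a random suffix so that
    // concurrent attempts never share a temporary file.
    File tmp = new File(output.getAbsolutePath() + "." + UUID.randomUUID());
    try (FileOutputStream out = new FileOutputStream(tmp)) {
      out.write(data);
    }
    if (!tmp.renameTo(output)) {
      tmp.delete(); // do not leave the partial attempt behind
      throw new IOException("fail to rename " + tmp + " to " + output);
    }
  }
}
```

Readers of the target path then see either no file or a complete one, never a file that another attempt is still writing.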




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/9610




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-155612644
  
    **[Test build #2031 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2031/consoleFull)** for PR 9610 at commit [`7cccfcc`](https://github.com/apache/spark/commit/7cccfcc671064cb68c7a9fc6862bc67af77ef07c).




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9610#discussion_r44742558
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
    @@ -75,13 +81,68 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
       }
     
       /**
    +   * Check whether there are index file and data file also they are matched with each other, returns
    +   * the lengths of each block in data file, if there are matched, or return null.
    +   */
    +  private def checkIndexAndDataFile(index: File, data: File, blocks: Int): Array[Long] = {
    +    val lengths = new Array[Long](blocks)
    +    if (index.length() == (blocks + 1) * 8) {
    +      // Read the lengths of blocks
    +      val f = try {
    +        new FileInputStream(index)
    +      } catch {
    +        case e: IOException =>
    +          return null
    --- End diff --
    
    should we at least `logWarning` when this happens? Same in L113
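As a rough illustration of this suggestion, the Java sketch below logs a warning before falling back to `null`. It uses `java.util.logging` purely as a stand-in: Spark's own `logWarning` comes from its logging trait and is not shown here, and `OpenOrNull` and `open` are hypothetical names.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper, for illustration only.
final class OpenOrNull {
  // Stand-in logger; the real code would use Spark's logWarning.
  private static final Logger LOG = Logger.getLogger(OpenOrNull.class.getName());

  /** Opens the index file, or logs a warning and returns null on failure. */
  static FileInputStream open(File index) {
    try {
      return new FileInputStream(index);
    } catch (IOException e) {
      // Record why the existing output is being treated as unusable.
      LOG.log(Level.WARNING, "Could not open index file " + index, e);
      return null;
    }
  }
}
```

The behaviour for the caller is unchanged (it still receives `null`), but the failure leaves a trace in the executor log.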




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9610#discussion_r44742918
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
    @@ -93,6 +154,31 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
         } {
           out.close()
         }
    +
    +    val dataFile = getDataFile(shuffleId, mapId)
    +    // Note: there is only one IndexShuffleBlockResolver per executor
    +    synchronized {
    +      val existedLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
    +      if (existedLengths != null) {
    +        // Use the lengths of existed output for MapStatus
    +        System.arraycopy(existedLengths, 0, lengths, 0, lengths.length)
    +        dataTmp.delete()
    +        indexTmp.delete()
    +      } else {
    +        if (indexFile.exists()) {
    +          indexFile.delete()
    +        }
    +        if (dataFile.exists()) {
    +          dataFile.delete()
    +        }
    +        if (!indexTmp.renameTo(indexFile)) {
    +          throw new IOException("fail to rename data file " + indexTmp + " to " + indexFile)
    --- End diff --
    
    data -> index




[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-155674056
  
     Merged build triggered.


[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-155674110
  
    Merged build started.


[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9610#discussion_r44743032
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
    @@ -93,6 +154,31 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
         } {
           out.close()
         }
    +
    +    val dataFile = getDataFile(shuffleId, mapId)
    +    // Note: there is only one IndexShuffleBlockResolver per executor
    +    synchronized {
    +      val existedLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
    +      if (existedLengths != null) {
    +        // Use the lengths of existed output for MapStatus
    +        System.arraycopy(existedLengths, 0, lengths, 0, lengths.length)
    +        dataTmp.delete()
    --- End diff --
    
    Will this throw an NPE?


[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-155889821
  
    **[Test build #2043 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2043/consoleFull)** for PR 9610 at commit [`6deccff`](https://github.com/apache/spark/commit/6deccff9d322b92538a470329581c8abeb8f7e6a).


[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9610#discussion_r44499545
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala ---
    @@ -106,6 +108,19 @@ private[spark] class HashShuffleWriter[K, V](
           writer.commitAndClose()
           writer.fileSegment().length
         }
    +    // rename all shuffle files to final paths
    +    shuffle.writers.zip(sizes).foreach { case (writer: DiskBlockObjectWriter, size: Long) =>
    +      if (size > 0) {
    +        val output = blockManager.diskBlockManager.getFile(writer.blockId)
    +        if (output.exists()) {
    +          writer.file.delete()
    +        } else {
    +          if (!writer.file.renameTo(output)) {
    +            throw new IOException(s"fail to rename ${writer.file} to $output")
    --- End diff --
    
    I think these partitions are independent, so the output should be fine regardless of which attempt generated it. That is the basic idea of how RDDs work: a partition can be re-run and produce the same result.


[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9610#discussion_r44497624
  
    --- Diff: core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java ---
    @@ -155,9 +156,20 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
           writer.commitAndClose();
         }
     
    -    partitionLengths =
    -      writePartitionedFile(shuffleBlockResolver.getDataFile(shuffleId, mapId));
    -    shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths);
    +    File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
    +    final File tmp = new File(output.getAbsolutePath() + "." + UUID.randomUUID());
    --- End diff --
    
    Good point about creating the tmp files in the same dir as the dest to make sure we can do the rename. I had taken that for granted.
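
A minimal sketch of the temp-file-then-rename idea discussed here, assuming plain `java.nio` rather than Spark's own helpers. The class and method names (`TempThenRename`, `siblingTempFile`, `writeThenRename`) are hypothetical and are not part of the patch. The temp file is created next to the target so that both paths sit on the same file system, which is what makes an atomic move possible.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.UUID;

class TempThenRename {
  // Hypothetical helper: a sibling of `target` with a random suffix, so the
  // temp file lives in the same directory (and file system) as the target.
  static File siblingTempFile(File target) {
    return new File(target.getAbsolutePath() + "." + UUID.randomUUID());
  }

  // Write the payload to the temp sibling first, then move it into place.
  static void writeThenRename(File target, byte[] payload) throws IOException {
    File tmp = siblingTempFile(target);
    try {
      Files.write(tmp.toPath(), payload);
      // ATOMIC_MOVE throws AtomicMoveNotSupportedException instead of
      // silently falling back to a non-atomic copy.
      Files.move(tmp.toPath(), target.toPath(), StandardCopyOption.ATOMIC_MOVE);
    } finally {
      // No-op after a successful move; cleans up if the write or move failed.
      Files.deleteIfExists(tmp.toPath());
    }
  }
}
```

Whether an atomic move replaces an already existing target is implementation specific, so a caller that cares about earlier attempts would still need to check for existing output before committing.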


[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-155888201
  
    **[Test build #2040 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2040/consoleFull)** for PR 9610 at commit [`6deccff`](https://github.com/apache/spark/commit/6deccff9d322b92538a470329581c8abeb8f7e6a).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9610#discussion_r44743641
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala ---
    @@ -84,17 +86,8 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
             Array.tabulate[DiskBlockObjectWriter](numReducers) { bucketId =>
               val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
               val blockFile = blockManager.diskBlockManager.getFile(blockId)
    -          // Because of previous failures, the shuffle file may already exist on this machine.
    -          // If so, remove it.
    -          if (blockFile.exists) {
    -            if (blockFile.delete()) {
    -              logInfo(s"Removed existing shuffle file $blockFile")
    -            } else {
    -              logWarning(s"Failed to remove existing shuffle file $blockFile")
    -            }
    -          }
    -          blockManager.getDiskWriter(blockId, blockFile, serializerInstance, bufferSize,
    -            writeMetrics)
    +          val tmp = new File(blockFile.getAbsolutePath + "." + UUID.randomUUID())
    --- End diff --
    
    Why not use your new method `Utils.tempFileWith`?


[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-155754860
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/45618/


[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-155754469
  
    **[Test build #45618 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45618/consoleFull)** for PR 9610 at commit [`6deccff`](https://github.com/apache/spark/commit/6deccff9d322b92538a470329581c8abeb8f7e6a).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9610#discussion_r44678796
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
    @@ -93,6 +95,10 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
         } {
           out.close()
         }
    +    indexFile.deleteOnExit()
    +    if (!tmp.renameTo(indexFile)) {
    +      throw new IOException(s"fail to rename index file $tmp to $indexFile")
    --- End diff --
    
    Can you test for this? I think the worry was about different TaskSets attempting the same map stage. Imagine that attempt 1 of the stage successfully completes a task, and sends back a map output status, but that status gets ignored because that stage attempt got cancelled. Attempt 2 might then fail to send a new status for it.
    
    There seem to be two ways to fix it if this problem can actually occur -- either add MapOutputStatuses even from failed task sets or mark this new task as successful if a file exists.
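
The second option (treat the new task as successful when output already exists) could look roughly like the sketch below. It is an illustration only: `FirstCommitWins`, `commitOrReuse`, and the static lock are hypothetical names, and the single lock stands in for the assumption that all attempts on one executor go through the same object.

```java
import java.io.File;
import java.io.IOException;

class FirstCommitWins {
  // Assumption: one lock shared by every attempt on this executor.
  private static final Object LOCK = new Object();

  // Returns true if this attempt's file became the output, false if an
  // earlier attempt had already committed and this temp file was discarded.
  // In both cases the caller can report the task as successful.
  static boolean commitOrReuse(File tmp, File target) throws IOException {
    synchronized (LOCK) {
      if (target.exists()) {
        tmp.delete(); // keep the earlier attempt's output
        return false;
      }
      if (!tmp.renameTo(target)) {
        throw new IOException("fail to rename " + tmp + " to " + target);
      }
      return true;
    }
  }
}
```

This only checks that the target exists; it does not verify that the existing file is complete or consistent with any index file.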


[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9610#discussion_r44497139
  
    --- Diff: core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java ---
    @@ -155,9 +156,20 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
           writer.commitAndClose();
         }
     
    -    partitionLengths =
    -      writePartitionedFile(shuffleBlockResolver.getDataFile(shuffleId, mapId));
    -    shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths);
    +    File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
    +    final File tmp = new File(output.getAbsolutePath() + "." + UUID.randomUUID());
    +    partitionLengths = writePartitionedFile(tmp);
    +    if (!output.exists()) {
    --- End diff --
    
    I don't think you can do this and still support SPARK-4085, which regenerates the output if one of the shuffle files goes completely missing. If the index file goes missing and the data file is still there, with this logic you'll never regenerate the shuffle output. But maybe SPARK-4085 is not worth it ...


[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9610#discussion_r44745040
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
    @@ -75,13 +81,68 @@ private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleB
       }
     
       /**
    +   * Check whether there are index file and data file also they are matched with each other, returns
    +   * the lengths of each block in data file, if there are matched, or return null.
    +   */
    +  private def checkIndexAndDataFile(index: File, data: File, blocks: Int): Array[Long] = {
    +    val lengths = new Array[Long](blocks)
    +    if (index.length() == (blocks + 1) * 8) {
    +      // Read the lengths of blocks
    +      val f = try {
    +        new FileInputStream(index)
    +      } catch {
    +        case e: IOException =>
    +          return null
    +      }
    +      val in = new DataInputStream(new BufferedInputStream(f))
    +      try {
    +        // Convert the offsets into lengths of each block
    +        var offset = in.readLong()
    +        if (offset != 0L) {
    +          return null
    +        }
    +        var i = 0
    +        while (i < blocks) {
    +          val off = in.readLong()
    +          lengths(i) = off - offset
    +          offset = off
    +          i += 1
    +        }
    +      } catch {
    +        case e: IOException =>
    +          return null
    +      } finally {
    +        in.close()
    +      }
    +
    +      val size = lengths.reduce(_ + _)
    +      // `length` returns 0 if it not exists.
    +      if (data.length() == size) {
    +        lengths
    +      } else {
    +        null
    +      }
    +    } else {
    +      null
    +    }
    +  }
    +
    +  /**
        * Write an index file with the offsets of each block, plus a final offset at the end for the
        * end of the output file. This will be used by getBlockData to figure out where each block
        * begins and ends.
    +   *
    +   * It will commit the data and index file as an atomic operation, use the existed ones (lengths of
    +   * blocks will be refreshed), or replace them with new ones.
    --- End diff --
    
    We should add that this modifies the contents of `lengths` if data and index files already exist and match each other.
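
For readers following along, here is a standalone Java sketch of the consistency check in the diff above. It assumes the index layout the diff implies: `blocks + 1` cumulative offsets stored as 8-byte longs, starting at 0. The names `IndexCheck`, `writeIndex`, and `matchingLengths` are hypothetical and only mirror the idea of `checkIndexAndDataFile`.

```java
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

class IndexCheck {
  // Writes blocks + 1 cumulative offsets (starting at 0) as 8-byte longs.
  static void writeIndex(File index, long[] lengths) throws IOException {
    try (DataOutputStream out = new DataOutputStream(new FileOutputStream(index))) {
      long offset = 0L;
      out.writeLong(offset);
      for (long len : lengths) {
        offset += len;
        out.writeLong(offset);
      }
    }
  }

  // Returns the per-block lengths if index and data agree, otherwise null.
  static long[] matchingLengths(File index, File data, int blocks) {
    // File.length() is 0 for a missing file, so a missing index fails here.
    if (index.length() != (blocks + 1) * 8L) {
      return null;
    }
    long[] lengths = new long[blocks];
    long total = 0L;
    try (DataInputStream in =
        new DataInputStream(new BufferedInputStream(new FileInputStream(index)))) {
      long offset = in.readLong();
      if (offset != 0L) {
        return null;
      }
      // Convert consecutive offsets into block lengths.
      for (int i = 0; i < blocks; i++) {
        long next = in.readLong();
        lengths[i] = next - offset;
        total += lengths[i];
        offset = next;
      }
    } catch (IOException e) {
      return null;
    }
    return data.length() == total ? lengths : null;
  }
}
```

A caller that copies the returned array into its own `lengths` array, as the diff does with `System.arraycopy`, is mutating its argument, which is the side effect worth documenting.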


[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-156318305
  
     Merged build triggered.


[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9610#discussion_r44741545
  
    --- Diff: core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java ---
    @@ -248,8 +250,7 @@ void forceSorterToSpill() throws IOException {
        *
        * @return the partition lengths in the merged file.
        */
    -  private long[] mergeSpills(SpillInfo[] spills) throws IOException {
    -    final File outputFile = shuffleBlockResolver.getDataFile(shuffleId, mapId);
    +  private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOException {
    --- End diff --
    
    This is only used in 1 place so we don't need to change the method signature here. We can just do the `Utils.tempFileWith` in this method.


[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-156289039
  
     Merged build triggered.


[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-156318569
  
    **[Test build #45824 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45824/consoleFull)** for PR 9610 at commit [`71b12bf`](https://github.com/apache/spark/commit/71b12bfa81120d229b333b7bf4541e7ee23ec733).


[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-155706511
  
    **[Test build #45618 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45618/consoleFull)** for PR 9610 at commit [`6deccff`](https://github.com/apache/spark/commit/6deccff9d322b92538a470329581c8abeb8f7e6a).


[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-156312359
  
    Merged build finished. Test FAILed.


[GitHub] spark pull request: [SPARK-8029] Robust shuffle writer

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9610#issuecomment-155699803
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/45600/

