You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by mridulm <gi...@git.apache.org> on 2014/07/27 17:31:21 UTC

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

GitHub user mridulm opened a pull request:

    https://github.com/apache/spark/pull/1609

    [SPARK-2532] WIP Consolidated shuffle fixes

    Status of the PR
    - [X] Cherry pick and merge changes from internal branch to spark master
    - [X] Remove WIP comments and 2G branch references.
    - [X] Tests for BlockObjectWriter
    - [ ] Tests for ExternalAppendOnlyMap
    - [ ] Tests for MapOutputTracker
    - [ ] Tests for ShuffleBlockManager
    
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mridulm/spark consolidated_shuffle_fixes

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/1609.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1609
    
----
commit f1182f8a3d3328248d471038d6ab0db6e6a1396d
Author: Mridul Muralidharan <mr...@apache.org>
Date:   2014-07-27T15:23:05Z

    Consolidated shuffle fixes

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1609#discussion_r15446910
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
    @@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter(
       private var fos: FileOutputStream = null
       private var ts: TimeTrackingOutputStream = null
       private var objOut: SerializationStream = null
    +
    +  // Did we create this file or was it already present : used in revert to decide
    +  // if we should delete this file or not. Also used to detect if file was deleted
    +  // between creation of BOW and its actual init
    +  private val initiallyExists = file.exists() && file.isFile
       private val initialPosition = file.length()
       private var lastValidPosition = initialPosition
    +
       private var initialized = false
    +  // closed explicitly ?
    +  private var closed = false
    +  // Attempt to cleanly close ? (could also be closed via revert)
    +  // Note, a cleanly closed file could be subsequently reverted
    +  private var cleanCloseAttempted = false
    +  // Was the file actually opened atleast once.
    +  // Note: initialized/streams change state with close/revert.
    +  private var wasOpenedOnce = false
       private var _timeWriting = 0L
     
    -  override def open(): BlockObjectWriter = {
    -    fos = new FileOutputStream(file, true)
    -    ts = new TimeTrackingOutputStream(fos)
    -    channel = fos.getChannel()
    +  // Due to some directory creation race issues in spark, it has been observed that
    +  // sometimes file creation happens 'before' the actual directory has been created
    +  // So we attempt to retry atleast once with a mkdirs in case directory was missing.
    +  private def init() {
    +    init(canRetry = true)
    +  }
    +
    +  private def init(canRetry: Boolean) {
    +
    +    if (closed) throw new IOException("Already closed")
    +
    +    assert (! initialized)
    +    assert (! wasOpenedOnce)
    +    var exists = false
    +    try {
    +      exists = file.exists()
    +      if (! exists && initiallyExists && 0 != initialPosition && ! Utils.inShutdown) {
    +        // Was deleted by cleanup thread ?
    +        throw new IOException("file " + file + " cleaned up ? exists = " + exists +
    +          ", initiallyExists = " + initiallyExists + ", initialPosition = " + initialPosition)
    +      }
    +      fos = new FileOutputStream(file, true)
    +    } catch {
    +      case fEx: FileNotFoundException =>
    +        // There seems to be some race in directory creation.
    +        // Attempts to fix it dont seem to have worked : working around the problem for now.
    +        logDebug("Unable to open " + file + ", canRetry = " + canRetry + ", exists = " + exists +
    +          ", initialPosition = " + initialPosition + ", in shutdown = " + Utils.inShutdown(), fEx)
    +        if (canRetry && ! Utils.inShutdown()) {
    +          // try creating the parent directory if that is the issue.
    +          // Since there can be race with others, dont bother checking for
    +          // success/failure - the call to init() will resolve if fos can be created.
    +          file.getParentFile.mkdirs()
    +          // Note, if directory did not exist, then file does not either - and so
    +          // initialPosition would be zero in either case.
    +          init(canRetry = false)
    +          return
    +        } else throw fEx
    +    }
    +
    +    try {
    +      // This is to workaround case where creation of object and actual init
    +      // (which can happen much later) happens after a delay and the cleanup thread
    +      // cleaned up the file.
    +      channel = fos.getChannel
    +      val fosPos = channel.position()
    +      if (initialPosition != fosPos) {
    +        throw new IOException("file cleaned up ? " + file.exists() + 
    +          ", initialpos = " + initialPosition +
    +          "current len = " + fosPos + ", in shutdown ? " + Utils.inShutdown)
    +      }
    +
    +      ts = new TimeTrackingOutputStream(fos)
    +      val bos = new BufferedOutputStream(ts, bufferSize)
    +      bs = compressStream(bos)
    +      objOut = serializer.newInstance().serializeStream(bs)
    +      initialized = true
    +      wasOpenedOnce = true;
    +    } finally {
    +      if (! initialized) {
    --- End diff --
    
    One more Spaces


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1609#discussion_r15447535
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
    @@ -353,26 +368,53 @@ class ExternalAppendOnlyMap[K, V, C](
        */
       private class DiskMapIterator(file: File, blockId: BlockId, batchSizes: ArrayBuffer[Long])
         extends Iterator[(K, C)] {
    -    private val fileStream = new FileInputStream(file)
    -    private val bufferedStream = new BufferedInputStream(fileStream, fileBufferSize)
    +
    +    assert (! batchSizes.isEmpty)
    --- End diff --
    
    Same as above.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1609#discussion_r15540934
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
    @@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter(
       private var fos: FileOutputStream = null
       private var ts: TimeTrackingOutputStream = null
       private var objOut: SerializationStream = null
    +
    +  // Did we create this file or was it already present : used in revert to decide
    +  // if we should delete this file or not. Also used to detect if file was deleted
    +  // between creation of BOW and its actual init
    +  private val initiallyExists = file.exists() && file.isFile
       private val initialPosition = file.length()
       private var lastValidPosition = initialPosition
    +
       private var initialized = false
    +  // closed explicitly ?
    +  private var closed = false
    +  // Attempt to cleanly close ? (could also be closed via revert)
    +  // Note, a cleanly closed file could be subsequently reverted
    +  private var cleanCloseAttempted = false
    +  // Was the file actually opened atleast once.
    +  // Note: initialized/streams change state with close/revert.
    +  private var wasOpenedOnce = false
       private var _timeWriting = 0L
     
    -  override def open(): BlockObjectWriter = {
    -    fos = new FileOutputStream(file, true)
    -    ts = new TimeTrackingOutputStream(fos)
    -    channel = fos.getChannel()
    +  // Due to some directory creation race issues in spark, it has been observed that
    +  // sometimes file creation happens 'before' the actual directory has been created
    +  // So we attempt to retry atleast once with a mkdirs in case directory was missing.
    +  private def init() {
    +    init(canRetry = true)
    +  }
    +
    +  private def init(canRetry: Boolean) {
    +
    +    if (closed) throw new IOException("Already closed")
    +
    +    assert (! initialized)
    +    assert (! wasOpenedOnce)
    +    var exists = false
    +    try {
    +      exists = file.exists()
    +      if (! exists && initiallyExists && 0 != initialPosition && ! Utils.inShutdown) {
    +        // Was deleted by cleanup thread ?
    +        throw new IOException("file " + file + " cleaned up ? exists = " + exists +
    +          ", initiallyExists = " + initiallyExists + ", initialPosition = " + initialPosition)
    +      }
    +      fos = new FileOutputStream(file, true)
    +    } catch {
    +      case fEx: FileNotFoundException =>
    +        // There seems to be some race in directory creation.
    +        // Attempts to fix it dont seem to have worked : working around the problem for now.
    +        logDebug("Unable to open " + file + ", canRetry = " + canRetry + ", exists = " + exists +
    +          ", initialPosition = " + initialPosition + ", in shutdown = " + Utils.inShutdown(), fEx)
    +        if (canRetry && ! Utils.inShutdown()) {
    +          // try creating the parent directory if that is the issue.
    +          // Since there can be race with others, dont bother checking for
    +          // success/failure - the call to init() will resolve if fos can be created.
    +          file.getParentFile.mkdirs()
    +          // Note, if directory did not exist, then file does not either - and so
    +          // initialPosition would be zero in either case.
    +          init(canRetry = false)
    --- End diff --
    
    As mentioned in the comments, this tries to retry once in case file could not be created due to lack of presence of directory (which is what the FileNotFoundException is usually for) : except when we are already in shutdown.
    This is a case which happens due to some race in spark between creation of directory and allowing files to be created under that directory.
    
    We have fixed a double checked locking bug below (in DiskBlockManager) but looks like it is not sufficient - since this was observed even after that.
    (In our branch, the logDebug is actually logError to flush out these cases).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1609#discussion_r15541435
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
    @@ -353,26 +368,53 @@ class ExternalAppendOnlyMap[K, V, C](
        */
       private class DiskMapIterator(file: File, blockId: BlockId, batchSizes: ArrayBuffer[Long])
         extends Iterator[(K, C)] {
    -    private val fileStream = new FileInputStream(file)
    -    private val bufferedStream = new BufferedInputStream(fileStream, fileBufferSize)
    +
    +    assert (! batchSizes.isEmpty)
    +    assert (! batchSizes.exists(_ <= 0))
    +    private val batchOffsets = batchSizes.scanLeft(0L)(_ + _)
    --- End diff --
    
    This is to give us the starting offset for each batch; and not rely on where the last batch's read ended.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1609#discussion_r15447402
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
    @@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter(
       private var fos: FileOutputStream = null
       private var ts: TimeTrackingOutputStream = null
       private var objOut: SerializationStream = null
    +
    +  // Did we create this file or was it already present : used in revert to decide
    +  // if we should delete this file or not. Also used to detect if file was deleted
    +  // between creation of BOW and its actual init
    +  private val initiallyExists = file.exists() && file.isFile
       private val initialPosition = file.length()
       private var lastValidPosition = initialPosition
    +
       private var initialized = false
    +  // closed explicitly ?
    +  private var closed = false
    +  // Attempt to cleanly close ? (could also be closed via revert)
    +  // Note, a cleanly closed file could be subsequently reverted
    +  private var cleanCloseAttempted = false
    +  // Was the file actually opened atleast once.
    +  // Note: initialized/streams change state with close/revert.
    +  private var wasOpenedOnce = false
       private var _timeWriting = 0L
     
    -  override def open(): BlockObjectWriter = {
    -    fos = new FileOutputStream(file, true)
    -    ts = new TimeTrackingOutputStream(fos)
    -    channel = fos.getChannel()
    +  // Due to some directory creation race issues in spark, it has been observed that
    +  // sometimes file creation happens 'before' the actual directory has been created
    +  // So we attempt to retry atleast once with a mkdirs in case directory was missing.
    +  private def init() {
    +    init(canRetry = true)
    +  }
    +
    +  private def init(canRetry: Boolean) {
    +
    +    if (closed) throw new IOException("Already closed")
    +
    +    assert (! initialized)
    +    assert (! wasOpenedOnce)
    +    var exists = false
    +    try {
    +      exists = file.exists()
    +      if (! exists && initiallyExists && 0 != initialPosition && ! Utils.inShutdown) {
    +        // Was deleted by cleanup thread ?
    +        throw new IOException("file " + file + " cleaned up ? exists = " + exists +
    +          ", initiallyExists = " + initiallyExists + ", initialPosition = " + initialPosition)
    +      }
    +      fos = new FileOutputStream(file, true)
    +    } catch {
    +      case fEx: FileNotFoundException =>
    +        // There seems to be some race in directory creation.
    +        // Attempts to fix it dont seem to have worked : working around the problem for now.
    +        logDebug("Unable to open " + file + ", canRetry = " + canRetry + ", exists = " + exists +
    +          ", initialPosition = " + initialPosition + ", in shutdown = " + Utils.inShutdown(), fEx)
    +        if (canRetry && ! Utils.inShutdown()) {
    --- End diff --
    
    Same as above.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1609#discussion_r15541257
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala ---
    @@ -236,31 +241,61 @@ object ShuffleBlockManager {
           new PrimitiveVector[Long]()
         }
     
    -    def numBlocks = mapIdToIndex.size
    +    /*
    +     * This is required for shuffle consolidation to work. In particular when updates to file are
    +     * happening while parallel requests to fetch block happens.
    +     */
    +    private val blockLengthsByReducer = Array.fill[PrimitiveVector[Long]](files.length) {
    +      new PrimitiveVector[Long]()
    +    }
    +
    +    private var numBlocks = 0
    --- End diff --
    
    The reason for change to var from def is perhaps subtle.
    Consider the case of :
    
    add for mapIdToIndex with mapId 0
    add for mapIdToIndex with mapId 1
    add for mapIdToIndex with mapId 0 (on re-execution)
    add for mapIdToIndex with mapId 1 (on re-execution)
    
    Now both 0 and 1 will end up with the same index assigned (since it was based on mapIdToIndex.size).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the pull request:

    https://github.com/apache/spark/pull/1609#issuecomment-50306633
  
    @witgo I did not understand the space issue : stylecheck seems to run fine.
    
    Regarding the actual issues : the JIRA lists some of them - unfortunately it is not exhaustive.
    Spark code assumes a few things :
    1) A flush followed by close should not cause additional data to be written to the stream - which is not valid in general case (close can still write more data).
    2) reading an object from stream will consume all data written as  part of the object - which is not valid in general case, additional info could have been written after the object was written (like reset markers in java serde). So stream wrapping has to account for that.
    3) 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1609#discussion_r15553264
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
    @@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter(
       private var fos: FileOutputStream = null
       private var ts: TimeTrackingOutputStream = null
       private var objOut: SerializationStream = null
    +
    +  // Did we create this file or was it already present : used in revert to decide
    +  // if we should delete this file or not. Also used to detect if file was deleted
    +  // between creation of BOW and its actual init
    +  private val initiallyExists = file.exists() && file.isFile
       private val initialPosition = file.length()
       private var lastValidPosition = initialPosition
    +
       private var initialized = false
    +  // closed explicitly ?
    +  private var closed = false
    +  // Attempt to cleanly close ? (could also be closed via revert)
    +  // Note, a cleanly closed file could be subsequently reverted
    +  private var cleanCloseAttempted = false
    +  // Was the file actually opened atleast once.
    +  // Note: initialized/streams change state with close/revert.
    +  private var wasOpenedOnce = false
       private var _timeWriting = 0L
     
    -  override def open(): BlockObjectWriter = {
    -    fos = new FileOutputStream(file, true)
    -    ts = new TimeTrackingOutputStream(fos)
    -    channel = fos.getChannel()
    +  // Due to some directory creation race issues in spark, it has been observed that
    +  // sometimes file creation happens 'before' the actual directory has been created
    +  // So we attempt to retry atleast once with a mkdirs in case directory was missing.
    --- End diff --
    
    Any idea on how this could happen? I don't fully understand it, did someone do `new File(fullPath)` and correctly create it before the directory was made?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1609#issuecomment-50267540
  
    QA tests have started for PR 1609. This patch merges cleanly. <br>View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17242/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/1609#issuecomment-50289340
  
    @adav @andrewor14 would be good if you two take a look at this when it's merging correctly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1609#issuecomment-50277674
  
    QA tests have started for PR 1609. This patch merges cleanly. <br>View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17244/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1609#discussion_r15442779
  
    --- Diff: core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala ---
    @@ -0,0 +1,296 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.storage
    +
    +import org.scalatest.FunSuite
    +import java.io.{IOException, FileOutputStream, OutputStream, File}
    +import org.apache.spark.serializer.JavaSerializer
    +import org.apache.spark.SparkConf
    +import org.apache.spark.util.Utils
    +
    +/**
    + * Test various code paths in DiskBlockObjectWriter
    + */
    +class DiskBlockObjectWriterSuite extends FunSuite {
    +
    +  private val conf = new SparkConf
    +  private val BUFFER_SIZE = 32 * 1024
    +
    +  private def tempFile(): File = {
    +    val file = File.createTempFile("temp_", "block")
    +    // We dont want file to exist ! Just need a temp file name
    +    file.delete()
    +    file
    +  }
    +
    +  private def createWriter(file: File = tempFile()) :
    +      (File, DiskBlockObjectWriter) = {
    +    file.deleteOnExit()
    +
    +    (file, new DiskBlockObjectWriter(BlockId("test_1"), file,
    +      new JavaSerializer(conf), BUFFER_SIZE, (out: OutputStream) => out, true))
    +  }
    +
    +
    +  test("write after close should throw IOException") {
    +    val (file, bow) = createWriter()
    +    bow.write("test")
    +    bow.write("test1")
    +    assert (file.exists() && file.isFile)
    +
    +    bow.commitAndClose()
    +
    +    intercept[IOException] {
    +      bow.write("test2")
    +    }
    +
    +    file.delete()
    +  }
    +
    +  test("write after revert should throw IOException") {
    +    val (file, bow) = createWriter()
    +    bow.write("test")
    +    bow.write("test1")
    +    assert (file.exists() && file.isFile)
    +
    +    bow.revertPartialWritesAndClose()
    +
    +    intercept[IOException] {
    +      bow.write("test2")
    +    }
    +
    +    file.delete()
    +  }
    +
    +  test("create even if directory does not exist") {
    +    val dir = File.createTempFile("temp_", "dir")
    +    dir.delete()
    +
    +    val file = new File(dir, "temp.file")
    +    file.deleteOnExit()
    +
    +    val bow = new DiskBlockObjectWriter(BlockId("test_1"), file, new JavaSerializer(conf),
    +      BUFFER_SIZE, (out: OutputStream) => out, true)
    +
    +    bow.write("test")
    +    assert (file.exists() && file.isFile)
    +    bow.commitAndClose()
    +    Utils.deleteRecursively(dir)
    +  }
    +
    +  test("revert of new file should delete it") {
    +    val (file, bow) = createWriter()
    +    bow.write("test")
    +    bow.write("test1")
    +    assert (file.exists() && file.isFile)
    +
    +    bow.revertPartialWritesAndClose()
    +    assert (! file.exists())
    +    // file.delete()
    +  }
    +
    +  test("revert of existing file should revert it to previous state") {
    +    val (file, bow1) = createWriter()
    +
    +    bow1.write("test")
    +    bow1.write("test1")
    +    assert (file.exists() && file.isFile)
    +
    +    bow1.commitAndClose()
    +    val length = file.length()
    +
    +    // reopen same file.
    +    val bow2 = createWriter(file)._2
    +
    +    bow2.write("test3")
    +    bow2.write("test4")
    +
    +    assert (file.exists() && file.isFile)
    +
    +    bow2.revertPartialWritesAndClose()
    +    assert (file.exists())
    +    assert (length == file.length())
    +    file.delete()
    +  }
    +
    +  test("revert of writer after close should delete if it did not exist earlier") {
    +    val (file, bow) = createWriter(tempFile())
    +
    +    bow.write("test")
    +    bow.write("test1")
    +    assert (file.exists() && file.isFile)
    +
    +    bow.commitAndClose()
    +    val length = file.length()
    +
    +    assert (file.exists() && file.isFile)
    +    assert (length > 0)
    +
    +    // Now revert the file, after it has been closed : should delete the file
    +    // since it did not exist earlier.
    +    bow.revertPartialWritesAndClose()
    +    assert (! file.exists())
    +    file.delete()
    +  }
    +
    +  test("revert of writer after close should revert it to previous state") {
    +    val (file, bow1) = createWriter()
    +
    +    bow1.write("test")
    +    bow1.write("test1")
    +    assert (file.exists() && file.isFile)
    +
    +    bow1.commitAndClose()
    +    val length = file.length()
    +
    +    // reopen same file.
    +    val bow2 = createWriter(file)._2
    +
    +    bow2.write("test3")
    +    bow2.write("test4")
    +
    +    bow2.commitAndClose()
    +
    +    assert (file.exists() && file.isFile)
    +    assert (file.length() > length)
    +
    +    // Now revert it : should get reverted back to previous state - after bow1
    +    bow2.revertPartialWritesAndClose()
    +    assert (file.exists())
    +    assert (length == file.length())
    +    file.delete()
    +  }
    +
    +  // val confCopy = conf.clone
    +  // // Ensure we always write data after object ser
    +  // confCopy.set("spark.serializer.objectStreamReset", "1")
    --- End diff --
    
    This is already removed - waiting for tests to pass locally :-)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1609#discussion_r15447541
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
    @@ -353,26 +368,53 @@ class ExternalAppendOnlyMap[K, V, C](
        */
       private class DiskMapIterator(file: File, blockId: BlockId, batchSizes: ArrayBuffer[Long])
         extends Iterator[(K, C)] {
    -    private val fileStream = new FileInputStream(file)
    -    private val bufferedStream = new BufferedInputStream(fileStream, fileBufferSize)
    +
    +    assert (! batchSizes.isEmpty)
    +    assert (! batchSizes.exists(_ <= 0))
    --- End diff --
    
    Same as above.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on the pull request:

    https://github.com/apache/spark/pull/1609#issuecomment-50363390
  
    @mridulm Thanks for the clarifications! Those make sense and are some tricky edge cases. I will begin reviewing the code as soon as possible.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1609#discussion_r15553299
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
    @@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter(
       private var fos: FileOutputStream = null
       private var ts: TimeTrackingOutputStream = null
       private var objOut: SerializationStream = null
    +
    +  // Did we create this file or was it already present : used in revert to decide
    +  // if we should delete this file or not. Also used to detect if file was deleted
    +  // between creation of BOW and its actual init
    +  private val initiallyExists = file.exists() && file.isFile
       private val initialPosition = file.length()
       private var lastValidPosition = initialPosition
    +
       private var initialized = false
    +  // closed explicitly ?
    +  private var closed = false
    +  // Attempt to cleanly close ? (could also be closed via revert)
    +  // Note, a cleanly closed file could be subsequently reverted
    +  private var cleanCloseAttempted = false
    +  // Was the file actually opened atleast once.
    +  // Note: initialized/streams change state with close/revert.
    +  private var wasOpenedOnce = false
       private var _timeWriting = 0L
     
    -  override def open(): BlockObjectWriter = {
    -    fos = new FileOutputStream(file, true)
    -    ts = new TimeTrackingOutputStream(fos)
    -    channel = fos.getChannel()
    +  // Due to some directory creation race issues in spark, it has been observed that
    +  // sometimes file creation happens 'before' the actual directory has been created
    +  // So we attempt to retry atleast once with a mkdirs in case directory was missing.
    --- End diff --
    
    BTW we could also add a lock around what creates the directories.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1609#issuecomment-50307411
  
    QA results for PR 1609:<br>- This patch FAILED unit tests.<br>- This patch merges cleanly<br>- This patch adds no public classes<br><br>For more information see test ouptut:<br>https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17277/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1609#issuecomment-50600899
  
    QA tests have started for PR 1609. This patch merges cleanly. <br>View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17443/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1609#discussion_r15553214
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala ---
    @@ -96,9 +98,9 @@ class HashShuffleWriter[K, V](
         var totalBytes = 0L
         var totalTime = 0L
         val compressedSizes = shuffle.writers.map { writer: BlockObjectWriter =>
    -      writer.commit()
    -      writer.close()
    +      writer.commitAndClose()
           val size = writer.fileSegment().length
    +      assert (size >= 0)
    --- End diff --
    
    Nit: no space after assert (it's not a keyword)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the pull request:

    https://github.com/apache/spark/pull/1609#issuecomment-50455483
  
    All pending fixes work be done.
    I dont think there are any pieces missing in the merge from internal branch to master.
    Open for review, thanks !


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1609#discussion_r15541003
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
    @@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter(
       private var fos: FileOutputStream = null
       private var ts: TimeTrackingOutputStream = null
       private var objOut: SerializationStream = null
    +
    +  // Did we create this file or was it already present : used in revert to decide
    +  // if we should delete this file or not. Also used to detect if file was deleted
    +  // between creation of BOW and its actual init
    +  private val initiallyExists = file.exists() && file.isFile
       private val initialPosition = file.length()
       private var lastValidPosition = initialPosition
    +
       private var initialized = false
    +  // closed explicitly ?
    +  private var closed = false
    +  // Attempt to cleanly close ? (could also be closed via revert)
    +  // Note, a cleanly closed file could be subsequently reverted
    +  private var cleanCloseAttempted = false
    +  // Was the file actually opened atleast once.
    +  // Note: initialized/streams change state with close/revert.
    +  private var wasOpenedOnce = false
       private var _timeWriting = 0L
     
    -  override def open(): BlockObjectWriter = {
    -    fos = new FileOutputStream(file, true)
    -    ts = new TimeTrackingOutputStream(fos)
    -    channel = fos.getChannel()
    +  // Due to some directory creation race issues in spark, it has been observed that
    +  // sometimes file creation happens 'before' the actual directory has been created
    +  // So we attempt to retry atleast once with a mkdirs in case directory was missing.
    +  private def init() {
    +    init(canRetry = true)
    +  }
    +
    +  private def init(canRetry: Boolean) {
    +
    +    if (closed) throw new IOException("Already closed")
    +
    +    assert (! initialized)
    +    assert (! wasOpenedOnce)
    +    var exists = false
    +    try {
    +      exists = file.exists()
    +      if (! exists && initiallyExists && 0 != initialPosition && ! Utils.inShutdown) {
    +        // Was deleted by cleanup thread ?
    +        throw new IOException("file " + file + " cleaned up ? exists = " + exists +
    +          ", initiallyExists = " + initiallyExists + ", initialPosition = " + initialPosition)
    +      }
    +      fos = new FileOutputStream(file, true)
    +    } catch {
    +      case fEx: FileNotFoundException =>
    +        // There seems to be some race in directory creation.
    +        // Attempts to fix it dont seem to have worked : working around the problem for now.
    +        logDebug("Unable to open " + file + ", canRetry = " + canRetry + ", exists = " + exists +
    +          ", initialPosition = " + initialPosition + ", in shutdown = " + Utils.inShutdown(), fEx)
    +        if (canRetry && ! Utils.inShutdown()) {
    +          // try creating the parent directory if that is the issue.
    +          // Since there can be race with others, dont bother checking for
    +          // success/failure - the call to init() will resolve if fos can be created.
    +          file.getParentFile.mkdirs()
    +          // Note, if directory did not exist, then file does not either - and so
    +          // initialPosition would be zero in either case.
    +          init(canRetry = false)
    +          return
    +        } else throw fEx
    +    }
    +
    +    try {
    +      // This is to workaround case where creation of object and actual init
    +      // (which can happen much later) happens after a delay and the cleanup thread
    +      // cleaned up the file.
    +      channel = fos.getChannel
    +      val fosPos = channel.position()
    +      if (initialPosition != fosPos) {
    +        throw new IOException("file cleaned up ? " + file.exists() + 
    +          ", initialpos = " + initialPosition +
    +          "current len = " + fosPos + ", in shutdown ? " + Utils.inShutdown)
    +      }
    +
    +      ts = new TimeTrackingOutputStream(fos)
    +      val bos = new BufferedOutputStream(ts, bufferSize)
    +      bs = compressStream(bos)
    +      objOut = serializer.newInstance().serializeStream(bs)
    +      initialized = true
    +      wasOpenedOnce = true;
    +    } finally {
    +      if (! initialized) {
    +        // failed, cleanup state.
    +        val tfos = fos
    +        updateCloseState()
    +        tfos.close()
    +      }
    +    }
    +  }
    +
    +  private def open(): BlockObjectWriter = {
    +    init()
         lastValidPosition = initialPosition
    -    bs = compressStream(new BufferedOutputStream(ts, bufferSize))
    -    objOut = serializer.newInstance().serializeStream(bs)
    -    initialized = true
         this
       }
     
    -  override def close() {
    -    if (initialized) {
    -      if (syncWrites) {
    -        // Force outstanding writes to disk and track how long it takes
    -        objOut.flush()
    +  private def updateCloseState() {
    +
    +    if (ts ne null) _timeWriting += ts.timeWriting
    +
    +    bs = null
    +    channel = null
    +    fos = null
    +    ts = null
    +    objOut = null
    +    initialized = false
    +  }
    +
    +  private def flushAll() {
    +    if (closed) throw new IOException("Already closed")
    +
    +    // NOTE: Because Kryo doesn't flush the underlying stream we explicitly flush both the
    +    //       serializer stream and the lower level stream.
    +    if (objOut ne null) {
    +      objOut.flush()
    +      bs.flush()
    +    }
    +  }
    +
    +  private def closeAll(needFlush: Boolean, needRevert: Boolean) {
    +
    +    if (null != objOut) {
    +      val truncatePos = if (needRevert) initialPosition else -1L
    +      assert (! this.closed)
    +
    +      // In case syncWrites is true or we need to truncate
    +      var cleanlyClosed = false
    +      try {
    +        // Flushing if we need to truncate also. Currently, we reopen to truncate
    +        // so this is not strictly required (since close could write further to streams).
    +        // Keeping it around in case that gets relaxed.
    +        if (needFlush || needRevert) flushAll()
    +
             val start = System.nanoTime()
    -        fos.getFD.sync()
    +        try {
    +          if (syncWrites) {
    +            // Force outstanding writes to disk and track how long it takes
    +            fos.getFD.sync()
    +          }
    +        } catch {
    +          case sfe: SyncFailedException => // ignore
    +        }
    +        // must cause cascading close. Note, repeated close on closed streams should not cause
    +        // issues : except some libraries do not honour it - hence not explicitly closing bs/fos
    +        objOut.close()
    +        // bs.close()
    +        // fos.close()
             _timeWriting += System.nanoTime() - start
    -      }
    -      objOut.close()
     
    -      _timeWriting += ts.timeWriting
    +        // fos MUST have been closed.
    +        assert((channel eq null) || !channel.isOpen)
    +        cleanlyClosed = true
    +
    +      } finally {
    --- End diff --
    
    Do revert handling in finally so that irrespective of how flush/close went through, the file is returned in a sane state if we are reverting.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1609#discussion_r15542308
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
    @@ -418,7 +459,25 @@ class ExternalAppendOnlyMap[K, V, C](
     
         // TODO: Ensure this gets called even if the iterator isn't drained.
         private def cleanup() {
    -      deserializeStream.close()
    +      batchIndex = batchOffsets.length
    +      val dstrm = deserializeStream
    +      val fstrm = fileStream
    +      deserializeStream = null
    +      fileStream = null
    +
    +      if (dstrm ne null) {
    +        try {
    +          dstrm.close()
    +        } catch {
    +          case ioEx: IOException => {
    +            // best case attempt - atleast free the handles
    +            if (fstrm ne null) {
    +              try { fstrm.close() } catch {case ioEx: IOException => }
    +            }
    +            throw ioEx
    +          }
    +        }
    +      }
    --- End diff --
    
    This is just more defensive cleanup compared to earlier and setting batchIndex to unusable value.
    To ensure that any close related exceptions do not result in unclosed files (which impacts ulimit of spark process)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1609#issuecomment-50291239
  
    QA results for PR 1609:<br>- This patch FAILED unit tests.<br>- This patch merges cleanly<br>- This patch adds no public classes<br><br>For more information see test ouptut:<br>https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17256/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1609#discussion_r15540782
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala ---
    @@ -71,7 +72,8 @@ class HashShuffleWriter[K, V](
             try {
               return Some(commitWritesAndBuildStatus())
             } catch {
    -          case e: Exception =>
    +          case e: Throwable =>
    +            success = false // for finally block
                 revertWrites()
    --- End diff --
    
    If success != false, then release writers will attempt to register.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/1609#issuecomment-50929329
  
    So I looked through this and I also think it would be good to split it into smaller patches for 1.1. As far as I can see there are several orthogonal improvements here:
    - Shuffle file consolidation fixes that Aaron copied in https://github.com/apache/spark/pull/1678
    - ExternalAppendOnlyMap fixes to deal with writes past end of stream; we also need these in ExternalSorter
    - Fixes to directory creation in DiskBlockManager (I'm still not sure when this would be a problem actually if all accesses to these directories are through getFile; needs some investigation)
    - Fixes to isSymlink (though as is this seems like it would only compile on Java 7)
    - Improvements to the API of DiskBlockObjectWriter
    
    Of these, the first two are most critical. So I'd like to get those into 1.1, and then we can do API refactoring and the other fixes on the master branch. For the directory creation fix I'd still like to understand when that can be a problem (I'm probably just missing something), but it's also one we can add in 1.1 during the QA window.
    
    I'm going to update the JIRA to create sub-tasks for these things so we can track where each one is fixed. Thanks again for putting this together Mridul, this is very helpful.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1609#issuecomment-50267547
  
    QA results for PR 1609:<br>- This patch FAILED unit tests.<br>- This patch merges cleanly<br>- This patch adds no public classes<br><br>For more information see test ouptut:<br>https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17242/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/1609#issuecomment-50804445
  
    So FYI I'm going to make a more detailed pass through this soon to see if we can get all of it into 1.1. It would be nice to get all these changes in so we can QA them along with the other QA we do for 1.1, but if that doesn't work out, we can split some of them into smaller patches as Aaron did.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1609#discussion_r15447397
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
    @@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter(
       private var fos: FileOutputStream = null
       private var ts: TimeTrackingOutputStream = null
       private var objOut: SerializationStream = null
    +
    +  // Did we create this file or was it already present : used in revert to decide
    +  // if we should delete this file or not. Also used to detect if file was deleted
    +  // between creation of BOW and its actual init
    +  private val initiallyExists = file.exists() && file.isFile
       private val initialPosition = file.length()
       private var lastValidPosition = initialPosition
    +
       private var initialized = false
    +  // closed explicitly ?
    +  private var closed = false
    +  // Attempt to cleanly close ? (could also be closed via revert)
    +  // Note, a cleanly closed file could be subsequently reverted
    +  private var cleanCloseAttempted = false
    +  // Was the file actually opened atleast once.
    +  // Note: initialized/streams change state with close/revert.
    +  private var wasOpenedOnce = false
       private var _timeWriting = 0L
     
    -  override def open(): BlockObjectWriter = {
    -    fos = new FileOutputStream(file, true)
    -    ts = new TimeTrackingOutputStream(fos)
    -    channel = fos.getChannel()
    +  // Due to some directory creation race issues in spark, it has been observed that
    +  // sometimes file creation happens 'before' the actual directory has been created
    +  // So we attempt to retry atleast once with a mkdirs in case directory was missing.
    +  private def init() {
    +    init(canRetry = true)
    +  }
    +
    +  private def init(canRetry: Boolean) {
    +
    +    if (closed) throw new IOException("Already closed")
    +
    +    assert (! initialized)
    +    assert (! wasOpenedOnce)
    +    var exists = false
    +    try {
    +      exists = file.exists()
    +      if (! exists && initiallyExists && 0 != initialPosition && ! Utils.inShutdown) {
    +        // Was deleted by cleanup thread ?
    +        throw new IOException("file " + file + " cleaned up ? exists = " + exists +
    +          ", initiallyExists = " + initiallyExists + ", initialPosition = " + initialPosition)
    +      }
    +      fos = new FileOutputStream(file, true)
    +    } catch {
    +      case fEx: FileNotFoundException =>
    +        // There seems to be some race in directory creation.
    +        // Attempts to fix it dont seem to have worked : working around the problem for now.
    +        logDebug("Unable to open " + file + ", canRetry = " + canRetry + ", exists = " + exists +
    +          ", initialPosition = " + initialPosition + ", in shutdown = " + Utils.inShutdown(), fEx)
    +        if (canRetry && ! Utils.inShutdown()) {
    +          // try creating the parent directory if that is the issue.
    +          // Since there can be race with others, dont bother checking for
    +          // success/failure - the call to init() will resolve if fos can be created.
    +          file.getParentFile.mkdirs()
    +          // Note, if directory did not exist, then file does not either - and so
    +          // initialPosition would be zero in either case.
    +          init(canRetry = false)
    +          return
    +        } else throw fEx
    +    }
    +
    +    try {
    +      // This is to workaround case where creation of object and actual init
    +      // (which can happen much later) happens after a delay and the cleanup thread
    +      // cleaned up the file.
    +      channel = fos.getChannel
    +      val fosPos = channel.position()
    +      if (initialPosition != fosPos) {
    +        throw new IOException("file cleaned up ? " + file.exists() + 
    +          ", initialpos = " + initialPosition +
    +          "current len = " + fosPos + ", in shutdown ? " + Utils.inShutdown)
    +      }
    +
    +      ts = new TimeTrackingOutputStream(fos)
    +      val bos = new BufferedOutputStream(ts, bufferSize)
    +      bs = compressStream(bos)
    +      objOut = serializer.newInstance().serializeStream(bs)
    +      initialized = true
    +      wasOpenedOnce = true;
    +    } finally {
    +      if (! initialized) {
    +        // failed, cleanup state.
    +        val tfos = fos
    +        updateCloseState()
    +        tfos.close()
    +      }
    +    }
    +  }
    +
    +  private def open(): BlockObjectWriter = {
    +    init()
         lastValidPosition = initialPosition
    -    bs = compressStream(new BufferedOutputStream(ts, bufferSize))
    -    objOut = serializer.newInstance().serializeStream(bs)
    -    initialized = true
         this
       }
     
    -  override def close() {
    -    if (initialized) {
    -      if (syncWrites) {
    -        // Force outstanding writes to disk and track how long it takes
    -        objOut.flush()
    +  private def updateCloseState() {
    +
    +    if (ts ne null) _timeWriting += ts.timeWriting
    +
    +    bs = null
    +    channel = null
    +    fos = null
    +    ts = null
    +    objOut = null
    +    initialized = false
    +  }
    +
    +  private def flushAll() {
    +    if (closed) throw new IOException("Already closed")
    +
    +    // NOTE: Because Kryo doesn't flush the underlying stream we explicitly flush both the
    +    //       serializer stream and the lower level stream.
    +    if (objOut ne null) {
    +      objOut.flush()
    +      bs.flush()
    +    }
    +  }
    +
    +  private def closeAll(needFlush: Boolean, needRevert: Boolean) {
    +
    +    if (null != objOut) {
    +      val truncatePos = if (needRevert) initialPosition else -1L
    +      assert (! this.closed)
    --- End diff --
    
    Same as above.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1609#issuecomment-50307247
  
    QA tests have started for PR 1609. This patch merges cleanly. <br>View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17277/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1609#discussion_r15541065
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
    @@ -188,6 +425,39 @@ private[spark] class DiskBlockObjectWriter(
     
       // Only valid if called after commit()
       override def bytesWritten: Long = {
    -    lastValidPosition - initialPosition
    +    val retval = lastValidPosition - initialPosition
    +
    +    assert(retval >= 0 || Utils.inShutdown(),
    +      "exists = " + file.exists() + ", bytesWritten = " + retval +
    +      ", lastValidPosition = " + lastValidPosition + ", initialPosition = " + initialPosition +
    +      ", in shutdown = " + Utils.inShutdown())
    +
    +    // TODO: Comment this out when we are done validating : can be expensive due to file.length()
    +    assert (file.length() >= lastValidPosition || Utils.inShutdown(),
    +      "exists = " + file.exists() + ", file len = " + file.length() +
    +          ", bytesWritten = " + retval + ", lastValidPosition = " + lastValidPosition +
    +          ", initialPosition = " + initialPosition + ", in shutdown = " + Utils.inShutdown())
    +
    +    if (retval >= 0) retval else 0
       }
     }
    +
    +object DiskBlockObjectWriter{
    +
    +  // Unfortunately, cant do it atomically ...
    +  private def truncateIfExists(file: File, truncatePos: Long) {
    +    var fos: FileOutputStream = null
    +    try {
    +      // There is no way to do this atomically iirc.
    +      if (file.exists() && file.length() != truncatePos) {
    +        fos = new FileOutputStream(file, true)
    +        fos.getChannel.truncate(truncatePos)
    +      }
    +    } finally {
    +      if (fos ne null) {
    +        fos.close()
    +      }
    +    }
    +  }
    +}
    --- End diff --
    
    Hopefully, rest of the code changes in this class is documented (a bit too heavily probably ? lighten it before final commit ?) 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1609#discussion_r15565486
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
    @@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter(
       private var fos: FileOutputStream = null
       private var ts: TimeTrackingOutputStream = null
       private var objOut: SerializationStream = null
    +
    +  // Did we create this file or was it already present : used in revert to decide
    +  // if we should delete this file or not. Also used to detect if file was deleted
    +  // between creation of BOW and its actual init
    +  private val initiallyExists = file.exists() && file.isFile
       private val initialPosition = file.length()
       private var lastValidPosition = initialPosition
    +
       private var initialized = false
    +  // closed explicitly ?
    +  private var closed = false
    +  // Attempt to cleanly close ? (could also be closed via revert)
    +  // Note, a cleanly closed file could be subsequently reverted
    +  private var cleanCloseAttempted = false
    +  // Was the file actually opened atleast once.
    +  // Note: initialized/streams change state with close/revert.
    +  private var wasOpenedOnce = false
       private var _timeWriting = 0L
     
    -  override def open(): BlockObjectWriter = {
    -    fos = new FileOutputStream(file, true)
    -    ts = new TimeTrackingOutputStream(fos)
    -    channel = fos.getChannel()
    +  // Due to some directory creation race issues in spark, it has been observed that
    +  // sometimes file creation happens 'before' the actual directory has been created
    +  // So we attempt to retry atleast once with a mkdirs in case directory was missing.
    +  private def init() {
    +    init(canRetry = true)
    +  }
    +
    +  private def init(canRetry: Boolean) {
    +
    +    if (closed) throw new IOException("Already closed")
    +
    +    assert (! initialized)
    +    assert (! wasOpenedOnce)
    +    var exists = false
    +    try {
    +      exists = file.exists()
    +      if (! exists && initiallyExists && 0 != initialPosition && ! Utils.inShutdown) {
    +        // Was deleted by cleanup thread ?
    +        throw new IOException("file " + file + " cleaned up ? exists = " + exists +
    +          ", initiallyExists = " + initiallyExists + ", initialPosition = " + initialPosition)
    +      }
    +      fos = new FileOutputStream(file, true)
    +    } catch {
    +      case fEx: FileNotFoundException =>
    +        // There seems to be some race in directory creation.
    +        // Attempts to fix it dont seem to have worked : working around the problem for now.
    +        logDebug("Unable to open " + file + ", canRetry = " + canRetry + ", exists = " + exists +
    +          ", initialPosition = " + initialPosition + ", in shutdown = " + Utils.inShutdown(), fEx)
    +        if (canRetry && ! Utils.inShutdown()) {
    +          // try creating the parent directory if that is the issue.
    +          // Since there can be race with others, dont bother checking for
    +          // success/failure - the call to init() will resolve if fos can be created.
    +          file.getParentFile.mkdirs()
    +          // Note, if directory did not exist, then file does not either - and so
    +          // initialPosition would be zero in either case.
    +          init(canRetry = false)
    +          return
    +        } else throw fEx
    +    }
    +
    +    try {
    +      // This is to workaround case where creation of object and actual init
    +      // (which can happen much later) happens after a delay and the cleanup thread
    +      // cleaned up the file.
    +      channel = fos.getChannel
    +      val fosPos = channel.position()
    +      if (initialPosition != fosPos) {
    +        throw new IOException("file cleaned up ? " + file.exists() + 
    +          ", initialpos = " + initialPosition +
    +          "current len = " + fosPos + ", in shutdown ? " + Utils.inShutdown)
    +      }
    +
    +      ts = new TimeTrackingOutputStream(fos)
    +      val bos = new BufferedOutputStream(ts, bufferSize)
    +      bs = compressStream(bos)
    +      objOut = serializer.newInstance().serializeStream(bs)
    +      initialized = true
    +      wasOpenedOnce = true;
    +    } finally {
    +      if (! initialized) {
    +        // failed, cleanup state.
    +        val tfos = fos
    +        updateCloseState()
    +        tfos.close()
    +      }
    +    }
    +  }
    +
    +  private def open(): BlockObjectWriter = {
    +    init()
         lastValidPosition = initialPosition
    -    bs = compressStream(new BufferedOutputStream(ts, bufferSize))
    -    objOut = serializer.newInstance().serializeStream(bs)
    -    initialized = true
         this
       }
     
    -  override def close() {
    -    if (initialized) {
    -      if (syncWrites) {
    -        // Force outstanding writes to disk and track how long it takes
    -        objOut.flush()
    +  private def updateCloseState() {
    +
    +    if (ts ne null) _timeWriting += ts.timeWriting
    --- End diff --
    
    The only reason to use eq/ne was to avoid the call to equals/canEquals/etc.
    Since spark ships with assertions turned on, this ends up being expensive since some of the codepaths are very frequent : and all changes in the 2G fix branch uses eq/ne.
    Not sure how relevant it is to this patch though ... I can revert to ==/!= if it is a problem !


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1609#discussion_r15599910
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
    @@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter(
       private var fos: FileOutputStream = null
       private var ts: TimeTrackingOutputStream = null
       private var objOut: SerializationStream = null
    +
    +  // Did we create this file or was it already present : used in revert to decide
    +  // if we should delete this file or not. Also used to detect if file was deleted
    +  // between creation of BOW and its actual init
    +  private val initiallyExists = file.exists() && file.isFile
       private val initialPosition = file.length()
       private var lastValidPosition = initialPosition
    +
       private var initialized = false
    +  // closed explicitly ?
    +  private var closed = false
    +  // Attempt to cleanly close ? (could also be closed via revert)
    +  // Note, a cleanly closed file could be subsequently reverted
    +  private var cleanCloseAttempted = false
    +  // Was the file actually opened atleast once.
    +  // Note: initialized/streams change state with close/revert.
    +  private var wasOpenedOnce = false
       private var _timeWriting = 0L
     
    -  override def open(): BlockObjectWriter = {
    -    fos = new FileOutputStream(file, true)
    -    ts = new TimeTrackingOutputStream(fos)
    -    channel = fos.getChannel()
    +  // Due to some directory creation race issues in spark, it has been observed that
    +  // sometimes file creation happens 'before' the actual directory has been created
    +  // So we attempt to retry atleast once with a mkdirs in case directory was missing.
    --- End diff --
    
    Alright, as long as you can confirm that it still happens without this. I just don't see how it could happen from the other code in there. I guess we can try disabling it after this fix goes in and see if things break.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1609#issuecomment-50278406
  
    QA results for PR 1609:<br>- This patch FAILED unit tests.<br>- This patch merges cleanly<br>- This patch adds no public classes<br><br>For more information see test ouptut:<br>https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17244/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the pull request:

    https://github.com/apache/spark/pull/1609#issuecomment-50307155
  
    Jenkins, test this please


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on the pull request:

    https://github.com/apache/spark/pull/1609#issuecomment-50305416
  
    @mridulm Thanks for submitting this! I would like to dig a little deeper into understanding the specific issues you found, in order to understand the solutions you have provided (since the specific solutions seem interleaved with a lot of new asserts and code paths). 
    
    You mention that there was an issue if shuffle writes co-occur with shuffle fetches, which is true, but should not typically occur due to the barrier before the reduce stage of a shuffle. In what situations does this happen (outside of failure conditions)?
    
    Did you observe a prior pattern of close/revert/close on the same block writer?
    
    How did task failures induce inconsistent state on the map side? Was it due to the same close/revert/close pattern?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1609#discussion_r15447430
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
    @@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter(
       private var fos: FileOutputStream = null
       private var ts: TimeTrackingOutputStream = null
       private var objOut: SerializationStream = null
    +
    +  // Did we create this file or was it already present : used in revert to decide
    +  // if we should delete this file or not. Also used to detect if file was deleted
    +  // between creation of BOW and its actual init
    +  private val initiallyExists = file.exists() && file.isFile
       private val initialPosition = file.length()
       private var lastValidPosition = initialPosition
    +
       private var initialized = false
    +  // closed explicitly ?
    +  private var closed = false
    +  // Attempt to cleanly close ? (could also be closed via revert)
    +  // Note, a cleanly closed file could be subsequently reverted
    +  private var cleanCloseAttempted = false
    +  // Was the file actually opened atleast once.
    +  // Note: initialized/streams change state with close/revert.
    +  private var wasOpenedOnce = false
       private var _timeWriting = 0L
     
    -  override def open(): BlockObjectWriter = {
    -    fos = new FileOutputStream(file, true)
    -    ts = new TimeTrackingOutputStream(fos)
    -    channel = fos.getChannel()
    +  // Due to some directory creation race issues in spark, it has been observed that
    +  // sometimes file creation happens 'before' the actual directory has been created
    +  // So we attempt to retry atleast once with a mkdirs in case directory was missing.
    +  private def init() {
    +    init(canRetry = true)
    +  }
    +
    +  private def init(canRetry: Boolean) {
    +
    +    if (closed) throw new IOException("Already closed")
    +
    +    assert (! initialized)
    +    assert (! wasOpenedOnce)
    +    var exists = false
    +    try {
    +      exists = file.exists()
    +      if (! exists && initiallyExists && 0 != initialPosition && ! Utils.inShutdown) {
    +        // Was deleted by cleanup thread ?
    +        throw new IOException("file " + file + " cleaned up ? exists = " + exists +
    +          ", initiallyExists = " + initiallyExists + ", initialPosition = " + initialPosition)
    +      }
    +      fos = new FileOutputStream(file, true)
    +    } catch {
    +      case fEx: FileNotFoundException =>
    +        // There seems to be some race in directory creation.
    +        // Attempts to fix it dont seem to have worked : working around the problem for now.
    +        logDebug("Unable to open " + file + ", canRetry = " + canRetry + ", exists = " + exists +
    +          ", initialPosition = " + initialPosition + ", in shutdown = " + Utils.inShutdown(), fEx)
    +        if (canRetry && ! Utils.inShutdown()) {
    +          // try creating the parent directory if that is the issue.
    +          // Since there can be race with others, dont bother checking for
    +          // success/failure - the call to init() will resolve if fos can be created.
    +          file.getParentFile.mkdirs()
    +          // Note, if directory did not exist, then file does not either - and so
    +          // initialPosition would be zero in either case.
    +          init(canRetry = false)
    +          return
    +        } else throw fEx
    +    }
    +
    +    try {
    +      // This is to workaround case where creation of object and actual init
    +      // (which can happen much later) happens after a delay and the cleanup thread
    +      // cleaned up the file.
    +      channel = fos.getChannel
    +      val fosPos = channel.position()
    +      if (initialPosition != fosPos) {
    +        throw new IOException("file cleaned up ? " + file.exists() + 
    +          ", initialpos = " + initialPosition +
    +          "current len = " + fosPos + ", in shutdown ? " + Utils.inShutdown)
    +      }
    +
    +      ts = new TimeTrackingOutputStream(fos)
    +      val bos = new BufferedOutputStream(ts, bufferSize)
    +      bs = compressStream(bos)
    +      objOut = serializer.newInstance().serializeStream(bs)
    +      initialized = true
    +      wasOpenedOnce = true;
    +    } finally {
    +      if (! initialized) {
    +        // failed, cleanup state.
    +        val tfos = fos
    +        updateCloseState()
    +        tfos.close()
    +      }
    +    }
    +  }
    +
    +  private def open(): BlockObjectWriter = {
    +    init()
         lastValidPosition = initialPosition
    -    bs = compressStream(new BufferedOutputStream(ts, bufferSize))
    -    objOut = serializer.newInstance().serializeStream(bs)
    -    initialized = true
         this
       }
     
    -  override def close() {
    -    if (initialized) {
    -      if (syncWrites) {
    -        // Force outstanding writes to disk and track how long it takes
    -        objOut.flush()
    +  private def updateCloseState() {
    +
    +    if (ts ne null) _timeWriting += ts.timeWriting
    +
    +    bs = null
    +    channel = null
    +    fos = null
    +    ts = null
    +    objOut = null
    +    initialized = false
    +  }
    +
    +  private def flushAll() {
    +    if (closed) throw new IOException("Already closed")
    +
    +    // NOTE: Because Kryo doesn't flush the underlying stream we explicitly flush both the
    +    //       serializer stream and the lower level stream.
    +    if (objOut ne null) {
    +      objOut.flush()
    +      bs.flush()
    +    }
    +  }
    +
    +  private def closeAll(needFlush: Boolean, needRevert: Boolean) {
    +
    +    if (null != objOut) {
    +      val truncatePos = if (needRevert) initialPosition else -1L
    +      assert (! this.closed)
    +
    +      // In case syncWrites is true or we need to truncate
    +      var cleanlyClosed = false
    +      try {
    +        // Flushing if we need to truncate also. Currently, we reopen to truncate
    +        // so this is not strictly required (since close could write further to streams).
    +        // Keeping it around in case that gets relaxed.
    +        if (needFlush || needRevert) flushAll()
    +
             val start = System.nanoTime()
    -        fos.getFD.sync()
    +        try {
    +          if (syncWrites) {
    +            // Force outstanding writes to disk and track how long it takes
    +            fos.getFD.sync()
    +          }
    +        } catch {
    +          case sfe: SyncFailedException => // ignore
    +        }
    +        // must cause cascading close. Note, repeated close on closed streams should not cause
    +        // issues : except some libraries do not honour it - hence not explicitly closing bs/fos
    +        objOut.close()
    +        // bs.close()
    +        // fos.close()
             _timeWriting += System.nanoTime() - start
    -      }
    -      objOut.close()
     
    -      _timeWriting += ts.timeWriting
    +        // fos MUST have been closed.
    +        assert((channel eq null) || !channel.isOpen)
    +        cleanlyClosed = true
    +
    +      } finally {
    +
    +        this.closed = true
    +        if (! cleanlyClosed) {
    --- End diff --
    
    Same as above.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1609#discussion_r15447384
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
    @@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter(
       private var fos: FileOutputStream = null
       private var ts: TimeTrackingOutputStream = null
       private var objOut: SerializationStream = null
    +
    +  // Did we create this file or was it already present : used in revert to decide
    +  // if we should delete this file or not. Also used to detect if file was deleted
    +  // between creation of BOW and its actual init
    +  private val initiallyExists = file.exists() && file.isFile
       private val initialPosition = file.length()
       private var lastValidPosition = initialPosition
    +
       private var initialized = false
    +  // closed explicitly ?
    +  private var closed = false
    +  // Attempt to cleanly close ? (could also be closed via revert)
    +  // Note, a cleanly closed file could be subsequently reverted
    +  private var cleanCloseAttempted = false
    +  // Was the file actually opened atleast once.
    +  // Note: initialized/streams change state with close/revert.
    +  private var wasOpenedOnce = false
       private var _timeWriting = 0L
     
    -  override def open(): BlockObjectWriter = {
    -    fos = new FileOutputStream(file, true)
    -    ts = new TimeTrackingOutputStream(fos)
    -    channel = fos.getChannel()
    +  // Due to some directory creation race issues in spark, it has been observed that
    +  // sometimes file creation happens 'before' the actual directory has been created
    +  // So we attempt to retry atleast once with a mkdirs in case directory was missing.
    +  private def init() {
    +    init(canRetry = true)
    +  }
    +
    +  private def init(canRetry: Boolean) {
    +
    +    if (closed) throw new IOException("Already closed")
    +
    +    assert (! initialized)
    +    assert (! wasOpenedOnce)
    --- End diff --
    
    One more spaces


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1609#issuecomment-50274618
  
    QA tests have started for PR 1609. This patch merges cleanly. <br>View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17243/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm closed the pull request at:

    https://github.com/apache/spark/pull/1609


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/1609#issuecomment-54383344
  
    @mridulm Are the issues in this PR taken care of by #1722 and and #1678? Do we still need this PR?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1609#issuecomment-50311354
  
    QA results for PR 1609:<br>- This patch PASSES unit tests.<br>- This patch merges cleanly<br>- This patch adds no public classes<br><br>For more information see test ouptut:<br>https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17279/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the pull request:

    https://github.com/apache/spark/pull/1609#issuecomment-50306648
  
    Accidental close, apologies !


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1609#discussion_r15553165
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
    @@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter(
       private var fos: FileOutputStream = null
       private var ts: TimeTrackingOutputStream = null
       private var objOut: SerializationStream = null
    +
    +  // Did we create this file or was it already present : used in revert to decide
    +  // if we should delete this file or not. Also used to detect if file was deleted
    +  // between creation of BOW and its actual init
    +  private val initiallyExists = file.exists() && file.isFile
       private val initialPosition = file.length()
       private var lastValidPosition = initialPosition
    +
       private var initialized = false
    +  // closed explicitly ?
    +  private var closed = false
    +  // Attempt to cleanly close ? (could also be closed via revert)
    +  // Note, a cleanly closed file could be subsequently reverted
    +  private var cleanCloseAttempted = false
    +  // Was the file actually opened atleast once.
    +  // Note: initialized/streams change state with close/revert.
    +  private var wasOpenedOnce = false
       private var _timeWriting = 0L
     
    -  override def open(): BlockObjectWriter = {
    -    fos = new FileOutputStream(file, true)
    -    ts = new TimeTrackingOutputStream(fos)
    -    channel = fos.getChannel()
    +  // Due to some directory creation race issues in spark, it has been observed that
    +  // sometimes file creation happens 'before' the actual directory has been created
    +  // So we attempt to retry atleast once with a mkdirs in case directory was missing.
    +  private def init() {
    +    init(canRetry = true)
    +  }
    +
    +  private def init(canRetry: Boolean) {
    +
    +    if (closed) throw new IOException("Already closed")
    +
    +    assert (! initialized)
    +    assert (! wasOpenedOnce)
    +    var exists = false
    +    try {
    +      exists = file.exists()
    +      if (! exists && initiallyExists && 0 != initialPosition && ! Utils.inShutdown) {
    +        // Was deleted by cleanup thread ?
    +        throw new IOException("file " + file + " cleaned up ? exists = " + exists +
    +          ", initiallyExists = " + initiallyExists + ", initialPosition = " + initialPosition)
    +      }
    +      fos = new FileOutputStream(file, true)
    +    } catch {
    +      case fEx: FileNotFoundException =>
    +        // There seems to be some race in directory creation.
    +        // Attempts to fix it dont seem to have worked : working around the problem for now.
    +        logDebug("Unable to open " + file + ", canRetry = " + canRetry + ", exists = " + exists +
    +          ", initialPosition = " + initialPosition + ", in shutdown = " + Utils.inShutdown(), fEx)
    +        if (canRetry && ! Utils.inShutdown()) {
    +          // try creating the parent directory if that is the issue.
    +          // Since there can be race with others, dont bother checking for
    +          // success/failure - the call to init() will resolve if fos can be created.
    +          file.getParentFile.mkdirs()
    +          // Note, if directory did not exist, then file does not either - and so
    +          // initialPosition would be zero in either case.
    +          init(canRetry = false)
    +          return
    +        } else throw fEx
    +    }
    +
    +    try {
    +      // This is to workaround case where creation of object and actual init
    +      // (which can happen much later) happens after a delay and the cleanup thread
    +      // cleaned up the file.
    +      channel = fos.getChannel
    +      val fosPos = channel.position()
    +      if (initialPosition != fosPos) {
    +        throw new IOException("file cleaned up ? " + file.exists() + 
    +          ", initialpos = " + initialPosition +
    +          "current len = " + fosPos + ", in shutdown ? " + Utils.inShutdown)
    +      }
    +
    +      ts = new TimeTrackingOutputStream(fos)
    +      val bos = new BufferedOutputStream(ts, bufferSize)
    +      bs = compressStream(bos)
    +      objOut = serializer.newInstance().serializeStream(bs)
    +      initialized = true
    +      wasOpenedOnce = true;
    +    } finally {
    +      if (! initialized) {
    +        // failed, cleanup state.
    +        val tfos = fos
    +        updateCloseState()
    +        tfos.close()
    +      }
    +    }
    +  }
    +
    +  private def open(): BlockObjectWriter = {
    +    init()
         lastValidPosition = initialPosition
    -    bs = compressStream(new BufferedOutputStream(ts, bufferSize))
    -    objOut = serializer.newInstance().serializeStream(bs)
    -    initialized = true
         this
       }
     
    -  override def close() {
    -    if (initialized) {
    -      if (syncWrites) {
    -        // Force outstanding writes to disk and track how long it takes
    -        objOut.flush()
    +  private def updateCloseState() {
    +
    +    if (ts ne null) _timeWriting += ts.timeWriting
    --- End diff --
    
    Just a small note on code style, you can use `!= null` safely. Please do that so that it matches the rest of the codebase.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1609#discussion_r15446921
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
    @@ -176,10 +406,17 @@ private[spark] class DiskBlockObjectWriter(
         if (!initialized) {
           open()
         }
    +    // Not checking if closed on purpose ... introduce it ? No usecase for it right now.
         objOut.writeObject(value)
       }
     
       override def fileSegment(): FileSegment = {
    +    assert (! initialized)
    --- End diff --
    
    Same as above.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1609#issuecomment-50604735
  
    QA results for PR 1609:<br>- This patch PASSES unit tests.<br>- This patch merges cleanly<br>- This patch adds no public classes<br><br>For more information see test ouptut:<br>https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17443/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on the pull request:

    https://github.com/apache/spark/pull/1609#issuecomment-50704781
  
    I created #1678, which only includes the changes directly related to fixing the issues with shuffle file consolidation (essentially forking off a piece of this PR), intended as a simple candidate for review to make the 1.1 release. The smaller PR is not intended as a replacement for this more complete one, however -- it is merely an option to fix some of the more severe bugs in time for the next major release. If we can get this one in for 1.1 instead, then we should.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1609#issuecomment-50290037
  
    QA tests have started for PR 1609. This patch merges cleanly. <br>View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17256/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1609#discussion_r15447449
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
    @@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter(
       private var fos: FileOutputStream = null
       private var ts: TimeTrackingOutputStream = null
       private var objOut: SerializationStream = null
    +
    +  // Did we create this file or was it already present : used in revert to decide
    +  // if we should delete this file or not. Also used to detect if file was deleted
    +  // between creation of BOW and its actual init
    +  private val initiallyExists = file.exists() && file.isFile
       private val initialPosition = file.length()
       private var lastValidPosition = initialPosition
    +
       private var initialized = false
    +  // closed explicitly ?
    +  private var closed = false
    +  // Attempt to cleanly close ? (could also be closed via revert)
    +  // Note, a cleanly closed file could be subsequently reverted
    +  private var cleanCloseAttempted = false
    +  // Was the file actually opened atleast once.
    +  // Note: initialized/streams change state with close/revert.
    +  private var wasOpenedOnce = false
       private var _timeWriting = 0L
     
    -  override def open(): BlockObjectWriter = {
    -    fos = new FileOutputStream(file, true)
    -    ts = new TimeTrackingOutputStream(fos)
    -    channel = fos.getChannel()
    +  // Due to some directory creation race issues in spark, it has been observed that
    +  // sometimes file creation happens 'before' the actual directory has been created
    +  // So we attempt to retry atleast once with a mkdirs in case directory was missing.
    +  private def init() {
    +    init(canRetry = true)
    +  }
    +
    +  private def init(canRetry: Boolean) {
    +
    +    if (closed) throw new IOException("Already closed")
    +
    +    assert (! initialized)
    +    assert (! wasOpenedOnce)
    +    var exists = false
    +    try {
    +      exists = file.exists()
    +      if (! exists && initiallyExists && 0 != initialPosition && ! Utils.inShutdown) {
    +        // Was deleted by cleanup thread ?
    +        throw new IOException("file " + file + " cleaned up ? exists = " + exists +
    +          ", initiallyExists = " + initiallyExists + ", initialPosition = " + initialPosition)
    +      }
    +      fos = new FileOutputStream(file, true)
    +    } catch {
    +      case fEx: FileNotFoundException =>
    +        // There seems to be some race in directory creation.
    +        // Attempts to fix it dont seem to have worked : working around the problem for now.
    +        logDebug("Unable to open " + file + ", canRetry = " + canRetry + ", exists = " + exists +
    +          ", initialPosition = " + initialPosition + ", in shutdown = " + Utils.inShutdown(), fEx)
    +        if (canRetry && ! Utils.inShutdown()) {
    +          // try creating the parent directory if that is the issue.
    +          // Since there can be race with others, dont bother checking for
    +          // success/failure - the call to init() will resolve if fos can be created.
    +          file.getParentFile.mkdirs()
    +          // Note, if directory did not exist, then file does not either - and so
    +          // initialPosition would be zero in either case.
    +          init(canRetry = false)
    +          return
    +        } else throw fEx
    +    }
    +
    +    try {
    +      // This is to workaround case where creation of object and actual init
    +      // (which can happen much later) happens after a delay and the cleanup thread
    +      // cleaned up the file.
    +      channel = fos.getChannel
    +      val fosPos = channel.position()
    +      if (initialPosition != fosPos) {
    +        throw new IOException("file cleaned up ? " + file.exists() + 
    +          ", initialpos = " + initialPosition +
    +          "current len = " + fosPos + ", in shutdown ? " + Utils.inShutdown)
    +      }
    +
    +      ts = new TimeTrackingOutputStream(fos)
    +      val bos = new BufferedOutputStream(ts, bufferSize)
    +      bs = compressStream(bos)
    +      objOut = serializer.newInstance().serializeStream(bs)
    +      initialized = true
    +      wasOpenedOnce = true;
    +    } finally {
    +      if (! initialized) {
    +        // failed, cleanup state.
    +        val tfos = fos
    +        updateCloseState()
    +        tfos.close()
    +      }
    +    }
    +  }
    +
    +  private def open(): BlockObjectWriter = {
    +    init()
         lastValidPosition = initialPosition
    -    bs = compressStream(new BufferedOutputStream(ts, bufferSize))
    -    objOut = serializer.newInstance().serializeStream(bs)
    -    initialized = true
         this
       }
     
    -  override def close() {
    -    if (initialized) {
    -      if (syncWrites) {
    -        // Force outstanding writes to disk and track how long it takes
    -        objOut.flush()
    +  private def updateCloseState() {
    +
    +    if (ts ne null) _timeWriting += ts.timeWriting
    +
    +    bs = null
    +    channel = null
    +    fos = null
    +    ts = null
    +    objOut = null
    +    initialized = false
    +  }
    +
    +  private def flushAll() {
    +    if (closed) throw new IOException("Already closed")
    +
    +    // NOTE: Because Kryo doesn't flush the underlying stream we explicitly flush both the
    +    //       serializer stream and the lower level stream.
    +    if (objOut ne null) {
    +      objOut.flush()
    +      bs.flush()
    +    }
    +  }
    +
    +  private def closeAll(needFlush: Boolean, needRevert: Boolean) {
    +
    +    if (null != objOut) {
    +      val truncatePos = if (needRevert) initialPosition else -1L
    +      assert (! this.closed)
    +
    +      // In case syncWrites is true or we need to truncate
    +      var cleanlyClosed = false
    +      try {
    +        // Flushing if we need to truncate also. Currently, we reopen to truncate
    +        // so this is not strictly required (since close could write further to streams).
    +        // Keeping it around in case that gets relaxed.
    +        if (needFlush || needRevert) flushAll()
    +
             val start = System.nanoTime()
    -        fos.getFD.sync()
    +        try {
    +          if (syncWrites) {
    +            // Force outstanding writes to disk and track how long it takes
    +            fos.getFD.sync()
    +          }
    +        } catch {
    +          case sfe: SyncFailedException => // ignore
    +        }
    +        // must cause cascading close. Note, repeated close on closed streams should not cause
    +        // issues : except some libraries do not honour it - hence not explicitly closing bs/fos
    +        objOut.close()
    +        // bs.close()
    +        // fos.close()
             _timeWriting += System.nanoTime() - start
    -      }
    -      objOut.close()
     
    -      _timeWriting += ts.timeWriting
    +        // fos MUST have been closed.
    +        assert((channel eq null) || !channel.isOpen)
    +        cleanlyClosed = true
    +
    +      } finally {
    +
    +        this.closed = true
    +        if (! cleanlyClosed) {
    +          // could not cleanly close. We have two cases here -
    +          // a) normal close,
    +          // b) revert
    +          // If (a) then then streams/data is in inconsistent so we cant really recover
    +          // simply release fd and allow exception to bubble up.
    +          // If (b) and file length >= initialPosition, then truncate file and ignore exception
    +          // else,cause exception to bubble up since we cant recover
    +          assert (fos ne null)
    +          try { fos.close() } catch { case ioEx: IOException => /* best case attempt, ignore */ }
    +        }
    +
    +        updateCloseState()
    +
    +        // Since close can end up writing data in general case (inspite of flush),
    +        // we reopen to truncate file.
    +        if (needRevert) {
    +          // remove if not earlier existed : best case effort so we dont care about return value
    +          // of delete (it can fail if file was already deleted by cleaner threads for example)
    +          if (! initiallyExists) {
    +            file.delete()
    +            // Explicitly ignore exceptions (when cleanlyClosed = false) and return
    +            // from here. Usually not good idea in finally, but it is ok here.
    +            return
    +          } else {
    +            val fileLen = file.length()
    +            if (fileLen >= truncatePos) {
    +              if (fileLen > truncatePos) DiskBlockObjectWriter.truncateIfExists(file, truncatePos)
     
    -      channel = null
    -      bs = null
    -      fos = null
    -      ts = null
    -      objOut = null
    -      initialized = false
    +              // Validate length.
    +              assert(truncatePos == file.length() || Utils.inShutdown(),
    +                "truncatePos = " + truncatePos + ", len = " + file.length() +
    +                    ", in shutdown = " + Utils.inShutdown())
    +
    +              // Explicitly ignore exceptions (when cleanlyClosed = false) and return
    +              // from here. Usually not good idea in finally, but it is ok here.
    +              return
    +            } // else cause the exception to bubble up if thrown
    +          }
    +        }
    +      }
    +    } else {
    +      // it is possible for open to have never been called - no data written to this
    +      // partition for example. so objOut == null
    +      this.closed = true
         }
    +    initialized = false
       }
     
    -  override def isOpen: Boolean = objOut != null
    +  private def validateBytesWritten() {
    +    // This should happen due to file deletion, during cleanup. Ensure bytesWritten is in sane 
    +    // state. Note, parallel threads continue to run while shutdown threads are running : so 
    +    // this prevents unwanted assertion failures and exception elsewhere.
    +    if (lastValidPosition < initialPosition) {
    +      // This is invoked so that assertions within bytes written are validated.
    +      assert (bytesWritten >= 0)
    +      lastValidPosition = initialPosition
    +    }
    +  }
     
    -  override def commit(): Long = {
    +  override def commitAndClose() {
         if (initialized) {
    -      // NOTE: Because Kryo doesn't flush the underlying stream we explicitly flush both the
    -      //       serializer stream and the lower level stream.
    -      objOut.flush()
    -      bs.flush()
    +      // opened, file still open
    +      assert (wasOpenedOnce)
    +      // Note, set cleanCloseAttempted even before we finish the close : so that a revert on this
    +      // in case close fails can truncate to previous state !
    +      cleanCloseAttempted = true
    +      closeAll(needFlush = true, needRevert = false)
    +
           val prevPos = lastValidPosition
    -      lastValidPosition = channel.position()
    -      lastValidPosition - prevPos
    +      assert (prevPos == initialPosition)
    +      assert (null == fos)
    +
    +      lastValidPosition = file.length()
    +      validateBytesWritten()
    +      // review: remove ?
    +      assert (bytesWritten >= 0, "bytesWritten = " + bytesWritten +
    +        ", initial pos = " + initialPosition + ", last valid pos = " + lastValidPosition)
    +
    +    } else if (cleanCloseAttempted) {
    +      // opened and closed cleanly
    +      assert (closed)
    +      assert (wasOpenedOnce)
    +      // size should be lastValidPosition, or file deleted due to shutdown.
    +      assert (lastValidPosition == file.length() || Utils.inShutdown,
    +        "lastValidPosition = " + lastValidPosition  +
    +          ", file len = " + file.length() + ", exists = " + file.exists())
    +
         } else {
    -      // lastValidPosition is zero if stream is uninitialized
    -      lastValidPosition
    +      // reverted or never opened.
    +      this.closed = true
    +      assert (initialPosition == file.length() || (0 == initialPosition && ! initiallyExists) ||
    +          Utils.inShutdown, "initialPosition = " + initialPosition +
    +          ", file len = " + file.length() + ", exists = " + file.exists())
    +      assert(lastValidPosition == initialPosition)
         }
       }
     
    -  override def revertPartialWrites() {
    +  override def revertPartialWritesAndClose() {
         if (initialized) {
    -      // Discard current writes. We do this by flushing the outstanding writes and
    -      // truncate the file to the last valid position.
    -      objOut.flush()
    -      bs.flush()
    -      channel.truncate(lastValidPosition)
    +      // opened, file still open
    +      // Discard current writes. We do this by truncating the file to the last valid position.
    +      closeAll(needFlush = true, needRevert = true)
    +      validateBytesWritten()
    +      assert (bytesWritten == 0, "bytesWritten = " + bytesWritten +
    +        ", initial pos = " + initialPosition + ", last valid pos = " + lastValidPosition)
    +      assert (initialPosition == file.length() || Utils.inShutdown,
    +        "initialPosition = " + initialPosition +
    +          ", file len = " + file.length() + ", exists = " + file.exists())
    +    } else if (cleanCloseAttempted) {
    +      // Already opened and closed : truncate to last location (or delete
    +      // if created in this instance)
    +      assert (closed)
    +      cleanCloseAttempted = false
    +
    +      // truncate to initialPosition
    +      // remove if not earlier existed
    +      if (! initiallyExists) {
    --- End diff --
    
    Same as above.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1609#discussion_r15565447
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
    @@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter(
       private var fos: FileOutputStream = null
       private var ts: TimeTrackingOutputStream = null
       private var objOut: SerializationStream = null
    +
    +  // Did we create this file or was it already present : used in revert to decide
    +  // if we should delete this file or not. Also used to detect if file was deleted
    +  // between creation of BOW and its actual init
    +  private val initiallyExists = file.exists() && file.isFile
       private val initialPosition = file.length()
       private var lastValidPosition = initialPosition
    +
       private var initialized = false
    +  // closed explicitly ?
    +  private var closed = false
    +  // Attempt to cleanly close ? (could also be closed via revert)
    +  // Note, a cleanly closed file could be subsequently reverted
    +  private var cleanCloseAttempted = false
    +  // Was the file actually opened atleast once.
    +  // Note: initialized/streams change state with close/revert.
    +  private var wasOpenedOnce = false
       private var _timeWriting = 0L
     
    -  override def open(): BlockObjectWriter = {
    -    fos = new FileOutputStream(file, true)
    -    ts = new TimeTrackingOutputStream(fos)
    -    channel = fos.getChannel()
    +  // Due to some directory creation race issues in spark, it has been observed that
    +  // sometimes file creation happens 'before' the actual directory has been created
    +  // So we attempt to retry atleast once with a mkdirs in case directory was missing.
    --- End diff --
    
    As I mentioned above, we did fix a dcl bug, bug that did not seem sufficient.
    I agree this is a rare condition, and the 'fix' is a hack to workaround the problem : but pending identifying root cause, this is the best we have unfortunately.
    Any thoughts ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm closed the pull request at:

    https://github.com/apache/spark/pull/1609


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1609#issuecomment-50460011
  
    QA results for PR 1609:<br>- This patch PASSES unit tests.<br>- This patch merges cleanly<br>- This patch adds no public classes<br><br>For more information see test ouptut:<br>https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17352/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1609#discussion_r15442565
  
    --- Diff: core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala ---
    @@ -0,0 +1,296 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.storage
    +
    +import org.scalatest.FunSuite
    +import java.io.{IOException, FileOutputStream, OutputStream, File}
    +import org.apache.spark.serializer.JavaSerializer
    +import org.apache.spark.SparkConf
    +import org.apache.spark.util.Utils
    +
    +/**
    + * Test various code paths in DiskBlockObjectWriter
    + */
    +class DiskBlockObjectWriterSuite extends FunSuite {
    +
    +  private val conf = new SparkConf
    +  private val BUFFER_SIZE = 32 * 1024
    +
    +  private def tempFile(): File = {
    +    val file = File.createTempFile("temp_", "block")
    +    // We dont want file to exist ! Just need a temp file name
    +    file.delete()
    +    file
    +  }
    +
    +  private def createWriter(file: File = tempFile()) :
    +      (File, DiskBlockObjectWriter) = {
    +    file.deleteOnExit()
    +
    +    (file, new DiskBlockObjectWriter(BlockId("test_1"), file,
    +      new JavaSerializer(conf), BUFFER_SIZE, (out: OutputStream) => out, true))
    +  }
    +
    +
    +  test("write after close should throw IOException") {
    +    val (file, bow) = createWriter()
    +    bow.write("test")
    +    bow.write("test1")
    +    assert (file.exists() && file.isFile)
    +
    +    bow.commitAndClose()
    +
    +    intercept[IOException] {
    +      bow.write("test2")
    +    }
    +
    +    file.delete()
    +  }
    +
    +  test("write after revert should throw IOException") {
    +    val (file, bow) = createWriter()
    +    bow.write("test")
    +    bow.write("test1")
    +    assert (file.exists() && file.isFile)
    +
    +    bow.revertPartialWritesAndClose()
    +
    +    intercept[IOException] {
    +      bow.write("test2")
    +    }
    +
    +    file.delete()
    +  }
    +
    +  test("create even if directory does not exist") {
    +    val dir = File.createTempFile("temp_", "dir")
    +    dir.delete()
    +
    +    val file = new File(dir, "temp.file")
    +    file.deleteOnExit()
    +
    +    val bow = new DiskBlockObjectWriter(BlockId("test_1"), file, new JavaSerializer(conf),
    +      BUFFER_SIZE, (out: OutputStream) => out, true)
    +
    +    bow.write("test")
    +    assert (file.exists() && file.isFile)
    +    bow.commitAndClose()
    +    Utils.deleteRecursively(dir)
    +  }
    +
    +  test("revert of new file should delete it") {
    +    val (file, bow) = createWriter()
    +    bow.write("test")
    +    bow.write("test1")
    +    assert (file.exists() && file.isFile)
    +
    +    bow.revertPartialWritesAndClose()
    +    assert (! file.exists())
    +    // file.delete()
    +  }
    +
    +  test("revert of existing file should revert it to previous state") {
    +    val (file, bow1) = createWriter()
    +
    +    bow1.write("test")
    +    bow1.write("test1")
    +    assert (file.exists() && file.isFile)
    +
    +    bow1.commitAndClose()
    +    val length = file.length()
    +
    +    // reopen same file.
    +    val bow2 = createWriter(file)._2
    +
    +    bow2.write("test3")
    +    bow2.write("test4")
    +
    +    assert (file.exists() && file.isFile)
    +
    +    bow2.revertPartialWritesAndClose()
    +    assert (file.exists())
    +    assert (length == file.length())
    +    file.delete()
    +  }
    +
    +  test("revert of writer after close should delete if it did not exist earlier") {
    +    val (file, bow) = createWriter(tempFile())
    +
    +    bow.write("test")
    +    bow.write("test1")
    +    assert (file.exists() && file.isFile)
    +
    +    bow.commitAndClose()
    +    val length = file.length()
    +
    +    assert (file.exists() && file.isFile)
    +    assert (length > 0)
    +
    +    // Now revert the file, after it has been closed : should delete the file
    +    // since it did not exist earlier.
    +    bow.revertPartialWritesAndClose()
    +    assert (! file.exists())
    +    file.delete()
    +  }
    +
    +  test("revert of writer after close should revert it to previous state") {
    +    val (file, bow1) = createWriter()
    +
    +    bow1.write("test")
    +    bow1.write("test1")
    +    assert (file.exists() && file.isFile)
    +
    +    bow1.commitAndClose()
    +    val length = file.length()
    +
    +    // reopen same file.
    +    val bow2 = createWriter(file)._2
    +
    +    bow2.write("test3")
    +    bow2.write("test4")
    +
    +    bow2.commitAndClose()
    +
    +    assert (file.exists() && file.isFile)
    +    assert (file.length() > length)
    +
    +    // Now revert it : should get reverted back to previous state - after bow1
    +    bow2.revertPartialWritesAndClose()
    +    assert (file.exists())
    +    assert (length == file.length())
    +    file.delete()
    +  }
    +
    +  // val confCopy = conf.clone
    +  // // Ensure we always write data after object ser
    +  // confCopy.set("spark.serializer.objectStreamReset", "1")
    --- End diff --
    
    Make sure to remove commented out debug code like this


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1609#discussion_r15447510
  
    --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
    @@ -935,15 +941,22 @@ private[spark] object Utils extends Logging {
        * Currently, this detects whether the JVM is shutting down by Runtime#addShutdownHook throwing
        * an IllegalStateException.
        */
    +  @volatile private var shutdownStarted = false
    +  private[spark] def setShutdownStarted() {
    +    shutdownStarted = true
    +  }
       def inShutdown(): Boolean = {
         try {
    +      if (shutdownStarted) return true
    --- End diff --
    
    This is a very nice improvement


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/1609#issuecomment-50290580
  
    Did you mean @aarondav?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1609#discussion_r15542180
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
    @@ -353,26 +368,53 @@ class ExternalAppendOnlyMap[K, V, C](
        */
       private class DiskMapIterator(file: File, blockId: BlockId, batchSizes: ArrayBuffer[Long])
         extends Iterator[(K, C)] {
    -    private val fileStream = new FileInputStream(file)
    -    private val bufferedStream = new BufferedInputStream(fileStream, fileBufferSize)
    +
    +    assert (! batchSizes.isEmpty)
    +    assert (! batchSizes.exists(_ <= 0))
    +    private val batchOffsets = batchSizes.scanLeft(0L)(_ + _)
    +    assert (file.length() == batchOffsets(batchOffsets.length - 1))
    +
    +    private var batchIndex = 0
    +    private var fileStream: FileInputStream = null
     
         // An intermediate stream that reads from exactly one batch
         // This guards against pre-fetching and other arbitrary behavior of higher level streams
    -    private var batchStream = nextBatchStream()
    -    private var compressedStream = blockManager.wrapForCompression(blockId, batchStream)
    -    private var deserializeStream = ser.deserializeStream(compressedStream)
    +    private var deserializeStream = nextBatchStream()
         private var nextItem: (K, C) = null
         private var objectsRead = 0
     
         /**
          * Construct a stream that reads only from the next batch.
          */
    -    private def nextBatchStream(): InputStream = {
    -      if (batchSizes.length > 0) {
    -        ByteStreams.limit(bufferedStream, batchSizes.remove(0))
    +    private def nextBatchStream(): DeserializationStream = {
    +      if (batchIndex + 1 < batchOffsets.length) {
    +        assert (file.length() == batchOffsets(batchOffsets.length - 1))
    +        if (null != deserializeStream) {
    +          deserializeStream.close()
    +          fileStream.close()
    +          deserializeStream = null
    +          fileStream = null
    +        }
    +        val start = batchOffsets(batchIndex)
    +        fileStream = new FileInputStream(file)
    +        fileStream.getChannel.position(start)
    +        assert (start == fileStream.getChannel.position())
    +        batchIndex += 1
    +
    +        val end = batchOffsets(batchIndex)
    +
    +        assert (end >= start, "start = " + start + ", end = " + end +
    +          ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]"))
    +
    +        val strm = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
    +
    +        val compressedStream = blockManager.wrapForCompression(blockId, strm)
    +        val ser = serializer.newInstance()
    +        ser.deserializeStream(compressedStream)
           } else {
             // No more batches left
    -        bufferedStream
    +        cleanup()
    +        null
    --- End diff --
    
    This attempts to handle both cases of -
    1) too much read via ByteStreams.limit (already handled earlier).
    2) too little read - via batchOffsets.
    
    The issue is as follows - it is possible for suffix data to be written into the stream after writing an object.
    This can be part of writing the object itself, as part of flush, or close.
    
    Earlier code assumes that this will not happen - and we need to handle only (a) above.
    An example of when this happens is when we use reset in java serializer after every 10k objects are written.
    
    To illustrate with a degenerate example:
    Consider that java serializer stream reset is done after each object is written.
    So after writing each object, java seriailzer will write a TC_RESET byte into the stream.
    
    The file content now will be :
    <obj1><TC_RESET><obj2><TC_RESET> ... <obj10k><TC_RESET> closed; <obj10k1><TC_RESET> ...
    
    In earlier impl, we will create an input stream with limit set to until closed; (the right behavior) and start reading until 10k objects are read.
    Once 10k objects are read, we reopen the next batch stream - but note that the TC_RESET after 10k'th object has not yet been consumed by java serializer !
    
    So the next batch stream will start with :
    <TC_RESET> <obj10k1><TC_RESET> ...
    that is, it is off by 1 byte already.
    
    This is illustrated by the change to ExternalAppendOnlyMapSuite (it should be possible to run that against master imo)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1609#discussion_r15540734
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleWriter.scala ---
    @@ -116,8 +118,13 @@ class HashShuffleWriter[K, V](
       private def revertWrites(): Unit = {
         if (shuffle != null && shuffle.writers != null) {
           for (writer <- shuffle.writers) {
    -        writer.revertPartialWrites()
    -        writer.close()
    +        try {
    +          writer.revertPartialWritesAndClose()
    +        } catch {
    +          // Ensure that all revert's get done - log exception and continue
    +          case ex: Exception =>
    +            logError("Exception reverting/closing writers", ex)
    +        }
           }
    --- End diff --
    
    revert/close can throw exception - causing other writers to be left hanging.
    Hence, log and continue


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1609#issuecomment-50455818
  
    QA tests have started for PR 1609. This patch merges cleanly. <br>View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17352/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1609#discussion_r15448803
  
    --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
    @@ -935,15 +941,22 @@ private[spark] object Utils extends Logging {
        * Currently, this detects whether the JVM is shutting down by Runtime#addShutdownHook throwing
        * an IllegalStateException.
        */
    +  @volatile private var shutdownStarted = false
    +  private[spark] def setShutdownStarted() {
    +    shutdownStarted = true
    +  }
       def inShutdown(): Boolean = {
         try {
    +      if (shutdownStarted) return true
    --- End diff --
    
    Unfortunately, had to be reverted :-(


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1609#discussion_r15553689
  
    --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
    @@ -947,6 +958,34 @@ private[spark] object Utils extends Logging {
         }
         false
       }
    +  /*
    +  @volatile private var shutdownStarted = false
    +  private[spark] def setShutdownStarted() {
    +    shutdownStarted = true
    +  }
    +
    +  def inShutdown(): Boolean = {
    +    if (shutdownStarted) return true
    +    doShutdownCheck()
    +    shutdownStarted
    +  }
    +
    +  private[spark] def doShutdownCheck() {
    +    var shutdown = false
    +    try {
    +      val hook = new Thread {
    +        override def run() {}
    +      }
    +      Runtime.getRuntime.addShutdownHook(hook)
    +      Runtime.getRuntime.removeShutdownHook(hook)
    +    } catch {
    +      case ise: IllegalStateException =>
    +        shutdown = true
    +    } finally {
    +      shutdownStarted = shutdown
    +    }
    +  }
    +  */
    --- End diff --
    
    IMO we should leave this out of the patch if it's dead code. You can open a JIRA with it instead and attach the code there.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1609#issuecomment-50308229
  
    QA tests have started for PR 1609. This patch merges cleanly. <br>View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17279/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1609#issuecomment-50306070
  
    QA tests have started for PR 1609. This patch merges cleanly. <br>View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17276/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the pull request:

    https://github.com/apache/spark/pull/1609#issuecomment-54393908
  
    I think it got split into four issues, two of which got committed, not sure
    of the other other two .... And the first one was regressed upon in
    1.1.already.
    But this or probably is defunct now .... Will close
    On 04-Sep-2014 5:03 am, "andrewor14" <no...@github.com> wrote:
    
    > @mridulm <https://github.com/mridulm> Are the issues in this PR taken
    > care of by #1722 <https://github.com/apache/spark/pull/1722> and and #1678
    > <https://github.com/apache/spark/pull/1678>? Do we still need this PR?
    >
    > —
    > Reply to this email directly or view it on GitHub
    > <https://github.com/apache/spark/pull/1609#issuecomment-54383344>.
    >


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

Posted by mridulm <gi...@git.apache.org>.
GitHub user mridulm reopened a pull request:

    https://github.com/apache/spark/pull/1609

    [SPARK-2532] WIP Consolidated shuffle fixes

    Status of the PR
    - [X] Cherry pick and merge changes from internal branch to spark master
    - [X] Remove WIP comments and 2G branch references.
    - [X] Tests for BlockObjectWriter
    - [ ] Tests for ExternalAppendOnlyMap
    - [x] Tests for ShuffleBlockManager
    
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mridulm/spark consolidated_shuffle_fixes

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/1609.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1609
    
----
commit f1182f8a3d3328248d471038d6ab0db6e6a1396d
Author: Mridul Muralidharan <mr...@apache.org>
Date:   2014-07-27T15:23:05Z

    Consolidated shuffle fixes

commit 66d6ec3f99882ad6062c5bff36f2edb82b0c24c0
Author: Mridul Muralidharan <mr...@apache.org>
Date:   2014-07-27T15:40:32Z

    Add missing setShutdownStarted hooks

commit 027c7f18c44c57960a2a94eee961f0aa811e7a34
Author: Mridul Muralidharan <mr...@apache.org>
Date:   2014-07-27T16:05:31Z

    stylecheck fixes

commit 195c529c1ae5ffa7e8f9cf6af4df8b9536a39d6a
Author: Mridul Muralidharan <mr...@apache.org>
Date:   2014-07-27T23:46:53Z

    Fix build, add testcases for DiskBlockManagerSuite

commit 6095545bf55a87ef0b28bc11adc63dcc5b661b6c
Author: Mridul Muralidharan <mr...@apache.org>
Date:   2014-07-27T23:50:45Z

    Consolidated fixes

commit 1c1faea69d9709c7e65afc9bdd13a8e0d5488c82
Author: Mridul Muralidharan <mr...@apache.org>
Date:   2014-07-27T23:50:50Z

    Merge branch 'consolidated_shuffle_fixes' of github.com:mridulm/spark into consolidated_shuffle_fixes

commit fbf20f792baf7ab8d6705c7a9525c2db92bb7ae3
Author: Mridul Muralidharan <mr...@apache.org>
Date:   2014-07-28T06:59:45Z

    Disable code to detect programming shutdown via stop's. Make actor/store shutdown in DiskBlockManagerSuite more robust

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1609#discussion_r15447436
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
    @@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter(
       private var fos: FileOutputStream = null
       private var ts: TimeTrackingOutputStream = null
       private var objOut: SerializationStream = null
    +
    +  // Did we create this file or was it already present : used in revert to decide
    +  // if we should delete this file or not. Also used to detect if file was deleted
    +  // between creation of BOW and its actual init
    +  private val initiallyExists = file.exists() && file.isFile
       private val initialPosition = file.length()
       private var lastValidPosition = initialPosition
    +
       private var initialized = false
    +  // closed explicitly ?
    +  private var closed = false
    +  // Attempt to cleanly close ? (could also be closed via revert)
    +  // Note, a cleanly closed file could be subsequently reverted
    +  private var cleanCloseAttempted = false
    +  // Was the file actually opened atleast once.
    +  // Note: initialized/streams change state with close/revert.
    +  private var wasOpenedOnce = false
       private var _timeWriting = 0L
     
    -  override def open(): BlockObjectWriter = {
    -    fos = new FileOutputStream(file, true)
    -    ts = new TimeTrackingOutputStream(fos)
    -    channel = fos.getChannel()
    +  // Due to some directory creation race issues in spark, it has been observed that
    +  // sometimes file creation happens 'before' the actual directory has been created
    +  // So we attempt to retry atleast once with a mkdirs in case directory was missing.
    +  private def init() {
    +    init(canRetry = true)
    +  }
    +
    +  private def init(canRetry: Boolean) {
    +
    +    if (closed) throw new IOException("Already closed")
    +
    +    assert (! initialized)
    +    assert (! wasOpenedOnce)
    +    var exists = false
    +    try {
    +      exists = file.exists()
    +      if (! exists && initiallyExists && 0 != initialPosition && ! Utils.inShutdown) {
    +        // Was deleted by cleanup thread ?
    +        throw new IOException("file " + file + " cleaned up ? exists = " + exists +
    +          ", initiallyExists = " + initiallyExists + ", initialPosition = " + initialPosition)
    +      }
    +      fos = new FileOutputStream(file, true)
    +    } catch {
    +      case fEx: FileNotFoundException =>
    +        // There seems to be some race in directory creation.
    +        // Attempts to fix it dont seem to have worked : working around the problem for now.
    +        logDebug("Unable to open " + file + ", canRetry = " + canRetry + ", exists = " + exists +
    +          ", initialPosition = " + initialPosition + ", in shutdown = " + Utils.inShutdown(), fEx)
    +        if (canRetry && ! Utils.inShutdown()) {
    +          // try creating the parent directory if that is the issue.
    +          // Since there can be race with others, dont bother checking for
    +          // success/failure - the call to init() will resolve if fos can be created.
    +          file.getParentFile.mkdirs()
    +          // Note, if directory did not exist, then file does not either - and so
    +          // initialPosition would be zero in either case.
    +          init(canRetry = false)
    +          return
    +        } else throw fEx
    +    }
    +
    +    try {
    +      // This is to workaround case where creation of object and actual init
    +      // (which can happen much later) happens after a delay and the cleanup thread
    +      // cleaned up the file.
    +      channel = fos.getChannel
    +      val fosPos = channel.position()
    +      if (initialPosition != fosPos) {
    +        throw new IOException("file cleaned up ? " + file.exists() + 
    +          ", initialpos = " + initialPosition +
    +          "current len = " + fosPos + ", in shutdown ? " + Utils.inShutdown)
    +      }
    +
    +      ts = new TimeTrackingOutputStream(fos)
    +      val bos = new BufferedOutputStream(ts, bufferSize)
    +      bs = compressStream(bos)
    +      objOut = serializer.newInstance().serializeStream(bs)
    +      initialized = true
    +      wasOpenedOnce = true;
    +    } finally {
    +      if (! initialized) {
    +        // failed, cleanup state.
    +        val tfos = fos
    +        updateCloseState()
    +        tfos.close()
    +      }
    +    }
    +  }
    +
    +  private def open(): BlockObjectWriter = {
    +    init()
         lastValidPosition = initialPosition
    -    bs = compressStream(new BufferedOutputStream(ts, bufferSize))
    -    objOut = serializer.newInstance().serializeStream(bs)
    -    initialized = true
         this
       }
     
    -  override def close() {
    -    if (initialized) {
    -      if (syncWrites) {
    -        // Force outstanding writes to disk and track how long it takes
    -        objOut.flush()
    +  private def updateCloseState() {
    +
    +    if (ts ne null) _timeWriting += ts.timeWriting
    +
    +    bs = null
    +    channel = null
    +    fos = null
    +    ts = null
    +    objOut = null
    +    initialized = false
    +  }
    +
    +  private def flushAll() {
    +    if (closed) throw new IOException("Already closed")
    +
    +    // NOTE: Because Kryo doesn't flush the underlying stream we explicitly flush both the
    +    //       serializer stream and the lower level stream.
    +    if (objOut ne null) {
    +      objOut.flush()
    +      bs.flush()
    +    }
    +  }
    +
    +  private def closeAll(needFlush: Boolean, needRevert: Boolean) {
    +
    +    if (null != objOut) {
    +      val truncatePos = if (needRevert) initialPosition else -1L
    +      assert (! this.closed)
    +
    +      // In case syncWrites is true or we need to truncate
    +      var cleanlyClosed = false
    +      try {
    +        // Flushing if we need to truncate also. Currently, we reopen to truncate
    +        // so this is not strictly required (since close could write further to streams).
    +        // Keeping it around in case that gets relaxed.
    +        if (needFlush || needRevert) flushAll()
    +
             val start = System.nanoTime()
    -        fos.getFD.sync()
    +        try {
    +          if (syncWrites) {
    +            // Force outstanding writes to disk and track how long it takes
    +            fos.getFD.sync()
    +          }
    +        } catch {
    +          case sfe: SyncFailedException => // ignore
    +        }
    +        // must cause cascading close. Note, repeated close on closed streams should not cause
    +        // issues : except some libraries do not honour it - hence not explicitly closing bs/fos
    +        objOut.close()
    +        // bs.close()
    +        // fos.close()
             _timeWriting += System.nanoTime() - start
    -      }
    -      objOut.close()
     
    -      _timeWriting += ts.timeWriting
    +        // fos MUST have been closed.
    +        assert((channel eq null) || !channel.isOpen)
    +        cleanlyClosed = true
    +
    +      } finally {
    +
    +        this.closed = true
    +        if (! cleanlyClosed) {
    +          // could not cleanly close. We have two cases here -
    +          // a) normal close,
    +          // b) revert
    +          // If (a) then then streams/data is in inconsistent so we cant really recover
    +          // simply release fd and allow exception to bubble up.
    +          // If (b) and file length >= initialPosition, then truncate file and ignore exception
    +          // else,cause exception to bubble up since we cant recover
    +          assert (fos ne null)
    +          try { fos.close() } catch { case ioEx: IOException => /* best case attempt, ignore */ }
    +        }
    +
    +        updateCloseState()
    +
    +        // Since close can end up writing data in general case (inspite of flush),
    +        // we reopen to truncate file.
    +        if (needRevert) {
    +          // remove if not earlier existed : best case effort so we dont care about return value
    +          // of delete (it can fail if file was already deleted by cleaner threads for example)
    +          if (! initiallyExists) {
    --- End diff --
    
    Same as above.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1609#discussion_r15449433
  
    --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
    @@ -947,6 +958,34 @@ private[spark] object Utils extends Logging {
         }
         false
       }
    +  /*
    +  @volatile private var shutdownStarted = false
    +  private[spark] def setShutdownStarted() {
    +    shutdownStarted = true
    +  }
    +
    +  def inShutdown(): Boolean = {
    +    if (shutdownStarted) return true
    +    doShutdownCheck()
    +    shutdownStarted
    +  }
    +
    +  private[spark] def doShutdownCheck() {
    +    var shutdown = false
    +    try {
    +      val hook = new Thread {
    +        override def run() {}
    +      }
    +      Runtime.getRuntime.addShutdownHook(hook)
    +      Runtime.getRuntime.removeShutdownHook(hook)
    +    } catch {
    +      case ise: IllegalStateException =>
    +        shutdown = true
    +    } finally {
    +      shutdownStarted = shutdown
    +    }
    +  }
    +  */
    --- End diff --
    
    This (and related commented inShutdown references) has been left around in case someone else can suggest an improvement !
    The issue is, this works fine in 'normal' use : but while running spark tests in local mode, it fails.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1609#discussion_r15537366
  
    --- Diff: core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala ---
    @@ -40,7 +40,7 @@ private[spark] class JavaSerializationStream(out: OutputStream, counterReset: In
        */
       def writeObject[T: ClassTag](t: T): SerializationStream = {
         objOut.writeObject(t)
    -    if (counterReset > 0 && counter >= counterReset) {
    +    if (counterReset >= 0 && counter >= counterReset) {
    --- End diff --
    
    This was done only to support adding marker after each object has been written.
    Only practical reason to do this is to test that part.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1609#discussion_r15447401
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---
    @@ -107,68 +109,296 @@ private[spark] class DiskBlockObjectWriter(
       private var fos: FileOutputStream = null
       private var ts: TimeTrackingOutputStream = null
       private var objOut: SerializationStream = null
    +
    +  // Did we create this file or was it already present : used in revert to decide
    +  // if we should delete this file or not. Also used to detect if file was deleted
    +  // between creation of BOW and its actual init
    +  private val initiallyExists = file.exists() && file.isFile
       private val initialPosition = file.length()
       private var lastValidPosition = initialPosition
    +
       private var initialized = false
    +  // closed explicitly ?
    +  private var closed = false
    +  // Attempt to cleanly close ? (could also be closed via revert)
    +  // Note, a cleanly closed file could be subsequently reverted
    +  private var cleanCloseAttempted = false
    +  // Was the file actually opened atleast once.
    +  // Note: initialized/streams change state with close/revert.
    +  private var wasOpenedOnce = false
       private var _timeWriting = 0L
     
    -  override def open(): BlockObjectWriter = {
    -    fos = new FileOutputStream(file, true)
    -    ts = new TimeTrackingOutputStream(fos)
    -    channel = fos.getChannel()
    +  // Due to some directory creation race issues in spark, it has been observed that
    +  // sometimes file creation happens 'before' the actual directory has been created
    +  // So we attempt to retry atleast once with a mkdirs in case directory was missing.
    +  private def init() {
    +    init(canRetry = true)
    +  }
    +
    +  private def init(canRetry: Boolean) {
    +
    +    if (closed) throw new IOException("Already closed")
    +
    +    assert (! initialized)
    +    assert (! wasOpenedOnce)
    +    var exists = false
    +    try {
    +      exists = file.exists()
    +      if (! exists && initiallyExists && 0 != initialPosition && ! Utils.inShutdown) {
    --- End diff --
    
    Same as above.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1609#issuecomment-50274622
  
    QA results for PR 1609:<br>- This patch FAILED unit tests.<br>- This patch merges cleanly<br>- This patch adds no public classes<br><br>For more information see test ouptut:<br>https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17243/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1609#discussion_r15565552
  
    --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
    @@ -947,6 +958,34 @@ private[spark] object Utils extends Logging {
         }
         false
       }
    +  /*
    +  @volatile private var shutdownStarted = false
    +  private[spark] def setShutdownStarted() {
    +    shutdownStarted = true
    +  }
    +
    +  def inShutdown(): Boolean = {
    +    if (shutdownStarted) return true
    +    doShutdownCheck()
    +    shutdownStarted
    +  }
    +
    +  private[spark] def doShutdownCheck() {
    +    var shutdown = false
    +    try {
    +      val hook = new Thread {
    +        override def run() {}
    +      }
    +      Runtime.getRuntime.addShutdownHook(hook)
    +      Runtime.getRuntime.removeShutdownHook(hook)
    +    } catch {
    +      case ise: IllegalStateException =>
    +        shutdown = true
    +    } finally {
    +      shutdownStarted = shutdown
    +    }
    +  }
    +  */
    --- End diff --
    
    Sure, will move it out when I push next.
    It becomes directly relevant to this patch since there are assertions which check for either file/directory being in expected state or VM is in shutdown (and so cleanup happened/is happening - which caused file deletions).
    For VM shutdown, this is just an optimization - but for shutdown ordered by driver, inShutdown will return false, but same codepath as shutdown is invoked by spark (stop on various subsystems) : resulting in exceptions/assertion failures in threads which are still running.
    
    Unfortunately, this diff interacts badly with local mode - particularly tests, since it keeps reusing the same VM.
    Any ideas on how to 'fix' or resolve this ? Thanks


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1609#discussion_r15662654
  
    --- Diff: core/src/main/scala/org/apache/spark/util/Java7Util.scala ---
    @@ -0,0 +1,27 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.util
    +
    +import java.io.File
    +
    +/**
    + * Java 7 (or higher) specific util methods.
    + */
    +object Java7Util {
    +  def isSymlink(file: File) = java.nio.file.Files.isSymbolicLink(file.toPath)
    +}
    --- End diff --
    
    Will this still compile on Java 6? I don't see any changes to pom.xml or anything to exclude it. Or maybe did you mean to call this by reflection?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/1609#issuecomment-50943106
  
    Opened https://github.com/apache/spark/pull/1722 to do the second fix (map batch writing) for 1.1, including applying the same fix to ExternalSorter.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] WIP Consolidated shuffle fixes

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1609#issuecomment-50306230
  
    QA results for PR 1609:<br>- This patch FAILED unit tests.<br>- This patch merges cleanly<br>- This patch adds no public classes<br><br>For more information see test ouptut:<br>https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17276/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2532] Consolidated shuffle fixes

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the pull request:

    https://github.com/apache/spark/pull/1609#issuecomment-50517754
  
    I have added some comments to the PR in the hopes that it will aid in the review.
    
    I am sure it is still involved process inspite of this, so please do feel free to raise as many queries as required : sometimes they might trigger unearthing some other issues as part of the discussion.
    I want to ensure that we do not miss on any subtle issue here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---