You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by mateiz <gi...@git.apache.org> on 2014/08/02 00:40:30 UTC

[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

GitHub user mateiz opened a pull request:

    https://github.com/apache/spark/pull/1722

    SPARK-2792. Fix reading too much or too little data from each stream in ExternalMap / Sorter

    All these changes are from @mridulm's work in #1609, but extracted here to fix this specific issue and make it easier to merge not 1.1. This particular set of changes is to make sure that we read exactly the right range of bytes from each spill file in EAOM: some serializers can write bytes after the last object (e.g. the TC_RESET flag in Java serialization) and that would confuse the previous code into reading it as part of the next batch. There are also improvements to cleanup to make sure files are closed.
    
    In addition to bringing in the changes to ExternalAppendOnlyMap, I also copied them to the corresponding code in ExternalSorter and updated its test suite to test for the same issues.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mateiz/spark spark-2792

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/1722.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1722
    
----
commit 9a78e4b2fdf6ca20667aca478e39ad7fa5a34e11
Author: Matei Zaharia <ma...@databricks.com>
Date:   2014-08-01T22:02:13Z

    Add @mridulm's fixes to ExternalAppendOnlyMap for batch sizes
    
    All these changes are from @mridulm's work in #1609, but extracted here
    to fix this specific issue. This particular set of changes is to make
    sure that we read exactly the right range of bytes from each spill file
    in EAOM: some serializers can write bytes after the last object (e.g.
    the TC_RESET flag in Java serialization) and that would confuse the
    previous code into reading it as part of the next batch. There are also
    improvements to cleanup to make sure files are closed.

commit 0d6dad7dc0f38cc7accb89b75c848a2b31fe254c
Author: Matei Zaharia <ma...@databricks.com>
Date:   2014-08-01T22:12:28Z

    Added Mridul's test changes for ExternalAppendOnlyMap

commit bda37bb431d44c11c097497fd389d6ab2b97c69c
Author: Matei Zaharia <ma...@databricks.com>
Date:   2014-08-01T22:38:01Z

    Implement Mridul's ExternalAppendOnlyMap fixes in ExternalSorter too
    
    Modified ExternalSorterSuite to also set a low object stream reset and
    batch size, and verified that it failed before the changes and succeeded
    after.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the pull request:

    https://github.com/apache/spark/pull/1722#issuecomment-51047651
  
    LGTM !


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1722#discussion_r15725858
  
    --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala ---
    @@ -30,8 +30,19 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
       private def mergeValue(buffer: ArrayBuffer[Int], i: Int) = buffer += i
       private def mergeCombiners(buf1: ArrayBuffer[Int], buf2: ArrayBuffer[Int]) = buf1 ++= buf2
     
    +  private def createSparkConf(loadDefaults: Boolean): SparkConf = {
    +    val conf = new SparkConf(loadDefaults)
    +    // Make the Java serializer write a reset instruction (TC_RESET) after each object to test
    +    // for a bug we had with bytes written past the last object in a batch (SPARK-2792)
    +    conf.set("spark.serializer.objectStreamReset", "0")
    +    conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
    +    // Ensure that we actually have multiple batches per spill file
    +    conf.set("spark.shuffle.spill.batchSize", "10")
    --- End diff --
    
    Oops, I misread.
    You are right, it looks fine (as long a batch size > 20, it should be exhibited)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1722#discussion_r15725724
  
    --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala ---
    @@ -30,8 +30,19 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
       private def mergeValue(buffer: ArrayBuffer[Int], i: Int) = buffer += i
       private def mergeCombiners(buf1: ArrayBuffer[Int], buf2: ArrayBuffer[Int]) = buf1 ++= buf2
     
    +  private def createSparkConf(loadDefaults: Boolean): SparkConf = {
    +    val conf = new SparkConf(loadDefaults)
    +    // Make the Java serializer write a reset instruction (TC_RESET) after each object to test
    +    // for a bug we had with bytes written past the last object in a batch (SPARK-2792)
    +    conf.set("spark.serializer.objectStreamReset", "0")
    +    conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
    +    // Ensure that we actually have multiple batches per spill file
    +    conf.set("spark.shuffle.spill.batchSize", "10")
    --- End diff --
    
    Actually I should have made it 0 - to ensure reset after each object (with the corresponding java serializer change to enable it) is because we want to ensure that there is a TC_RESET after object write and before close. Else the bug is not exposed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1722#discussion_r15725641
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
    @@ -389,27 +404,51 @@ class ExternalAppendOnlyMap[K, V, C](
        * An iterator that returns (K, C) pairs in sorted order from an on-disk map
        */
       private class DiskMapIterator(file: File, blockId: BlockId, batchSizes: ArrayBuffer[Long])
    -    extends Iterator[(K, C)] {
    -    private val fileStream = new FileInputStream(file)
    -    private val bufferedStream = new BufferedInputStream(fileStream, fileBufferSize)
    +    extends Iterator[(K, C)]
    +  {
    --- End diff --
    
    Those asserts caught the bugs :-) Bug yeah, some of them might have been expensive.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1722#discussion_r15725700
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
    @@ -455,7 +495,25 @@ class ExternalAppendOnlyMap[K, V, C](
     
         // TODO: Ensure this gets called even if the iterator isn't drained.
         private def cleanup() {
    -      deserializeStream.close()
    +      batchIndex = batchOffsets.length  // Prevent reading any other batch
    +      val ds = deserializeStream
    +      val fs = fileStream
    +      deserializeStream = null
    +      fileStream = null
    +
    +      if (ds != null) {
    +        try {
    +          ds.close()
    +        } catch {
    +          case e: IOException =>
    +            // Make sure we at least close the file handle
    +            if (fs != null) {
    +              try { fs.close() } catch { case e2: IOException => }
    --- End diff --
    
    This is just paranoid stuff, you can remove the catch IOException part actually
    The reason it exists is cos we have a Ulimit'ed FileInputStream - which enforces a ulimit of 8k on spark (which is the fd limit in our clusters).
    For large reducers, this prevents task from getting killed.
    So this is an attempt to ensure that the stream is truely closed.
    
    I did not realize this had leaked out.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1722#discussion_r15725849
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
    @@ -455,7 +495,25 @@ class ExternalAppendOnlyMap[K, V, C](
     
         // TODO: Ensure this gets called even if the iterator isn't drained.
         private def cleanup() {
    -      deserializeStream.close()
    +      batchIndex = batchOffsets.length  // Prevent reading any other batch
    +      val ds = deserializeStream
    +      val fs = fileStream
    +      deserializeStream = null
    +      fileStream = null
    +
    +      if (ds != null) {
    +        try {
    +          ds.close()
    +        } catch {
    +          case e: IOException =>
    +            // Make sure we at least close the file handle
    +            if (fs != null) {
    +              try { fs.close() } catch { case e2: IOException => }
    --- End diff --
    
    Ah, I see. Will remove the catch part then.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/1722#issuecomment-51008057
  
    Jenkins, test this please


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1722#discussion_r15722888
  
    --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala ---
    @@ -30,8 +30,19 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
       private def mergeValue(buffer: ArrayBuffer[Int], i: Int) = buffer += i
       private def mergeCombiners(buf1: ArrayBuffer[Int], buf2: ArrayBuffer[Int]) = buf1 ++= buf2
     
    +  private def createSparkConf(loadDefaults: Boolean): SparkConf = {
    +    val conf = new SparkConf(loadDefaults)
    +    // Make the Java serializer write a reset instruction (TC_RESET) after each object to test
    +    // for a bug we had with bytes written past the last object in a batch (SPARK-2792)
    +    conf.set("spark.serializer.objectStreamReset", "0")
    +    conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
    +    // Ensure that we actually have multiple batches per spill file
    +    conf.set("spark.shuffle.spill.batchSize", "10")
    --- End diff --
    
    Another delta is that I made this 10 instead of 1, to check that things work with multiple objects per batch too.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/1722#issuecomment-51013476
  
    Jenkins, test this please


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1722#issuecomment-50951036
  
    QA tests have started for PR 1722. This patch merges cleanly. <br>View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17734/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1722#discussion_r15725417
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
    @@ -215,16 +218,28 @@ class ExternalAppendOnlyMap[K, V, C](
     
             if (objectsWritten == serializerBatchSize) {
               flush()
    -          writer.close()
               writer = blockManager.getDiskWriter(blockId, file, serializer, fileBufferSize)
             }
           }
           if (objectsWritten > 0) {
             flush()
    +      } else if (writer != null) {
    --- End diff --
    
    We don't appear to call writer.close() if objectsWritten == 0, is that the case?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1722#discussion_r15725705
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
    @@ -215,16 +218,28 @@ class ExternalAppendOnlyMap[K, V, C](
     
             if (objectsWritten == serializerBatchSize) {
               flush()
    -          writer.close()
               writer = blockManager.getDiskWriter(blockId, file, serializer, fileBufferSize)
             }
           }
           if (objectsWritten > 0) {
             flush()
    +      } else if (writer != null) {
    --- End diff --
    
    Ah, right, so it would be.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/1722#issuecomment-50983403
  
    @aarondav / @mridulm any other comments on this, or is it okay to merge?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on the pull request:

    https://github.com/apache/spark/pull/1722#issuecomment-50996430
  
    +0, I have not actually reviewed this, I only did a cursory pass-through. When it LGTM to @mridulm, we can merge.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/1722#issuecomment-50998020
  
    Jenkins, test this please


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/1722


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1722#issuecomment-51015624
  
    QA results for PR 1722:<br>- This patch PASSES unit tests.<br>- This patch merges cleanly<br>- This patch adds no public classes<br><br>For more information see test ouptut:<br>https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17837/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/1722#issuecomment-50943073
  
    Jenkins, test this please


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1722#issuecomment-51009557
  
    QA results for PR 1722:<br>- This patch FAILED unit tests.<br>- This patch merges cleanly<br>- This patch adds no public classes<br><br>For more information see test ouptut:<br>https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17835/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the pull request:

    https://github.com/apache/spark/pull/1722#issuecomment-51003282
  
    LGTM !
    Though I would prefer if @aarondav also took a look at it - since this is based on my earlier work, I might be too close to it to see potential issues ...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1722#discussion_r15728047
  
    --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala ---
    @@ -30,8 +30,19 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
       private def mergeValue(buffer: ArrayBuffer[Int], i: Int) = buffer += i
       private def mergeCombiners(buf1: ArrayBuffer[Int], buf2: ArrayBuffer[Int]) = buf1 ++= buf2
     
    +  private def createSparkConf(loadDefaults: Boolean): SparkConf = {
    +    val conf = new SparkConf(loadDefaults)
    +    // Make the Java serializer write a reset instruction (TC_RESET) after each object to test
    +    // for a bug we had with bytes written past the last object in a batch (SPARK-2792)
    +    conf.set("spark.serializer.objectStreamReset", "0")
    +    conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
    +    // Ensure that we actually have multiple batches per spill file
    +    conf.set("spark.shuffle.spill.batchSize", "10")
    --- End diff --
    
    BTW, was the java serializer fix also ported across ? I dont think @aarondav did that ...
    Else the object reset = 0 will cause serializer to ignore it (in master it used to check for value > 0)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1722#discussion_r15725631
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
    @@ -215,16 +218,28 @@ class ExternalAppendOnlyMap[K, V, C](
     
             if (objectsWritten == serializerBatchSize) {
               flush()
    -          writer.close()
               writer = blockManager.getDiskWriter(blockId, file, serializer, fileBufferSize)
             }
           }
           if (objectsWritten > 0) {
             flush()
    +      } else if (writer != null) {
    --- End diff --
    
    when objectsWritten == 0, writer != null will hold.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the pull request:

    https://github.com/apache/spark/pull/1722#issuecomment-50992153
  
    LGTM, thanks Matei !
    On 03-Aug-2014 12:13 pm, "Matei Zaharia" <no...@github.com> wrote:
    
    > @aarondav <https://github.com/aarondav> / @mridulm
    > <https://github.com/mridulm> any other comments on this, or is it okay to
    > merge?
    >
    > —
    > Reply to this email directly or view it on GitHub
    > <https://github.com/apache/spark/pull/1722#issuecomment-50983403>.
    >


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1722#issuecomment-51013672
  
    QA tests have started for PR 1722. This patch merges cleanly. <br>View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17837/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1722#discussion_r15722869
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
    @@ -389,27 +404,51 @@ class ExternalAppendOnlyMap[K, V, C](
        * An iterator that returns (K, C) pairs in sorted order from an on-disk map
        */
       private class DiskMapIterator(file: File, blockId: BlockId, batchSizes: ArrayBuffer[Long])
    -    extends Iterator[(K, C)] {
    -    private val fileStream = new FileInputStream(file)
    -    private val bufferedStream = new BufferedInputStream(fileStream, fileBufferSize)
    +    extends Iterator[(K, C)]
    +  {
    +    private val batchOffsets = batchSizes.scanLeft(0L)(_ + _)  // Size will be batchSize.length + 1
    +    assert(file.length() == batchOffsets(batchOffsets.length - 1))
    +
    +    private var batchIndex = 0  // Which batch we're in
    +    private var fileStream: FileInputStream = null
     
         // An intermediate stream that reads from exactly one batch
         // This guards against pre-fetching and other arbitrary behavior of higher level streams
    -    private var batchStream = nextBatchStream()
    -    private var compressedStream = blockManager.wrapForCompression(blockId, batchStream)
    -    private var deserializeStream = ser.deserializeStream(compressedStream)
    +    private var deserializeStream = nextBatchStream()
         private var nextItem: (K, C) = null
         private var objectsRead = 0
     
         /**
          * Construct a stream that reads only from the next batch.
          */
    -    private def nextBatchStream(): InputStream = {
    -      if (batchSizes.length > 0) {
    -        ByteStreams.limit(bufferedStream, batchSizes.remove(0))
    +    private def nextBatchStream(): DeserializationStream = {
    +      // Note that batchOffsets.length = numBatches + 1 since we did a scan above; check whether
    +      // we're still in a valid batch.
    +      if (batchIndex < batchOffsets.length - 1) {
    +        if (deserializeStream != null) {
    +          deserializeStream.close()
    +          fileStream.close()
    +          deserializeStream = null
    +          fileStream = null
    +        }
    +
    +        val start = batchOffsets(batchIndex)
    +        fileStream = new FileInputStream(file)
    +        fileStream.getChannel.position(start)
    +        batchIndex += 1
    +
    +        val end = batchOffsets(batchIndex)
    +
    +        assert(end >= start, "start = " + start + ", end = " + end +
    +          ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]"))
    +
    +        val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
    +        val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream)
    +        ser.deserializeStream(compressedStream)
    --- End diff --
    
    One delta w.r.t. your patch, @mridulm: you used to do `ser = serializer.newInstance` before this, but this should not be necessary; our serializers support reading even multiple streams concurrently (though confusingly not writing them as far as I see; they can share an output buffer there). I removed that because creating a new instance is actually kind of expensive for Kryo.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1722#issuecomment-51008114
  
    QA tests have started for PR 1722. This patch merges cleanly. <br>View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17835/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1722#discussion_r15725785
  
    --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala ---
    @@ -30,8 +30,19 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
       private def mergeValue(buffer: ArrayBuffer[Int], i: Int) = buffer += i
       private def mergeCombiners(buf1: ArrayBuffer[Int], buf2: ArrayBuffer[Int]) = buf1 ++= buf2
     
    +  private def createSparkConf(loadDefaults: Boolean): SparkConf = {
    +    val conf = new SparkConf(loadDefaults)
    +    // Make the Java serializer write a reset instruction (TC_RESET) after each object to test
    +    // for a bug we had with bytes written past the last object in a batch (SPARK-2792)
    +    conf.set("spark.serializer.objectStreamReset", "0")
    +    conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
    +    // Ensure that we actually have multiple batches per spill file
    +    conf.set("spark.shuffle.spill.batchSize", "10")
    --- End diff --
    
    Are you sure about that? The spark.serializer.objectStreamReset above is set to 0, and that's what causes a TC_RESET after each object written, but the batchSize is about how many objects you write before closing a batch. I definitely saw crashes when I set this to 10 and did not have the fixes in ExternalSorter. There were more when it was 1, so I can also do that. I don't think 0 will work with the code we have written.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/1722#issuecomment-50997944
  
    Ah good point.. I've now pushed the JavaSerializer change.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/1722#issuecomment-51007189
  
    I just fixed objectStreamReset slightly so that 1 means "reset after every object" (that's what it was intended to be originally)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1722#discussion_r15722998
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
    @@ -389,27 +404,51 @@ class ExternalAppendOnlyMap[K, V, C](
        * An iterator that returns (K, C) pairs in sorted order from an on-disk map
        */
       private class DiskMapIterator(file: File, blockId: BlockId, batchSizes: ArrayBuffer[Long])
    -    extends Iterator[(K, C)] {
    -    private val fileStream = new FileInputStream(file)
    -    private val bufferedStream = new BufferedInputStream(fileStream, fileBufferSize)
    +    extends Iterator[(K, C)]
    +  {
    --- End diff --
    
    Here I also removed some of the more paranoid asserts about batchSizes


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1722#issuecomment-50998145
  
    QA tests have started for PR 1722. This patch merges cleanly. <br>View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17817/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1722#issuecomment-50952340
  
    QA results for PR 1722:<br>- This patch PASSES unit tests.<br>- This patch merges cleanly<br>- This patch adds no public classes<br><br>For more information see test ouptut:<br>https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17734/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1722#discussion_r15725667
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala ---
    @@ -389,27 +404,51 @@ class ExternalAppendOnlyMap[K, V, C](
        * An iterator that returns (K, C) pairs in sorted order from an on-disk map
        */
       private class DiskMapIterator(file: File, blockId: BlockId, batchSizes: ArrayBuffer[Long])
    -    extends Iterator[(K, C)] {
    -    private val fileStream = new FileInputStream(file)
    -    private val bufferedStream = new BufferedInputStream(fileStream, fileBufferSize)
    +    extends Iterator[(K, C)]
    +  {
    +    private val batchOffsets = batchSizes.scanLeft(0L)(_ + _)  // Size will be batchSize.length + 1
    +    assert(file.length() == batchOffsets(batchOffsets.length - 1))
    +
    +    private var batchIndex = 0  // Which batch we're in
    +    private var fileStream: FileInputStream = null
     
         // An intermediate stream that reads from exactly one batch
         // This guards against pre-fetching and other arbitrary behavior of higher level streams
    -    private var batchStream = nextBatchStream()
    -    private var compressedStream = blockManager.wrapForCompression(blockId, batchStream)
    -    private var deserializeStream = ser.deserializeStream(compressedStream)
    +    private var deserializeStream = nextBatchStream()
         private var nextItem: (K, C) = null
         private var objectsRead = 0
     
         /**
          * Construct a stream that reads only from the next batch.
          */
    -    private def nextBatchStream(): InputStream = {
    -      if (batchSizes.length > 0) {
    -        ByteStreams.limit(bufferedStream, batchSizes.remove(0))
    +    private def nextBatchStream(): DeserializationStream = {
    +      // Note that batchOffsets.length = numBatches + 1 since we did a scan above; check whether
    +      // we're still in a valid batch.
    +      if (batchIndex < batchOffsets.length - 1) {
    +        if (deserializeStream != null) {
    +          deserializeStream.close()
    +          fileStream.close()
    +          deserializeStream = null
    +          fileStream = null
    +        }
    +
    +        val start = batchOffsets(batchIndex)
    +        fileStream = new FileInputStream(file)
    +        fileStream.getChannel.position(start)
    +        batchIndex += 1
    +
    +        val end = batchOffsets(batchIndex)
    +
    +        assert(end >= start, "start = " + start + ", end = " + end +
    +          ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]"))
    +
    +        val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
    +        val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream)
    +        ser.deserializeStream(compressedStream)
    --- End diff --
    
    So that is something I was not sure of : particularly with kryo (not java).
    We were seeing the input buffer getting stepped on from various threads - this was specifically in context of 2G fixes though, where we had to modify the way the buffer was created anyway. I dont know if the initialization changes something else.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1722#discussion_r15750212
  
    --- Diff: core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala ---
    @@ -35,16 +35,15 @@ private[spark] class JavaSerializationStream(out: OutputStream, counterReset: In
       /**
        * Calling reset to avoid memory leak:
        * http://stackoverflow.com/questions/1281549/memory-leak-traps-in-the-java-standard-api
    -   * But only call it every 10,000th time to avoid bloated serialization streams (when
    +   * But only call it every 100th time to avoid bloated serialization streams (when
        * the stream 'resets' object class descriptions have to be re-written)
        */
       def writeObject[T: ClassTag](t: T): SerializationStream = {
         objOut.writeObject(t)
    +    counter += 1
         if (counterReset > 0 && counter >= counterReset) {
    --- End diff --
    
    This is the right behavior, but is a slight change ... I dont think anyone is expecting the earlier  behavior though !


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the pull request:

    https://github.com/apache/spark/pull/1722#issuecomment-50992283
  
    Oh wait, is the java serialier change also ported ?
    Else the tests won't do what we want it to do.
    On 03-Aug-2014 8:11 pm, "Mridul Muralidharan" <mr...@gmail.com> wrote:
    
    > LGTM, thanks Matei !
    > On 03-Aug-2014 12:13 pm, "Matei Zaharia" <no...@github.com> wrote:
    >
    >> @aarondav <https://github.com/aarondav> / @mridulm
    >> <https://github.com/mridulm> any other comments on this, or is it okay
    >> to merge?
    >>
    >> —
    >> Reply to this email directly or view it on GitHub
    >> <https://github.com/apache/spark/pull/1722#issuecomment-50983403>.
    >>
    >


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1722#issuecomment-51000046
  
    QA results for PR 1722:<br>- This patch PASSES unit tests.<br>- This patch merges cleanly<br>- This patch adds no public classes<br><br>For more information see test ouptut:<br>https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17817/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/1722#issuecomment-51109462
  
    Alright, I've merged this in. Thanks for looking over it!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2792. Fix reading too much or too little...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1722#issuecomment-50942970
  
    QA tests have started for PR 1722. This patch merges cleanly. <br>View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17708/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---