Posted to reviews@spark.apache.org by manbuyun <gi...@git.apache.org> on 2018/04/27 03:20:19 UTC

[GitHub] spark pull request #21175: [SPARK-24107] ChunkedByteBuffer.writeFully method...

GitHub user manbuyun opened a pull request:

    https://github.com/apache/spark/pull/21175

    [SPARK-24107] ChunkedByteBuffer.writeFully method has not reset the limit value

    JIRA Issue: https://issues.apache.org/jira/browse/SPARK-24107?jql=text%20~%20%22ChunkedByteBuffer%22
    
    ChunkedByteBuffer.writeFully does not reset the buffer's limit after each write. When a
    chunk is larger than bufferWriteChunkSize, for example 80 * 1024 * 1024 bytes against
    config.BUFFER_WRITE_CHUNK_SIZE (64 * 1024 * 1024), the while loop runs only once and the
    remaining 16 * 1024 * 1024 bytes are lost.
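
The failure described above can be reproduced outside Spark with a plain `java.nio.ByteBuffer`. The following is only a minimal sketch: `LimitBugDemo`, `writeWithoutRestore` and `bytesWritten` are hypothetical names, the sizes are scaled down from megabytes to bytes, and the loop mirrors the pre-fix body shown in the diffs of this thread rather than the actual Spark class.

```scala
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
import java.nio.channels.{Channels, WritableByteChannel}

object LimitBugDemo {
  // Mirrors the loop before the fix: the limit is lowered to
  // position + ioSize and never raised back to its original value.
  def writeWithoutRestore(
      bytes: ByteBuffer,
      channel: WritableByteChannel,
      chunkSize: Int): Unit = {
    while (bytes.remaining() > 0) {
      val ioSize = Math.min(bytes.remaining(), chunkSize)
      bytes.limit(bytes.position() + ioSize)
      channel.write(bytes)
      // After the first write, position == limit, so remaining() is 0
      // and the loop exits even though data is left in the buffer.
    }
  }

  // Writes a buffer of `total` bytes and reports how many bytes
  // actually reached the output stream.
  def bytesWritten(total: Int, chunkSize: Int): Int = {
    val out = new ByteArrayOutputStream()
    val channel = Channels.newChannel(out)
    writeWithoutRestore(ByteBuffer.allocate(total), channel, chunkSize)
    out.size()
  }

  def main(args: Array[String]): Unit = {
    // 80 bytes with a 64-byte write chunk: only 64 bytes arrive.
    println(bytesWritten(total = 80, chunkSize = 64))
  }
}
```

With a buffer that fits in one write chunk (for example 32 bytes against a chunk size of 64) the same loop writes everything, which is presumably why the problem only shows up for large chunks.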

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/manbuyun/spark bugfix-ChunkedByteBuffer

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21175.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21175
    
----
commit fae181433ca1eda6be0ad450223d73c6eb5f3f35
Author: WangJinhai02 <ji...@...>
Date:   2018-04-26T14:43:44Z

    restore bytes limit value

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully ...

Posted by Ngone51 <gi...@git.apache.org>.
Github user Ngone51 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21175#discussion_r184590989
  
    --- Diff: core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala ---
    @@ -56,6 +56,12 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
         assert(chunkedByteBuffer.getChunks().head.position() === 0)
       }
     
    +  test("writeFully() does not affect original buffer's position") {
    --- End diff --
    
    Hi @manbuyun. You should add a new unit test that covers your change, for example "writeFully() can write buffer which is larger than `bufferWriteChunkSize` correctly", and update the test code.


---



[GitHub] spark pull request #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully ...

Posted by manbuyun <gi...@git.apache.org>.
Github user manbuyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21175#discussion_r185415529
  
    --- Diff: core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala ---
    @@ -20,12 +20,12 @@ package org.apache.spark.io
     import java.nio.ByteBuffer
     
     import com.google.common.io.ByteStreams
    -
    -import org.apache.spark.SparkFunSuite
    +import org.apache.spark.{SparkFunSuite, SharedSparkContext}
    --- End diff --
    
    I have fixed this and committed the change. Thanks.


---



[GitHub] spark pull request #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully ...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21175#discussion_r184675304
  
    --- Diff: core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala ---
    @@ -56,6 +56,15 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
         assert(chunkedByteBuffer.getChunks().head.position() === 0)
       }
     
    +  test("SPARK-24107: writeFully() write buffer which is larger than bufferWriteChunkSize") {
    +    val bufferWriteChunkSize = Option(SparkEnv.get).map(_.conf.get(config.BUFFER_WRITE_CHUNK_SIZE))
    +            .getOrElse(config.BUFFER_WRITE_CHUNK_SIZE.defaultValue.get).toInt
    --- End diff --
    
    How about setting this value via `spark.buffer.write.chunkSize`?


---



[GitHub] spark issue #21175: [SPARK-24107] ChunkedByteBuffer.writeFully method has no...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on the issue:

    https://github.com/apache/spark/pull/21175
  
    Would it be possible to add a unit test?


---



[GitHub] spark pull request #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully ...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21175#discussion_r184644678
  
    --- Diff: core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala ---
    @@ -56,6 +56,13 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
         assert(chunkedByteBuffer.getChunks().head.position() === 0)
       }
     
    +  test("SPARK-24107: writeFully() write buffer which is larger than bufferWriteChunkSize") {
    +    val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(80 * 1024 * 1024)))
    --- End diff --
    
    Can you configure `bufferWriteChunkSize` explicitly?


---



[GitHub] spark pull request #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21175#discussion_r188160813
  
    --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala ---
    @@ -63,10 +63,15 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
        */
       def writeFully(channel: WritableByteChannel): Unit = {
         for (bytes <- getChunks()) {
    -      while (bytes.remaining() > 0) {
    -        val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
    -        bytes.limit(bytes.position() + ioSize)
    -        channel.write(bytes)
    +      val curChunkLimit = bytes.limit()
    +      while (bytes.hasRemaining) {
    +        try {
    +          val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
    +          bytes.limit(bytes.position() + ioSize)
    +          channel.write(bytes)
    +        } finally {
    --- End diff --
    
    I think the problem is, `bytes.limit(bytes.position() + ioSize)` will change the result of `bytes.hasRemaining`, so we have to restore the limit in each loop.
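
A minimal sketch of the per-iteration restore described in this comment, using only `java.nio` and without the `try`/`finally` that the thread goes on to debate. `LimitRestoreSketch` and `bytesWritten` are hypothetical names, `chunkSize` stands in for `bufferWriteChunkSize`, and this is an illustration under those assumptions, not the merged Spark code.

```scala
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
import java.nio.channels.{Channels, WritableByteChannel}

object LimitRestoreSketch {
  def writeFully(
      bytes: ByteBuffer,
      channel: WritableByteChannel,
      chunkSize: Int): Unit = {
    // Remember the real end of the data before narrowing the window.
    val originalLimit = bytes.limit()
    while (bytes.hasRemaining) {
      val ioSize = Math.min(bytes.remaining(), chunkSize)
      // Expose at most ioSize bytes to this write call.
      bytes.limit(bytes.position() + ioSize)
      channel.write(bytes)
      // Restore the limit so hasRemaining sees the unwritten tail.
      bytes.limit(originalLimit)
    }
  }

  // Writes a buffer of `total` bytes and reports how many bytes
  // actually reached the output stream.
  def bytesWritten(total: Int, chunkSize: Int): Int = {
    val out = new ByteArrayOutputStream()
    writeFully(ByteBuffer.allocate(total), Channels.newChannel(out), chunkSize)
    out.size()
  }

  def main(args: Array[String]): Unit = {
    println(bytesWritten(total = 80, chunkSize = 64))
  }
}
```

With the limit restored, an 80-byte buffer and a 64-byte chunk size take two iterations (64 bytes, then 16) and all 80 bytes are written.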


---



[GitHub] spark pull request #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully ...

Posted by manbuyun <gi...@git.apache.org>.
Github user manbuyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21175#discussion_r184706100
  
    --- Diff: core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala ---
    @@ -56,6 +56,15 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
         assert(chunkedByteBuffer.getChunks().head.position() === 0)
       }
     
    +  test("SPARK-24107: writeFully() write buffer which is larger than bufferWriteChunkSize") {
    +    val bufferWriteChunkSize = Option(SparkEnv.get).map(_.conf.get(config.BUFFER_WRITE_CHUNK_SIZE))
    +            .getOrElse(config.BUFFER_WRITE_CHUNK_SIZE.defaultValue.get).toInt
    --- End diff --
    
    OK, I have added it. Please check.


---



[GitHub] spark issue #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully method ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21175
  
    **[Test build #89928 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89928/testReport)** for PR 21175 at commit [`fb527c8`](https://github.com/apache/spark/commit/fb527c87a1f4ddb05eb601038736aeb4ec3f7223).


---



[GitHub] spark pull request #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully ...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21175#discussion_r184644454
  
    --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala ---
    @@ -63,10 +63,12 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
        */
       def writeFully(channel: WritableByteChannel): Unit = {
         for (bytes <- getChunks()) {
    +      val limit = bytes.limit()
           while (bytes.remaining() > 0) {
    --- End diff --
    
    This is not related to this PR, but how about `while (bytes.hasRemaining) {`?


---



[GitHub] spark pull request #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully ...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21175#discussion_r184602233
  
    --- Diff: core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala ---
    @@ -56,6 +56,12 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
         assert(chunkedByteBuffer.getChunks().head.position() === 0)
       }
     
    +  test("writeFully() can write buffer which is larger than bufferWriteChunkSize correctly") {
    --- End diff --
    
    nit: Would it be possible to add `SPARK-24107: ` into the start of the string? It would help us connect a UT with JIRA entry.


---



[GitHub] spark pull request #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully ...

Posted by Ngone51 <gi...@git.apache.org>.
Github user Ngone51 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21175#discussion_r184607965
  
    --- Diff: core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala ---
    @@ -56,6 +56,12 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
         assert(chunkedByteBuffer.getChunks().head.position() === 0)
       }
     
    +  test("SPARK-24107: writeFully() write buffer which is larger than bufferWriteChunkSize") {
    +    val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(80 * 1024 * 1024)))
    +    chunkedByteBuffer.writeFully(new ByteArrayWritableChannel(chunkedByteBuffer.size.toInt))
    +    assert(chunkedByteBuffer.size === (80L * 1024L * 1024L))
    --- End diff --
    
    Assert on the `ByteArrayWritableChannel`'s size, not the `chunkedByteBuffer`'s size.
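
To illustrate why the assertion target matters, here is a small sketch under stated assumptions: `CountingChannel` is a hypothetical stand-in for Spark's `ByteArrayWritableChannel`, `buggyWrite` copies the pre-fix loop from the diffs in this thread, and the sizes are scaled down to bytes. The source buffer's size is the same whether or not bytes were dropped, so only a check on what the channel received can catch the bug.

```scala
import java.nio.ByteBuffer
import java.nio.channels.WritableByteChannel

object AssertOnChannelSketch {
  // Hypothetical channel that only counts the bytes it is handed.
  final class CountingChannel extends WritableByteChannel {
    private var open = true
    var received: Long = 0L

    override def write(src: ByteBuffer): Int = {
      val n = src.remaining()
      src.position(src.limit()) // consume everything up to the limit
      received += n
      n
    }

    override def isOpen(): Boolean = open

    override def close(): Unit = {
      open = false
    }
  }

  // Pre-fix loop: the limit is narrowed and never restored.
  def buggyWrite(
      bytes: ByteBuffer,
      channel: WritableByteChannel,
      chunkSize: Int): Unit = {
    while (bytes.remaining() > 0) {
      val ioSize = Math.min(bytes.remaining(), chunkSize)
      bytes.limit(bytes.position() + ioSize)
      channel.write(bytes)
    }
  }

  // Returns (size of the source buffer, bytes the channel received).
  def run(): (Int, Long) = {
    val source = ByteBuffer.allocate(80)
    val channel = new CountingChannel
    buggyWrite(source.duplicate(), channel, 64)
    (source.capacity(), channel.received)
  }

  def main(args: Array[String]): Unit = {
    // The source still reports 80 bytes, but the channel saw only 64.
    println(run())
  }
}
```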


---



[GitHub] spark issue #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully method ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21175
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully ...

Posted by manbuyun <gi...@git.apache.org>.
Github user manbuyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21175#discussion_r184603606
  
    --- Diff: core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala ---
    @@ -56,6 +56,12 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
         assert(chunkedByteBuffer.getChunks().head.position() === 0)
       }
     
    +  test("writeFully() can write buffer which is larger than bufferWriteChunkSize correctly") {
    +    val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(80*1024*1024)))
    --- End diff --
    
    Done. Thanks


---



[GitHub] spark pull request #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully ...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21175#discussion_r188154716
  
    --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala ---
    @@ -63,10 +63,15 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
        */
       def writeFully(channel: WritableByteChannel): Unit = {
         for (bytes <- getChunks()) {
    -      while (bytes.remaining() > 0) {
    -        val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
    -        bytes.limit(bytes.position() + ioSize)
    -        channel.write(bytes)
    +      val curChunkLimit = bytes.limit()
    +      while (bytes.hasRemaining) {
    +        try {
    +          val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
    +          bytes.limit(bytes.position() + ioSize)
    +          channel.write(bytes)
    +        } finally {
    --- End diff --
    
    I don't think we need the `try` and `finally` here because `getChunks()` returns duplicated ByteBuffers which have their own position and limit.
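
The point about duplicated buffers can be checked with `java.nio` alone. This is a minimal sketch with hypothetical names (`DuplicateSketch`, `limitsAfterNarrowing`): `ByteBuffer.duplicate()` shares the underlying bytes but keeps an independent position and limit, so narrowing the duplicate leaves the original untouched.

```scala
import java.nio.ByteBuffer

object DuplicateSketch {
  // Returns (original position, original limit, view position, view limit)
  // after moving the position and limit of the duplicate only.
  def limitsAfterNarrowing(): (Int, Int, Int, Int) = {
    val original = ByteBuffer.allocate(80)
    val view = original.duplicate()
    view.position(16)
    view.limit(64)
    (original.position(), original.limit(), view.position(), view.limit())
  }

  def main(args: Array[String]): Unit = {
    // The original keeps position 0 and limit 80.
    println(limitsAfterNarrowing())
  }
}
```

This is why an exception thrown mid-write cannot leave the stored chunks with a stale limit. The restore inside the loop is still needed so that the loop itself keeps making progress.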


---



[GitHub] spark issue #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully method ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/21175
  
    ok to test


---



[GitHub] spark pull request #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully ...

Posted by manbuyun <gi...@git.apache.org>.
Github user manbuyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21175#discussion_r184594822
  
    --- Diff: core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala ---
    @@ -56,6 +56,12 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
         assert(chunkedByteBuffer.getChunks().head.position() === 0)
       }
     
    +  test("writeFully() does not affect original buffer's position") {
    --- End diff --
    
    Done. Thanks


---



[GitHub] spark pull request #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully ...

Posted by manbuyun <gi...@git.apache.org>.
Github user manbuyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21175#discussion_r184727560
  
    --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala ---
    @@ -63,10 +63,12 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
        */
       def writeFully(channel: WritableByteChannel): Unit = {
         for (bytes <- getChunks()) {
    -      while (bytes.remaining() > 0) {
    +      val curChunkLimit = bytes.limit()
    +      while (bytes.hasRemaining) {
             val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
             bytes.limit(bytes.position() + ioSize)
             channel.write(bytes)
    +        bytes.limit(curChunkLimit)
    --- End diff --
    
    Right. This covers the case where `channel.write` throws an IOException.


---



[GitHub] spark issue #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully method ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21175
  
    **[Test build #90035 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90035/testReport)** for PR 21175 at commit [`e78ef39`](https://github.com/apache/spark/commit/e78ef396a571b26870bcc9326524fb8881e293dd).


---



[GitHub] spark issue #21175: [SPARK-24107] ChunkedByteBuffer.writeFully method has no...

Posted by manbuyun <gi...@git.apache.org>.
Github user manbuyun commented on the issue:

    https://github.com/apache/spark/pull/21175
  
      test("writeFully() does not affect original buffer's position") {
        val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(80*1024*1024)))
        chunkedByteBuffer.writeFully(new ByteArrayWritableChannel(chunkedByteBuffer.size.toInt))
        assert(chunkedByteBuffer.getChunks().head.position() === 0)
      }


---



[GitHub] spark issue #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully method ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21175
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/90035/
    Test FAILed.


---



[GitHub] spark pull request #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully ...

Posted by jiangxb1987 <gi...@git.apache.org>.
Github user jiangxb1987 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21175#discussion_r184713695
  
    --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala ---
    @@ -63,10 +63,12 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
        */
       def writeFully(channel: WritableByteChannel): Unit = {
         for (bytes <- getChunks()) {
    -      while (bytes.remaining() > 0) {
    +      val curChunkLimit = bytes.limit()
    +      while (bytes.hasRemaining) {
             val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
             bytes.limit(bytes.position() + ioSize)
             channel.write(bytes)
    +        bytes.limit(curChunkLimit)
    --- End diff --
    
    I would rewrite this using:
    ```
    try {
    	val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
    	bytes.limit(bytes.position() + ioSize)
    	channel.write(bytes)
    } finally {
    	bytes.limit(curChunkLimit)
    }
    ```
    to be safe.


---



[GitHub] spark pull request #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully ...

Posted by Ngone51 <gi...@git.apache.org>.
Github user Ngone51 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21175#discussion_r184597197
  
    --- Diff: core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala ---
    @@ -56,6 +56,12 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
         assert(chunkedByteBuffer.getChunks().head.position() === 0)
       }
     
    +  test("writeFully() can write buffer which is larger than bufferWriteChunkSize correctly") {
    +    val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(80*1024*1024)))
    --- End diff --
    
    nit: space beside `*`.


---



[GitHub] spark pull request #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully ...

Posted by manbuyun <gi...@git.apache.org>.
Github user manbuyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21175#discussion_r184603588
  
    --- Diff: core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala ---
    @@ -56,6 +56,12 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
         assert(chunkedByteBuffer.getChunks().head.position() === 0)
       }
     
    +  test("writeFully() can write buffer which is larger than bufferWriteChunkSize correctly") {
    --- End diff --
    
    Done. Thanks


---



[GitHub] spark pull request #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/21175


---



[GitHub] spark issue #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully method ...

Posted by Ngone51 <gi...@git.apache.org>.
Github user Ngone51 commented on the issue:

    https://github.com/apache/spark/pull/21175
  
    cc @kiszk @maropu @cloud-fan @jiangxb1987 


---



[GitHub] spark pull request #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully ...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21175#discussion_r184882338
  
    --- Diff: core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala ---
    @@ -20,12 +20,12 @@ package org.apache.spark.io
     import java.nio.ByteBuffer
     
     import com.google.common.io.ByteStreams
    -
    -import org.apache.spark.SparkFunSuite
    +import org.apache.spark.{SparkFunSuite, SharedSparkContext}
    --- End diff --
    
    move SharedSparkContext before SparkFunSuite


---



[GitHub] spark pull request #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully ...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21175#discussion_r188154900
  
    --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala ---
    @@ -63,10 +63,15 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
        */
       def writeFully(channel: WritableByteChannel): Unit = {
         for (bytes <- getChunks()) {
    -      while (bytes.remaining() > 0) {
    -        val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
    -        bytes.limit(bytes.position() + ioSize)
    -        channel.write(bytes)
    +      val curChunkLimit = bytes.limit()
    +      while (bytes.hasRemaining) {
    +        try {
    +          val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
    +          bytes.limit(bytes.position() + ioSize)
    --- End diff --
    
    The rationale for the `limit()` isn't super-clear, but that was a problem in the original PR which introduced the bug (#18730). I'm commenting here only as a cross-reference for folks who come across this patch in the future. I believe that the original motivation was http://www.evanjones.ca/java-bytebuffer-leak.html


---



[GitHub] spark issue #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully method ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21175
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89928/
    Test FAILed.


---



[GitHub] spark issue #21175: [SPARK-24107] ChunkedByteBuffer.writeFully method has no...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21175
  
    Can one of the admins verify this patch?


---



[GitHub] spark pull request #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21175#discussion_r188160044
  
    --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala ---
    @@ -63,10 +63,15 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
        */
       def writeFully(channel: WritableByteChannel): Unit = {
         for (bytes <- getChunks()) {
    -      while (bytes.remaining() > 0) {
    -        val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
    -        bytes.limit(bytes.position() + ioSize)
    -        channel.write(bytes)
    +      val curChunkLimit = bytes.limit()
    +      while (bytes.hasRemaining) {
    +        try {
    +          val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
    +          bytes.limit(bytes.position() + ioSize)
    +          channel.write(bytes)
    +        } finally {
    --- End diff --
    
    Do you mean this is not a real bug that can cause real workload to fail?


---



[GitHub] spark pull request #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully ...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21175#discussion_r184882396
  
    --- Diff: core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala ---
    @@ -20,12 +20,12 @@ package org.apache.spark.io
     import java.nio.ByteBuffer
     
     import com.google.common.io.ByteStreams
    --- End diff --
    
    Add an empty line after line 22 to separate the Spark and third-party import groups.


---



[GitHub] spark issue #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully method ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/21175
  
    the R test is a known issue, I'm merging in to master and 2.3, thanks!


---



[GitHub] spark pull request #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully ...

Posted by manbuyun <gi...@git.apache.org>.
Github user manbuyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21175#discussion_r184668664
  
    --- Diff: core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala ---
    @@ -56,6 +56,13 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
         assert(chunkedByteBuffer.getChunks().head.position() === 0)
       }
     
    +  test("SPARK-24107: writeFully() write buffer which is larger than bufferWriteChunkSize") {
    +    val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(80 * 1024 * 1024)))
    --- End diff --
    
    I have modified it. Please check.


---



[GitHub] spark issue #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully method ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21175
  
    **[Test build #89928 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89928/testReport)** for PR 21175 at commit [`fb527c8`](https://github.com/apache/spark/commit/fb527c87a1f4ddb05eb601038736aeb4ec3f7223).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21175: [SPARK-24107] ChunkedByteBuffer.writeFully method has no...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21175
  
    Can one of the admins verify this patch?


---



[GitHub] spark issue #21175: [SPARK-24107] ChunkedByteBuffer.writeFully method has no...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on the issue:

    https://github.com/apache/spark/pull/21175
  
    Please add `[CORE]` to the title.


---



[GitHub] spark pull request #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21175#discussion_r188161280
  
    --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala ---
    @@ -63,10 +63,15 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
        */
       def writeFully(channel: WritableByteChannel): Unit = {
         for (bytes <- getChunks()) {
    -      while (bytes.remaining() > 0) {
    -        val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
    -        bytes.limit(bytes.position() + ioSize)
    -        channel.write(bytes)
    +      val curChunkLimit = bytes.limit()
    +      while (bytes.hasRemaining) {
    +        try {
    +          val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
    +          bytes.limit(bytes.position() + ioSize)
    +          channel.write(bytes)
    +        } finally {
    --- End diff --
    
    I get your point. If there is an exception, there is no next loop iteration, so we don't need to restore the limit, and the try/finally is not needed.


---



[GitHub] spark pull request #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully ...

Posted by manbuyun <gi...@git.apache.org>.
Github user manbuyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21175#discussion_r184612119
  
    --- Diff: core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala ---
    @@ -56,6 +56,12 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
         assert(chunkedByteBuffer.getChunks().head.position() === 0)
       }
     
    +  test("SPARK-24107: writeFully() write buffer which is larger than bufferWriteChunkSize") {
    +    val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(80 * 1024 * 1024)))
    +    chunkedByteBuffer.writeFully(new ByteArrayWritableChannel(chunkedByteBuffer.size.toInt))
    +    assert(chunkedByteBuffer.size === (80L * 1024L * 1024L))
    --- End diff --
    
    My mistake; it has been fixed. Thanks.


---



[GitHub] spark issue #21175: [SPARK-24107] ChunkedByteBuffer.writeFully method has no...

Posted by Ngone51 <gi...@git.apache.org>.
Github user Ngone51 commented on the issue:

    https://github.com/apache/spark/pull/21175
  
    @manbuyun you need to add the unit test into `ChunkedByteBufferSuite.scala` and push a new commit.


---



[GitHub] spark pull request #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully ...

Posted by Ngone51 <gi...@git.apache.org>.
Github user Ngone51 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21175#discussion_r184596199
  
    --- Diff: core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala ---
    @@ -56,6 +56,12 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
         assert(chunkedByteBuffer.getChunks().head.position() === 0)
       }
     
    +  test("writeFully() can write buffer which is larger than bufferWriteChunkSize correctly") {
    +    val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(80*1024*1024)))
    +    chunkedByteBuffer.writeFully(new ByteArrayWritableChannel(chunkedByteBuffer.size.toInt))
    +    assert(chunkedByteBuffer.getChunks().head.position() === 0)
    --- End diff --
    
    This assertion does not exercise the change in this PR. Please replace it with an assertion on the channel's length.
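
The difference between the two assertions can be sketched as follows. This is a minimal, self-contained illustration rather than the suite's actual test: `CountingChannel`, `ChannelLengthCheck`, and `writeInSlices` are hypothetical names, the sizes are shrunk to 80 and 64 bytes, and the sketch assumes the writer works on a duplicate of the source buffer.

```scala
import java.nio.ByteBuffer
import java.nio.channels.WritableByteChannel

// Hypothetical in-memory channel that only counts the bytes it is given.
final class CountingChannel extends WritableByteChannel {
  private var count = 0L
  private var open = true

  def length: Long = count

  override def write(src: ByteBuffer): Int = {
    val n = src.remaining()
    src.position(src.limit()) // consume everything up to the current limit
    count += n
    n
  }

  override def isOpen: Boolean = open

  override def close(): Unit = { open = false }
}

object ChannelLengthCheck {
  // Writes a duplicate of `source` in slices of at most `sliceSize` bytes,
  // restoring the limit after each write.
  def writeInSlices(source: ByteBuffer, channel: WritableByteChannel, sliceSize: Int): Unit = {
    val bytes = source.duplicate()
    val end = bytes.limit()
    while (bytes.hasRemaining) {
      bytes.limit(bytes.position() + Math.min(bytes.remaining(), sliceSize))
      channel.write(bytes)
      bytes.limit(end)
    }
  }

  def main(args: Array[String]): Unit = {
    val source = ByteBuffer.allocate(80)
    val channel = new CountingChannel
    writeInSlices(source, channel, 64)
    // Holds even if bytes were dropped, because only the duplicate moved.
    assert(source.position() == 0)
    // Fails if any bytes were dropped.
    assert(channel.length == source.remaining())
  }
}
```

Under these assumptions the position check passes regardless of how many bytes reached the channel, which is why only the length check guards against the regression.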


---



[GitHub] spark issue #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully method ...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the issue:

    https://github.com/apache/spark/pull/21175
  
    No, I mean that the code here can simply follow the write call as
    straight-line code. We don't need to guard against exceptions here because
    the duplicate of the buffer is used only by a single thread, so you can
    omit the try block and just append the finally contents to the try
    contents. Minor point, but I wanted to comment because I was initially
    confused about when errors could occur and about thread safety / sharing,
    until I realized that the modified state does not escape this method.
    On Mon, May 14, 2018 at 9:03 PM Wenchen Fan <no...@github.com>
    wrote:
    
    > *@cloud-fan* commented on this pull request.
    > ------------------------------
    >
    > In core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
    > <https://github.com/apache/spark/pull/21175#discussion_r188160044>:
    >
    > > @@ -63,10 +63,15 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
    >     */
    >    def writeFully(channel: WritableByteChannel): Unit = {
    >      for (bytes <- getChunks()) {
    > -      while (bytes.remaining() > 0) {
    > -        val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
    > -        bytes.limit(bytes.position() + ioSize)
    > -        channel.write(bytes)
    > +      val curChunkLimit = bytes.limit()
    > +      while (bytes.hasRemaining) {
    > +        try {
    > +          val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
    > +          bytes.limit(bytes.position() + ioSize)
    > +          channel.write(bytes)
    > +        } finally {
    >
    > Do you mean this is not a real bug that can cause real workload to fail?
    >
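
The point about state not escaping the method rests on how `ByteBuffer.duplicate()` behaves: a duplicate shares the underlying bytes but has its own position and limit. The sketch below shows that behavior in isolation. It assumes, as the comment above implies, that `getChunks()` hands out duplicates; `DuplicateSketch` and `moveDuplicate` are hypothetical names.

```scala
import java.nio.ByteBuffer

object DuplicateSketch {
  // Moves the limit and position of a duplicate, then reports the
  // original buffer's (position, limit), which should be unaffected.
  def moveDuplicate(original: ByteBuffer): (Int, Int) = {
    val view = original.duplicate()
    view.limit(64)
    view.position(64)
    (original.position(), original.limit())
  }

  def main(args: Array[String]): Unit = {
    val original = ByteBuffer.allocate(80)
    // The original keeps its own position and limit.
    assert(moveDuplicate(original) == ((0, 80)))

    // The content, however, is shared between the two views.
    val view = original.duplicate()
    view.put(0, 42.toByte)
    assert(original.get(0) == 42)
  }
}
```

Because the limit change is confined to a duplicate that never leaves the method, an exception thrown mid-write cannot leave a shared buffer with a truncated limit.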



---



[GitHub] spark pull request #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully ...

Posted by manbuyun <gi...@git.apache.org>.
Github user manbuyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21175#discussion_r184728954
  
    --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala ---
    @@ -63,10 +63,12 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
        */
       def writeFully(channel: WritableByteChannel): Unit = {
         for (bytes <- getChunks()) {
    -      while (bytes.remaining() > 0) {
    +      val curChunkLimit = bytes.limit()
    +      while (bytes.hasRemaining) {
             val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
             bytes.limit(bytes.position() + ioSize)
             channel.write(bytes)
    +        bytes.limit(curChunkLimit)
    --- End diff --
    
    I have committed this change.
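
The effect of the added `bytes.limit(curChunkLimit)` line can be reproduced outside Spark. The following is a minimal sketch, not the Spark implementation: `WriteFullySketch`, `writeChunkSize`, and the helper names are hypothetical, and the sizes are shrunk to 80 and 64 bytes to stand in for the 80 MB buffer and 64 MB `bufferWriteChunkSize` from the PR description.

```scala
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
import java.nio.channels.{Channels, WritableByteChannel}

object WriteFullySketch {
  // Hypothetical stand-in for bufferWriteChunkSize, shrunk for illustration.
  val writeChunkSize = 64

  // Original loop: the limit is lowered for the first write and never raised
  // again, so remaining() is 0 after one pass and the tail is dropped.
  def writeWithoutRestore(bytes: ByteBuffer, channel: WritableByteChannel): Unit = {
    while (bytes.remaining() > 0) {
      val ioSize = Math.min(bytes.remaining(), writeChunkSize)
      bytes.limit(bytes.position() + ioSize)
      channel.write(bytes)
    }
  }

  // Patched loop: the chunk's limit is saved once and restored after each write.
  def writeWithRestore(bytes: ByteBuffer, channel: WritableByteChannel): Unit = {
    val curChunkLimit = bytes.limit()
    while (bytes.hasRemaining) {
      val ioSize = Math.min(bytes.remaining(), writeChunkSize)
      bytes.limit(bytes.position() + ioSize)
      channel.write(bytes)
      bytes.limit(curChunkLimit)
    }
  }

  // Runs one of the loops over a fresh buffer and returns how many bytes
  // reached the in-memory sink.
  def bytesWritten(write: (ByteBuffer, WritableByteChannel) => Unit, size: Int): Int = {
    val out = new ByteArrayOutputStream()
    write(ByteBuffer.allocate(size), Channels.newChannel(out))
    out.size()
  }

  def main(args: Array[String]): Unit = {
    println(bytesWritten(writeWithoutRestore, 80)) // 64: the last 16 bytes are lost
    println(bytesWritten(writeWithRestore, 80))    // 80
  }
}
```

With the real sizes, the same arithmetic gives the 16 * 1024 * 1024 lost bytes reported in the PR description.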


---



[GitHub] spark issue #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully method ...

Posted by dongjoon-hyun <gi...@git.apache.org>.
Github user dongjoon-hyun commented on the issue:

    https://github.com/apache/spark/pull/21175
  
    Hi, All.
    I created [SPARK-24152](https://issues.apache.org/jira/browse/SPARK-24152) because we have started merging PRs while ignoring that known but unexplained SparkR failure.


---



[GitHub] spark issue #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully method ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21175
  
    **[Test build #90035 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/90035/testReport)** for PR 21175 at commit [`e78ef39`](https://github.com/apache/spark/commit/e78ef396a571b26870bcc9326524fb8881e293dd).
     * This patch **fails SparkR unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21175: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully ...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21175#discussion_r184645573
  
    --- Diff: core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala ---
    @@ -63,10 +63,12 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
        */
       def writeFully(channel: WritableByteChannel): Unit = {
         for (bytes <- getChunks()) {
    +      val limit = bytes.limit()
    --- End diff --
    
    How about renaming `limit` to `curChunkLimit`?


---
