Posted to commits@spark.apache.org by we...@apache.org on 2018/05/02 14:40:35 UTC

spark git commit: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully method has not reset the limit value

Repository: spark
Updated Branches:
  refs/heads/master 9215ee7a1 -> 152eaf6ae


[SPARK-24107][CORE] ChunkedByteBuffer.writeFully method has not reset the limit value

JIRA Issue: https://issues.apache.org/jira/browse/SPARK-24107?jql=text%20~%20%22ChunkedByteBuffer%22

The ChunkedByteBuffer.writeFully method does not reset the buffer's limit value after each write. When a chunk is larger than bufferWriteChunkSize, for example an 80 * 1024 * 1024 byte chunk against config.BUFFER_WRITE_CHUNK_SIZE (64 * 1024 * 1024), the while loop runs only once and the remaining 16 * 1024 * 1024 bytes are lost.
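The following is a minimal, self-contained sketch (not part of the commit; object and method names are illustrative) that simulates the old and fixed loops on an in-memory buffer, with channel.write replaced by advancing the position, to show why the capped limit drops the tail of the chunk:

    import java.nio.ByteBuffer

    object WriteFullyLimitSketch {
      private val bufferWriteChunkSize = 64 * 1024 * 1024 // default BUFFER_WRITE_CHUNK_SIZE
      private val chunkSize = 80 * 1024 * 1024            // one chunk larger than the write size

      // Old loop: the limit is capped at position + ioSize and never restored, so
      // after the first pass remaining() is 0 and the final 16 MiB are never written.
      def oldLoop(bytes: ByteBuffer): Long = {
        var written = 0L
        while (bytes.remaining() > 0) {
          val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
          bytes.limit(bytes.position() + ioSize)
          bytes.position(bytes.position() + ioSize)       // simulates channel.write(bytes)
          written += ioSize
        }
        written
      }

      // Fixed loop: the original limit is restored after every write, so remaining()
      // again sees the rest of the chunk and the loop continues until all bytes are written.
      def fixedLoop(bytes: ByteBuffer): Long = {
        val curChunkLimit = bytes.limit()
        var written = 0L
        while (bytes.hasRemaining) {
          try {
            val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
            bytes.limit(bytes.position() + ioSize)
            bytes.position(bytes.position() + ioSize)     // simulates channel.write(bytes)
            written += ioSize
          } finally {
            bytes.limit(curChunkLimit)
          }
        }
        written
      }

      def main(args: Array[String]): Unit = {
        println(s"old loop wrote   ${oldLoop(ByteBuffer.allocate(chunkSize))} of $chunkSize bytes")
        println(s"fixed loop wrote ${fixedLoop(ByteBuffer.allocate(chunkSize))} of $chunkSize bytes")
      }
    }

Running the sketch prints 67108864 bytes for the old loop and 83886080 bytes for the fixed loop, matching the 16 * 1024 * 1024 byte loss described above.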

Author: WangJinhai02 <ji...@ele.me>

Closes #21175 from manbuyun/bugfix-ChunkedByteBuffer.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/152eaf6a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/152eaf6a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/152eaf6a

Branch: refs/heads/master
Commit: 152eaf6ae698cd0df7f5a5be3f17ee46e0be929d
Parents: 9215ee7
Author: WangJinhai02 <ji...@ele.me>
Authored: Wed May 2 22:40:14 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed May 2 22:40:14 2018 +0800

----------------------------------------------------------------------
 .../apache/spark/util/io/ChunkedByteBuffer.scala   | 13 +++++++++----
 .../apache/spark/io/ChunkedByteBufferSuite.scala   | 17 +++++++++++++++--
 2 files changed, 24 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/152eaf6a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index 7367af7..3ae8dfc 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -63,10 +63,15 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
    */
   def writeFully(channel: WritableByteChannel): Unit = {
     for (bytes <- getChunks()) {
-      while (bytes.remaining() > 0) {
-        val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
-        bytes.limit(bytes.position() + ioSize)
-        channel.write(bytes)
+      val curChunkLimit = bytes.limit()
+      while (bytes.hasRemaining) {
+        try {
+          val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
+          bytes.limit(bytes.position() + ioSize)
+          channel.write(bytes)
+        } finally {
+          bytes.limit(curChunkLimit)
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/152eaf6a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
index 3b798e3..2107559 100644
--- a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
@@ -21,11 +21,12 @@ import java.nio.ByteBuffer
 
 import com.google.common.io.ByteStreams
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SharedSparkContext, SparkFunSuite}
+import org.apache.spark.internal.config
 import org.apache.spark.network.util.ByteArrayWritableChannel
 import org.apache.spark.util.io.ChunkedByteBuffer
 
-class ChunkedByteBufferSuite extends SparkFunSuite {
+class ChunkedByteBufferSuite extends SparkFunSuite with SharedSparkContext {
 
   test("no chunks") {
     val emptyChunkedByteBuffer = new ChunkedByteBuffer(Array.empty[ByteBuffer])
@@ -56,6 +57,18 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
     assert(chunkedByteBuffer.getChunks().head.position() === 0)
   }
 
+  test("SPARK-24107: writeFully() write buffer which is larger than bufferWriteChunkSize") {
+    try {
+      sc.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 32L * 1024L * 1024L)
+      val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(40 * 1024 * 1024)))
+      val byteArrayWritableChannel = new ByteArrayWritableChannel(chunkedByteBuffer.size.toInt)
+      chunkedByteBuffer.writeFully(byteArrayWritableChannel)
+      assert(byteArrayWritableChannel.length() === chunkedByteBuffer.size)
+    } finally {
+      sc.conf.remove(config.BUFFER_WRITE_CHUNK_SIZE)
+    }
+  }
+
   test("toArray()") {
     val empty = ByteBuffer.wrap(Array.empty[Byte])
     val bytes = ByteBuffer.wrap(Array.tabulate(8)(_.toByte))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org