You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/05/02 14:40:35 UTC
spark git commit: [SPARK-24107][CORE] ChunkedByteBuffer.writeFully
method has not reset the limit value
Repository: spark
Updated Branches:
refs/heads/master 9215ee7a1 -> 152eaf6ae
[SPARK-24107][CORE] ChunkedByteBuffer.writeFully method has not reset the limit value
JIRA Issue: https://issues.apache.org/jira/browse/SPARK-24107?jql=text%20~%20%22ChunkedByteBuffer%22
ChunkedByteBuffer.writeFully does not reset the buffer's limit after capping it for a write. When
a chunk is larger than bufferWriteChunkSize (for example, an 80 * 1024 * 1024-byte chunk with
config.BUFFER_WRITE_CHUNK_SIZE at its default of 64 * 1024 * 1024), the while loop runs only once, so the remaining 16 * 1024 * 1024 bytes are never written.
Author: WangJinhai02 <ji...@ele.me>
Closes #21175 from manbuyun/bugfix-ChunkedByteBuffer.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/152eaf6a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/152eaf6a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/152eaf6a
Branch: refs/heads/master
Commit: 152eaf6ae698cd0df7f5a5be3f17ee46e0be929d
Parents: 9215ee7
Author: WangJinhai02 <ji...@ele.me>
Authored: Wed May 2 22:40:14 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed May 2 22:40:14 2018 +0800
----------------------------------------------------------------------
.../apache/spark/util/io/ChunkedByteBuffer.scala | 13 +++++++++----
.../apache/spark/io/ChunkedByteBufferSuite.scala | 17 +++++++++++++++--
2 files changed, 24 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/152eaf6a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index 7367af7..3ae8dfc 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -63,10 +63,15 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
*/
def writeFully(channel: WritableByteChannel): Unit = {
for (bytes <- getChunks()) {
- while (bytes.remaining() > 0) {
- val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
- bytes.limit(bytes.position() + ioSize)
- channel.write(bytes)
+ val curChunkLimit = bytes.limit()
+ while (bytes.hasRemaining) {
+ try {
+ val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
+ bytes.limit(bytes.position() + ioSize)
+ channel.write(bytes)
+ } finally {
+ bytes.limit(curChunkLimit)
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/152eaf6a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
index 3b798e3..2107559 100644
--- a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
@@ -21,11 +21,12 @@ import java.nio.ByteBuffer
import com.google.common.io.ByteStreams
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SharedSparkContext, SparkFunSuite}
+import org.apache.spark.internal.config
import org.apache.spark.network.util.ByteArrayWritableChannel
import org.apache.spark.util.io.ChunkedByteBuffer
-class ChunkedByteBufferSuite extends SparkFunSuite {
+class ChunkedByteBufferSuite extends SparkFunSuite with SharedSparkContext {
test("no chunks") {
val emptyChunkedByteBuffer = new ChunkedByteBuffer(Array.empty[ByteBuffer])
@@ -56,6 +57,18 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
assert(chunkedByteBuffer.getChunks().head.position() === 0)
}
+ test("SPARK-24107: writeFully() write buffer which is larger than bufferWriteChunkSize") {
+ try {
+ sc.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 32L * 1024L * 1024L)
+ val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(40 * 1024 * 1024)))
+ val byteArrayWritableChannel = new ByteArrayWritableChannel(chunkedByteBuffer.size.toInt)
+ chunkedByteBuffer.writeFully(byteArrayWritableChannel)
+ assert(byteArrayWritableChannel.length() === chunkedByteBuffer.size)
+ } finally {
+ sc.conf.remove(config.BUFFER_WRITE_CHUNK_SIZE)
+ }
+ }
+
test("toArray()") {
val empty = ByteBuffer.wrap(Array.empty[Byte])
val bytes = ByteBuffer.wrap(Array.tabulate(8)(_.toByte))
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org