You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/05/17 12:42:45 UTC
spark git commit: [SPARK-24107][CORE][FOLLOWUP]
ChunkedByteBuffer.writeFully method does not reset the limit value
Repository: spark
Updated Branches:
refs/heads/master 6c35865d9 -> 6ec05826d
[SPARK-24107][CORE][FOLLOWUP] ChunkedByteBuffer.writeFully method does not reset the limit value
## What changes were proposed in this pull request?
According to the discussion in https://github.com/apache/spark/pull/21175 , this PR proposes 2 improvements:
1. add comments to explain why we call `limit` to write the `ByteBuffer` out in fixed-size slices.
2. remove the `try ... finally`
## How was this patch tested?
existing tests
Author: Wenchen Fan <we...@databricks.com>
Closes #21327 from cloud-fan/minor.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6ec05826
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6ec05826
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6ec05826
Branch: refs/heads/master
Commit: 6ec05826d7b0a512847e2522564e01256c8d192d
Parents: 6c35865
Author: Wenchen Fan <we...@databricks.com>
Authored: Thu May 17 20:42:40 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu May 17 20:42:40 2018 +0800
----------------------------------------------------------------------
.../spark/util/io/ChunkedByteBuffer.scala | 20 ++++++++++++--------
1 file changed, 12 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/6ec05826/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index 3ae8dfc..700ce56 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -63,15 +63,19 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
*/
def writeFully(channel: WritableByteChannel): Unit = {
for (bytes <- getChunks()) {
- val curChunkLimit = bytes.limit()
+ val originalLimit = bytes.limit()
while (bytes.hasRemaining) {
- try {
- val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
- bytes.limit(bytes.position() + ioSize)
- channel.write(bytes)
- } finally {
- bytes.limit(curChunkLimit)
- }
+ // If `bytes` is an on-heap ByteBuffer, the Java NIO API will copy it to a temporary direct
+ // ByteBuffer when writing it out. This temporary direct ByteBuffer is cached per thread.
+ // Its size has no limit and can keep growing if it sees a larger input ByteBuffer. This may
+ // cause significant native memory leak, if a large direct ByteBuffer is allocated and
+ // cached, as it's never released until thread exits. Here we write the `bytes` with
+ // fixed-size slices to limit the size of the cached direct ByteBuffer.
+ // Please refer to http://www.evanjones.ca/java-bytebuffer-leak.html for more details.
+ val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
+ bytes.limit(bytes.position() + ioSize)
+ channel.write(bytes)
+ bytes.limit(originalLimit)
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org