Posted to commits@spark.apache.org by zs...@apache.org on 2017/07/26 00:59:23 UTC
spark git commit: [SPARK-21517][CORE] Avoid copying memory when transfer chunks remotely
Repository: spark
Updated Branches:
refs/heads/master 300807c6e -> 16612638f
[SPARK-21517][CORE] Avoid copying memory when transfer chunks remotely
## What changes were proposed in this pull request?
In our production cluster, an OOM happens when NettyBlockRpcServer receives an OpenBlocks message. The reason we observed is as follows:
When BlockManagerManagedBuffer calls ChunkedByteBuffer#toNetty, it uses Unpooled.wrappedBuffer(ByteBuffer... buffers), which relies on the default maxNumComponents=16 of the low-level CompositeByteBuf. When the number of components exceeds 16, consolidateIfNeeded is executed:
int numComponents = this.components.size();
if (numComponents > this.maxNumComponents) {
    int capacity = ((CompositeByteBuf.Component) this.components.get(numComponents - 1)).endOffset;
    ByteBuf consolidated = this.allocBuffer(capacity);
    for (int c = 0; c < numComponents; ++c) {
        CompositeByteBuf.Component c1 = (CompositeByteBuf.Component) this.components.get(c);
        ByteBuf b = c1.buf;
        consolidated.writeBytes(b);
        c1.freeIfNecessary();
    }
    CompositeByteBuf.Component var7 = new CompositeByteBuf.Component(consolidated);
    var7.endOffset = var7.length;
    this.components.clear();
    this.components.add(var7);
}
This consolidation in CompositeByteBuf copies every component into a freshly allocated buffer, consuming extra memory during the copy.
We can use another API, Unpooled.wrappedBuffer(int maxNumComponents, ByteBuffer... buffers), and pass the actual chunk count as maxNumComponents to avoid this consumption.
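The effect of the threshold can be illustrated with a minimal sketch in plain Java. This is not Netty code; it only mimics the consolidation rule described above (copy everything into one buffer once the component count exceeds maxNumComponents), using a hypothetical wrap() helper and a counter to make the transient copy visible:

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Sketch of CompositeByteBuf-style consolidation (illustrative only, not Netty):
// if more chunks are wrapped than maxNumComponents allows, every chunk is copied
// into one consolidated buffer, temporarily doubling the memory for the block.
public class CompositeSketch {
    static long bytesCopied = 0;  // tracks how much data consolidation copied

    static List<ByteBuffer> wrap(int maxNumComponents, ByteBuffer... chunks) {
        List<ByteBuffer> components = new ArrayList<>();
        for (ByteBuffer c : chunks) {
            components.add(c);
        }
        if (components.size() > maxNumComponents) {
            // consolidate: allocate one big buffer and copy every chunk into it
            int capacity = 0;
            for (ByteBuffer c : components) {
                capacity += c.remaining();
            }
            ByteBuffer consolidated = ByteBuffer.allocate(capacity);
            for (ByteBuffer c : components) {
                bytesCopied += c.remaining();
                consolidated.put(c.duplicate());  // duplicate() keeps the original's position
            }
            consolidated.flip();
            components.clear();
            components.add(consolidated);
        }
        return components;
    }

    public static void main(String[] args) {
        ByteBuffer[] chunks = new ByteBuffer[32];
        for (int i = 0; i < 32; i++) {
            chunks[i] = ByteBuffer.allocate(1024);
        }

        wrap(16, chunks);             // default-style threshold: all 32 KiB get copied
        long withDefault = bytesCopied;

        bytesCopied = 0;
        wrap(chunks.length, chunks);  // threshold == chunk count: no copy happens
        long withFix = bytesCopied;

        System.out.println(withDefault + " " + withFix);  // prints "32768 0"
    }
}
```

With 32 chunks and the default-style threshold of 16, the sketch copies all 32 KiB into a consolidated buffer; passing the chunk count as the threshold leaves the chunks wrapped in place, which is exactly the behavior the one-line patch below selects.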
## How was this patch tested?
Tested in our production cluster.
Author: zhoukang <zh...@xiaomi.com>
Closes #18723 from caneGuy/zhoukang/fix-chunkbuffer.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/16612638
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/16612638
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/16612638
Branch: refs/heads/master
Commit: 16612638f0539f197eb7deb1be2ec53fed60d707
Parents: 300807c
Author: zhoukang <zh...@xiaomi.com>
Authored: Tue Jul 25 17:59:21 2017 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Tue Jul 25 17:59:21 2017 -0700
----------------------------------------------------------------------
.../main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/16612638/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index 2f905c8..f48bfd5 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -66,7 +66,7 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
* Wrap this buffer to view it as a Netty ByteBuf.
*/
def toNetty: ByteBuf = {
- Unpooled.wrappedBuffer(getChunks(): _*)
+ Unpooled.wrappedBuffer(chunks.length, getChunks(): _*)
}
/**