Posted to commits@spark.apache.org by zs...@apache.org on 2017/07/26 00:59:23 UTC

spark git commit: [SPARK-21517][CORE] Avoid copying memory when transfer chunks remotely

Repository: spark
Updated Branches:
  refs/heads/master 300807c6e -> 16612638f


[SPARK-21517][CORE] Avoid copying memory when transfer chunks remotely

## What changes were proposed in this pull request?

In our production cluster, an OOM occurs when NettyBlockRpcServer receives an OpenBlocks message. The cause we observed is as follows:
When BlockManagerManagedBuffer calls ChunkedByteBuffer#toNetty, it uses Unpooled.wrappedBuffer(ByteBuffer... buffers), which applies the default maxNumComponents=16 in the underlying CompositeByteBuf. When the number of components is greater than 16, it executes consolidateIfNeeded

        int numComponents = this.components.size();
        if(numComponents > this.maxNumComponents) {
            int capacity = ((CompositeByteBuf.Component)this.components.get(numComponents - 1)).endOffset;
            ByteBuf consolidated = this.allocBuffer(capacity);

            for(int c = 0; c < numComponents; ++c) {
                CompositeByteBuf.Component c1 = (CompositeByteBuf.Component)this.components.get(c);
                ByteBuf b = c1.buf;
                consolidated.writeBytes(b);
                c1.freeIfNecessary();
            }

            CompositeByteBuf.Component var7 = new CompositeByteBuf.Component(consolidated);
            var7.endOffset = var7.length;
            this.components.clear();
            this.components.add(var7);
        }

in CompositeByteBuf, which allocates a new buffer and copies every component into it, consuming extra memory during the copy.
We can instead use the API Unpooled.wrappedBuffer(int maxNumComponents, ByteBuffer... buffers) to avoid this extra memory consumption.
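
To make the effect of the threshold concrete, here is a minimal sketch in plain Java. It is a toy model, not Netty code: the class `CompositeModel` and its methods are hypothetical names invented for illustration, and it only assumes the behaviour visible in the excerpt above (when the component count exceeds maxNumComponents, all components are copied into one newly allocated buffer). When exactly Netty runs this check is not modelled.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Toy stand-in for a composite buffer with a component limit (hypothetical, not Netty). */
final class CompositeModel {
    private final int maxNumComponents;
    private final List<ByteBuffer> components = new ArrayList<>();
    private long bytesCopied = 0;

    private CompositeModel(int maxNumComponents) {
        this.maxNumComponents = maxNumComponents;
    }

    /** Wraps the chunks without copying, then applies the consolidation check once. */
    public static CompositeModel wrap(int maxNumComponents, ByteBuffer... chunks) {
        CompositeModel model = new CompositeModel(maxNumComponents);
        for (ByteBuffer chunk : chunks) {
            // duplicate() shares the bytes but keeps the caller's position and limit untouched
            model.components.add(chunk.duplicate());
        }
        model.consolidateIfNeeded();
        return model;
    }

    /** Mirrors the shape of the quoted logic: too many components means one big copy. */
    private void consolidateIfNeeded() {
        if (components.size() <= maxNumComponents) {
            return; // under the limit: the chunks stay wrapped, nothing is copied
        }
        int capacity = 0;
        for (ByteBuffer b : components) {
            capacity += b.remaining();
        }
        // This allocation is the extra memory the commit message describes.
        ByteBuffer consolidated = ByteBuffer.allocate(capacity);
        for (ByteBuffer b : components) {
            consolidated.put(b);
        }
        consolidated.flip();
        bytesCopied += capacity;
        components.clear();
        components.add(consolidated);
    }

    public long bytesCopied() { return bytesCopied; }

    public int numComponents() { return components.size(); }

    public static void main(String[] args) {
        ByteBuffer[] chunks = new ByteBuffer[20];
        for (int i = 0; i < chunks.length; i++) {
            chunks[i] = ByteBuffer.allocate(1024);
        }
        CompositeModel capped = wrap(16, chunks);            // limit lower than chunk count
        CompositeModel sized = wrap(chunks.length, chunks);  // limit equal to chunk count
        System.out.println("limit 16: copied=" + capped.bytesCopied()
                + " components=" + capped.numComponents());
        System.out.println("limit 20: copied=" + sized.bytesCopied()
                + " components=" + sized.numComponents());
    }
}
```

In this model, 20 chunks of 1024 bytes with a limit of 16 cause 20480 bytes to be copied into a single component, while a limit equal to the chunk count copies nothing and keeps all 20 components. Passing chunks.length as maxNumComponents, as this patch does, corresponds to the second case.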

## How was this patch tested?

Tested in a production cluster.

Author: zhoukang <zh...@xiaomi.com>

Closes #18723 from caneGuy/zhoukang/fix-chunkbuffer.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/16612638
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/16612638
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/16612638

Branch: refs/heads/master
Commit: 16612638f0539f197eb7deb1be2ec53fed60d707
Parents: 300807c
Author: zhoukang <zh...@xiaomi.com>
Authored: Tue Jul 25 17:59:21 2017 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Tue Jul 25 17:59:21 2017 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/16612638/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index 2f905c8..f48bfd5 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -66,7 +66,7 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
    * Wrap this buffer to view it as a Netty ByteBuf.
    */
   def toNetty: ByteBuf = {
-    Unpooled.wrappedBuffer(getChunks(): _*)
+    Unpooled.wrappedBuffer(chunks.length, getChunks(): _*)
   }
 
   /**

