Posted to commits@spark.apache.org by ir...@apache.org on 2018/07/27 03:15:18 UTC

spark git commit: [SPARK-24801][CORE] Avoid memory waste by empty byte[] arrays in SaslEncryption$EncryptedMessage

Repository: spark
Updated Branches:
  refs/heads/master fa09d9192 -> 094aa5971


[SPARK-24801][CORE] Avoid memory waste by empty byte[] arrays in SaslEncryption$EncryptedMessage

## What changes were proposed in this pull request?

Initialize SaslEncryption$EncryptedMessage.byteChannel lazily,
so that empty, not-yet-used ByteArrayWritableChannel instances
referenced by this field don't consume memory.
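A minimal sketch of the lazy-initialization pattern described above. `LazyBufferMessage` is a hypothetical, simplified stand-in for EncryptedMessage (no SASL wrapping, no Netty reference counting). It only shows that the scratch buffer is allocated on the first nextChunk() call rather than in the constructor. The unsynchronized null check assumes that a single thread drives each message, as is usual for writes on one Netty channel's event loop.

```java
import java.nio.ByteBuffer;

/**
 * Hypothetical, simplified stand-in for SaslEncryption$EncryptedMessage.
 * It illustrates only the lazy-allocation pattern, not encryption.
 */
public class LazyBufferMessage {
  private final int maxOutboundBlockSize;
  // Not allocated in the constructor; stays null until the first chunk is produced.
  private byte[] scratch;

  public LazyBufferMessage(int maxOutboundBlockSize) {
    this.maxOutboundBlockSize = maxOutboundBlockSize;
  }

  /** True once the scratch buffer has been allocated. */
  public boolean isAllocated() {
    return scratch != null;
  }

  /** Copies up to maxOutboundBlockSize bytes from src into the scratch buffer. */
  public int nextChunk(ByteBuffer src) {
    if (scratch == null) {
      // First use: allocate now, so messages that are merely queued hold no buffer.
      scratch = new byte[maxOutboundBlockSize];
    }
    int n = Math.min(src.remaining(), scratch.length);
    src.get(scratch, 0, n);
    return n;
  }

  public static void main(String[] args) {
    LazyBufferMessage msg = new LazyBufferMessage(64 * 1024);
    System.out.println(msg.isAllocated()); // false
    int copied = msg.nextChunk(ByteBuffer.wrap(new byte[100]));
    System.out.println(copied + " " + msg.isAllocated()); // 100 true
  }
}
```

The trade-off is one null check per chunk in exchange for not paying for a full block-sized array on every message that is still waiting in Netty's outbound queue.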

I analyzed a heap dump from a YARN NodeManager where this code is used and found over 40,000 of these ByteArrayWritableChannel objects in memory, each with a big empty byte[] array. They are all there because Netty queued up a large number of messages in memory before transferTo() was called. A small number of Netty ChannelOutboundBuffer objects, via linked lists starting from their flushedEntry fields, collectively reference over 40K ChannelOutboundBuffer$Entry objects, which ultimately reference EncryptedMessage objects.
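To put the heap-dump numbers in perspective, here is a back-of-envelope estimate. The 64 KiB block size and the count of 4 in-progress messages are illustrative assumptions, not figures from the dump, and the per-array object header is ignored.

```java
/** Back-of-envelope estimate of memory held by block-sized byte[] buffers. */
public class QueuedBufferEstimate {
  /** Bytes held when each of the given messages owns one block-sized buffer. */
  static long bufferBytes(long messagesHoldingBuffer, int blockSize) {
    return messagesHoldingBuffer * blockSize;
  }

  public static void main(String[] args) {
    int blockSize = 64 * 1024; // assumption: illustrative block size
    long queued = 40_000;      // from the heap dump: "over 40,000" instances
    long inProgress = 4;       // assumption: one active message per outbound buffer
    long eager = bufferBytes(queued, blockSize);     // every queued message allocates up front
    long lazy = bufferBytes(inProgress, blockSize);  // only started messages allocate
    System.out.printf("eager: %d bytes (%.2f GiB)%n", eager, eager / (double) (1L << 30));
    System.out.printf("lazy:  %d bytes%n", lazy);
  }
}
```

Under these assumptions the eager case comes to 2,621,440,000 bytes (about 2.44 GiB), versus 262,144 bytes when only in-progress messages hold a buffer.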

## How was this patch tested?

Ran all the tests locally.

Author: Misha Dmitriev <mi...@cloudera.com>

Closes #21811 from countmdm/misha/spark-24801.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/094aa597
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/094aa597
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/094aa597

Branch: refs/heads/master
Commit: 094aa597155dfcbf41a2490c9e462415e3824901
Parents: fa09d91
Author: Misha Dmitriev <mi...@cloudera.com>
Authored: Thu Jul 26 22:15:12 2018 -0500
Committer: Imran Rashid <ir...@cloudera.com>
Committed: Thu Jul 26 22:15:12 2018 -0500

----------------------------------------------------------------------
 .../org/apache/spark/network/sasl/SaslEncryption.java     | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/094aa597/common/network-common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java b/common/network-common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java
index 3ac9081..d3b2a33 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/sasl/SaslEncryption.java
@@ -135,13 +135,14 @@ class SaslEncryption {
     private final boolean isByteBuf;
     private final ByteBuf buf;
     private final FileRegion region;
+    private final int maxOutboundBlockSize;
 
     /**
      * A channel used to buffer input data for encryption. The channel has an upper size bound
      * so that if the input is larger than the allowed buffer, it will be broken into multiple
-     * chunks.
+     * chunks. Made non-final to enable lazy initialization, which saves memory.
      */
-    private final ByteArrayWritableChannel byteChannel;
+    private ByteArrayWritableChannel byteChannel;
 
     private ByteBuf currentHeader;
     private ByteBuffer currentChunk;
@@ -157,7 +158,7 @@ class SaslEncryption {
       this.isByteBuf = msg instanceof ByteBuf;
       this.buf = isByteBuf ? (ByteBuf) msg : null;
       this.region = isByteBuf ? null : (FileRegion) msg;
-      this.byteChannel = new ByteArrayWritableChannel(maxOutboundBlockSize);
+      this.maxOutboundBlockSize = maxOutboundBlockSize;
     }
 
     /**
@@ -292,6 +293,9 @@ class SaslEncryption {
     }
 
     private void nextChunk() throws IOException {
+      if (byteChannel == null) {
+        byteChannel = new ByteArrayWritableChannel(maxOutboundBlockSize);
+      }
       byteChannel.reset();
       if (isByteBuf) {
         int copied = byteChannel.write(buf.nioBuffer());
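
The comment on byteChannel in the diff above says the channel has an upper size bound, so larger input is broken into multiple chunks, and nextChunk() calls reset() before each chunk. A minimal sketch of that behaviour follows. `BoundedChunker` is hypothetical and does not reproduce the real ByteArrayWritableChannel API; it only shows one reusable array producing chunks no larger than its capacity.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical bounded buffer: input larger than the capacity is split into
 * several chunks, and one array is reused between chunks via reset().
 */
public class BoundedChunker {
  private final byte[] data;
  private int length;

  public BoundedChunker(int capacity) {
    this.data = new byte[capacity];
  }

  /** Forgets the previous chunk; the array itself is kept for reuse. */
  public void reset() {
    length = 0;
  }

  /** Copies as many bytes as still fit; returns the number copied. */
  public int write(ByteBuffer src) {
    int n = Math.min(src.remaining(), data.length - length);
    src.get(data, length, n);
    length += n;
    return n;
  }

  /** Returns the size of each chunk src is split into, each at most capacity bytes. */
  public static List<Integer> chunkSizes(ByteBuffer src, int capacity) {
    BoundedChunker chunker = new BoundedChunker(capacity);
    List<Integer> sizes = new ArrayList<>();
    while (src.hasRemaining()) {
      chunker.reset();
      sizes.add(chunker.write(src));
      // This is where an encryption step would consume data[0, length) before the next reset.
    }
    return sizes;
  }

  public static void main(String[] args) {
    System.out.println(chunkSizes(ByteBuffer.wrap(new byte[10]), 4)); // [4, 4, 2]
  }
}
```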

