You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by et...@apache.org on 2023/03/14 03:27:40 UTC

[incubator-celeborn] branch main updated: [CELEBORN-419][FLINK] Fix memory leak when receive RPCs with body. (#1343)

This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 971c93d4d [CELEBORN-419][FLINK] Fix memory leak when receive RPCs with body. (#1343)
971c93d4d is described below

commit 971c93d4d9f5c5ca219391e35a389de24a523a52
Author: Ethan Feng <et...@apache.org>
AuthorDate: Tue Mar 14 11:27:36 2023 +0800

    [CELEBORN-419][FLINK] Fix memory leak when receive RPCs with body. (#1343)
---
 .../plugin/flink/network/TransportFrameDecoderWithBufferSupplier.java | 4 ++--
 .../java/org/apache/celeborn/common/network/protocol/Message.java     | 4 ----
 2 files changed, 2 insertions(+), 6 deletions(-)

diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/TransportFrameDecoderWithBufferSupplier.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/TransportFrameDecoderWithBufferSupplier.java
index 29657d419..01283547a 100644
--- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/TransportFrameDecoderWithBufferSupplier.java
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/TransportFrameDecoderWithBufferSupplier.java
@@ -94,7 +94,7 @@ public class TransportFrameDecoderWithBufferSupplier extends ChannelInboundHandl
     if (bodyBuf == null) {
       if (buf.readableBytes() >= bodySize) {
         io.netty.buffer.ByteBuf body = buf.retain().readSlice(bodySize);
-        curMsg.setBody(body.nioBuffer());
+        curMsg.setBody(body);
         ctx.fireChannelRead(curMsg);
         clear();
         return buf;
@@ -112,7 +112,7 @@ public class TransportFrameDecoderWithBufferSupplier extends ChannelInboundHandl
     }
     bodyBuf.addComponent(next).writerIndex(bodyBuf.writerIndex() + next.readableBytes());
     if (bodyBuf.readableBytes() == bodySize) {
-      curMsg.setBody(bodyBuf.nioBuffer());
+      curMsg.setBody(bodyBuf);
       ctx.fireChannelRead(curMsg);
       clear();
     }
diff --git a/common/src/main/java/org/apache/celeborn/common/network/protocol/Message.java b/common/src/main/java/org/apache/celeborn/common/network/protocol/Message.java
index 31f81e336..1450ab931 100644
--- a/common/src/main/java/org/apache/celeborn/common/network/protocol/Message.java
+++ b/common/src/main/java/org/apache/celeborn/common/network/protocol/Message.java
@@ -50,10 +50,6 @@ public abstract class Message implements Encodable {
     this.body = new NettyManagedBuffer(buf);
   }
 
-  public void setBody(ByteBuffer buf) {
-    this.body = new NettyManagedBuffer(buf);
-  }
-
   /** Whether the body should be copied out in frame decoder. */
   public boolean needCopyOut() {
     return false;