You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by et...@apache.org on 2024/04/10 11:42:34 UTC

(celeborn) branch main updated: [CELEBORN-1376] Push data failed should always release request body

This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new b65b5433d [CELEBORN-1376] Push data failed should always release request body
b65b5433d is described below

commit b65b5433dceaa19dc22f6a878e5160751a7ef036
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Wed Apr 10 19:42:14 2024 +0800

    [CELEBORN-1376] Push data failed should always release request body
    
    ### What changes were proposed in this pull request?
    Worker Netty memory is not released:
    <img width="1729" alt="Screenshot 2024-04-07 17 26 40" src="https://github.com/apache/celeborn/assets/46485123/5774f735-570b-448e-ab94-4c78661717f5">
    
    Many pushes failed:
    <img width="767" alt="Screenshot 2024-04-07 17 27 46" src="https://github.com/apache/celeborn/assets/46485123/41866bd0-d634-4dbf-8518-b474c8d1faad">
    
    1. For the Spark shuffle client, release the push data body when the RPC fails.
    2. For the Flink client, which uses a wrapped ByteBuf, release the push data body when the RPC fails and release the original body when the RPC completes.
    3. For worker replication, release the push data body when the RPC fails.
    
    ### Why are the changes needed?
    Avoid a Netty memory leak in the worker: request bodies of failed pushes were never released.
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Closes #2449 from AngersZhuuuu/CELEBORN-1376.
    
    Authored-by: Angerszhuuuu <an...@gmail.com>
    Signed-off-by: mingji <fe...@alibaba-inc.com>
---
 .../plugin/flink/FlinkShuffleClientImplSuiteJ.java |  2 +-
 .../common/network/client/TransportClient.java     | 54 ++++++++++++++++++++--
 2 files changed, 51 insertions(+), 5 deletions(-)

diff --git a/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/FlinkShuffleClientImplSuiteJ.java b/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/FlinkShuffleClientImplSuiteJ.java
index 693124747..60a843f4a 100644
--- a/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/FlinkShuffleClientImplSuiteJ.java
+++ b/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/FlinkShuffleClientImplSuiteJ.java
@@ -111,7 +111,7 @@ public class FlinkShuffleClientImplSuiteJ {
   @Test
   public void testPushDataByteBufFail() throws IOException {
     ByteBuf byteBuf = Unpooled.wrappedBuffer(TEST_BUF1);
-    when(client.pushData(any(), anyLong(), any(), any()))
+    when(client.pushData(any(), anyLong(), any(), any(), any()))
         .thenAnswer(
             t -> {
               RpcResponseCallback rpcResponseCallback = t.getArgument(1, RpcResponseCallback.class);
diff --git a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
index fd64b6bd0..2c335b350 100644
--- a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
+++ b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClient.java
@@ -216,6 +216,23 @@ public class TransportClient implements Closeable {
       long pushDataTimeout,
       RpcResponseCallback callback,
       Runnable rpcSendoutCallback) {
+    Runnable rpcFailureCallback =
+        () -> {
+          try {
+            pushData.body().release();
+          } catch (Throwable e) {
+            logger.error("Error release buffer for PUSH_DATA request {}", pushData.requestId, e);
+          }
+        };
+    return pushData(pushData, pushDataTimeout, callback, rpcSendoutCallback, rpcFailureCallback);
+  }
+
+  public ChannelFuture pushData(
+      PushData pushData,
+      long pushDataTimeout,
+      RpcResponseCallback callback,
+      Runnable rpcSendoutCallback,
+      Runnable rpcFailureCallback) {
     if (logger.isTraceEnabled()) {
       logger.trace("Pushing data to {}", NettyUtils.getRemoteAddress(channel));
     }
@@ -225,7 +242,8 @@ public class TransportClient implements Closeable {
     PushRequestInfo info = new PushRequestInfo(dueTime, callback);
     handler.addPushRequest(requestId, info);
     pushData.requestId = requestId;
-    PushChannelListener listener = new PushChannelListener(requestId, rpcSendoutCallback);
+    PushChannelListener listener =
+        new PushChannelListener(requestId, rpcSendoutCallback, rpcFailureCallback);
     ChannelFuture channelFuture = channel.writeAndFlush(pushData).addListener(listener);
     info.setChannelFuture(channelFuture);
     return channelFuture;
@@ -233,6 +251,26 @@ public class TransportClient implements Closeable {
 
   public ChannelFuture pushMergedData(
       PushMergedData pushMergedData, long pushDataTimeout, RpcResponseCallback callback) {
+    Runnable rpcFailureCallback =
+        () -> {
+          try {
+            pushMergedData.body().release();
+          } catch (Throwable e) {
+            logger.error(
+                "Error release buffer for PUSH_MERGED_DATA request {}",
+                pushMergedData.requestId,
+                e);
+          }
+        };
+    return pushMergedData(pushMergedData, pushDataTimeout, callback, null, rpcFailureCallback);
+  }
+
+  public ChannelFuture pushMergedData(
+      PushMergedData pushMergedData,
+      long pushDataTimeout,
+      RpcResponseCallback callback,
+      Runnable rpcSendoutCallback,
+      Runnable rpcFailureCallback) {
     if (logger.isTraceEnabled()) {
       logger.trace("Pushing merged data to {}", NettyUtils.getRemoteAddress(channel));
     }
@@ -243,7 +281,8 @@ public class TransportClient implements Closeable {
     handler.addPushRequest(requestId, info);
     pushMergedData.requestId = requestId;
 
-    PushChannelListener listener = new PushChannelListener(requestId);
+    PushChannelListener listener =
+        new PushChannelListener(requestId, rpcSendoutCallback, rpcFailureCallback);
     ChannelFuture channelFuture = channel.writeAndFlush(pushMergedData).addListener(listener);
     info.setChannelFuture(channelFuture);
     return channelFuture;
@@ -417,14 +456,18 @@ public class TransportClient implements Closeable {
     final long pushRequestId;
     Runnable rpcSendOutCallback;
 
+    Runnable rpcFailureCallback;
+
     PushChannelListener(long pushRequestId) {
-      this(pushRequestId, null);
+      this(pushRequestId, null, null);
     }
 
-    PushChannelListener(long pushRequestId, Runnable rpcSendOutCallback) {
+    PushChannelListener(
+        long pushRequestId, Runnable rpcSendOutCallback, Runnable rpcFailureCallback) {
       super("PUSH " + pushRequestId);
       this.pushRequestId = pushRequestId;
       this.rpcSendOutCallback = rpcSendOutCallback;
+      this.rpcFailureCallback = rpcFailureCallback;
     }
 
     @Override
@@ -438,6 +481,9 @@ public class TransportClient implements Closeable {
     @Override
     protected void handleFailure(String errorMsg, Throwable cause) {
       handler.handlePushFailure(pushRequestId, errorMsg, cause);
+      if (rpcFailureCallback != null) {
+        rpcFailureCallback.run();
+      }
     }
   }
 }