You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by et...@apache.org on 2023/03/14 13:04:17 UTC

[incubator-celeborn] branch main updated: [CELEBORN-418][FLINK][FOLLOW UP]Need drop unused bytes from netty when task was already failed (#1350)

This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 4f6e90a7d [CELEBORN-418][FLINK][FOLLOW UP]Need drop unused bytes from netty when task was already failed (#1350)
4f6e90a7d is described below

commit 4f6e90a7d940522a77406ad6b937aa316fb2579c
Author: Shuang <lv...@gmail.com>
AuthorDate: Tue Mar 14 21:04:11 2023 +0800

    [CELEBORN-418][FLINK][FOLLOW UP]Need drop unused bytes from netty when task was already failed (#1350)
---
 .../TransportFrameDecoderWithBufferSupplier.java   | 44 +++++++++++++---------
 ...nsportFrameDecoderWithBufferSupplierSuiteJ.java |  8 ++--
 2 files changed, 31 insertions(+), 21 deletions(-)

diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/TransportFrameDecoderWithBufferSupplier.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/TransportFrameDecoderWithBufferSupplier.java
index eacd375d1..eeddad7c3 100644
--- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/TransportFrameDecoderWithBufferSupplier.java
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/TransportFrameDecoderWithBufferSupplier.java
@@ -59,18 +59,6 @@ public class TransportFrameDecoderWithBufferSupplier extends ChannelInboundHandl
     }
   }
 
-  private void dropUnusedBytes(io.netty.buffer.ByteBuf source) {
-    if (source.readableBytes() > 0) {
-      if (remainingSize > source.readableBytes()) {
-        remainingSize = remainingSize - source.readableBytes();
-        source.skipBytes(source.readableBytes());
-      } else {
-        source.skipBytes(remainingSize);
-        clear();
-      }
-    }
-  }
-
   private void decodeHeader(io.netty.buffer.ByteBuf buf, ChannelHandlerContext ctx) {
     copyByteBuf(buf, headerBuf, HEADER_SIZE);
     if (!headerBuf.isWritable()) {
@@ -142,12 +130,14 @@ public class TransportFrameDecoderWithBufferSupplier extends ChannelInboundHandl
     if (externalBuf == null) {
       Supplier<ByteBuf> supplier = bufferSuppliers.get(streamId);
       if (supplier == null) {
-        logger.warn("Need drop unused bytes, streamId: {}, bodySize: {}", streamId, bodySize);
-        remainingSize = bodySize;
-        dropUnusedBytes(buf);
-        return buf;
+        return needDropUnusedBytes(streamId, buf);
+      } else {
+        try {
+          externalBuf = supplier.get();
+        } catch (Exception e) {
+          return needDropUnusedBytes(streamId, buf);
+        }
       }
-      externalBuf = supplier.get();
     }
 
     copyByteBuf(buf, externalBuf, bodySize);
@@ -159,6 +149,26 @@ public class TransportFrameDecoderWithBufferSupplier extends ChannelInboundHandl
     return buf;
   }
 
+  private io.netty.buffer.ByteBuf needDropUnusedBytes(
+      long streamId, io.netty.buffer.ByteBuf byteBuf) {
+    logger.warn("Need drop unused bytes, streamId: {}, bodySize: {}", streamId, bodySize);
+    remainingSize = bodySize;
+    dropUnusedBytes(byteBuf);
+    return byteBuf;
+  }
+
+  private void dropUnusedBytes(io.netty.buffer.ByteBuf source) {
+    if (source.readableBytes() > 0) {
+      if (remainingSize > source.readableBytes()) {
+        remainingSize = remainingSize - source.readableBytes();
+        source.skipBytes(source.readableBytes());
+      } else {
+        source.skipBytes(remainingSize);
+        clear();
+      }
+    }
+  }
+
   public void channelRead(ChannelHandlerContext ctx, Object data) {
     io.netty.buffer.ByteBuf nettyBuf = (io.netty.buffer.ByteBuf) data;
     try {
diff --git a/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/network/TransportFrameDecoderWithBufferSupplierSuiteJ.java b/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/network/TransportFrameDecoderWithBufferSupplierSuiteJ.java
index 150711624..922b14d07 100644
--- a/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/network/TransportFrameDecoderWithBufferSupplierSuiteJ.java
+++ b/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/network/TransportFrameDecoderWithBufferSupplierSuiteJ.java
@@ -57,11 +57,11 @@ public class TransportFrameDecoderWithBufferSupplierSuiteJ {
     ChannelHandlerContext context = Mockito.mock(ChannelHandlerContext.class);
 
     BacklogAnnouncement announcement = new BacklogAnnouncement(0, 0);
-    ReadData unUsedReadData = new ReadData(1, 8, 0, generateData(1024));
-    ReadData readData = new ReadData(2, 8, 0, generateData(1024));
+    ReadData unUsedReadData = new ReadData(1, generateData(1024));
+    ReadData readData = new ReadData(2, generateData(1024));
     BacklogAnnouncement announcement1 = new BacklogAnnouncement(0, 0);
-    ReadData unUsedReadData1 = new ReadData(1, 8, 0, generateData(1024));
-    ReadData readData1 = new ReadData(2, 8, 0, generateData(8));
+    ReadData unUsedReadData1 = new ReadData(1, generateData(1024));
+    ReadData readData1 = new ReadData(2, generateData(8));
 
     ByteBuf buffer = Unpooled.buffer(5000);
     encodeMessage(announcement, buffer);