You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by et...@apache.org on 2023/03/14 13:04:17 UTC
[incubator-celeborn] branch main updated: [CELEBORN-418][FLINK][FOLLOW UP]Need drop unused bytes from netty when task was already failed (#1350)
This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 4f6e90a7d [CELEBORN-418][FLINK][FOLLOW UP]Need drop unused bytes from netty when task was already failed (#1350)
4f6e90a7d is described below
commit 4f6e90a7d940522a77406ad6b937aa316fb2579c
Author: Shuang <lv...@gmail.com>
AuthorDate: Tue Mar 14 21:04:11 2023 +0800
[CELEBORN-418][FLINK][FOLLOW UP]Need drop unused bytes from netty when task was already failed (#1350)
---
.../TransportFrameDecoderWithBufferSupplier.java | 44 +++++++++++++---------
...nsportFrameDecoderWithBufferSupplierSuiteJ.java | 8 ++--
2 files changed, 31 insertions(+), 21 deletions(-)
diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/TransportFrameDecoderWithBufferSupplier.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/TransportFrameDecoderWithBufferSupplier.java
index eacd375d1..eeddad7c3 100644
--- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/TransportFrameDecoderWithBufferSupplier.java
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/network/TransportFrameDecoderWithBufferSupplier.java
@@ -59,18 +59,6 @@ public class TransportFrameDecoderWithBufferSupplier extends ChannelInboundHandl
}
}
- private void dropUnusedBytes(io.netty.buffer.ByteBuf source) {
- if (source.readableBytes() > 0) {
- if (remainingSize > source.readableBytes()) {
- remainingSize = remainingSize - source.readableBytes();
- source.skipBytes(source.readableBytes());
- } else {
- source.skipBytes(remainingSize);
- clear();
- }
- }
- }
-
private void decodeHeader(io.netty.buffer.ByteBuf buf, ChannelHandlerContext ctx) {
copyByteBuf(buf, headerBuf, HEADER_SIZE);
if (!headerBuf.isWritable()) {
@@ -142,12 +130,14 @@ public class TransportFrameDecoderWithBufferSupplier extends ChannelInboundHandl
if (externalBuf == null) {
Supplier<ByteBuf> supplier = bufferSuppliers.get(streamId);
if (supplier == null) {
- logger.warn("Need drop unused bytes, streamId: {}, bodySize: {}", streamId, bodySize);
- remainingSize = bodySize;
- dropUnusedBytes(buf);
- return buf;
+ return needDropUnusedBytes(streamId, buf);
+ } else {
+ try {
+ externalBuf = supplier.get();
+ } catch (Exception e) {
+ return needDropUnusedBytes(streamId, buf);
+ }
}
- externalBuf = supplier.get();
}
copyByteBuf(buf, externalBuf, bodySize);
@@ -159,6 +149,26 @@ public class TransportFrameDecoderWithBufferSupplier extends ChannelInboundHandl
return buf;
}
+ private io.netty.buffer.ByteBuf needDropUnusedBytes(
+ long streamId, io.netty.buffer.ByteBuf byteBuf) {
+ logger.warn("Need drop unused bytes, streamId: {}, bodySize: {}", streamId, bodySize);
+ remainingSize = bodySize;
+ dropUnusedBytes(byteBuf);
+ return byteBuf;
+ }
+
+ private void dropUnusedBytes(io.netty.buffer.ByteBuf source) {
+ if (source.readableBytes() > 0) {
+ if (remainingSize > source.readableBytes()) {
+ remainingSize = remainingSize - source.readableBytes();
+ source.skipBytes(source.readableBytes());
+ } else {
+ source.skipBytes(remainingSize);
+ clear();
+ }
+ }
+ }
+
public void channelRead(ChannelHandlerContext ctx, Object data) {
io.netty.buffer.ByteBuf nettyBuf = (io.netty.buffer.ByteBuf) data;
try {
diff --git a/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/network/TransportFrameDecoderWithBufferSupplierSuiteJ.java b/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/network/TransportFrameDecoderWithBufferSupplierSuiteJ.java
index 150711624..922b14d07 100644
--- a/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/network/TransportFrameDecoderWithBufferSupplierSuiteJ.java
+++ b/client-flink/common/src/test/java/org/apache/celeborn/plugin/flink/network/TransportFrameDecoderWithBufferSupplierSuiteJ.java
@@ -57,11 +57,11 @@ public class TransportFrameDecoderWithBufferSupplierSuiteJ {
ChannelHandlerContext context = Mockito.mock(ChannelHandlerContext.class);
BacklogAnnouncement announcement = new BacklogAnnouncement(0, 0);
- ReadData unUsedReadData = new ReadData(1, 8, 0, generateData(1024));
- ReadData readData = new ReadData(2, 8, 0, generateData(1024));
+ ReadData unUsedReadData = new ReadData(1, generateData(1024));
+ ReadData readData = new ReadData(2, generateData(1024));
BacklogAnnouncement announcement1 = new BacklogAnnouncement(0, 0);
- ReadData unUsedReadData1 = new ReadData(1, 8, 0, generateData(1024));
- ReadData readData1 = new ReadData(2, 8, 0, generateData(8));
+ ReadData unUsedReadData1 = new ReadData(1, generateData(1024));
+ ReadData readData1 = new ReadData(2, generateData(8));
ByteBuf buffer = Unpooled.buffer(5000);
encodeMessage(announcement, buffer);