You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2023/06/14 03:59:26 UTC
[incubator-uniffle] branch master updated: [#590][part-1] ManagedBuffer instead ByteBuf to hold ShuffleData (#906)
This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new b8f17bf5 [#590][part-1] ManagedBuffer instead ByteBuf to hold ShuffleData (#906)
b8f17bf5 is described below
commit b8f17bf588aa2333aa2f4c4d97d5f6b149a63bcd
Author: xumanbu <ma...@163.com>
AuthorDate: Wed Jun 14 11:59:20 2023 +0800
[#590][part-1] ManagedBuffer instead ByteBuf to hold ShuffleData (#906)
### What changes were proposed in this pull request?
part-1:
1. add a ManagedBuffer instead ByteBuf to hold ShuffleData.
2. ShuffleResultData & GetLocalShuffleDataResponse use ManagedBuffer instead of ByteBuf
3. MessageEncoder write support sendfile
### Why are the changes needed?
Fix: #590
### Does this PR introduce _any_ user-facing change?
(Please list the user-facing changes introduced by your change, including
No.
### How was this patch tested?
Integration Testing
Co-authored-by: jam.xu <ja...@vipshop.com>
---
.../apache/uniffle/common/ShuffleDataResult.java | 31 +++++--
.../uniffle/common/netty/MessageEncoder.java | 6 +-
.../netty/buffer/FileSegmentManagedBuffer.java | 103 +++++++++++++++++++++
.../uniffle/common/netty/buffer/ManagedBuffer.java | 39 ++++++++
.../common/netty/buffer/NettyManagedBuffer.java | 58 ++++++++++++
.../protocol/GetLocalShuffleDataResponse.java | 47 ++++++----
.../common/netty/protocol/Transferable.java | 25 +++++
.../common/netty/protocol/NettyProtocolTest.java | 4 +-
.../impl/grpc/ShuffleServerGrpcNettyClient.java | 2 +-
.../server/netty/ShuffleServerNettyHandler.java | 9 +-
10 files changed, 290 insertions(+), 34 deletions(-)
diff --git a/common/src/main/java/org/apache/uniffle/common/ShuffleDataResult.java b/common/src/main/java/org/apache/uniffle/common/ShuffleDataResult.java
index 98a8b1d7..2000d2e8 100644
--- a/common/src/main/java/org/apache/uniffle/common/ShuffleDataResult.java
+++ b/common/src/main/java/org/apache/uniffle/common/ShuffleDataResult.java
@@ -24,11 +24,13 @@ import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import org.apache.uniffle.common.netty.buffer.ManagedBuffer;
+import org.apache.uniffle.common.netty.buffer.NettyManagedBuffer;
import org.apache.uniffle.common.util.ByteBufUtils;
public class ShuffleDataResult {
- private final ByteBuf data;
+ private final ManagedBuffer buffer;
private final List<BufferSegment> bufferSegments;
public ShuffleDataResult() {
@@ -39,13 +41,18 @@ public class ShuffleDataResult {
this(data, Lists.newArrayList());
}
+ public ShuffleDataResult(ManagedBuffer buffer) {
+ this.buffer = buffer;
+ this.bufferSegments = Lists.newArrayList();
+ }
+
public ShuffleDataResult(ByteBuffer data, List<BufferSegment> bufferSegments) {
- this.data = data != null ? Unpooled.wrappedBuffer(data) : Unpooled.EMPTY_BUFFER;
+ this.buffer = new NettyManagedBuffer(data != null ? Unpooled.wrappedBuffer(data) : Unpooled.EMPTY_BUFFER);
this.bufferSegments = bufferSegments;
}
public ShuffleDataResult(ByteBuf data, List<BufferSegment> bufferSegments) {
- this.data = data;
+ this.buffer = new NettyManagedBuffer(data);
this.bufferSegments = bufferSegments;
}
@@ -54,21 +61,25 @@ public class ShuffleDataResult {
}
public byte[] getData() {
- if (data == null) {
+ if (buffer == null) {
return null;
}
- if (data.hasArray()) {
- return data.array();
+ if (buffer.nioByteBuffer().hasArray()) {
+ return buffer.nioByteBuffer().array();
}
- return ByteBufUtils.readBytes(data);
+ return ByteBufUtils.readBytes(buffer.byteBuf());
}
public ByteBuf getDataBuf() {
- return data;
+ return buffer.byteBuf();
}
public ByteBuffer getDataBuffer() {
- return data.nioBuffer();
+ return buffer.nioByteBuffer();
+ }
+
+ public ManagedBuffer getManagedBuffer() {
+ return buffer;
}
public List<BufferSegment> getBufferSegments() {
@@ -76,6 +87,6 @@ public class ShuffleDataResult {
}
public boolean isEmpty() {
- return bufferSegments == null || bufferSegments.isEmpty() || data == null || data.capacity() == 0;
+ return bufferSegments == null || bufferSegments.isEmpty() || buffer == null || buffer.size() == 0;
}
}
diff --git a/common/src/main/java/org/apache/uniffle/common/netty/MessageEncoder.java b/common/src/main/java/org/apache/uniffle/common/netty/MessageEncoder.java
index e3537ecd..354300a9 100644
--- a/common/src/main/java/org/apache/uniffle/common/netty/MessageEncoder.java
+++ b/common/src/main/java/org/apache/uniffle/common/netty/MessageEncoder.java
@@ -26,6 +26,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.netty.protocol.Message;
+import org.apache.uniffle.common.netty.protocol.Transferable;
/**
* Encoder used by the server side to encode server-to-client responses.
@@ -46,7 +47,6 @@ public class MessageEncoder extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
- // todo: support zero copy
Message message = (Message) msg;
int encodeLength = message.encodedLength();
ByteBuf byteBuf = ctx.alloc().buffer(FrameDecoder.HEADER_SIZE + encodeLength);
@@ -59,5 +59,9 @@ public class MessageEncoder extends ChannelOutboundHandlerAdapter {
byteBuf.release();
}
ctx.writeAndFlush(byteBuf);
+ // do transferTo send data after encode buffer send.
+ if (message instanceof Transferable) {
+ ((Transferable) message).transferTo(ctx.channel());
+ }
}
}
diff --git a/common/src/main/java/org/apache/uniffle/common/netty/buffer/FileSegmentManagedBuffer.java b/common/src/main/java/org/apache/uniffle/common/netty/buffer/FileSegmentManagedBuffer.java
new file mode 100644
index 00000000..9e80756e
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/netty/buffer/FileSegmentManagedBuffer.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.netty.buffer;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.DefaultFileRegion;
+
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.JavaUtils;
+
+public class FileSegmentManagedBuffer extends ManagedBuffer {
+
+ private final File file;
+ private final int offset;
+ private final int length;
+
+ public FileSegmentManagedBuffer(File file, int offset, int length) {
+ this.file = file;
+ this.offset = offset;
+ this.length = length;
+ }
+
+ @Override
+ public int size() {
+ return length;
+ }
+
+ @Override
+ public ByteBuf byteBuf() {
+ return Unpooled.wrappedBuffer(this.nioByteBuffer());
+ }
+
+ @Override
+ public ByteBuffer nioByteBuffer() {
+ FileChannel channel = null;
+ try {
+ channel = new RandomAccessFile(file, "r").getChannel();
+ ByteBuffer buf = ByteBuffer.allocate(length);
+ channel.position(offset);
+ while (buf.remaining() != 0) {
+ if (channel.read(buf) == -1) {
+ throw new IOException(
+ String.format("Reached EOF before filling buffer.offset=%s,file=%s,buf.remaining=%s",
+ offset, file.getAbsoluteFile(), buf.remaining()));
+ }
+ }
+ buf.flip();
+ return buf;
+ } catch (IOException e) {
+ String errorMessage = "Error in reading " + this;
+ try {
+ if (channel != null) {
+ long size = channel.size();
+ errorMessage = "Error in reading " + this + " (actual file length " + size + ")";
+ }
+ } catch (IOException ignored) {
+ // ignore
+ }
+ throw new RssException(errorMessage, e);
+ } finally {
+ JavaUtils.closeQuietly(channel);
+ }
+ }
+
+ @Override
+ public ManagedBuffer release() {
+ return this;
+ }
+
+ @Override
+ public Object convertToNetty() {
+ FileChannel fileChannel;
+ try {
+ fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
+ } catch (IOException e) {
+ throw new RssException("Error in reading " + file);
+ }
+ return new DefaultFileRegion(fileChannel, offset, length);
+ }
+}
diff --git a/common/src/main/java/org/apache/uniffle/common/netty/buffer/ManagedBuffer.java b/common/src/main/java/org/apache/uniffle/common/netty/buffer/ManagedBuffer.java
new file mode 100644
index 00000000..ed15640e
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/netty/buffer/ManagedBuffer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.netty.buffer;
+
+import java.nio.ByteBuffer;
+
+import io.netty.buffer.ByteBuf;
+
+public abstract class ManagedBuffer {
+
+ public abstract int size();
+
+ public abstract ByteBuf byteBuf();
+
+ public abstract ByteBuffer nioByteBuffer();
+
+ public abstract ManagedBuffer release();
+
+ /**
+ * Convert the buffer into an Netty object, used to write the data out. The return value is either
+ * a {@link io.netty.buffer.ByteBuf} or a {@link io.netty.channel.FileRegion}.
+ */
+ public abstract Object convertToNetty();
+}
diff --git a/common/src/main/java/org/apache/uniffle/common/netty/buffer/NettyManagedBuffer.java b/common/src/main/java/org/apache/uniffle/common/netty/buffer/NettyManagedBuffer.java
new file mode 100644
index 00000000..4cc6686a
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/netty/buffer/NettyManagedBuffer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.netty.buffer;
+
+import java.nio.ByteBuffer;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+public class NettyManagedBuffer extends ManagedBuffer {
+
+ private ByteBuf buf;
+
+ public NettyManagedBuffer(ByteBuf byteBuf) {
+ this.buf = byteBuf;
+ }
+
+ @Override
+ public int size() {
+ return buf.readableBytes();
+ }
+
+ @Override
+ public ByteBuf byteBuf() {
+ return Unpooled.wrappedBuffer(this.nioByteBuffer());
+ }
+
+ @Override
+ public ByteBuffer nioByteBuffer() {
+ return buf.nioBuffer();
+ }
+
+ @Override
+ public ManagedBuffer release() {
+ buf.release();
+ return this;
+ }
+
+ @Override
+ public Object convertToNetty() {
+ return buf.duplicate().retain();
+ }
+}
diff --git a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataResponse.java b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataResponse.java
index d955f22d..e31915cf 100644
--- a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataResponse.java
+++ b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataResponse.java
@@ -18,45 +18,46 @@
package org.apache.uniffle.common.netty.protocol;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import org.apache.uniffle.common.netty.buffer.FileSegmentManagedBuffer;
+import org.apache.uniffle.common.netty.buffer.ManagedBuffer;
+import org.apache.uniffle.common.netty.buffer.NettyManagedBuffer;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.ByteBufUtils;
-public class GetLocalShuffleDataResponse extends RpcResponse {
- private ByteBuf data;
+public class GetLocalShuffleDataResponse extends RpcResponse implements Transferable {
- public GetLocalShuffleDataResponse(long requestId, StatusCode statusCode, byte[] data) {
- this(requestId, statusCode, null, data);
- }
-
- public GetLocalShuffleDataResponse(long requestId, StatusCode statusCode, String retMessage, byte[] data) {
- this(requestId, statusCode, retMessage, Unpooled.wrappedBuffer(data));
- }
+ private ManagedBuffer buffer;
- public GetLocalShuffleDataResponse(long requestId, StatusCode statusCode, String retMessage, ByteBuf data) {
+ public GetLocalShuffleDataResponse(long requestId, StatusCode statusCode, String retMessage, ManagedBuffer data) {
super(requestId, statusCode, retMessage);
- this.data = data;
+ this.buffer = data;
}
@Override
public int encodedLength() {
- return super.encodedLength() + Integer.BYTES + data.readableBytes();
+ return super.encodedLength() + Integer.BYTES + buffer.size();
}
@Override
public void encode(ByteBuf buf) {
super.encode(buf);
- ByteBufUtils.copyByteBuf(data, buf);
- data.release();
+ if (buffer instanceof FileSegmentManagedBuffer) {
+ buf.writeInt(buffer.size());
+ } else {
+ ByteBufUtils.copyByteBuf(buffer.byteBuf(), buf);
+ buffer.release();
+ }
}
+
public static GetLocalShuffleDataResponse decode(ByteBuf byteBuf) {
long requestId = byteBuf.readLong();
StatusCode statusCode = StatusCode.fromCode(byteBuf.readInt());
String retMessage = ByteBufUtils.readLengthAndString(byteBuf);
ByteBuf data = ByteBufUtils.readSlice(byteBuf);
- return new GetLocalShuffleDataResponse(requestId, statusCode, retMessage, data);
+ return new GetLocalShuffleDataResponse(requestId, statusCode, retMessage, new NettyManagedBuffer(data));
}
@Override
@@ -64,7 +65,17 @@ public class GetLocalShuffleDataResponse extends RpcResponse {
return Type.GET_LOCAL_SHUFFLE_DATA_RESPONSE;
}
- public ByteBuf getData() {
- return data;
+ public ManagedBuffer getBuffer() {
+ return buffer;
+ }
+
+ public Object getData() {
+ return buffer.convertToNetty();
+ }
+
+ @Override
+ public void transferTo(Channel channel) {
+ channel.write(buffer.convertToNetty());
+ buffer.release();
}
}
diff --git a/common/src/main/java/org/apache/uniffle/common/netty/protocol/Transferable.java b/common/src/main/java/org/apache/uniffle/common/netty/protocol/Transferable.java
new file mode 100644
index 00000000..d0fc8ac9
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/netty/protocol/Transferable.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.netty.protocol;
+
+import io.netty.channel.Channel;
+
+public interface Transferable {
+
+ void transferTo(Channel channel);
+}
diff --git a/common/src/test/java/org/apache/uniffle/common/netty/protocol/NettyProtocolTest.java b/common/src/test/java/org/apache/uniffle/common/netty/protocol/NettyProtocolTest.java
index 419c70d9..1e86aff6 100644
--- a/common/src/test/java/org/apache/uniffle/common/netty/protocol/NettyProtocolTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/netty/protocol/NettyProtocolTest.java
@@ -31,6 +31,7 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.netty.buffer.NettyManagedBuffer;
import org.apache.uniffle.common.rpc.StatusCode;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -116,7 +117,8 @@ public class NettyProtocolTest {
public void testGetLocalShuffleDataResponse() {
byte[] data = new byte[]{1, 2, 3};
GetLocalShuffleDataResponse getLocalShuffleDataResponse =
- new GetLocalShuffleDataResponse(1, StatusCode.SUCCESS, "", Unpooled.wrappedBuffer(data).retain());
+ new GetLocalShuffleDataResponse(1, StatusCode.SUCCESS, "",
+ new NettyManagedBuffer(Unpooled.wrappedBuffer(data).retain()));
int encodeLength = getLocalShuffleDataResponse.encodedLength();
ByteBuf byteBuf = Unpooled.buffer(encodeLength, encodeLength);
getLocalShuffleDataResponse.encode(byteBuf);
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
index eb50665c..142bb155 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
@@ -225,7 +225,7 @@ public class ShuffleServerGrpcNettyClient extends ShuffleServerGrpcClient {
case SUCCESS:
return new RssGetShuffleDataResponse(
StatusCode.SUCCESS,
- getLocalShuffleDataResponse.getData().nioBuffer());
+ getLocalShuffleDataResponse.getBuffer().nioByteBuffer());
default:
String msg = "Can't get shuffle data from " + host + ":" + port
+ " for " + requestInfo + ", errorMsg:" + getLocalShuffleDataResponse.getRetMessage();
diff --git a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
index b87d57a3..145b2e2f 100644
--- a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
+++ b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
@@ -36,6 +36,7 @@ import org.apache.uniffle.common.ShufflePartitionedData;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.exception.FileNotFoundException;
import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.netty.buffer.NettyManagedBuffer;
import org.apache.uniffle.common.netty.client.TransportClient;
import org.apache.uniffle.common.netty.handle.BaseMessageHandler;
import org.apache.uniffle.common.netty.protocol.GetLocalShuffleDataRequest;
@@ -356,12 +357,13 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler {
LOG.info("Successfully getShuffleData cost {} ms for shuffle"
+ " data with {}", readTime, requestInfo);
response = new GetLocalShuffleDataResponse(req.getRequestId(),
- status, msg, sdr.getDataBuf());
+ status, msg, sdr.getManagedBuffer());
} catch (Exception e) {
status = StatusCode.INTERNAL_ERROR;
msg = "Error happened when get shuffle data for " + requestInfo + ", " + e.getMessage();
LOG.error(msg, e);
- response = new GetLocalShuffleDataResponse(req.getRequestId(), status, msg, Unpooled.EMPTY_BUFFER);
+ response = new GetLocalShuffleDataResponse(req.getRequestId(), status, msg,
+ new NettyManagedBuffer(Unpooled.EMPTY_BUFFER));
} finally {
shuffleServer.getShuffleBufferManager().releaseReadMemory(length);
}
@@ -369,7 +371,8 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler {
status = StatusCode.INTERNAL_ERROR;
msg = "Can't require memory to get shuffle data";
LOG.error(msg + " for " + requestInfo);
- response = new GetLocalShuffleDataResponse(req.getRequestId(), status, msg, Unpooled.EMPTY_BUFFER);
+ response = new GetLocalShuffleDataResponse(req.getRequestId(), status, msg,
+ new NettyManagedBuffer(Unpooled.EMPTY_BUFFER));
}
client.sendRpcSync(response, RPC_TIMEOUT);
}