You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2023/06/14 03:59:26 UTC

[incubator-uniffle] branch master updated: [#590][part-1] ManagedBuffer instead ByteBuf to hold ShuffleData (#906)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new b8f17bf5 [#590][part-1] ManagedBuffer instead ByteBuf to hold ShuffleData  (#906)
b8f17bf5 is described below

commit b8f17bf588aa2333aa2f4c4d97d5f6b149a63bcd
Author: xumanbu <ma...@163.com>
AuthorDate: Wed Jun 14 11:59:20 2023 +0800

    [#590][part-1] ManagedBuffer instead ByteBuf to hold ShuffleData  (#906)
    
    ### What changes were proposed in this pull request?
    
    part-1:
    1. Add a ManagedBuffer abstraction, used instead of a raw ByteBuf, to hold shuffle data.
    2. ShuffleDataResult and GetLocalShuffleDataResponse now use ManagedBuffer instead of ByteBuf.
    3. MessageEncoder#write supports sendfile by calling transferTo on Transferable messages.
    
    
    ### Why are the changes needed?
    Fix: #590
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    Integration Testing
    
    Co-authored-by: jam.xu <ja...@vipshop.com>
---
 .../apache/uniffle/common/ShuffleDataResult.java   |  31 +++++--
 .../uniffle/common/netty/MessageEncoder.java       |   6 +-
 .../netty/buffer/FileSegmentManagedBuffer.java     | 103 +++++++++++++++++++++
 .../uniffle/common/netty/buffer/ManagedBuffer.java |  39 ++++++++
 .../common/netty/buffer/NettyManagedBuffer.java    |  58 ++++++++++++
 .../protocol/GetLocalShuffleDataResponse.java      |  47 ++++++----
 .../common/netty/protocol/Transferable.java        |  25 +++++
 .../common/netty/protocol/NettyProtocolTest.java   |   4 +-
 .../impl/grpc/ShuffleServerGrpcNettyClient.java    |   2 +-
 .../server/netty/ShuffleServerNettyHandler.java    |   9 +-
 10 files changed, 290 insertions(+), 34 deletions(-)

diff --git a/common/src/main/java/org/apache/uniffle/common/ShuffleDataResult.java b/common/src/main/java/org/apache/uniffle/common/ShuffleDataResult.java
index 98a8b1d7..2000d2e8 100644
--- a/common/src/main/java/org/apache/uniffle/common/ShuffleDataResult.java
+++ b/common/src/main/java/org/apache/uniffle/common/ShuffleDataResult.java
@@ -24,11 +24,13 @@ import com.google.common.collect.Lists;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 
+import org.apache.uniffle.common.netty.buffer.ManagedBuffer;
+import org.apache.uniffle.common.netty.buffer.NettyManagedBuffer;
 import org.apache.uniffle.common.util.ByteBufUtils;
 
 public class ShuffleDataResult {
 
-  private final ByteBuf data;
+  private final ManagedBuffer buffer;
   private final List<BufferSegment> bufferSegments;
 
   public ShuffleDataResult() {
@@ -39,13 +41,18 @@ public class ShuffleDataResult {
     this(data, Lists.newArrayList());
   }
 
+  public ShuffleDataResult(ManagedBuffer buffer) {
+    this.buffer = buffer;
+    this.bufferSegments = Lists.newArrayList();
+  }
+
   public ShuffleDataResult(ByteBuffer data, List<BufferSegment> bufferSegments) {
-    this.data = data != null ? Unpooled.wrappedBuffer(data) : Unpooled.EMPTY_BUFFER;
+    this.buffer = new NettyManagedBuffer(data != null ? Unpooled.wrappedBuffer(data) : Unpooled.EMPTY_BUFFER);
     this.bufferSegments = bufferSegments;
   }
 
   public ShuffleDataResult(ByteBuf data, List<BufferSegment> bufferSegments) {
-    this.data = data;
+    this.buffer = new NettyManagedBuffer(data);
     this.bufferSegments = bufferSegments;
   }
 
@@ -54,21 +61,25 @@ public class ShuffleDataResult {
   }
 
   public byte[] getData() {
-    if (data == null) {
+    if (buffer == null) {
       return null;
     }
-    if (data.hasArray()) {
-      return data.array();
+    if (buffer.nioByteBuffer().hasArray()) {
+      return buffer.nioByteBuffer().array();
     }
-    return ByteBufUtils.readBytes(data);
+    return ByteBufUtils.readBytes(buffer.byteBuf());
   }
 
   public ByteBuf getDataBuf() {
-    return data;
+    return buffer.byteBuf();
   }
 
   public ByteBuffer getDataBuffer() {
-    return data.nioBuffer();
+    return buffer.nioByteBuffer();
+  }
+
+  public ManagedBuffer getManagedBuffer() {
+    return buffer;
   }
 
   public List<BufferSegment> getBufferSegments() {
@@ -76,6 +87,6 @@ public class ShuffleDataResult {
   }
 
   public boolean isEmpty() {
-    return bufferSegments == null || bufferSegments.isEmpty() || data == null || data.capacity() == 0;
+    return bufferSegments == null || bufferSegments.isEmpty() || buffer == null || buffer.size() == 0;
   }
 }
diff --git a/common/src/main/java/org/apache/uniffle/common/netty/MessageEncoder.java b/common/src/main/java/org/apache/uniffle/common/netty/MessageEncoder.java
index e3537ecd..354300a9 100644
--- a/common/src/main/java/org/apache/uniffle/common/netty/MessageEncoder.java
+++ b/common/src/main/java/org/apache/uniffle/common/netty/MessageEncoder.java
@@ -26,6 +26,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.netty.protocol.Message;
+import org.apache.uniffle.common.netty.protocol.Transferable;
 
 /**
  * Encoder used by the server side to encode server-to-client responses.
@@ -46,7 +47,6 @@ public class MessageEncoder extends ChannelOutboundHandlerAdapter {
 
   @Override
   public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
-    // todo: support zero copy
     Message message = (Message) msg;
     int encodeLength = message.encodedLength();
     ByteBuf byteBuf = ctx.alloc().buffer(FrameDecoder.HEADER_SIZE + encodeLength);
@@ -59,5 +59,9 @@ public class MessageEncoder extends ChannelOutboundHandlerAdapter {
       byteBuf.release();
     }
     ctx.writeAndFlush(byteBuf);
+    // After writing the encoded buffer, let a Transferable message send its data via transferTo.
+    if (message instanceof Transferable) {
+      ((Transferable) message).transferTo(ctx.channel());
+    }
   }
 }
diff --git a/common/src/main/java/org/apache/uniffle/common/netty/buffer/FileSegmentManagedBuffer.java b/common/src/main/java/org/apache/uniffle/common/netty/buffer/FileSegmentManagedBuffer.java
new file mode 100644
index 00000000..9e80756e
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/netty/buffer/FileSegmentManagedBuffer.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.netty.buffer;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.DefaultFileRegion;
+
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.JavaUtils;
+
+public class FileSegmentManagedBuffer extends ManagedBuffer {
+
+  private final File file;
+  private final int offset;
+  private final int length;
+
+  public FileSegmentManagedBuffer(File file, int offset, int length) {
+    this.file = file;
+    this.offset = offset;
+    this.length = length;
+  }
+
+  @Override
+  public int size() {
+    return length;
+  }
+
+  @Override
+  public ByteBuf byteBuf() {
+    return Unpooled.wrappedBuffer(this.nioByteBuffer());
+  }
+
+  @Override
+  public ByteBuffer nioByteBuffer() {
+    FileChannel channel = null;
+    try {
+      channel = new RandomAccessFile(file, "r").getChannel();
+      ByteBuffer buf = ByteBuffer.allocate(length);
+      channel.position(offset);
+      while (buf.remaining() != 0) {
+        if (channel.read(buf) == -1) {
+          throw new IOException(
+              String.format("Reached EOF before filling buffer.offset=%s,file=%s,buf.remaining=%s",
+              offset, file.getAbsoluteFile(), buf.remaining()));
+        }
+      }
+      buf.flip();
+      return buf;
+    } catch (IOException e) {
+      String errorMessage = "Error in reading " + this;
+      try {
+        if (channel != null) {
+          long size = channel.size();
+          errorMessage = "Error in reading " + this + " (actual file length " + size + ")";
+        }
+      } catch (IOException ignored) {
+        // ignore
+      }
+      throw new RssException(errorMessage, e);
+    } finally {
+      JavaUtils.closeQuietly(channel);
+    }
+  }
+
+  @Override
+  public ManagedBuffer release() {
+    return this;
+  }
+
+  @Override
+  public Object convertToNetty() {
+    FileChannel fileChannel;
+    try {
+      fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
+    } catch (IOException e) {
+      throw new RssException("Error in reading " + file);
+    }
+    return new DefaultFileRegion(fileChannel, offset, length);
+  }
+}
diff --git a/common/src/main/java/org/apache/uniffle/common/netty/buffer/ManagedBuffer.java b/common/src/main/java/org/apache/uniffle/common/netty/buffer/ManagedBuffer.java
new file mode 100644
index 00000000..ed15640e
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/netty/buffer/ManagedBuffer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.netty.buffer;
+
+import java.nio.ByteBuffer;
+
+import io.netty.buffer.ByteBuf;
+
+public abstract class ManagedBuffer {
+
+  public abstract int size();
+
+  public abstract ByteBuf byteBuf();
+
+  public abstract ByteBuffer nioByteBuffer();
+
+  public abstract ManagedBuffer release();
+
+  /**
+   * Convert the buffer into a Netty object, used to write the data out. The return value is either
+   * a {@link io.netty.buffer.ByteBuf} or a {@link io.netty.channel.FileRegion}.
+   */
+  public abstract Object convertToNetty();
+}
diff --git a/common/src/main/java/org/apache/uniffle/common/netty/buffer/NettyManagedBuffer.java b/common/src/main/java/org/apache/uniffle/common/netty/buffer/NettyManagedBuffer.java
new file mode 100644
index 00000000..4cc6686a
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/netty/buffer/NettyManagedBuffer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.netty.buffer;
+
+import java.nio.ByteBuffer;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+public class NettyManagedBuffer extends ManagedBuffer {
+
+  private ByteBuf buf;
+
+  public NettyManagedBuffer(ByteBuf byteBuf) {
+    this.buf = byteBuf;
+  }
+
+  @Override
+  public int size() {
+    return buf.readableBytes();
+  }
+
+  @Override
+  public ByteBuf byteBuf() {
+    return Unpooled.wrappedBuffer(this.nioByteBuffer());
+  }
+
+  @Override
+  public ByteBuffer nioByteBuffer() {
+    return buf.nioBuffer();
+  }
+
+  @Override
+  public ManagedBuffer release() {
+    buf.release();
+    return this;
+  }
+
+  @Override
+  public Object convertToNetty() {
+    return buf.duplicate().retain();
+  }
+}
diff --git a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataResponse.java b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataResponse.java
index d955f22d..e31915cf 100644
--- a/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataResponse.java
+++ b/common/src/main/java/org/apache/uniffle/common/netty/protocol/GetLocalShuffleDataResponse.java
@@ -18,45 +18,46 @@
 package org.apache.uniffle.common.netty.protocol;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
 
+import org.apache.uniffle.common.netty.buffer.FileSegmentManagedBuffer;
+import org.apache.uniffle.common.netty.buffer.ManagedBuffer;
+import org.apache.uniffle.common.netty.buffer.NettyManagedBuffer;
 import org.apache.uniffle.common.rpc.StatusCode;
 import org.apache.uniffle.common.util.ByteBufUtils;
 
-public class GetLocalShuffleDataResponse extends RpcResponse {
-  private ByteBuf data;
+public class GetLocalShuffleDataResponse extends RpcResponse implements Transferable {
 
-  public GetLocalShuffleDataResponse(long requestId, StatusCode statusCode, byte[] data) {
-    this(requestId, statusCode, null, data);
-  }
-
-  public GetLocalShuffleDataResponse(long requestId, StatusCode statusCode, String retMessage, byte[] data) {
-    this(requestId, statusCode, retMessage, Unpooled.wrappedBuffer(data));
-  }
+  private ManagedBuffer buffer;
 
-  public GetLocalShuffleDataResponse(long requestId, StatusCode statusCode, String retMessage, ByteBuf data) {
+  public GetLocalShuffleDataResponse(long requestId, StatusCode statusCode, String retMessage, ManagedBuffer data) {
     super(requestId, statusCode, retMessage);
-    this.data = data;
+    this.buffer = data;
   }
 
   @Override
   public int encodedLength() {
-    return super.encodedLength() + Integer.BYTES + data.readableBytes();
+    return super.encodedLength() + Integer.BYTES + buffer.size();
   }
 
   @Override
   public void encode(ByteBuf buf) {
     super.encode(buf);
-    ByteBufUtils.copyByteBuf(data, buf);
-    data.release();
+    if (buffer instanceof FileSegmentManagedBuffer) {
+      buf.writeInt(buffer.size());
+    } else {
+      ByteBufUtils.copyByteBuf(buffer.byteBuf(), buf);
+      buffer.release();
+    }
   }
 
+
   public static GetLocalShuffleDataResponse decode(ByteBuf byteBuf) {
     long requestId = byteBuf.readLong();
     StatusCode statusCode = StatusCode.fromCode(byteBuf.readInt());
     String retMessage = ByteBufUtils.readLengthAndString(byteBuf);
     ByteBuf data = ByteBufUtils.readSlice(byteBuf);
-    return new GetLocalShuffleDataResponse(requestId, statusCode, retMessage, data);
+    return new GetLocalShuffleDataResponse(requestId, statusCode, retMessage, new NettyManagedBuffer(data));
   }
 
   @Override
@@ -64,7 +65,17 @@ public class GetLocalShuffleDataResponse extends RpcResponse {
     return Type.GET_LOCAL_SHUFFLE_DATA_RESPONSE;
   }
 
-  public ByteBuf getData() {
-    return data;
+  public ManagedBuffer getBuffer() {
+    return buffer;
+  }
+
+  public Object getData() {
+    return buffer.convertToNetty();
+  }
+
+  @Override
+  public void transferTo(Channel channel) {
+    channel.write(buffer.convertToNetty());
+    buffer.release();
   }
 }
diff --git a/common/src/main/java/org/apache/uniffle/common/netty/protocol/Transferable.java b/common/src/main/java/org/apache/uniffle/common/netty/protocol/Transferable.java
new file mode 100644
index 00000000..d0fc8ac9
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/netty/protocol/Transferable.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.netty.protocol;
+
+import io.netty.channel.Channel;
+
+public interface Transferable {
+
+  void transferTo(Channel channel);
+}
diff --git a/common/src/test/java/org/apache/uniffle/common/netty/protocol/NettyProtocolTest.java b/common/src/test/java/org/apache/uniffle/common/netty/protocol/NettyProtocolTest.java
index 419c70d9..1e86aff6 100644
--- a/common/src/test/java/org/apache/uniffle/common/netty/protocol/NettyProtocolTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/netty/protocol/NettyProtocolTest.java
@@ -31,6 +31,7 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.ShuffleBlockInfo;
 import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.netty.buffer.NettyManagedBuffer;
 import org.apache.uniffle.common.rpc.StatusCode;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -116,7 +117,8 @@ public class NettyProtocolTest {
   public void testGetLocalShuffleDataResponse() {
     byte[] data = new byte[]{1, 2, 3};
     GetLocalShuffleDataResponse getLocalShuffleDataResponse =
-        new GetLocalShuffleDataResponse(1, StatusCode.SUCCESS, "", Unpooled.wrappedBuffer(data).retain());
+        new GetLocalShuffleDataResponse(1, StatusCode.SUCCESS, "",
+            new NettyManagedBuffer(Unpooled.wrappedBuffer(data).retain()));
     int encodeLength = getLocalShuffleDataResponse.encodedLength();
     ByteBuf byteBuf = Unpooled.buffer(encodeLength, encodeLength);
     getLocalShuffleDataResponse.encode(byteBuf);
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
index eb50665c..142bb155 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
@@ -225,7 +225,7 @@ public class ShuffleServerGrpcNettyClient extends ShuffleServerGrpcClient {
       case SUCCESS:
         return new RssGetShuffleDataResponse(
             StatusCode.SUCCESS,
-            getLocalShuffleDataResponse.getData().nioBuffer());
+            getLocalShuffleDataResponse.getBuffer().nioByteBuffer());
       default:
         String msg = "Can't get shuffle data from " + host + ":" + port
                          + " for " + requestInfo + ", errorMsg:" + getLocalShuffleDataResponse.getRetMessage();
diff --git a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
index b87d57a3..145b2e2f 100644
--- a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
+++ b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
@@ -36,6 +36,7 @@ import org.apache.uniffle.common.ShufflePartitionedData;
 import org.apache.uniffle.common.config.RssBaseConf;
 import org.apache.uniffle.common.exception.FileNotFoundException;
 import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.netty.buffer.NettyManagedBuffer;
 import org.apache.uniffle.common.netty.client.TransportClient;
 import org.apache.uniffle.common.netty.handle.BaseMessageHandler;
 import org.apache.uniffle.common.netty.protocol.GetLocalShuffleDataRequest;
@@ -356,12 +357,13 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler {
         LOG.info("Successfully getShuffleData cost {} ms for shuffle"
             + " data with {}", readTime, requestInfo);
         response = new GetLocalShuffleDataResponse(req.getRequestId(),
-            status, msg, sdr.getDataBuf());
+            status, msg, sdr.getManagedBuffer());
       } catch (Exception e) {
         status = StatusCode.INTERNAL_ERROR;
         msg = "Error happened when get shuffle data for " + requestInfo + ", " + e.getMessage();
         LOG.error(msg, e);
-        response = new GetLocalShuffleDataResponse(req.getRequestId(), status, msg, Unpooled.EMPTY_BUFFER);
+        response = new GetLocalShuffleDataResponse(req.getRequestId(), status, msg,
+            new NettyManagedBuffer(Unpooled.EMPTY_BUFFER));
       } finally {
         shuffleServer.getShuffleBufferManager().releaseReadMemory(length);
       }
@@ -369,7 +371,8 @@ public class ShuffleServerNettyHandler implements BaseMessageHandler {
       status = StatusCode.INTERNAL_ERROR;
       msg = "Can't require memory to get shuffle data";
       LOG.error(msg + " for " + requestInfo);
-      response = new GetLocalShuffleDataResponse(req.getRequestId(), status, msg, Unpooled.EMPTY_BUFFER);
+      response = new GetLocalShuffleDataResponse(req.getRequestId(), status, msg,
+          new NettyManagedBuffer(Unpooled.EMPTY_BUFFER));
     }
     client.sendRpcSync(response, RPC_TIMEOUT);
   }