Posted to issues@hbase.apache.org by GitBox <gi...@apache.org> on 2022/07/07 09:05:48 UTC

[GitHub] [hbase] Apache9 commented on a diff in pull request #4597: HBASE-27180 Fix multiple possible buffer leaks

Apache9 commented on code in PR #4597:
URL: https://github.com/apache/hbase/pull/4597#discussion_r915640922


##########
hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java:
##########
@@ -662,7 +662,8 @@ public void flush(ChannelHandlerContext ctx) throws Exception {
     }
 
     @Override
-    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
+    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {

Review Comment:
   Good.
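   
   For reference, a minimal sketch of the pattern this diff adopts (the handler and field names below are hypothetical, not the actual HBase class): Netty calls `handlerRemoved` whenever the handler leaves the pipeline, including on channel teardown, while `close` only fires when an explicit outbound close passes through the handler, so `handlerRemoved` is the more reliable hook for releasing buffered state.
   
   ```java
   import io.netty.buffer.ByteBuf;
   import io.netty.channel.ChannelDuplexHandler;
   import io.netty.channel.ChannelHandlerContext;
   import io.netty.util.ReferenceCountUtil;
   
   class BufferingHandler extends ChannelDuplexHandler {
     private ByteBuf pending; // hypothetical buffered state owned by this handler
   
     @Override
     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
       // Runs on every removal path (explicit removal, channel close, pipeline
       // teardown), so the buffer is released no matter how we shut down.
       ReferenceCountUtil.safeRelease(pending);
       pending = null;
     }
   }
   ```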



##########
hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java:
##########
@@ -429,6 +429,9 @@ private void flushBuffer(CompletableFuture<Long> future, ByteBuf dataBuf,
       future.completeExceptionally(new IOException("stream already broken"));
       // it's the one we have just pushed or just a no-op
       waitingAckQueue.removeFirst();
+
+      // This method takes ownership of the dataBuf so we need to release it before returning.
+      dataBuf.release();

Review Comment:
   I guess here we also need to release the headerBuf and checksumBuf?
   
   So maybe we declare these 3 ByteBufs at the top of the method, wrap the whole method body in a big try/finally, and release all 3 ByteBufs in the finally block? Like this:
   
   ```java
     private void flushBuffer(CompletableFuture<Long> future, ByteBuf dataBuf,
       long nextPacketOffsetInBlock, boolean syncBlock) {
       ByteBuf checksumBuf = null;
       ByteBuf headerBuf = null;
       try {
         int dataLen = dataBuf.readableBytes();
         int chunkLen = summer.getBytesPerChecksum();
         int trailingPartialChunkLen = dataLen % chunkLen;
         int numChecks = dataLen / chunkLen + (trailingPartialChunkLen != 0 ? 1 : 0);
         int checksumLen = numChecks * summer.getChecksumSize();
         checksumBuf = alloc.directBuffer(checksumLen);
         summer.calculateChunkedSums(dataBuf.nioBuffer(), checksumBuf.nioBuffer(0, checksumLen));
         checksumBuf.writerIndex(checksumLen);
         PacketHeader header = new PacketHeader(4 + checksumLen + dataLen, nextPacketOffsetInBlock,
           nextPacketSeqno, false, dataLen, syncBlock);
         int headerLen = header.getSerializedSize();
         headerBuf = alloc.buffer(headerLen);
         header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
         headerBuf.writerIndex(headerLen);
         Callback c =
           new Callback(future, nextPacketOffsetInBlock + dataLen, datanodeInfoMap.keySet(), dataLen);
         waitingAckQueue.addLast(c);
         // recheck again after we pushed the callback to queue
         if (state != State.STREAMING && waitingAckQueue.peekFirst() == c) {
           future.completeExceptionally(new IOException("stream already broken"));
           // it's the one we have just pushed or just a no-op
           waitingAckQueue.removeFirst();
           return;
         }
         // TODO: we should perhaps measure time taken per DN here;
         // we could collect statistics per DN, and/or exclude bad nodes in createOutput.
         for (Channel ch : datanodeInfoMap.keySet()) {
           ch.write(headerBuf.retainedDuplicate());
           ch.write(checksumBuf.retainedDuplicate());
           ch.writeAndFlush(dataBuf.retainedDuplicate());
         }
         nextPacketSeqno++;
       } finally {
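       // The writes above handed each channel a retainedDuplicate(), so the
       // original references are still ours here and must be released.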
         ReferenceCountUtil.safeRelease(checksumBuf);
         ReferenceCountUtil.safeRelease(headerBuf);
         ReferenceCountUtil.safeRelease(dataBuf);
       }
     }
   ```
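   
   (For reference, `ReferenceCountUtil` here is `io.netty.util.ReferenceCountUtil`; `safeRelease` is a no-op on null and logs instead of throwing if a release fails, so it is safe in the finally block even when an exception fires before `checksumBuf` or `headerBuf` is allocated.)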



##########
hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java:
##########
@@ -113,6 +115,8 @@ private MultiByteBuff(RefCnt refCnt, ByteBuffer[] items, int[] itemBeginPos, int
     this.limit = limit;
     this.limitedItemIndex = limitedIndex;
     this.markedItemIndex = markedIndex;
+    // Touch the reference count so it's easier to debug leaks.
+    refCnt.touch();

Review Comment:
   Ditto.



##########
hbase-common/src/main/java/org/apache/hadoop/hbase/nio/MultiByteBuff.java:
##########
@@ -101,6 +101,8 @@ public MultiByteBuff(Recycler recycler, ByteBuffer... items) {
     this.limit = offset;
     this.itemBeginPos[items.length] = offset + 1;
     this.limitedItemIndex = this.items.length - 1;
+    // Touch the reference count so it's easier to debug leaks.
+    refCnt.touch();

Review Comment:
   In Viraj's test this will introduce problems... So I think we can delete it here, and open another issue to better support leak detection?
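   
   For context, a minimal, self-contained sketch of what `touch()` buys in plain Netty (HBase's `RefCnt` wraps this machinery; the class name and hint strings below are made up for illustration):
   
   ```java
   import io.netty.buffer.ByteBuf;
   import io.netty.buffer.PooledByteBufAllocator;
   import io.netty.util.ResourceLeakDetector;
   
   public class LeakHintDemo {
     public static void main(String[] args) {
       // PARANOID tracks every allocation, so leak reports show up deterministically in tests.
       ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
       ByteBuf buf = PooledByteBufAllocator.DEFAULT.directBuffer(64);
       // Each touch records the call site; if buf is garbage-collected without
       // release(), the leak report prints these hints as recent access records.
       buf.touch("after allocation");
       buf.writeLong(42L);
       buf.touch("after write");
       buf.release(); // comment this out to provoke a leak report on a later GC
     }
   }
   ```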



##########
hbase-common/src/main/java/org/apache/hadoop/hbase/nio/SingleByteBuff.java:
##########
@@ -64,6 +64,8 @@ public SingleByteBuff(Recycler recycler, ByteBuffer buf) {
     } else {
       this.unsafeOffset = UnsafeAccess.directBufferAddress(buf);
     }
+    // Touch the reference count so it's easier to debug leaks.
+    refCnt.touch();

Review Comment:
   Ditto.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@hbase.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org