You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by ch...@apache.org on 2022/12/08 14:38:24 UTC

[bookkeeper] 07/08: module distributedlog-common/distributedlog-protocol: refactor ByteBuf release usage (#3693)

This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-4.14
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 8e352d4abea2fee63d3eb807528fdf370d3364a9
Author: StevenLuMT <ls...@126.com>
AuthorDate: Wed Dec 7 19:48:50 2022 +0800

    module distributedlog-common/distributedlog-protocol: refactor ByteBuf release usage (#3693)
    
    Co-authored-by: lushiji <lu...@didiglobal.com>
    (cherry picked from commit a71c7a7e681e9908c04245d3d719d8f1c0a79176)
---
 .../org/apache/distributedlog/io/TestCompressionCodec.java  | 13 +++++++------
 .../org/apache/distributedlog/EnvelopedRecordSetReader.java |  9 +++++----
 .../org/apache/distributedlog/EnvelopedRecordSetWriter.java |  8 ++++----
 .../src/main/java/org/apache/distributedlog/LogRecord.java  |  3 ++-
 4 files changed, 18 insertions(+), 15 deletions(-)

diff --git a/stream/distributedlog/common/src/test/java/org/apache/distributedlog/io/TestCompressionCodec.java b/stream/distributedlog/common/src/test/java/org/apache/distributedlog/io/TestCompressionCodec.java
index c8edd99e60..fa60f95d94 100644
--- a/stream/distributedlog/common/src/test/java/org/apache/distributedlog/io/TestCompressionCodec.java
+++ b/stream/distributedlog/common/src/test/java/org/apache/distributedlog/io/TestCompressionCodec.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertEquals;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
+import io.netty.util.ReferenceCountUtil;
 import java.nio.ByteBuffer;
 import org.junit.Test;
 
@@ -70,9 +71,9 @@ public class TestCompressionCodec {
         decompressedBuf.readBytes(decompressedData);
         assertArrayEquals("The decompressed bytes should be same as the original bytes",
                 data, decompressedData);
-        buf.release();
-        compressedBuf.release();
-        decompressedBuf.release();
+        ReferenceCountUtil.safeRelease(buf);
+        ReferenceCountUtil.safeRelease(compressedBuf);
+        ReferenceCountUtil.safeRelease(decompressedBuf);
     }
 
     private void testCompressionCodec2(CompressionCodec codec) throws Exception {
@@ -93,9 +94,9 @@ public class TestCompressionCodec {
         byte[] decompressedData = new byte[decompressedBuf.readableBytes()];
         decompressedBuf.slice().readBytes(decompressedData);
 
-        buffer.release();
-        compressedBuf.release();
-        decompressedBuf.release();
+        ReferenceCountUtil.safeRelease(buffer);
+        ReferenceCountUtil.safeRelease(compressedBuf);
+        ReferenceCountUtil.safeRelease(decompressedBuf);
     }
 
 }
diff --git a/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetReader.java b/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetReader.java
index 83a950ed7c..fd2bbd4905 100644
--- a/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetReader.java
+++ b/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetReader.java
@@ -22,6 +22,7 @@ import static org.apache.distributedlog.LogRecordSet.METADATA_VERSION_MASK;
 import static org.apache.distributedlog.LogRecordSet.VERSION;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.util.ReferenceCountUtil;
 import java.io.IOException;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.distributedlog.io.CompressionCodec;
@@ -80,10 +81,10 @@ class EnvelopedRecordSetReader implements LogRecordSet.Reader {
             CompressionCodec codec = CompressionUtils.getCompressionCodec(Type.of(codecCode));
             this.reader = codec.decompress(compressedBuf, decompressedDataLen);
         } finally {
-            compressedBuf.release();
+            ReferenceCountUtil.safeRelease(compressedBuf);
         }
         if (numRecords == 0) {
-            this.reader.release();
+            ReferenceCountUtil.safeRelease(this.reader);
         }
     }
 
@@ -110,7 +111,7 @@ class EnvelopedRecordSetReader implements LogRecordSet.Reader {
 
         // release the record set buffer when exhausting the reader
         if (0 == numRecords) {
-            this.reader.release();
+            ReferenceCountUtil.safeRelease(this.reader);
         }
 
         return record;
@@ -120,7 +121,7 @@ class EnvelopedRecordSetReader implements LogRecordSet.Reader {
     public void release() {
         if (0 != numRecords) {
             numRecords = 0;
-            reader.release();
+            ReferenceCountUtil.safeRelease(reader);
         }
     }
 }
diff --git a/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetWriter.java b/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetWriter.java
index ea1824ef51..78d19c3989 100644
--- a/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetWriter.java
+++ b/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetWriter.java
@@ -157,14 +157,14 @@ class EnvelopedRecordSetWriter implements LogRecordSet.Writer {
     @Override
     public synchronized void completeTransmit(long lssn, long entryId, long startSlotId) {
         satisfyPromises(lssn, entryId, startSlotId);
-        buffer.release();
-        ReferenceCountUtil.release(recordSetBuffer);
+        ReferenceCountUtil.safeRelease(buffer);
+        ReferenceCountUtil.safeRelease(recordSetBuffer);
     }
 
     @Override
     public synchronized void abortTransmit(Throwable reason) {
         cancelPromises(reason);
-        buffer.release();
-        ReferenceCountUtil.release(recordSetBuffer);
+        ReferenceCountUtil.safeRelease(buffer);
+        ReferenceCountUtil.safeRelease(recordSetBuffer);
     }
 }
diff --git a/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/LogRecord.java b/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/LogRecord.java
index 63b694aa31..ee98bf5f32 100644
--- a/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/LogRecord.java
+++ b/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/LogRecord.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufInputStream;
 import io.netty.buffer.Unpooled;
+import io.netty.util.ReferenceCountUtil;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
@@ -225,7 +226,7 @@ public class LogRecord {
 
     void setPayloadBuf(ByteBuf payload, boolean copyData) {
         if (null != this.payload) {
-            this.payload.release();
+            ReferenceCountUtil.safeRelease(this.payload);
         }
         if (copyData) {
             this.payload = Unpooled.copiedBuffer(payload);