You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by ch...@apache.org on 2022/12/08 14:38:22 UTC

[bookkeeper] 05/08: module distributedlog-core: refactor ByteBuf release usage (#3691)

This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-4.14
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit eaf399629e0fc78c56e8e208bb9b5efc40b943d4
Author: StevenLuMT <ls...@126.com>
AuthorDate: Tue Dec 6 15:21:21 2022 +0800

    module distributedlog-core: refactor ByteBuf release usage (#3691)
    
    Co-authored-by: lushiji <lu...@didiglobal.com>
    (cherry picked from commit 7ae5a04a83984f1731781c7892613e8932d2f62e)
---
 .../src/main/java/org/apache/bookkeeper/client/LedgerReader.java | 3 ++-
 .../src/main/java/org/apache/distributedlog/EnvelopedEntry.java  | 3 ++-
 .../java/org/apache/distributedlog/EnvelopedEntryReader.java     | 3 ++-
 .../java/org/apache/distributedlog/EnvelopedEntryWriter.java     | 8 ++++----
 .../java/org/apache/distributedlog/tools/DistributedLogTool.java | 7 ++++---
 .../core/src/test/java/org/apache/distributedlog/TestEntry.java  | 9 +++++----
 6 files changed, 19 insertions(+), 14 deletions(-)

diff --git a/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java b/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
index f5850bf0ba..4fc3d05be2 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
@@ -19,6 +19,7 @@ package org.apache.bookkeeper.client;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.util.ReferenceCountUtil;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -110,7 +111,7 @@ public class LedgerReader {
                             eid, BKException.Code.DigestMatchException, null, bookieAddress.getSocketAddress());
 
                     } finally {
-                        buffer.release();
+                        ReferenceCountUtil.safeRelease(buffer);
                     }
                 }
                 readResults.add(rr);
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java
index 0e17929300..e0ccef949d 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java
@@ -18,6 +18,7 @@
 package org.apache.distributedlog;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.util.ReferenceCountUtil;
 import java.io.IOException;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.distributedlog.io.CompressionCodec;
@@ -102,7 +103,7 @@ class EnvelopedEntry {
             CompressionCodec codec = CompressionUtils.getCompressionCodec(Type.of(codecCode));
             decompressedBuf = codec.decompress(compressedBuf, originDataLen);
         } finally {
-            compressedBuf.release();
+            ReferenceCountUtil.safeRelease(compressedBuf);
         }
         return decompressedBuf;
     }
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntryReader.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntryReader.java
index 82656bab46..6db96e246e 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntryReader.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntryReader.java
@@ -19,6 +19,7 @@ package org.apache.distributedlog;
 
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.ByteBuf;
+import io.netty.util.ReferenceCountUtil;
 import java.io.IOException;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -80,7 +81,7 @@ class EnvelopedEntryReader implements Entry.Reader, RecordStream {
 
     private void releaseBuffer() {
         isExhausted = true;
-        this.src.release();
+        ReferenceCountUtil.safeRelease(this.src);
     }
 
     @Override
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java
index 733593ad60..e6497e9a5b 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java
@@ -210,18 +210,18 @@ class EnvelopedEntryWriter implements Writer {
     @Override
     public void completeTransmit(long lssn, long entryId) {
         satisfyPromises(lssn, entryId);
-        buffer.release();
+        ReferenceCountUtil.safeRelease(buffer);
         synchronized (this) {
-            ReferenceCountUtil.release(finalizedBuffer);
+            ReferenceCountUtil.safeRelease(finalizedBuffer);
         }
     }
 
     @Override
     public void abortTransmit(Throwable reason) {
         cancelPromises(reason);
-        buffer.release();
+        ReferenceCountUtil.safeRelease(buffer);
         synchronized (this) {
-            ReferenceCountUtil.release(finalizedBuffer);
+            ReferenceCountUtil.safeRelease(finalizedBuffer);
         }
     }
 }
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java
index bec9abff72..c752c1cd22 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java
@@ -23,6 +23,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import io.netty.buffer.ByteBuf;
+import io.netty.util.ReferenceCountUtil;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
@@ -1719,7 +1720,7 @@ import org.slf4j.LoggerFactory;
                     .setEnvelopeEntry(LogSegmentMetadata.supportsEnvelopedEntries(segment.getVersion()))
                     .setEntry(lastEntry.getEntryBuffer())
                     .buildReader();
-            lastEntry.getEntryBuffer().release();
+            ReferenceCountUtil.safeRelease(lastEntry.getEntryBuffer());
             LogRecordWithDLSN record = reader.nextRecord();
             LogRecordWithDLSN lastRecord = null;
             while (null != record) {
@@ -2033,7 +2034,7 @@ import org.slf4j.LoggerFactory;
                                     .setEntry(rr.getValue())
                                     .setEnvelopeEntry(LogSegmentMetadata.supportsEnvelopedEntries(metadataVersion))
                                     .buildReader();
-                            rr.getValue().release();
+                            ReferenceCountUtil.safeRelease(rr.getValue());
                             printEntry(reader);
                         } else {
                             System.out.println("status = " + BKException.getMessage(rr.getResultCode()));
@@ -2095,7 +2096,7 @@ import org.slf4j.LoggerFactory;
                         .setEntry(entry.getEntryBuffer())
                         .setEnvelopeEntry(LogSegmentMetadata.supportsEnvelopedEntries(metadataVersion))
                         .buildReader();
-                entry.getEntryBuffer().release();
+                ReferenceCountUtil.safeRelease(entry.getEntryBuffer());
                 printEntry(reader);
                 ++i;
             }
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestEntry.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestEntry.java
index a7f413a788..3b0a9de9a6 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestEntry.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestEntry.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.assertTrue;
 
 import com.google.common.collect.Lists;
 import io.netty.buffer.ByteBuf;
+import io.netty.util.ReferenceCountUtil;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -72,7 +73,7 @@ public class TestEntry {
         Assert.assertNull("Empty record set should return null",
             reader.nextRecord());
         assertEquals(refCnt - 1, reader.getSrcBuf().refCnt());
-        buffer.release();
+        ReferenceCountUtil.safeRelease(buffer);
     }
 
     @Test(timeout = 20000)
@@ -97,7 +98,7 @@ public class TestEntry {
 
         ByteBuf buffer = writer.getBuffer();
         assertEquals("zero bytes", HEADER_LENGTH, buffer.readableBytes());
-        buffer.release();
+        ReferenceCountUtil.safeRelease(buffer);
     }
 
     @Test(timeout = 20000)
@@ -158,7 +159,7 @@ public class TestEntry {
                 .setEntryId(0L)
                 .setEnvelopeEntry(true)
                 .buildReader();
-        buffer.release();
+        ReferenceCountUtil.safeRelease(buffer);
         LogRecordWithDLSN record = reader.nextRecord();
         int numReads = 0;
         long expectedTxid = 0L;
@@ -276,7 +277,7 @@ public class TestEntry {
                 new DLSN(1L, 1L, 12L), 0, 0, 3,
                 new DLSN(1L, 1L, 12L), 12L);
 
-        buffer.release();
+        ReferenceCountUtil.safeRelease(buffer);
     }
 
     void verifyReadResult(ByteBuf data,