You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by ch...@apache.org on 2022/12/08 14:38:22 UTC
[bookkeeper] 05/08: module distributedlog-core: refactor ByteBuf release usage (#3691)
This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch branch-4.14
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit eaf399629e0fc78c56e8e208bb9b5efc40b943d4
Author: StevenLuMT <ls...@126.com>
AuthorDate: Tue Dec 6 15:21:21 2022 +0800
module distributedlog-core: refactor ByteBuf release usage (#3691)
Co-authored-by: lushiji <lu...@didiglobal.com>
(cherry picked from commit 7ae5a04a83984f1731781c7892613e8932d2f62e)
---
.../src/main/java/org/apache/bookkeeper/client/LedgerReader.java | 3 ++-
.../src/main/java/org/apache/distributedlog/EnvelopedEntry.java | 3 ++-
.../java/org/apache/distributedlog/EnvelopedEntryReader.java | 3 ++-
.../java/org/apache/distributedlog/EnvelopedEntryWriter.java | 8 ++++----
.../java/org/apache/distributedlog/tools/DistributedLogTool.java | 7 ++++---
.../core/src/test/java/org/apache/distributedlog/TestEntry.java | 9 +++++----
6 files changed, 19 insertions(+), 14 deletions(-)
diff --git a/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java b/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
index f5850bf0ba..4fc3d05be2 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java
@@ -19,6 +19,7 @@ package org.apache.bookkeeper.client;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import io.netty.util.ReferenceCountUtil;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashSet;
@@ -110,7 +111,7 @@ public class LedgerReader {
eid, BKException.Code.DigestMatchException, null, bookieAddress.getSocketAddress());
} finally {
- buffer.release();
+ ReferenceCountUtil.safeRelease(buffer);
}
}
readResults.add(rr);
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java
index 0e17929300..e0ccef949d 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java
@@ -18,6 +18,7 @@
package org.apache.distributedlog;
import io.netty.buffer.ByteBuf;
+import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.distributedlog.io.CompressionCodec;
@@ -102,7 +103,7 @@ class EnvelopedEntry {
CompressionCodec codec = CompressionUtils.getCompressionCodec(Type.of(codecCode));
decompressedBuf = codec.decompress(compressedBuf, originDataLen);
} finally {
- compressedBuf.release();
+ ReferenceCountUtil.safeRelease(compressedBuf);
}
return decompressedBuf;
}
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntryReader.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntryReader.java
index 82656bab46..6db96e246e 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntryReader.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntryReader.java
@@ -19,6 +19,7 @@ package org.apache.distributedlog;
import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
+import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.bookkeeper.stats.StatsLogger;
@@ -80,7 +81,7 @@ class EnvelopedEntryReader implements Entry.Reader, RecordStream {
private void releaseBuffer() {
isExhausted = true;
- this.src.release();
+ ReferenceCountUtil.safeRelease(this.src);
}
@Override
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java
index 733593ad60..e6497e9a5b 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java
@@ -210,18 +210,18 @@ class EnvelopedEntryWriter implements Writer {
@Override
public void completeTransmit(long lssn, long entryId) {
satisfyPromises(lssn, entryId);
- buffer.release();
+ ReferenceCountUtil.safeRelease(buffer);
synchronized (this) {
- ReferenceCountUtil.release(finalizedBuffer);
+ ReferenceCountUtil.safeRelease(finalizedBuffer);
}
}
@Override
public void abortTransmit(Throwable reason) {
cancelPromises(reason);
- buffer.release();
+ ReferenceCountUtil.safeRelease(buffer);
synchronized (this) {
- ReferenceCountUtil.release(finalizedBuffer);
+ ReferenceCountUtil.safeRelease(finalizedBuffer);
}
}
}
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java
index bec9abff72..c752c1cd22 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java
@@ -23,6 +23,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
+import io.netty.util.ReferenceCountUtil;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
@@ -1719,7 +1720,7 @@ import org.slf4j.LoggerFactory;
.setEnvelopeEntry(LogSegmentMetadata.supportsEnvelopedEntries(segment.getVersion()))
.setEntry(lastEntry.getEntryBuffer())
.buildReader();
- lastEntry.getEntryBuffer().release();
+ ReferenceCountUtil.safeRelease(lastEntry.getEntryBuffer());
LogRecordWithDLSN record = reader.nextRecord();
LogRecordWithDLSN lastRecord = null;
while (null != record) {
@@ -2033,7 +2034,7 @@ import org.slf4j.LoggerFactory;
.setEntry(rr.getValue())
.setEnvelopeEntry(LogSegmentMetadata.supportsEnvelopedEntries(metadataVersion))
.buildReader();
- rr.getValue().release();
+ ReferenceCountUtil.safeRelease(rr.getValue());
printEntry(reader);
} else {
System.out.println("status = " + BKException.getMessage(rr.getResultCode()));
@@ -2095,7 +2096,7 @@ import org.slf4j.LoggerFactory;
.setEntry(entry.getEntryBuffer())
.setEnvelopeEntry(LogSegmentMetadata.supportsEnvelopedEntries(metadataVersion))
.buildReader();
- entry.getEntryBuffer().release();
+ ReferenceCountUtil.safeRelease(entry.getEntryBuffer());
printEntry(reader);
++i;
}
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestEntry.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestEntry.java
index a7f413a788..3b0a9de9a6 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestEntry.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestEntry.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.assertTrue;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
+import io.netty.util.ReferenceCountUtil;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -72,7 +73,7 @@ public class TestEntry {
Assert.assertNull("Empty record set should return null",
reader.nextRecord());
assertEquals(refCnt - 1, reader.getSrcBuf().refCnt());
- buffer.release();
+ ReferenceCountUtil.safeRelease(buffer);
}
@Test(timeout = 20000)
@@ -97,7 +98,7 @@ public class TestEntry {
ByteBuf buffer = writer.getBuffer();
assertEquals("zero bytes", HEADER_LENGTH, buffer.readableBytes());
- buffer.release();
+ ReferenceCountUtil.safeRelease(buffer);
}
@Test(timeout = 20000)
@@ -158,7 +159,7 @@ public class TestEntry {
.setEntryId(0L)
.setEnvelopeEntry(true)
.buildReader();
- buffer.release();
+ ReferenceCountUtil.safeRelease(buffer);
LogRecordWithDLSN record = reader.nextRecord();
int numReads = 0;
long expectedTxid = 0L;
@@ -276,7 +277,7 @@ public class TestEntry {
new DLSN(1L, 1L, 12L), 0, 0, 3,
new DLSN(1L, 1L, 12L), 12L);
- buffer.release();
+ ReferenceCountUtil.safeRelease(buffer);
}
void verifyReadResult(ByteBuf data,