You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by ch...@apache.org on 2022/12/08 14:38:24 UTC
[bookkeeper] 07/08: module distributedlog-common/distributedlog-protocol: refactor ByteBuf release usage (#3693)
This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch branch-4.14
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit 8e352d4abea2fee63d3eb807528fdf370d3364a9
Author: StevenLuMT <ls...@126.com>
AuthorDate: Wed Dec 7 19:48:50 2022 +0800
module distributedlog-common/distributedlog-protocol: refactor ByteBuf release usage (#3693)
Co-authored-by: lushiji <lu...@didiglobal.com>
(cherry picked from commit a71c7a7e681e9908c04245d3d719d8f1c0a79176)
---
.../org/apache/distributedlog/io/TestCompressionCodec.java | 13 +++++++------
.../org/apache/distributedlog/EnvelopedRecordSetReader.java | 9 +++++----
.../org/apache/distributedlog/EnvelopedRecordSetWriter.java | 8 ++++----
.../src/main/java/org/apache/distributedlog/LogRecord.java | 3 ++-
4 files changed, 18 insertions(+), 15 deletions(-)
diff --git a/stream/distributedlog/common/src/test/java/org/apache/distributedlog/io/TestCompressionCodec.java b/stream/distributedlog/common/src/test/java/org/apache/distributedlog/io/TestCompressionCodec.java
index c8edd99e60..fa60f95d94 100644
--- a/stream/distributedlog/common/src/test/java/org/apache/distributedlog/io/TestCompressionCodec.java
+++ b/stream/distributedlog/common/src/test/java/org/apache/distributedlog/io/TestCompressionCodec.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertEquals;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
+import io.netty.util.ReferenceCountUtil;
import java.nio.ByteBuffer;
import org.junit.Test;
@@ -70,9 +71,9 @@ public class TestCompressionCodec {
decompressedBuf.readBytes(decompressedData);
assertArrayEquals("The decompressed bytes should be same as the original bytes",
data, decompressedData);
- buf.release();
- compressedBuf.release();
- decompressedBuf.release();
+ ReferenceCountUtil.safeRelease(buf);
+ ReferenceCountUtil.safeRelease(compressedBuf);
+ ReferenceCountUtil.safeRelease(decompressedBuf);
}
private void testCompressionCodec2(CompressionCodec codec) throws Exception {
@@ -93,9 +94,9 @@ public class TestCompressionCodec {
byte[] decompressedData = new byte[decompressedBuf.readableBytes()];
decompressedBuf.slice().readBytes(decompressedData);
- buffer.release();
- compressedBuf.release();
- decompressedBuf.release();
+ ReferenceCountUtil.safeRelease(buffer);
+ ReferenceCountUtil.safeRelease(compressedBuf);
+ ReferenceCountUtil.safeRelease(decompressedBuf);
}
}
diff --git a/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetReader.java b/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetReader.java
index 83a950ed7c..fd2bbd4905 100644
--- a/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetReader.java
+++ b/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetReader.java
@@ -22,6 +22,7 @@ import static org.apache.distributedlog.LogRecordSet.METADATA_VERSION_MASK;
import static org.apache.distributedlog.LogRecordSet.VERSION;
import io.netty.buffer.ByteBuf;
+import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import lombok.extern.slf4j.Slf4j;
import org.apache.distributedlog.io.CompressionCodec;
@@ -80,10 +81,10 @@ class EnvelopedRecordSetReader implements LogRecordSet.Reader {
CompressionCodec codec = CompressionUtils.getCompressionCodec(Type.of(codecCode));
this.reader = codec.decompress(compressedBuf, decompressedDataLen);
} finally {
- compressedBuf.release();
+ ReferenceCountUtil.safeRelease(compressedBuf);
}
if (numRecords == 0) {
- this.reader.release();
+ ReferenceCountUtil.safeRelease(this.reader);
}
}
@@ -110,7 +111,7 @@ class EnvelopedRecordSetReader implements LogRecordSet.Reader {
// release the record set buffer when exhausting the reader
if (0 == numRecords) {
- this.reader.release();
+ ReferenceCountUtil.safeRelease(this.reader);
}
return record;
@@ -120,7 +121,7 @@ class EnvelopedRecordSetReader implements LogRecordSet.Reader {
public void release() {
if (0 != numRecords) {
numRecords = 0;
- reader.release();
+ ReferenceCountUtil.safeRelease(reader);
}
}
}
diff --git a/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetWriter.java b/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetWriter.java
index ea1824ef51..78d19c3989 100644
--- a/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetWriter.java
+++ b/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetWriter.java
@@ -157,14 +157,14 @@ class EnvelopedRecordSetWriter implements LogRecordSet.Writer {
@Override
public synchronized void completeTransmit(long lssn, long entryId, long startSlotId) {
satisfyPromises(lssn, entryId, startSlotId);
- buffer.release();
- ReferenceCountUtil.release(recordSetBuffer);
+ ReferenceCountUtil.safeRelease(buffer);
+ ReferenceCountUtil.safeRelease(recordSetBuffer);
}
@Override
public synchronized void abortTransmit(Throwable reason) {
cancelPromises(reason);
- buffer.release();
- ReferenceCountUtil.release(recordSetBuffer);
+ ReferenceCountUtil.safeRelease(buffer);
+ ReferenceCountUtil.safeRelease(recordSetBuffer);
}
}
diff --git a/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/LogRecord.java b/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/LogRecord.java
index 63b694aa31..ee98bf5f32 100644
--- a/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/LogRecord.java
+++ b/stream/distributedlog/protocol/src/main/java/org/apache/distributedlog/LogRecord.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;
+import io.netty.util.ReferenceCountUtil;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
@@ -225,7 +226,7 @@ public class LogRecord {
void setPayloadBuf(ByteBuf payload, boolean copyData) {
if (null != this.payload) {
- this.payload.release();
+ ReferenceCountUtil.safeRelease(this.payload);
}
if (copyData) {
this.payload = Unpooled.copiedBuffer(payload);