You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by ch...@apache.org on 2022/12/08 14:38:20 UTC
[bookkeeper] 03/08: [refactor][bookkeeper] Refactor ByteBuf release method in stream/statelib (#3689)
This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch branch-4.14
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit ba045c84a291619b49493d8b974c1e3bb073958a
Author: Qiang Huang <qi...@gmail.com>
AuthorDate: Sun Dec 4 19:32:11 2022 +0800
[refactor][bookkeeper] Refactor ByteBuf release method in stream/statelib (#3689)
* refactor ByteBuf release usage
(cherry picked from commit 324b8d43ab47f1a9bf681abaa8ab486c00d331a5)
---
.../statelib/impl/journal/AbstractStateStoreWithJournal.java | 5 +++--
.../main/java/org/apache/bookkeeper/statelib/impl/kv/KVUtils.java | 3 ++-
.../org/apache/bookkeeper/statelib/impl/kv/RocksdbKVAsyncStore.java | 5 +++--
.../java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCRecord.java | 5 +++--
.../org/apache/bookkeeper/statelib/impl/mvcc/MVCCRecordCoder.java | 3 ++-
.../java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCStoreImpl.java | 3 ++-
6 files changed, 15 insertions(+), 9 deletions(-)
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/journal/AbstractStateStoreWithJournal.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/journal/AbstractStateStoreWithJournal.java
index a43fcf866c..600c9bc346 100644
--- a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/journal/AbstractStateStoreWithJournal.java
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/journal/AbstractStateStoreWithJournal.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.buffer.ByteBuf;
+import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.Callable;
@@ -548,7 +549,7 @@ public abstract class AbstractStateStoreWithJournal<LocalStateStoreT extends Sta
long txId = ++nextRevision;
return FutureUtils.ensure(
writer.write(new LogRecord(txId, cmdBuf.nioBuffer())),
- () -> cmdBuf.release());
+ () -> ReferenceCountUtil.safeRelease(cmdBuf));
}
protected synchronized CompletableFuture<Long> writeCommandBufReturnTxId(ByteBuf cmdBuf) {
@@ -556,7 +557,7 @@ public abstract class AbstractStateStoreWithJournal<LocalStateStoreT extends Sta
return FutureUtils.ensure(
writer.write(new LogRecord(txId, cmdBuf.nioBuffer()))
.thenApply(dlsn -> txId),
- () -> cmdBuf.release());
+ () -> ReferenceCountUtil.safeRelease(cmdBuf));
}
//
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/KVUtils.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/KVUtils.java
index 16d647edd0..aeac76d153 100644
--- a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/KVUtils.java
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/KVUtils.java
@@ -23,6 +23,7 @@ import com.google.protobuf.UnsafeByteOperations;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
@@ -76,7 +77,7 @@ final class KVUtils {
try {
cmd.writeTo(new ByteBufOutputStream(buf));
} catch (IOException e) {
- buf.release();
+ ReferenceCountUtil.safeRelease(buf);
throw e;
}
return buf;
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/RocksdbKVAsyncStore.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/RocksdbKVAsyncStore.java
index 4252ddec6e..0bceee6a09 100644
--- a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/RocksdbKVAsyncStore.java
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/RocksdbKVAsyncStore.java
@@ -21,6 +21,7 @@ package org.apache.bookkeeper.statelib.impl.kv;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
+import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
@@ -99,7 +100,7 @@ public class RocksdbKVAsyncStore<K, V>
byte[] serializedBytes = ByteBufUtil.getBytes(serializedBuf);
localStore.put(keyBytes, serializedBytes, revision);
} finally {
- serializedBuf.release();
+ ReferenceCountUtil.safeRelease(serializedBuf);
}
return null;
}, writeIOScheduler);
@@ -125,7 +126,7 @@ public class RocksdbKVAsyncStore<K, V>
return KVUtils.deserialize(valCoder, Unpooled.wrappedBuffer(prevValue));
}
} finally {
- serializedBuf.release();
+ ReferenceCountUtil.safeRelease(serializedBuf);
}
}, writeIOScheduler);
}
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCRecord.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCRecord.java
index b1cf0fe8fc..762adcf1e1 100644
--- a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCRecord.java
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCRecord.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.statelib.impl.mvcc;
import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
+import io.netty.util.ReferenceCountUtil;
import java.util.function.Predicate;
import lombok.Data;
import lombok.Getter;
@@ -87,7 +88,7 @@ public class MVCCRecord implements Recycled, Predicate<RangeOption<?>> {
public void setValue(ByteBuf buf, ValueType valueType) {
if (null != value) {
- value.release();
+ ReferenceCountUtil.safeRelease(value);
}
this.value = buf;
this.valueType = valueType;
@@ -98,7 +99,7 @@ public class MVCCRecord implements Recycled, Predicate<RangeOption<?>> {
private void reset() {
if (null != value) {
- value.release();
+ ReferenceCountUtil.safeRelease(value);
value = null;
}
modRev = -1L;
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCRecordCoder.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCRecordCoder.java
index 826f92103b..11a5e1ad7a 100644
--- a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCRecordCoder.java
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCRecordCoder.java
@@ -22,6 +22,7 @@ import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.InvalidProtocolBufferException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.nio.ByteBuffer;
import lombok.AccessLevel;
@@ -74,7 +75,7 @@ final class MVCCRecordCoder implements Coder<MVCCRecord> {
buf.writerIndex(buf.writerIndex() + metaLen);
buf.writeInt(valLen);
buf.writeBytes(record.getValue().slice());
- buf.release();
+ ReferenceCountUtil.safeRelease(buf);
return data;
}
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCStoreImpl.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCStoreImpl.java
index 74d862861a..9393541979 100644
--- a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCStoreImpl.java
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCStoreImpl.java
@@ -30,6 +30,7 @@ import com.google.protobuf.TextFormat;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
+import io.netty.util.ReferenceCountUtil;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
@@ -490,7 +491,7 @@ class MVCCStoreImpl<K, V> extends RocksdbKVStore<K, V> implements MVCCStore<K, V
try {
record = getKeyRecord(key, rawKey);
} catch (StateStoreRuntimeException e) {
- rawValBuf.release();
+ ReferenceCountUtil.safeRelease(rawValBuf);
throw e;
}