You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by ch...@apache.org on 2022/12/08 14:38:20 UTC

[bookkeeper] 03/08: [refactor][bookkeeper] Refactor ByteBuf release method in stream/statelib (#3689)

This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-4.14
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit ba045c84a291619b49493d8b974c1e3bb073958a
Author: Qiang Huang <qi...@gmail.com>
AuthorDate: Sun Dec 4 19:32:11 2022 +0800

    [refactor][bookkeeper] Refactor ByteBuf release method in stream/statelib (#3689)
    
    * Refactor ByteBuf release usage: replace direct release() calls with ReferenceCountUtil.safeRelease()
    
    (cherry picked from commit 324b8d43ab47f1a9bf681abaa8ab486c00d331a5)
---
 .../statelib/impl/journal/AbstractStateStoreWithJournal.java         | 5 +++--
 .../main/java/org/apache/bookkeeper/statelib/impl/kv/KVUtils.java    | 3 ++-
 .../org/apache/bookkeeper/statelib/impl/kv/RocksdbKVAsyncStore.java  | 5 +++--
 .../java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCRecord.java    | 5 +++--
 .../org/apache/bookkeeper/statelib/impl/mvcc/MVCCRecordCoder.java    | 3 ++-
 .../java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCStoreImpl.java | 3 ++-
 6 files changed, 15 insertions(+), 9 deletions(-)

diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/journal/AbstractStateStoreWithJournal.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/journal/AbstractStateStoreWithJournal.java
index a43fcf866c..600c9bc346 100644
--- a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/journal/AbstractStateStoreWithJournal.java
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/journal/AbstractStateStoreWithJournal.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.netty.buffer.ByteBuf;
+import io.netty.util.ReferenceCountUtil;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.concurrent.Callable;
@@ -548,7 +549,7 @@ public abstract class AbstractStateStoreWithJournal<LocalStateStoreT extends Sta
         long txId = ++nextRevision;
         return FutureUtils.ensure(
             writer.write(new LogRecord(txId, cmdBuf.nioBuffer())),
-            () -> cmdBuf.release());
+            () -> ReferenceCountUtil.safeRelease(cmdBuf));
     }
 
     protected synchronized CompletableFuture<Long> writeCommandBufReturnTxId(ByteBuf cmdBuf) {
@@ -556,7 +557,7 @@ public abstract class AbstractStateStoreWithJournal<LocalStateStoreT extends Sta
         return FutureUtils.ensure(
             writer.write(new LogRecord(txId, cmdBuf.nioBuffer()))
                 .thenApply(dlsn -> txId),
-            () -> cmdBuf.release());
+            () -> ReferenceCountUtil.safeRelease(cmdBuf));
     }
 
     //
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/KVUtils.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/KVUtils.java
index 16d647edd0..aeac76d153 100644
--- a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/KVUtils.java
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/KVUtils.java
@@ -23,6 +23,7 @@ import com.google.protobuf.UnsafeByteOperations;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufOutputStream;
 import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.util.ReferenceCountUtil;
 import java.io.IOException;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
@@ -76,7 +77,7 @@ final class KVUtils {
         try {
             cmd.writeTo(new ByteBufOutputStream(buf));
         } catch (IOException e) {
-            buf.release();
+            ReferenceCountUtil.safeRelease(buf);
             throw e;
         }
         return buf;
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/RocksdbKVAsyncStore.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/RocksdbKVAsyncStore.java
index 4252ddec6e..0bceee6a09 100644
--- a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/RocksdbKVAsyncStore.java
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/kv/RocksdbKVAsyncStore.java
@@ -21,6 +21,7 @@ package org.apache.bookkeeper.statelib.impl.kv;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.Unpooled;
+import io.netty.util.ReferenceCountUtil;
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
@@ -99,7 +100,7 @@ public class RocksdbKVAsyncStore<K, V>
                 byte[] serializedBytes = ByteBufUtil.getBytes(serializedBuf);
                 localStore.put(keyBytes, serializedBytes, revision);
             } finally {
-                serializedBuf.release();
+                ReferenceCountUtil.safeRelease(serializedBuf);
             }
             return null;
         }, writeIOScheduler);
@@ -125,7 +126,7 @@ public class RocksdbKVAsyncStore<K, V>
                     return KVUtils.deserialize(valCoder, Unpooled.wrappedBuffer(prevValue));
                 }
             } finally {
-                serializedBuf.release();
+                ReferenceCountUtil.safeRelease(serializedBuf);
             }
         }, writeIOScheduler);
     }
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCRecord.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCRecord.java
index b1cf0fe8fc..762adcf1e1 100644
--- a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCRecord.java
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCRecord.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.statelib.impl.mvcc;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.util.Recycler;
+import io.netty.util.ReferenceCountUtil;
 import java.util.function.Predicate;
 import lombok.Data;
 import lombok.Getter;
@@ -87,7 +88,7 @@ public class MVCCRecord implements Recycled, Predicate<RangeOption<?>> {
 
     public void setValue(ByteBuf buf, ValueType valueType) {
         if (null != value) {
-            value.release();
+            ReferenceCountUtil.safeRelease(value);
         }
         this.value = buf;
         this.valueType = valueType;
@@ -98,7 +99,7 @@ public class MVCCRecord implements Recycled, Predicate<RangeOption<?>> {
 
     private void reset() {
         if (null != value) {
-            value.release();
+            ReferenceCountUtil.safeRelease(value);
             value = null;
         }
         modRev = -1L;
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCRecordCoder.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCRecordCoder.java
index 826f92103b..11a5e1ad7a 100644
--- a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCRecordCoder.java
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCRecordCoder.java
@@ -22,6 +22,7 @@ import com.google.protobuf.CodedOutputStream;
 import com.google.protobuf.InvalidProtocolBufferException;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.util.ReferenceCountUtil;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import lombok.AccessLevel;
@@ -74,7 +75,7 @@ final class MVCCRecordCoder implements Coder<MVCCRecord> {
         buf.writerIndex(buf.writerIndex() + metaLen);
         buf.writeInt(valLen);
         buf.writeBytes(record.getValue().slice());
-        buf.release();
+        ReferenceCountUtil.safeRelease(buf);
 
         return data;
     }
diff --git a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCStoreImpl.java b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCStoreImpl.java
index 74d862861a..9393541979 100644
--- a/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCStoreImpl.java
+++ b/stream/statelib/src/main/java/org/apache/bookkeeper/statelib/impl/mvcc/MVCCStoreImpl.java
@@ -30,6 +30,7 @@ import com.google.protobuf.TextFormat;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
+import io.netty.util.ReferenceCountUtil;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
@@ -490,7 +491,7 @@ class MVCCStoreImpl<K, V> extends RocksdbKVStore<K, V> implements MVCCStore<K, V
         try {
             record = getKeyRecord(key, rawKey);
         } catch (StateStoreRuntimeException e) {
-            rawValBuf.release();
+            ReferenceCountUtil.safeRelease(rawValBuf);
             throw e;
         }