You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@bookkeeper.apache.org by lu...@apache.org on 2022/12/11 12:35:56 UTC

[bookkeeper] branch master updated: module stream-storage-common/stream-storage-java-kv-client/stream-storage-cli: refactor ByteBuf release usage (#3694)

This is an automated email from the ASF dual-hosted git repository.

lushiji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e37cf68d1 module stream-storage-common/stream-storage-java-kv-client/stream-storage-cli: refactor ByteBuf release usage (#3694)
4e37cf68d1 is described below

commit 4e37cf68d1af0ad6754720a34db958a3d8ee4f0e
Author: StevenLuMT <ls...@126.com>
AuthorDate: Sun Dec 11 20:35:50 2022 +0800

    module stream-storage-common/stream-storage-java-kv-client/stream-storage-cli: refactor ByteBuf release usage (#3694)
    
    Co-authored-by: lushiji <lu...@didiglobal.com>
---
 .../clients/impl/kv/PByteBufSimpleTableImpl.java   | 25 +++++++++++-----------
 .../clients/impl/kv/PByteBufTableRangeImpl.java    | 25 +++++++++++-----------
 .../common/router/AbstractHashRouter.java          |  3 ++-
 .../stream/cli/commands/table/DelCommand.java      |  3 ++-
 4 files changed, 30 insertions(+), 26 deletions(-)

diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufSimpleTableImpl.java b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufSimpleTableImpl.java
index c9170051fb..75b847a8b6 100644
--- a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufSimpleTableImpl.java
+++ b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufSimpleTableImpl.java
@@ -44,6 +44,7 @@ import io.grpc.stub.AbstractStub;
 import io.grpc.stub.ClientCalls;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
+import io.netty.util.ReferenceCountUtil;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import lombok.extern.slf4j.Slf4j;
@@ -151,10 +152,10 @@ public class PByteBufSimpleTableImpl
         ))
         .thenApply(response -> KvUtils.newRangeResult(response, resultFactory, kvFactory))
         .whenComplete((value, cause) -> {
-            pKey.release();
-            lKey.release();
+            ReferenceCountUtil.safeRelease(pKey);
+            ReferenceCountUtil.safeRelease(lKey);
             if (null != option.endKey()) {
-                option.endKey().release();
+                ReferenceCountUtil.safeRelease(option.endKey());
             }
         });
     }
@@ -175,9 +176,9 @@ public class PByteBufSimpleTableImpl
             ))
             .thenApply(response -> KvUtils.newPutResult(response, resultFactory, kvFactory))
             .whenComplete((ignored, cause) -> {
-                pKey.release();
-                lKey.release();
-                value.release();
+                ReferenceCountUtil.safeRelease(pKey);
+                ReferenceCountUtil.safeRelease(lKey);
+                ReferenceCountUtil.safeRelease(value);
             });
     }
 
@@ -199,10 +200,10 @@ public class PByteBufSimpleTableImpl
         ))
         .thenApply(response -> KvUtils.newDeleteResult(response, resultFactory, kvFactory))
         .whenComplete((ignored, cause) -> {
-            pKey.release();
-            lKey.release();
+            ReferenceCountUtil.safeRelease(pKey);
+            ReferenceCountUtil.safeRelease(lKey);
             if (null != option.endKey()) {
-                option.endKey().release();
+                ReferenceCountUtil.safeRelease(option.endKey());
             }
         });
     }
@@ -222,8 +223,8 @@ public class PByteBufSimpleTableImpl
         ))
         .thenApply(response -> KvUtils.newIncrementResult(response, resultFactory, kvFactory))
         .whenComplete((ignored, cause) -> {
-            pKey.release();
-            lKey.release();
+            ReferenceCountUtil.safeRelease(pKey);
+            ReferenceCountUtil.safeRelease(lKey);
         });
     }
 
@@ -297,7 +298,7 @@ public class PByteBufSimpleTableImpl
             ))
             .thenApply(response -> KvUtils.newKvTxnResult(response, resultFactory, kvFactory))
             .whenComplete((ignored, cause) -> {
-                pKey.release();
+                ReferenceCountUtil.safeRelease(pKey);
                 for (AutoCloseable resource : resourcesToRelease) {
                     closeResource(resource);
                 }
diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableRangeImpl.java b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableRangeImpl.java
index 1f07b80bfb..18ceee6f37 100644
--- a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableRangeImpl.java
+++ b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableRangeImpl.java
@@ -20,6 +20,7 @@ import static org.apache.bookkeeper.clients.impl.kv.KvUtils.toProtoRequest;
 import com.google.common.collect.Lists;
 import com.google.protobuf.UnsafeByteOperations;
 import io.netty.buffer.ByteBuf;
+import io.netty.util.ReferenceCountUtil;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
@@ -103,10 +104,10 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
             executor,
             backoffPolicy
         ).process().whenComplete((value, cause) -> {
-            pKey.release();
-            lKey.release();
+            ReferenceCountUtil.safeRelease(pKey);
+            ReferenceCountUtil.safeRelease(lKey);
             if (null != option.endKey()) {
-                option.endKey().release();
+                ReferenceCountUtil.safeRelease(option.endKey());
             }
         });
     }
@@ -128,9 +129,9 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
             executor,
             backoffPolicy
         ).process().whenComplete((ignored, cause) -> {
-            pKey.release();
-            lKey.release();
-            value.release();
+            ReferenceCountUtil.safeRelease(pKey);
+            ReferenceCountUtil.safeRelease(lKey);
+            ReferenceCountUtil.safeRelease(value);
         });
     }
 
@@ -152,10 +153,10 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
             executor,
             backoffPolicy
         ).process().whenComplete((ignored, cause) -> {
-            pKey.release();
-            lKey.release();
+            ReferenceCountUtil.safeRelease(pKey);
+            ReferenceCountUtil.safeRelease(lKey);
             if (null != option.endKey()) {
-                option.endKey().release();
+                ReferenceCountUtil.safeRelease(option.endKey());
             }
         });
     }
@@ -176,8 +177,8 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
             executor,
             backoffPolicy
         ).process().whenComplete((ignored, cause) -> {
-            pKey.release();
-            lKey.release();
+            ReferenceCountUtil.safeRelease(pKey);
+            ReferenceCountUtil.safeRelease(lKey);
         });
     }
 
@@ -251,7 +252,7 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
                 executor,
                 backoffPolicy
             ).process().whenComplete((ignored, cause) -> {
-                pKey.release();
+                ReferenceCountUtil.safeRelease(pKey);
                 for (AutoCloseable resource : resourcesToRelease) {
                     closeResource(resource);
                 }
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/router/AbstractHashRouter.java b/stream/common/src/main/java/org/apache/bookkeeper/common/router/AbstractHashRouter.java
index 7dadf2d4ab..5313276f79 100644
--- a/stream/common/src/main/java/org/apache/bookkeeper/common/router/AbstractHashRouter.java
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/router/AbstractHashRouter.java
@@ -18,6 +18,7 @@
 package org.apache.bookkeeper.common.router;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.util.ReferenceCountUtil;
 import org.apache.bookkeeper.common.hash.Murmur3;
 
 /**
@@ -38,7 +39,7 @@ public abstract class AbstractHashRouter<K> implements HashRouter<K> {
             return Murmur3.hash128(
                 keyData, keyData.readerIndex(), keyData.readableBytes(), HASH_SEED)[0];
         } finally {
-            keyData.release();
+            ReferenceCountUtil.safeRelease(keyData);
         }
     }
 
diff --git a/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/DelCommand.java b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/DelCommand.java
index 5027f4eb1c..df85b0c8d4 100644
--- a/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/DelCommand.java
+++ b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/DelCommand.java
@@ -26,6 +26,7 @@ import static org.apache.bookkeeper.stream.cli.Commands.OP_DEL;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.util.ReferenceCountUtil;
 import org.apache.bookkeeper.api.StorageClient;
 import org.apache.bookkeeper.api.kv.Table;
 import org.apache.bookkeeper.stream.cli.commands.ClientCommand;
@@ -69,7 +70,7 @@ public class DelCommand extends ClientCommand<Flags> {
             ByteBuf value = result(table.delete(
                 Unpooled.wrappedBuffer(key.getBytes(UTF_8))));
             if (null != value) {
-                value.release();
+                ReferenceCountUtil.safeRelease(value);
                 spec.console().println("Successfully deleted key: ('" + key + "').");
             } else {
                 spec.console().println("key '" + key + "' doesn't exist.");