You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by lu...@apache.org on 2022/12/11 12:35:56 UTC
[bookkeeper] branch master updated: module stream-storage-common/stream-storage-java-kv-client/stream-storage-cli: refactor ByteBuf release usage (#3694)
This is an automated email from the ASF dual-hosted git repository.
lushiji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 4e37cf68d1 module stream-storage-common/stream-storage-java-kv-client/stream-storage-cli: refactor ByteBuf release usage (#3694)
4e37cf68d1 is described below
commit 4e37cf68d1af0ad6754720a34db958a3d8ee4f0e
Author: StevenLuMT <ls...@126.com>
AuthorDate: Sun Dec 11 20:35:50 2022 +0800
module stream-storage-common/stream-storage-java-kv-client/stream-storage-cli: refactor ByteBuf release usage (#3694)
Co-authored-by: lushiji <lu...@didiglobal.com>
---
.../clients/impl/kv/PByteBufSimpleTableImpl.java | 25 +++++++++++-----------
.../clients/impl/kv/PByteBufTableRangeImpl.java | 25 +++++++++++-----------
.../common/router/AbstractHashRouter.java | 3 ++-
.../stream/cli/commands/table/DelCommand.java | 3 ++-
4 files changed, 30 insertions(+), 26 deletions(-)
diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufSimpleTableImpl.java b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufSimpleTableImpl.java
index c9170051fb..75b847a8b6 100644
--- a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufSimpleTableImpl.java
+++ b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufSimpleTableImpl.java
@@ -44,6 +44,7 @@ import io.grpc.stub.AbstractStub;
import io.grpc.stub.ClientCalls;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
+import io.netty.util.ReferenceCountUtil;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
@@ -151,10 +152,10 @@ public class PByteBufSimpleTableImpl
))
.thenApply(response -> KvUtils.newRangeResult(response, resultFactory, kvFactory))
.whenComplete((value, cause) -> {
- pKey.release();
- lKey.release();
+ ReferenceCountUtil.safeRelease(pKey);
+ ReferenceCountUtil.safeRelease(lKey);
if (null != option.endKey()) {
- option.endKey().release();
+ ReferenceCountUtil.safeRelease(option.endKey());
}
});
}
@@ -175,9 +176,9 @@ public class PByteBufSimpleTableImpl
))
.thenApply(response -> KvUtils.newPutResult(response, resultFactory, kvFactory))
.whenComplete((ignored, cause) -> {
- pKey.release();
- lKey.release();
- value.release();
+ ReferenceCountUtil.safeRelease(pKey);
+ ReferenceCountUtil.safeRelease(lKey);
+ ReferenceCountUtil.safeRelease(value);
});
}
@@ -199,10 +200,10 @@ public class PByteBufSimpleTableImpl
))
.thenApply(response -> KvUtils.newDeleteResult(response, resultFactory, kvFactory))
.whenComplete((ignored, cause) -> {
- pKey.release();
- lKey.release();
+ ReferenceCountUtil.safeRelease(pKey);
+ ReferenceCountUtil.safeRelease(lKey);
if (null != option.endKey()) {
- option.endKey().release();
+ ReferenceCountUtil.safeRelease(option.endKey());
}
});
}
@@ -222,8 +223,8 @@ public class PByteBufSimpleTableImpl
))
.thenApply(response -> KvUtils.newIncrementResult(response, resultFactory, kvFactory))
.whenComplete((ignored, cause) -> {
- pKey.release();
- lKey.release();
+ ReferenceCountUtil.safeRelease(pKey);
+ ReferenceCountUtil.safeRelease(lKey);
});
}
@@ -297,7 +298,7 @@ public class PByteBufSimpleTableImpl
))
.thenApply(response -> KvUtils.newKvTxnResult(response, resultFactory, kvFactory))
.whenComplete((ignored, cause) -> {
- pKey.release();
+ ReferenceCountUtil.safeRelease(pKey);
for (AutoCloseable resource : resourcesToRelease) {
closeResource(resource);
}
diff --git a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableRangeImpl.java b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableRangeImpl.java
index 1f07b80bfb..18ceee6f37 100644
--- a/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableRangeImpl.java
+++ b/stream/clients/java/kv/src/main/java/org/apache/bookkeeper/clients/impl/kv/PByteBufTableRangeImpl.java
@@ -20,6 +20,7 @@ import static org.apache.bookkeeper.clients.impl.kv.KvUtils.toProtoRequest;
import com.google.common.collect.Lists;
import com.google.protobuf.UnsafeByteOperations;
import io.netty.buffer.ByteBuf;
+import io.netty.util.ReferenceCountUtil;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
@@ -103,10 +104,10 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
executor,
backoffPolicy
).process().whenComplete((value, cause) -> {
- pKey.release();
- lKey.release();
+ ReferenceCountUtil.safeRelease(pKey);
+ ReferenceCountUtil.safeRelease(lKey);
if (null != option.endKey()) {
- option.endKey().release();
+ ReferenceCountUtil.safeRelease(option.endKey());
}
});
}
@@ -128,9 +129,9 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
executor,
backoffPolicy
).process().whenComplete((ignored, cause) -> {
- pKey.release();
- lKey.release();
- value.release();
+ ReferenceCountUtil.safeRelease(pKey);
+ ReferenceCountUtil.safeRelease(lKey);
+ ReferenceCountUtil.safeRelease(value);
});
}
@@ -152,10 +153,10 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
executor,
backoffPolicy
).process().whenComplete((ignored, cause) -> {
- pKey.release();
- lKey.release();
+ ReferenceCountUtil.safeRelease(pKey);
+ ReferenceCountUtil.safeRelease(lKey);
if (null != option.endKey()) {
- option.endKey().release();
+ ReferenceCountUtil.safeRelease(option.endKey());
}
});
}
@@ -176,8 +177,8 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
executor,
backoffPolicy
).process().whenComplete((ignored, cause) -> {
- pKey.release();
- lKey.release();
+ ReferenceCountUtil.safeRelease(pKey);
+ ReferenceCountUtil.safeRelease(lKey);
});
}
@@ -251,7 +252,7 @@ class PByteBufTableRangeImpl implements PTable<ByteBuf, ByteBuf> {
executor,
backoffPolicy
).process().whenComplete((ignored, cause) -> {
- pKey.release();
+ ReferenceCountUtil.safeRelease(pKey);
for (AutoCloseable resource : resourcesToRelease) {
closeResource(resource);
}
diff --git a/stream/common/src/main/java/org/apache/bookkeeper/common/router/AbstractHashRouter.java b/stream/common/src/main/java/org/apache/bookkeeper/common/router/AbstractHashRouter.java
index 7dadf2d4ab..5313276f79 100644
--- a/stream/common/src/main/java/org/apache/bookkeeper/common/router/AbstractHashRouter.java
+++ b/stream/common/src/main/java/org/apache/bookkeeper/common/router/AbstractHashRouter.java
@@ -18,6 +18,7 @@
package org.apache.bookkeeper.common.router;
import io.netty.buffer.ByteBuf;
+import io.netty.util.ReferenceCountUtil;
import org.apache.bookkeeper.common.hash.Murmur3;
/**
@@ -38,7 +39,7 @@ public abstract class AbstractHashRouter<K> implements HashRouter<K> {
return Murmur3.hash128(
keyData, keyData.readerIndex(), keyData.readableBytes(), HASH_SEED)[0];
} finally {
- keyData.release();
+ ReferenceCountUtil.safeRelease(keyData);
}
}
diff --git a/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/DelCommand.java b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/DelCommand.java
index 5027f4eb1c..df85b0c8d4 100644
--- a/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/DelCommand.java
+++ b/tools/stream/src/main/java/org/apache/bookkeeper/stream/cli/commands/table/DelCommand.java
@@ -26,6 +26,7 @@ import static org.apache.bookkeeper.stream.cli.Commands.OP_DEL;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import io.netty.util.ReferenceCountUtil;
import org.apache.bookkeeper.api.StorageClient;
import org.apache.bookkeeper.api.kv.Table;
import org.apache.bookkeeper.stream.cli.commands.ClientCommand;
@@ -69,7 +70,7 @@ public class DelCommand extends ClientCommand<Flags> {
ByteBuf value = result(table.delete(
Unpooled.wrappedBuffer(key.getBytes(UTF_8))));
if (null != value) {
- value.release();
+ ReferenceCountUtil.safeRelease(value);
spec.console().println("Successfully deleted key: ('" + key + "').");
} else {
spec.console().println("key '" + key + "' doesn't exist.");