You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2021/10/07 01:33:31 UTC
[kafka] branch trunk updated: MINOR: remove unneeded size and add
lock coarsening to inMemoryKeyValueStore (#11370)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 769882d MINOR: remove unneeded size and add lock coarsening to inMemoryKeyValueStore (#11370)
769882d is described below
commit 769882d9100cccf82fbccd4398a1d7f0a2d80f9d
Author: Luke Chen <sh...@gmail.com>
AuthorDate: Thu Oct 7 09:32:10 2021 +0800
MINOR: remove unneeded size and add lock coarsening to inMemoryKeyValueStore (#11370)
Reviewers: Matthias J. Sax <ma...@confluent.io>, A. Sophie Blee-Goldman <so...@confluent.io>
---
.../state/internals/InMemoryKeyValueStore.java | 28 +++++++++++-----------
1 file changed, 14 insertions(+), 14 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index c6201c1..f0c6dbe 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -40,7 +40,6 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
private final String name;
private final NavigableMap<Bytes, byte[]> map = new TreeMap<>();
private volatile boolean open = false;
- private long size = 0L; // SkipListMap#size is O(N) so we just do our best to track it
public InMemoryKeyValueStore(final String name) {
this.name = name;
@@ -55,7 +54,6 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
@Override
public void init(final ProcessorContext context,
final StateStore root) {
- size = 0;
if (root != null) {
// register the store
context.register(root, (key, value) -> put(Bytes.wrap(key), value));
@@ -81,11 +79,7 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
@Override
public synchronized void put(final Bytes key, final byte[] value) {
- if (value == null) {
- size -= map.remove(key) == null ? 0 : 1;
- } else {
- size += map.put(key, value) == null ? 1 : 0;
- }
+ putInternal(key, value);
}
@Override
@@ -97,10 +91,19 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
return originalValue;
}
+ // the unlocked implementation of put method, to avoid multiple lock/unlock cost in `putAll` method
+ private void putInternal(final Bytes key, final byte[] value) {
+ if (value == null) {
+ map.remove(key);
+ } else {
+ map.put(key, value);
+ }
+ }
+
@Override
- public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+ public synchronized void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
for (final KeyValue<Bytes, byte[]> entry : entries) {
- put(entry.key, entry.value);
+ putInternal(entry.key, entry.value);
}
}
@@ -118,9 +121,7 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
@Override
public synchronized byte[] delete(final Bytes key) {
- final byte[] oldValue = map.remove(key);
- size -= oldValue == null ? 0 : 1;
- return oldValue;
+ return map.remove(key);
}
@Override
@@ -169,7 +170,7 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
@Override
public long approximateNumEntries() {
- return size;
+ return map.size();
}
@Override
@@ -180,7 +181,6 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
@Override
public void close() {
map.clear();
- size = 0;
open = false;
}