You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2021/10/07 01:33:31 UTC

[kafka] branch trunk updated: MINOR: remove unneeded size and add lock coarsening to inMemoryKeyValueStore (#11370)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 769882d  MINOR: remove unneeded size and add lock coarsening to inMemoryKeyValueStore (#11370)
769882d is described below

commit 769882d9100cccf82fbccd4398a1d7f0a2d80f9d
Author: Luke Chen <sh...@gmail.com>
AuthorDate: Thu Oct 7 09:32:10 2021 +0800

    MINOR: remove unneeded size and add lock coarsening to inMemoryKeyValueStore (#11370)
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, A. Sophie Blee-Goldman <so...@confluent.io>
---
 .../state/internals/InMemoryKeyValueStore.java     | 28 +++++++++++-----------
 1 file changed, 14 insertions(+), 14 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index c6201c1..f0c6dbe 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -40,7 +40,6 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
     private final String name;
     private final NavigableMap<Bytes, byte[]> map = new TreeMap<>();
     private volatile boolean open = false;
-    private long size = 0L; // SkipListMap#size is O(N) so we just do our best to track it
 
     public InMemoryKeyValueStore(final String name) {
         this.name = name;
@@ -55,7 +54,6 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {
-        size = 0;
         if (root != null) {
             // register the store
             context.register(root, (key, value) -> put(Bytes.wrap(key), value));
@@ -81,11 +79,7 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
 
     @Override
     public synchronized void put(final Bytes key, final byte[] value) {
-        if (value == null) {
-            size -= map.remove(key) == null ? 0 : 1;
-        } else {
-            size += map.put(key, value) == null ? 1 : 0;
-        }
+        putInternal(key, value);
     }
 
     @Override
@@ -97,10 +91,19 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
         return originalValue;
     }
 
+    // Unsynchronized core of `put`; callers must already hold the lock, so `putAll` locks once instead of once per entry
+    private void putInternal(final Bytes key, final byte[] value) {
+        if (value == null) {
+            map.remove(key);
+        } else {
+            map.put(key, value);
+        }
+    }
+
     @Override
-    public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+    public synchronized void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
         for (final KeyValue<Bytes, byte[]> entry : entries) {
-            put(entry.key, entry.value);
+            putInternal(entry.key, entry.value);
         }
     }
 
@@ -118,9 +121,7 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
 
     @Override
     public synchronized byte[] delete(final Bytes key) {
-        final byte[] oldValue = map.remove(key);
-        size -= oldValue == null ? 0 : 1;
-        return oldValue;
+        return map.remove(key);
     }
 
     @Override
@@ -169,7 +170,7 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
 
     @Override
     public long approximateNumEntries() {
-        return size;
+        return map.size();
     }
 
     @Override
@@ -180,7 +181,6 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
     @Override
     public void close() {
         map.clear();
-        size = 0;
         open = false;
     }