You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/08/13 22:06:46 UTC

[kafka] branch 2.3 updated: KAFKA-8736: Track size in InMemoryKeyValueStore (#7177)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new f81189c  KAFKA-8736: Track size in InMemoryKeyValueStore (#7177)
f81189c is described below

commit f81189cd3915f0876f8e2ed440269c9c0ecea1e9
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Tue Aug 13 14:54:58 2019 -0700

    KAFKA-8736: Track size in InMemoryKeyValueStore (#7177)
    
    InMemoryKeyValueStore uses ConcurrentSkipListMap#size which takes linear time as it iterates over the entire map. We should just track size ourselves for approximateNumEntries
    
    Reviewers: Guozhang Wang <wa...@gmail.com>, Matthias J. Sax <mj...@apache.org>
---
 .../state/internals/InMemoryKeyValueStore.java     | 23 ++++++++++------------
 1 file changed, 10 insertions(+), 13 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index aa7b2cc..2d68214 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -35,6 +35,7 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
     private final String name;
     private final ConcurrentNavigableMap<Bytes, byte[]> map = new ConcurrentSkipListMap<>();
     private volatile boolean open = false;
+    private long size = 0L; // SkipListMap#size is O(N) so we just do our best to track it
 
     private static final Logger LOG = LoggerFactory.getLogger(InMemoryKeyValueStore.class);
 
@@ -50,17 +51,10 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
     @Override
     public void init(final ProcessorContext context,
                      final StateStore root) {
-
+        size = 0;
         if (root != null) {
             // register the store
-            context.register(root, (key, value) -> {
-                // this is a delete
-                if (value == null) {
-                    delete(Bytes.wrap(key));
-                } else {
-                    put(Bytes.wrap(key), value);
-                }
-            });
+            context.register(root, (key, value) -> put(Bytes.wrap(key), value));
         }
 
         open = true;
@@ -84,9 +78,9 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
     @Override
     public void put(final Bytes key, final byte[] value) {
         if (value == null) {
-            map.remove(key);
+            size -= map.remove(key) == null ? 0 : 1;
         } else {
-            map.put(key, value);
+            size += map.put(key, value) == null ? 1 : 0;
         }
     }
 
@@ -108,7 +102,9 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
 
     @Override
     public byte[] delete(final Bytes key) {
-        return map.remove(key);
+        final byte[] oldValue = map.remove(key);
+        size -= oldValue == null ? 0 : 1;
+        return oldValue;
     }
 
     @Override
@@ -135,7 +131,7 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
 
     @Override
     public long approximateNumEntries() {
-        return map.size();
+        return size;
     }
 
     @Override
@@ -146,6 +142,7 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
     @Override
     public void close() {
         map.clear();
+        size = 0;
         open = false;
     }