You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/08/13 22:06:46 UTC
[kafka] branch 2.3 updated: KAFKA-8736: Track size in
InMemoryKeyValueStore (#7177)
This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push:
new f81189c KAFKA-8736: Track size in InMemoryKeyValueStore (#7177)
f81189c is described below
commit f81189cd3915f0876f8e2ed440269c9c0ecea1e9
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Tue Aug 13 14:54:58 2019 -0700
KAFKA-8736: Track size in InMemoryKeyValueStore (#7177)
InMemoryKeyValueStore uses ConcurrentSkipListMap#size which takes linear time as it iterates over the entire map. We should just track size ourselves for approximateNumEntries
Reviewers: Guozhang Wang <wa...@gmail.com>, Matthias J. Sax <mj...@apache.org>
---
.../state/internals/InMemoryKeyValueStore.java | 23 ++++++++++------------
1 file changed, 10 insertions(+), 13 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index aa7b2cc..2d68214 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -35,6 +35,7 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
private final String name;
private final ConcurrentNavigableMap<Bytes, byte[]> map = new ConcurrentSkipListMap<>();
private volatile boolean open = false;
+ private long size = 0L; // SkipListMap#size is O(N) so we just do our best to track it
private static final Logger LOG = LoggerFactory.getLogger(InMemoryKeyValueStore.class);
@@ -50,17 +51,10 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
@Override
public void init(final ProcessorContext context,
final StateStore root) {
-
+ size = 0;
if (root != null) {
// register the store
- context.register(root, (key, value) -> {
- // this is a delete
- if (value == null) {
- delete(Bytes.wrap(key));
- } else {
- put(Bytes.wrap(key), value);
- }
- });
+ context.register(root, (key, value) -> put(Bytes.wrap(key), value));
}
open = true;
@@ -84,9 +78,9 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
@Override
public void put(final Bytes key, final byte[] value) {
if (value == null) {
- map.remove(key);
+ size -= map.remove(key) == null ? 0 : 1;
} else {
- map.put(key, value);
+ size += map.put(key, value) == null ? 1 : 0;
}
}
@@ -108,7 +102,9 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
@Override
public byte[] delete(final Bytes key) {
- return map.remove(key);
+ final byte[] oldValue = map.remove(key);
+ size -= oldValue == null ? 0 : 1;
+ return oldValue;
}
@Override
@@ -135,7 +131,7 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
@Override
public long approximateNumEntries() {
- return map.size();
+ return size;
}
@Override
@@ -146,6 +142,7 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
@Override
public void close() {
map.clear();
+ size = 0;
open = false;
}