You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/06/16 00:08:37 UTC
kafka git commit: KAFKA-3753: Add approximateNumEntries() method to
KeyValueStore interface
Repository: kafka
Updated Branches:
refs/heads/trunk 17668e81c -> 1ef7b494b
KAFKA-3753: Add approximateNumEntries() method to KeyValueStore interface
See https://issues.apache.org/jira/browse/KAFKA-3753
This contribution is my original work and I license the work to the project under the project's open source license.
cc guozhangwang kichristensen ijuma
Author: Jeff Klukas <je...@klukas.net>
Reviewers: Ismael Juma, Guozhang Wang
Closes #1486 from jklukas/kvstore-size
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1ef7b494
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1ef7b494
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1ef7b494
Branch: refs/heads/trunk
Commit: 1ef7b494bbd937205ecd14dd30e625c2efdb3aa9
Parents: 17668e8
Author: Jeff Klukas <je...@klukas.net>
Authored: Wed Jun 15 17:08:33 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Jun 15 17:08:33 2016 -0700
----------------------------------------------------------------------
.../kafka/streams/state/KeyValueStore.java | 10 ++++++
.../internals/InMemoryKeyValueLoggedStore.java | 5 +++
.../InMemoryKeyValueStoreSupplier.java | 5 +++
.../streams/state/internals/MemoryLRUCache.java | 5 +++
.../state/internals/MeteredKeyValueStore.java | 5 +++
.../streams/state/internals/RocksDBStore.java | 37 ++++++++++++++++++++
.../internals/AbstractKeyValueStoreTest.java | 19 ++++++++++
7 files changed, 86 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/1ef7b494/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
index 908e116..1ee790d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
@@ -97,4 +97,14 @@ public interface KeyValueStore<K, V> extends StateStore {
*/
KeyValueIterator<K, V> all();
+ /**
+ * Return an approximate count of key-value mappings in this store.
+ *
+ * The count is not guaranteed to be exact in order to accommodate stores
+ * where an exact count is expensive to calculate.
+ *
+ * @return an approximate count of key-value mappings in the store.
+ */
+ long approximateNumEntries();
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1ef7b494/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
index efcdac7..e13bba3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
@@ -150,6 +150,11 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
}
@Override
+ public long approximateNumEntries() {
+ return this.inner.approximateNumEntries();
+ }
+
+ @Override
public void close() {
inner.close();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1ef7b494/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
index a25153c..3b632cc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
@@ -147,6 +147,11 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
}
@Override
+ public long approximateNumEntries() {
+ return this.map.size();
+ }
+
+ @Override
public void flush() {
// do-nothing since it is in-memory
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1ef7b494/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index d410e02..0697eda 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -154,6 +154,11 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
}
@Override
+ public long approximateNumEntries() {
+ return this.map.size();
+ }
+
+ @Override
public void flush() {
// do-nothing since it is in-memory
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1ef7b494/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 5e5b54a..c6e93cb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -154,6 +154,11 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
}
@Override
+ public long approximateNumEntries() {
+ return this.inner.approximateNumEntries();
+ }
+
+ @Override
public void close() {
inner.close();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1ef7b494/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index a00de19..8634d68 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -354,6 +354,43 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
return new RocksDbIterator<>(innerIter, serdes);
}
+ /**
+ * Return an approximate count of key-value mappings in this store.
+ *
+ * <code>RocksDB</code> cannot return an exact entry count without doing a
+ * full scan, so this method relies on the <code>rocksdb.estimate-num-keys</code>
+ * property to get an approximate count. The returned size also includes
+ * a count of dirty keys in the store's in-memory cache, which may lead to some
+ * double-counting of entries and inflate the estimate.
+ *
+ * @return an approximate count of key-value mappings in the store.
+ */
+ @Override
+ public long approximateNumEntries() {
+ long value;
+ try {
+ value = this.db.getLongProperty("rocksdb.estimate-num-keys");
+ } catch (RocksDBException e) {
+ throw new ProcessorStateException("Error fetching property from store " + this.name, e);
+ }
+ if (isOverflowing(value)) {
+ return Long.MAX_VALUE;
+ }
+ if (this.cacheDirtyKeys != null) {
+ value += this.cacheDirtyKeys.size();
+ }
+ if (isOverflowing(value)) {
+ return Long.MAX_VALUE;
+ }
+ return value;
+ }
+
+ private boolean isOverflowing(long value) {
+ // RocksDB returns an unsigned 8-byte integer, which could overflow long
+ // and manifest as a negative value.
+ return value < 0;
+ }
+
private void flushCache() {
// flush of the cache entries if necessary
if (cache != null) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/1ef7b494/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index 2bfe644..8a22d37 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -227,4 +227,23 @@ public abstract class AbstractKeyValueStoreTest {
store.close();
}
}
+
+ @Test
+ public void testSize() {
+ // Create the test driver ...
+ KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
+ KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, true);
+ try {
+ assertEquals("A newly created store should have no entries", 0, store.approximateNumEntries());
+
+ store.put(0, "zero");
+ store.put(1, "one");
+ store.put(2, "two");
+ store.put(4, "four");
+ store.put(5, "five");
+ assertEquals(5, store.approximateNumEntries());
+ } finally {
+ store.close();
+ }
+ }
}