You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/06/16 00:08:37 UTC

kafka git commit: KAFKA-3753: Add approximateNumEntries() method to KeyValueStore interface

Repository: kafka
Updated Branches:
  refs/heads/trunk 17668e81c -> 1ef7b494b


KAFKA-3753: Add approximateNumEntries() method to KeyValueStore interface

See https://issues.apache.org/jira/browse/KAFKA-3753

This contribution is my original work and I license the work to the project under the project's open source license.

cc guozhangwang kichristensen ijuma

Author: Jeff Klukas <je...@klukas.net>

Reviewers: Ismael Juma, Guozhang Wang

Closes #1486 from jklukas/kvstore-size


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1ef7b494
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1ef7b494
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1ef7b494

Branch: refs/heads/trunk
Commit: 1ef7b494bbd937205ecd14dd30e625c2efdb3aa9
Parents: 17668e8
Author: Jeff Klukas <je...@klukas.net>
Authored: Wed Jun 15 17:08:33 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Jun 15 17:08:33 2016 -0700

----------------------------------------------------------------------
 .../kafka/streams/state/KeyValueStore.java      | 10 ++++++
 .../internals/InMemoryKeyValueLoggedStore.java  |  5 +++
 .../InMemoryKeyValueStoreSupplier.java          |  5 +++
 .../streams/state/internals/MemoryLRUCache.java |  5 +++
 .../state/internals/MeteredKeyValueStore.java   |  5 +++
 .../streams/state/internals/RocksDBStore.java   | 37 ++++++++++++++++++++
 .../internals/AbstractKeyValueStoreTest.java    | 19 ++++++++++
 7 files changed, 86 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1ef7b494/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
index 908e116..1ee790d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
@@ -97,4 +97,14 @@ public interface KeyValueStore<K, V> extends StateStore {
      */
     KeyValueIterator<K, V> all();
 
+    /**
+     * Return an approximate count of key-value mappings in this store.
+     *
+     * The count is not guaranteed to be exact in order to accommodate stores
+     * where an exact count is expensive to calculate.
+     *
+     * @return an approximate count of key-value mappings in the store.
+     */
+    long approximateNumEntries();
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1ef7b494/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
index efcdac7..e13bba3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
@@ -150,6 +150,11 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
     }
 
     @Override
+    public long approximateNumEntries() {
+        return this.inner.approximateNumEntries();
+    }
+
+    @Override
     public void close() {
         inner.close();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1ef7b494/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
index a25153c..3b632cc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
@@ -147,6 +147,11 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
         }
 
         @Override
+        public long approximateNumEntries() {
+            return this.map.size();
+        }
+
+        @Override
         public void flush() {
             // do-nothing since it is in-memory
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1ef7b494/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index d410e02..0697eda 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -154,6 +154,11 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
     }
 
     @Override
+    public long approximateNumEntries() {
+        return this.map.size();
+    }
+
+    @Override
     public void flush() {
         // do-nothing since it is in-memory
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1ef7b494/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 5e5b54a..c6e93cb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -154,6 +154,11 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
     }
 
     @Override
+    public long approximateNumEntries() {
+        return this.inner.approximateNumEntries();
+    }
+
+    @Override
     public void close() {
         inner.close();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1ef7b494/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index a00de19..8634d68 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -354,6 +354,43 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         return new RocksDbIterator<>(innerIter, serdes);
     }
 
+    /**
+     * Return an approximate count of key-value mappings in this store.
+     *
+     * <code>RocksDB</code> cannot return an exact entry count without doing a
+     * full scan, so this method relies on the <code>rocksdb.estimate-num-keys</code>
+     * property to get an approximate count. The returned size also includes
+     * a count of dirty keys in the store's in-memory cache, which may lead to some
+     * double-counting of entries and inflate the estimate.
+     *
+     * @return an approximate count of key-value mappings in the store.
+     */
+    @Override
+    public long approximateNumEntries() {
+        long value;
+        try {
+            value = this.db.getLongProperty("rocksdb.estimate-num-keys");
+        } catch (RocksDBException e) {
+            throw new ProcessorStateException("Error fetching property from store " + this.name, e);
+        }
+        if (isOverflowing(value)) {
+            return Long.MAX_VALUE;
+        }
+        if (this.cacheDirtyKeys != null) {
+            value += this.cacheDirtyKeys.size();
+        }
+        if (isOverflowing(value)) {
+            return Long.MAX_VALUE;
+        }
+        return value;
+    }
+
+    private boolean isOverflowing(long value) {
+        // RocksDB returns an unsigned 8-byte integer, which could overflow long
+        // and manifest as a negative value.
+        return value < 0;
+    }
+
     private void flushCache() {
         // flush of the cache entries if necessary
         if (cache != null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/1ef7b494/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index 2bfe644..8a22d37 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -227,4 +227,23 @@ public abstract class AbstractKeyValueStoreTest {
             store.close();
         }
     }
+
+    @Test
+    public void testSize() {
+        // Create the test driver ...
+        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
+        KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, true);
+        try {
+            assertEquals("A newly created store should have no entries", 0, store.approximateNumEntries());
+
+            store.put(0, "zero");
+            store.put(1, "one");
+            store.put(2, "two");
+            store.put(4, "four");
+            store.put(5, "five");
+            assertEquals(5, store.approximateNumEntries());
+        } finally {
+            store.close();
+        }
+    }
 }