You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/16 23:41:44 UTC

kafka git commit: KAFKA-2594: Add InMemoryLRUCacheStore as a preliminary method for bounding in-memory stores

Repository: kafka
Updated Branches:
  refs/heads/trunk aa66b42da -> c553249b4


KAFKA-2594: Add InMemoryLRUCacheStore as a preliminary method for bounding in-memory stores

Added a new `KeyValueStore` implementation called `InMemoryLRUCacheStore` that keeps a maximum number of entries in-memory, and as the size exceeds the capacity the least-recently used entry is removed from the store and the backing topic. Also added unit tests for this new store and the existing `InMemoryKeyValueStore` and `RocksDBKeyValueStore` implementations. A new `KeyValueStoreTestDriver` class simplifies all of the other tests, and can be used by other libraries to help test their own custom implementations.

This PR depends upon [KAFKA-2593](https://issues.apache.org/jira/browse/KAFKA-2593) and its PR at https://github.com/apache/kafka/pull/255. Once that PR is merged, I can rebase this PR if desired.

Two issues were uncovered when creating these new unit tests, and both are also addressed as separate (small) commits in this PR:
* The `RocksDBKeyValueStore` initialization was not creating the file system directory if missing.
* `MeteredKeyValueStore` was casting to `ProcessorContextImpl` to access the `RecordCollector`, which prevented using `MeteredKeyValueStore` implementations in tests where something other than `ProcessorContextImpl` was used. The fix was to introduce a `RecordCollector.Supplier` interface to define this `recordCollector()` method, and change `ProcessorContextImpl` and `MockProcessorContext` to both implement this interface. Now, `MeteredKeyValueStore` can cast to the new interface to access the record collector rather than to a single concrete implementation, making it possible to use any and all current stores inside unit tests.

Author: Randall Hauch <rh...@gmail.com>

Reviewers: Edward Ribeiro, Guozhang Wang

Closes #256 from rhauch/kafka-2594


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c553249b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c553249b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c553249b

Branch: refs/heads/trunk
Commit: c553249b4e6d7d385821061f9742f15f90e94def
Parents: aa66b42
Author: Randall Hauch <rh...@gmail.com>
Authored: Fri Oct 16 14:46:26 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Oct 16 14:46:26 2015 -0700

----------------------------------------------------------------------
 .../streams/state/InMemoryLRUCacheStore.java    | 180 +++++++++++++++++++
 .../streams/state/MeteredKeyValueStore.java     |  44 ++++-
 .../org/apache/kafka/streams/state/Stores.java  |  25 ++-
 .../state/InMemoryLRUCacheStoreTest.java        | 148 +++++++++++++++
 4 files changed, 386 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c553249b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStore.java b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStore.java
new file mode 100644
index 0000000..1b96c59
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStore.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
+/**
+ * An in-memory key-value store that is limited in size and retains a maximum number of most recently used entries.
+ * <p>
+ * When a put pushes the store past its capacity, the least-recently used entry is dropped from memory and recorded as
+ * removed, so that its removal is also sent to the store's backing topic along with the other buffered changes.
+ * Entries removed through the inner cache's iterators are recorded in the same way.
+ *
+ * @param <K> The key type; keys must be mutually {@link Comparable} because they are also kept in sorted order
+ * @param <V> The value type
+ */
+public class InMemoryLRUCacheStore<K, V> extends MeteredKeyValueStore<K, V> {
+
+    /**
+     * Create a store that holds at most {@code capacity} entries.
+     *
+     * @param name the name of the store
+     * @param capacity the maximum number of entries kept in memory
+     * @param context the processor context
+     * @param serdes the key and value serializers and deserializers
+     * @param time the time source passed to the metered store; if null, a {@link SystemTime} is used
+     * @return the new store; never null
+     */
+    protected static <K, V> InMemoryLRUCacheStore<K, V> create(String name, int capacity, ProcessorContext context,
+                                                               Serdes<K, V> serdes, Time time) {
+        if (time == null) time = new SystemTime();
+        MemoryLRUCache<K, V> cache = new MemoryLRUCache<K, V>(name, capacity);
+        final InMemoryLRUCacheStore<K, V> store = new InMemoryLRUCacheStore<>(name, context, cache, serdes, time);
+        // The listener is only installed once the store has been constructed, so entries the cache drops during
+        // construction are not reported; from here on, every entry the cache drops is recorded as removed.
+        cache.whenEldestRemoved(new EldestEntryRemovalListener<K, V>() {
+            @Override
+            public void apply(K key, V value) {
+                store.removed(key);
+            }
+        });
+        return store;
+    }
+
+    private InMemoryLRUCacheStore(String name, ProcessorContext context, MemoryLRUCache<K, V> cache, Serdes<K, V> serdes, Time time) {
+        super(name, cache, context, serdes, "kafka-streams", time);
+    }
+
+    /**
+     * Callback notified of each entry that the cache removes other than through {@link MemoryLRUCache#delete(Object)}.
+     */
+    private static interface EldestEntryRemovalListener<K, V> {
+        public void apply(K key, V value);
+    }
+
+    /**
+     * The bounded inner store. Entries live in an access-ordered {@link LinkedHashMap}, which provides the LRU eviction,
+     * while a {@link TreeSet} of the same keys provides the sorted order used by {@link #range} and {@link #all}.
+     */
+    protected static final class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
+
+        private final String name;
+        private final Map<K, V> map;
+        private final NavigableSet<K> keys;
+        private EldestEntryRemovalListener<K, V> listener;
+
+        public MemoryLRUCache(String name, final int maxCacheSize) {
+            this.name = name;
+            this.keys = new TreeSet<>();
+            // leave room for one extra entry to handle adding an entry before the oldest can be removed;
+            // accessOrder = true keeps the least-recently accessed entry first, so the eldest entry is the LRU one
+            this.map = new LinkedHashMap<K, V>(maxCacheSize + 1, 1.01f, true) {
+                private static final long serialVersionUID = 1L;
+
+                @Override
+                protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
+                    if (size() > maxCacheSize) {
+                        K key = eldest.getKey();
+                        keys.remove(key);
+                        notifyRemoved(key, eldest.getValue());
+                        return true;
+                    }
+                    return false;
+                }
+            };
+        }
+
+        /**
+         * Register the callback to notify when an entry is removed other than through {@link #delete(Object)}.
+         */
+        protected void whenEldestRemoved(EldestEntryRemovalListener<K, V> listener) {
+            this.listener = listener;
+        }
+
+        /** Notify the registered listener, if there is one, that the entry for {@code key} was removed. */
+        private void notifyRemoved(K key, V value) {
+            if (listener != null) listener.apply(key, value);
+        }
+
+        @Override
+        public String name() {
+            return this.name;
+        }
+
+        @Override
+        public boolean persistent() {
+            return false;
+        }
+
+        /** Return the value for {@code key}, or null if absent; a hit also marks the entry as most recently used. */
+        @Override
+        public V get(K key) {
+            return this.map.get(key);
+        }
+
+        @Override
+        public void put(K key, V value) {
+            this.map.put(key, value);
+            this.keys.add(key);
+        }
+
+        @Override
+        public void putAll(List<Entry<K, V>> entries) {
+            for (Entry<K, V> entry : entries)
+                put(entry.key(), entry.value());
+        }
+
+        @Override
+        public V delete(K key) {
+            V value = this.map.remove(key);
+            this.keys.remove(key);
+            return value;
+        }
+
+        /** Iterate in key order over the keys from {@code from} (inclusive) to {@code to} (exclusive). */
+        @Override
+        public KeyValueIterator<K, V> range(K from, K to) {
+            return new MemoryLRUCache.CacheIterator<K, V>(this.keys.subSet(from, true, to, false).iterator(), this);
+        }
+
+        /** Iterate in key order over all entries. */
+        @Override
+        public KeyValueIterator<K, V> all() {
+            return new MemoryLRUCache.CacheIterator<K, V>(this.keys.iterator(), this);
+        }
+
+        @Override
+        public void flush() {
+            // do-nothing since it is in-memory
+        }
+
+        @Override
+        public void close() {
+            // do-nothing
+        }
+
+        /**
+         * Iterates over a sorted view of the cache's keys, reading each value from the cache (which, as with
+         * {@link MemoryLRUCache#get(Object)}, marks that entry as most recently used). Removing an entry through this
+         * iterator notifies the cache's removal listener, just as an eviction does.
+         */
+        private static class CacheIterator<K, V> implements KeyValueIterator<K, V> {
+            private final Iterator<K> keys;
+            private final MemoryLRUCache<K, V> cache;
+            private K lastKey;
+
+            public CacheIterator(Iterator<K> keys, MemoryLRUCache<K, V> cache) {
+                this.keys = keys;
+                this.cache = cache;
+            }
+
+            @Override
+            public boolean hasNext() {
+                return keys.hasNext();
+            }
+
+            @Override
+            public Entry<K, V> next() {
+                lastKey = keys.next();
+                return new Entry<>(lastKey, cache.map.get(lastKey));
+            }
+
+            @Override
+            public void remove() {
+                // remove through the key iterator (not the TreeSet itself) so that this iteration can continue
+                keys.remove();
+                V value = cache.map.remove(lastKey);
+                // report the removal like an eviction, so that the owning store also records it as removed
+                cache.notifyRemoved(lastKey, value);
+            }
+
+            @Override
+            public void close() {
+                // do nothing
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/c553249b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
index 90eee05..9a652ac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
@@ -50,7 +50,9 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
     private final String topic;
     private final int partition;
     private final Set<K> dirty;
+    private final Set<K> removed;
     private final int maxDirty;
+    private final int maxRemoved;
     private final ProcessorContext context;
 
     // always wrap the logged store with the metered store
@@ -76,7 +78,9 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
         this.context = context;
 
         this.dirty = new HashSet<K>();
-        this.maxDirty = 100;        // TODO: this needs to be configurable
+        this.removed = new HashSet<K>();
+        this.maxDirty = 100; // TODO: this needs to be configurable
+        this.maxRemoved = 100; // TODO: this needs to be configurable
 
         // register and possibly restore the state from the logs
         long startNs = time.nanoseconds();
@@ -123,8 +127,8 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
             this.inner.put(key, value);
 
             this.dirty.add(key);
-            if (this.dirty.size() > this.maxDirty)
-                logChange();
+            this.removed.remove(key);
+            maybeLogChange();
         } finally {
             this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
         }
@@ -137,11 +141,12 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
             this.inner.putAll(entries);
 
             for (Entry<K, V> entry : entries) {
-                this.dirty.add(entry.key());
+                K key = entry.key();
+                this.dirty.add(key);
+                this.removed.remove(key);
             }
 
-            if (this.dirty.size() > this.maxDirty)
-                logChange();
+            maybeLogChange();
         } finally {
             this.metrics.recordLatency(this.putAllTime, startNs, time.nanoseconds());
         }
@@ -153,9 +158,9 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
         try {
             V value = this.inner.delete(key);
 
-            this.dirty.add(key);
-            if (this.dirty.size() > this.maxDirty)
-                logChange();
+            this.dirty.remove(key);
+            this.removed.add(key);
+            maybeLogChange();
 
             return value;
         } finally {
@@ -163,6 +168,18 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
         }
     }
 
+    /**
+     * Called when the underlying {@link #inner} {@link KeyValueStore} removes an entry in response to a call from this
+     * store other than {@link #delete(Object)} (e.g. an LRU eviction); like a deletion, the key is logged with a null value.
+     *
+     * @param key the key for the entry that the inner store removed
+     */
+    protected void removed(K key) {
+        this.dirty.remove(key); // the removal supersedes any pending update for this key
+        this.removed.add(key);
+        maybeLogChange(); // may log the buffered changes right away if a limit is exceeded
+    }
+
     @Override
     public KeyValueIterator<K, V> range(K from, K to) {
         return new MeteredKeyValueIterator<K, V>(this.inner.range(from, to), this.rangeTime);
@@ -189,16 +206,25 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
         }
     }
 
+    private void maybeLogChange() { // log the buffered changes once either the updated or the removed keys exceed their limit
+        if (this.dirty.size() > this.maxDirty || this.removed.size() > this.maxRemoved)
+            logChange();
+    }
+
     private void logChange() {
         RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
         if (collector != null) {
             Serializer<K> keySerializer = serialization.keySerializer();
             Serializer<V> valueSerializer = serialization.valueSerializer();
 
+            for (K k : this.removed) { // removed keys are sent with a null value
+                collector.send(new ProducerRecord<>(this.topic, this.partition, k, (V) null), keySerializer, valueSerializer);
+            }
             for (K k : this.dirty) {
                 V v = this.inner.get(k);
                 collector.send(new ProducerRecord<>(this.topic, this.partition, k, v), keySerializer, valueSerializer);
             }
+            this.removed.clear(); // both buffers have now been sent
             this.dirty.clear();
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c553249b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 171ed44..8ec73fd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -54,8 +54,20 @@ public class Stores {
                             @Override
                             public InMemoryKeyValueFactory<K, V> inMemory() {
                                 return new InMemoryKeyValueFactory<K, V>() {
+                                    private int capacity = Integer.MAX_VALUE; // Integer.MAX_VALUE means no limit
+
+                                    @Override
+                                    public InMemoryKeyValueFactory<K, V> maxEntries(int capacity) {
+                                        if (capacity < 1) throw new IllegalArgumentException("The capacity must be positive");
+                                        this.capacity = capacity;
+                                        return this;
+                                    }
+
                                     @Override
                                     public KeyValueStore<K, V> build() {
+                                        if (capacity < Integer.MAX_VALUE) { // bounded: evicts least-recently used entries
+                                            return InMemoryLRUCacheStore.create(name, capacity, context, serdes, null);
+                                        }
                                         return new InMemoryKeyValueStore<>(name, context, serdes, null);
                                     }
                                 };
@@ -138,7 +150,7 @@ public class Stores {
     }
 
     /**
-     * The interface used to specify the type of values for key-value stores.
+     * The factory used to specify the type of values for key-value stores.
      * 
      * @param <K> the type of keys
      */
@@ -200,7 +212,6 @@ public class Stores {
          * @return the interface used to specify the remaining key-value store options; never null
          */
         public abstract <V> KeyValueFactory<K, V> withValues(Serializer<V> valueSerializer, Deserializer<V> valueDeserializer);
-
     }
 
     /**
@@ -235,6 +246,16 @@ public class Stores {
      */
     public static interface InMemoryKeyValueFactory<K, V> {
         /**
+         * Limits the in-memory key-value store to hold at most the given number of entries; once full, adding an entry
+         * evicts the least-recently used one. The default is {@link Integer#MAX_VALUE}, which means no limit.
+         *
+         * @param capacity the maximum number of entries; must be positive, ideally one less than a power of 2
+         * @return this factory
+         * @throws IllegalArgumentException if the capacity is not positive
+         */
+        InMemoryKeyValueFactory<K, V> maxEntries(int capacity);
+
+        /**
          * Return the new key-value store.
          * @return the key-value store; never null
          */

http://git-wip-us.apache.org/repos/asf/kafka/blob/c553249b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
new file mode 100644
index 0000000..6b96d3a
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.junit.Test;
+
+public class InMemoryLRUCacheStoreTest {
+
+    @Test
+    public void testPutGetRange() {
+        // Create the test driver ...
+        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
+        KeyValueStore<Integer, String> store = Stores.create("my-store", driver.context())
+                                                     .withIntegerKeys().withStringValues()
+                                                     .inMemory().maxEntries(3)
+                                                     .build();
+
+        // Verify that the store reads and writes correctly, keeping only the last 2 entries ...
+        store.put(0, "zero");
+        store.put(1, "one");
+        store.put(2, "two");
+        store.put(3, "three");
+        store.put(4, "four");
+        store.put(5, "five");
+
+        // It should only keep the last 4 added ...
+        assertEquals(3, driver.sizeOf(store));
+        assertNull(store.get(0));
+        assertNull(store.get(1));
+        assertNull(store.get(2));
+        assertEquals("three", store.get(3));
+        assertEquals("four", store.get(4));
+        assertEquals("five", store.get(5));
+        store.delete(5);
+
+        // Flush the store and verify all current entries were properly flushed ...
+        store.flush();
+        assertNull(driver.flushedEntryStored(0));
+        assertNull(driver.flushedEntryStored(1));
+        assertNull(driver.flushedEntryStored(2));
+        assertEquals("three", driver.flushedEntryStored(3));
+        assertEquals("four", driver.flushedEntryStored(4));
+        assertNull(driver.flushedEntryStored(5));
+
+        assertEquals(true, driver.flushedEntryRemoved(0));
+        assertEquals(true, driver.flushedEntryRemoved(1));
+        assertEquals(true, driver.flushedEntryRemoved(2));
+        assertEquals(false, driver.flushedEntryRemoved(3));
+        assertEquals(false, driver.flushedEntryRemoved(4));
+        assertEquals(true, driver.flushedEntryRemoved(5));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testPutGetRangeWithDefaultSerdes() {
+        // Create the test driver ...
+        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
+
+        Serializer<Integer> keySer = (Serializer<Integer>) driver.context().keySerializer();
+        Deserializer<Integer> keyDeser = (Deserializer<Integer>) driver.context().keyDeserializer();
+        Serializer<String> valSer = (Serializer<String>) driver.context().valueSerializer();
+        Deserializer<String> valDeser = (Deserializer<String>) driver.context().valueDeserializer();
+        KeyValueStore<Integer, String> store = Stores.create("my-store", driver.context())
+                                                     .withKeys(keySer, keyDeser)
+                                                     .withValues(valSer, valDeser)
+                                                     .inMemory().maxEntries(3)
+                                                     .build();
+
+        // Verify that the store reads and writes correctly, keeping only the last 2 entries ...
+        store.put(0, "zero");
+        store.put(1, "one");
+        store.put(2, "two");
+        store.put(3, "three");
+        store.put(4, "four");
+        store.put(5, "five");
+
+        // It should only keep the last 4 added ...
+        assertEquals(3, driver.sizeOf(store));
+        assertNull(store.get(0));
+        assertNull(store.get(1));
+        assertNull(store.get(2));
+        assertEquals("three", store.get(3));
+        assertEquals("four", store.get(4));
+        assertEquals("five", store.get(5));
+        store.delete(5);
+
+        // Flush the store and verify all current entries were properly flushed ...
+        store.flush();
+        assertNull(driver.flushedEntryStored(0));
+        assertNull(driver.flushedEntryStored(1));
+        assertNull(driver.flushedEntryStored(2));
+        assertEquals("three", driver.flushedEntryStored(3));
+        assertEquals("four", driver.flushedEntryStored(4));
+        assertNull(driver.flushedEntryStored(5));
+
+        assertEquals(true, driver.flushedEntryRemoved(0));
+        assertEquals(true, driver.flushedEntryRemoved(1));
+        assertEquals(true, driver.flushedEntryRemoved(2));
+        assertEquals(false, driver.flushedEntryRemoved(3));
+        assertEquals(false, driver.flushedEntryRemoved(4));
+        assertEquals(true, driver.flushedEntryRemoved(5));
+    }
+
+    @Test
+    public void testRestore() {
+        // Create the test driver ...
+        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
+
+        // Add any entries that will be restored to any store
+        // that uses the driver's context ...
+        driver.addEntryToRestoreLog(1, "one");
+        driver.addEntryToRestoreLog(2, "two");
+        driver.addEntryToRestoreLog(4, "four");
+
+        // Create the store, which should register with the context and automatically
+        // receive the restore entries ...
+        KeyValueStore<Integer, String> store = Stores.create("my-store", driver.context())
+                                                     .withIntegerKeys().withStringValues()
+                                                     .inMemory().maxEntries(3)
+                                                     .build();
+
+        // Verify that the store's contents were properly restored ...
+        assertEquals(0, driver.checkForRestoredEntries(store));
+
+        // and there are no other entries ...
+        assertEquals(3, driver.sizeOf(store));
+    }
+
+}
\ No newline at end of file