Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/16 23:41:44 UTC
kafka git commit: KAFKA-2594: Add InMemoryLRUCacheStore as a preliminary method for bounding in-memory stores
Repository: kafka
Updated Branches:
refs/heads/trunk aa66b42da -> c553249b4
KAFKA-2594: Add InMemoryLRUCacheStore as a preliminary method for bounding in-memory stores
Added a new `KeyValueStore` implementation called `InMemoryLRUCacheStore` that keeps a maximum number of entries in memory; when the size exceeds the capacity, the least-recently-used entry is removed from the store and from the backing topic. Also added unit tests for this new store and for the existing `InMemoryKeyValueStore` and `RocksDBKeyValueStore` implementations. A new `KeyValueStoreTestDriver` class simplifies all of the other tests and can be used by other libraries to help test their own custom implementations.
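For context, here is a minimal usage sketch of the new bounded store, built through the existing `Stores` factory the same way the new unit tests below do. `KeyValueStoreTestDriver.create()` and `driver.context()` are taken from the tests in this PR; the wrapping class and the capacity of 3 are just for illustration.

```java
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.KeyValueStoreTestDriver;
import org.apache.kafka.streams.state.Stores;

public class LruStoreExample { // hypothetical example class, not part of this PR
    public static void main(String[] args) {
        // The test driver (added in this PR) supplies a usable ProcessorContext.
        KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();

        // Calling maxEntries(3) makes the in-memory factory return the new InMemoryLRUCacheStore.
        KeyValueStore<Integer, String> store = Stores.create("my-store", driver.context())
                .withIntegerKeys().withStringValues()
                .inMemory().maxEntries(3)
                .build();

        store.put(1, "one");
        store.put(2, "two");
        store.put(3, "three");
        store.put(4, "four"); // exceeds the capacity, so the least-recently-used entry (key 1) is evicted

        System.out.println(store.get(1)); // null -- evicted
        System.out.println(store.get(4)); // "four"
    }
}
```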
This PR depends upon [KAFKA-2593](https://issues.apache.org/jira/browse/KAFKA-2593) and its PR at https://github.com/apache/kafka/pull/255. Once that PR is merged, I can rebase this PR if desired.
Two issues were uncovered when creating these new unit tests, and both are also addressed as separate (small) commits in this PR:
* The `RocksDBKeyValueStore` initialization was not creating the file system directory if missing.
* `MeteredKeyValueStore` was casting to `ProcessorContextImpl` to access the `RecordCollector`, which prevented using `MeteredKeyValueStore` implementations in tests where something other than `ProcessorContextImpl` was used. The fix was to introduce a `RecordCollector.Supplier` interface that defines this `recordCollector()` method, and to change `ProcessorContextImpl` and `MockProcessorContext` to both implement it. Now `MeteredKeyValueStore` can cast to the new interface rather than to a single concrete implementation to access the record collector, making it possible to use any and all current stores inside unit tests (see the sketch below).
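The resulting shape of that interface, as a minimal sketch: class bodies and packages are elided and assumed, and only the final cast is taken verbatim from the `MeteredKeyValueStore` diff further down.

```java
// Sketch only -- not the verbatim Kafka source.
public interface RecordCollector {
    // send(...) methods elided ...

    /** Implemented by any processor context that can hand out its RecordCollector. */
    interface Supplier {
        RecordCollector recordCollector();
    }
}

// Both the production and the test contexts implement the new interface:
//   class ProcessorContextImpl implements ProcessorContext, RecordCollector.Supplier { ... }
//   class MockProcessorContext implements ProcessorContext, RecordCollector.Supplier { ... }

// MeteredKeyValueStore then casts to the interface instead of the concrete class:
//   RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
```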
Author: Randall Hauch <rh...@gmail.com>
Reviewers: Edward Ribeiro, Guozhang Wang
Closes #256 from rhauch/kafka-2594
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c553249b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c553249b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c553249b
Branch: refs/heads/trunk
Commit: c553249b4e6d7d385821061f9742f15f90e94def
Parents: aa66b42
Author: Randall Hauch <rh...@gmail.com>
Authored: Fri Oct 16 14:46:26 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Oct 16 14:46:26 2015 -0700
----------------------------------------------------------------------
.../streams/state/InMemoryLRUCacheStore.java | 180 +++++++++++++++++++
.../streams/state/MeteredKeyValueStore.java | 44 ++++-
.../org/apache/kafka/streams/state/Stores.java | 25 ++-
.../state/InMemoryLRUCacheStoreTest.java | 148 +++++++++++++++
4 files changed, 386 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/c553249b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStore.java b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStore.java
new file mode 100644
index 0000000..1b96c59
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStore.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
+/**
+ * An in-memory key-value store that is limited in size and retains a maximum number of most recently used entries.
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ *
+ */
+public class InMemoryLRUCacheStore<K, V> extends MeteredKeyValueStore<K, V> {
+
+ protected static <K, V> InMemoryLRUCacheStore<K, V> create(String name, int capacity, ProcessorContext context,
+ Serdes<K, V> serdes, Time time) {
+ if (time == null) time = new SystemTime();
+ MemoryLRUCache<K, V> cache = new MemoryLRUCache<K, V>(name, capacity);
+ final InMemoryLRUCacheStore<K, V> store = new InMemoryLRUCacheStore<>(name, context, cache, serdes, time);
+ cache.whenEldestRemoved(new EldestEntryRemovalListener<K, V>() {
+ @Override
+ public void apply(K key, V value) {
+ store.removed(key);
+ }
+ });
+ return store;
+
+ }
+
+ private InMemoryLRUCacheStore(String name, ProcessorContext context, MemoryLRUCache<K, V> cache, Serdes<K, V> serdes, Time time) {
+ super(name, cache, context, serdes, "kafka-streams", time);
+ }
+
+ private static interface EldestEntryRemovalListener<K, V> {
+ public void apply(K key, V value);
+ }
+
+ protected static final class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
+
+ private final String name;
+ private final Map<K, V> map;
+ private final NavigableSet<K> keys;
+ private EldestEntryRemovalListener<K, V> listener;
+
+ public MemoryLRUCache(String name, final int maxCacheSize) {
+ this.name = name;
+ this.keys = new TreeSet<>();
+ // leave room for one extra entry to handle adding an entry before the oldest can be removed
+ this.map = new LinkedHashMap<K, V>(maxCacheSize + 1, 1.01f, true) {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
+ if (size() > maxCacheSize) {
+ K key = eldest.getKey();
+ keys.remove(key);
+ if (listener != null) listener.apply(key, eldest.getValue());
+ return true;
+ }
+ return false;
+ }
+ };
+ }
+
+ protected void whenEldestRemoved(EldestEntryRemovalListener<K, V> listener) {
+ this.listener = listener;
+ }
+
+ @Override
+ public String name() {
+ return this.name;
+ }
+
+ @Override
+ public boolean persistent() {
+ return false;
+ }
+
+ @Override
+ public V get(K key) {
+ return this.map.get(key);
+ }
+
+ @Override
+ public void put(K key, V value) {
+ this.map.put(key, value);
+ this.keys.add(key);
+ }
+
+ @Override
+ public void putAll(List<Entry<K, V>> entries) {
+ for (Entry<K, V> entry : entries)
+ put(entry.key(), entry.value());
+ }
+
+ @Override
+ public V delete(K key) {
+ V value = this.map.remove(key);
+ this.keys.remove(key);
+ return value;
+ }
+
+ @Override
+ public KeyValueIterator<K, V> range(K from, K to) {
+ return new MemoryLRUCache.CacheIterator<K, V>(this.keys.subSet(from, true, to, false).iterator(), this.map);
+ }
+
+ @Override
+ public KeyValueIterator<K, V> all() {
+ return new MemoryLRUCache.CacheIterator<K, V>(this.keys.iterator(), this.map);
+ }
+
+ @Override
+ public void flush() {
+ // do-nothing since it is in-memory
+ }
+
+ @Override
+ public void close() {
+ // do-nothing
+ }
+
+ private static class CacheIterator<K, V> implements KeyValueIterator<K, V> {
+ private final Iterator<K> keys;
+ private final Map<K, V> entries;
+ private K lastKey;
+
+ public CacheIterator(Iterator<K> keys, Map<K, V> entries) {
+ this.keys = keys;
+ this.entries = entries;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return keys.hasNext();
+ }
+
+ @Override
+ public Entry<K, V> next() {
+ lastKey = keys.next();
+ return new Entry<>(lastKey, entries.get(lastKey));
+ }
+
+ @Override
+ public void remove() {
+ keys.remove();
+ entries.remove(lastKey);
+ }
+
+ @Override
+ public void close() {
+ // do nothing
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/c553249b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
index 90eee05..9a652ac 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
@@ -50,7 +50,9 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
private final String topic;
private final int partition;
private final Set<K> dirty;
+ private final Set<K> removed;
private final int maxDirty;
+ private final int maxRemoved;
private final ProcessorContext context;
// always wrap the logged store with the metered store
@@ -76,7 +78,9 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
this.context = context;
this.dirty = new HashSet<K>();
- this.maxDirty = 100; // TODO: this needs to be configurable
+ this.removed = new HashSet<K>();
+ this.maxDirty = 100; // TODO: this needs to be configurable
+ this.maxRemoved = 100; // TODO: this needs to be configurable
// register and possibly restore the state from the logs
long startNs = time.nanoseconds();
@@ -123,8 +127,8 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
this.inner.put(key, value);
this.dirty.add(key);
- if (this.dirty.size() > this.maxDirty)
- logChange();
+ this.removed.remove(key);
+ maybeLogChange();
} finally {
this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
}
@@ -137,11 +141,12 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
this.inner.putAll(entries);
for (Entry<K, V> entry : entries) {
- this.dirty.add(entry.key());
+ K key = entry.key();
+ this.dirty.add(key);
+ this.removed.remove(key);
}
- if (this.dirty.size() > this.maxDirty)
- logChange();
+ maybeLogChange();
} finally {
this.metrics.recordLatency(this.putAllTime, startNs, time.nanoseconds());
}
@@ -153,9 +158,9 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
try {
V value = this.inner.delete(key);
- this.dirty.add(key);
- if (this.dirty.size() > this.maxDirty)
- logChange();
+ this.dirty.remove(key);
+ this.removed.add(key);
+ maybeLogChange();
return value;
} finally {
@@ -163,6 +168,18 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
}
}
+ /**
+ * Called when the underlying {@link #inner} {@link KeyValueStore} removes an entry in response to a call from this
+ * store other than {@link #delete(Object)}.
+ *
+ * @param key the key for the entry that the inner store removed
+ */
+ protected void removed(K key) {
+ this.dirty.remove(key);
+ this.removed.add(key);
+ maybeLogChange();
+ }
+
@Override
public KeyValueIterator<K, V> range(K from, K to) {
return new MeteredKeyValueIterator<K, V>(this.inner.range(from, to), this.rangeTime);
@@ -189,16 +206,25 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
}
}
+ private void maybeLogChange() {
+ if (this.dirty.size() > this.maxDirty || this.removed.size() > this.maxRemoved)
+ logChange();
+ }
+
private void logChange() {
RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
if (collector != null) {
Serializer<K> keySerializer = serialization.keySerializer();
Serializer<V> valueSerializer = serialization.valueSerializer();
+ for (K k : this.removed) {
+ collector.send(new ProducerRecord<>(this.topic, this.partition, k, (V) null), keySerializer, valueSerializer);
+ }
for (K k : this.dirty) {
V v = this.inner.get(k);
collector.send(new ProducerRecord<>(this.topic, this.partition, k, v), keySerializer, valueSerializer);
}
+ this.removed.clear();
this.dirty.clear();
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c553249b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 171ed44..8ec73fd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -54,8 +54,20 @@ public class Stores {
@Override
public InMemoryKeyValueFactory<K, V> inMemory() {
return new InMemoryKeyValueFactory<K, V>() {
+ private int capacity = Integer.MAX_VALUE;
+
+ @Override
+ public InMemoryKeyValueFactory<K, V> maxEntries(int capacity) {
+ if (capacity < 1) throw new IllegalArgumentException("The capacity must be positive");
+ this.capacity = capacity;
+ return this;
+ }
+
@Override
public KeyValueStore<K, V> build() {
+ if (capacity < Integer.MAX_VALUE) {
+ return InMemoryLRUCacheStore.create(name, capacity, context, serdes, null);
+ }
return new InMemoryKeyValueStore<>(name, context, serdes, null);
}
};
@@ -138,7 +150,7 @@ public class Stores {
}
/**
- * The interface used to specify the type of values for key-value stores.
+ * The factory for creating off-heap key-value stores.
*
* @param <K> the type of keys
*/
@@ -200,7 +212,6 @@ public class Stores {
* @return the interface used to specify the remaining key-value store options; never null
*/
public abstract <V> KeyValueFactory<K, V> withValues(Serializer<V> valueSerializer, Deserializer<V> valueDeserializer);
-
}
/**
@@ -235,6 +246,16 @@ public class Stores {
*/
public static interface InMemoryKeyValueFactory<K, V> {
/**
+ * Limits the in-memory key-value store to hold a maximum number of entries. The default is {@link Integer#MAX_VALUE}, which is
+ * equivalent to not placing a limit on the number of entries.
+ *
+ * @param capacity the maximum capacity of the in-memory cache; should be one less than a power of 2
+ * @return this factory
+ * @throws IllegalArgumentException if the capacity is not positive
+ */
+ InMemoryKeyValueFactory<K, V> maxEntries(int capacity);
+
+ /**
* Return the new key-value store.
* @return the key-value store; never null
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/c553249b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
new file mode 100644
index 0000000..6b96d3a
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.junit.Test;
+
+public class InMemoryLRUCacheStoreTest {
+
+ @Test
+ public void testPutGetRange() {
+ // Create the test driver ...
+ KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
+ KeyValueStore<Integer, String> store = Stores.create("my-store", driver.context())
+ .withIntegerKeys().withStringValues()
+ .inMemory().maxEntries(3)
+ .build();
+
+ // Verify that the store reads and writes correctly, keeping only the last 3 entries ...
+ store.put(0, "zero");
+ store.put(1, "one");
+ store.put(2, "two");
+ store.put(3, "three");
+ store.put(4, "four");
+ store.put(5, "five");
+
+ // It should only keep the last 3 added ...
+ assertEquals(3, driver.sizeOf(store));
+ assertNull(store.get(0));
+ assertNull(store.get(1));
+ assertNull(store.get(2));
+ assertEquals("three", store.get(3));
+ assertEquals("four", store.get(4));
+ assertEquals("five", store.get(5));
+ store.delete(5);
+
+ // Flush the store and verify all current entries were properly flushed ...
+ store.flush();
+ assertNull(driver.flushedEntryStored(0));
+ assertNull(driver.flushedEntryStored(1));
+ assertNull(driver.flushedEntryStored(2));
+ assertEquals("three", driver.flushedEntryStored(3));
+ assertEquals("four", driver.flushedEntryStored(4));
+ assertNull(driver.flushedEntryStored(5));
+
+ assertEquals(true, driver.flushedEntryRemoved(0));
+ assertEquals(true, driver.flushedEntryRemoved(1));
+ assertEquals(true, driver.flushedEntryRemoved(2));
+ assertEquals(false, driver.flushedEntryRemoved(3));
+ assertEquals(false, driver.flushedEntryRemoved(4));
+ assertEquals(true, driver.flushedEntryRemoved(5));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testPutGetRangeWithDefaultSerdes() {
+ // Create the test driver ...
+ KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
+
+ Serializer<Integer> keySer = (Serializer<Integer>) driver.context().keySerializer();
+ Deserializer<Integer> keyDeser = (Deserializer<Integer>) driver.context().keyDeserializer();
+ Serializer<String> valSer = (Serializer<String>) driver.context().valueSerializer();
+ Deserializer<String> valDeser = (Deserializer<String>) driver.context().valueDeserializer();
+ KeyValueStore<Integer, String> store = Stores.create("my-store", driver.context())
+ .withKeys(keySer, keyDeser)
+ .withValues(valSer, valDeser)
+ .inMemory().maxEntries(3)
+ .build();
+
+ // Verify that the store reads and writes correctly, keeping only the last 3 entries ...
+ store.put(0, "zero");
+ store.put(1, "one");
+ store.put(2, "two");
+ store.put(3, "three");
+ store.put(4, "four");
+ store.put(5, "five");
+
+ // It should only keep the last 3 added ...
+ assertEquals(3, driver.sizeOf(store));
+ assertNull(store.get(0));
+ assertNull(store.get(1));
+ assertNull(store.get(2));
+ assertEquals("three", store.get(3));
+ assertEquals("four", store.get(4));
+ assertEquals("five", store.get(5));
+ store.delete(5);
+
+ // Flush the store and verify all current entries were properly flushed ...
+ store.flush();
+ assertNull(driver.flushedEntryStored(0));
+ assertNull(driver.flushedEntryStored(1));
+ assertNull(driver.flushedEntryStored(2));
+ assertEquals("three", driver.flushedEntryStored(3));
+ assertEquals("four", driver.flushedEntryStored(4));
+ assertNull(driver.flushedEntryStored(5));
+
+ assertEquals(true, driver.flushedEntryRemoved(0));
+ assertEquals(true, driver.flushedEntryRemoved(1));
+ assertEquals(true, driver.flushedEntryRemoved(2));
+ assertEquals(false, driver.flushedEntryRemoved(3));
+ assertEquals(false, driver.flushedEntryRemoved(4));
+ assertEquals(true, driver.flushedEntryRemoved(5));
+ }
+
+ @Test
+ public void testRestore() {
+ // Create the test driver ...
+ KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
+
+ // Add any entries that will be restored to any store
+ // that uses the driver's context ...
+ driver.addEntryToRestoreLog(1, "one");
+ driver.addEntryToRestoreLog(2, "two");
+ driver.addEntryToRestoreLog(4, "four");
+
+ // Create the store, which should register with the context and automatically
+ // receive the restore entries ...
+ KeyValueStore<Integer, String> store = Stores.create("my-store", driver.context())
+ .withIntegerKeys().withStringValues()
+ .inMemory().maxEntries(3)
+ .build();
+
+ // Verify that the store's contents were properly restored ...
+ assertEquals(0, driver.checkForRestoredEntries(store));
+
+ // and there are no other entries ...
+ assertEquals(3, driver.sizeOf(store));
+ }
+
+}
\ No newline at end of file