You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/10/11 20:57:33 UTC
kafka git commit: KAFKA-4283: records deleted from
CachingKeyValueStore still appear in range and all queries
Repository: kafka
Updated Branches:
refs/heads/trunk 9179bbc06 -> 93b940016
KAFKA-4283: records deleted from CachingKeyValueStore still appear in range and all queries
Records that are deleted/removed from the CachingKeyValueStore shouldn't appear in range and all queries.
Modified the iterator such that it skips over the deleted records.
Author: Damian Guy <da...@gmail.com>
Reviewers: Eno Thereska, Matthias J. Sax, Guozhang Wang
Closes #2001 from dguy/kafka-4283
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/93b94001
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/93b94001
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/93b94001
Branch: refs/heads/trunk
Commit: 93b940016348fcf64034f36e4b4f3fc53a966f74
Parents: 9179bbc
Author: Damian Guy <da...@gmail.com>
Authored: Tue Oct 11 13:57:30 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Oct 11 13:57:30 2016 -0700
----------------------------------------------------------------------
.../MergedSortedCacheKeyValueStoreIterator.java | 15 +++
.../streams/state/internals/ThreadCache.java | 7 ++
.../internals/CachingKeyValueStoreTest.java | 20 ++++
...gedSortedCacheKeyValueStoreIteratorTest.java | 107 +++++++++++++++++--
4 files changed, 141 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/93b94001/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java
index 23bbe7f..cbcd0f4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java
@@ -46,9 +46,24 @@ class MergedSortedCacheKeyValueStoreIterator<K, V> implements KeyValueIterator<K
@Override
public boolean hasNext() {
+ while (cacheIterator.hasNext() && isDeletedCacheEntry(cacheIterator.peekNext())) {
+ if (storeIterator.hasNext()) {
+ final byte[] storeKey = storeIterator.peekNextKey().get();
+ // advance the store iterator if the key is the same as the deleted cache key
+ if (comparator.compare(storeKey, cacheIterator.peekNext().key) == 0) {
+ storeIterator.next();
+ }
+ }
+ // skip over items deleted from cache
+ cacheIterator.next();
+ }
return cacheIterator.hasNext() || storeIterator.hasNext();
}
+ private boolean isDeletedCacheEntry(final KeyValue<byte[], LRUCacheEntry> nextFromCache) {
+ return nextFromCache.value.value == null;
+ }
+
@Override
public KeyValue<K, V> next() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/93b94001/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index e24dc7a..d76e5c8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -228,6 +228,13 @@ public class ThreadCache {
return nextEntry.key;
}
+ KeyValue<byte[], LRUCacheEntry> peekNext() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ return nextEntry;
+ }
+
@Override
public boolean hasNext() {
if (nextEntry != null) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/93b94001/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index e7c59ef..23f8a6a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -36,6 +36,7 @@ import java.util.Map;
import static org.apache.kafka.streams.state.internals.ThreadCacheTest.memoryCacheEntrySize;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@@ -129,6 +130,25 @@ public class CachingKeyValueStoreTest {
assertEquals(items, results.size());
}
+ @Test
+ public void shouldDeleteItemsFromCache() throws Exception {
+ store.put("a", "a");
+ store.delete("a");
+ assertNull(store.get("a"));
+ assertFalse(store.range("a", "b").hasNext());
+ assertFalse(store.all().hasNext());
+ }
+
+ @Test
+ public void shouldNotShowItemsDeletedFromCacheButFlushedToStoreBeforeDelete() throws Exception {
+ store.put("a", "a");
+ store.flush();
+ store.delete("a");
+ assertNull(store.get("a"));
+ assertFalse(store.range("a", "b").hasNext());
+ assertFalse(store.all().hasNext());
+ }
+
private int addItemsToCache() throws IOException {
int cachedSize = 0;
int i = 0;
http://git-wip-us.apache.org/repos/asf/kafka/blob/93b94001/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
index 59607ea..dee2593 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
@@ -21,29 +21,39 @@ import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StateSerdes;
+import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
public class MergedSortedCacheKeyValueStoreIteratorTest {
+ private final String namespace = "one";
+ private final StateSerdes<byte[], byte[]> serdes = new StateSerdes<>(namespace, Serdes.ByteArray(), Serdes.ByteArray());
+ private KeyValueStore<Bytes, byte[]> store;
+ private ThreadCache cache;
+
+ @Before
+ public void setUp() throws Exception {
+ store = new InMemoryKeyValueStore<>(namespace);
+ cache = new ThreadCache(10000L);
+ }
+
@Test
public void shouldIterateOverRange() throws Exception {
- KeyValueStore<Bytes, byte[]> kv = new InMemoryKeyValueStore<>("one");
- final ThreadCache cache = new ThreadCache(1000000L);
- byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}};
- final String namespace = "one";
- for (int i = 0; i < bytes.length - 1; i += 2) {
- kv.put(Bytes.wrap(bytes[i]), bytes[i]);
+ final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}, {11}};
+ for (int i = 0; i < bytes.length; i += 2) {
+ store.put(Bytes.wrap(bytes[i]), bytes[i]);
cache.put(namespace, bytes[i + 1], new LRUCacheEntry(bytes[i + 1]));
}
final Bytes from = Bytes.wrap(new byte[]{2});
final Bytes to = Bytes.wrap(new byte[]{9});
- final PeekingKeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>(kv.range(from, to));
+ final PeekingKeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>(store.range(from, to));
final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, from.get(), to.get());
- final MergedSortedCacheKeyValueStoreIterator<byte[], byte[]> iterator = new MergedSortedCacheKeyValueStoreIterator<>(cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.ByteArray(), Serdes.ByteArray()));
+ final MergedSortedCacheKeyValueStoreIterator<byte[], byte[]> iterator = new MergedSortedCacheKeyValueStoreIterator<>(cacheIterator, storeIterator, serdes);
byte[][] values = new byte[8][];
int index = 0;
int bytesIndex = 2;
@@ -54,4 +64,85 @@ public class MergedSortedCacheKeyValueStoreIteratorTest {
}
}
+
+ @Test
+ public void shouldSkipLargerDeletedCacheValue() throws Exception {
+ final byte[][] bytes = {{0}, {1}};
+ store.put(Bytes.wrap(bytes[0]), bytes[0]);
+ cache.put(namespace, bytes[1], new LRUCacheEntry(null));
+ final MergedSortedCacheKeyValueStoreIterator<byte[], byte[]> iterator = createIterator();
+ assertArrayEquals(bytes[0], iterator.next().key);
+ assertFalse(iterator.hasNext());
+ }
+
+ @Test
+ public void shouldSkipSmallerDeletedCachedValue() throws Exception {
+ final byte[][] bytes = {{0}, {1}};
+ cache.put(namespace, bytes[0], new LRUCacheEntry(null));
+ store.put(Bytes.wrap(bytes[1]), bytes[1]);
+ final MergedSortedCacheKeyValueStoreIterator<byte[], byte[]> iterator = createIterator();
+ assertArrayEquals(bytes[1], iterator.next().key);
+ assertFalse(iterator.hasNext());
+ }
+
+ @Test
+ public void shouldIgnoreIfDeletedInCacheButExistsInStore() throws Exception {
+ final byte[][] bytes = {{0}};
+ cache.put(namespace, bytes[0], new LRUCacheEntry(null));
+ store.put(Bytes.wrap(bytes[0]), bytes[0]);
+ final MergedSortedCacheKeyValueStoreIterator<byte[], byte[]> iterator = createIterator();
+ assertFalse(iterator.hasNext());
+ }
+
+ @Test
+ public void shouldNotHaveNextIfAllCachedItemsDeleted() throws Exception {
+ final byte[][] bytes = {{0}, {1}, {2}};
+ for (int i = 0; i < bytes.length; i++) {
+ store.put(Bytes.wrap(bytes[i]), bytes[i]);
+ cache.put(namespace, bytes[i], new LRUCacheEntry(null));
+ }
+ assertFalse(createIterator().hasNext());
+ }
+
+ @Test
+ public void shouldNotHaveNextIfOnlyCacheItemsAndAllDeleted() throws Exception {
+ final byte[][] bytes = {{0}, {1}, {2}};
+ for (int i = 0; i < bytes.length; i++) {
+ cache.put(namespace, bytes[i], new LRUCacheEntry(null));
+ }
+ assertFalse(createIterator().hasNext());
+ }
+
+ @Test
+ public void shouldSkipAllDeletedFromCache() throws Exception {
+ final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}, {11}};
+ for (int i = 0; i < bytes.length; i++) {
+ store.put(Bytes.wrap(bytes[i]), bytes[i]);
+ cache.put(namespace, bytes[i], new LRUCacheEntry(bytes[i]));
+ }
+ cache.put(namespace, bytes[1], new LRUCacheEntry(null));
+ cache.put(namespace, bytes[2], new LRUCacheEntry(null));
+ cache.put(namespace, bytes[3], new LRUCacheEntry(null));
+ cache.put(namespace, bytes[8], new LRUCacheEntry(null));
+ cache.put(namespace, bytes[11], new LRUCacheEntry(null));
+
+ final MergedSortedCacheKeyValueStoreIterator<byte[], byte[]> iterator = createIterator();
+ assertArrayEquals(bytes[0], iterator.next().key);
+ assertArrayEquals(bytes[4], iterator.next().key);
+ assertArrayEquals(bytes[5], iterator.next().key);
+ assertArrayEquals(bytes[6], iterator.next().key);
+ assertArrayEquals(bytes[7], iterator.next().key);
+ assertArrayEquals(bytes[9], iterator.next().key);
+ assertArrayEquals(bytes[10], iterator.next().key);
+ assertFalse(iterator.hasNext());
+
+ }
+
+ private MergedSortedCacheKeyValueStoreIterator<byte[], byte[]> createIterator() {
+ final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.all(namespace);
+ final PeekingKeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>(store.all());
+ return new MergedSortedCacheKeyValueStoreIterator<>(cacheIterator, storeIterator, serdes);
+ }
+
+
}
\ No newline at end of file