You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/10/11 20:57:33 UTC

kafka git commit: KAFKA-4283: records deleted from CachingKeyValueStore still appear in range and all queries

Repository: kafka
Updated Branches:
  refs/heads/trunk 9179bbc06 -> 93b940016


KAFKA-4283: records deleted from CachingKeyValueStore still appear in range and all queries

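Records that have been deleted from the CachingKeyValueStore should not appear in the results of `range` and `all` queries.
Modified MergedSortedCacheKeyValueStoreIterator so that it skips over entries deleted from the cache (cached value is null), also advancing past the same key in the underlying store; added ThreadCache.MemoryLRUCacheBytesIterator#peekNext() to support this.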

Author: Damian Guy <da...@gmail.com>

Reviewers: Eno Thereska, Matthias J. Sax, Guozhang Wang

Closes #2001 from dguy/kafka-4283


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/93b94001
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/93b94001
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/93b94001

Branch: refs/heads/trunk
Commit: 93b940016348fcf64034f36e4b4f3fc53a966f74
Parents: 9179bbc
Author: Damian Guy <da...@gmail.com>
Authored: Tue Oct 11 13:57:30 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Oct 11 13:57:30 2016 -0700

----------------------------------------------------------------------
 .../MergedSortedCacheKeyValueStoreIterator.java |  15 +++
 .../streams/state/internals/ThreadCache.java    |   7 ++
 .../internals/CachingKeyValueStoreTest.java     |  20 ++++
 ...gedSortedCacheKeyValueStoreIteratorTest.java | 107 +++++++++++++++++--
 4 files changed, 141 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/93b94001/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java
index 23bbe7f..cbcd0f4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIterator.java
@@ -46,9 +46,24 @@ class MergedSortedCacheKeyValueStoreIterator<K, V> implements KeyValueIterator<K
 
     @Override
     public boolean hasNext() {
+        while (cacheIterator.hasNext() && isDeletedCacheEntry(cacheIterator.peekNext())) {
+            if (storeIterator.hasNext()) {
+                final byte[] storeKey = storeIterator.peekNextKey().get();
+                // advance the store iterator if the key is the same as the deleted cache key
+                if (comparator.compare(storeKey, cacheIterator.peekNext().key) == 0) {
+                    storeIterator.next();
+                }
+            }
+            // skip over items deleted from cache
+            cacheIterator.next();
+        }
         return cacheIterator.hasNext() || storeIterator.hasNext();
     }
 
+    private boolean isDeletedCacheEntry(final KeyValue<byte[], LRUCacheEntry> nextFromCache) {
+        return  nextFromCache.value.value == null;
+    }
+
 
     @Override
     public KeyValue<K, V> next() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/93b94001/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index e24dc7a..d76e5c8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -228,6 +228,13 @@ public class ThreadCache {
             return nextEntry.key;
         }
 
+        KeyValue<byte[], LRUCacheEntry> peekNext() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            return nextEntry;
+        }
+
         @Override
         public boolean hasNext() {
             if (nextEntry != null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/93b94001/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
index e7c59ef..23f8a6a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingKeyValueStoreTest.java
@@ -36,6 +36,7 @@ import java.util.Map;
 
 import static org.apache.kafka.streams.state.internals.ThreadCacheTest.memoryCacheEntrySize;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 
@@ -129,6 +130,25 @@ public class CachingKeyValueStoreTest {
         assertEquals(items, results.size());
     }
 
+    @Test
+    public void shouldDeleteItemsFromCache() throws Exception {
+        store.put("a", "a");
+        store.delete("a");
+        assertNull(store.get("a"));
+        assertFalse(store.range("a", "b").hasNext());
+        assertFalse(store.all().hasNext());
+    }
+
+    @Test
+    public void shouldNotShowItemsDeletedFromCacheButFlushedToStoreBeforeDelete() throws Exception {
+        store.put("a", "a");
+        store.flush();
+        store.delete("a");
+        assertNull(store.get("a"));
+        assertFalse(store.range("a", "b").hasNext());
+        assertFalse(store.all().hasNext());
+    }
+
     private int addItemsToCache() throws IOException {
         int cachedSize = 0;
         int i = 0;

http://git-wip-us.apache.org/repos/asf/kafka/blob/93b94001/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
index 59607ea..dee2593 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
@@ -21,29 +21,39 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
+import org.junit.Before;
 import org.junit.Test;
 
 import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
 
 public class MergedSortedCacheKeyValueStoreIteratorTest {
 
+    private final String namespace = "one";
+    private final StateSerdes<byte[], byte[]> serdes =  new StateSerdes<>(namespace, Serdes.ByteArray(), Serdes.ByteArray());
+    private KeyValueStore<Bytes, byte[]> store;
+    private ThreadCache cache;
+
+    @Before
+    public void setUp() throws Exception {
+        store = new InMemoryKeyValueStore<>(namespace);
+        cache = new ThreadCache(10000L);
+    }
+
     @Test
     public void shouldIterateOverRange() throws Exception {
-        KeyValueStore<Bytes, byte[]> kv = new InMemoryKeyValueStore<>("one");
-        final ThreadCache cache = new ThreadCache(1000000L);
-        byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}};
-        final String namespace = "one";
-        for (int i = 0; i < bytes.length - 1; i += 2) {
-            kv.put(Bytes.wrap(bytes[i]), bytes[i]);
+        final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}, {11}};
+        for (int i = 0; i < bytes.length; i += 2) {
+            store.put(Bytes.wrap(bytes[i]), bytes[i]);
             cache.put(namespace, bytes[i + 1], new LRUCacheEntry(bytes[i + 1]));
         }
 
         final Bytes from = Bytes.wrap(new byte[]{2});
         final Bytes to = Bytes.wrap(new byte[]{9});
-        final PeekingKeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>(kv.range(from, to));
+        final PeekingKeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>(store.range(from, to));
         final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(namespace, from.get(), to.get());
 
-        final MergedSortedCacheKeyValueStoreIterator<byte[], byte[]> iterator = new MergedSortedCacheKeyValueStoreIterator<>(cacheIterator, storeIterator, new StateSerdes<>("name", Serdes.ByteArray(), Serdes.ByteArray()));
+        final MergedSortedCacheKeyValueStoreIterator<byte[], byte[]> iterator = new MergedSortedCacheKeyValueStoreIterator<>(cacheIterator, storeIterator, serdes);
         byte[][] values = new byte[8][];
         int index = 0;
         int bytesIndex = 2;
@@ -54,4 +64,85 @@ public class MergedSortedCacheKeyValueStoreIteratorTest {
         }
     }
 
+
+    @Test
+    public void shouldSkipLargerDeletedCacheValue() throws Exception {
+        final byte[][] bytes = {{0}, {1}};
+        store.put(Bytes.wrap(bytes[0]), bytes[0]);
+        cache.put(namespace, bytes[1], new LRUCacheEntry(null));
+        final MergedSortedCacheKeyValueStoreIterator<byte[], byte[]> iterator = createIterator();
+        assertArrayEquals(bytes[0], iterator.next().key);
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
+    public void shouldSkipSmallerDeletedCachedValue() throws Exception {
+        final byte[][] bytes = {{0}, {1}};
+        cache.put(namespace, bytes[0], new LRUCacheEntry(null));
+        store.put(Bytes.wrap(bytes[1]), bytes[1]);
+        final MergedSortedCacheKeyValueStoreIterator<byte[], byte[]> iterator = createIterator();
+        assertArrayEquals(bytes[1], iterator.next().key);
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
+    public void shouldIgnoreIfDeletedInCacheButExistsInStore() throws Exception {
+        final byte[][] bytes = {{0}};
+        cache.put(namespace, bytes[0], new LRUCacheEntry(null));
+        store.put(Bytes.wrap(bytes[0]), bytes[0]);
+        final MergedSortedCacheKeyValueStoreIterator<byte[], byte[]> iterator = createIterator();
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
+    public void shouldNotHaveNextIfAllCachedItemsDeleted() throws Exception {
+        final byte[][] bytes = {{0}, {1}, {2}};
+        for (int i = 0; i < bytes.length; i++) {
+            store.put(Bytes.wrap(bytes[i]), bytes[i]);
+            cache.put(namespace, bytes[i], new LRUCacheEntry(null));
+        }
+        assertFalse(createIterator().hasNext());
+    }
+
+    @Test
+    public void shouldNotHaveNextIfOnlyCacheItemsAndAllDeleted() throws Exception {
+        final byte[][] bytes = {{0}, {1}, {2}};
+        for (int i = 0; i < bytes.length; i++) {
+            cache.put(namespace, bytes[i], new LRUCacheEntry(null));
+        }
+        assertFalse(createIterator().hasNext());
+    }
+
+    @Test
+    public void shouldSkipAllDeletedFromCache() throws Exception {
+        final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}, {11}};
+        for (int i = 0; i < bytes.length; i++) {
+            store.put(Bytes.wrap(bytes[i]), bytes[i]);
+            cache.put(namespace, bytes[i], new LRUCacheEntry(bytes[i]));
+        }
+        cache.put(namespace, bytes[1], new LRUCacheEntry(null));
+        cache.put(namespace, bytes[2], new LRUCacheEntry(null));
+        cache.put(namespace, bytes[3], new LRUCacheEntry(null));
+        cache.put(namespace, bytes[8], new LRUCacheEntry(null));
+        cache.put(namespace, bytes[11], new LRUCacheEntry(null));
+
+        final MergedSortedCacheKeyValueStoreIterator<byte[], byte[]> iterator = createIterator();
+        assertArrayEquals(bytes[0], iterator.next().key);
+        assertArrayEquals(bytes[4], iterator.next().key);
+        assertArrayEquals(bytes[5], iterator.next().key);
+        assertArrayEquals(bytes[6], iterator.next().key);
+        assertArrayEquals(bytes[7], iterator.next().key);
+        assertArrayEquals(bytes[9], iterator.next().key);
+        assertArrayEquals(bytes[10], iterator.next().key);
+        assertFalse(iterator.hasNext());
+
+    }
+
+    private MergedSortedCacheKeyValueStoreIterator<byte[], byte[]> createIterator() {
+        final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.all(namespace);
+        final PeekingKeyValueIterator<Bytes, byte[]> storeIterator = new DelegatingPeekingKeyValueIterator<>(store.all());
+        return new MergedSortedCacheKeyValueStoreIterator<>(cacheIterator, storeIterator, serdes);
+    }
+
+
 }
\ No newline at end of file