You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/10/19 21:01:29 UTC

kafka git commit: HOTFIX: Fix putAll and putIfAbsent logic for correct eviction behavior

Repository: kafka
Updated Branches:
  refs/heads/trunk 332b8b9af -> 179d4dc0f


HOTFIX: Fix putAll and putIfAbsent logic for correct eviction behavior

Author: Eno Thereska <en...@gmail.com>

Reviewers: Damian Guy, Guozhang Wang

Closes #2038 from enothereska/hotfix-put-cache


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/179d4dc0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/179d4dc0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/179d4dc0

Branch: refs/heads/trunk
Commit: 179d4dc0f2a23d7e67caf4875a0563e74027933a
Parents: 332b8b9
Author: Eno Thereska <en...@gmail.com>
Authored: Wed Oct 19 14:01:23 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Oct 19 14:01:23 2016 -0700

----------------------------------------------------------------------
 .../streams/state/internals/ThreadCache.java    | 14 ++++++--
 .../state/internals/ThreadCacheTest.java        | 37 ++++++++++++++++++++
 2 files changed, 48 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/179d4dc0/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index d76e5c8..f7355d8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -123,12 +123,20 @@ public class ThreadCache {
 
     public LRUCacheEntry putIfAbsent(final String namespace, byte[] key, LRUCacheEntry value) {
         final NamedCache cache = getOrCreateCache(namespace);
-        return cache.putIfAbsent(Bytes.wrap(key), value);
+
+        final LRUCacheEntry result = cache.putIfAbsent(Bytes.wrap(key), value); // hold the result so the eviction check can run before returning
+        maybeEvict(namespace); // the removed line returned directly and never reached an eviction check
+
+        if (result == null) { // presumably null means the key was absent and the value was inserted - confirm against NamedCache.putIfAbsent
+            numPuts++; // only a null result is counted as a put
+        }
+        return result;
     }
 
     public void putAll(final String namespace, final List<KeyValue<byte[], LRUCacheEntry>> entries) {
-        final NamedCache cache = getOrCreateCache(namespace);
-        cache.putAll(entries);
+        for (KeyValue<byte[], LRUCacheEntry> entry : entries) { // insert entries one at a time instead of handing the list to NamedCache.putAll
+            put(namespace, entry.key, entry.value); // NOTE(review): presumably put(...) performs the per-entry eviction check; put is outside this hunk - confirm
+        }
     }
 
     public LRUCacheEntry delete(final String namespace, final byte[] key) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/179d4dc0/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
index 2ff3b89..b07da6e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
@@ -389,6 +389,24 @@ public class ThreadCacheTest {
     }
 
     @Test
+    public void shouldEvictAfterPutAll() throws Exception { // checks that putAll is followed by evictions
+        final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
+        final String namespace = "namespace";
+        final ThreadCache cache = new ThreadCache(1);
+        cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
+            @Override
+            public void apply(final List<ThreadCache.DirtyEntry> dirty) {
+                received.addAll(dirty); // collect whatever the cache hands to the flush listener
+            }
+        });
+
+        cache.putAll(namespace, Arrays.asList(KeyValue.pair(new byte[]{0}, dirtyEntry(new byte[]{5})),
+            KeyValue.pair(new byte[]{1}, dirtyEntry(new byte[]{6}))));
+
+        assertEquals(2, cache.evicts()); // expected value first (JUnit order): two entries put, two evictions expected
+    }
+
+    @Test
     public void shouldPutAll() throws Exception {
         final ThreadCache cache = new ThreadCache(100000);
 
@@ -422,6 +440,25 @@ public class ThreadCacheTest {
         assertArrayEquals(value, cache.get("n", key).value);
     }
 
+    @Test
+    public void shouldEvictAfterPutIfAbsent() throws Exception { // checks that putIfAbsent is followed by evictions
+        final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
+        final String namespace = "namespace";
+        final ThreadCache cache = new ThreadCache(1);
+        cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
+            @Override
+            public void apply(final List<ThreadCache.DirtyEntry> dirty) {
+                received.addAll(dirty); // collect whatever the cache hands to the flush listener
+            }
+        });
+
+        cache.putIfAbsent(namespace, new byte[]{0}, dirtyEntry(new byte[]{5}));
+        cache.putIfAbsent(namespace, new byte[]{1}, dirtyEntry(new byte[]{6}));
+        cache.putIfAbsent(namespace, new byte[]{1}, dirtyEntry(new byte[]{6})); // same key as the previous call; a third eviction is still expected
+
+        assertEquals(3, cache.evicts()); // expected value first (JUnit order): one eviction per putIfAbsent call
+    }
+
     private LRUCacheEntry dirtyEntry(final byte[] key) {
         return new LRUCacheEntry(key, true, -1, -1, -1, "");
     }