You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/10/19 21:01:29 UTC
kafka git commit: HOTFIX: Fix putAll and putIfAbsent logic for
correct eviction behavior
Repository: kafka
Updated Branches:
refs/heads/trunk 332b8b9af -> 179d4dc0f
HOTFIX: Fix putAll and putIfAbsent logic for correct eviction behavior
Author: Eno Thereska <en...@gmail.com>
Reviewers: Damian Guy, Guozhang Wang
Closes #2038 from enothereska/hotfix-put-cache
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/179d4dc0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/179d4dc0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/179d4dc0
Branch: refs/heads/trunk
Commit: 179d4dc0f2a23d7e67caf4875a0563e74027933a
Parents: 332b8b9
Author: Eno Thereska <en...@gmail.com>
Authored: Wed Oct 19 14:01:23 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Oct 19 14:01:23 2016 -0700
----------------------------------------------------------------------
.../streams/state/internals/ThreadCache.java | 14 ++++++--
.../state/internals/ThreadCacheTest.java | 37 ++++++++++++++++++++
2 files changed, 48 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/179d4dc0/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index d76e5c8..f7355d8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -123,12 +123,20 @@ public class ThreadCache {
public LRUCacheEntry putIfAbsent(final String namespace, byte[] key, LRUCacheEntry value) {
final NamedCache cache = getOrCreateCache(namespace);
- return cache.putIfAbsent(Bytes.wrap(key), value);
+
+ final LRUCacheEntry result = cache.putIfAbsent(Bytes.wrap(key), value);
+ maybeEvict(namespace);
+
+ if (result == null) {
+ numPuts++;
+ }
+ return result;
}
public void putAll(final String namespace, final List<KeyValue<byte[], LRUCacheEntry>> entries) {
- final NamedCache cache = getOrCreateCache(namespace);
- cache.putAll(entries);
+ for (KeyValue<byte[], LRUCacheEntry> entry : entries) {
+ put(namespace, entry.key, entry.value);
+ }
}
public LRUCacheEntry delete(final String namespace, final byte[] key) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/179d4dc0/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
index 2ff3b89..b07da6e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
@@ -389,6 +389,24 @@ public class ThreadCacheTest {
}
@Test
+ public void shouldEvictAfterPutAll() throws Exception {
+ final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
+ final String namespace = "namespace";
+ final ThreadCache cache = new ThreadCache(1);
+ cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
+ @Override
+ public void apply(final List<ThreadCache.DirtyEntry> dirty) {
+ received.addAll(dirty);
+ }
+ });
+
+ cache.putAll(namespace, Arrays.asList(KeyValue.pair(new byte[]{0}, dirtyEntry(new byte[]{5})),
+ KeyValue.pair(new byte[]{1}, dirtyEntry(new byte[]{6}))));
+
+ assertEquals(cache.evicts(), 2);
+ }
+
+ @Test
public void shouldPutAll() throws Exception {
final ThreadCache cache = new ThreadCache(100000);
@@ -422,6 +440,25 @@ public class ThreadCacheTest {
assertArrayEquals(value, cache.get("n", key).value);
}
+ @Test
+ public void shouldEvictAfterPutIfAbsent() throws Exception {
+ final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
+ final String namespace = "namespace";
+ final ThreadCache cache = new ThreadCache(1);
+ cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
+ @Override
+ public void apply(final List<ThreadCache.DirtyEntry> dirty) {
+ received.addAll(dirty);
+ }
+ });
+
+ cache.putIfAbsent(namespace, new byte[]{0}, dirtyEntry(new byte[]{5}));
+ cache.putIfAbsent(namespace, new byte[]{1}, dirtyEntry(new byte[]{6}));
+ cache.putIfAbsent(namespace, new byte[]{1}, dirtyEntry(new byte[]{6}));
+
+ assertEquals(cache.evicts(), 3);
+ }
+
private LRUCacheEntry dirtyEntry(final byte[] key) {
return new LRUCacheEntry(key, true, -1, -1, -1, "");
}