You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/03/19 15:51:28 UTC
[kafka] branch trunk updated: KAFKA-8094: Iterating over cache with get(key) is inefficient (#6433)
This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 538bd7e KAFKA-8094: Iterating over cache with get(key) is inefficient (#6433)
538bd7e is described below
commit 538bd7eddf13897245524f015e3207affb03fcdc
Author: A. Sophie Blee-Goldman <ab...@gmail.com>
AuthorDate: Tue Mar 19 08:51:10 2019 -0700
KAFKA-8094: Iterating over cache with get(key) is inefficient (#6433)
Use a concurrent data structure (ConcurrentSkipListMap) for the underlying cache in NamedCache, and iterate over it with subMap() instead of making many calls to get().
Reviewers: Guozhang Wang <wa...@gmail.com>, Bill Bejeck <bb...@gmail.com>
---
.../kafka/streams/state/internals/NamedCache.java | 18 ++++++---------
.../kafka/streams/state/internals/ThreadCache.java | 24 +++++++++----------
.../streams/state/internals/NamedCacheTest.java | 27 ----------------------
.../streams/state/internals/ThreadCacheTest.java | 3 ++-
4 files changed, 21 insertions(+), 51 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
index 3ce7cbe..0201f20 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.streams.state.internals;
+import java.util.NavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
@@ -33,13 +35,11 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
class NamedCache {
private static final Logger log = LoggerFactory.getLogger(NamedCache.class);
private final String name;
- private final TreeMap<Bytes, LRUNode> cache = new TreeMap<>();
+ private final NavigableMap<Bytes, LRUNode> cache = new ConcurrentSkipListMap<>();
private final Set<Bytes> dirtyKeys = new LinkedHashSet<>();
private ThreadCache.DirtyEntryFlushListener listener;
private LRUNode tail;
@@ -266,16 +266,12 @@ class NamedCache {
return cache.size();
}
- synchronized Iterator<Bytes> keyRange(final Bytes from, final Bytes to) {
- return keySetIterator(cache.navigableKeySet().subSet(from, true, to, true));
+ synchronized Iterator<Map.Entry<Bytes, LRUNode>> subMapIterator(final Bytes from, final Bytes to) {
+ return cache.subMap(from, true, to, true).entrySet().iterator();
}
- private Iterator<Bytes> keySetIterator(final Set<Bytes> keySet) {
- return new TreeSet<>(keySet).iterator();
- }
-
- synchronized Iterator<Bytes> allKeys() {
- return keySetIterator(cache.navigableKeySet());
+ synchronized Iterator<Map.Entry<Bytes, LRUNode>> allIterator() {
+ return cache.entrySet().iterator();
}
synchronized LRUCacheEntry first() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index 941b522..0db6c78 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.internals.NamedCache.LRUNode;
import org.slf4j.Logger;
import java.util.Collections;
@@ -180,17 +181,17 @@ public class ThreadCache {
public MemoryLRUCacheBytesIterator range(final String namespace, final Bytes from, final Bytes to) {
final NamedCache cache = getCache(namespace);
if (cache == null) {
- return new MemoryLRUCacheBytesIterator(Collections.<Bytes>emptyIterator(), new NamedCache(namespace, this.metrics));
+ return new MemoryLRUCacheBytesIterator(Collections.emptyIterator());
}
- return new MemoryLRUCacheBytesIterator(cache.keyRange(from, to), cache);
+ return new MemoryLRUCacheBytesIterator(cache.subMapIterator(from, to));
}
public MemoryLRUCacheBytesIterator all(final String namespace) {
final NamedCache cache = getCache(namespace);
if (cache == null) {
- return new MemoryLRUCacheBytesIterator(Collections.<Bytes>emptyIterator(), new NamedCache(namespace, this.metrics));
+ return new MemoryLRUCacheBytesIterator(Collections.emptyIterator());
}
- return new MemoryLRUCacheBytesIterator(cache.allKeys(), cache);
+ return new MemoryLRUCacheBytesIterator(cache.allIterator());
}
public long size() {
@@ -260,13 +261,11 @@ public class ThreadCache {
}
static class MemoryLRUCacheBytesIterator implements PeekingKeyValueIterator<Bytes, LRUCacheEntry> {
- private final Iterator<Bytes> keys;
- private final NamedCache cache;
+ private final Iterator<Map.Entry<Bytes, LRUNode>> underlying;
private KeyValue<Bytes, LRUCacheEntry> nextEntry;
- MemoryLRUCacheBytesIterator(final Iterator<Bytes> keys, final NamedCache cache) {
- this.keys = keys;
- this.cache = cache;
+ MemoryLRUCacheBytesIterator(final Iterator<Map.Entry<Bytes, LRUNode>> underlying) {
+ this.underlying = underlying;
}
public Bytes peekNextKey() {
@@ -290,7 +289,7 @@ public class ThreadCache {
return true;
}
- while (keys.hasNext() && nextEntry == null) {
+ while (underlying.hasNext() && nextEntry == null) {
internalNext();
}
@@ -308,8 +307,9 @@ public class ThreadCache {
}
private void internalNext() {
- final Bytes cacheKey = keys.next();
- final LRUCacheEntry entry = cache.get(cacheKey);
+ final Map.Entry<Bytes, LRUNode> mapEntry = underlying.next();
+ final Bytes cacheKey = mapEntry.getKey();
+ final LRUCacheEntry entry = mapEntry.getValue().entry();
if (entry == null) {
return;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
index 394feed..6c82209 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
@@ -32,7 +32,6 @@ import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -40,7 +39,6 @@ import java.util.Map;
import static org.apache.kafka.test.StreamsTestUtils.getMetricByNameFilterByTags;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
@@ -209,31 +207,6 @@ public class NamedCacheTest {
}
@Test
- public void shouldGetRangeIteratorOverKeys() {
- cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, headers, true, 0, 0, 0, ""));
- cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{20}));
- cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{30}, null, true, 0, 0, 0, ""));
-
- final Iterator<Bytes> iterator = cache.keyRange(Bytes.wrap(new byte[]{1}), Bytes.wrap(new byte[]{2}));
- assertEquals(Bytes.wrap(new byte[]{1}), iterator.next());
- assertEquals(Bytes.wrap(new byte[]{2}), iterator.next());
- assertFalse(iterator.hasNext());
- }
-
- @Test
- public void shouldGetIteratorOverAllKeys() {
- cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, headers, true, 0, 0, 0, ""));
- cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{20}));
- cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{30}, null, true, 0, 0, 0, ""));
-
- final Iterator<Bytes> iterator = cache.allKeys();
- assertEquals(Bytes.wrap(new byte[]{0}), iterator.next());
- assertEquals(Bytes.wrap(new byte[]{1}), iterator.next());
- assertEquals(Bytes.wrap(new byte[]{2}), iterator.next());
- assertFalse(iterator.hasNext());
- }
-
- @Test
public void shouldNotThrowNullPointerWhenCacheIsEmptyAndEvictionCalled() {
cache.evict();
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
index a7a64c4..5882ee4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
@@ -310,10 +310,11 @@ public class ThreadCacheTest {
}
assertEquals(5, cache.size());
- final ThreadCache.MemoryLRUCacheBytesIterator range = cache.range(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{5}));
// should evict byte[] {0}
cache.put(namespace, Bytes.wrap(new byte[]{6}), dirtyEntry(new byte[]{6}));
+ final ThreadCache.MemoryLRUCacheBytesIterator range = cache.range(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{5}));
+
assertEquals(Bytes.wrap(new byte[]{1}), range.peekNextKey());
}