You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/03/19 15:51:28 UTC

[kafka] branch trunk updated: KAFKA-8094: Iterating over cache with get(key) is inefficient (#6433)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 538bd7e  KAFKA-8094: Iterating over cache with get(key) is inefficient (#6433)
538bd7e is described below

commit 538bd7eddf13897245524f015e3207affb03fcdc
Author: A. Sophie Blee-Goldman <ab...@gmail.com>
AuthorDate: Tue Mar 19 08:51:10 2019 -0700

    KAFKA-8094: Iterating over cache with get(key) is inefficient (#6433)
    
    Use a concurrent data structure (ConcurrentSkipListMap) for the underlying cache in NamedCache, and iterate over its entries via subMap()/entrySet() instead of making a separate get() call for every key.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>, Bill Bejeck <bb...@gmail.com>
---
 .../kafka/streams/state/internals/NamedCache.java  | 18 ++++++---------
 .../kafka/streams/state/internals/ThreadCache.java | 24 +++++++++----------
 .../streams/state/internals/NamedCacheTest.java    | 27 ----------------------
 .../streams/state/internals/ThreadCacheTest.java   |  3 ++-
 4 files changed, 21 insertions(+), 51 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
index 3ce7cbe..0201f20 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import java.util.NavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
@@ -33,13 +35,11 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
 
 class NamedCache {
     private static final Logger log = LoggerFactory.getLogger(NamedCache.class);
     private final String name;
-    private final TreeMap<Bytes, LRUNode> cache = new TreeMap<>();
+    private final NavigableMap<Bytes, LRUNode> cache = new ConcurrentSkipListMap<>();
     private final Set<Bytes> dirtyKeys = new LinkedHashSet<>();
     private ThreadCache.DirtyEntryFlushListener listener;
     private LRUNode tail;
@@ -266,16 +266,12 @@ class NamedCache {
         return cache.size();
     }
 
-    synchronized Iterator<Bytes> keyRange(final Bytes from, final Bytes to) {
-        return keySetIterator(cache.navigableKeySet().subSet(from, true, to, true));
+    synchronized Iterator<Map.Entry<Bytes, LRUNode>> subMapIterator(final Bytes from, final Bytes to) {
+        return cache.subMap(from, true, to, true).entrySet().iterator();
     }
 
-    private Iterator<Bytes> keySetIterator(final Set<Bytes> keySet) {
-        return new TreeSet<>(keySet).iterator();
-    }
-
-    synchronized Iterator<Bytes> allKeys() {
-        return keySetIterator(cache.navigableKeySet());
+    synchronized Iterator<Map.Entry<Bytes, LRUNode>> allIterator() {
+        return cache.entrySet().iterator();
     }
 
     synchronized LRUCacheEntry first() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index 941b522..0db6c78 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.internals.NamedCache.LRUNode;
 import org.slf4j.Logger;
 
 import java.util.Collections;
@@ -180,17 +181,17 @@ public class ThreadCache {
     public MemoryLRUCacheBytesIterator range(final String namespace, final Bytes from, final Bytes to) {
         final NamedCache cache = getCache(namespace);
         if (cache == null) {
-            return new MemoryLRUCacheBytesIterator(Collections.<Bytes>emptyIterator(), new NamedCache(namespace, this.metrics));
+            return new MemoryLRUCacheBytesIterator(Collections.emptyIterator());
         }
-        return new MemoryLRUCacheBytesIterator(cache.keyRange(from, to), cache);
+        return new MemoryLRUCacheBytesIterator(cache.subMapIterator(from, to));
     }
 
     public MemoryLRUCacheBytesIterator all(final String namespace) {
         final NamedCache cache = getCache(namespace);
         if (cache == null) {
-            return new MemoryLRUCacheBytesIterator(Collections.<Bytes>emptyIterator(), new NamedCache(namespace, this.metrics));
+            return new MemoryLRUCacheBytesIterator(Collections.emptyIterator());
         }
-        return new MemoryLRUCacheBytesIterator(cache.allKeys(), cache);
+        return new MemoryLRUCacheBytesIterator(cache.allIterator());
     }
     
     public long size() {
@@ -260,13 +261,11 @@ public class ThreadCache {
     }
 
     static class MemoryLRUCacheBytesIterator implements PeekingKeyValueIterator<Bytes, LRUCacheEntry> {
-        private final Iterator<Bytes> keys;
-        private final NamedCache cache;
+        private final Iterator<Map.Entry<Bytes, LRUNode>> underlying;
         private KeyValue<Bytes, LRUCacheEntry> nextEntry;
 
-        MemoryLRUCacheBytesIterator(final Iterator<Bytes> keys, final NamedCache cache) {
-            this.keys = keys;
-            this.cache = cache;
+        MemoryLRUCacheBytesIterator(final Iterator<Map.Entry<Bytes, LRUNode>> underlying) {
+            this.underlying = underlying;
         }
 
         public Bytes peekNextKey() {
@@ -290,7 +289,7 @@ public class ThreadCache {
                 return true;
             }
 
-            while (keys.hasNext() && nextEntry == null) {
+            while (underlying.hasNext() && nextEntry == null) {
                 internalNext();
             }
 
@@ -308,8 +307,9 @@ public class ThreadCache {
         }
 
         private void internalNext() {
-            final Bytes cacheKey = keys.next();
-            final LRUCacheEntry entry = cache.get(cacheKey);
+            final Map.Entry<Bytes, LRUNode> mapEntry = underlying.next();
+            final Bytes cacheKey = mapEntry.getKey();
+            final LRUCacheEntry entry = mapEntry.getValue().entry();
             if (entry == null) {
                 return;
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
index 394feed..6c82209 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
@@ -32,7 +32,6 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -40,7 +39,6 @@ import java.util.Map;
 import static org.apache.kafka.test.StreamsTestUtils.getMetricByNameFilterByTags;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
@@ -209,31 +207,6 @@ public class NamedCacheTest {
     }
 
     @Test
-    public void shouldGetRangeIteratorOverKeys() {
-        cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, headers, true, 0, 0, 0, ""));
-        cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{20}));
-        cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{30}, null, true, 0, 0, 0, ""));
-
-        final Iterator<Bytes> iterator = cache.keyRange(Bytes.wrap(new byte[]{1}), Bytes.wrap(new byte[]{2}));
-        assertEquals(Bytes.wrap(new byte[]{1}), iterator.next());
-        assertEquals(Bytes.wrap(new byte[]{2}), iterator.next());
-        assertFalse(iterator.hasNext());
-    }
-
-    @Test
-    public void shouldGetIteratorOverAllKeys() {
-        cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10}, headers, true, 0, 0, 0, ""));
-        cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{20}));
-        cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{30}, null, true, 0, 0, 0, ""));
-
-        final Iterator<Bytes> iterator = cache.allKeys();
-        assertEquals(Bytes.wrap(new byte[]{0}), iterator.next());
-        assertEquals(Bytes.wrap(new byte[]{1}), iterator.next());
-        assertEquals(Bytes.wrap(new byte[]{2}), iterator.next());
-        assertFalse(iterator.hasNext());
-    }
-
-    @Test
     public void shouldNotThrowNullPointerWhenCacheIsEmptyAndEvictionCalled() {
         cache.evict();
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
index a7a64c4..5882ee4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
@@ -310,10 +310,11 @@ public class ThreadCacheTest {
         }
         assertEquals(5, cache.size());
 
-        final ThreadCache.MemoryLRUCacheBytesIterator range = cache.range(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{5}));
         // should evict byte[] {0}
         cache.put(namespace, Bytes.wrap(new byte[]{6}), dirtyEntry(new byte[]{6}));
 
+        final ThreadCache.MemoryLRUCacheBytesIterator range = cache.range(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{5}));
+
         assertEquals(Bytes.wrap(new byte[]{1}), range.peekNextKey());
     }