You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original page and was not preserved in this text.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/08/16 17:42:24 UTC

[kafka] branch 2.3 updated: KAFKA-8802: ConcurrentSkipListMap shows performance regression in cache and in-memory store (#7212)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 051d290  KAFKA-8802: ConcurrentSkipListMap shows performance regression in cache and in-memory store (#7212)
051d290 is described below

commit 051d29098ac7529f54475899c3441ed918a6882c
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Fri Aug 16 10:41:30 2019 -0700

    KAFKA-8802: ConcurrentSkipListMap shows performance regression in cache and in-memory store (#7212)
    
    Reverts the TreeMap -> ConcurrentSkipListMap change that caused a performance regression in 2.3, and fixes the ConcurrentModificationException by copying only the key set before iterating over it.
    
    Reviewers: Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 .../state/internals/InMemoryKeyValueStore.java     | 37 +++++++++++-----------
 .../kafka/streams/state/internals/NamedCache.java  | 17 ++++++----
 .../kafka/streams/state/internals/ThreadCache.java | 24 +++++++-------
 3 files changed, 42 insertions(+), 36 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index 2d68214..ed3d024 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -17,8 +17,10 @@
 package org.apache.kafka.streams.state.internals;
 
 import java.util.List;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -27,13 +29,12 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 
 import java.util.Iterator;
-import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
     private final String name;
-    private final ConcurrentNavigableMap<Bytes, byte[]> map = new ConcurrentSkipListMap<>();
+    private final NavigableMap<Bytes, byte[]> map = new TreeMap<>();
     private volatile boolean open = false;
     private long size = 0L; // SkipListMap#size is O(N) so we just do our best to track it
 
@@ -71,12 +72,12 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
     }
 
     @Override
-    public byte[] get(final Bytes key) {
+    public synchronized byte[] get(final Bytes key) {
         return map.get(key);
     }
 
     @Override
-    public void put(final Bytes key, final byte[] value) {
+    public synchronized void put(final Bytes key, final byte[] value) {
         if (value == null) {
             size -= map.remove(key) == null ? 0 : 1;
         } else {
@@ -85,7 +86,7 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
     }
 
     @Override
-    public byte[] putIfAbsent(final Bytes key, final byte[] value) {
+    public synchronized byte[] putIfAbsent(final Bytes key, final byte[] value) {
         final byte[] originalValue = get(key);
         if (originalValue == null) {
             put(key, value);
@@ -101,14 +102,14 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
     }
 
     @Override
-    public byte[] delete(final Bytes key) {
+    public synchronized byte[] delete(final Bytes key) {
         final byte[] oldValue = map.remove(key);
         size -= oldValue == null ? 0 : 1;
         return oldValue;
     }
 
     @Override
-    public KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) {
+    public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) {
 
         if (from.compareTo(to) > 0) {
             LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. "
@@ -119,14 +120,14 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
 
         return new DelegatingPeekingKeyValueIterator<>(
             name,
-            new InMemoryKeyValueIterator(map.subMap(from, true, to, true).entrySet().iterator()));
+            new InMemoryKeyValueIterator(map.subMap(from, true, to, true).keySet()));
     }
 
     @Override
-    public KeyValueIterator<Bytes, byte[]> all() {
+    public synchronized KeyValueIterator<Bytes, byte[]> all() {
         return new DelegatingPeekingKeyValueIterator<>(
             name,
-            new InMemoryKeyValueIterator(map.entrySet().iterator()));
+            new InMemoryKeyValueIterator(map.keySet()));
     }
 
     @Override
@@ -146,11 +147,11 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
         open = false;
     }
 
-    private static class InMemoryKeyValueIterator implements KeyValueIterator<Bytes, byte[]> {
-        private final Iterator<Map.Entry<Bytes, byte[]>> iter;
+    private class InMemoryKeyValueIterator implements KeyValueIterator<Bytes, byte[]> {
+        private final Iterator<Bytes> iter;
 
-        private InMemoryKeyValueIterator(final Iterator<Map.Entry<Bytes, byte[]>> iter) {
-            this.iter = iter;
+        private InMemoryKeyValueIterator(final Set<Bytes> keySet) {
+            this.iter = new TreeSet<>(keySet).iterator();
         }
 
         @Override
@@ -160,8 +161,8 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
 
         @Override
         public KeyValue<Bytes, byte[]> next() {
-            final Map.Entry<Bytes, byte[]> entry = iter.next();
-            return new KeyValue<>(entry.getKey(), entry.getValue());
+            final Bytes key = iter.next();
+            return new KeyValue<>(key, map.get(key));
         }
 
         @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
index ec53dfa..a1d0aab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
@@ -17,7 +17,8 @@
 package org.apache.kafka.streams.state.internals;
 
 import java.util.NavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
@@ -39,7 +40,7 @@ import java.util.Set;
 class NamedCache {
     private static final Logger log = LoggerFactory.getLogger(NamedCache.class);
     private final String name;
-    private final NavigableMap<Bytes, LRUNode> cache = new ConcurrentSkipListMap<>();
+    private final NavigableMap<Bytes, LRUNode> cache = new TreeMap<>();
     private final Set<Bytes> dirtyKeys = new LinkedHashSet<>();
     private ThreadCache.DirtyEntryFlushListener listener;
     private LRUNode tail;
@@ -270,12 +271,16 @@ class NamedCache {
         return cache.isEmpty();
     }
 
-    synchronized Iterator<Map.Entry<Bytes, LRUNode>> subMapIterator(final Bytes from, final Bytes to) {
-        return cache.subMap(from, true, to, true).entrySet().iterator();
+    synchronized Iterator<Bytes> keyRange(final Bytes from, final Bytes to) {
+        return keySetIterator(cache.navigableKeySet().subSet(from, true, to, true));
     }
 
-    synchronized Iterator<Map.Entry<Bytes, LRUNode>> allIterator() {
-        return cache.entrySet().iterator();
+    private Iterator<Bytes> keySetIterator(final Set<Bytes> keySet) {
+        return new TreeSet<>(keySet).iterator();
+    }
+
+    synchronized Iterator<Bytes> allKeys() {
+        return keySetIterator(cache.navigableKeySet());
     }
 
     synchronized LRUCacheEntry first() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index 116b1ef..7e3231e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.internals.NamedCache.LRUNode;
 import org.slf4j.Logger;
 
 import java.util.Collections;
@@ -181,17 +180,17 @@ public class ThreadCache {
     public MemoryLRUCacheBytesIterator range(final String namespace, final Bytes from, final Bytes to) {
         final NamedCache cache = getCache(namespace);
         if (cache == null) {
-            return new MemoryLRUCacheBytesIterator(Collections.emptyIterator());
+            return new MemoryLRUCacheBytesIterator(Collections.<Bytes>emptyIterator(), new NamedCache(namespace, this.metrics));
         }
-        return new MemoryLRUCacheBytesIterator(cache.subMapIterator(from, to));
+        return new MemoryLRUCacheBytesIterator(cache.keyRange(from, to), cache);
     }
 
     public MemoryLRUCacheBytesIterator all(final String namespace) {
         final NamedCache cache = getCache(namespace);
         if (cache == null) {
-            return new MemoryLRUCacheBytesIterator(Collections.emptyIterator());
+            return new MemoryLRUCacheBytesIterator(Collections.<Bytes>emptyIterator(), new NamedCache(namespace, this.metrics));
         }
-        return new MemoryLRUCacheBytesIterator(cache.allIterator());
+        return new MemoryLRUCacheBytesIterator(cache.allKeys(), cache);
     }
 
     public long size() {
@@ -261,11 +260,13 @@ public class ThreadCache {
     }
 
     static class MemoryLRUCacheBytesIterator implements PeekingKeyValueIterator<Bytes, LRUCacheEntry> {
-        private final Iterator<Map.Entry<Bytes, LRUNode>> underlying;
+        private final Iterator<Bytes> keys;
+        private final NamedCache cache;
         private KeyValue<Bytes, LRUCacheEntry> nextEntry;
 
-        MemoryLRUCacheBytesIterator(final Iterator<Map.Entry<Bytes, LRUNode>> underlying) {
-            this.underlying = underlying;
+        MemoryLRUCacheBytesIterator(final Iterator<Bytes> keys, final NamedCache cache) {
+            this.keys = keys;
+            this.cache = cache;
         }
 
         public Bytes peekNextKey() {
@@ -289,7 +290,7 @@ public class ThreadCache {
                 return true;
             }
 
-            while (underlying.hasNext() && nextEntry == null) {
+            while (keys.hasNext() && nextEntry == null) {
                 internalNext();
             }
 
@@ -307,9 +308,8 @@ public class ThreadCache {
         }
 
         private void internalNext() {
-            final Map.Entry<Bytes, LRUNode> mapEntry = underlying.next();
-            final Bytes cacheKey = mapEntry.getKey();
-            final LRUCacheEntry entry = mapEntry.getValue().entry();
+            final Bytes cacheKey = keys.next();
+            final LRUCacheEntry entry = cache.get(cacheKey);
             if (entry == null) {
                 return;
             }