You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/08/16 17:42:24 UTC
[kafka] branch 2.3 updated: KAFKA-8802: ConcurrentSkipListMap shows
performance regression in cache and in-memory store (#7212)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push:
new 051d290 KAFKA-8802: ConcurrentSkipListMap shows performance regression in cache and in-memory store (#7212)
051d290 is described below
commit 051d29098ac7529f54475899c3441ed918a6882c
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Fri Aug 16 10:41:30 2019 -0700
KAFKA-8802: ConcurrentSkipListMap shows performance regression in cache and in-memory store (#7212)
Reverts the TreeMap -> ConcurrentSkipListMap change that caused a performance regression in 2.3, and fixes the ConcurrentModificationException by copying (just) the key set to iterate over
Reviewers: Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
.../state/internals/InMemoryKeyValueStore.java | 37 +++++++++++-----------
.../kafka/streams/state/internals/NamedCache.java | 17 ++++++----
.../kafka/streams/state/internals/ThreadCache.java | 24 +++++++-------
3 files changed, 42 insertions(+), 36 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index 2d68214..ed3d024 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -17,8 +17,10 @@
package org.apache.kafka.streams.state.internals;
import java.util.List;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -27,13 +29,12 @@ import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Iterator;
-import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
private final String name;
- private final ConcurrentNavigableMap<Bytes, byte[]> map = new ConcurrentSkipListMap<>();
+ private final NavigableMap<Bytes, byte[]> map = new TreeMap<>();
private volatile boolean open = false;
private long size = 0L; // SkipListMap#size is O(N) so we just do our best to track it
@@ -71,12 +72,12 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
}
@Override
- public byte[] get(final Bytes key) {
+ public synchronized byte[] get(final Bytes key) {
return map.get(key);
}
@Override
- public void put(final Bytes key, final byte[] value) {
+ public synchronized void put(final Bytes key, final byte[] value) {
if (value == null) {
size -= map.remove(key) == null ? 0 : 1;
} else {
@@ -85,7 +86,7 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
}
@Override
- public byte[] putIfAbsent(final Bytes key, final byte[] value) {
+ public synchronized byte[] putIfAbsent(final Bytes key, final byte[] value) {
final byte[] originalValue = get(key);
if (originalValue == null) {
put(key, value);
@@ -101,14 +102,14 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
}
@Override
- public byte[] delete(final Bytes key) {
+ public synchronized byte[] delete(final Bytes key) {
final byte[] oldValue = map.remove(key);
size -= oldValue == null ? 0 : 1;
return oldValue;
}
@Override
- public KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) {
+ public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) {
if (from.compareTo(to) > 0) {
LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. "
@@ -119,14 +120,14 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
return new DelegatingPeekingKeyValueIterator<>(
name,
- new InMemoryKeyValueIterator(map.subMap(from, true, to, true).entrySet().iterator()));
+ new InMemoryKeyValueIterator(map.subMap(from, true, to, true).keySet()));
}
@Override
- public KeyValueIterator<Bytes, byte[]> all() {
+ public synchronized KeyValueIterator<Bytes, byte[]> all() {
return new DelegatingPeekingKeyValueIterator<>(
name,
- new InMemoryKeyValueIterator(map.entrySet().iterator()));
+ new InMemoryKeyValueIterator(map.keySet()));
}
@Override
@@ -146,11 +147,11 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
open = false;
}
- private static class InMemoryKeyValueIterator implements KeyValueIterator<Bytes, byte[]> {
- private final Iterator<Map.Entry<Bytes, byte[]>> iter;
+ private class InMemoryKeyValueIterator implements KeyValueIterator<Bytes, byte[]> {
+ private final Iterator<Bytes> iter;
- private InMemoryKeyValueIterator(final Iterator<Map.Entry<Bytes, byte[]>> iter) {
- this.iter = iter;
+ private InMemoryKeyValueIterator(final Set<Bytes> keySet) {
+ this.iter = new TreeSet<>(keySet).iterator();
}
@Override
@@ -160,8 +161,8 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
@Override
public KeyValue<Bytes, byte[]> next() {
- final Map.Entry<Bytes, byte[]> entry = iter.next();
- return new KeyValue<>(entry.getKey(), entry.getValue());
+ final Bytes key = iter.next();
+ return new KeyValue<>(key, map.get(key));
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
index ec53dfa..a1d0aab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
@@ -17,7 +17,8 @@
package org.apache.kafka.streams.state.internals;
import java.util.NavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
@@ -39,7 +40,7 @@ import java.util.Set;
class NamedCache {
private static final Logger log = LoggerFactory.getLogger(NamedCache.class);
private final String name;
- private final NavigableMap<Bytes, LRUNode> cache = new ConcurrentSkipListMap<>();
+ private final NavigableMap<Bytes, LRUNode> cache = new TreeMap<>();
private final Set<Bytes> dirtyKeys = new LinkedHashSet<>();
private ThreadCache.DirtyEntryFlushListener listener;
private LRUNode tail;
@@ -270,12 +271,16 @@ class NamedCache {
return cache.isEmpty();
}
- synchronized Iterator<Map.Entry<Bytes, LRUNode>> subMapIterator(final Bytes from, final Bytes to) {
- return cache.subMap(from, true, to, true).entrySet().iterator();
+ synchronized Iterator<Bytes> keyRange(final Bytes from, final Bytes to) {
+ return keySetIterator(cache.navigableKeySet().subSet(from, true, to, true));
}
- synchronized Iterator<Map.Entry<Bytes, LRUNode>> allIterator() {
- return cache.entrySet().iterator();
+ private Iterator<Bytes> keySetIterator(final Set<Bytes> keySet) {
+ return new TreeSet<>(keySet).iterator();
+ }
+
+ synchronized Iterator<Bytes> allKeys() {
+ return keySetIterator(cache.navigableKeySet());
}
synchronized LRUCacheEntry first() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index 116b1ef..7e3231e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.internals.NamedCache.LRUNode;
import org.slf4j.Logger;
import java.util.Collections;
@@ -181,17 +180,17 @@ public class ThreadCache {
public MemoryLRUCacheBytesIterator range(final String namespace, final Bytes from, final Bytes to) {
final NamedCache cache = getCache(namespace);
if (cache == null) {
- return new MemoryLRUCacheBytesIterator(Collections.emptyIterator());
+ return new MemoryLRUCacheBytesIterator(Collections.<Bytes>emptyIterator(), new NamedCache(namespace, this.metrics));
}
- return new MemoryLRUCacheBytesIterator(cache.subMapIterator(from, to));
+ return new MemoryLRUCacheBytesIterator(cache.keyRange(from, to), cache);
}
public MemoryLRUCacheBytesIterator all(final String namespace) {
final NamedCache cache = getCache(namespace);
if (cache == null) {
- return new MemoryLRUCacheBytesIterator(Collections.emptyIterator());
+ return new MemoryLRUCacheBytesIterator(Collections.<Bytes>emptyIterator(), new NamedCache(namespace, this.metrics));
}
- return new MemoryLRUCacheBytesIterator(cache.allIterator());
+ return new MemoryLRUCacheBytesIterator(cache.allKeys(), cache);
}
public long size() {
@@ -261,11 +260,13 @@ public class ThreadCache {
}
static class MemoryLRUCacheBytesIterator implements PeekingKeyValueIterator<Bytes, LRUCacheEntry> {
- private final Iterator<Map.Entry<Bytes, LRUNode>> underlying;
+ private final Iterator<Bytes> keys;
+ private final NamedCache cache;
private KeyValue<Bytes, LRUCacheEntry> nextEntry;
- MemoryLRUCacheBytesIterator(final Iterator<Map.Entry<Bytes, LRUNode>> underlying) {
- this.underlying = underlying;
+ MemoryLRUCacheBytesIterator(final Iterator<Bytes> keys, final NamedCache cache) {
+ this.keys = keys;
+ this.cache = cache;
}
public Bytes peekNextKey() {
@@ -289,7 +290,7 @@ public class ThreadCache {
return true;
}
- while (underlying.hasNext() && nextEntry == null) {
+ while (keys.hasNext() && nextEntry == null) {
internalNext();
}
@@ -307,9 +308,8 @@ public class ThreadCache {
}
private void internalNext() {
- final Map.Entry<Bytes, LRUNode> mapEntry = underlying.next();
- final Bytes cacheKey = mapEntry.getKey();
- final LRUCacheEntry entry = mapEntry.getValue().entry();
+ final Bytes cacheKey = keys.next();
+ final LRUCacheEntry entry = cache.get(cacheKey);
if (entry == null) {
return;
}