You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/03/01 01:39:57 UTC
[kafka] branch trunk updated: KAFKA-7918: Inline generic parameters
Pt. III: in-memory window store (#6328)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b03e8c2 KAFKA-7918: Inline generic parameters Pt. III: in-memory window store (#6328)
b03e8c2 is described below
commit b03e8c234a8aeecd10c2c96b683cfb39b24b548a
Author: A. Sophie Blee-Goldman <ab...@gmail.com>
AuthorDate: Thu Feb 28 17:39:47 2019 -0800
KAFKA-7918: Inline generic parameters Pt. III: in-memory window store (#6328)
Third (and final) PR in series to inline the generic parameters of the following bytes stores:
[Pt. I] InMemoryKeyValueStore
[Pt. II] RocksDBWindowStore
[Pt. II] RocksDBSessionStore
[Pt. II] MemoryLRUCache
[Pt. II] MemoryNavigableLRUCache
[x] InMemoryWindowStore
Reviewers: Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
.../InMemoryWindowBytesStoreSupplier.java | 13 +-
.../state/internals/InMemoryWindowStore.java | 173 ++++++++++-----------
2 files changed, 84 insertions(+), 102 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowBytesStoreSupplier.java
index a6709ae..ace4de0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowBytesStoreSupplier.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
@@ -44,13 +43,11 @@ public class InMemoryWindowBytesStoreSupplier implements WindowBytesStoreSupplie
@Override
public WindowStore<Bytes, byte[]> get() {
- return new InMemoryWindowStore<>(name,
- Serdes.Bytes(),
- Serdes.ByteArray(),
- retentionPeriod,
- windowSize,
- retainDuplicates,
- metricsScope());
+ return new InMemoryWindowStore(name,
+ retentionPeriod,
+ windowSize,
+ retainDuplicates,
+ metricsScope());
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
index 6e9b96b..77820c5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -16,18 +16,17 @@
*/
package org.apache.kafka.streams.state.internals;
+import java.nio.ByteBuffer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.KeyValueIterator;
@@ -44,18 +43,16 @@ import java.util.NoSuchElementException;
import java.util.TreeMap;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
-import static org.apache.kafka.streams.state.internals.WindowKeySchema.extractStoreKey;
+import static org.apache.kafka.streams.state.internals.WindowKeySchema.extractStoreKeyBytes;
import static org.apache.kafka.streams.state.internals.WindowKeySchema.extractStoreTimestamp;
-public class InMemoryWindowStore<K extends Comparable<K>, V> implements WindowStore<K, V> {
+public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
private static final Logger LOG = LoggerFactory.getLogger(InMemoryWindowStore.class);
+ private static final int SEQNUM_SIZE = 4;
private final String name;
- private final Serde<K> keySerde;
- private final Serde<V> valueSerde;
private final String metricScope;
- private StateSerdes<K, V> serdes;
private InternalProcessorContext context;
private Sensor expiredRecordSensor;
private int seqnum = 0;
@@ -65,20 +62,16 @@ public class InMemoryWindowStore<K extends Comparable<K>, V> implements WindowSt
private final long windowSize;
private final boolean retainDuplicates;
- private final NavigableMap<Long, NavigableMap<WrappedK<K>, V>> segmentMap;
+ private final NavigableMap<Long, NavigableMap<Bytes, byte[]>> segmentMap;
private volatile boolean open = false;
InMemoryWindowStore(final String name,
- final Serde<K> keySerde,
- final Serde<V> valueSerde,
final long retentionPeriod,
final long windowSize,
final boolean retainDuplicates,
final String metricScope) {
this.name = name;
- this.keySerde = keySerde;
- this.valueSerde = valueSerde;
this.retentionPeriod = retentionPeriod;
this.windowSize = windowSize;
this.retainDuplicates = retainDuplicates;
@@ -97,12 +90,6 @@ public class InMemoryWindowStore<K extends Comparable<K>, V> implements WindowSt
public void init(final ProcessorContext context, final StateStore root) {
this.context = (InternalProcessorContext) context;
- // construct the serde
- this.serdes = new StateSerdes<>(
- ProcessorStateManager.storeChangelogTopic(context.applicationId(), name),
- keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
- valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
-
final StreamsMetricsImpl metrics = this.context.metrics();
final String taskName = context.taskId().toString();
expiredRecordSensor = metrics.storeLevelSensor(
@@ -120,33 +107,35 @@ public class InMemoryWindowStore<K extends Comparable<K>, V> implements WindowSt
if (root != null) {
context.register(root, (key, value) -> {
- put(extractStoreKey(key, serdes), serdes.valueFrom(value), extractStoreTimestamp(key));
+ put(Bytes.wrap(extractStoreKeyBytes(key)), value, extractStoreTimestamp(key));
});
}
this.open = true;
}
@Override
- public void put(final K key, final V value) {
+ public void put(final Bytes key, final byte[] value) {
put(key, value, context.timestamp());
}
@Override
- public void put(final K key, final V value, final long windowStartTimestamp) {
+ public void put(final Bytes key, final byte[] value, final long windowStartTimestamp) {
removeExpiredSegments();
maybeUpdateSeqnumForDups();
this.observedStreamTime = Math.max(this.observedStreamTime, windowStartTimestamp);
+ final Bytes keyBytes = retainDuplicates ? wrapForDups(key, seqnum) : key;
+
if (windowStartTimestamp <= this.observedStreamTime - this.retentionPeriod) {
expiredRecordSensor.record();
LOG.debug("Skipping record for expired segment.");
} else {
if (value != null) {
this.segmentMap.computeIfAbsent(windowStartTimestamp, t -> new TreeMap<>());
- this.segmentMap.get(windowStartTimestamp).put(new WrappedK<>(key, seqnum), value);
+ this.segmentMap.get(windowStartTimestamp).put(keyBytes, value);
} else {
this.segmentMap.computeIfPresent(windowStartTimestamp, (t, kvMap) -> {
- kvMap.remove(new WrappedK<>(key, seqnum));
+ kvMap.remove(keyBytes);
return kvMap;
});
}
@@ -154,77 +143,79 @@ public class InMemoryWindowStore<K extends Comparable<K>, V> implements WindowSt
}
@Override
- public V fetch(final K key, final long windowStartTimestamp) {
+ public byte[] fetch(final Bytes key, final long windowStartTimestamp) {
removeExpiredSegments();
- final NavigableMap<WrappedK<K>, V> kvMap = this.segmentMap.get(windowStartTimestamp);
+ final NavigableMap<Bytes, byte[]> kvMap = this.segmentMap.get(windowStartTimestamp);
if (kvMap == null) {
return null;
} else {
- return kvMap.get(new WrappedK<>(key, seqnum));
+ return kvMap.get(key);
}
}
@Deprecated
@Override
- public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
+ public WindowStoreIterator<byte[]> fetch(final Bytes key, final long timeFrom, final long timeTo) {
removeExpiredSegments();
- final List<KeyValue<Long, V>> records = retainDuplicates ? fetchWithDuplicates(key, timeFrom, timeTo) : fetchUnique(key, timeFrom, timeTo);
+ final List<KeyValue<Long, byte[]>> records = retainDuplicates ? fetchWithDuplicates(key, timeFrom, timeTo) : fetchUnique(key, timeFrom, timeTo);
- return new InMemoryWindowStoreIterator<>(records.listIterator());
+ return new InMemoryWindowStoreIterator(records.listIterator());
}
@Deprecated
@Override
- public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) {
+ public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
+ final Bytes to,
+ final long timeFrom,
+ final long timeTo) {
removeExpiredSegments();
- final List<KeyValue<Windowed<K>, V>> returnSet = new LinkedList<>();
+ final List<KeyValue<Windowed<Bytes>, byte[]>> returnSet = new LinkedList<>();
// add one b/c records expire exactly retentionPeriod ms after created
final long minTime = Math.max(timeFrom, this.observedStreamTime - this.retentionPeriod + 1);
- final WrappedK<K> keyFrom = new WrappedK<>(from, 0);
- final WrappedK<K> keyTo = new WrappedK<>(to, Integer.MAX_VALUE);
+ final Bytes keyFrom = retainDuplicates ? wrapForDups(from, 0) : from;
+ final Bytes keyTo = retainDuplicates ? wrapForDups(to, Integer.MAX_VALUE) : to;
- for (final Map.Entry<Long, NavigableMap<WrappedK<K>, V>> segmentMapEntry : this.segmentMap.subMap(minTime, true, timeTo, true).entrySet()) {
- for (final Map.Entry<WrappedK<K>, V> kvMapEntry : segmentMapEntry.getValue().subMap(keyFrom, true, keyTo, true).entrySet()) {
- final WrappedK<K> wrappedKey = kvMapEntry.getKey();
- returnSet.add(getWindowedKeyValue(wrappedKey.getKey(), segmentMapEntry.getKey(), kvMapEntry.getValue()));
+ for (final Map.Entry<Long, NavigableMap<Bytes, byte[]>> segmentMapEntry : this.segmentMap.subMap(minTime, true, timeTo, true).entrySet()) {
+ for (final Map.Entry<Bytes, byte[]> kvMapEntry : segmentMapEntry.getValue().subMap(keyFrom, true, keyTo, true).entrySet()) {
+ final Bytes keyBytes = retainDuplicates ? getKey(kvMapEntry.getKey()) : kvMapEntry.getKey();
+ returnSet.add(getWindowedKeyValue(keyBytes, segmentMapEntry.getKey(), kvMapEntry.getValue()));
}
}
- return new InMemoryWindowedKeyValueIterator<>(returnSet.listIterator());
+ return new InMemoryWindowedKeyValueIterator(returnSet.listIterator());
}
@Deprecated
@Override
- public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo) {
+ public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom, final long timeTo) {
removeExpiredSegments();
- final List<KeyValue<Windowed<K>, V>> returnSet = new LinkedList<>();
+ final List<KeyValue<Windowed<Bytes>, byte[]>> returnSet = new LinkedList<>();
// add one b/c records expire exactly retentionPeriod ms after created
final long minTime = Math.max(timeFrom, this.observedStreamTime - this.retentionPeriod + 1);
- for (final Map.Entry<Long, NavigableMap<WrappedK<K>, V>> segmentMapEntry : this.segmentMap.subMap(minTime, true, timeTo, true).entrySet()) {
- for (final Map.Entry<WrappedK<K>, V> kvMapEntry : segmentMapEntry.getValue().entrySet()) {
- final WrappedK<K> wrappedKey = kvMapEntry.getKey();
- returnSet.add(getWindowedKeyValue(wrappedKey.getKey(), segmentMapEntry.getKey(), kvMapEntry.getValue()));
+ for (final Map.Entry<Long, NavigableMap<Bytes, byte[]>> segmentMapEntry : this.segmentMap.subMap(minTime, true, timeTo, true).entrySet()) {
+ for (final Map.Entry<Bytes, byte[]> kvMapEntry : segmentMapEntry.getValue().entrySet()) {
+ final Bytes keyBytes = retainDuplicates ? getKey(kvMapEntry.getKey()) : kvMapEntry.getKey();
+ returnSet.add(getWindowedKeyValue(keyBytes, segmentMapEntry.getKey(), kvMapEntry.getValue()));
}
}
- return new InMemoryWindowedKeyValueIterator<>(returnSet.listIterator());
+ return new InMemoryWindowedKeyValueIterator(returnSet.listIterator());
}
@Override
- public KeyValueIterator<Windowed<K>, V> all() {
+ public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
removeExpiredSegments();
- final List<KeyValue<Windowed<K>, V>> returnSet = new LinkedList<>();
+ final List<KeyValue<Windowed<Bytes>, byte[]>> returnSet = new LinkedList<>();
- for (final Entry<Long, NavigableMap<WrappedK<K>, V>> segmentMapEntry : this.segmentMap.entrySet()) {
- for (final Entry<WrappedK<K>, V> kvMapEntry : segmentMapEntry.getValue().entrySet()) {
- final WrappedK<K> wrappedKey = kvMapEntry.getKey();
- returnSet.add(getWindowedKeyValue(wrappedKey.getKey(), segmentMapEntry.getKey(),
- kvMapEntry.getValue()));
+ for (final Entry<Long, NavigableMap<Bytes, byte[]>> segmentMapEntry : this.segmentMap.entrySet()) {
+ for (final Entry<Bytes, byte[]> kvMapEntry : segmentMapEntry.getValue().entrySet()) {
+ final Bytes keyBytes = retainDuplicates ? getKey(kvMapEntry.getKey()) : kvMapEntry.getKey();
+ returnSet.add(getWindowedKeyValue(keyBytes, segmentMapEntry.getKey(), kvMapEntry.getValue()));
}
}
- return new InMemoryWindowedKeyValueIterator<>(returnSet.listIterator());
+ return new InMemoryWindowedKeyValueIterator(returnSet.listIterator());
}
@Override
@@ -248,14 +239,14 @@ public class InMemoryWindowStore<K extends Comparable<K>, V> implements WindowSt
this.open = false;
}
- private List<KeyValue<Long, V>> fetchUnique(final K key, final long timeFrom, final long timeTo) {
- final List<KeyValue<Long, V>> returnSet = new LinkedList<>();
+ private List<KeyValue<Long, byte[]>> fetchUnique(final Bytes key, final long timeFrom, final long timeTo) {
+ final List<KeyValue<Long, byte[]>> returnSet = new LinkedList<>();
// add one b/c records expire exactly retentionPeriod ms after created
final long minTime = Math.max(timeFrom, this.observedStreamTime - this.retentionPeriod + 1);
- for (final Map.Entry<Long, NavigableMap<WrappedK<K>, V>> segmentMapEntry : this.segmentMap.subMap(minTime, true, timeTo, true).entrySet()) {
- final V value = segmentMapEntry.getValue().get(new WrappedK<>(key, seqnum));
+ for (final Map.Entry<Long, NavigableMap<Bytes, byte[]>> segmentMapEntry : this.segmentMap.subMap(minTime, true, timeTo, true).entrySet()) {
+ final byte[] value = segmentMapEntry.getValue().get(key);
if (value != null) {
returnSet.add(new KeyValue<>(segmentMapEntry.getKey(), value));
}
@@ -263,16 +254,16 @@ public class InMemoryWindowStore<K extends Comparable<K>, V> implements WindowSt
return returnSet;
}
- private List<KeyValue<Long, V>> fetchWithDuplicates(final K key, final long timeFrom, final long timeTo) {
- final List<KeyValue<Long, V>> returnSet = new LinkedList<>();
+ private List<KeyValue<Long, byte[]>> fetchWithDuplicates(final Bytes key, final long timeFrom, final long timeTo) {
+ final List<KeyValue<Long, byte[]>> returnSet = new LinkedList<>();
// add one b/c records expire exactly retentionPeriod ms after created
final long minTime = Math.max(timeFrom, this.observedStreamTime - this.retentionPeriod + 1);
- final WrappedK<K> keyFrom = new WrappedK<>(key, 0);
- final WrappedK<K> keyTo = new WrappedK<>(key, Integer.MAX_VALUE);
+ final Bytes keyFrom = wrapForDups(key, 0);
+ final Bytes keyTo = wrapForDups(key, Integer.MAX_VALUE);
- for (final Map.Entry<Long, NavigableMap<WrappedK<K>, V>> segmentMapEntry : this.segmentMap.subMap(minTime, true, timeTo, true).entrySet()) {
- for (final Map.Entry<WrappedK<K>, V> kvMapEntry : segmentMapEntry.getValue().subMap(keyFrom, true, keyTo, true).entrySet()) {
+ for (final Map.Entry<Long, NavigableMap<Bytes, byte[]>> segmentMapEntry : this.segmentMap.subMap(minTime, true, timeTo, true).entrySet()) {
+ for (final Map.Entry<Bytes, byte[]> kvMapEntry : segmentMapEntry.getValue().subMap(keyFrom, true, keyTo, true).entrySet()) {
returnSet.add(new KeyValue<>(segmentMapEntry.getKey(), kvMapEntry.getValue()));
}
}
@@ -284,8 +275,10 @@ public class InMemoryWindowStore<K extends Comparable<K>, V> implements WindowSt
this.segmentMap.headMap(minLiveTime, true).clear();
}
- private KeyValue<Windowed<K>, V> getWindowedKeyValue(final K key, final long startTimestamp, final V value) {
- final Windowed<K> windowedK = new Windowed<>(key, new TimeWindow(startTimestamp, startTimestamp + windowSize));
+ private KeyValue<Windowed<Bytes>, byte[]> getWindowedKeyValue(final Bytes key,
+ final long startTimestamp,
+ final byte[] value) {
+ final Windowed<Bytes> windowedK = new Windowed<>(key, new TimeWindow(startTimestamp, startTimestamp + windowSize));
return new KeyValue<>(windowedK, value);
}
@@ -295,34 +288,26 @@ public class InMemoryWindowStore<K extends Comparable<K>, V> implements WindowSt
}
}
- private static class WrappedK<K extends Comparable<K>> implements Comparable<WrappedK<K>> {
- private final K key;
- private final int seqnum;
+ private static Bytes wrapForDups(final Bytes key, final int seqnum) {
+ final ByteBuffer buf = ByteBuffer.allocate(key.get().length + SEQNUM_SIZE);
+ buf.put(key.get());
+ buf.putInt(seqnum);
- WrappedK(final K key, final int seqnum) {
- this.key = key;
- this.seqnum = seqnum;
- }
+ return Bytes.wrap(buf.array());
+ }
- public K getKey() {
- return this.key;
- }
+ private static Bytes getKey(final Bytes keyBytes) {
+ final byte[] bytes = new byte[keyBytes.get().length - SEQNUM_SIZE];
+ System.arraycopy(keyBytes.get(), 0, bytes, 0, bytes.length);
+ return Bytes.wrap(bytes);
- public int compareTo(final WrappedK<K> k) {
- final int compareKeys = this.key.compareTo(k.key);
- if (compareKeys == 0) {
- return this.seqnum - k.seqnum;
- } else {
- return compareKeys;
- }
- }
}
- private static class InMemoryWindowStoreIterator<V> implements WindowStoreIterator<V> {
+ private static class InMemoryWindowStoreIterator implements WindowStoreIterator<byte[]> {
- private ListIterator<KeyValue<Long, V>> iterator;
+ private ListIterator<KeyValue<Long, byte[]>> iterator;
- InMemoryWindowStoreIterator(final ListIterator<KeyValue<Long, V>> iterator) {
+ InMemoryWindowStoreIterator(final ListIterator<KeyValue<Long, byte[]>> iterator) {
this.iterator = iterator;
}
@@ -332,7 +317,7 @@ public class InMemoryWindowStore<K extends Comparable<K>, V> implements WindowSt
}
@Override
- public KeyValue<Long, V> next() {
+ public KeyValue<Long, byte[]> next() {
return iterator.next();
}
@@ -353,11 +338,11 @@ public class InMemoryWindowStore<K extends Comparable<K>, V> implements WindowSt
}
}
- private static class InMemoryWindowedKeyValueIterator<K, V> implements KeyValueIterator<Windowed<K>, V> {
+ private static class InMemoryWindowedKeyValueIterator implements KeyValueIterator<Windowed<Bytes>, byte[]> {
- ListIterator<KeyValue<Windowed<K>, V>> iterator;
+ ListIterator<KeyValue<Windowed<Bytes>, byte[]>> iterator;
- InMemoryWindowedKeyValueIterator(final ListIterator<KeyValue<Windowed<K>, V>> iterator) {
+ InMemoryWindowedKeyValueIterator(final ListIterator<KeyValue<Windowed<Bytes>, byte[]>> iterator) {
this.iterator = iterator;
}
@@ -367,16 +352,16 @@ public class InMemoryWindowStore<K extends Comparable<K>, V> implements WindowSt
}
@Override
- public KeyValue<Windowed<K>, V> next() {
+ public KeyValue<Windowed<Bytes>, byte[]> next() {
return iterator.next();
}
@Override
- public Windowed<K> peekNextKey() {
+ public Windowed<Bytes> peekNextKey() {
if (!hasNext()) {
throw new NoSuchElementException();
} else {
- final Windowed<K> next = iterator.next().key;
+ final Windowed<Bytes> next = iterator.next().key;
iterator.previous();
return next;
}