You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was lost in this plain-text rendering).
Posted to commits@kafka.apache.org by gu...@apache.org on 2019/03/01 01:39:57 UTC

[kafka] branch trunk updated: KAFKA-7918: Inline generic parameters Pt. III: in-memory window store (#6328)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b03e8c2  KAFKA-7918: Inline generic parameters Pt. III: in-memory window store (#6328)
b03e8c2 is described below

commit b03e8c234a8aeecd10c2c96b683cfb39b24b548a
Author: A. Sophie Blee-Goldman <ab...@gmail.com>
AuthorDate: Thu Feb 28 17:39:47 2019 -0800

    KAFKA-7918: Inline generic parameters Pt. III: in-memory window store (#6328)
    
    Third (and final) PR in a series to inline the generic parameters of the following bytes stores:
    
    [Pt. I] InMemoryKeyValueStore
    [Pt. II] RocksDBWindowStore
    [Pt. II] RocksDBSessionStore
    [Pt. II] MemoryLRUCache
    [Pt. II] MemoryNavigableLRUCache
    [x] InMemoryWindowStore
    
    Reviewers: Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 .../InMemoryWindowBytesStoreSupplier.java          |  13 +-
 .../state/internals/InMemoryWindowStore.java       | 173 ++++++++++-----------
 2 files changed, 84 insertions(+), 102 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowBytesStoreSupplier.java
index a6709ae..ace4de0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowBytesStoreSupplier.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.apache.kafka.streams.state.WindowStore;
@@ -44,13 +43,11 @@ public class InMemoryWindowBytesStoreSupplier implements WindowBytesStoreSupplie
 
     @Override
     public WindowStore<Bytes, byte[]> get() {
-        return new InMemoryWindowStore<>(name,
-                                         Serdes.Bytes(),
-                                         Serdes.ByteArray(),
-                                         retentionPeriod,
-                                         windowSize,
-                                         retainDuplicates,
-                                         metricsScope());
+        return new InMemoryWindowStore(name,
+                                       retentionPeriod,
+                                       windowSize,
+                                       retainDuplicates,
+                                       metricsScope());
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
index 6e9b96b..77820c5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -16,18 +16,17 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import java.nio.ByteBuffer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -44,18 +43,16 @@ import java.util.NoSuchElementException;
 import java.util.TreeMap;
 
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
-import static org.apache.kafka.streams.state.internals.WindowKeySchema.extractStoreKey;
+import static org.apache.kafka.streams.state.internals.WindowKeySchema.extractStoreKeyBytes;
 import static org.apache.kafka.streams.state.internals.WindowKeySchema.extractStoreTimestamp;
 
-public class InMemoryWindowStore<K extends Comparable<K>, V> implements WindowStore<K, V> {
+public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
 
     private static final Logger LOG = LoggerFactory.getLogger(InMemoryWindowStore.class);
+    private static final int SEQNUM_SIZE = 4;
 
     private final String name;
-    private final Serde<K> keySerde;
-    private final Serde<V> valueSerde;
     private final String metricScope;
-    private StateSerdes<K, V> serdes;
     private InternalProcessorContext context;
     private Sensor expiredRecordSensor;
     private int seqnum = 0;
@@ -65,20 +62,16 @@ public class InMemoryWindowStore<K extends Comparable<K>, V> implements WindowSt
     private final long windowSize;
     private final boolean retainDuplicates;
 
-    private final NavigableMap<Long, NavigableMap<WrappedK<K>, V>> segmentMap;
+    private final NavigableMap<Long, NavigableMap<Bytes, byte[]>> segmentMap;
 
     private volatile boolean open = false;
 
     InMemoryWindowStore(final String name,
-                               final Serde<K> keySerde,
-                               final Serde<V> valueSerde,
                                final long retentionPeriod,
                                final long windowSize,
                                final boolean retainDuplicates,
                                final String metricScope) {
         this.name = name;
-        this.keySerde = keySerde;
-        this.valueSerde = valueSerde;
         this.retentionPeriod = retentionPeriod;
         this.windowSize = windowSize;
         this.retainDuplicates = retainDuplicates;
@@ -97,12 +90,6 @@ public class InMemoryWindowStore<K extends Comparable<K>, V> implements WindowSt
     public void init(final ProcessorContext context, final StateStore root) {
         this.context = (InternalProcessorContext) context;
 
-        // construct the serde
-        this.serdes = new StateSerdes<>(
-            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name),
-            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
-            valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
-
         final StreamsMetricsImpl metrics = this.context.metrics();
         final String taskName = context.taskId().toString();
         expiredRecordSensor = metrics.storeLevelSensor(
@@ -120,33 +107,35 @@ public class InMemoryWindowStore<K extends Comparable<K>, V> implements WindowSt
 
         if (root != null) {
             context.register(root, (key, value) -> {
-                put(extractStoreKey(key, serdes), serdes.valueFrom(value), extractStoreTimestamp(key));
+                put(Bytes.wrap(extractStoreKeyBytes(key)), value, extractStoreTimestamp(key));
             });
         }
         this.open = true;
     }
 
     @Override
-    public void put(final K key, final V value) {
+    public void put(final Bytes key, final byte[] value) {
         put(key, value, context.timestamp());
     }
 
     @Override
-    public void put(final K key, final V value, final long windowStartTimestamp) {
+    public void put(final Bytes key, final byte[] value, final long windowStartTimestamp) {
         removeExpiredSegments();
         maybeUpdateSeqnumForDups();
         this.observedStreamTime = Math.max(this.observedStreamTime, windowStartTimestamp);
 
+        final Bytes keyBytes = retainDuplicates ? wrapForDups(key, seqnum) : key;
+
         if (windowStartTimestamp <= this.observedStreamTime - this.retentionPeriod) {
             expiredRecordSensor.record();
             LOG.debug("Skipping record for expired segment.");
         } else {
             if (value != null) {
                 this.segmentMap.computeIfAbsent(windowStartTimestamp, t -> new TreeMap<>());
-                this.segmentMap.get(windowStartTimestamp).put(new WrappedK<>(key, seqnum), value);
+                this.segmentMap.get(windowStartTimestamp).put(keyBytes, value);
             } else {
                 this.segmentMap.computeIfPresent(windowStartTimestamp, (t, kvMap) -> {
-                    kvMap.remove(new WrappedK<>(key, seqnum));
+                    kvMap.remove(keyBytes);
                     return kvMap;
                 });
             }
@@ -154,77 +143,79 @@ public class InMemoryWindowStore<K extends Comparable<K>, V> implements WindowSt
     }
 
     @Override
-    public V fetch(final K key, final long windowStartTimestamp) {
+    public byte[] fetch(final Bytes key, final long windowStartTimestamp) {
         removeExpiredSegments();
 
-        final NavigableMap<WrappedK<K>, V> kvMap = this.segmentMap.get(windowStartTimestamp);
+        final NavigableMap<Bytes, byte[]> kvMap = this.segmentMap.get(windowStartTimestamp);
         if (kvMap == null) {
             return null;
         } else {
-            return kvMap.get(new WrappedK<>(key, seqnum));
+            return kvMap.get(key);
         }
     }
 
     @Deprecated
     @Override
-    public WindowStoreIterator<V> fetch(final K key, final long timeFrom, final long timeTo) {
+    public WindowStoreIterator<byte[]> fetch(final Bytes key, final long timeFrom, final long timeTo) {
         removeExpiredSegments();
-        final List<KeyValue<Long, V>> records = retainDuplicates ? fetchWithDuplicates(key, timeFrom, timeTo) : fetchUnique(key, timeFrom, timeTo);
+        final List<KeyValue<Long, byte[]>> records = retainDuplicates ? fetchWithDuplicates(key, timeFrom, timeTo) : fetchUnique(key, timeFrom, timeTo);
 
-        return new InMemoryWindowStoreIterator<>(records.listIterator());
+        return new InMemoryWindowStoreIterator(records.listIterator());
     }
 
     @Deprecated
     @Override
-    public KeyValueIterator<Windowed<K>, V> fetch(final K from, final K to, final long timeFrom, final long timeTo) {
+    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
+                                                           final Bytes to,
+                                                           final long timeFrom,
+                                                           final long timeTo) {
         removeExpiredSegments();
-        final List<KeyValue<Windowed<K>, V>> returnSet = new LinkedList<>();
+        final List<KeyValue<Windowed<Bytes>, byte[]>> returnSet = new LinkedList<>();
 
         // add one b/c records expire exactly retentionPeriod ms after created
         final long minTime = Math.max(timeFrom, this.observedStreamTime - this.retentionPeriod + 1);
-        final WrappedK<K> keyFrom = new WrappedK<>(from, 0);
-        final WrappedK<K> keyTo = new WrappedK<>(to, Integer.MAX_VALUE);
+        final Bytes keyFrom = retainDuplicates ? wrapForDups(from, 0) : from;
+        final Bytes keyTo = retainDuplicates ? wrapForDups(to, Integer.MAX_VALUE) : to;
 
-        for (final Map.Entry<Long, NavigableMap<WrappedK<K>, V>> segmentMapEntry : this.segmentMap.subMap(minTime, true, timeTo, true).entrySet()) {
-            for (final Map.Entry<WrappedK<K>, V> kvMapEntry : segmentMapEntry.getValue().subMap(keyFrom, true, keyTo, true).entrySet()) {
-                final WrappedK<K> wrappedKey = kvMapEntry.getKey();
-                returnSet.add(getWindowedKeyValue(wrappedKey.getKey(), segmentMapEntry.getKey(), kvMapEntry.getValue()));
+        for (final Map.Entry<Long, NavigableMap<Bytes, byte[]>> segmentMapEntry : this.segmentMap.subMap(minTime, true, timeTo, true).entrySet()) {
+            for (final Map.Entry<Bytes, byte[]> kvMapEntry : segmentMapEntry.getValue().subMap(keyFrom, true, keyTo, true).entrySet()) {
+                final Bytes keyBytes = retainDuplicates ? getKey(kvMapEntry.getKey()) : kvMapEntry.getKey();
+                returnSet.add(getWindowedKeyValue(keyBytes, segmentMapEntry.getKey(), kvMapEntry.getValue()));
             }
         }
-        return new InMemoryWindowedKeyValueIterator<>(returnSet.listIterator());
+        return new InMemoryWindowedKeyValueIterator(returnSet.listIterator());
     }
 
     @Deprecated
     @Override
-    public KeyValueIterator<Windowed<K>, V> fetchAll(final long timeFrom, final long timeTo) {
+    public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom, final long timeTo) {
         removeExpiredSegments();
-        final List<KeyValue<Windowed<K>, V>> returnSet = new LinkedList<>();
+        final List<KeyValue<Windowed<Bytes>, byte[]>> returnSet = new LinkedList<>();
 
         // add one b/c records expire exactly retentionPeriod ms after created
         final long minTime = Math.max(timeFrom, this.observedStreamTime - this.retentionPeriod + 1);
 
-        for (final Map.Entry<Long, NavigableMap<WrappedK<K>, V>> segmentMapEntry : this.segmentMap.subMap(minTime, true, timeTo, true).entrySet()) {
-            for (final Map.Entry<WrappedK<K>, V> kvMapEntry : segmentMapEntry.getValue().entrySet()) {
-                final WrappedK<K> wrappedKey = kvMapEntry.getKey();
-                returnSet.add(getWindowedKeyValue(wrappedKey.getKey(), segmentMapEntry.getKey(), kvMapEntry.getValue()));
+        for (final Map.Entry<Long, NavigableMap<Bytes, byte[]>> segmentMapEntry : this.segmentMap.subMap(minTime, true, timeTo, true).entrySet()) {
+            for (final Map.Entry<Bytes,  byte[]> kvMapEntry : segmentMapEntry.getValue().entrySet()) {
+                final Bytes keyBytes = retainDuplicates ? getKey(kvMapEntry.getKey()) : kvMapEntry.getKey();
+                returnSet.add(getWindowedKeyValue(keyBytes, segmentMapEntry.getKey(), kvMapEntry.getValue()));
             }
         }
-        return new InMemoryWindowedKeyValueIterator<>(returnSet.listIterator());
+        return new InMemoryWindowedKeyValueIterator(returnSet.listIterator());
     }
 
     @Override
-    public KeyValueIterator<Windowed<K>, V> all() {
+    public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
         removeExpiredSegments();
-        final List<KeyValue<Windowed<K>, V>> returnSet = new LinkedList<>();
+        final List<KeyValue<Windowed<Bytes>, byte[]>> returnSet = new LinkedList<>();
 
-        for (final Entry<Long, NavigableMap<WrappedK<K>, V>> segmentMapEntry : this.segmentMap.entrySet()) {
-            for (final Entry<WrappedK<K>, V> kvMapEntry : segmentMapEntry.getValue().entrySet()) {
-                final WrappedK<K> wrappedKey = kvMapEntry.getKey();
-                returnSet.add(getWindowedKeyValue(wrappedKey.getKey(), segmentMapEntry.getKey(),
-                    kvMapEntry.getValue()));
+        for (final Entry<Long, NavigableMap<Bytes, byte[]>> segmentMapEntry : this.segmentMap.entrySet()) {
+            for (final Entry<Bytes, byte[]> kvMapEntry : segmentMapEntry.getValue().entrySet()) {
+                final Bytes keyBytes = retainDuplicates ? getKey(kvMapEntry.getKey()) : kvMapEntry.getKey();
+                returnSet.add(getWindowedKeyValue(keyBytes, segmentMapEntry.getKey(), kvMapEntry.getValue()));
             }
         }
-        return new InMemoryWindowedKeyValueIterator<>(returnSet.listIterator());
+        return new InMemoryWindowedKeyValueIterator(returnSet.listIterator());
     }
 
     @Override
@@ -248,14 +239,14 @@ public class InMemoryWindowStore<K extends Comparable<K>, V> implements WindowSt
         this.open = false;
     }
 
-    private List<KeyValue<Long, V>> fetchUnique(final K key, final long timeFrom, final long timeTo) {
-        final List<KeyValue<Long, V>> returnSet = new LinkedList<>();
+    private List<KeyValue<Long, byte[]>> fetchUnique(final Bytes key, final long timeFrom, final long timeTo) {
+        final List<KeyValue<Long, byte[]>> returnSet = new LinkedList<>();
 
         // add one b/c records expire exactly retentionPeriod ms after created
         final long minTime = Math.max(timeFrom, this.observedStreamTime - this.retentionPeriod + 1);
 
-        for (final Map.Entry<Long, NavigableMap<WrappedK<K>, V>> segmentMapEntry : this.segmentMap.subMap(minTime, true, timeTo, true).entrySet()) {
-            final V value = segmentMapEntry.getValue().get(new WrappedK<>(key, seqnum));
+        for (final Map.Entry<Long, NavigableMap<Bytes, byte[]>> segmentMapEntry : this.segmentMap.subMap(minTime, true, timeTo, true).entrySet()) {
+            final byte[] value = segmentMapEntry.getValue().get(key);
             if (value != null) {
                 returnSet.add(new KeyValue<>(segmentMapEntry.getKey(), value));
             }
@@ -263,16 +254,16 @@ public class InMemoryWindowStore<K extends Comparable<K>, V> implements WindowSt
         return returnSet;
     }
 
-    private List<KeyValue<Long, V>> fetchWithDuplicates(final K key, final long timeFrom, final long timeTo) {
-        final List<KeyValue<Long, V>> returnSet = new LinkedList<>();
+    private List<KeyValue<Long, byte[]>> fetchWithDuplicates(final Bytes key, final long timeFrom, final long timeTo) {
+        final List<KeyValue<Long, byte[]>> returnSet = new LinkedList<>();
 
         // add one b/c records expire exactly retentionPeriod ms after created
         final long minTime = Math.max(timeFrom, this.observedStreamTime - this.retentionPeriod + 1);
-        final WrappedK<K> keyFrom = new WrappedK<>(key, 0);
-        final WrappedK<K> keyTo = new WrappedK<>(key, Integer.MAX_VALUE);
+        final Bytes keyFrom = wrapForDups(key, 0);
+        final Bytes keyTo = wrapForDups(key, Integer.MAX_VALUE);
 
-        for (final Map.Entry<Long, NavigableMap<WrappedK<K>, V>> segmentMapEntry : this.segmentMap.subMap(minTime, true, timeTo, true).entrySet()) {
-            for (final Map.Entry<WrappedK<K>, V> kvMapEntry : segmentMapEntry.getValue().subMap(keyFrom, true, keyTo, true).entrySet()) {
+        for (final Map.Entry<Long, NavigableMap<Bytes, byte[]>> segmentMapEntry : this.segmentMap.subMap(minTime, true, timeTo, true).entrySet()) {
+            for (final Map.Entry<Bytes, byte[]> kvMapEntry : segmentMapEntry.getValue().subMap(keyFrom, true, keyTo, true).entrySet()) {
                 returnSet.add(new KeyValue<>(segmentMapEntry.getKey(), kvMapEntry.getValue()));
             }
         }
@@ -284,8 +275,10 @@ public class InMemoryWindowStore<K extends Comparable<K>, V> implements WindowSt
         this.segmentMap.headMap(minLiveTime, true).clear();
     }
 
-    private KeyValue<Windowed<K>, V> getWindowedKeyValue(final K key, final long startTimestamp, final V value) {
-        final Windowed<K> windowedK = new Windowed<>(key, new TimeWindow(startTimestamp, startTimestamp + windowSize));
+    private KeyValue<Windowed<Bytes>, byte[]> getWindowedKeyValue(final Bytes key,
+                                                                  final long startTimestamp,
+                                                                  final byte[] value) {
+        final Windowed<Bytes> windowedK = new Windowed<>(key, new TimeWindow(startTimestamp, startTimestamp + windowSize));
         return new KeyValue<>(windowedK, value);
     }
 
@@ -295,34 +288,26 @@ public class InMemoryWindowStore<K extends Comparable<K>, V> implements WindowSt
         }
     }
 
-    private static class WrappedK<K extends Comparable<K>> implements Comparable<WrappedK<K>> {
-        private final K key;
-        private final int seqnum;
+    private static Bytes wrapForDups(final Bytes key, final int seqnum) {
+        final ByteBuffer buf = ByteBuffer.allocate(key.get().length + SEQNUM_SIZE);
+        buf.put(key.get());
+        buf.putInt(seqnum);
 
-        WrappedK(final K key, final int seqnum) {
-            this.key = key;
-            this.seqnum = seqnum;
-        }
+        return Bytes.wrap(buf.array());
+    }
 
-        public K getKey() {
-            return this.key;
-        }
+    private static Bytes getKey(final Bytes keyBytes) {
+        final byte[] bytes = new byte[keyBytes.get().length  - SEQNUM_SIZE];
+        System.arraycopy(keyBytes.get(), 0, bytes, 0, bytes.length);
+        return Bytes.wrap(bytes);
 
-        public int compareTo(final WrappedK<K> k) {
-            final int compareKeys = this.key.compareTo(k.key);
-            if (compareKeys == 0) {
-                return this.seqnum - k.seqnum;
-            } else {
-                return compareKeys;
-            }
-        }
     }
 
-    private static class InMemoryWindowStoreIterator<V> implements WindowStoreIterator<V> {
+    private static class InMemoryWindowStoreIterator implements WindowStoreIterator<byte[]> {
 
-        private ListIterator<KeyValue<Long, V>> iterator;
+        private ListIterator<KeyValue<Long, byte[]>> iterator;
 
-        InMemoryWindowStoreIterator(final ListIterator<KeyValue<Long, V>> iterator) {
+        InMemoryWindowStoreIterator(final ListIterator<KeyValue<Long, byte[]>> iterator) {
             this.iterator = iterator;
         }
 
@@ -332,7 +317,7 @@ public class InMemoryWindowStore<K extends Comparable<K>, V> implements WindowSt
         }
 
         @Override
-        public KeyValue<Long, V> next() {
+        public KeyValue<Long, byte[]> next() {
             return iterator.next();
         }
 
@@ -353,11 +338,11 @@ public class InMemoryWindowStore<K extends Comparable<K>, V> implements WindowSt
         }
     }
 
-    private static class InMemoryWindowedKeyValueIterator<K, V> implements KeyValueIterator<Windowed<K>, V> {
+    private static class InMemoryWindowedKeyValueIterator implements KeyValueIterator<Windowed<Bytes>, byte[]> {
 
-        ListIterator<KeyValue<Windowed<K>, V>> iterator;
+        ListIterator<KeyValue<Windowed<Bytes>, byte[]>> iterator;
 
-        InMemoryWindowedKeyValueIterator(final ListIterator<KeyValue<Windowed<K>, V>> iterator) {
+        InMemoryWindowedKeyValueIterator(final ListIterator<KeyValue<Windowed<Bytes>, byte[]>> iterator) {
             this.iterator = iterator;
         }
 
@@ -367,16 +352,16 @@ public class InMemoryWindowStore<K extends Comparable<K>, V> implements WindowSt
         }
 
         @Override
-        public KeyValue<Windowed<K>, V> next() {
+        public KeyValue<Windowed<Bytes>, byte[]> next() {
             return iterator.next();
         }
 
         @Override
-        public Windowed<K> peekNextKey() {
+        public Windowed<Bytes> peekNextKey() {
             if (!hasNext()) {
                 throw new NoSuchElementException();
             } else {
-                final Windowed<K> next = iterator.next().key;
+                final Windowed<Bytes> next = iterator.next().key;
                 iterator.previous();
                 return next;
             }