You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/05/04 14:44:31 UTC

[kafka] branch trunk updated: [HOT FIX] in-memory store behavior should match rocksDB (#6657)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 14314e3  [HOT FIX] in-memory store behavior should match rocksDB (#6657)
14314e3 is described below

commit 14314e3d687b4c7b77750d27a085b803c934e3e9
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Sat May 4 07:44:18 2019 -0700

    [HOT FIX] in-memory store behavior should match rocksDB (#6657)
    
    While working on consolidating the various store unit tests, I uncovered some minor "bugs" in the in-memory stores (inconsistencies with the behavior established by the RocksDB stores).
    
    - Open iterators should be properly closed when the store is closed.
    - fetch/findSessions should always throw an NPE if the key is null.
    - The window end time should be truncated to Long.MAX_VALUE rather than throwing an exception.
    (Verified that the in-memory stores now pass all applicable RocksDB tests; unified unit tests are coming in another PR.)
    
    Reviewers: Guozhang Wang <wa...@gmail.com>, Bill Bejeck <bb...@gmail.com>
---
 .../state/internals/InMemorySessionStore.java      | 25 ++++++++
 .../state/internals/InMemoryWindowStore.java       | 70 +++++++++++++++++-----
 .../streams/state/internals/WindowKeySchema.java   | 13 +++-
 3 files changed, 91 insertions(+), 17 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
index c39dd58..f3b8565 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentNavigableMap;
@@ -134,6 +135,8 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
     public byte[] fetchSession(final Bytes key, final long startTime, final long endTime) {
         removeExpiredSegments();
 
+        Objects.requireNonNull(key, "key cannot be null");
+
         // Only need to search if the record hasn't expired yet
         if (endTime > observedStreamTime - retentionPeriod) {
             final ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>> keyMap = endTimeMap.get(endTime);
@@ -152,6 +155,8 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
     public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes key,
                                                                   final long earliestSessionEndTime,
                                                                   final long latestSessionStartTime) {
+        Objects.requireNonNull(key, "key cannot be null");
+
         removeExpiredSegments();
 
         return registerNewIterator(key,
@@ -166,6 +171,9 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
                                                                   final Bytes keyTo,
                                                                   final long earliestSessionEndTime,
                                                                   final long latestSessionStartTime) {
+        Objects.requireNonNull(keyFrom, "from key cannot be null");
+        Objects.requireNonNull(keyTo, "to key cannot be null");
+
         removeExpiredSegments();
 
         if (keyFrom.compareTo(keyTo) > 0) {
@@ -183,6 +191,9 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
 
     @Override
     public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes key) {
+
+        Objects.requireNonNull(key, "key cannot be null");
+
         removeExpiredSegments();
 
         return registerNewIterator(key, key, Long.MAX_VALUE, endTimeMap.entrySet().iterator());
@@ -190,8 +201,13 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
 
     @Override
     public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from, final Bytes to) {
+
+        Objects.requireNonNull(from, "from key cannot be null");
+        Objects.requireNonNull(to, "to key cannot be null");
+
         removeExpiredSegments();
 
+
         return registerNewIterator(from, to, Long.MAX_VALUE, endTimeMap.entrySet().iterator());
     }
 
@@ -212,6 +228,13 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
 
     @Override
     public void close() {
+        if (openIterators.size() != 0) {
+            LOG.warn("Closing {} open iterators for store {}", openIterators.size(), name);
+            for (final InMemorySessionStoreIterator it : openIterators) {
+                it.close();
+            }
+        }
+
         endTimeMap.clear();
         openIterators.clear();
         open = false;
@@ -303,6 +326,8 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
 
         @Override
         public void close() {
+            next = null;
+            recordIterator = null;
             callback.deregisterIterator(this);
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
index 797a5d9..8063410 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
 
 import java.nio.ByteBuffer;
 import java.util.Iterator;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentNavigableMap;
@@ -135,6 +136,9 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
             } else {
                 segmentMap.computeIfPresent(windowStartTimestamp, (t, kvMap) -> {
                     kvMap.remove(keyBytes);
+                    if (kvMap.isEmpty()) {
+                        segmentMap.remove(windowStartTimestamp);
+                    }
                     return kvMap;
                 });
             }
@@ -143,6 +147,9 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
 
     @Override
     public byte[] fetch(final Bytes key, final long windowStartTimestamp) {
+
+        Objects.requireNonNull(key, "key cannot be null");
+
         removeExpiredSegments();
 
         if (windowStartTimestamp <= observedStreamTime - retentionPeriod) {
@@ -160,6 +167,9 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
     @Deprecated
     @Override
     public WindowStoreIterator<byte[]> fetch(final Bytes key, final long timeFrom, final long timeTo) {
+
+        Objects.requireNonNull(key, "key cannot be null");
+
         removeExpiredSegments();
 
         // add one b/c records expire exactly retentionPeriod ms after created
@@ -179,6 +189,9 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
                                                            final Bytes to,
                                                            final long timeFrom,
                                                            final long timeTo) {
+        Objects.requireNonNull(from, "from key cannot be null");
+        Objects.requireNonNull(to, "to key cannot be null");
+
         removeExpiredSegments();
 
         if (from.compareTo(to) > 0) {
@@ -242,6 +255,13 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
 
     @Override
     public void close() {
+        if (openIterators.size() != 0) {
+            LOG.warn("Closing {} open iterators for store {}", openIterators.size(), name);
+            for (final InMemoryWindowStoreIteratorWrapper it : openIterators) {
+                it.close();
+            }
+        }
+        
         segmentMap.clear();
         open = false;
     }
@@ -281,7 +301,7 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
         final Bytes keyTo = retainDuplicates ? wrapForDups(key, Integer.MAX_VALUE) : key;
 
         final WrappedInMemoryWindowStoreIterator iterator =
-            new WrappedInMemoryWindowStoreIterator(keyFrom, keyTo, segmentIterator, openIterators::remove);
+            new WrappedInMemoryWindowStoreIterator(keyFrom, keyTo, segmentIterator, openIterators::remove, retainDuplicates);
 
         openIterators.add(iterator);
         return iterator;
@@ -319,15 +339,18 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
         private final boolean allKeys;
         private final Bytes keyFrom;
         private final Bytes keyTo;
+        private final boolean retainDuplicates;
         private final ClosingCallback callback;
 
         InMemoryWindowStoreIteratorWrapper(final Bytes keyFrom,
                                            final Bytes keyTo,
                                            final Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes, byte[]>>> segmentIterator,
-                                           final ClosingCallback callback) {
+                                           final ClosingCallback callback,
+                                           final boolean retainDuplicates) {
             this.keyFrom = keyFrom;
             this.keyTo = keyTo;
             allKeys = (keyFrom == null) && (keyTo == null);
+            this.retainDuplicates = retainDuplicates;
 
             this.segmentIterator = segmentIterator;
             this.callback = callback;
@@ -343,15 +366,26 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
             }
 
             next = getNext();
-            return next != null;
-        }
+            if (next == null) {
+                return false;
+            }
 
-        public void remove() {
-            throw new UnsupportedOperationException(
-                "remove() is not supported in " + getClass().getName());
+            if (allKeys || !retainDuplicates) {
+                return true;
+            }
+
+            final Bytes key = getKey(next.key);
+            if (key.compareTo(getKey(keyFrom)) >= 0 && key.compareTo(getKey(keyTo)) <= 0) {
+                return true;
+            } else {
+                next = null;
+                return hasNext();
+            }
         }
 
         public void close() {
+            next = null;
+            recordIterator = null;
             callback.deregisterIterator(this);
         }
 
@@ -395,8 +429,9 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
         WrappedInMemoryWindowStoreIterator(final Bytes keyFrom,
                                            final Bytes keyTo,
                                            final Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes, byte[]>>> segmentIterator,
-                                           final ClosingCallback callback)  {
-            super(keyFrom, keyTo, segmentIterator, callback);
+                                           final ClosingCallback callback,
+                                           final boolean retainDuplicates)  {
+            super(keyFrom, keyTo, segmentIterator, callback, retainDuplicates);
         }
 
         @Override
@@ -419,13 +454,12 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
         }
 
         public static WrappedInMemoryWindowStoreIterator emptyIterator() {
-            return new WrappedInMemoryWindowStoreIterator(null, null, null, it -> { });
+            return new WrappedInMemoryWindowStoreIterator(null, null, null, it -> { }, false);
         }
     }
 
     private static class WrappedWindowedKeyValueIterator extends InMemoryWindowStoreIteratorWrapper implements KeyValueIterator<Windowed<Bytes>, byte[]> {
 
-        private final boolean retainDuplicates;
         private final long windowSize;
 
         WrappedWindowedKeyValueIterator(final Bytes keyFrom,
@@ -434,8 +468,7 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
                                         final ClosingCallback callback,
                                         final boolean retainDuplicates,
                                         final long windowSize) {
-            super(keyFrom, keyTo, segmentIterator, callback);
-            this.retainDuplicates = retainDuplicates;
+            super(keyFrom, keyTo, segmentIterator, callback, retainDuplicates);
             this.windowSize = windowSize;
         }
 
@@ -457,8 +490,15 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
         }
 
         private Windowed<Bytes> getWindowedKey() {
-            final Bytes key = retainDuplicates ? getKey(super.next.key) : super.next.key;
-            final TimeWindow timeWindow = new TimeWindow(super.currentTime, super.currentTime + windowSize);
+            final Bytes key = super.retainDuplicates ? getKey(super.next.key) : super.next.key;
+            long endTime = super.currentTime + windowSize;
+
+            if (endTime < 0) {
+                LOG.warn("Warning: window end time was truncated to Long.MAX");
+                endTime = Long.MAX_VALUE;
+            }
+
+            final TimeWindow timeWindow = new TimeWindow(super.currentTime, endTime);
             return new Windowed<>(key, timeWindow);
         }
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
index dd8a2f1..9218ccf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
@@ -26,9 +26,13 @@ import org.apache.kafka.streams.state.StateSerdes;
 
 import java.nio.ByteBuffer;
 import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
 
+    private static final Logger LOG = LoggerFactory.getLogger(WindowKeySchema.class);
+
     private static final int SEQNUM_SIZE = 4;
     private static final int TIMESTAMP_SIZE = 8;
     private static final int SUFFIX_SIZE = TIMESTAMP_SIZE + SEQNUM_SIZE;
@@ -99,8 +103,13 @@ public class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
      */
     static TimeWindow timeWindowForSize(final long startMs,
                                         final long windowSize) {
-        final long endMs = startMs + windowSize;
-        return new TimeWindow(startMs, endMs < 0 ? Long.MAX_VALUE : endMs);
+        long endMs = startMs + windowSize;
+
+        if (endMs < 0) {
+            LOG.warn("Warning: window end time was truncated to Long.MAX");
+            endMs = Long.MAX_VALUE;
+        }
+        return new TimeWindow(startMs, endMs);
     }
 
     // for pipe serdes