You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/05/04 14:44:31 UTC
[kafka] branch trunk updated: [HOT FIX] in-memory store behavior should match rocksDB (#6657)
This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 14314e3 [HOT FIX] in-memory store behavior should match rocksDB (#6657)
14314e3 is described below
commit 14314e3d687b4c7b77750d27a085b803c934e3e9
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Sat May 4 07:44:18 2019 -0700
[HOT FIX] in-memory store behavior should match rocksDB (#6657)
While working on consolidating the various store unit tests, I uncovered some minor "bugs" in the in-memory stores (inconsistencies with the behavior established by the RocksDB stores):
- Open iterators should be properly closed when the store is closed.
- fetch/findSessions should always throw an NPE if the key is null.
- The window end time should be truncated to Long.MAX_VALUE on overflow rather than throwing an exception.
(Verified that the in-memory stores now pass all applicable RocksDB tests; unified unit tests are coming in another PR.)
Reviewers: Guozhang Wang <wa...@gmail.com>, Bill Bejeck <bb...@gmail.com>
---
.../state/internals/InMemorySessionStore.java | 25 ++++++++
.../state/internals/InMemoryWindowStore.java | 70 +++++++++++++++++-----
.../streams/state/internals/WindowKeySchema.java | 13 +++-
3 files changed, 91 insertions(+), 17 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
index c39dd58..f3b8565 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
@@ -134,6 +135,8 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
public byte[] fetchSession(final Bytes key, final long startTime, final long endTime) {
removeExpiredSegments();
+ Objects.requireNonNull(key, "key cannot be null");
+
// Only need to search if the record hasn't expired yet
if (endTime > observedStreamTime - retentionPeriod) {
final ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>> keyMap = endTimeMap.get(endTime);
@@ -152,6 +155,8 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Bytes key,
final long earliestSessionEndTime,
final long latestSessionStartTime) {
+ Objects.requireNonNull(key, "key cannot be null");
+
removeExpiredSegments();
return registerNewIterator(key,
@@ -166,6 +171,9 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
final Bytes keyTo,
final long earliestSessionEndTime,
final long latestSessionStartTime) {
+ Objects.requireNonNull(keyFrom, "from key cannot be null");
+ Objects.requireNonNull(keyTo, "to key cannot be null");
+
removeExpiredSegments();
if (keyFrom.compareTo(keyTo) > 0) {
@@ -183,6 +191,9 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes key) {
+
+ Objects.requireNonNull(key, "key cannot be null");
+
removeExpiredSegments();
return registerNewIterator(key, key, Long.MAX_VALUE, endTimeMap.entrySet().iterator());
@@ -190,8 +201,13 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from, final Bytes to) {
+
+ Objects.requireNonNull(from, "from key cannot be null");
+ Objects.requireNonNull(to, "to key cannot be null");
+
removeExpiredSegments();
+
return registerNewIterator(from, to, Long.MAX_VALUE, endTimeMap.entrySet().iterator());
}
@@ -212,6 +228,13 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
@Override
public void close() {
+ if (openIterators.size() != 0) {
+ LOG.warn("Closing {} open iterators for store {}", openIterators.size(), name);
+ for (final InMemorySessionStoreIterator it : openIterators) {
+ it.close();
+ }
+ }
+
endTimeMap.clear();
openIterators.clear();
open = false;
@@ -303,6 +326,8 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
@Override
public void close() {
+ next = null;
+ recordIterator = null;
callback.deregisterIterator(this);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
index 797a5d9..8063410 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state.internals;
import java.nio.ByteBuffer;
import java.util.Iterator;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
@@ -135,6 +136,9 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
} else {
segmentMap.computeIfPresent(windowStartTimestamp, (t, kvMap) -> {
kvMap.remove(keyBytes);
+ if (kvMap.isEmpty()) {
+ segmentMap.remove(windowStartTimestamp);
+ }
return kvMap;
});
}
@@ -143,6 +147,9 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
@Override
public byte[] fetch(final Bytes key, final long windowStartTimestamp) {
+
+ Objects.requireNonNull(key, "key cannot be null");
+
removeExpiredSegments();
if (windowStartTimestamp <= observedStreamTime - retentionPeriod) {
@@ -160,6 +167,9 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
@Deprecated
@Override
public WindowStoreIterator<byte[]> fetch(final Bytes key, final long timeFrom, final long timeTo) {
+
+ Objects.requireNonNull(key, "key cannot be null");
+
removeExpiredSegments();
// add one b/c records expire exactly retentionPeriod ms after created
@@ -179,6 +189,9 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
final Bytes to,
final long timeFrom,
final long timeTo) {
+ Objects.requireNonNull(from, "from key cannot be null");
+ Objects.requireNonNull(to, "to key cannot be null");
+
removeExpiredSegments();
if (from.compareTo(to) > 0) {
@@ -242,6 +255,13 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
@Override
public void close() {
+ if (openIterators.size() != 0) {
+ LOG.warn("Closing {} open iterators for store {}", openIterators.size(), name);
+ for (final InMemoryWindowStoreIteratorWrapper it : openIterators) {
+ it.close();
+ }
+ }
+
segmentMap.clear();
open = false;
}
@@ -281,7 +301,7 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
final Bytes keyTo = retainDuplicates ? wrapForDups(key, Integer.MAX_VALUE) : key;
final WrappedInMemoryWindowStoreIterator iterator =
- new WrappedInMemoryWindowStoreIterator(keyFrom, keyTo, segmentIterator, openIterators::remove);
+ new WrappedInMemoryWindowStoreIterator(keyFrom, keyTo, segmentIterator, openIterators::remove, retainDuplicates);
openIterators.add(iterator);
return iterator;
@@ -319,15 +339,18 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
private final boolean allKeys;
private final Bytes keyFrom;
private final Bytes keyTo;
+ private final boolean retainDuplicates;
private final ClosingCallback callback;
InMemoryWindowStoreIteratorWrapper(final Bytes keyFrom,
final Bytes keyTo,
final Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes, byte[]>>> segmentIterator,
- final ClosingCallback callback) {
+ final ClosingCallback callback,
+ final boolean retainDuplicates) {
this.keyFrom = keyFrom;
this.keyTo = keyTo;
allKeys = (keyFrom == null) && (keyTo == null);
+ this.retainDuplicates = retainDuplicates;
this.segmentIterator = segmentIterator;
this.callback = callback;
@@ -343,15 +366,26 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
}
next = getNext();
- return next != null;
- }
+ if (next == null) {
+ return false;
+ }
- public void remove() {
- throw new UnsupportedOperationException(
- "remove() is not supported in " + getClass().getName());
+ if (allKeys || !retainDuplicates) {
+ return true;
+ }
+
+ final Bytes key = getKey(next.key);
+ if (key.compareTo(getKey(keyFrom)) >= 0 && key.compareTo(getKey(keyTo)) <= 0) {
+ return true;
+ } else {
+ next = null;
+ return hasNext();
+ }
}
public void close() {
+ next = null;
+ recordIterator = null;
callback.deregisterIterator(this);
}
@@ -395,8 +429,9 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
WrappedInMemoryWindowStoreIterator(final Bytes keyFrom,
final Bytes keyTo,
final Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes, byte[]>>> segmentIterator,
- final ClosingCallback callback) {
- super(keyFrom, keyTo, segmentIterator, callback);
+ final ClosingCallback callback,
+ final boolean retainDuplicates) {
+ super(keyFrom, keyTo, segmentIterator, callback, retainDuplicates);
}
@Override
@@ -419,13 +454,12 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
}
public static WrappedInMemoryWindowStoreIterator emptyIterator() {
- return new WrappedInMemoryWindowStoreIterator(null, null, null, it -> { });
+ return new WrappedInMemoryWindowStoreIterator(null, null, null, it -> { }, false);
}
}
private static class WrappedWindowedKeyValueIterator extends InMemoryWindowStoreIteratorWrapper implements KeyValueIterator<Windowed<Bytes>, byte[]> {
- private final boolean retainDuplicates;
private final long windowSize;
WrappedWindowedKeyValueIterator(final Bytes keyFrom,
@@ -434,8 +468,7 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
final ClosingCallback callback,
final boolean retainDuplicates,
final long windowSize) {
- super(keyFrom, keyTo, segmentIterator, callback);
- this.retainDuplicates = retainDuplicates;
+ super(keyFrom, keyTo, segmentIterator, callback, retainDuplicates);
this.windowSize = windowSize;
}
@@ -457,8 +490,15 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
}
private Windowed<Bytes> getWindowedKey() {
- final Bytes key = retainDuplicates ? getKey(super.next.key) : super.next.key;
- final TimeWindow timeWindow = new TimeWindow(super.currentTime, super.currentTime + windowSize);
+ final Bytes key = super.retainDuplicates ? getKey(super.next.key) : super.next.key;
+ long endTime = super.currentTime + windowSize;
+
+ if (endTime < 0) {
+ LOG.warn("Warning: window end time was truncated to Long.MAX");
+ endTime = Long.MAX_VALUE;
+ }
+
+ final TimeWindow timeWindow = new TimeWindow(super.currentTime, endTime);
return new Windowed<>(key, timeWindow);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
index dd8a2f1..9218ccf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
@@ -26,9 +26,13 @@ import org.apache.kafka.streams.state.StateSerdes;
import java.nio.ByteBuffer;
import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
+ private static final Logger LOG = LoggerFactory.getLogger(WindowKeySchema.class);
+
private static final int SEQNUM_SIZE = 4;
private static final int TIMESTAMP_SIZE = 8;
private static final int SUFFIX_SIZE = TIMESTAMP_SIZE + SEQNUM_SIZE;
@@ -99,8 +103,13 @@ public class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
*/
static TimeWindow timeWindowForSize(final long startMs,
final long windowSize) {
- final long endMs = startMs + windowSize;
- return new TimeWindow(startMs, endMs < 0 ? Long.MAX_VALUE : endMs);
+ long endMs = startMs + windowSize;
+
+ if (endMs < 0) {
+ LOG.warn("Warning: window end time was truncated to Long.MAX");
+ endMs = Long.MAX_VALUE;
+ }
+ return new TimeWindow(startMs, endMs);
}
// for pipe serdes