You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/03/06 19:02:37 UTC
[kafka] branch trunk updated: KAFKA-8007: Avoid copying on fetch in InMemoryWindowStore (#6335)
This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4788863 KAFKA-8007: Avoid copying on fetch in InMemoryWindowStore (#6335)
4788863 is described below
commit 47888630a05260f548988d943a633120845f767d
Author: A. Sophie Blee-Goldman <ab...@gmail.com>
AuthorDate: Wed Mar 6 11:02:27 2019 -0800
KAFKA-8007: Avoid copying on fetch in InMemoryWindowStore (#6335)
Rewrote the InMemoryWindowStore implementation by moving the work of a fetch to the iterator, and cleaned up the iterators as well.
Reviewers: Guozhang Wang <wa...@gmail.com>, Bill Bejeck <bb...@gmail.com>
---
.../state/internals/InMemoryWindowStore.java | 296 ++++++++++++---------
.../state/internals/InMemoryWindowStoreTest.java | 108 +++++++-
2 files changed, 284 insertions(+), 120 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
index 67eec0d..7d1b279 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -17,6 +17,11 @@
package org.apache.kafka.streams.state.internals;
import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Bytes;
@@ -33,20 +38,15 @@ import org.apache.kafka.streams.state.KeyValueIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.ListIterator;
import java.util.Map;
-import java.util.Map.Entry;
-import java.util.NavigableMap;
import java.util.NoSuchElementException;
-import java.util.TreeMap;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.EXPIRED_WINDOW_RECORD_DROP;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
import static org.apache.kafka.streams.state.internals.WindowKeySchema.extractStoreKeyBytes;
import static org.apache.kafka.streams.state.internals.WindowKeySchema.extractStoreTimestamp;
+
public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
private static final Logger LOG = LoggerFactory.getLogger(InMemoryWindowStore.class);
@@ -63,22 +63,24 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
private final long windowSize;
private final boolean retainDuplicates;
- private final NavigableMap<Long, NavigableMap<Bytes, byte[]>> segmentMap;
+ private final ConcurrentNavigableMap<Long, ConcurrentNavigableMap<Bytes, byte[]>> segmentMap;
+ private final Set<InMemoryWindowStoreIteratorWrapper> openIterators;
private volatile boolean open = false;
InMemoryWindowStore(final String name,
- final long retentionPeriod,
- final long windowSize,
- final boolean retainDuplicates,
- final String metricScope) {
+ final long retentionPeriod,
+ final long windowSize,
+ final boolean retainDuplicates,
+ final String metricScope) {
this.name = name;
this.retentionPeriod = retentionPeriod;
this.windowSize = windowSize;
this.retainDuplicates = retainDuplicates;
this.metricScope = metricScope;
- this.segmentMap = new TreeMap<>();
+ this.openIterators = ConcurrentHashMap.newKeySet();
+ this.segmentMap = new ConcurrentSkipListMap<>();
}
@Override
@@ -132,7 +134,7 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
LOG.debug("Skipping record for expired segment.");
} else {
if (value != null) {
- this.segmentMap.computeIfAbsent(windowStartTimestamp, t -> new TreeMap<>());
+ this.segmentMap.computeIfAbsent(windowStartTimestamp, t -> new ConcurrentSkipListMap<>());
this.segmentMap.get(windowStartTimestamp).put(keyBytes, value);
} else {
this.segmentMap.computeIfPresent(windowStartTimestamp, (t, kvMap) -> {
@@ -147,7 +149,11 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
public byte[] fetch(final Bytes key, final long windowStartTimestamp) {
removeExpiredSegments();
- final NavigableMap<Bytes, byte[]> kvMap = this.segmentMap.get(windowStartTimestamp);
+ if (windowStartTimestamp <= this.observedStreamTime - this.retentionPeriod) {
+ return null;
+ }
+
+ final ConcurrentNavigableMap<Bytes, byte[]> kvMap = this.segmentMap.get(windowStartTimestamp);
if (kvMap == null) {
return null;
} else {
@@ -159,9 +165,16 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
@Override
public WindowStoreIterator<byte[]> fetch(final Bytes key, final long timeFrom, final long timeTo) {
removeExpiredSegments();
- final List<KeyValue<Long, byte[]>> records = retainDuplicates ? fetchWithDuplicates(key, timeFrom, timeTo) : fetchUnique(key, timeFrom, timeTo);
- return new InMemoryWindowStoreIterator(records.listIterator());
+ // add one b/c records expire exactly retentionPeriod ms after created
+ final long minTime = Math.max(timeFrom, this.observedStreamTime - this.retentionPeriod + 1);
+
+ if (timeTo < minTime) {
+ return new WrappedInMemoryWindowStoreIterator();
+ }
+
+ return new WrappedInMemoryWindowStoreIterator(
+ key, key, this.segmentMap.subMap(minTime, true, timeTo, true).entrySet().iterator());
}
@Deprecated
@@ -171,52 +184,42 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
final long timeFrom,
final long timeTo) {
removeExpiredSegments();
- final List<KeyValue<Windowed<Bytes>, byte[]>> returnSet = new LinkedList<>();
// add one b/c records expire exactly retentionPeriod ms after created
final long minTime = Math.max(timeFrom, this.observedStreamTime - this.retentionPeriod + 1);
- final Bytes keyFrom = retainDuplicates ? wrapForDups(from, 0) : from;
- final Bytes keyTo = retainDuplicates ? wrapForDups(to, Integer.MAX_VALUE) : to;
- for (final Map.Entry<Long, NavigableMap<Bytes, byte[]>> segmentMapEntry : this.segmentMap.subMap(minTime, true, timeTo, true).entrySet()) {
- for (final Map.Entry<Bytes, byte[]> kvMapEntry : segmentMapEntry.getValue().subMap(keyFrom, true, keyTo, true).entrySet()) {
- final Bytes keyBytes = retainDuplicates ? getKey(kvMapEntry.getKey()) : kvMapEntry.getKey();
- returnSet.add(getWindowedKeyValue(keyBytes, segmentMapEntry.getKey(), kvMapEntry.getValue()));
- }
+ if (timeTo < minTime) {
+ return new WrappedWindowedKeyValueIterator();
}
- return new InMemoryWindowedKeyValueIterator(returnSet.listIterator());
+
+ return new WrappedWindowedKeyValueIterator(
+ from, to, this.segmentMap.subMap(minTime, true, timeTo, true).entrySet().iterator());
}
@Deprecated
@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom, final long timeTo) {
removeExpiredSegments();
- final List<KeyValue<Windowed<Bytes>, byte[]>> returnSet = new LinkedList<>();
// add one b/c records expire exactly retentionPeriod ms after created
final long minTime = Math.max(timeFrom, this.observedStreamTime - this.retentionPeriod + 1);
- for (final Map.Entry<Long, NavigableMap<Bytes, byte[]>> segmentMapEntry : this.segmentMap.subMap(minTime, true, timeTo, true).entrySet()) {
- for (final Map.Entry<Bytes, byte[]> kvMapEntry : segmentMapEntry.getValue().entrySet()) {
- final Bytes keyBytes = retainDuplicates ? getKey(kvMapEntry.getKey()) : kvMapEntry.getKey();
- returnSet.add(getWindowedKeyValue(keyBytes, segmentMapEntry.getKey(), kvMapEntry.getValue()));
- }
+ if (timeTo < minTime) {
+ return new WrappedWindowedKeyValueIterator();
}
- return new InMemoryWindowedKeyValueIterator(returnSet.listIterator());
+
+ return new WrappedWindowedKeyValueIterator(
+ null, null, this.segmentMap.subMap(minTime, true, timeTo, true).entrySet().iterator());
}
@Override
public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
removeExpiredSegments();
- final List<KeyValue<Windowed<Bytes>, byte[]>> returnSet = new LinkedList<>();
- for (final Entry<Long, NavigableMap<Bytes, byte[]>> segmentMapEntry : this.segmentMap.entrySet()) {
- for (final Entry<Bytes, byte[]> kvMapEntry : segmentMapEntry.getValue().entrySet()) {
- final Bytes keyBytes = retainDuplicates ? getKey(kvMapEntry.getKey()) : kvMapEntry.getKey();
- returnSet.add(getWindowedKeyValue(keyBytes, segmentMapEntry.getKey(), kvMapEntry.getValue()));
- }
- }
- return new InMemoryWindowedKeyValueIterator(returnSet.listIterator());
+ final long minTime = this.observedStreamTime - this.retentionPeriod;
+
+ return new WrappedWindowedKeyValueIterator(
+ null, null, this.segmentMap.tailMap(minTime, false).entrySet().iterator());
}
@Override
@@ -240,47 +243,12 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
this.open = false;
}
- private List<KeyValue<Long, byte[]>> fetchUnique(final Bytes key, final long timeFrom, final long timeTo) {
- final List<KeyValue<Long, byte[]>> returnSet = new LinkedList<>();
-
- // add one b/c records expire exactly retentionPeriod ms after created
- final long minTime = Math.max(timeFrom, this.observedStreamTime - this.retentionPeriod + 1);
-
- for (final Map.Entry<Long, NavigableMap<Bytes, byte[]>> segmentMapEntry : this.segmentMap.subMap(minTime, true, timeTo, true).entrySet()) {
- final byte[] value = segmentMapEntry.getValue().get(key);
- if (value != null) {
- returnSet.add(new KeyValue<>(segmentMapEntry.getKey(), value));
- }
- }
- return returnSet;
- }
-
- private List<KeyValue<Long, byte[]>> fetchWithDuplicates(final Bytes key, final long timeFrom, final long timeTo) {
- final List<KeyValue<Long, byte[]>> returnSet = new LinkedList<>();
-
- // add one b/c records expire exactly retentionPeriod ms after created
- final long minTime = Math.max(timeFrom, this.observedStreamTime - this.retentionPeriod + 1);
- final Bytes keyFrom = wrapForDups(key, 0);
- final Bytes keyTo = wrapForDups(key, Integer.MAX_VALUE);
-
- for (final Map.Entry<Long, NavigableMap<Bytes, byte[]>> segmentMapEntry : this.segmentMap.subMap(minTime, true, timeTo, true).entrySet()) {
- for (final Map.Entry<Bytes, byte[]> kvMapEntry : segmentMapEntry.getValue().subMap(keyFrom, true, keyTo, true).entrySet()) {
- returnSet.add(new KeyValue<>(segmentMapEntry.getKey(), kvMapEntry.getValue()));
- }
- }
- return returnSet;
- }
-
private void removeExpiredSegments() {
- final long minLiveTime = this.observedStreamTime - this.retentionPeriod;
- this.segmentMap.headMap(minLiveTime, true).clear();
- }
-
- private KeyValue<Windowed<Bytes>, byte[]> getWindowedKeyValue(final Bytes key,
- final long startTimestamp,
- final byte[] value) {
- final Windowed<Bytes> windowedK = new Windowed<>(key, new TimeWindow(startTimestamp, startTimestamp + windowSize));
- return new KeyValue<>(windowedK, value);
+ long minLiveTime = Math.max(0L, this.observedStreamTime - this.retentionPeriod + 1);
+ for (final InMemoryWindowStoreIteratorWrapper it : openIterators) {
+ minLiveTime = Math.min(minLiveTime, it.minTime());
+ }
+ this.segmentMap.headMap(minLiveTime, false).clear();
}
private void maybeUpdateSeqnumForDups() {
@@ -304,76 +272,166 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
}
- private static class InMemoryWindowStoreIterator implements WindowStoreIterator<byte[]> {
+ private abstract class InMemoryWindowStoreIteratorWrapper implements Comparable<InMemoryWindowStoreIteratorWrapper> {
+
+ private Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes, byte[]>>> segmentIterator;
+ private Iterator<Map.Entry<Bytes, byte[]>> recordIterator;
+ private KeyValue<Bytes, byte[]> next;
+ private long currentTime;
- private ListIterator<KeyValue<Long, byte[]>> iterator;
+ private final boolean allKeys;
+ private Bytes keyFrom;
+ private Bytes keyTo;
- InMemoryWindowStoreIterator(final ListIterator<KeyValue<Long, byte[]>> iterator) {
- this.iterator = iterator;
+ // Default constructor sets up a dummy iterator for when no results are returned (e.g., the entire fetch range is expired)
+ InMemoryWindowStoreIteratorWrapper() {
+ this.allKeys = false;
+ recordIterator = null;
+ }
+
+ InMemoryWindowStoreIteratorWrapper(final Bytes keyFrom,
+ final Bytes keyTo,
+ final Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes, byte[]>>> segmentIterator) {
+ this.allKeys = (keyFrom == null) && (keyTo == null);
+ if (retainDuplicates && !allKeys) {
+ this.keyFrom = wrapForDups(keyFrom, 0);
+ this.keyTo = wrapForDups(keyTo, Integer.MAX_VALUE);
+ } else {
+ this.keyFrom = keyFrom;
+ this.keyTo = keyTo;
+ }
+
+ this.segmentIterator = segmentIterator;
+ this.recordIterator = setRecordIterator();
+
+ openIterators.add(this);
}
- @Override
public boolean hasNext() {
- return iterator.hasNext();
+ if (next != null) {
+ return true;
+ }
+ if (recordIterator == null || (!recordIterator.hasNext() && !segmentIterator.hasNext())) {
+ return false;
+ }
+
+ next = getNext();
+ return next != null;
}
- @Override
- public KeyValue<Long, byte[]> next() {
- return iterator.next();
+ public void remove() {
+ throw new UnsupportedOperationException(
+ "remove() is not supported in " + getClass().getName());
}
- @Override
- public Long peekNextKey() {
- if (!hasNext()) {
- throw new NoSuchElementException();
+ public void close() {
+ openIterators.remove(this);
+ }
+
+ // getNext is only called when either recordIterator or segmentIterator has a next element.
+ // Note that this does not guarantee a next record exists, as the remaining segments may not contain any keys in range.
+ protected KeyValue<Bytes, byte[]> getNext() {
+ while (!recordIterator.hasNext()) {
+ recordIterator = setRecordIterator();
+ if (recordIterator == null) {
+ return null;
+ }
+ }
+ final Map.Entry<Bytes, byte[]> nextRecord = recordIterator.next();
+ return new KeyValue<>(nextRecord.getKey(), nextRecord.getValue());
+ }
+
+ // Resets recordIterator to point to the next segment, and returns null if there are no more segments.
+ // Note that the returned iterator may be empty if the next segment contains no keys in range.
+ Iterator<Map.Entry<Bytes, byte[]>> setRecordIterator() {
+ if (!segmentIterator.hasNext()) {
+ return null;
+ }
+
+ final Map.Entry<Long, ConcurrentNavigableMap<Bytes, byte[]>> currentSegment = segmentIterator.next();
+ currentTime = currentSegment.getKey();
+
+ if (allKeys) {
+ return currentSegment.getValue().entrySet().iterator();
} else {
- final long next = iterator.next().key;
- iterator.previous();
- return next;
+ return currentSegment.getValue().subMap(keyFrom, true, keyTo, true).entrySet().iterator();
}
}
- @Override
- public void close() {
- iterator = null;
+ Long minTime() {
+ return currentTime;
}
- }
- private static class InMemoryWindowedKeyValueIterator implements KeyValueIterator<Windowed<Bytes>, byte[]> {
+ public int compareTo(final InMemoryWindowStoreIteratorWrapper other) {
+ return (int) (minTime() - other.minTime());
+ }
+ }
- ListIterator<KeyValue<Windowed<Bytes>, byte[]>> iterator;
+ private class WrappedInMemoryWindowStoreIterator extends InMemoryWindowStoreIteratorWrapper implements WindowStoreIterator<byte[]> {
- InMemoryWindowedKeyValueIterator(final ListIterator<KeyValue<Windowed<Bytes>, byte[]>> iterator) {
- this.iterator = iterator;
+ WrappedInMemoryWindowStoreIterator() {
+ super();
}
- @Override
- public boolean hasNext() {
- return iterator.hasNext();
+ WrappedInMemoryWindowStoreIterator(final Bytes keyFrom,
+ final Bytes keyTo,
+ final Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes, byte[]>>> segmentIterator) {
+ super(keyFrom, keyTo, segmentIterator);
}
@Override
- public KeyValue<Windowed<Bytes>, byte[]> next() {
- return iterator.next();
+ public Long peekNextKey() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ return super.currentTime;
}
@Override
- public Windowed<Bytes> peekNextKey() {
+ public KeyValue<Long, byte[]> next() {
if (!hasNext()) {
throw new NoSuchElementException();
- } else {
- final Windowed<Bytes> next = iterator.next().key;
- iterator.previous();
- return next;
}
- }
- @Override
- public void close() {
- iterator = null;
+ final KeyValue<Long, byte[]> result = new KeyValue<>(super.currentTime, super.next.value);
+ super.next = null;
+ return result;
}
}
-}
+ private class WrappedWindowedKeyValueIterator extends InMemoryWindowStoreIteratorWrapper implements KeyValueIterator<Windowed<Bytes>, byte[]> {
+
+ WrappedWindowedKeyValueIterator() {
+ super();
+ }
+
+ WrappedWindowedKeyValueIterator(final Bytes keyFrom,
+ final Bytes keyTo,
+ final Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes, byte[]>>> segmentIterator) {
+ super(keyFrom, keyTo, segmentIterator);
+ }
+
+ public Windowed<Bytes> peekNextKey() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ return getWindowedKey();
+ }
+
+ public KeyValue<Windowed<Bytes>, byte[]> next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ final KeyValue<Windowed<Bytes>, byte[]> result = new KeyValue<>(getWindowedKey(), super.next.value);
+ super.next = null;
+ return result;
+ }
+ private Windowed<Bytes> getWindowedKey() {
+ final Bytes key = retainDuplicates ? getKey(super.next.key) : super.next.key;
+ final TimeWindow timeWindow = new TimeWindow(super.currentTime, super.currentTime + windowSize);
+ return new Windowed<>(key, timeWindow);
+ }
+ }
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
index 5de4b44..e7f5ed0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
@@ -361,7 +361,7 @@ public class InMemoryWindowStoreTest {
setCurrentTime(currentTime);
windowStore.put(1, "five");
- final KeyValueIterator<Windowed<Integer>, String> iterator = windowStore.fetchAll(0L, currentTime);
+ KeyValueIterator<Windowed<Integer>, String> iterator = windowStore.fetchAll(0L, currentTime);
// effect of this put (expires next oldest record, adds new one) should not be reflected in the already fetched results
currentTime = currentTime + retentionPeriod / 4;
@@ -375,6 +375,15 @@ public class InMemoryWindowStoreTest {
assertEquals(windowedPair(1, "four", 3 * (retentionPeriod / 4)), iterator.next());
assertEquals(windowedPair(1, "five", retentionPeriod), iterator.next());
assertFalse(iterator.hasNext());
+
+ iterator = windowStore.fetchAll(0L, currentTime);
+
+ // If we fetch again after the last put, the second oldest record should have expired and newest should appear in results
+ assertEquals(windowedPair(1, "three", retentionPeriod / 2), iterator.next());
+ assertEquals(windowedPair(1, "four", 3 * (retentionPeriod / 4)), iterator.next());
+ assertEquals(windowedPair(1, "five", retentionPeriod), iterator.next());
+ assertEquals(windowedPair(1, "six", 5 * (retentionPeriod / 4)), iterator.next());
+ assertFalse(iterator.hasNext());
}
@Test
@@ -473,4 +482,101 @@ public class InMemoryWindowStoreTest {
final List<String> messages = appender.getMessages();
assertThat(messages, hasItem("Skipping record for expired segment."));
}
+
+ @Test
+ public void testIteratorMultiplePeekAndHasNext() {
+ windowStore = createInMemoryWindowStore(context, false);
+
+ long currentTime = 0;
+ setCurrentTime(currentTime);
+ windowStore.put(1, "one");
+
+ currentTime += windowSize * 10;
+ setCurrentTime(currentTime);
+ windowStore.put(2, "two");
+
+ currentTime += windowSize * 10;
+ setCurrentTime(currentTime);
+ windowStore.put(3, "three");
+
+ final KeyValueIterator<Windowed<Integer>, String> iterator = windowStore.fetch(1, 4, 0L, currentTime);
+
+ assertFalse(!iterator.hasNext());
+ assertFalse(!iterator.hasNext());
+ assertEquals(new Windowed<>(1, WindowKeySchema.timeWindowForSize(0L, windowSize)), iterator.peekNextKey());
+ assertEquals(new Windowed<>(1, WindowKeySchema.timeWindowForSize(0L, windowSize)), iterator.peekNextKey());
+
+ assertEquals(windowedPair(1, "one", 0), iterator.next());
+ assertEquals(windowedPair(2, "two", windowSize * 10), iterator.next());
+ assertEquals(windowedPair(3, "three", windowSize * 20), iterator.next());
+ assertFalse(iterator.hasNext());
+ }
+
+ @Test
+ public void shouldNotThrowConcurrentModificationException() {
+ windowStore = createInMemoryWindowStore(context, false);
+
+ long currentTime = 0;
+ setCurrentTime(currentTime);
+ windowStore.put(1, "one");
+
+ currentTime += windowSize * 10;
+ setCurrentTime(currentTime);
+ windowStore.put(1, "two");
+
+ final KeyValueIterator<Windowed<Integer>, String> iterator = windowStore.all();
+
+ currentTime += windowSize * 10;
+ setCurrentTime(currentTime);
+ windowStore.put(1, "three");
+
+ currentTime += windowSize * 10;
+ setCurrentTime(currentTime);
+ windowStore.put(2, "four");
+
+ // Iterator should return all records in store and not throw exception b/c some were added after fetch
+ assertEquals(windowedPair(1, "one", 0), iterator.next());
+ assertEquals(windowedPair(1, "two", windowSize * 10), iterator.next());
+ assertEquals(windowedPair(1, "three", windowSize * 20), iterator.next());
+ assertEquals(windowedPair(2, "four", windowSize * 30), iterator.next());
+ assertFalse(iterator.hasNext());
+ }
+
+ @Test
+ public void shouldNotExpireFromOpenIterator() {
+ windowStore = createInMemoryWindowStore(context, false);
+
+ windowStore.put(1, "one", 0L);
+ windowStore.put(1, "two", 10L);
+
+ windowStore.put(2, "one", 5L);
+ windowStore.put(2, "two", 15L);
+
+ final WindowStoreIterator<String> iterator1 = windowStore.fetch(1, 0L, 50L);
+ final WindowStoreIterator<String> iterator2 = windowStore.fetch(2, 0L, 50L);
+
+ // This put expires all four previous records, but they should still be returned from already open iterators
+ windowStore.put(1, "four", retentionPeriod + 50L);
+
+ assertEquals(new KeyValue<>(0L, "one"), iterator1.next());
+ assertEquals(new KeyValue<>(5L, "one"), iterator2.next());
+
+ assertEquals(new KeyValue<>(15L, "two"), iterator2.next());
+ assertEquals(new KeyValue<>(10L, "two"), iterator1.next());
+
+ assertFalse(iterator1.hasNext());
+ assertFalse(iterator2.hasNext());
+ }
+
+ @Test
+ public void shouldNotThrowExceptionWhenFetchRangeIsExpired() {
+ windowStore = createInMemoryWindowStore(context, false);
+
+ windowStore.put(1, "one", 0L);
+ windowStore.put(1, "two", retentionPeriod);
+
+ final WindowStoreIterator<String> iterator = windowStore.fetch(1, 0L, 10L);
+
+ assertFalse(iterator.hasNext());
+ }
}