You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/03/06 19:02:37 UTC

[kafka] branch trunk updated: KAFKA-8007: Avoid copying on fetch in InMemoryWindowStore (#6335)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4788863  KAFKA-8007: Avoid copying on fetch in InMemoryWindowStore (#6335)
4788863 is described below

commit 47888630a05260f548988d943a633120845f767d
Author: A. Sophie Blee-Goldman <ab...@gmail.com>
AuthorDate: Wed Mar 6 11:02:27 2019 -0800

    KAFKA-8007: Avoid copying on fetch in InMemoryWindowStore (#6335)
    
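    Rewrote the InMemoryWindowStore implementation so that a fetch no longer copies the matching records into a list up front; the returned iterator now does that work lazily as it is advanced. The iterator classes were cleaned up as well.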
    
    Reviewers: Guozhang Wang <wa...@gmail.com>, Bill Bejeck <bb...@gmail.com>
---
 .../state/internals/InMemoryWindowStore.java       | 296 ++++++++++++---------
 .../state/internals/InMemoryWindowStoreTest.java   | 108 +++++++-
 2 files changed, 284 insertions(+), 120 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
index 67eec0d..7d1b279 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -17,6 +17,11 @@
 package org.apache.kafka.streams.state.internals;
 
 import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.Bytes;
@@ -33,20 +38,15 @@ import org.apache.kafka.streams.state.KeyValueIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.LinkedList;
-import java.util.List;
-import java.util.ListIterator;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.NavigableMap;
 import java.util.NoSuchElementException;
-import java.util.TreeMap;
 
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.EXPIRED_WINDOW_RECORD_DROP;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
 import static org.apache.kafka.streams.state.internals.WindowKeySchema.extractStoreKeyBytes;
 import static org.apache.kafka.streams.state.internals.WindowKeySchema.extractStoreTimestamp;
 
+
 public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
 
     private static final Logger LOG = LoggerFactory.getLogger(InMemoryWindowStore.class);
@@ -63,22 +63,24 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
     private final long windowSize;
     private final boolean retainDuplicates;
 
-    private final NavigableMap<Long, NavigableMap<Bytes, byte[]>> segmentMap;
+    private final ConcurrentNavigableMap<Long, ConcurrentNavigableMap<Bytes, byte[]>> segmentMap;
+    private final Set<InMemoryWindowStoreIteratorWrapper> openIterators;
 
     private volatile boolean open = false;
 
     InMemoryWindowStore(final String name,
-                               final long retentionPeriod,
-                               final long windowSize,
-                               final boolean retainDuplicates,
-                               final String metricScope) {
+                        final long retentionPeriod,
+                        final long windowSize,
+                        final boolean retainDuplicates,
+                        final String metricScope) {
         this.name = name;
         this.retentionPeriod = retentionPeriod;
         this.windowSize = windowSize;
         this.retainDuplicates = retainDuplicates;
         this.metricScope = metricScope;
 
-        this.segmentMap = new TreeMap<>();
+        this.openIterators = ConcurrentHashMap.newKeySet();
+        this.segmentMap = new ConcurrentSkipListMap<>();
     }
 
     @Override
@@ -132,7 +134,7 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
             LOG.debug("Skipping record for expired segment.");
         } else {
             if (value != null) {
-                this.segmentMap.computeIfAbsent(windowStartTimestamp, t -> new TreeMap<>());
+                this.segmentMap.computeIfAbsent(windowStartTimestamp, t -> new ConcurrentSkipListMap<>());
                 this.segmentMap.get(windowStartTimestamp).put(keyBytes, value);
             } else {
                 this.segmentMap.computeIfPresent(windowStartTimestamp, (t, kvMap) -> {
@@ -147,7 +149,11 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
     public byte[] fetch(final Bytes key, final long windowStartTimestamp) {
         removeExpiredSegments();
 
-        final NavigableMap<Bytes, byte[]> kvMap = this.segmentMap.get(windowStartTimestamp);
+        if (windowStartTimestamp <= this.observedStreamTime - this.retentionPeriod) {
+            return null;
+        }
+
+        final ConcurrentNavigableMap<Bytes, byte[]> kvMap = this.segmentMap.get(windowStartTimestamp);
         if (kvMap == null) {
             return null;
         } else {
@@ -159,9 +165,16 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
     @Override
     public WindowStoreIterator<byte[]> fetch(final Bytes key, final long timeFrom, final long timeTo) {
         removeExpiredSegments();
-        final List<KeyValue<Long, byte[]>> records = retainDuplicates ? fetchWithDuplicates(key, timeFrom, timeTo) : fetchUnique(key, timeFrom, timeTo);
 
-        return new InMemoryWindowStoreIterator(records.listIterator());
+        // add one b/c records expire exactly retentionPeriod ms after created
+        final long minTime = Math.max(timeFrom, this.observedStreamTime - this.retentionPeriod + 1);
+
+        if (timeTo < minTime) {
+            return new WrappedInMemoryWindowStoreIterator();
+        }
+
+        return new WrappedInMemoryWindowStoreIterator(
+            key, key, this.segmentMap.subMap(minTime, true, timeTo, true).entrySet().iterator());
     }
 
     @Deprecated
@@ -171,52 +184,42 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
                                                            final long timeFrom,
                                                            final long timeTo) {
         removeExpiredSegments();
-        final List<KeyValue<Windowed<Bytes>, byte[]>> returnSet = new LinkedList<>();
 
         // add one b/c records expire exactly retentionPeriod ms after created
         final long minTime = Math.max(timeFrom, this.observedStreamTime - this.retentionPeriod + 1);
-        final Bytes keyFrom = retainDuplicates ? wrapForDups(from, 0) : from;
-        final Bytes keyTo = retainDuplicates ? wrapForDups(to, Integer.MAX_VALUE) : to;
 
-        for (final Map.Entry<Long, NavigableMap<Bytes, byte[]>> segmentMapEntry : this.segmentMap.subMap(minTime, true, timeTo, true).entrySet()) {
-            for (final Map.Entry<Bytes, byte[]> kvMapEntry : segmentMapEntry.getValue().subMap(keyFrom, true, keyTo, true).entrySet()) {
-                final Bytes keyBytes = retainDuplicates ? getKey(kvMapEntry.getKey()) : kvMapEntry.getKey();
-                returnSet.add(getWindowedKeyValue(keyBytes, segmentMapEntry.getKey(), kvMapEntry.getValue()));
-            }
+        if (timeTo < minTime) {
+            return new WrappedWindowedKeyValueIterator();
         }
-        return new InMemoryWindowedKeyValueIterator(returnSet.listIterator());
+
+        return new WrappedWindowedKeyValueIterator(
+            from, to, this.segmentMap.subMap(minTime, true, timeTo, true).entrySet().iterator());
     }
 
     @Deprecated
     @Override
     public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom, final long timeTo) {
         removeExpiredSegments();
-        final List<KeyValue<Windowed<Bytes>, byte[]>> returnSet = new LinkedList<>();
 
         // add one b/c records expire exactly retentionPeriod ms after created
         final long minTime = Math.max(timeFrom, this.observedStreamTime - this.retentionPeriod + 1);
 
-        for (final Map.Entry<Long, NavigableMap<Bytes, byte[]>> segmentMapEntry : this.segmentMap.subMap(minTime, true, timeTo, true).entrySet()) {
-            for (final Map.Entry<Bytes,  byte[]> kvMapEntry : segmentMapEntry.getValue().entrySet()) {
-                final Bytes keyBytes = retainDuplicates ? getKey(kvMapEntry.getKey()) : kvMapEntry.getKey();
-                returnSet.add(getWindowedKeyValue(keyBytes, segmentMapEntry.getKey(), kvMapEntry.getValue()));
-            }
+        if (timeTo < minTime) {
+            return new WrappedWindowedKeyValueIterator();
         }
-        return new InMemoryWindowedKeyValueIterator(returnSet.listIterator());
+
+        return new WrappedWindowedKeyValueIterator(
+            null, null, this.segmentMap.subMap(minTime, true, timeTo, true).entrySet().iterator());
     }
 
     @Override
     public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
         removeExpiredSegments();
-        final List<KeyValue<Windowed<Bytes>, byte[]>> returnSet = new LinkedList<>();
 
-        for (final Entry<Long, NavigableMap<Bytes, byte[]>> segmentMapEntry : this.segmentMap.entrySet()) {
-            for (final Entry<Bytes, byte[]> kvMapEntry : segmentMapEntry.getValue().entrySet()) {
-                final Bytes keyBytes = retainDuplicates ? getKey(kvMapEntry.getKey()) : kvMapEntry.getKey();
-                returnSet.add(getWindowedKeyValue(keyBytes, segmentMapEntry.getKey(), kvMapEntry.getValue()));
-            }
-        }
-        return new InMemoryWindowedKeyValueIterator(returnSet.listIterator());
+        final long minTime = this.observedStreamTime - this.retentionPeriod;
+
+        return new WrappedWindowedKeyValueIterator(
+            null, null, this.segmentMap.tailMap(minTime, false).entrySet().iterator());
     }
 
     @Override
@@ -240,47 +243,12 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
         this.open = false;
     }
 
-    private List<KeyValue<Long, byte[]>> fetchUnique(final Bytes key, final long timeFrom, final long timeTo) {
-        final List<KeyValue<Long, byte[]>> returnSet = new LinkedList<>();
-
-        // add one b/c records expire exactly retentionPeriod ms after created
-        final long minTime = Math.max(timeFrom, this.observedStreamTime - this.retentionPeriod + 1);
-
-        for (final Map.Entry<Long, NavigableMap<Bytes, byte[]>> segmentMapEntry : this.segmentMap.subMap(minTime, true, timeTo, true).entrySet()) {
-            final byte[] value = segmentMapEntry.getValue().get(key);
-            if (value != null) {
-                returnSet.add(new KeyValue<>(segmentMapEntry.getKey(), value));
-            }
-        }
-        return returnSet;
-    }
-
-    private List<KeyValue<Long, byte[]>> fetchWithDuplicates(final Bytes key, final long timeFrom, final long timeTo) {
-        final List<KeyValue<Long, byte[]>> returnSet = new LinkedList<>();
-
-        // add one b/c records expire exactly retentionPeriod ms after created
-        final long minTime = Math.max(timeFrom, this.observedStreamTime - this.retentionPeriod + 1);
-        final Bytes keyFrom = wrapForDups(key, 0);
-        final Bytes keyTo = wrapForDups(key, Integer.MAX_VALUE);
-
-        for (final Map.Entry<Long, NavigableMap<Bytes, byte[]>> segmentMapEntry : this.segmentMap.subMap(minTime, true, timeTo, true).entrySet()) {
-            for (final Map.Entry<Bytes, byte[]> kvMapEntry : segmentMapEntry.getValue().subMap(keyFrom, true, keyTo, true).entrySet()) {
-                returnSet.add(new KeyValue<>(segmentMapEntry.getKey(), kvMapEntry.getValue()));
-            }
-        }
-        return returnSet;
-    }
-
     private void removeExpiredSegments() {
-        final long minLiveTime = this.observedStreamTime - this.retentionPeriod;
-        this.segmentMap.headMap(minLiveTime, true).clear();
-    }
-
-    private KeyValue<Windowed<Bytes>, byte[]> getWindowedKeyValue(final Bytes key,
-                                                                  final long startTimestamp,
-                                                                  final byte[] value) {
-        final Windowed<Bytes> windowedK = new Windowed<>(key, new TimeWindow(startTimestamp, startTimestamp + windowSize));
-        return new KeyValue<>(windowedK, value);
+        long minLiveTime = Math.max(0L, this.observedStreamTime - this.retentionPeriod + 1);
+        for (final InMemoryWindowStoreIteratorWrapper it : openIterators) {
+            minLiveTime = Math.min(minLiveTime, it.minTime());
+        }
+        this.segmentMap.headMap(minLiveTime, false).clear();
     }
 
     private void maybeUpdateSeqnumForDups() {
@@ -304,76 +272,166 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
 
     }
 
-    private static class InMemoryWindowStoreIterator implements WindowStoreIterator<byte[]> {
+    private abstract class InMemoryWindowStoreIteratorWrapper implements Comparable<InMemoryWindowStoreIteratorWrapper> {
+
+        private Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes, byte[]>>> segmentIterator;
+        private Iterator<Map.Entry<Bytes, byte[]>> recordIterator;
+        private KeyValue<Bytes, byte[]> next;
+        private long currentTime;
 
-        private ListIterator<KeyValue<Long, byte[]>> iterator;
+        private final boolean allKeys;
+        private Bytes keyFrom;
+        private Bytes keyTo;
 
-        InMemoryWindowStoreIterator(final ListIterator<KeyValue<Long, byte[]>> iterator) {
-            this.iterator = iterator;
+        // Default constructor sets up a dummy iterator when no results are returned (eg entire fetch range is expired)
+        InMemoryWindowStoreIteratorWrapper() {
+            this.allKeys = false;
+            recordIterator = null;
+        }
+
+        InMemoryWindowStoreIteratorWrapper(final Bytes keyFrom,
+                                           final Bytes keyTo,
+                                           final Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes, byte[]>>> segmentIterator) {
+            this.allKeys = (keyFrom == null) && (keyTo == null);
+            if (retainDuplicates && !allKeys) {
+                this.keyFrom = wrapForDups(keyFrom, 0);
+                this.keyTo = wrapForDups(keyTo, Integer.MAX_VALUE);
+            } else {
+                this.keyFrom = keyFrom;
+                this.keyTo = keyTo;
+            }
+
+            this.segmentIterator = segmentIterator;
+            this.recordIterator = setRecordIterator();
+
+            openIterators.add(this);
         }
 
-        @Override
         public boolean hasNext() {
-            return iterator.hasNext();
+            if (next != null) {
+                return true;
+            }
+            if (recordIterator == null || (!recordIterator.hasNext() && !segmentIterator.hasNext())) {
+                return false;
+            }
+
+            next = getNext();
+            return next != null;
         }
 
-        @Override
-        public KeyValue<Long, byte[]> next() {
-            return iterator.next();
+        public void remove() {
+            throw new UnsupportedOperationException(
+                "remove() is not supported in " + getClass().getName());
         }
 
-        @Override
-        public Long peekNextKey() {
-            if (!hasNext()) {
-                throw new NoSuchElementException();
+        public void close() {
+            openIterators.remove(this);
+        }
+
+        // getNext is only called when either recordIterator or segmentIterator has a next
+        // Note this does not guarantee a next record exists as the next segments may not contain any keys in range
+        protected KeyValue<Bytes, byte[]> getNext() {
+            while (!recordIterator.hasNext()) {
+                recordIterator = setRecordIterator();
+                if (recordIterator == null) {
+                    return null;
+                }
+            }
+            final Map.Entry<Bytes, byte[]> nextRecord = recordIterator.next();
+            return new KeyValue<>(nextRecord.getKey(), nextRecord.getValue());
+        }
+
+        // Resets recordIterator to point to the next segment and returns null if there are no more segments
+        // Note it may not actually point to anything if no keys in range exist in the next segment
+        Iterator<Map.Entry<Bytes, byte[]>> setRecordIterator() {
+            if (!segmentIterator.hasNext()) {
+                return null;
+            }
+
+            final Map.Entry<Long, ConcurrentNavigableMap<Bytes, byte[]>> currentSegment = segmentIterator.next();
+            currentTime = currentSegment.getKey();
+
+            if (allKeys) {
+                return currentSegment.getValue().entrySet().iterator();
             } else {
-                final long next = iterator.next().key;
-                iterator.previous();
-                return next;
+                return currentSegment.getValue().subMap(keyFrom, true, keyTo, true).entrySet().iterator();
             }
         }
 
-        @Override
-        public void close() {
-            iterator = null;
+        Long minTime() {
+            return currentTime;
         }
-    }
 
-    private static class InMemoryWindowedKeyValueIterator implements KeyValueIterator<Windowed<Bytes>, byte[]> {
+        public int compareTo(final InMemoryWindowStoreIteratorWrapper other) {
+            return (int) (minTime() - other.minTime());
+        }
+    }
 
-        ListIterator<KeyValue<Windowed<Bytes>, byte[]>> iterator;
+    private class WrappedInMemoryWindowStoreIterator extends InMemoryWindowStoreIteratorWrapper implements WindowStoreIterator<byte[]>  {
 
-        InMemoryWindowedKeyValueIterator(final ListIterator<KeyValue<Windowed<Bytes>, byte[]>> iterator) {
-            this.iterator = iterator;
+        WrappedInMemoryWindowStoreIterator() {
+            super();
         }
 
-        @Override
-        public boolean hasNext() {
-            return iterator.hasNext();
+        WrappedInMemoryWindowStoreIterator(final Bytes keyFrom,
+                                           final Bytes keyTo,
+                                           final Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes, byte[]>>> segmentIterator) {
+            super(keyFrom, keyTo, segmentIterator);
         }
 
         @Override
-        public KeyValue<Windowed<Bytes>, byte[]> next() {
-            return iterator.next();
+        public Long peekNextKey() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            return super.currentTime;
         }
 
         @Override
-        public Windowed<Bytes> peekNextKey() {
+        public KeyValue<Long, byte[]> next() {
             if (!hasNext()) {
                 throw new NoSuchElementException();
-            } else {
-                final Windowed<Bytes> next = iterator.next().key;
-                iterator.previous();
-                return next;
             }
-        }
 
-        @Override
-        public void close() {
-            iterator = null;
+            final KeyValue<Long, byte[]> result = new KeyValue<>(super.currentTime, super.next.value);
+            super.next = null;
+            return result;
         }
     }
-}
 
+    private class WrappedWindowedKeyValueIterator extends InMemoryWindowStoreIteratorWrapper implements KeyValueIterator<Windowed<Bytes>, byte[]> {
+
+        WrappedWindowedKeyValueIterator() {
+            super();
+        }
+
+        WrappedWindowedKeyValueIterator(final Bytes keyFrom,
+                                        final Bytes keyTo,
+                                        final Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes, byte[]>>> segmentIterator) {
+            super(keyFrom, keyTo, segmentIterator);
+        }
+
+        public Windowed<Bytes> peekNextKey() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            return getWindowedKey();
+        }
+
+        public KeyValue<Windowed<Bytes>, byte[]> next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
 
+            final KeyValue<Windowed<Bytes>, byte[]> result = new KeyValue<>(getWindowedKey(), super.next.value);
+            super.next = null;
+            return result;
+        }
 
+        private Windowed<Bytes> getWindowedKey() {
+            final Bytes key = retainDuplicates ? getKey(super.next.key) : super.next.key;
+            final TimeWindow timeWindow = new TimeWindow(super.currentTime, super.currentTime + windowSize);
+            return new Windowed<>(key, timeWindow);
+        }
+    }
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
index 5de4b44..e7f5ed0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
@@ -361,7 +361,7 @@ public class InMemoryWindowStoreTest {
         setCurrentTime(currentTime);
         windowStore.put(1, "five");
 
-        final KeyValueIterator<Windowed<Integer>, String> iterator = windowStore.fetchAll(0L, currentTime);
+        KeyValueIterator<Windowed<Integer>, String> iterator = windowStore.fetchAll(0L, currentTime);
 
         // effect of this put (expires next oldest record, adds new one) should not be reflected in the already fetched results
         currentTime = currentTime + retentionPeriod / 4;
@@ -375,6 +375,15 @@ public class InMemoryWindowStoreTest {
         assertEquals(windowedPair(1, "four", 3 * (retentionPeriod / 4)), iterator.next());
         assertEquals(windowedPair(1, "five", retentionPeriod), iterator.next());
         assertFalse(iterator.hasNext());
+
+        iterator = windowStore.fetchAll(0L, currentTime);
+
+        // If we fetch again after the last put, the second oldest record should have expired and newest should appear in results
+        assertEquals(windowedPair(1, "three", retentionPeriod / 2), iterator.next());
+        assertEquals(windowedPair(1, "four", 3 * (retentionPeriod / 4)), iterator.next());
+        assertEquals(windowedPair(1, "five", retentionPeriod), iterator.next());
+        assertEquals(windowedPair(1, "six", 5 * (retentionPeriod / 4)), iterator.next());
+        assertFalse(iterator.hasNext());
     }
 
     @Test
@@ -473,4 +482,101 @@ public class InMemoryWindowStoreTest {
         final List<String> messages = appender.getMessages();
         assertThat(messages, hasItem("Skipping record for expired segment."));
     }
+
+    @Test
+    public void testIteratorMultiplePeekAndHasNext() {
+        windowStore = createInMemoryWindowStore(context, false);
+
+        long currentTime = 0;
+        setCurrentTime(currentTime);
+        windowStore.put(1, "one");
+
+        currentTime += windowSize * 10;
+        setCurrentTime(currentTime);
+        windowStore.put(2, "two");
+
+        currentTime += windowSize * 10;
+        setCurrentTime(currentTime);
+        windowStore.put(3, "three");
+
+        final KeyValueIterator<Windowed<Integer>, String> iterator = windowStore.fetch(1, 4, 0L, currentTime);
+
+        assertFalse(!iterator.hasNext());
+        assertFalse(!iterator.hasNext());
+        assertEquals(new Windowed<>(1, WindowKeySchema.timeWindowForSize(0L, windowSize)), iterator.peekNextKey());
+        assertEquals(new Windowed<>(1, WindowKeySchema.timeWindowForSize(0L, windowSize)), iterator.peekNextKey());
+
+        assertEquals(windowedPair(1, "one", 0), iterator.next());
+        assertEquals(windowedPair(2, "two", windowSize * 10), iterator.next());
+        assertEquals(windowedPair(3, "three", windowSize * 20), iterator.next());
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
+    public void shouldNotThrowConcurrentModificationException() {
+        windowStore = createInMemoryWindowStore(context, false);
+
+        long currentTime = 0;
+        setCurrentTime(currentTime);
+        windowStore.put(1, "one");
+
+        currentTime += windowSize * 10;
+        setCurrentTime(currentTime);
+        windowStore.put(1, "two");
+
+        final KeyValueIterator<Windowed<Integer>, String> iterator = windowStore.all();
+
+        currentTime += windowSize * 10;
+        setCurrentTime(currentTime);
+        windowStore.put(1, "three");
+
+        currentTime += windowSize * 10;
+        setCurrentTime(currentTime);
+        windowStore.put(2, "four");
+
+        // Iterator should return all records in store and not throw exception b/c some were added after fetch
+        assertEquals(windowedPair(1, "one", 0), iterator.next());
+        assertEquals(windowedPair(1, "two", windowSize * 10), iterator.next());
+        assertEquals(windowedPair(1, "three", windowSize * 20), iterator.next());
+        assertEquals(windowedPair(2, "four", windowSize * 30), iterator.next());
+        assertFalse(iterator.hasNext());
+    }
+
+    @Test
+    public void shouldNotExpireFromOpenIterator() {
+        windowStore = createInMemoryWindowStore(context, false);
+
+        windowStore.put(1, "one", 0L);
+        windowStore.put(1, "two", 10L);
+
+        windowStore.put(2, "one", 5L);
+        windowStore.put(2, "two", 15L);
+
+        final WindowStoreIterator<String> iterator1 = windowStore.fetch(1, 0L, 50L);
+        final WindowStoreIterator<String> iterator2 = windowStore.fetch(2, 0L, 50L);
+
+        // This put expires all four previous records, but they should still be returned from already open iterators
+        windowStore.put(1, "four", retentionPeriod + 50L);
+
+        assertEquals(new KeyValue<>(0L, "one"), iterator1.next());
+        assertEquals(new KeyValue<>(5L, "one"), iterator2.next());
+
+        assertEquals(new KeyValue<>(15L, "two"), iterator2.next());
+        assertEquals(new KeyValue<>(10L, "two"), iterator1.next());
+
+        assertFalse(iterator1.hasNext());
+        assertFalse(iterator2.hasNext());
+    }
+
+    @Test
+    public void shouldNotThrowExceptionWhenFetchRangeIsExpired() {
+        windowStore = createInMemoryWindowStore(context, false);
+
+        windowStore.put(1, "one", 0L);
+        windowStore.put(1, "two", retentionPeriod);
+
+        final WindowStoreIterator<String> iterator = windowStore.fetch(1, 0L, 10L);
+
+        assertFalse(iterator.hasNext());
+    }
 }