Posted to commits@kafka.apache.org by gu...@apache.org on 2021/04/18 18:19:46 UTC

[kafka] branch trunk updated: KAFKA-10847: Delete Time-ordered duplicated records using deleteRange() internally (#10537)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 15c24da  KAFKA-10847: Delete Time-ordered duplicated records using deleteRange() internally (#10537)
15c24da is described below

commit 15c24da888bc0a2582fd84d89fa1b84354fede64
Author: Sergio Peña <se...@confluent.io>
AuthorDate: Sun Apr 18 13:18:09 2021 -0500

    KAFKA-10847: Delete Time-ordered duplicated records using deleteRange() internally (#10537)
    
    This PR changes the TimeOrderedKeySchema composite key from time-seq-key to time-key-seq, so that all duplicates of a given time-key pair are stored contiguously and can be deleted with the RocksDB deleteRange API. put(key, null) now removes all duplicates of the key. Previously, put(key, null) was a no-op, which was a problem because there was no way to delete any keys when duplicates are allowed.
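
    For reference, a minimal sketch of the new layout, mirroring the
    toStoreKeyBinary() change in this diff (illustrative only, to show why
    all duplicates of a time-key pair become contiguous on disk):

        // New layout: (time, key, seq). All duplicates of a given
        // (time, key) pair share a common prefix, so a single
        // deleteRange() over that prefix removes all of them at once.
        static Bytes toStoreKeyBinary(final byte[] serializedKey,
                                      final long timestamp,
                                      final int seqnum) {
            final ByteBuffer buf = ByteBuffer.allocate(TIMESTAMP_SIZE + serializedKey.length + SEQNUM_SIZE);
            buf.putLong(timestamp);    // 8-byte timestamp first
            buf.put(serializedKey);    // variable-length key in the middle
            buf.putInt(seqnum);        // 4-byte seqnum last
            return Bytes.wrap(buf.array());
        }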
    
    The RocksDB deleteRange(keyFrom, keyTo) API deletes a range of keys from keyFrom (inclusive) to keyTo (exclusive). To make keyTo inclusive, I increment the end key by one when calling the RocksDBAccessor.
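
    In code terms (an illustrative fragment condensing the remove() and
    deleteRange() steps added by this patch; Bytes.increment() is the
    existing Kafka utility used below in RocksDBStore.deleteRange()):

        // deleteRange(from, to) excludes 'to', so to delete every entry whose
        // key starts with the (time, key) prefix we pass [prefix, increment(prefix)):
        final Bytes prefix = keySchema.toStoreBinaryKeyPrefix(key, timestamp);
        dbAccessor.deleteRange(prefix.get(), Bytes.increment(prefix).get());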
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 .../AbstractRocksDBSegmentedBytesStore.java        |   9 +
 .../streams/state/internals/KeyValueSegment.java   |   6 +
 .../streams/state/internals/RocksDBStore.java      |  26 +++
 .../internals/RocksDBTimeOrderedWindowStore.java   |  29 ++-
 .../state/internals/RocksDBTimestampedStore.java   |  16 ++
 .../kafka/streams/state/internals/Segment.java     |   1 +
 .../state/internals/SegmentedBytesStore.java       |  20 ++
 .../state/internals/TimeOrderedKeySchema.java      | 106 +++++------
 .../state/internals/TimestampedSegment.java        |   6 +
 .../AbstractRocksDBSegmentedBytesStoreTest.java    |   2 +-
 .../RocksDBTimeOrderedWindowStoreTest.java         | 205 +++++++++++++++++++--
 .../state/internals/TimeOrderedKeySchemaTest.java  |  93 ----------
 12 files changed, 325 insertions(+), 194 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
index 4c545a5..51fb3ca 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
@@ -197,6 +197,15 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
     }
 
     @Override
+    public void remove(final Bytes key, final long timestamp) {
+        final Bytes keyBytes = keySchema.toStoreBinaryKeyPrefix(key, timestamp);
+        final S segment = segments.getSegmentForTimestamp(timestamp);
+        if (segment != null) {
+            segment.deleteRange(keyBytes, keyBytes);
+        }
+    }
+
+    @Override
     public void put(final Bytes key,
                     final byte[] value) {
         final long timestamp = keySchema.segmentTimestamp(key);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
index b6d6504..66c55fc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
 
@@ -41,6 +42,11 @@ class KeyValueSegment extends RocksDBStore implements Comparable<KeyValueSegment
     }
 
     @Override
+    public synchronized void deleteRange(final Bytes keyFrom, final Bytes keyTo) {
+        super.deleteRange(keyFrom, keyTo);
+    }
+
+    @Override
     public int compareTo(final KeyValueSegment segment) {
         return Long.compare(id, segment.id);
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 12ba4eb..c03f86d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -349,6 +349,16 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
         return oldValue;
     }
 
+    void deleteRange(final Bytes keyFrom, final Bytes keyTo) {
+        Objects.requireNonNull(keyFrom, "keyFrom cannot be null");
+        Objects.requireNonNull(keyTo, "keyTo cannot be null");
+
+        validateStoreOpen();
+
+        // The end of the range is exclusive in deleteRange, so we increment keyTo by one to make it inclusive
+        dbAccessor.deleteRange(keyFrom.get(), Bytes.increment(keyTo).get());
+    }
+
     @Override
     public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes from,
                                                               final Bytes to) {
@@ -524,6 +534,12 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
                                               final Bytes to,
                                               final boolean forward);
 
+        /**
+         * Deletes all entries with keys in the range ['from', 'to'), i.e. including 'from' and excluding 'to'.
+         */
+        void deleteRange(final byte[] from,
+                         final byte[] to);
+
         KeyValueIterator<Bytes, byte[]> all(final boolean forward);
 
         KeyValueIterator<Bytes, byte[]> prefixScan(final Bytes prefix);
@@ -604,6 +620,16 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
         }
 
         @Override
+        public void deleteRange(final byte[] from, final byte[] to) {
+            try {
+                db.deleteRange(columnFamily, wOptions, from, to);
+            } catch (final RocksDBException e) {
+                // String formatting is done in the wrapping stores, so the formatted message is thrown from there.
+                throw new ProcessorStateException("Error while removing key from store " + name, e);
+            }
+        }
+
+        @Override
         public KeyValueIterator<Bytes, byte[]> all(final boolean forward) {
             final RocksIterator innerIterWithTimestamp = db.newIterator(columnFamily);
             if (forward) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java
index 3efeb32..f8ba883 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java
@@ -52,29 +52,29 @@ public class RocksDBTimeOrderedWindowStore
 
     @Override
     public void put(final Bytes key, final byte[] value, final long timestamp) {
-        // Skip if value is null and duplicates are allowed since this delete is a no-op
         if (!(value == null && retainDuplicates)) {
             maybeUpdateSeqnumForDups();
             wrapped().put(TimeOrderedKeySchema.toStoreKeyBinary(key, timestamp, seqnum), value);
+        } else {
+            // Delete all duplicates for the specified key and timestamp
+            wrapped().remove(key, timestamp);
         }
     }
 
     @Override
     public byte[] fetch(final Bytes key, final long timestamp) {
-        return wrapped().get(TimeOrderedKeySchema.toStoreKeyBinary(key, timestamp, seqnum));
+        throw new UnsupportedOperationException();
     }
 
-    @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
+    @SuppressWarnings("deprecation")
     @Override
     public WindowStoreIterator<byte[]> fetch(final Bytes key, final long timeFrom, final long timeTo) {
-        final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetch(key, timeFrom, timeTo);
-        return new TimeOrderedWindowStoreIteratorWrapper(bytesIterator, windowSize).valuesIterator();
+        throw new UnsupportedOperationException();
     }
 
     @Override
     public WindowStoreIterator<byte[]> backwardFetch(final Bytes key, final long timeFrom, final long timeTo) {
-        final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().backwardFetch(key, timeFrom, timeTo);
-        return new TimeOrderedWindowStoreIteratorWrapper(bytesIterator, windowSize).valuesIterator();
+        throw new UnsupportedOperationException();
     }
 
     @SuppressWarnings("deprecation") // note, this method must be kept if super#fetch(...) is removed
@@ -83,8 +83,7 @@ public class RocksDBTimeOrderedWindowStore
                                                            final Bytes keyTo,
                                                            final long timeFrom,
                                                            final long timeTo) {
-        final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetch(keyFrom, keyTo, timeFrom, timeTo);
-        return new TimeOrderedWindowStoreIteratorWrapper(bytesIterator, windowSize).keyValueIterator();
+        throw new UnsupportedOperationException();
     }
 
     @Override
@@ -92,8 +91,7 @@ public class RocksDBTimeOrderedWindowStore
                                                                    final Bytes keyTo,
                                                                    final long timeFrom,
                                                                    final long timeTo) {
-        final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().backwardFetch(keyFrom, keyTo, timeFrom, timeTo);
-        return new TimeOrderedWindowStoreIteratorWrapper(bytesIterator, windowSize).keyValueIterator();
+        throw new UnsupportedOperationException();
     }
 
     @Override
@@ -104,21 +102,18 @@ public class RocksDBTimeOrderedWindowStore
 
     @Override
     public KeyValueIterator<Windowed<Bytes>, byte[]> backwardAll() {
-        final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().backwardAll();
-        return new TimeOrderedWindowStoreIteratorWrapper(bytesIterator, windowSize).keyValueIterator();
+        throw new UnsupportedOperationException();
     }
 
     @SuppressWarnings("deprecation") // note, this method must be kept if super#fetchAll(...) is removed
     @Override
     public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom, final long timeTo) {
-        final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetchAll(timeFrom, timeTo);
-        return new TimeOrderedWindowStoreIteratorWrapper(bytesIterator, windowSize).keyValueIterator();
+        throw new UnsupportedOperationException();
     }
 
     @Override
     public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetchAll(final long timeFrom, final long timeTo) {
-        final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().backwardFetchAll(timeFrom, timeTo);
-        return new TimeOrderedWindowStoreIteratorWrapper(bytesIterator, windowSize).keyValueIterator();
+        throw new UnsupportedOperationException();
     }
 
     private void maybeUpdateSeqnumForDups() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
index db3d175..dafbc8f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java
@@ -206,6 +206,22 @@ public class RocksDBTimestampedStore extends RocksDBStore implements Timestamped
         }
 
         @Override
+        public void deleteRange(final byte[] from, final byte[] to) {
+            try {
+                db.deleteRange(oldColumnFamily, wOptions, from, to);
+            } catch (final RocksDBException e) {
+                // String formatting is done in the wrapping stores, so the formatted message is thrown from there.
+                throw new ProcessorStateException("Error while removing key from store " + name, e);
+            }
+            try {
+                db.deleteRange(newColumnFamily, wOptions, from, to);
+            } catch (final RocksDBException e) {
+                // String formatting is done in the wrapping stores, so the formatted message is thrown from there.
+                throw new ProcessorStateException("Error while removing key from store " + name, e);
+            }
+        }
+
+        @Override
         public KeyValueIterator<Bytes, byte[]> all(final boolean forward) {
             final RocksIterator innerIterWithTimestamp = db.newIterator(newColumnFamily);
             final RocksIterator innerIterNoTimestamp = db.newIterator(oldColumnFamily);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
index 9fddc16..ea3b89a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
@@ -25,4 +25,5 @@ public interface Segment extends KeyValueStore<Bytes, byte[]>, BatchWritingStore
 
     void destroy() throws IOException;
 
+    void deleteRange(Bytes keyFrom, Bytes keyTo);
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
index 79ada1f..4519929 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
@@ -110,6 +110,14 @@ public interface SegmentedBytesStore extends StateStore {
     void remove(Bytes key);
 
     /**
+     * Remove all duplicated records with the provided key at the specified timestamp.
+     *
+     * @param key   the segmented key to remove
+     * @param timestamp  the timestamp to match
+     */
+    void remove(Bytes key, long timestamp);
+
+    /**
      * Write a new value to the store with the provided key. The key
      * should be a composite of the record key, and the timestamp information etc
      * as described by the {@link KeySchema}
@@ -152,6 +160,18 @@ public interface SegmentedBytesStore extends StateStore {
         Bytes lowerRange(final Bytes key, final long from);
 
         /**
+         * Given a record key and a timestamp, construct the Segmented store key prefix to
+         * search for when performing prefixed queries.
+         *
+         * @param key        the record key
+         * @param timestamp  the record timestamp
+         * @return  the prefixed Segmented key, in bytes
+         */
+        default Bytes toStoreBinaryKeyPrefix(final Bytes key, final long timestamp) {
+            throw new UnsupportedOperationException();
+        }
+
+        /**
          * Given a range of fixed size record keys and a time, construct a Segmented key that represents
          * the upper range of keys to search when performing range queries.
          * @see SessionKeySchema#upperRange
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchema.java
index b834b3a..f191d2f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchema.java
@@ -30,62 +30,40 @@ import java.util.List;
 
 /**
  * A {@link RocksDBSegmentedBytesStore.KeySchema} to serialize/deserialize a RocksDB store
- * key into a schema combined of (time,seq,key). This key schema is more efficient when doing
- * range queries between a time interval. For key range queries better use {@link WindowKeySchema}.
+ * key into a schema composed of (time,key,seq). Since the key is variable-length while time and
+ * seq are fixed-length, ordering the fields this way makes varying-time range queries very
+ * inefficient, as we would need to be very conservative in picking the from/to boundaries;
+ * however, for now we do not expect any varying-time range access at all, only fixed time ranges.
  */
 public class TimeOrderedKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
     private static final Logger LOG = LoggerFactory.getLogger(TimeOrderedKeySchema.class);
 
     private static final int TIMESTAMP_SIZE = 8;
     private static final int SEQNUM_SIZE = 4;
-    private static final int PREFIX_SIZE = TIMESTAMP_SIZE + SEQNUM_SIZE;
 
-    /**
-     * {@inheritdoc}
-     *
-     * Queries using the {@link TimeOrderedKeySchema} are optimized for time range queries only. Key
-     * range queries may be slower. If better performance on key range queries are necessary, then
-     * use the {@link WindowKeySchema}.
-     */
     @Override
     public Bytes upperRange(final Bytes key, final long to) {
-        return toStoreKeyBinary(key.get(), to, Integer.MAX_VALUE);
+        throw new UnsupportedOperationException();
     }
 
-    /**
-     * {@inheritdoc}
-     *
-     * Queries using the {@link TimeOrderedKeySchema} are optimized for time range queries only. Key
-     * range queries may be slower. If better performance on key range queries are necessary, then
-     * use the {@link WindowKeySchema}.
-     */
     @Override
     public Bytes lowerRange(final Bytes key, final long from) {
-        return toStoreKeyBinary(key.get(), from, 0);
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Bytes toStoreBinaryKeyPrefix(final Bytes key, final long timestamp) {
+        return toStoreKeyBinaryPrefix(key, timestamp);
     }
 
-    /**
-     * {@inheritdoc}
-     *
-     * Queries using the {@link TimeOrderedKeySchema} are optimized for time range queries only. Key
-     * range queries may be slower. If better performance on key range queries are necessary, then
-     * use the {@link WindowKeySchema}.
-     */
     @Override
     public Bytes upperRangeFixedSize(final Bytes key, final long to) {
-        return toStoreKeyBinary(key, to, Integer.MAX_VALUE);
+        throw new UnsupportedOperationException();
     }
 
-    /**
-     * {@inheritdoc}
-     *
-     * Queries using the {@link TimeOrderedKeySchema} are optimized for time range queries only. Key
-     * range queries may be slower. If better performance on key range queries are necessary, then
-     * use the {@link WindowKeySchema}.
-     */
     @Override
     public Bytes lowerRangeFixedSize(final Bytes key, final long from) {
-        return toStoreKeyBinary(key, Math.max(0, from), 0);
+        throw new UnsupportedOperationException();
     }
 
     @Override
@@ -96,31 +74,36 @@ public class TimeOrderedKeySchema implements RocksDBSegmentedBytesStore.KeySchem
     /**
      * {@inheritdoc}
      *
-     * This method is not optimized for {@link TimeOrderedKeySchema}. The method may do unnecessary
-     * checks to find the next record.
+     * This method is optimized for {@link RocksDBTimeOrderedWindowStore#all()} only. Key and time
+     * range queries are not supported.
      */
     @Override
     public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, final Bytes binaryKeyTo, final long from, final long to) {
-        return iterator -> {
-            while (iterator.hasNext()) {
-                final Bytes bytes = iterator.peekNextKey();
-                final Bytes keyBytes = Bytes.wrap(extractStoreKeyBytes(bytes.get()));
-                final long time = extractStoreTimestamp(bytes.get());
-                if ((binaryKeyFrom == null || keyBytes.compareTo(binaryKeyFrom) >= 0)
-                    && (binaryKeyTo == null || keyBytes.compareTo(binaryKeyTo) <= 0)
-                    && time >= from
-                    && time <= to) {
-                    return true;
-                }
-                iterator.next();
-            }
-            return false;
-        };
+        if (binaryKeyFrom != null || binaryKeyTo != null) {
+            throw new IllegalArgumentException("binaryKeyFrom/binaryKeyTo keys cannot be non-null. Key and time range queries are not supported.");
+        }
+
+        if (from != 0 || to != Long.MAX_VALUE) {
+            throw new IllegalArgumentException("The from/to time range must be 0 to Long.MAX_VALUE. Key and time range queries are not supported.");
+        }
+
+        return iterator -> iterator.hasNext();
     }
 
     @Override
     public <S extends Segment> List<S> segmentsToSearch(final Segments<S> segments, final long from, final long to, final boolean forward) {
-        return segments.segments(from, to, forward);
+        throw new UnsupportedOperationException();
+    }
+
+    public static Bytes toStoreKeyBinaryPrefix(final Bytes key,
+                                               final long timestamp) {
+        final byte[] serializedKey = key.get();
+
+        final ByteBuffer buf = ByteBuffer.allocate(TIMESTAMP_SIZE + serializedKey.length);
+        buf.putLong(timestamp);
+        buf.put(serializedKey);
+
+        return Bytes.wrap(buf.array());
     }
 
     public static Bytes toStoreKeyBinary(final Bytes key,
@@ -149,30 +132,23 @@ public class TimeOrderedKeySchema implements RocksDBSegmentedBytesStore.KeySchem
                                   final int seqnum) {
         final ByteBuffer buf = ByteBuffer.allocate(TIMESTAMP_SIZE + serializedKey.length + SEQNUM_SIZE);
         buf.putLong(timestamp);
-        buf.putInt(seqnum);
         buf.put(serializedKey);
+        buf.putInt(seqnum);
         return Bytes.wrap(buf.array());
     }
 
     static byte[] extractStoreKeyBytes(final byte[] binaryKey) {
         final byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE];
-        System.arraycopy(binaryKey, PREFIX_SIZE, bytes, 0, bytes.length);
+        System.arraycopy(binaryKey, TIMESTAMP_SIZE, bytes, 0, bytes.length);
         return bytes;
     }
 
-    static <K> K extractStoreKey(final byte[] binaryKey,
-                                 final StateSerdes<K, ?> serdes) {
-        final byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE];
-        System.arraycopy(binaryKey, PREFIX_SIZE, bytes, 0, bytes.length);
-        return serdes.keyFrom(bytes);
-    }
-
     static long extractStoreTimestamp(final byte[] binaryKey) {
         return ByteBuffer.wrap(binaryKey).getLong(0);
     }
 
     static int extractStoreSequence(final byte[] binaryKey) {
-        return ByteBuffer.wrap(binaryKey).getInt(TIMESTAMP_SIZE);
+        return ByteBuffer.wrap(binaryKey).getInt(binaryKey.length - SEQNUM_SIZE);
     }
 
     static <K> Windowed<K> fromStoreKey(final byte[] binaryKey,
@@ -202,8 +178,8 @@ public class TimeOrderedKeySchema implements RocksDBSegmentedBytesStore.KeySchem
      * Safely construct a time window of the given size,
      * taking care of bounding endMs to Long.MAX_VALUE if necessary
      */
-    static TimeWindow timeWindowForSize(final long startMs,
-                                        final long windowSize) {
+    private static TimeWindow timeWindowForSize(final long startMs,
+                                                final long windowSize) {
         long endMs = startMs + windowSize;
 
         if (endMs < 0) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegment.java
index 36fe0e5..f0e4cf6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegment.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegment.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
 
@@ -41,6 +42,11 @@ class TimestampedSegment extends RocksDBTimestampedStore implements Comparable<T
     }
 
     @Override
+    public void deleteRange(final Bytes keyFrom, final Bytes keyTo) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
     public int compareTo(final TimestampedSegment segment) {
         return Long.compare(id, segment.id);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index a1344f6..53d1846 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -93,7 +93,7 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
 
     @Parameters(name = "{0}")
     public static Object[] getKeySchemas() {
-        return new Object[] {new SessionKeySchema(), new WindowKeySchema(), new TimeOrderedKeySchema()};
+        return new Object[] {new SessionKeySchema(), new WindowKeySchema()};
     }
 
     @Before
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreTest.java
index 37f98f1..2646c4c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStoreTest.java
@@ -16,18 +16,68 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
-import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.MockRecordCollector;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
 
+import java.io.File;
+
+import static java.util.Arrays.asList;
 import static org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier.WindowStoreTypes;
+import static org.apache.kafka.test.StreamsTestUtils.toList;
+import static org.junit.Assert.assertEquals;
+
+public class RocksDBTimeOrderedWindowStoreTest {
+    private static final long WINDOW_SIZE = 3L;
+    private static final long SEGMENT_INTERVAL = 60_000L;
+    private static final long RETENTION_PERIOD = 2 * SEGMENT_INTERVAL;
+
+    private static final String STORE_NAME = "rocksDB time-ordered window store";
+
+    WindowStore<Integer, String> windowStore;
+    InternalMockProcessorContext context;
+    MockRecordCollector recordCollector;
+
+    final File baseDir = TestUtils.tempDirectory("test");
 
-public class RocksDBTimeOrderedWindowStoreTest extends RocksDBWindowStoreTest {
-    private static final String STORE_NAME = "rocksDB window store";
+    @Before
+    public void setup() {
+        windowStore = buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, true, Serdes.Integer(), Serdes.String());
+
+        recordCollector = new MockRecordCollector();
+        context = new InternalMockProcessorContext(
+            baseDir,
+            Serdes.String(),
+            Serdes.Integer(),
+            recordCollector,
+            new ThreadCache(
+                new LogContext("testCache"),
+                0,
+                new MockStreamsMetrics(new Metrics())));
+        context.setTime(1L);
+
+        windowStore.init((StateStoreContext) context, windowStore);
+    }
+
+    @After
+    public void after() {
+        windowStore.close();
+    }
 
-    @Override
     <K, V> WindowStore<K, V> buildWindowStore(final long retentionPeriod,
                                               final long windowSize,
                                               final boolean retainDuplicates,
@@ -47,24 +97,143 @@ public class RocksDBTimeOrderedWindowStoreTest extends RocksDBWindowStoreTest {
             .build();
     }
 
-    @Override
-    String getMetricsScope() {
-        return new RocksDbWindowBytesStoreSupplier(null, 0, 0, 0, false, WindowStoreTypes.TIME_ORDERED_WINDOW_STORE).metricsScope();
+    @Test
+    public void shouldGetAll() {
+        final long startTime = SEGMENT_INTERVAL - 4L;
+
+        windowStore.put(0, "zero", startTime + 0);
+        windowStore.put(1, "one", startTime + 1);
+        windowStore.put(2, "two", startTime + 2);
+
+        final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", startTime + 0);
+        final KeyValue<Windowed<Integer>, String> one = windowedPair(1, "one", startTime + 1);
+        final KeyValue<Windowed<Integer>, String> two = windowedPair(2, "two", startTime + 2);
+
+        assertEquals(
+            asList(zero, one, two),
+            toList(windowStore.all())
+        );
+    }
+
+    @Test
+    public void shouldGetAllDuplicates() {
+        final long startTime = SEGMENT_INTERVAL - 4L;
+
+        windowStore.put(0, "zero1", startTime + 0);
+        windowStore.put(0, "zero2", startTime + 0);
+        windowStore.put(0, "zero3", startTime + 0);
+
+        final KeyValue<Windowed<Integer>, String> zero1 = windowedPair(0, "zero1", startTime + 0);
+        final KeyValue<Windowed<Integer>, String> zero2 = windowedPair(0, "zero2", startTime + 0);
+        final KeyValue<Windowed<Integer>, String> zero3 = windowedPair(0, "zero3", startTime + 0);
+
+        assertEquals(
+            asList(zero1, zero2, zero3),
+            toList(windowStore.all())
+        );
+    }
+
+    @Test
+    public void shouldGetAllNonDeletedRecords() {
+        final long startTime = SEGMENT_INTERVAL - 4L;
+
+        // Add some records
+        windowStore.put(0, "zero", startTime + 0);
+        windowStore.put(1, "one", startTime + 1);
+        windowStore.put(2, "two", startTime + 2);
+        windowStore.put(3, "three", startTime + 3);
+        windowStore.put(4, "four", startTime + 4);
+
+        // Delete some records
+        windowStore.put(1, null, startTime + 1);
+        windowStore.put(3, null, startTime + 3);
+
+        // Only non-deleted records should appear in the all() iterator
+        final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", startTime + 0);
+        final KeyValue<Windowed<Integer>, String> two = windowedPair(2, "two", startTime + 2);
+        final KeyValue<Windowed<Integer>, String> four = windowedPair(4, "four", startTime + 4);
+
+        assertEquals(
+            asList(zero, two, four),
+            toList(windowStore.all())
+        );
+    }
+
+    @Test
+    public void shouldDeleteAllDuplicates() {
+        final long startTime = SEGMENT_INTERVAL - 4L;
+
+        windowStore.put(0, "zero1", startTime + 0);
+        windowStore.put(0, "zero2", startTime + 0);
+        windowStore.put(0, "zero3", startTime + 0);
+        windowStore.put(1, "one1", startTime + 1);
+        windowStore.put(1, "one2", startTime + 1);
+
+        windowStore.put(0, null, startTime + 0);
+
+        final KeyValue<Windowed<Integer>, String> one1 = windowedPair(1, "one1", startTime + 1);
+        final KeyValue<Windowed<Integer>, String> one2 = windowedPair(1, "one2", startTime + 1);
+
+        assertEquals(
+            asList(one1, one2),
+            toList(windowStore.all())
+        );
     }
 
-    @Override
-    void setClassLoggerToDebug() {
-        LogCaptureAppender.setClassLoggerToDebug(AbstractRocksDBSegmentedBytesStore.class);
+    @Test
+    public void shouldGetAllReturnTimestampOrderedRecords() {
+        final long startTime = SEGMENT_INTERVAL - 4L;
+
+        // Add some records in different order
+        windowStore.put(4, "four", startTime + 4);
+        windowStore.put(0, "zero", startTime + 0);
+        windowStore.put(2, "two1", startTime + 2);
+        windowStore.put(3, "three", startTime + 3);
+        windowStore.put(1, "one", startTime + 1);
+
+        // Add duplicates
+        windowStore.put(2, "two2", startTime + 2);
+
+        // Records should appear in the all() iterator ordered by timestamp
+        final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", startTime + 0);
+        final KeyValue<Windowed<Integer>, String> one = windowedPair(1, "one", startTime + 1);
+        final KeyValue<Windowed<Integer>, String> two1 = windowedPair(2, "two1", startTime + 2);
+        final KeyValue<Windowed<Integer>, String> two2 = windowedPair(2, "two2", startTime + 2);
+        final KeyValue<Windowed<Integer>, String> three = windowedPair(3, "three", startTime + 3);
+        final KeyValue<Windowed<Integer>, String> four = windowedPair(4, "four", startTime + 4);
+
+        assertEquals(
+            asList(zero, one, two1, two2, three, four),
+            toList(windowStore.all())
+        );
+    }
+
+    @Test
+    public void shouldEarlyClosedIteratorStillGetAllRecords() {
+        final long startTime = SEGMENT_INTERVAL - 4L;
+
+        windowStore.put(0, "zero", startTime + 0);
+        windowStore.put(1, "one", startTime + 1);
+
+        final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", startTime + 0);
+        final KeyValue<Windowed<Integer>, String> one = windowedPair(1, "one", startTime + 1);
+
+        final KeyValueIterator<Windowed<Integer>, String> it = windowStore.all();
+        assertEquals(zero, it.next());
+        it.close();
+
+        // A new all() iterator after a previous all() iterator was closed should return all elements.
+        assertEquals(
+            asList(zero, one),
+            toList(windowStore.all())
+        );
     }
 
-    @Override
-    long extractStoreTimestamp(final byte[] binaryKey) {
-        return TimeOrderedKeySchema.extractStoreTimestamp(binaryKey);
+    private static <K, V> KeyValue<Windowed<K>, V> windowedPair(final K key, final V value, final long timestamp) {
+        return windowedPair(key, value, timestamp, WINDOW_SIZE);
     }
 
-    @Override
-    <K> K extractStoreKey(final byte[] binaryKey,
-                          final StateSerdes<K, ?> serdes) {
-        return TimeOrderedKeySchema.extractStoreKey(binaryKey, serdes);
+    private static <K, V> KeyValue<Windowed<K>, V> windowedPair(final K key, final V value, final long timestamp, final long windowSize) {
+        return KeyValue.pair(new Windowed<>(key, WindowKeySchema.timeWindowForSize(timestamp, windowSize)), value);
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchemaTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchemaTest.java
index 5a5c4fc..03f81e3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchemaTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchemaTest.java
@@ -19,20 +19,12 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.state.StateSerdes;
-import org.apache.kafka.test.KeyValueIteratorStub;
 import org.junit.Test;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.IsEqual.equalTo;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
@@ -44,94 +36,9 @@ public class TimeOrderedKeySchemaTest {
 
     final private Window window = new TimeWindow(startTime, endTime);
     final private Windowed<String> windowedKey = new Windowed<>(key, window);
-    final private TimeOrderedKeySchema timeOrderedKeySchema = new TimeOrderedKeySchema();
     final private StateSerdes<String, byte[]> stateSerdes = new StateSerdes<>("dummy", serde, Serdes.ByteArray());
 
     @Test
-    public void testHasNextConditionUsingNullKeys() {
-        final List<KeyValue<Bytes, Integer>> keys = Arrays.asList(
-            KeyValue.pair(TimeOrderedKeySchema.toStoreKeyBinary(new Windowed<>(Bytes.wrap(new byte[] {0, 0}), new TimeWindow(0, 1)), 0), 1),
-            KeyValue.pair(TimeOrderedKeySchema.toStoreKeyBinary(new Windowed<>(Bytes.wrap(new byte[] {0}), new TimeWindow(0, 1)), 0), 2),
-            KeyValue.pair(TimeOrderedKeySchema.toStoreKeyBinary(new Windowed<>(Bytes.wrap(new byte[] {0, 0, 0}), new TimeWindow(0, 1)), 0), 3),
-            KeyValue.pair(TimeOrderedKeySchema.toStoreKeyBinary(new Windowed<>(Bytes.wrap(new byte[] {0}), new TimeWindow(10, 20)), 4), 4),
-            KeyValue.pair(TimeOrderedKeySchema.toStoreKeyBinary(new Windowed<>(Bytes.wrap(new byte[] {0, 0}), new TimeWindow(10, 20)), 5), 5),
-            KeyValue.pair(TimeOrderedKeySchema.toStoreKeyBinary(new Windowed<>(Bytes.wrap(new byte[] {0, 0, 0}), new TimeWindow(10, 20)), 6), 6));
-        final DelegatingPeekingKeyValueIterator<Bytes, Integer> iterator = new DelegatingPeekingKeyValueIterator<>("foo", new KeyValueIteratorStub<>(keys.iterator()));
-
-        final HasNextCondition hasNextCondition = timeOrderedKeySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE);
-        final List<Integer> results = new ArrayList<>();
-        while (hasNextCondition.hasNext(iterator)) {
-            results.add(iterator.next().value);
-        }
-        assertThat(results, equalTo(Arrays.asList(1, 2, 3, 4, 5, 6)));
-    }
-
-    @Test
-    public void testUpperBoundWithLargeTimestamps() {
-        final Bytes upper = timeOrderedKeySchema.upperRange(Bytes.wrap(new byte[] {0xA, 0xB, 0xC}), Long.MAX_VALUE);
-
-        assertThat(
-            "shorter key with max timestamp should be in range",
-            upper.compareTo(
-                TimeOrderedKeySchema.toStoreKeyBinary(
-                    new byte[] {0xA},
-                    Long.MAX_VALUE,
-                    Integer.MAX_VALUE
-                )
-            ) >= 0
-        );
-
-        assertThat(
-            "shorter key with max timestamp should be in range",
-            upper.compareTo(
-                TimeOrderedKeySchema.toStoreKeyBinary(
-                    new byte[] {0xA, 0xB},
-                    Long.MAX_VALUE,
-                    Integer.MAX_VALUE
-                )
-            ) >= 0
-        );
-
-        assertThat(upper, equalTo(TimeOrderedKeySchema.toStoreKeyBinary(new byte[] {0xA, 0xB, 0xC}, Long.MAX_VALUE, Integer.MAX_VALUE)));
-    }
-
-    @Test
-    public void testUpperBoundWithZeroTimestamp() {
-        final Bytes upper = timeOrderedKeySchema.upperRange(Bytes.wrap(new byte[] {0xA, 0xB, 0xC}), 0);
-        assertThat(upper, equalTo(TimeOrderedKeySchema.toStoreKeyBinary(new byte[] {0xA, 0xB, 0xC}, 0, Integer.MAX_VALUE)));
-    }
-
-    @Test
-    public void testLowerBoundWithZeroTimestamp() {
-        final Bytes lower = timeOrderedKeySchema.lowerRange(Bytes.wrap(new byte[] {0xA, 0xB, 0xC}), 0);
-        assertThat(lower, equalTo(TimeOrderedKeySchema.toStoreKeyBinary(new byte[] {0xA, 0xB, 0xC}, 0, 0)));
-    }
-
-    @Test
-    public void testLowerBoundWithNonZeroTimestamp() {
-        final Bytes lower = timeOrderedKeySchema.lowerRange(Bytes.wrap(new byte[] {0xA, 0xB, 0xC}), 42);
-        assertThat(lower, equalTo(TimeOrderedKeySchema.toStoreKeyBinary(new byte[] {0xA, 0xB, 0xC}, 42, 0)));
-    }
-
-    @Test
-    public void testLowerBoundMatchesTrailingZeros() {
-        final Bytes lower = timeOrderedKeySchema.lowerRange(Bytes.wrap(new byte[] {0xA, 0xB, 0xC}), Long.MAX_VALUE - 1);
-
-        assertThat(
-            "appending zeros to key should still be in range",
-            lower.compareTo(
-                TimeOrderedKeySchema.toStoreKeyBinary(
-                    new byte[] {0xA, 0xB, 0xC, 0, 0, 0, 0, 0, 0, 0, 0},
-                    Long.MAX_VALUE - 1,
-                    0
-                )
-            ) < 0
-        );
-
-        assertThat(lower, equalTo(TimeOrderedKeySchema.toStoreKeyBinary(new byte[] {0xA, 0xB, 0xC}, Long.MAX_VALUE - 1, 0)));
-    }
-
-    @Test
     public void shouldConvertToBinaryAndBack() {
         final Bytes serialized = TimeOrderedKeySchema.toStoreKeyBinary(windowedKey, 0, stateSerdes);
         final Windowed<String> result = TimeOrderedKeySchema.fromStoreKey(serialized.get(), endTime - startTime, stateSerdes.keyDeserializer(), stateSerdes.topic());