Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/16 15:45:29 UTC

[GitHub] [kafka] spena opened a new pull request #10331: KAFKA-10847: Add a RocksDBTimeOrderedWindowStore to hold records using their timestamp as key prefix

spena opened a new pull request #10331:
URL: https://github.com/apache/kafka/pull/10331


   The new `RocksDBTimeOrderedWindowStore` stores key/value records under a combined key that starts with the record timestamp, followed by a sequence number and the key bytes (`<time,seq,key>`). This layout keeps all records ordered by timestamp in the RocksDB store. The advantage is that time-range queries such as `fetchAll(from, to)` become more efficient: they return all keys in the specified time range by iterating over a contiguous range of RocksDB keys.
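   A minimal sketch of the `<time,seq,key>` layout, assuming an 8-byte big-endian timestamp and a 4-byte sequence number (the sizes used by `TimeOrderedKeySchema` later in this thread). `TimeOrderedKeyLayout` and its methods are hypothetical names for illustration only. Sorting with `Arrays.compareUnsigned` mimics RocksDB's default bytewise ordering; the resulting order matches timestamp order only for non-negative timestamps.

   ```java
   import java.nio.ByteBuffer;
   import java.nio.charset.StandardCharsets;
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;

   // Hypothetical helper illustrating the <time,seq,key> layout; not the PR's code.
   final class TimeOrderedKeyLayout {
       static final int TIMESTAMP_SIZE = 8;
       static final int SEQNUM_SIZE = 4;

       // Big-endian timestamp, then sequence number, then the raw key bytes.
       static byte[] toStoreKey(final long timestamp, final int seqnum, final byte[] key) {
           return ByteBuffer.allocate(TIMESTAMP_SIZE + SEQNUM_SIZE + key.length)
               .putLong(timestamp)
               .putInt(seqnum)
               .put(key)
               .array();
       }

       static long extractTimestamp(final byte[] storeKey) {
           return ByteBuffer.wrap(storeKey).getLong(0);
       }

       // Sorts store keys bytewise (unsigned), as RocksDB's default comparator does,
       // and returns their timestamps in that order.
       static List<Long> timestampsInStoreOrder(final List<byte[]> storeKeys) {
           final List<byte[]> sorted = new ArrayList<>(storeKeys);
           sorted.sort(Arrays::compareUnsigned);
           final List<Long> timestamps = new ArrayList<>();
           for (final byte[] storeKey : sorted) {
               timestamps.add(extractTimestamp(storeKey));
           }
           return timestamps;
       }

       public static void main(final String[] args) {
           final List<byte[]> keys = List.of(
               toStoreKey(300L, 0, "a".getBytes(StandardCharsets.UTF_8)),
               toStoreKey(100L, 1, "zzz".getBytes(StandardCharsets.UTF_8)),
               toStoreKey(200L, 2, "b".getBytes(StandardCharsets.UTF_8)));
           // The timestamp prefix decides the order; the key bytes do not.
           System.out.println(timestampsInStoreOrder(keys)); // [100, 200, 300]
       }
   }
   ```

   Unsigned comparison matters here: 200 encodes its last byte as `0xC8`, which a signed byte comparison would place before 100.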
   
   Key-range queries on `RocksDBTimeOrderedWindowStore`, such as `fetch(key, from, to)` or `fetch(keyFrom, keyTo, from, to)`, do not perform well. For those queries it is better to use the existing `RocksDBWindowStore`, which uses the record key as the RocksDB key prefix. `RocksDBTimeOrderedWindowStore` is meant to fix https://issues.apache.org/jira/browse/KAFKA-10847, which requires a temporary store to hold non-joined records and later query all keys within a specified time range.
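   To illustrate why key queries are slow with this layout, here is a hypothetical in-memory model (a `TreeMap` with unsigned bytewise ordering standing in for RocksDB, not Kafka code). Because the record key is not a prefix, `fetch(key, from, to)` has to visit every record in the time range and drop the ones with other keys. The sketch assumes `0 <= from <= to < Long.MAX_VALUE`.

   ```java
   import java.nio.ByteBuffer;
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;
   import java.util.Map;
   import java.util.NavigableMap;
   import java.util.TreeMap;

   // Hypothetical model of a time-ordered store; the TreeMap stands in for RocksDB.
   final class TimeOrderedFetchCost {
       final NavigableMap<byte[], String> store = new TreeMap<byte[], String>(Arrays::compareUnsigned);
       int visited; // entries touched by the last fetch

       static byte[] storeKey(final long ts, final int seq, final byte[] key) {
           return ByteBuffer.allocate(12 + key.length).putLong(ts).putInt(seq).put(key).array();
       }

       void put(final byte[] key, final long ts, final int seq, final String value) {
           store.put(storeKey(ts, seq, key), value);
       }

       // fetch(key, from, to): every record in [from, to] is visited and
       // records whose key bytes differ are dropped.
       List<String> fetch(final byte[] key, final long from, final long to) {
           visited = 0;
           final List<String> out = new ArrayList<>();
           final byte[] lower = storeKey(from, 0, new byte[0]);   // inclusive
           final byte[] upper = storeKey(to + 1, 0, new byte[0]); // exclusive
           for (final Map.Entry<byte[], String> e : store.subMap(lower, true, upper, false).entrySet()) {
               visited++;
               final byte[] k = e.getKey();
               final byte[] recordKey = Arrays.copyOfRange(k, 12, k.length);
               if (Arrays.equals(recordKey, key)) {
                   out.add(e.getValue());
               }
           }
           return out;
       }
   }
   ```

   With a key-prefixed layout such as `RocksDBWindowStore`'s, the same query could instead seek directly to the requested key.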
   
   The PR also adds a new lexicographic byte comparator to `RocksDBRangeIterator` that compares only key prefixes, i.e. up to the length of the shorter key:
   ```
   comparator.compare(0001, 0001000F); // only the shorter key's length is compared, so 0001 == 0001 (returns 0)
   comparator.compare(0001000F, 0001); // only the shorter key's length is compared, so 0001 == 0001 (returns 0)
   comparator.compare(0002000F, 0001); // only the shorter key's length is compared, so 0002 > 0001 (positive)
   comparator.compare(0001000F, 0002); // only the shorter key's length is compared, so 0001 < 0002 (negative)
   ```
   This prefix comparator is used when the `range` and `reverseRange` iterators are called with `prefixScan=true`. Otherwise, the existing lexicographic byte comparator, which compares full keys, is used.
   ```
   KeyValueIterator<K, V> range(K from, K to, boolean prefixScan);
   KeyValueIterator<K, V> reverseRange(K from, K to, boolean prefixScan);
   ```
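   The following is a sketch of the prefix comparison and how a forward `range(from, to, prefixScan)` might use it, assuming keys are compared as unsigned bytes. `PrefixRange`, `comparePrefix`, and `sortedKeys` are hypothetical names, not the PR's `RocksDBRangeIterator` code; in the test keys below, each byte is written as two hex digits.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;
   import java.util.NavigableSet;
   import java.util.TreeSet;

   // Hypothetical sketch of prefix comparison; not the PR's implementation.
   final class PrefixRange {
       // Compares only the first min(a.length, b.length) bytes, as unsigned values.
       static int comparePrefix(final byte[] a, final byte[] b) {
           final int n = Math.min(a.length, b.length);
           return Arrays.compareUnsigned(a, 0, n, b, 0, n);
       }

       // Forward range over bytewise-sorted keys. With prefixScan=false the upper
       // bound is compared on full keys; with prefixScan=true a key whose prefix
       // equals `to` is still in range.
       static List<byte[]> range(final NavigableSet<byte[]> keys, final byte[] from,
                                 final byte[] to, final boolean prefixScan) {
           final List<byte[]> out = new ArrayList<>();
           for (final byte[] k : keys.tailSet(from, true)) {
               final int cmp = prefixScan ? comparePrefix(k, to) : Arrays.compareUnsigned(k, to);
               if (cmp > 0) {
                   break; // keys are sorted, so no later key can be in range
               }
               out.add(k);
           }
           return out;
       }

       static NavigableSet<byte[]> sortedKeys(final byte[]... keys) {
           final NavigableSet<byte[]> set = new TreeSet<byte[]>(Arrays::compareUnsigned);
           set.addAll(Arrays.asList(keys));
           return set;
       }
   }
   ```

   With `prefixScan=true`, a key such as `0002000A` still falls inside `range(0001, 0002, ...)` because its first two bytes equal the upper bound; with full-key comparison it does not.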
   
   *Testing strategy:*
   
   - Added unit tests
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] spena commented on pull request #10331: KAFKA-10847: Add a RocksDBTimeOrderedWindowStore to hold records using their timestamp as key prefix

Posted by GitBox <gi...@apache.org>.
spena commented on pull request #10331:
URL: https://github.com/apache/kafka/pull/10331#issuecomment-815058791


   @guozhangwang The test failures are not related to this PR.
   ```
   Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.integration.JoinWithIncompleteMetadataIntegrationTest.testShouldAutoShutdownOnJoinWithIncompleteMetadata
   ```
   
   The time-ordered store is not used anywhere yet, so this test failure cannot be caused by this PR.





[GitHub] [kafka] guozhangwang commented on pull request #10331: KAFKA-10847: Add a RocksDBTimeOrderedWindowStore to hold records using their timestamp as key prefix

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #10331:
URL: https://github.com/apache/kafka/pull/10331#issuecomment-815063710


   I've triggered another round of unit tests to see if they are transient -- will review the latest commit now.





[GitHub] [kafka] guozhangwang commented on pull request #10331: KAFKA-10847: Add a RocksDBTimeOrderedWindowStore to hold records using their timestamp as key prefix

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #10331:
URL: https://github.com/apache/kafka/pull/10331#issuecomment-814481396


   @spena could you also take a look at the failed tests and see if they are related to this PR?





[GitHub] [kafka] spena commented on a change in pull request #10331: KAFKA-10847: Add a RocksDBTimeOrderedWindowStore to hold records using their timestamp as key prefix

Posted by GitBox <gi...@apache.org>.
spena commented on a change in pull request #10331:
URL: https://github.com/apache/kafka/pull/10331#discussion_r608133301



##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
##########
@@ -1173,14 +1173,23 @@ private void putSecondBatch(final WindowStore<Integer, String> store,
         store.put(2, "two+6");
     }
 
+    long extractStoreTimestamp(final byte[] binaryKey) {

Review comment:
       Added test cases







[GitHub] [kafka] guozhangwang merged pull request #10331: KAFKA-10847: Add a RocksDBTimeOrderedWindowStore to hold records using their timestamp as key prefix

Posted by GitBox <gi...@apache.org>.
guozhangwang merged pull request #10331:
URL: https://github.com/apache/kafka/pull/10331


   





[GitHub] [kafka] guozhangwang commented on pull request #10331: KAFKA-10847: Add a RocksDBTimeOrderedWindowStore to hold records using their timestamp as key prefix

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #10331:
URL: https://github.com/apache/kafka/pull/10331#issuecomment-805344527


   cc @cadonna to take a look at the prefix scan, and @vcrfxia at the bytes layout.





[GitHub] [kafka] spena commented on pull request #10331: KAFKA-10847: Add a RocksDBTimeOrderedWindowStore to hold records using their timestamp as key prefix

Posted by GitBox <gi...@apache.org>.
spena commented on pull request #10331:
URL: https://github.com/apache/kafka/pull/10331#issuecomment-806018503


   @guozhangwang I just replaced this PR's contents with only the changes for the `RocksDBTimeOrderedWindowStore`. I will open other PRs for your suggestions.





[GitHub] [kafka] spena commented on a change in pull request #10331: KAFKA-10847: Add a RocksDBTimeOrderedWindowStore to hold records using their timestamp as key prefix

Posted by GitBox <gi...@apache.org>.
spena commented on a change in pull request #10331:
URL: https://github.com/apache/kafka/pull/10331#discussion_r606336280



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSegmentedBytesStore.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+import java.util.List;
+
+public class RocksDBTimeOrderedSegmentedBytesStore extends AbstractRocksDBSegmentedBytesStore<KeyValueSegment> {
+    private final KeySchema keySchema;
+    private final AbstractSegments<KeyValueSegment> segments;
+
+    RocksDBTimeOrderedSegmentedBytesStore(final String name,

Review comment:
       I'll remove this class. I added it because I was going to override `fetchAll` to use prefixes to fetch data. Now that we don't need to do that, this class is unnecessary. I will reuse `RocksDBSegmentedBytesStore` and pass `TimeOrderedKeySchema` to it.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+/**
+ * A persistent (time-key)-value store based on RocksDB.
+ */
+public class RocksDBTimeOrderedWindowStore

Review comment:
       It seems like too much work on the testing side if I keep most of the methods unsupported. By the way, I will remove `RocksDBTimeOrderedSegmentedBytesStore` because we don't need to do range queries with prefixes anymore. With that, all methods in this class will work as expected, without the range-iterator issues caused by looking up prefixed keys. I think we should keep all the functionality: it is harmless, and it is already tested by `RocksDBTimeOrderedWindowStoreTest`. What do you think?







[GitHub] [kafka] spena commented on a change in pull request #10331: KAFKA-10847: Add a RocksDBTimeOrderedWindowStore to hold records using their timestamp as key prefix

Posted by GitBox <gi...@apache.org>.
spena commented on a change in pull request #10331:
URL: https://github.com/apache/kafka/pull/10331#discussion_r600572540



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreBuilder.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+
+public class TimeOrderedWindowStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, WindowStore<K, V>> {

Review comment:
       I'm not sure about this one yet. I found this issue in `RocksDBTimeOrderedWindowStoreTest`, which tests that the store can be restored from a changelog. The only difference in this builder is `maybeWrapLogging`, which returns an instance of `ChangeLoggingTimeOrderedWindowBytesStore`.
   
   I believe we need to allow the store to be restored, right? If I use the generic `ChangeLoggingWindowBytesStore`, the restore fails because it uses `WindowKeySchema`.
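   A small illustration of why restoring through a store that assumes `WindowKeySchema` breaks. It assumes `WindowKeySchema` stores keys as `<key,time,seq>`, so its timestamp is read 12 bytes from the end; decoding a `<time,seq,key>` key that way returns a meaningless timestamp. `SchemaMismatch` and its methods are hypothetical helpers, not Kafka classes.

   ```java
   import java.nio.ByteBuffer;

   // Hypothetical illustration of decoding a key with the wrong schema.
   final class SchemaMismatch {
       static final int TIMESTAMP_SIZE = 8;
       static final int SEQNUM_SIZE = 4;

       // <time,seq,key>, as in the time-ordered schema.
       static byte[] timeOrderedKey(final byte[] key, final long ts, final int seq) {
           return ByteBuffer.allocate(TIMESTAMP_SIZE + SEQNUM_SIZE + key.length)
               .putLong(ts).putInt(seq).put(key).array();
       }

       static long timeOrderedTimestamp(final byte[] storeKey) {
           return ByteBuffer.wrap(storeKey).getLong(0);
       }

       // Assumed <key,time,seq> layout: the timestamp sits 12 bytes from the end.
       static long windowSchemaTimestamp(final byte[] storeKey) {
           return ByteBuffer.wrap(storeKey).getLong(storeKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
       }

       public static void main(final String[] args) {
           final byte[] k = timeOrderedKey(new byte[] {0x41, 0x42, 0x43, 0x44}, 1_000L, 7);
           System.out.println(timeOrderedTimestamp(k));  // 1000
           System.out.println(windowSchemaTimestamp(k)); // 4294967296007 (wrong schema)
       }
   }
   ```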







[GitHub] [kafka] guozhangwang commented on a change in pull request #10331: KAFKA-10847: Add a RocksDBTimeOrderedWindowStore to hold records using their timestamp as key prefix

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10331:
URL: https://github.com/apache/kafka/pull/10331#discussion_r608220386



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchema.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * A {@link RocksDBSegmentedBytesStore.KeySchema} to serialize/deserialize a RocksDB store
+ * key into a schema composed of (time,seq,key). This key schema is more efficient for
+ * range queries over a time interval. For key range queries, use {@link WindowKeySchema} instead.
+ */
+public class TimeOrderedKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
+    private static final Logger LOG = LoggerFactory.getLogger(TimeOrderedKeySchema.class);
+
+    private static final int TIMESTAMP_SIZE = 8;
+    private static final int SEQNUM_SIZE = 4;
+    private static final int PREFIX_SIZE = TIMESTAMP_SIZE + SEQNUM_SIZE;
+
+    @Override
+    public Bytes upperRange(final Bytes key, final long to) {
+        final byte[] maxPrefix = ByteBuffer.allocate(PREFIX_SIZE)
+            .putLong(to)
+            .putInt(Integer.MAX_VALUE)
+            .array();
+
+        return OrderedBytes.range(maxPrefix, key);
+    }
+
+    @Override
+    public Bytes lowerRange(final Bytes key, final long from) {
+        final byte[] minPrefix = ByteBuffer.allocate(PREFIX_SIZE)
+            .putLong(from)
+            .putInt(0)
+            .array();
+
+        return OrderedBytes.range(minPrefix, key);
+    }
+
+    @Override
+    public Bytes upperRangeFixedSize(final Bytes key, final long to) {
+        return TimeOrderedKeySchema.toStoreKeyBinary(key, to, Integer.MAX_VALUE);
+    }
+
+    @Override
+    public Bytes lowerRangeFixedSize(final Bytes key, final long from) {
+        return TimeOrderedKeySchema.toStoreKeyBinary(key, Math.max(0, from), 0);
+    }
+
+    @Override
+    public long segmentTimestamp(final Bytes key) {
+        return TimeOrderedKeySchema.extractStoreTimestamp(key.get());
+    }
+
+    @Override
+    public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, final Bytes binaryKeyTo, final long from, final long to) {

Review comment:
       As discussed before, for `fetchAll(final long timeFrom, final long timeTo)` we actually do not need to trigger this function at all since we know it should always return true.
   
   I think we can either 1) state that `fetchAll(final long timeFrom, final long timeTo)` is also sub-optimal and that people should avoid using it with the new schema, or 2) keep that implementation as optimal as possible, i.e. add a condition like this in `AbstractRocksDBSegmentedBytesStore#fetchAll`:
   
   ```
   if (keySchema instanceof TimeOrderedKeySchema) {
       return new SegmentIterator<>(
           searchSpace.iterator(),
           (....) -> true,
           TimeOrderedKeySchema.toStoreKeyBinary(0, from, 0),
           TimeOrderedKeySchema.toStoreKeyBinary(0, to + 1, Integer.MAX_VALUE),
           true);
   }
   // else return the normal implementation
   ```
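   A sketch of why the per-record condition can be dropped for `fetchAll` under the time-ordered layout, modeling a segment as a `TreeMap` with unsigned bytewise ordering (hypothetical, not `SegmentIterator`). It uses `<to + 1, 0, empty key>` as an exclusive upper bound, a variant of the bounds in the snippet above, and assumes non-negative timestamps with `to < Long.MAX_VALUE`.

   ```java
   import java.nio.ByteBuffer;
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;
   import java.util.NavigableMap;
   import java.util.TreeMap;

   // Hypothetical model: a TreeMap with unsigned bytewise ordering stands in for a segment.
   final class TimeRangeFetchAll {
       static byte[] storeKey(final long ts, final int seq, final byte[] key) {
           return ByteBuffer.allocate(12 + key.length).putLong(ts).putInt(seq).put(key).array();
       }

       static NavigableMap<byte[], String> newStore() {
           return new TreeMap<byte[], String>(Arrays::compareUnsigned);
       }

       // Every record with from <= ts <= to lies between these two bounds, so the
       // scan needs no per-record filter (the "always true" condition).
       static List<String> fetchAll(final NavigableMap<byte[], String> store,
                                    final long from, final long to) {
           final byte[] lower = storeKey(from, 0, new byte[0]);   // inclusive
           final byte[] upper = storeKey(to + 1, 0, new byte[0]); // exclusive
           return new ArrayList<>(store.subMap(lower, true, upper, false).values());
       }
   }
   ```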

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchema.java
##########
@@ -0,0 +1,191 @@
+    public Bytes upperRangeFixedSize(final Bytes key, final long to) {

Review comment:
       We know these functions are triggered by `fetch(final Bytes key, final long timeFrom, final long timeTo)`, and with the default implementation that is sub-optimal, since we will scan a large range and then drop many of the records. Let's add a one-line comment above them pointing readers to the class-level javadoc, noting that they should try to avoid calling these functions.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/OrderedBytes.java
##########
@@ -65,4 +65,16 @@ static Bytes lowerRange(final Bytes key, final byte[] minSuffix) {
                 .array()
         );
     }
+
+    static Bytes range(final byte[] prefix, final Bytes key) {

Review comment:
       Please let me know if you want to keep it as is, which is fine.
   
   But I think we should avoid byte array copies as much as possible: right now we first allocate a byte array for the prefix in the schema, then allocate a longer byte array and copy in the prefix followed by the key bytes. Instead, we could move this logic directly into the schema, where we create a single array once and put the timestamp, sequence number, and key into it.
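   A sketch of the two approaches, assuming a 12-byte `<time,seq>` prefix. `twoCopies` only approximates the current prefix-then-copy path (the body of `OrderedBytes.range` is not shown here), and both method names are hypothetical. Both produce the same bytes; the second allocates a single array.

   ```java
   import java.nio.ByteBuffer;

   // Hypothetical comparison of two ways to build a <time,seq,key> store key.
   final class StoreKeyAllocation {
       static final int PREFIX_SIZE = 8 + 4; // timestamp + sequence number

       // Allocate the prefix, then copy it and the key into a second array.
       static byte[] twoCopies(final byte[] key, final long ts, final int seq) {
           final byte[] prefix = ByteBuffer.allocate(PREFIX_SIZE).putLong(ts).putInt(seq).array();
           final byte[] out = new byte[prefix.length + key.length];
           System.arraycopy(prefix, 0, out, 0, prefix.length);
           System.arraycopy(key, 0, out, prefix.length, key.length);
           return out;
       }

       // Size the buffer once and write every field directly into it.
       static byte[] singleAllocation(final byte[] key, final long ts, final int seq) {
           return ByteBuffer.allocate(PREFIX_SIZE + key.length)
               .putLong(ts)
               .putInt(seq)
               .put(key)
               .array();
       }
   }
   ```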

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+/**
+ * A persistent (time-key)-value store based on RocksDB.
+ */
+public class RocksDBTimeOrderedWindowStore

Review comment:
       Yeah, that's a good point about the unit-test complexity. Let's just emphasize, then, that key ranges are implemented sub-optimally with the new schema (see my other comment below).







[GitHub] [kafka] guozhangwang commented on a change in pull request #10331: KAFKA-10847: Add a RocksDBTimeOrderedWindowStore to hold records using their timestamp as key prefix

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10331:
URL: https://github.com/apache/kafka/pull/10331#discussion_r599952559



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/Stores.java
##########
@@ -37,6 +37,7 @@
 
 import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
 import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
+import static org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier.WindowStoreTypes;

Review comment:
       Since `Stores` is a public API, we would need to file a KIP in order to change it. On the other hand, users rely on `Stores` to customize their materialized state stores, whereas for KAFKA-10847 we can simply hard-code which store types to use instead of going through the `Stores` factory.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
##########
@@ -25,4 +26,35 @@
 
     void destroy() throws IOException;
 
+    /**
+     * INTERNAL USE ONLY - Move this method to ReadOnlyKeyValueStore to make it a public API
+     *
+     * Get an iterator over a given range of keys. This iterator must be closed after use.
+     * The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s
+     * and must not return null values.
+     * Order is not guaranteed as bytes lexicographical ordering might not represent key order.
+     *
+     * @param from The first key that could be in the range, where iteration starts from.
+     * @param to   The last key that could be in the range, where iteration ends.
+     * @param prefixScan If true, then it iterates using the common key prefixes.
+     * @return The iterator for this range, from smallest to largest bytes.
+     * @throws NullPointerException       If null is used for from or to.
+     */
+    KeyValueIterator<Bytes, byte[]> range(Bytes from, Bytes to, boolean prefixScan);
+
+    /**
+     * INTERNAL USE ONLY - Move this method to ReadOnlyKeyValueStore to make it a public API
+     *
+     * Get a reverse iterator over a given range of keys. This iterator must be closed after use.
+     * The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s
+     * and must not return null values.
+     * Order is not guaranteed as bytes lexicographical ordering might not represent key order.
+     *
+     * @param from The first key that could be in the range, where iteration ends.
+     * @param to   The last key that could be in the range, where iteration starts from.
+     * @param prefixScan If true, then it iterates using the common key prefixes.
+     * @return The reverse iterator for this range, from largest to smallest key bytes.
+     * @throws NullPointerException       If null is used for from or to.
+     */
+    KeyValueIterator<Bytes, byte[]> reverseRange(Bytes from, Bytes to, boolean prefixScan);

Review comment:
       Related to the other comment: since the stream-stream join does not need a reverse prefix scan, just adding a forward `prefixScan(..)` interface may be better.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreBuilder.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+
+public class TimeOrderedWindowStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, WindowStore<K, V>> {

Review comment:
       If we do not allow such stores to be created from `Stores`, maybe we could remove this class as well.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
##########
@@ -82,9 +93,9 @@ public boolean hasNext() {
                     }
                 } else {
                     if (forward) {
-                        currentIterator = currentSegment.range(from, to);
+                        currentIterator = currentSegment.range(from, to, prefixScan);

Review comment:
       Instead of passing this boolean around and creating overloaded interfaces, could we:
   
   * add `prefixScan` to Segment / SegmentIterator;
   * add a separate `RocksDBPrefixIterator` alongside `RocksDBRangeIterator` to be used by `prefixScan`, and then use `RocksDBStore#prefixScan` in SegmentIterator?
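   One possible shape for a forward-only prefix scan, sketched over a `TreeMap` with unsigned bytewise ordering. `ForwardPrefixScan` is a hypothetical stand-in for the suggested `RocksDBPrefixIterator`, not an existing Kafka class: it seeks to the first key at or after the prefix and stops at the first key that no longer starts with it, so no reverse variant or extra boolean is needed.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;
   import java.util.Map;
   import java.util.NavigableMap;
   import java.util.TreeMap;

   // Hypothetical forward-only prefix scan over bytewise-sorted keys.
   final class ForwardPrefixScan {
       static boolean startsWith(final byte[] key, final byte[] prefix) {
           return key.length >= prefix.length
               && Arrays.equals(key, 0, prefix.length, prefix, 0, prefix.length);
       }

       // Seek to the first key >= prefix; keys sharing the prefix are contiguous,
       // so stop at the first key without it.
       static List<byte[]> prefixScan(final NavigableMap<byte[], byte[]> store, final byte[] prefix) {
           final List<byte[]> out = new ArrayList<>();
           for (final Map.Entry<byte[], byte[]> e : store.tailMap(prefix, true).entrySet()) {
               if (!startsWith(e.getKey(), prefix)) {
                   break;
               }
               out.add(e.getKey());
           }
           return out;
       }

       static NavigableMap<byte[], byte[]> newStore() {
           return new TreeMap<byte[], byte[]>(Arrays::compareUnsigned);
       }
   }
   ```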







[GitHub] [kafka] guozhangwang commented on pull request #10331: KAFKA-10847: Add a RocksDBTimeOrderedWindowStore to hold records using their timestamp as key prefix

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #10331:
URL: https://github.com/apache/kafka/pull/10331#issuecomment-815063166


   > @guozhangwang The test failures are not related to this PR.
   
   Sounds good. I agree they are unrelated, as are the other failures from `KStreamRepartitionIntegrationTest` and `KTableKTableForeignKeyInnerJoinMultiIntegrationTest`.





[GitHub] [kafka] guozhangwang commented on pull request #10331: KAFKA-10847: Add a RocksDBTimeOrderedWindowStore to hold records using their timestamp as key prefix

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #10331:
URL: https://github.com/apache/kafka/pull/10331#issuecomment-815244582


   Merged to trunk, thanks @spena !!





[GitHub] [kafka] spena commented on pull request #10331: KAFKA-10847: Add a RocksDBTimeOrderedWindowStore to hold records using their timestamp as key prefix

Posted by GitBox <gi...@apache.org>.
spena commented on pull request #10331:
URL: https://github.com/apache/kafka/pull/10331#issuecomment-805050202


   The failing test is `kafka.server.ListOffsetsRequestTest.testResponseIncludesLeaderEpoch()`, which is unrelated to this PR.





[GitHub] [kafka] spena commented on pull request #10331: KAFKA-10847: Add a RocksDBTimeOrderedWindowStore to hold records using their timestamp as key prefix

Posted by GitBox <gi...@apache.org>.
spena commented on pull request #10331:
URL: https://github.com/apache/kafka/pull/10331#issuecomment-805898792


   Thanks @guozhangwang for the feedback. I will create new PRs.





[GitHub] [kafka] spena commented on a change in pull request #10331: KAFKA-10847: Add a RocksDBTimeOrderedWindowStore to hold records using their timestamp as key prefix

Posted by GitBox <gi...@apache.org>.
spena commented on a change in pull request #10331:
URL: https://github.com/apache/kafka/pull/10331#discussion_r600706763



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreBuilder.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+
+public class TimeOrderedWindowStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, WindowStore<K, V>> {

Review comment:
       I just removed the `ChangeLoggingTimeOrderedWindowBytesStore` class in this new PR and instead reuse `ChangeLoggingWindowBytesStore` by passing it the window key serializer. But I think I still need the `TimeOrderedWindowStoreBuilder`.







[GitHub] [kafka] spena commented on a change in pull request #10331: KAFKA-10847: Add a RocksDBTimeOrderedWindowStore to hold records using their timestamp as key prefix

Posted by GitBox <gi...@apache.org>.
spena commented on a change in pull request #10331:
URL: https://github.com/apache/kafka/pull/10331#discussion_r600569576



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/Stores.java
##########
@@ -37,6 +37,7 @@
 
 import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
 import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
+import static org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier.WindowStoreTypes;

Review comment:
       Right. I'll remove this.







[GitHub] [kafka] guozhangwang commented on a change in pull request #10331: KAFKA-10847: Add a RocksDBTimeOrderedWindowStore to hold records using their timestamp as key prefix

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10331:
URL: https://github.com/apache/kafka/pull/10331#discussion_r605943732



##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
##########
@@ -242,7 +242,7 @@ public void shouldConvertToBinaryAndBack() {
     }
 
     @Test
-    public void shouldExtractEndTimeFromBinary() {
+    public void shouldExtractSequenceFromBinary() {

Review comment:
       Thank you! :)

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSegmentedBytesStore.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+import java.util.List;
+
+public class RocksDBTimeOrderedSegmentedBytesStore extends AbstractRocksDBSegmentedBytesStore<KeyValueSegment> {
+    private final KeySchema keySchema;
+    private final AbstractSegments<KeyValueSegment> segments;
+
+    RocksDBTimeOrderedSegmentedBytesStore(final String name,
+                                          final String metricsScope,
+                                          final long retention,
+                                          final long segmentInterval,
+                                          final KeySchema keySchema) {
+        this(name, metricsScope, keySchema, new KeyValueSegments(name, metricsScope, retention, segmentInterval));
+    }
+
+    private RocksDBTimeOrderedSegmentedBytesStore(final String name,
+                                          final String metricsScope,
+                                          final KeySchema keySchema,
+                                          final AbstractSegments<KeyValueSegment> segments) {
+        super(name, metricsScope, keySchema, segments);
+        this.keySchema = keySchema;
+        this.segments = segments;
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> fetchAll(final long timeFrom,

Review comment:
       Why do we need to override this here? It seems exactly the same as the implementation in `AbstractRocksDBSegmentedBytesStore`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/OrderedBytes.java
##########
@@ -65,4 +65,16 @@ static Bytes lowerRange(final Bytes key, final byte[] minSuffix) {
                 .array()
         );
     }
+
+    static Bytes range(final byte[] prefix, final Bytes key) {

Review comment:
       nit: rename to `prefixRange`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSegmentedBytesStore.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+import java.util.List;
+
+public class RocksDBTimeOrderedSegmentedBytesStore extends AbstractRocksDBSegmentedBytesStore<KeyValueSegment> {
+    private final KeySchema keySchema;
+    private final AbstractSegments<KeyValueSegment> segments;
+
+    RocksDBTimeOrderedSegmentedBytesStore(final String name,

Review comment:
       I think this store only works with `TimeOrderedKeySchema` and no other schema, right? If so, we can just create the schema internally at construction time instead of requiring it as a constructor parameter.

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSegmentedBytesStoreTest.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+public class RocksDBTimeOrderedSegmentedBytesStoreTest extends AbstractRocksDBSegmentedBytesStoreTest<KeyValueSegment> {
+    private final static String METRICS_SCOPE = "metrics-scope";
+
+    RocksDBTimeOrderedSegmentedBytesStore getBytesStore() {
+        return new RocksDBTimeOrderedSegmentedBytesStore(

Review comment:
       This test would cover all three types of `schema` in `AbstractRocksDBSegmentedBytesStoreTest`; is this intentional? See my other comment below.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchema.java
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * A {@link RocksDBSegmentedBytesStore.KeySchema} to serialize/deserialize a RocksDB store
+ * key into a schema combined of (time,seq,key). This key schema is more efficient when doing
+ * range queries between a time interval. For key range queries better use {@link WindowKeySchema}.
+ */
+public class TimeOrderedKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
+    private static final Logger LOG = LoggerFactory.getLogger(TimeOrderedKeySchema.class);
+
+    private static final int TIMESTAMP_SIZE = 8;
+    private static final int SEQNUM_SIZE = 4;
+    private static final int PREFIX_SIZE = TIMESTAMP_SIZE + SEQNUM_SIZE;
+
+    public static Bytes lowerTimeRange(final long from) {

Review comment:
       Following my previous comments: we can be stricter and support no operations other than `put` and `all`. As a result, we can also drop these `XXrange` APIs and keep only `toStoreKeyBinary` / `extractStoreTimestamp`, for puts and for deserialization on reads. Basically, we want to eliminate the risk of bugs from unexpected function calls.
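For reference, here is a minimal sketch of the two operations the comment proposes keeping. It assumes the `<time,seq,key>` layout with the 8-byte timestamp and 4-byte sequence number declared in the diff above, written big-endian (the `ByteBuffer` default). The class name, the `byte[]`-based signatures, and the extra `extractStoreSequence` / `extractStoreKeyBytes` helpers are hypothetical and are not the signatures used in the PR.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch of the <time,seq,key> layout; not the PR's actual TimeOrderedKeySchema.
public class TimeOrderedKeyLayoutSketch {
    // Field widths copied from the TimeOrderedKeySchema diff.
    static final int TIMESTAMP_SIZE = 8;
    static final int SEQNUM_SIZE = 4;
    static final int PREFIX_SIZE = TIMESTAMP_SIZE + SEQNUM_SIZE;

    // Used by put: big-endian timestamp, then sequence number, then the raw key bytes.
    static byte[] toStoreKeyBinary(final byte[] key, final long timestamp, final int seqnum) {
        return ByteBuffer.allocate(PREFIX_SIZE + key.length)
            .putLong(timestamp)
            .putInt(seqnum)
            .put(key)
            .array();
    }

    // Used when reading entries back from all(): the timestamp is the first 8 bytes.
    static long extractStoreTimestamp(final byte[] binaryKey) {
        return ByteBuffer.wrap(binaryKey).getLong(0);
    }

    // The sequence number sits right after the timestamp.
    static int extractStoreSequence(final byte[] binaryKey) {
        return ByteBuffer.wrap(binaryKey).getInt(TIMESTAMP_SIZE);
    }

    // Everything after the 12-byte prefix is the original key.
    static byte[] extractStoreKeyBytes(final byte[] binaryKey) {
        return Arrays.copyOfRange(binaryKey, PREFIX_SIZE, binaryKey.length);
    }

    public static void main(final String[] args) {
        final byte[] stored = toStoreKeyBinary("user-1".getBytes(StandardCharsets.UTF_8), 1_000L, 7);
        System.out.println(stored.length);                 // 18
        System.out.println(extractStoreTimestamp(stored)); // 1000
        System.out.println(extractStoreSequence(stored));  // 7
        System.out.println(new String(extractStoreKeyBytes(stored), StandardCharsets.UTF_8)); // user-1
    }
}
```

Because only these two directions of the conversion exist, there is no range-building code left that could be called by mistake.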

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
##########
@@ -93,7 +93,7 @@
 
     @Parameters(name = "{0}")
     public static Object[] getKeySchemas() {
-        return new Object[] {new SessionKeySchema(), new WindowKeySchema()};
+        return new Object[] {new SessionKeySchema(), new WindowKeySchema(), new TimeOrderedKeySchema()};

Review comment:
       Should `TimeOrderedKeySchema` only be used for `RocksDBTimeOrderedSegmentedBytesStoreTest`? As written, the added schema would also be used by the other test classes that extend this one.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSegmentedBytesStore.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+import java.util.List;
+
+public class RocksDBTimeOrderedSegmentedBytesStore extends AbstractRocksDBSegmentedBytesStore<KeyValueSegment> {
+    private final KeySchema keySchema;
+    private final AbstractSegments<KeyValueSegment> segments;
+
+    RocksDBTimeOrderedSegmentedBytesStore(final String name,
+                                          final String metricsScope,
+                                          final long retention,
+                                          final long segmentInterval,
+                                          final KeySchema keySchema) {
+        this(name, metricsScope, keySchema, new KeyValueSegments(name, metricsScope, retention, segmentInterval));
+    }
+
+    private RocksDBTimeOrderedSegmentedBytesStore(final String name,
+                                          final String metricsScope,

Review comment:
       nit: indentation.

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
##########
@@ -1173,14 +1173,23 @@ private void putSecondBatch(final WindowStore<Integer, String> store,
         store.put(2, "two+6");
     }
 
+    long extractStoreTimestamp(final byte[] binaryKey) {

Review comment:
       About test coverage: for the `all()` function, today we only have a single test, `shouldNotThrowConcurrentModificationException`. Let's add some more scenarios:
   
   1) Deleted records (create some that would sit at the head of the `all` iterator, mimicking the stream-stream join behavior) are not returned from `all`.
   2) Returned iterators can be closed early, and a subsequent `all` call still returns the whole set.
   3) The ordering of `all` is indeed lexicographic by prefix; that is the property we would rely on for the stream-stream join.
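Scenario 3 could be checked with something like the following sketch. It assumes the store orders keys by unsigned lexicographic byte comparison (as RocksDB's default bytewise comparator does) and simulates that with a `TreeMap` and `Arrays::compareUnsigned` (Java 9+) rather than a real RocksDB segment; `storeKey` and `scanOrder` are hypothetical helpers, not part of the PR.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TimeOrderedAllOrderingSketch {
    // Hypothetical helper: builds a <time,seq,key> store key (8-byte time, 4-byte seqnum, key bytes).
    static byte[] storeKey(final long timestamp, final int seqnum, final String key) {
        final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(12 + keyBytes.length)
            .putLong(timestamp)
            .putInt(seqnum)
            .put(keyBytes)
            .array();
    }

    // Returns the timestamps in the order a full scan would visit them,
    // using unsigned lexicographic byte ordering to stand in for RocksDB.
    static List<Long> scanOrder(final List<byte[]> keys) {
        final TreeMap<byte[], String> store = new TreeMap<>(Arrays::compareUnsigned);
        for (final byte[] k : keys) {
            store.put(k, "v");
        }
        final List<Long> timestamps = new ArrayList<>();
        for (final Map.Entry<byte[], String> e : store.entrySet()) {
            timestamps.add(ByteBuffer.wrap(e.getKey()).getLong(0));
        }
        return timestamps;
    }

    public static void main(final String[] args) {
        // Inserted out of time order, with record keys that would sort differently on their own.
        final List<Long> order = scanOrder(List.of(
            storeKey(300L, 0, "a"),
            storeKey(5L, 0, "z"),
            storeKey(200L, 0, "m"),
            storeKey(5L, 1, "a")));
        System.out.println(order); // [5, 5, 200, 300]
    }
}
```

The `200L` entry is included because its low byte (`0xC8`) is negative as a signed Java `byte`, so a signed byte comparison would place it before `5L`. Note also that under this encoding a negative timestamp would sort after every non-negative one.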

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+/**
+ * A persistent (time-key)-value store based on RocksDB.
+ */
+public class RocksDBTimeOrderedWindowStore

Review comment:
       Could we declare this class as `extends RocksDBWindowStore`, so that we can reuse functions like `init` whose logic does not change, and avoid duplicating `maybeUpdateSeqnumForDups`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+/**
+ * A persistent (time-key)-value store based on RocksDB.
+ */
+public class RocksDBTimeOrderedWindowStore

Review comment:
       Actually, on second thought, I think we can implement only `put` and `all`, and have the other read operations throw unsupported-operation exceptions directly, so that if they are called by mistake we fail fast. By doing that, we can also make `TimeOrderedWindowStoreIteratorWrapper` an inner class, since it is only used by `all()`.
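A minimal in-memory sketch of the shape being proposed: only `put` and `all` are implemented, any other read throws `UnsupportedOperationException`, and the iterator class is nested because `all()` is its only user. Everything here is hypothetical and merely stands in for the real `RocksDBTimeOrderedWindowStore`: a `TreeMap` replaces the RocksDB segments, `all()` iterates over a snapshot, and `fetch` is the only other read method shown.

```java
import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for the suggested design; names are illustrative only.
public class PutAndAllOnlyStoreSketch {
    // Keys are <time,seq,key> byte arrays ordered by unsigned lexicographic comparison.
    private final TreeMap<byte[], byte[]> entries = new TreeMap<>(Arrays::compareUnsigned);

    // Writes an entry; a null value deletes it.
    public void put(final byte[] storeKey, final byte[] value) {
        if (value == null) {
            entries.remove(storeKey);
        } else {
            entries.put(storeKey, value);
        }
    }

    // The only supported read: a full scan, in time order, over a snapshot of the current entries.
    public AllIterator all() {
        final List<Map.Entry<byte[], byte[]>> snapshot = new ArrayList<>();
        for (final Map.Entry<byte[], byte[]> e : entries.entrySet()) {
            snapshot.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), e.getValue()));
        }
        return new AllIterator(snapshot.iterator());
    }

    // Any other read path fails fast, so an accidental call surfaces immediately.
    public byte[] fetch(final byte[] key, final long timestamp) {
        throw new UnsupportedOperationException("fetch is not supported by a time-ordered store");
    }

    // Nested here because all() is its only user.
    public static final class AllIterator implements Iterator<Map.Entry<byte[], byte[]>> {
        private final Iterator<Map.Entry<byte[], byte[]>> delegate;

        private AllIterator(final Iterator<Map.Entry<byte[], byte[]>> delegate) {
            this.delegate = delegate;
        }

        @Override
        public boolean hasNext() {
            return delegate.hasNext();
        }

        @Override
        public Map.Entry<byte[], byte[]> next() {
            return delegate.next();
        }
    }

    // Hypothetical helper: 8-byte timestamp + 4-byte seqnum + a one-byte key.
    static byte[] storeKey(final long timestamp, final int seqnum, final byte key) {
        return ByteBuffer.allocate(13).putLong(timestamp).putInt(seqnum).put(key).array();
    }

    public static void main(final String[] args) {
        final PutAndAllOnlyStoreSketch store = new PutAndAllOnlyStoreSketch();
        store.put(storeKey(20L, 0, (byte) 'b'), new byte[] {2});
        store.put(storeKey(10L, 0, (byte) 'a'), new byte[] {1});
        store.put(storeKey(10L, 0, (byte) 'a'), null); // delete the entry at the head of the scan
        final AllIterator it = store.all();
        while (it.hasNext()) {
            System.out.println(ByteBuffer.wrap(it.next().getKey()).getLong(0)); // 20
        }
        try {
            store.fetch(new byte[] {'b'}, 20L);
        } catch (final UnsupportedOperationException expected) {
            System.out.println("fetch rejected");
        }
    }
}
```

The `main` method also exercises the delete-at-head case listed in the test-coverage comment on `AbstractWindowBytesStoreTest`.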

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreBuilder.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+
+public class TimeOrderedWindowStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, WindowStore<K, V>> {

Review comment:
       Makes sense. I realized this store cannot be removed now.







[GitHub] [kafka] guozhangwang commented on pull request #10331: KAFKA-10847: Add a RocksDBTimeOrderedWindowStore to hold records using their timestamp as key prefix

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #10331:
URL: https://github.com/apache/kafka/pull/10331#issuecomment-811288141


   > @guozhangwang I just replaced this PR with only the changes for the `RocksDBTimeOrderedWindowStore`. I will create other PRs with your suggestions.
   
   I will continue reviewing this PR, since my understanding is that you will not `replace` it with other PRs.





[GitHub] [kafka] guozhangwang commented on a change in pull request #10331: KAFKA-10847: Add a RocksDBTimeOrderedWindowStore to hold records using their timestamp as key prefix

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10331:
URL: https://github.com/apache/kafka/pull/10331#discussion_r600028281



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
##########
@@ -82,9 +93,9 @@ public boolean hasNext() {
                     }
                 } else {
                     if (forward) {
-                        currentIterator = currentSegment.range(from, to);
+                        currentIterator = currentSegment.range(from, to, prefixScan);

Review comment:
       I should clarify: I did not mean the existing `RocksDBStore#prefixScan`, since it only allows a single prefix, but rather adding another function.
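To illustrate the distinction: a single-prefix scan returns keys sharing one prefix, whereas the segment iterator needs every key whose prefix falls within a range. The sketch below shows one hypothetical shape for such a function over a sorted map, using the prefix-only comparison described in the PR summary (compare only as many bytes as the shorter array has). `scanPrefixRange`, `comparePrefix`, `timeKey`, and `timePrefix` are illustrative names, not existing Kafka APIs, and the early `break` assumes fixed-width prefixes.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class PrefixRangeScanSketch {
    // Compares only the first min(a.length, b.length) bytes, unsigned, so a key
    // and a shorter prefix of it compare as equal.
    static int comparePrefix(final byte[] a, final byte[] b) {
        final int n = Math.min(a.length, b.length);
        return Arrays.compareUnsigned(a, 0, n, b, 0, n);
    }

    // Hypothetical multi-prefix scan: every key whose prefix lies in [fromPrefix, toPrefix].
    static List<byte[]> scanPrefixRange(final NavigableMap<byte[], byte[]> store,
                                        final byte[] fromPrefix,
                                        final byte[] toPrefix) {
        final List<byte[]> keys = new ArrayList<>();
        // A proper prefix sorts before every key that extends it, so tailMap starts at the first match.
        for (final Map.Entry<byte[], byte[]> e : store.tailMap(fromPrefix, true).entrySet()) {
            if (comparePrefix(e.getKey(), toPrefix) > 0) {
                break; // past the upper bound; with fixed-width prefixes no later key can match
            }
            keys.add(e.getKey());
        }
        return keys;
    }

    // Hypothetical helpers: an 8-byte timestamp prefix followed by a one-byte key.
    static byte[] timeKey(final long timestamp, final byte key) {
        return ByteBuffer.allocate(9).putLong(timestamp).put(key).array();
    }

    static byte[] timePrefix(final long timestamp) {
        return ByteBuffer.allocate(8).putLong(timestamp).array();
    }

    public static void main(final String[] args) {
        final TreeMap<byte[], byte[]> store = new TreeMap<>(Arrays::compareUnsigned);
        for (long t = 1; t <= 5; t++) {
            store.put(timeKey(t, (byte) 'k'), new byte[0]);
        }
        // All keys whose timestamp prefix is between 2 and 4 inclusive.
        for (final byte[] k : scanPrefixRange(store, timePrefix(2), timePrefix(4))) {
            System.out.println(ByteBuffer.wrap(k).getLong(0)); // prints 2, then 3, then 4
        }
    }
}
```

A dedicated function like this keeps the prefix semantics in one place instead of threading a `prefixScan` boolean through `Segment` and `SegmentIterator`.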



