Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/23 23:18:53 UTC

[GitHub] [kafka] guozhangwang commented on a change in pull request #10331: KAFKA-10847: Add a RocksDBTimeOrderedWindowStore to hold records using their timestamp as key prefix

guozhangwang commented on a change in pull request #10331:
URL: https://github.com/apache/kafka/pull/10331#discussion_r599952559



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/Stores.java
##########
@@ -37,6 +37,7 @@
 
 import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
 import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
+import static org.apache.kafka.streams.state.internals.RocksDbWindowBytesStoreSupplier.WindowStoreTypes;

Review comment:
       Since `Stores` is a public API, changing it would require a KIP. Moreover, `Stores` is meant for users to customize their materialized state stores, whereas for KAFKA-10847 we can simply hard-code which store types to use internally instead of going through the `Stores` factory.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
##########
@@ -25,4 +26,35 @@
 
     void destroy() throws IOException;
 
+    /**
+     * INTERNAL USE ONLY - Move this method to ReadOnlyKeyValueStore to make it a public API
+     *
+     * Get an iterator over a given range of keys. This iterator must be closed after use.
+     * The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s
+     * and must not return null values.
+     * Order is not guaranteed as bytes lexicographical ordering might not represent key order.
+     *
+     * @param from The first key that could be in the range, where iteration starts from.
+     * @param to   The last key that could be in the range, where iteration ends.
+     * @param prefixScan If true, then it iterates using the common key prefixes.
+     * @return The iterator for this range, from smallest to largest bytes.
+     * @throws NullPointerException       If null is used for from or to.
+     */
+    KeyValueIterator<Bytes, byte[]> range(Bytes from, Bytes to, boolean prefixScan);
+
+    /**
+     * INTERNAL USE ONLY - Move this method to ReadOnlyKeyValueStore to make it a public API
+     *
+     * Get a reverse iterator over a given range of keys. This iterator must be closed after use.
+     * The returned iterator must be safe from {@link java.util.ConcurrentModificationException}s
+     * and must not return null values.
+     * Order is not guaranteed as bytes lexicographical ordering might not represent key order.
+     *
+     * @param from The first key that could be in the range, where iteration ends.
+     * @param to   The last key that could be in the range, where iteration starts from.
+     * @param prefixScan If true, then it iterates using the common key prefixes.
+     * @return The reverse iterator for this range, from largest to smallest key bytes.
+     * @throws NullPointerException       If null is used for from or to.
+     */
+    KeyValueIterator<Bytes, byte[]> reverseRange(Bytes from, Bytes to, boolean prefixScan);

Review comment:
       Related to the other comment: since the stream-stream join does not really need a reverse prefix scan, it may be better to add only a forward `prefixScan(..)` method.
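The Javadoc in this hunk warns that byte-lexicographical order might not match key order. The JDK-only Java sketch below illustrates why a timestamp key prefix still lets a plain forward scan return records in time order, under stated assumptions: the store key is a hypothetical layout of an 8-byte timestamp followed by the raw key bytes (not necessarily the layout used in this PR), and a `TreeMap` with unsigned byte-wise comparison stands in for RocksDB's default byte-wise comparator. `TimeOrderedKeySketch` and its methods are illustrative names only.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

public class TimeOrderedKeySketch {

    // Unsigned byte-wise comparison (memcmp-style), standing in for RocksDB's default comparator.
    static final Comparator<byte[]> BYTEWISE = Arrays::compareUnsigned;

    // Hypothetical layout: [8-byte timestamp in the given byte order][raw key bytes].
    static byte[] encode(long timestamp, String key, ByteOrder order) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Long.BYTES + keyBytes.length)
                .order(order)
                .putLong(timestamp)
                .put(keyBytes)
                .array();
    }

    // Reads the timestamp back from the first 8 bytes of a store key.
    static long decodeTimestamp(byte[] storeKey, ByteOrder order) {
        return ByteBuffer.wrap(storeKey).order(order).getLong();
    }

    // Inserts one record per timestamp and returns the timestamps in forward-scan order.
    static List<Long> scanOrder(ByteOrder order, long... timestamps) {
        TreeMap<byte[], String> store = new TreeMap<>(BYTEWISE);
        for (long ts : timestamps) {
            store.put(encode(ts, "key-" + ts, order), "value");
        }
        List<Long> result = new ArrayList<>();
        for (byte[] storeKey : store.keySet()) {
            result.add(decodeTimestamp(storeKey, order));
        }
        return result;
    }

    public static void main(String[] args) {
        // Big-endian: byte order matches numeric order for non-negative timestamps.
        System.out.println(scanOrder(ByteOrder.BIG_ENDIAN, 300L, 5L, 256L));    // [5, 256, 300]
        // Little-endian: the least significant byte is compared first, so time order is lost.
        System.out.println(scanOrder(ByteOrder.LITTLE_ENDIAN, 300L, 5L, 256L)); // [256, 5, 300]
    }
}
```

With a big-endian prefix, a forward scan alone yields time order, which is consistent with the point that only a forward prefix scan is needed here. Note that even big-endian encoding sorts negative timestamps after positive ones, because the sign bit makes their first byte large when compared unsigned.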

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreBuilder.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+
+public class TimeOrderedWindowStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, WindowStore<K, V>> {

Review comment:
       If we do not allow such stores to be created from `Stores`, maybe we could remove this class as well.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
##########
@@ -82,9 +93,9 @@ public boolean hasNext() {
                     }
                 } else {
                     if (forward) {
-                        currentIterator = currentSegment.range(from, to);
+                        currentIterator = currentSegment.range(from, to, prefixScan);

Review comment:
       Instead of passing this boolean around and creating overloaded interfaces, could we:
   
   * add a separate `prefixScan` method to `Segment` / `SegmentIterator`;
   * add a dedicated `RocksDBPrefixIterator`, alongside `RocksDBRangeIterator`, to be used by `prefixScan`, and then have `SegmentIterator` call `RocksDBStore#prefixScan`?
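The suggestion above replaces the boolean flag with a dedicated prefix-scan path. A minimal, JDK-only Java sketch of that shape follows; `InMemorySegment` and `PrefixIterator` are hypothetical stand-ins (not Kafka classes) for a segment and for the proposed `RocksDBPrefixIterator`, and a `TreeMap` with unsigned byte-wise ordering stands in for RocksDB. The prefix iterator seeks to the prefix and stops at the first key that no longer starts with it, which is valid because, under byte-wise ordering, all keys sharing a prefix form one contiguous run beginning at or after the prefix itself.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NoSuchElementException;
import java.util.TreeMap;

public class PrefixScanSketch {

    // Unsigned byte-wise ordering, standing in for RocksDB's default comparator.
    static final Comparator<byte[]> BYTEWISE = Arrays::compareUnsigned;

    // Hypothetical segment: separate range() and prefixScan() methods
    // instead of a single range(from, to, boolean prefixScan).
    static final class InMemorySegment {
        private final NavigableMap<byte[], byte[]> data = new TreeMap<>(BYTEWISE);

        void put(byte[] key, byte[] value) {
            data.put(key, value);
        }

        // Bounded range scan over [from, to], both inclusive.
        Iterator<Map.Entry<byte[], byte[]>> range(byte[] from, byte[] to) {
            return data.subMap(from, true, to, true).entrySet().iterator();
        }

        // Forward-only prefix scan: seek to the prefix, then let PrefixIterator stop early.
        Iterator<Map.Entry<byte[], byte[]>> prefixScan(byte[] prefix) {
            return new PrefixIterator(data.tailMap(prefix, true).entrySet().iterator(), prefix);
        }
    }

    // Hypothetical analogue of the proposed RocksDBPrefixIterator.
    static final class PrefixIterator implements Iterator<Map.Entry<byte[], byte[]>> {
        private final Iterator<Map.Entry<byte[], byte[]>> inner;
        private final byte[] prefix;
        private Map.Entry<byte[], byte[]> next;

        PrefixIterator(Iterator<Map.Entry<byte[], byte[]>> inner, byte[] prefix) {
            this.inner = inner;
            this.prefix = prefix;
            advance();
        }

        // Fetches the next entry, or ends iteration at the first key outside the prefix.
        private void advance() {
            next = null;
            if (inner.hasNext()) {
                Map.Entry<byte[], byte[]> candidate = inner.next();
                if (startsWith(candidate.getKey(), prefix)) {
                    next = candidate;
                }
            }
        }

        private static boolean startsWith(byte[] key, byte[] prefix) {
            return key.length >= prefix.length
                    && Arrays.equals(key, 0, prefix.length, prefix, 0, prefix.length);
        }

        @Override
        public boolean hasNext() {
            return next != null;
        }

        @Override
        public Map.Entry<byte[], byte[]> next() {
            if (next == null) {
                throw new NoSuchElementException();
            }
            Map.Entry<byte[], byte[]> current = next;
            advance();
            return current;
        }
    }

    static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    static int count(Iterator<?> it) {
        int n = 0;
        while (it.hasNext()) {
            it.next();
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        InMemorySegment segment = new InMemorySegment();
        for (String k : new String[] {"a:1", "a:2", "ab:1", "b:1"}) {
            segment.put(bytes(k), bytes("v"));
        }
        System.out.println(count(segment.prefixScan(bytes("a:"))));            // 2
        System.out.println(count(segment.range(bytes("a:1"), bytes("ab:1")))); // 3
    }
}
```

Keeping `range` and `prefixScan` as separate methods lets the caller pick the scan type by which method it calls, so no boolean has to be passed down through every layer.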




-- 
This is an automated message from the Apache Git Service.