Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/04/01 22:20:56 UTC

[GitHub] [kafka] guozhangwang commented on a change in pull request #10331: KAFKA-10847: Add a RocksDBTimeOrderedWindowStore to hold records using their timestamp as key prefix

guozhangwang commented on a change in pull request #10331:
URL: https://github.com/apache/kafka/pull/10331#discussion_r605943732



##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
##########
@@ -242,7 +242,7 @@ public void shouldConvertToBinaryAndBack() {
     }
 
     @Test
-    public void shouldExtractEndTimeFromBinary() {
+    public void shouldExtractSequenceFromBinary() {

Review comment:
       Thank you! :)

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSegmentedBytesStore.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+import java.util.List;
+
+public class RocksDBTimeOrderedSegmentedBytesStore extends AbstractRocksDBSegmentedBytesStore<KeyValueSegment> {
+    private final KeySchema keySchema;
+    private final AbstractSegments<KeyValueSegment> segments;
+
+    RocksDBTimeOrderedSegmentedBytesStore(final String name,
+                                          final String metricsScope,
+                                          final long retention,
+                                          final long segmentInterval,
+                                          final KeySchema keySchema) {
+        this(name, metricsScope, keySchema, new KeyValueSegments(name, metricsScope, retention, segmentInterval));
+    }
+
+    private RocksDBTimeOrderedSegmentedBytesStore(final String name,
+                                          final String metricsScope,
+                                          final KeySchema keySchema,
+                                          final AbstractSegments<KeyValueSegment> segments) {
+        super(name, metricsScope, keySchema, segments);
+        this.keySchema = keySchema;
+        this.segments = segments;
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> fetchAll(final long timeFrom,

Review comment:
       Why do we need to override this here? It looks exactly the same as the one in `AbstractRocksDBSegmentedBytesStore`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/OrderedBytes.java
##########
@@ -65,4 +65,16 @@ static Bytes lowerRange(final Bytes key, final byte[] minSuffix) {
                 .array()
         );
     }
+
+    static Bytes range(final byte[] prefix, final Bytes key) {

Review comment:
       nit: rename to `prefixRange`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSegmentedBytesStore.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+import java.util.List;
+
+public class RocksDBTimeOrderedSegmentedBytesStore extends AbstractRocksDBSegmentedBytesStore<KeyValueSegment> {
+    private final KeySchema keySchema;
+    private final AbstractSegments<KeyValueSegment> segments;
+
+    RocksDBTimeOrderedSegmentedBytesStore(final String name,

Review comment:
       I think this store would only work with `TimeOrderedKeySchema` and no other schema, right? If so, we can just create the schema internally at construction and drop the pass-in parameter.
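A minimal sketch of this suggestion, not the PR's code. `KeySchema`, `TimeOrderedKeySchema`, and `TimeOrderedStore` below are reduced, hypothetical stand-ins for the Kafka classes so that the example compiles on its own; only the constructor shape is the point.

```java
final class InternalSchemaSketch {
    // Hypothetical stand-in for RocksDBSegmentedBytesStore.KeySchema, reduced to one method.
    interface KeySchema {
        String describe();
    }

    // Hypothetical stand-in for the TimeOrderedKeySchema added in this PR.
    static final class TimeOrderedKeySchema implements KeySchema {
        @Override
        public String describe() {
            return "time-ordered";
        }
    }

    // Hypothetical stand-in for RocksDBTimeOrderedSegmentedBytesStore.
    static final class TimeOrderedStore {
        private final String name;
        private final KeySchema keySchema;

        // No KeySchema parameter: callers cannot pair the store with a schema it does not support.
        // retention and segmentInterval are accepted only to mirror the original signature.
        TimeOrderedStore(final String name, final long retention, final long segmentInterval) {
            this.name = name;
            this.keySchema = new TimeOrderedKeySchema();
        }

        String schemaDescription() {
            return keySchema.describe();
        }
    }

    public static void main(final String[] args) {
        final TimeOrderedStore store = new TimeOrderedStore("join-buffer", 60_000L, 30_000L);
        System.out.println(store.schemaDescription());
    }
}
```

With this shape, the schema choice becomes an implementation detail of the store rather than something each caller has to get right.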

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSegmentedBytesStoreTest.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+public class RocksDBTimeOrderedSegmentedBytesStoreTest extends AbstractRocksDBSegmentedBytesStoreTest<KeyValueSegment> {
+    private final static String METRICS_SCOPE = "metrics-scope";
+
+    RocksDBTimeOrderedSegmentedBytesStore getBytesStore() {
+        return new RocksDBTimeOrderedSegmentedBytesStore(

Review comment:
       This test would cover all three types of `schema` in `AbstractRocksDBSegmentedBytesStoreTest`. Is this intentional? See my other comment below.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchema.java
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * A {@link RocksDBSegmentedBytesStore.KeySchema} to serialize/deserialize a RocksDB store
+ * key into a schema combined of (time,seq,key). This key schema is more efficient when doing
+ * range queries between a time interval. For key range queries better use {@link WindowKeySchema}.
+ */
+public class TimeOrderedKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
+    private static final Logger LOG = LoggerFactory.getLogger(TimeOrderedKeySchema.class);
+
+    private static final int TIMESTAMP_SIZE = 8;
+    private static final int SEQNUM_SIZE = 4;
+    private static final int PREFIX_SIZE = TIMESTAMP_SIZE + SEQNUM_SIZE;
+
+    public static Bytes lowerTimeRange(final long from) {

Review comment:
       Following my previous comments: we can be stricter and not support any operations other than `put` and `all`. As a result, we can also drop these `XXrange` APIs and keep only `toStoreKeyBinary / extractStoreTimestamp` for puts and for deserialization on reads. Basically, we want to eliminate exposure to bugs from unexpected function calls.
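To make the reduced surface concrete, here is a hedged sketch of the two operations on a (time, seq, key) layout. The sizes come from the diff (`TIMESTAMP_SIZE = 8`, `SEQNUM_SIZE = 4`); the field order follows the class Javadoc; the method signatures, the big-endian encoding, and the extra `extractStoreSequence` helper are assumptions for illustration, and plain `byte[]` replaces Kafka's `Bytes` so the example is self-contained.

```java
import java.nio.ByteBuffer;

final class TimeOrderedKeySketch {
    private static final int TIMESTAMP_SIZE = 8; // from the diff
    private static final int SEQNUM_SIZE = 4;    // from the diff

    // Assumed layout: [8-byte timestamp][4-byte seqnum][raw key bytes].
    // ByteBuffer writes big-endian by default, so non-negative timestamps
    // sort in time order when keys are compared byte by byte.
    static byte[] toStoreKeyBinary(final byte[] key, final long timestamp, final int seqnum) {
        return ByteBuffer.allocate(TIMESTAMP_SIZE + SEQNUM_SIZE + key.length)
            .putLong(timestamp)
            .putInt(seqnum)
            .put(key)
            .array();
    }

    // Reads the timestamp back from the first 8 bytes of the binary key.
    static long extractStoreTimestamp(final byte[] binaryKey) {
        return ByteBuffer.wrap(binaryKey).getLong(0);
    }

    // Hypothetical helper: reads the sequence number that follows the timestamp.
    static int extractStoreSequence(final byte[] binaryKey) {
        return ByteBuffer.wrap(binaryKey).getInt(TIMESTAMP_SIZE);
    }

    public static void main(final String[] args) {
        final byte[] binaryKey = toStoreKeyBinary(new byte[] {7}, 100L, 3);
        System.out.println(extractStoreTimestamp(binaryKey)); // prints 100
        System.out.println(extractStoreSequence(binaryKey));  // prints 3
    }
}
```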

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
##########
@@ -93,7 +93,7 @@
 
     @Parameters(name = "{0}")
     public static Object[] getKeySchemas() {
-        return new Object[] {new SessionKeySchema(), new WindowKeySchema()};
+        return new Object[] {new SessionKeySchema(), new WindowKeySchema(), new TimeOrderedKeySchema()};

Review comment:
       Should `TimeOrderedKeySchema` only be used for `RocksDBTimeOrderedSegmentedBytesStoreTest`? As written, the added schema would also be used by the other test classes that extend this one.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSegmentedBytesStore.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+import java.util.List;
+
+public class RocksDBTimeOrderedSegmentedBytesStore extends AbstractRocksDBSegmentedBytesStore<KeyValueSegment> {
+    private final KeySchema keySchema;
+    private final AbstractSegments<KeyValueSegment> segments;
+
+    RocksDBTimeOrderedSegmentedBytesStore(final String name,
+                                          final String metricsScope,
+                                          final long retention,
+                                          final long segmentInterval,
+                                          final KeySchema keySchema) {
+        this(name, metricsScope, keySchema, new KeyValueSegments(name, metricsScope, retention, segmentInterval));
+    }
+
+    private RocksDBTimeOrderedSegmentedBytesStore(final String name,
+                                          final String metricsScope,

Review comment:
       nit: indentation.

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
##########
@@ -1173,14 +1173,23 @@ private void putSecondBatch(final WindowStore<Integer, String> store,
         store.put(2, "two+6");
     }
 
+    long extractStoreTimestamp(final byte[] binaryKey) {

Review comment:
       About test coverage: for the `all()` function, today we only have a single test, `shouldNotThrowConcurrentModificationException`. Let's add some more scenarios:
   
   1) Deleted records (create some that would be at the head of the `all` iterator, mimicking the stream-stream join behavior) are not returned from `all`.
   2) Returned iterators can be closed early, and a subsequent `all` still returns the whole set.
   3) The ordering of `all` is indeed lexicographic on the prefix; that is the property we would rely on for the stream-stream join.
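The three scenarios can be sketched against a small in-memory model. This is not the real store or its test harness: `AllScenariosSketch` is a hypothetical stand-in backed by a `TreeMap` with an unsigned byte-wise comparator, keys are a timestamp prefix plus the raw key (the sequence number is omitted for brevity), a `null` value models a delete, and "closing early" is modelled by abandoning an iterator. Timestamps are assumed to be non-negative.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.TreeMap;

final class AllScenariosSketch {
    // Unsigned lexicographic byte order, used here to model byte-wise key ordering.
    static final Comparator<byte[]> LEXICOGRAPHIC = (a, b) -> {
        final int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            final int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    };

    private final TreeMap<byte[], String> entries = new TreeMap<>(LEXICOGRAPHIC);

    // Key layout for this model only: [8-byte big-endian timestamp][UTF-8 key bytes].
    static byte[] storeKey(final long timestamp, final String key) {
        final byte[] raw = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Long.BYTES + raw.length).putLong(timestamp).put(raw).array();
    }

    // A null value deletes the record, otherwise the record is inserted or overwritten.
    void put(final long timestamp, final String key, final String value) {
        final byte[] storeKey = storeKey(timestamp, key);
        if (value == null) {
            entries.remove(storeKey);
        } else {
            entries.put(storeKey, value);
        }
    }

    // Each call returns a fresh iterator over a snapshot of the values in key order.
    Iterator<String> all() {
        return new ArrayList<>(entries.values()).iterator();
    }

    static List<String> drain(final Iterator<String> iterator) {
        final List<String> out = new ArrayList<>();
        while (iterator.hasNext()) {
            out.add(iterator.next());
        }
        return out;
    }

    public static void main(final String[] args) {
        final AllScenariosSketch store = new AllScenariosSketch();
        store.put(30L, "a", "v30");
        store.put(10L, "b", "v10");
        store.put(20L, "a", "v20");

        // Scenario 3: the timestamp prefix decides the order, not the key or insertion order.
        System.out.println(drain(store.all())); // prints [v10, v20, v30]

        // Scenario 1: delete the record at the head of the iteration order.
        store.put(10L, "b", null);

        // Scenario 2: abandon an iterator after one element, then iterate again.
        final Iterator<String> abandoned = store.all();
        abandoned.next();
        System.out.println(drain(store.all())); // prints [v20, v30]
    }
}
```

In the real tests the same checks would run against the store returned by the test's builder, with `KeyValueIterator.close()` in place of the abandoned iterator.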

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+/**
+ * A persistent (time-key)-value store based on RocksDB.
+ */
+public class RocksDBTimeOrderedWindowStore

Review comment:
       Could we declare this class as `extends RocksDBWindowStore`? That way we can reuse functions like `init`, whose logic does not change, and avoid duplicating `maybeUpdateSeqnumForDups`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+/**
+ * A persistent (time-key)-value store based on RocksDB.
+ */
+public class RocksDBTimeOrderedWindowStore

Review comment:
       Actually, on second thought, I think we can just implement `put` and `all`, and have the other read operations throw an unsupported-operation exception directly, so that if they are called by mistake we fail fast. By doing that, we can also make `TimeOrderedWindowStoreIteratorWrapper` an inner class, since it is only used by `all()`.
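A hedged sketch of the fail-fast shape described here. `MiniWindowStore` is a hypothetical, heavily reduced stand-in for the `WindowStore` interface (the real one has many more read methods and different signatures), and `TimeOrderedMiniStore` is not the PR's class; the point is only that unsupported reads throw immediately instead of returning misleading results.

```java
import java.util.ArrayList;
import java.util.List;

final class PutAndAllOnlySketch {
    // Hypothetical, reduced stand-in for the WindowStore read/write surface.
    interface MiniWindowStore {
        void put(String key, String value, long windowStartTimestamp);

        List<String> all();

        String fetch(String key, long time);
    }

    static final class TimeOrderedMiniStore implements MiniWindowStore {
        private final List<String> values = new ArrayList<>();

        @Override
        public void put(final String key, final String value, final long windowStartTimestamp) {
            values.add(value);
        }

        @Override
        public List<String> all() {
            // Return a copy so callers cannot mutate the store's internal list.
            return new ArrayList<>(values);
        }

        @Override
        public String fetch(final String key, final long time) {
            // Fail fast: this store is only meant to be written with put() and read with all().
            throw new UnsupportedOperationException("fetch is not supported by this store; use all()");
        }
    }

    public static void main(final String[] args) {
        final MiniWindowStore store = new TimeOrderedMiniStore();
        store.put("k", "v", 100L);
        System.out.println(store.all());
        try {
            store.fetch("k", 100L);
        } catch (final UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```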

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreBuilder.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+
+public class TimeOrderedWindowStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, WindowStore<K, V>> {

Review comment:
       Makes sense. I realized this store cannot be removed now.



