Posted to jira@kafka.apache.org by "vcrfxia (via GitHub)" <gi...@apache.org> on 2023/01/24 21:35:57 UTC

[GitHub] [kafka] vcrfxia commented on a diff in pull request #13142: KAFKA-14491: [2/N] Refactor RocksDB store open iterator management

vcrfxia commented on code in PR #13142:
URL: https://github.com/apache/kafka/pull/13142#discussion_r1085935187


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java:
##########
@@ -383,7 +383,9 @@ public KeyValue<Bytes, byte[]> makeNext() {
 
         @Override
         public synchronized void close() {
-            openIterators.remove(this);
+            if (closeCallback != null) {

Review Comment:
   It's unclear whether we want to require, in general, that a closeCallback is always registered. For these two specific classes (RocksDbIterator and RocksDBDualCFIterator), however, we do want to require that a closeCallback is always set. I've updated the code to reflect this.
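
A minimal sketch of what "always require a close callback" could look like, assuming the callback is registered by whoever creates the iterator. `TrackedIterator`, `IteratorRegistry`, `onClose`, and the exception message are hypothetical names for illustration only, not the actual classes or wording in the PR:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for an iterator such as RocksDbIterator (not the real class).
class TrackedIterator implements AutoCloseable {
    private Runnable closeCallback;
    private boolean open = true;

    // Assumed registration hook; the creator is expected to call this right away.
    void onClose(final Runnable closeCallback) {
        this.closeCallback = Objects.requireNonNull(closeCallback);
    }

    @Override
    public synchronized void close() {
        // Fail loudly instead of silently skipping a missing callback.
        if (closeCallback == null) {
            throw new IllegalStateException("close callback must be set before close()");
        }
        if (!open) {
            return; // closing twice is a no-op in this sketch
        }
        open = false;
        closeCallback.run();
    }
}

// Hypothetical owner that tracks open iterators and deregisters them on close.
class IteratorRegistry {
    private final Set<TrackedIterator> openIterators =
        Collections.synchronizedSet(new HashSet<>());

    TrackedIterator newIterator() {
        final TrackedIterator iterator = new TrackedIterator();
        iterator.onClose(() -> openIterators.remove(iterator));
        openIterators.add(iterator);
        return iterator;
    }

    int openCount() {
        return openIterators.size();
    }
}
```

With this shape, the iterator no longer needs a reference to the set of open iterators; it only needs to know that something must run when it closes.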



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java:
##########
@@ -351,13 +360,23 @@ public <R> QueryResult<R> query(
     @Override
     public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix,
                                                                                     final PS prefixKeySerializer) {
+        if (userManagedIterators) {
+            throw new IllegalStateException("Must specify openIterators in call to prefixScan()");
+        }
+        return prefixScan(prefix, prefixKeySerializer, openIterators);
+    }
+
+    <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix,
+                                                                             final PS prefixKeySerializer,
+                                                                             final Set<KeyValueIterator<Bytes, byte[]>> openIterators) {

Review Comment:
   No. I've just added this additional validation, which required refactoring these calls so that the two versions of the method no longer call each other. Instead, each one calls a third (helper) method.
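
The refactoring described above can be sketched roughly as follows. This is an illustration under assumptions, not the PR's code: `SketchStore`, `Scan`, `doPrefixScan`, `trackedCount`, and the second exception message are hypothetical, and an in-memory list of string keys stands in for RocksDB. The point is only that each overload validates the flag on its own and then delegates to a shared private helper:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical result handle; uses identity equality so each scan is tracked separately.
final class Scan {
    final List<String> matches;

    Scan(final List<String> matches) {
        this.matches = matches;
    }
}

// Hypothetical in-memory store standing in for RocksDBStore.
class SketchStore {
    private final boolean userManagedIterators;
    private final List<String> keys;
    private final Set<Scan> openIterators = new HashSet<>();

    SketchStore(final boolean userManagedIterators, final List<String> keys) {
        this.userManagedIterators = userManagedIterators;
        this.keys = keys;
    }

    // Overload for the case where the store tracks open iterators itself.
    Scan prefixScan(final String prefix) {
        if (userManagedIterators) {
            throw new IllegalStateException("Must specify openIterators in call to prefixScan()");
        }
        return doPrefixScan(prefix, openIterators);
    }

    // Overload for the case where the caller supplies its own tracking set.
    // The validation here is an assumed mirror image of the one above.
    Scan prefixScan(final String prefix, final Set<Scan> openIterators) {
        if (!userManagedIterators) {
            throw new IllegalStateException("Must not specify openIterators in call to prefixScan()");
        }
        return doPrefixScan(prefix, openIterators);
    }

    // Shared helper: neither overload calls the other, so neither check is bypassed.
    private Scan doPrefixScan(final String prefix, final Set<Scan> openIterators) {
        final List<String> matches = new ArrayList<>();
        for (final String key : keys) {
            if (key.startsWith(prefix)) {
                matches.add(key);
            }
        }
        final Scan scan = new Scan(matches);
        openIterators.add(scan);
        return scan;
    }

    int trackedCount() {
        return openIterators.size();
    }
}
```

If the one-argument overload delegated directly to the two-argument one, the second overload's check would reject the call, which is why a separate helper is needed once both overloads validate.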



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java:
##########
@@ -114,6 +114,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
     private boolean userSpecifiedStatistics = false;
 
     private final RocksDBMetricsRecorder metricsRecorder;
+    private final boolean userManagedIterators;

Review Comment:
   Good point. I think `selfManagedIterators` is also confusing, though, because it's unclear whether "self" refers to the store itself or to the caller.
   
   I've renamed this to `autoManagedIterators`, which has the opposite meaning of the original `userManagedIterators`, and added a comment.


