Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/04/14 19:49:58 UTC

[GitHub] [kafka] spena opened a new pull request #10537: KAFKA-10847: Delete Time-ordered duplicated records using deleteRange() internally

spena opened a new pull request #10537:
URL: https://github.com/apache/kafka/pull/10537


   This PR changes the `TimeOrderedKeySchema` composite key from `time-seq-key` to `time-key-seq` to allow deletion of duplicated time-key records using the RocksDB `deleteRange` API. It also removes all duplicates when `put(key, null)` is called. Previously, `put(key, null)` was a no-op, which caused problems because there was no way to delete keys when duplicates are enabled.
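   
   For illustration only (not the PR's actual code), here is a minimal sketch of how such a `time-key-seq` composite key can be laid out, assuming the 8-byte timestamp and 4-byte sequence-number widths Kafka Streams uses elsewhere; the method name is hypothetical:
   
       import java.nio.ByteBuffer;
   
       // Sketch of the (time, key, seq) layout. The fixed-width timestamp prefix
       // keeps records time-ordered; the fixed-width seqnum suffix disambiguates
       // duplicates of the same key at the same timestamp.
       static byte[] toTimeOrderedBinary(final byte[] key, final long timestamp, final int seqnum) {
           final ByteBuffer buf = ByteBuffer.allocate(8 + key.length + 4);  // time + key + seq
           buf.putLong(timestamp);  // 8 bytes, big-endian: primary sort dimension
           buf.put(key);            // variable-length record key
           buf.putInt(seqnum);      // 4 bytes: duplicate counter
           return buf.array();
       }
   
   Because all duplicates of a given key at a given timestamp now sit contiguously (they differ only in the trailing seqnum), they can be removed with a single range delete.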
   
   The RocksDB `deleteRange(keyFrom, keyTo)` deletes a range of keys from `keyFrom` (inclusive) to `keyTo` (exclusive). To make `keyTo` inclusive, I incremented the end key by one when calling the `RocksDBAccessor`.
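   
   As a sketch of that inclusive-upper-bound trick (illustrative names, not the PR's actual code; RocksDB's Java `deleteRange(beginKey, endKey)` is end-exclusive):
   
       import org.rocksdb.RocksDB;
       import org.rocksdb.RocksDBException;
   
       // Delete [from, to] inclusive by bumping the upper bound before calling
       // the end-exclusive deleteRange().
       static void deleteRangeInclusive(final RocksDB db, final byte[] from, final byte[] to)
               throws RocksDBException {
           db.deleteRange(from, increment(to));
       }
   
       // Lexicographic "plus one": add 1 to the last byte, carrying left on overflow.
       static byte[] increment(final byte[] input) {
           final byte[] result = java.util.Arrays.copyOf(input, input.length);
           for (int i = result.length - 1; i >= 0; i--) {
               if (result[i] != (byte) 0xFF) {
                   result[i]++;
                   return result;
               }
               result[i] = 0x00;  // carry: 0xFF + 1 overflows this byte
           }
           // All bytes were 0xFF; a real implementation needs an open-ended delete here.
           throw new IllegalStateException("cannot increment maximal key");
       }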
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #10537: KAFKA-10847: Delete Time-ordered duplicated records using deleteRange() internally

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #10537:
URL: https://github.com/apache/kafka/pull/10537#issuecomment-822036117


   Merged to trunk, thanks @spena!





[GitHub] [kafka] spena commented on a change in pull request #10537: KAFKA-10847: Delete Time-ordered duplicated records using deleteRange() internally

Posted by GitBox <gi...@apache.org>.
spena commented on a change in pull request #10537:
URL: https://github.com/apache/kafka/pull/10537#discussion_r615119263



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchema.java
##########
@@ -30,62 +30,37 @@
 
 /**
  * A {@link RocksDBSegmentedBytesStore.KeySchema} to serialize/deserialize a RocksDB store
- * key into a schema combined of (time,seq,key). This key schema is more efficient when doing
- * range queries between a time interval. For key range queries better use {@link WindowKeySchema}.
+ * key into a schema combined of (time,key,seq).

Review comment:
       Done







[GitHub] [kafka] spena commented on a change in pull request #10537: KAFKA-10847: Delete Time-ordered duplicated records using deleteRange() internally

Posted by GitBox <gi...@apache.org>.
spena commented on a change in pull request #10537:
URL: https://github.com/apache/kafka/pull/10537#discussion_r613544412



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
##########
@@ -196,6 +196,15 @@ public void remove(final Bytes key) {
         segment.delete(key);
     }
 
+    @Override
+    public void remove(final Bytes key, final long timestamp) {

Review comment:
       I wasn't sure how to name this method. I initially called it `removeRange(key, from, to)`, but I don't want to support a time range with a specific key, because the time-ordered key schema would also delete other keys that fall between `from-key` and `to-key`.
   
   So I thought of using a single timestamp, to make sure this is not called with a time range. But `removeRange(key, timestamp)` does not look like a range, so I ended up just calling it `remove`. Any thoughts?
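   
   For context, one way such a `remove(key, timestamp)` can translate into a range delete under the `time-key-seq` layout (a sketch with illustrative names, building on the hypothetical `toTimeOrderedBinary`/`increment` helpers sketched in the PR description above, not the PR's actual code):
   
       // Remove every duplicate of `key` at exactly `timestamp` by spanning the
       // whole sequence-number range (assuming non-negative seqnums). No other
       // key can fall inside these bounds, since the time and key parts are fixed.
       static void removeAllDuplicates(final RocksDB db, final byte[] key, final long timestamp)
               throws RocksDBException {
           final byte[] from = toTimeOrderedBinary(key, timestamp, 0);
           final byte[] to = toTimeOrderedBinary(key, timestamp, Integer.MAX_VALUE);
           db.deleteRange(from, increment(to));  // bump `to`: deleteRange is end-exclusive
       }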







[GitHub] [kafka] spena commented on a change in pull request #10537: KAFKA-10847: Delete Time-ordered duplicated records using deleteRange() internally

Posted by GitBox <gi...@apache.org>.
spena commented on a change in pull request #10537:
URL: https://github.com/apache/kafka/pull/10537#discussion_r615117396



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
##########
@@ -151,6 +159,18 @@
          */
         Bytes lowerRange(final Bytes key, final long from);
 
+        /**
+         * Given a record key and a time, construct a Segmented key to search when performing
+         * prefixed queries.
+         *
+         * @param key
+         * @param timestamp
+         * @return  The key that represents the prefixed Segmented key in bytes.
+         */
+        default Bytes toBinary(final Bytes key, long timestamp) {

Review comment:
       Done







[GitHub] [kafka] guozhangwang commented on a change in pull request #10537: KAFKA-10847: Delete Time-ordered duplicated records using deleteRange() internally

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10537:
URL: https://github.com/apache/kafka/pull/10537#discussion_r615051072



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
##########
@@ -151,6 +159,18 @@
          */
         Bytes lowerRange(final Bytes key, final long from);
 
+        /**
+         * Given a record key and a time, construct a Segmented key to search when performing
+         * prefixed queries.
+         *
+         * @param key
+         * @param timestamp
+         * @return  The key that represents the prefixed Segmented key in bytes.
+         */
+        default Bytes toBinary(final Bytes key, long timestamp) {

Review comment:
       nit: how about "toStoreBinaryKeyPrefix"?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
##########
@@ -196,6 +196,15 @@ public void remove(final Bytes key) {
         segment.delete(key);
     }
 
+    @Override
+    public void remove(final Bytes key, final long timestamp) {

Review comment:
       I think just calling `remove` is totally fine :)

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchema.java
##########
@@ -96,31 +71,36 @@ public long segmentTimestamp(final Bytes key) {
     /**
      * {@inheritdoc}
      *
-     * This method is not optimized for {@link TimeOrderedKeySchema}. The method may do unnecessary
-     * checks to find the next record.
+     * This method is optimized for {@link RocksDBTimeOrderedWindowStore#all()} only. Key and time
+     * range queries are not supported.
      */
     @Override
     public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, final Bytes binaryKeyTo, final long from, final long to) {
-        return iterator -> {
-            while (iterator.hasNext()) {
-                final Bytes bytes = iterator.peekNextKey();
-                final Bytes keyBytes = Bytes.wrap(extractStoreKeyBytes(bytes.get()));
-                final long time = extractStoreTimestamp(bytes.get());
-                if ((binaryKeyFrom == null || keyBytes.compareTo(binaryKeyFrom) >= 0)
-                    && (binaryKeyTo == null || keyBytes.compareTo(binaryKeyTo) <= 0)
-                    && time >= from
-                    && time <= to) {
-                    return true;
-                }
-                iterator.next();
-            }
-            return false;
-        };
+        if (binaryKeyFrom != null || binaryKeyTo != null) {
+            throw new IllegalArgumentException("binaryKeyFrom/binaryKeyTo keys cannot be non-null. Key and time range queries are not supported.");
+        }
+
+        if (from != 0 && to != Long.MAX_VALUE) {
+            throw new IllegalArgumentException("from/to time ranges should be 0 to Long.MAX_VALUE. Key and time range queries are not supported.");
+        }
+
+        return iterator -> iterator.hasNext();
     }
 
     @Override
     public <S extends Segment> List<S> segmentsToSearch(final Segments<S> segments, final long from, final long to, final boolean forward) {
-        return segments.segments(from, to, forward);
+        throw new UnsupportedOperationException();
+    }
+
+    public static Bytes toStoreKeyBinary(final Bytes key,

Review comment:
       ditto: toStoreKeyBinaryPrefix.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchema.java
##########
@@ -30,62 +30,37 @@
 
 /**
  * A {@link RocksDBSegmentedBytesStore.KeySchema} to serialize/deserialize a RocksDB store
- * key into a schema combined of (time,seq,key). This key schema is more efficient when doing
- * range queries between a time interval. For key range queries better use {@link WindowKeySchema}.
+ * key into a schema combined of (time,key,seq).

Review comment:
       nit: Add a note that since `key` is variable length while time/seq are fixed length, formatting in this order makes varying time-range queries very inefficient, since we'd need to be very conservative in picking the from/to boundaries; however, for now we do not expect any varying time-range access at all, only fixed time ranges.
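   
   To illustrate with hypothetical values: under `(time, key, seq)` ordering, a query for key "b" over times [10, 20] can only use the loose bounds (10, "b", 0) and (20, "b", MAX_SEQ), yet lexicographic order places unrelated keys inside that range:
   
       (10, "b", 0) < (10, "c", 5) < (15, "a", 1) < (15, "zz", 7) < (20, "b", 3)
   
   Every key with a timestamp strictly between the bounds has to be scanned and filtered out, which is why only the full-range scan (e.g. `all()`) stays efficient.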







[GitHub] [kafka] guozhangwang merged pull request #10537: KAFKA-10847: Delete Time-ordered duplicated records using deleteRange() internally

Posted by GitBox <gi...@apache.org>.
guozhangwang merged pull request #10537:
URL: https://github.com/apache/kafka/pull/10537


   





[GitHub] [kafka] spena commented on a change in pull request #10537: KAFKA-10847: Delete Time-ordered duplicated records using deleteRange() internally

Posted by GitBox <gi...@apache.org>.
spena commented on a change in pull request #10537:
URL: https://github.com/apache/kafka/pull/10537#discussion_r615119322



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeySchema.java
##########
@@ -96,31 +71,36 @@ public long segmentTimestamp(final Bytes key) {
     /**
      * {@inheritdoc}
      *
-     * This method is not optimized for {@link TimeOrderedKeySchema}. The method may do unnecessary
-     * checks to find the next record.
+     * This method is optimized for {@link RocksDBTimeOrderedWindowStore#all()} only. Key and time
+     * range queries are not supported.
      */
     @Override
     public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, final Bytes binaryKeyTo, final long from, final long to) {
-        return iterator -> {
-            while (iterator.hasNext()) {
-                final Bytes bytes = iterator.peekNextKey();
-                final Bytes keyBytes = Bytes.wrap(extractStoreKeyBytes(bytes.get()));
-                final long time = extractStoreTimestamp(bytes.get());
-                if ((binaryKeyFrom == null || keyBytes.compareTo(binaryKeyFrom) >= 0)
-                    && (binaryKeyTo == null || keyBytes.compareTo(binaryKeyTo) <= 0)
-                    && time >= from
-                    && time <= to) {
-                    return true;
-                }
-                iterator.next();
-            }
-            return false;
-        };
+        if (binaryKeyFrom != null || binaryKeyTo != null) {
+            throw new IllegalArgumentException("binaryKeyFrom/binaryKeyTo keys cannot be non-null. Key and time range queries are not supported.");
+        }
+
+        if (from != 0 && to != Long.MAX_VALUE) {
+            throw new IllegalArgumentException("from/to time ranges should be 0 to Long.MAX_VALUE. Key and time range queries are not supported.");
+        }
+
+        return iterator -> iterator.hasNext();
     }
 
     @Override
     public <S extends Segment> List<S> segmentsToSearch(final Segments<S> segments, final long from, final long to, final boolean forward) {
-        return segments.segments(from, to, forward);
+        throw new UnsupportedOperationException();
+    }
+
+    public static Bytes toStoreKeyBinary(final Bytes key,

Review comment:
       Done







[GitHub] [kafka] guozhangwang commented on pull request #10537: KAFKA-10847: Delete Time-ordered duplicated records using deleteRange() internally

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #10537:
URL: https://github.com/apache/kafka/pull/10537#issuecomment-821576713


   Thanks @spena , I will merge after green builds.





[GitHub] [kafka] spena commented on pull request #10537: KAFKA-10847: Delete Time-ordered duplicated records using deleteRange() internally

Posted by GitBox <gi...@apache.org>.
spena commented on pull request #10537:
URL: https://github.com/apache/kafka/pull/10537#issuecomment-821561295


   Thanks @guozhangwang , I applied the changes.

