Posted to jira@kafka.apache.org by "vcrfxia (via GitHub)" <gi...@apache.org> on 2023/03/17 19:33:00 UTC

[GitHub] [kafka] vcrfxia commented on a diff in pull request #13409: KAFKA-14491: [18/N] Update versioned store to check latest value on timestamped get

vcrfxia commented on code in PR #13409:
URL: https://github.com/apache/kafka/pull/13409#discussion_r1140636734


##########
streams/src/main/java/org/apache/kafka/streams/state/VersionedKeyValueStore.java:
##########
@@ -59,12 +59,15 @@
      * Delete the value associated with this key from the store, at the specified timestamp
      * (if there is such a value), and return the deleted value.
      * <p>
-     * This operation is semantically equivalent to {@link #get(Object, long) #get(key, timestamp)}
-     * followed by {@link #put(Object, Object, long) #put(key, null, timestamp)}.
-     * <p>
      * If the timestamp associated with this deletion is older than the store's grace period
      * (i.e., history retention) relative to the current observed stream time, then the deletion
-     * will not be performed.
+     * will not be performed and {@code null} will be returned.
+     * <p>
+     * This operation is semantically equivalent to {@link #get(Object, long) #get(key, timestamp)}
+     * followed by {@link #put(Object, Object, long) #put(key, null, timestamp)}, with
+     * a caveat that if the deletion timestamp is older than the store's grace period
+     * (i.e., history retention) then the return value is always {@code null}, regardless
+     * of what {@link #get(Object, long) #get(key, timestamp)} would return.
      *

Review Comment:
   Added. Let me know what you think.
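
   For illustration, the documented contract can be sketched with a toy in-memory store. This is only a sketch under stated assumptions: `ToyVersionedStore`, its `String` keys and values, and its `TreeMap` layout are hypothetical and are not the Kafka Streams implementation. The toy `get` deliberately ignores retention so that the caveat (an expired `delete` returns `null` regardless of what `get` would return) is visible.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical toy model of the Javadoc contract; NOT the real VersionedKeyValueStore.
class ToyVersionedStore {
    private final long historyRetention;
    private long observedStreamTime = -1L; // assumes non-negative timestamps
    // key -> (timestamp -> value); a null value stands in for a tombstone
    private final Map<String, TreeMap<Long, String>> versions = new HashMap<>();

    ToyVersionedStore(final long historyRetention) {
        this.historyRetention = historyRetention;
    }

    void put(final String key, final String value, final long timestamp) {
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        if (timestamp < observedStreamTime - historyRetention) {
            return; // older than the grace period: the write is dropped
        }
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    // Toy assumption: no retention check here, so old versions stay readable.
    String get(final String key, final long asOfTimestamp) {
        final TreeMap<Long, String> history = versions.get(key);
        if (history == null) {
            return null;
        }
        final Map.Entry<Long, String> entry = history.floorEntry(asOfTimestamp);
        return entry == null ? null : entry.getValue();
    }

    String delete(final String key, final long timestamp) {
        if (timestamp < observedStreamTime - historyRetention) {
            // The caveat: nothing is deleted and null is returned,
            // whatever get(key, timestamp) would have returned.
            return null;
        }
        // Otherwise: get(key, timestamp) followed by put(key, null, timestamp).
        final String previous = get(key, timestamp);
        put(key, null, timestamp);
        return previous;
    }

    public static void main(final String[] args) {
        final ToyVersionedStore store = new ToyVersionedStore(10L);
        store.put("k", "v1", 5L);
        store.put("k", "v2", 100L); // observed stream time is now 100
        System.out.println(store.delete("k", 5L));  // expired deletion
        System.out.println(store.get("k", 5L));     // old version still readable in this toy
        System.out.println(store.delete("k", 95L)); // within the grace period
    }
}
```

   With a retention of 10 and an observed stream time of 100, the deletion at timestamp 5 falls outside the grace period, while the deletion at timestamp 95 behaves like the get-then-put pair.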



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -156,9 +171,22 @@ public VersionedRecord<byte[]> get(final Bytes key) {
     public VersionedRecord<byte[]> get(final Bytes key, final long asOfTimestamp) {
 
         if (asOfTimestamp < observedStreamTime - historyRetention) {
-            LOG.warn("Returning null for expired get.");
+            // history retention exceeded. check latest value store only

Review Comment:
   Updated.
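
   One plausible reading of the new comment, sketched as a toy: once the query timestamp is older than history retention, only the latest value per key is consulted, and it is returned only if its timestamp is not after the query timestamp. `ToyLatestAwareStore`, its maps, and the exact timestamp comparison are assumptions made for illustration, not the `RocksDBVersionedStore` code. Values are assumed non-null.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical toy model; NOT the real RocksDBVersionedStore.
class ToyLatestAwareStore {
    private final long historyRetention;
    private long observedStreamTime = -1L; // assumes non-negative timestamps
    private final Map<String, Long> latestTimestamp = new HashMap<>();
    private final Map<String, String> latestValue = new HashMap<>();
    // Stand-in for older record versions: key -> (timestamp -> value)
    private final Map<String, TreeMap<Long, String>> olderVersions = new HashMap<>();

    ToyLatestAwareStore(final long historyRetention) {
        this.historyRetention = historyRetention;
    }

    void put(final String key, final String value, final long timestamp) {
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        final Long currentLatest = latestTimestamp.get(key);
        if (currentLatest == null || timestamp >= currentLatest) {
            if (currentLatest != null && timestamp > currentLatest) {
                // Demote the previous latest version to the older-versions map.
                olderVersions.computeIfAbsent(key, k -> new TreeMap<>())
                             .put(currentLatest, latestValue.get(key));
            }
            latestTimestamp.put(key, timestamp);
            latestValue.put(key, value);
        } else if (timestamp >= observedStreamTime - historyRetention) {
            // Out-of-order write that is still within retention.
            olderVersions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
        }
    }

    String get(final String key, final long asOfTimestamp) {
        final Long latestTs = latestTimestamp.get(key);
        if (asOfTimestamp < observedStreamTime - historyRetention) {
            // History retention exceeded: check the latest value only (assumed rule).
            return (latestTs != null && latestTs <= asOfTimestamp) ? latestValue.get(key) : null;
        }
        if (latestTs != null && latestTs <= asOfTimestamp) {
            return latestValue.get(key);
        }
        final TreeMap<Long, String> older = olderVersions.get(key);
        if (older == null) {
            return null;
        }
        final Map.Entry<Long, String> entry = older.floorEntry(asOfTimestamp);
        return entry == null ? null : entry.getValue();
    }
}
```

   Under these assumptions, a key whose latest version is old enough still answers an expired query, while a key whose latest version is newer than the query timestamp yields `null` even if an older version is still physically stored.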


