You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "vcrfxia (via GitHub)" <gi...@apache.org> on 2023/02/14 01:17:48 UTC

[GitHub] [kafka] vcrfxia commented on a diff in pull request #13243: KAFKA-14491: [7/N] Enforce strict grace period for versioned stores

vcrfxia commented on code in PR #13243:
URL: https://github.com/apache/kafka/pull/13243#discussion_r1105189382


##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java:
##########
@@ -523,6 +537,63 @@ public void shouldRestoreMultipleBatches() {
         verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 15);
     }
 
+    @Test
+    public void shouldNotRestoreExpired() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10));
+        records.add(new DataRecord("k1", "v1", HISTORY_RETENTION + 10 - GRACE_PERIOD)); // grace period has not elapsed
+        records.add(new DataRecord("k2", "v2", HISTORY_RETENTION + 9 - GRACE_PERIOD)); // grace period has elapsed, so this record should not be restored
+
+        store.restoreBatch(getChangelogRecords(records));
+
+        verifyGetValueFromStore("k", "v", HISTORY_RETENTION + 10);
+        verifyGetValueFromStore("k1", "v1", HISTORY_RETENTION + 10 - GRACE_PERIOD);
+        verifyGetNullFromStore("k2");
+    }
+
+    @Test
+    public void shouldRestoreEvenIfRecordWouldBeExpiredByEndOfBatch() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k2", "v2", HISTORY_RETENTION - GRACE_PERIOD)); // this record will be older than grace period by the end of the batch, but should still be restored
+        records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10));
+
+        store.restoreBatch(getChangelogRecords(records));
+
+        verifyGetValueFromStore("k2", "v2", HISTORY_RETENTION - GRACE_PERIOD);
+        verifyGetValueFromStore("k", "v", HISTORY_RETENTION + 10);
+    }
+
+    @Test
+    public void shouldAllowZeroHistoryRetention() {

Review Comment:
   Added this extra test in response to a previous PR review comment. 
   
   This is an interesting edge case in that if history retention = grace period = 0, then we don't actually need the segments store because grace period = 0 means we don't need to store tombstones. (Even if a tombstone is the latest value for a given key, the store will never accept earlier writes so the store doesn't need to keep the tombstone after clearing the current value for the key.)
   
   Is it worth it to add extra code to remove the segments store in this case? My instinct says no because this case does not seem very practical. For a user to use grace period = 0 requires that they are confident that all records within a partition, even across keys, are produced in ascending (technically, non-decreasing) timestamp order. I'm not sure how many use cases meet this criterion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org