Posted to jira@kafka.apache.org by "mjsax (via GitHub)" <gi...@apache.org> on 2023/02/14 23:39:35 UTC

[GitHub] [kafka] mjsax commented on a diff in pull request #13243: KAFKA-14491: [7/N] Enforce strict grace period for versioned stores

mjsax commented on code in PR #13243:
URL: https://github.com/apache/kafka/pull/13243#discussion_r1106482112


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -283,7 +293,12 @@ public void init(final StateStoreContext context, final StateStore root) {
 
     // VisibleForTesting
     void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
-        // advance stream time to the max timestamp in the batch
+        // copy the observed stream time, for use in deciding whether to drop records during restore,
+        // when records have exceeded the store's grace period.
+        long streamTimeForRestore = observedStreamTime;

Review Comment:
   Wondering if this would be correct?
   
   If we have `st = 100`, `grace=10` and we do `put(k,v,95)`, the put is accepted. If we restore at `st=110`, we would still need to keep `k,v` and not drop it, even though its timestamp 95 is now "too old"?
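A minimal sketch of the arithmetic behind this concern. It assumes the drop condition is exactly the one in the restore loop (`record.timestamp() < streamTimeForRestore - gracePeriod`). `GraceCheckExample` and `isExpired` are hypothetical names, not part of Kafka Streams. The sketch only shows that the same record is accepted at stream time 100 but would be dropped if it were evaluated against stream time 110.

```java
// Hypothetical helper mirroring the condition in restoreBatch():
// a record is "expired" if record.timestamp() < streamTime - gracePeriod.
final class GraceCheckExample {

    static boolean isExpired(final long timestamp, final long streamTime, final long gracePeriod) {
        return timestamp < streamTime - gracePeriod;
    }

    public static void main(final String[] args) {
        final long gracePeriod = 10L;
        final long recordTimestamp = 95L;

        // Original put(k, v, 95) while stream time is 100: 95 >= 100 - 10, so it is accepted.
        System.out.println("expired at put time (st=100): "
            + isExpired(recordTimestamp, 100L, gracePeriod));

        // Same record evaluated against stream time 110: 95 < 110 - 10, so the check would drop it.
        System.out.println("expired at restore time (st=110): "
            + isExpired(recordTimestamp, 110L, gracePeriod));
    }
}
```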



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java:
##########
@@ -523,6 +537,63 @@ public void shouldRestoreMultipleBatches() {
         verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 15);
     }
 
+    @Test
+    public void shouldNotRestoreExpired() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10));
+        records.add(new DataRecord("k1", "v1", HISTORY_RETENTION + 10 - GRACE_PERIOD)); // grace period has not elapsed
+        records.add(new DataRecord("k2", "v2", HISTORY_RETENTION + 9 - GRACE_PERIOD)); // grace period has elapsed, so this record should not be restored
+
+        store.restoreBatch(getChangelogRecords(records));
+
+        verifyGetValueFromStore("k", "v", HISTORY_RETENTION + 10);
+        verifyGetValueFromStore("k1", "v1", HISTORY_RETENTION + 10 - GRACE_PERIOD);
+        verifyGetNullFromStore("k2");
+    }
+
+    @Test
+    public void shouldRestoreEvenIfRecordWouldBeExpiredByEndOfBatch() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k2", "v2", HISTORY_RETENTION - GRACE_PERIOD)); // this record will be older than grace period by the end of the batch, but should still be restored
+        records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10));
+
+        store.restoreBatch(getChangelogRecords(records));
+
+        verifyGetValueFromStore("k2", "v2", HISTORY_RETENTION - GRACE_PERIOD);
+        verifyGetValueFromStore("k", "v", HISTORY_RETENTION + 10);
+    }
+
+    @Test
+    public void shouldAllowZeroHistoryRetention() {
+        // recreate store with zero history retention
+        store.close();
+        store = new RocksDBVersionedStore(STORE_NAME, METRICS_SCOPE, 0L, SEGMENT_INTERVAL);
+        store.init((StateStoreContext) context, store);
+
+        // put, get, and delete
+        putToStore("k", "v", BASE_TIMESTAMP);
+        verifyGetValueFromStore("k", "v", BASE_TIMESTAMP);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "v", BASE_TIMESTAMP);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 1, "v", BASE_TIMESTAMP); // query in "future" is allowed
+
+        // update existing record at same timestamp
+        putToStore("k", "updated", BASE_TIMESTAMP);
+        verifyGetValueFromStore("k", "updated", BASE_TIMESTAMP);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "updated", BASE_TIMESTAMP);
+
+        // put new record version
+        putToStore("k", "v2", BASE_TIMESTAMP + 2);
+        verifyGetValueFromStore("k", "v2", BASE_TIMESTAMP + 2);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 2, "v2", BASE_TIMESTAMP + 2);
+
+        // query in past (history retention expired) returns null
+        verifyTimestampedGetNullFromStore("k", BASE_TIMESTAMP + 1);
+
+        // put in past (grace period expired) does not update the store

Review Comment:
   Should we also test a put in the past for an existing record?
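One way to picture the suggested test uses a hypothetical in-memory toy rather than the real `RocksDBVersionedStore`. The toy ignores history retention and segments entirely. It assumes, as the test's comments suggest, that zero history retention also means a zero grace period, and it uses 1000 as a stand-in for `BASE_TIMESTAMP`. A put at `BASE_TIMESTAMP + 1` for a key that already has a version at `BASE_TIMESTAMP + 2` is dropped, and the latest value stays `v2`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory toy, NOT Kafka's RocksDBVersionedStore: it keeps every
// version per key and rejects puts older than (observed stream time - grace period).
final class ToyVersionedStore {
    private final long gracePeriod;
    private final Map<String, TreeMap<Long, String>> versions = new HashMap<>();
    private long observedStreamTime = -1L; // assumed initial value for this toy

    ToyVersionedStore(final long gracePeriod) {
        this.gracePeriod = gracePeriod;
    }

    /** Returns false if the put was dropped because it falls outside the grace period. */
    boolean put(final String key, final String value, final long timestamp) {
        if (timestamp < observedStreamTime - gracePeriod) {
            return false;
        }
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
        return true;
    }

    /** Latest value for the key, or null if the key has never been written. */
    String get(final String key) {
        final TreeMap<Long, String> keyVersions = versions.get(key);
        return keyVersions == null ? null : keyVersions.lastEntry().getValue();
    }

    public static void main(final String[] args) {
        final long base = 1_000L; // hypothetical stand-in for BASE_TIMESTAMP
        final ToyVersionedStore store = new ToyVersionedStore(0L); // zero grace period
        store.put("k", "v", base);
        store.put("k", "updated", base);  // same timestamp: still within grace, overwrites
        store.put("k", "v2", base + 2);
        // Put in the past for an EXISTING key: base + 1 < (base + 2) - 0, so it is dropped.
        final boolean accepted = store.put("k", "v3", base + 1);
        System.out.println("accepted=" + accepted + ", latest=" + store.get("k"));
    }
}
```

In the real test class, the equivalent check would presumably be a `putToStore("k", "v3", BASE_TIMESTAMP + 1)` followed by `verifyGetValueFromStore("k", "v2", BASE_TIMESTAMP + 2)`; that is a suggestion, not code from the PR.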



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -297,6 +312,12 @@ void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
         // records into memory. how high this memory amplification will be is very much dependent
         // on the specific workload and the value of the "segment interval" parameter.
         for (final ConsumerRecord<byte[], byte[]> record : records) {
+            if (record.timestamp() < streamTimeForRestore - gracePeriod) {
+                // record is older than grace period and was therefore never written to the store

Review Comment:
   If it was never written to the store, it should also not be in the changelog topic?
   
   This might still be useful if we restore a KTable from its input topic, I guess? But we might want to update the JavaDoc to mention this case?
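A sketch of the distinction raised here, under simplifying assumptions: records are reduced to timestamps, stream time starts at a hypothetical -1, and the same grace check is applied both on the normal put path and during restore. Because only accepted puts are written to the changelog, replaying the changelog in order never triggers the check; the check only matters when the restore source can contain records that were rejected originally, such as a KTable input topic. All names and values here are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation (timestamps only): records that fail the grace check
// on the normal put path are never written to the changelog, but they can still
// appear in a source topic that is read back to restore a KTable.
final class ChangelogVsSourceTopic {

    static final long GRACE_PERIOD = 10L; // hypothetical value

    /** Applies the grace check in order, advancing stream time; returns the accepted timestamps. */
    static List<Long> applyGraceCheck(final List<Long> timestamps, final long initialStreamTime) {
        final List<Long> accepted = new ArrayList<>();
        long streamTime = initialStreamTime;
        for (final long ts : timestamps) {
            if (ts < streamTime - GRACE_PERIOD) {
                continue; // outside the grace period: dropped
            }
            accepted.add(ts);
            streamTime = Math.max(streamTime, ts);
        }
        return accepted;
    }

    public static void main(final String[] args) {
        // Input topic with a late record: 85 arrives after stream time has reached 100.
        final List<Long> sourceTopic = List.of(100L, 95L, 85L, 105L);

        // Normal processing: only accepted puts reach the store, so only they are logged.
        final List<Long> changelog = applyGraceCheck(sourceTopic, -1L);

        // Restoring from the changelog: the check never fires, every record is kept.
        System.out.println("restore from changelog:    " + applyGraceCheck(changelog, -1L));
        // Restoring from the source topic: the check is what filters out the late record.
        System.out.println("restore from source topic: " + applyGraceCheck(sourceTopic, -1L));
    }
}
```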



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -615,11 +631,10 @@ private <T extends VersionedStoreSegment> PutStatus maybePutToSegments(
                 }
 
                 if (foundMinTs < observedStreamTime - historyRetention) {
-                    // the record being inserted does not affect version history. discard and return
-                    if (expiredRecordSensor.isPresent()) {
-                        expiredRecordSensor.get().record(1.0d, context.currentSystemTimeMs());
-                        LOG.warn("Skipping record for expired put.");
-                    }
+                    // the record being inserted does not affect version history. discard and return.

Review Comment:
   Not sure I follow. Why did we previously record this in the sensor, but no longer do?
   
   Same below (2x).



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java:
##########
@@ -523,6 +537,63 @@ public void shouldRestoreMultipleBatches() {
         verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 15);
     }
 
+    @Test
+    public void shouldNotRestoreExpired() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10));
+        records.add(new DataRecord("k1", "v1", HISTORY_RETENTION + 10 - GRACE_PERIOD)); // grace period has not elapsed
+        records.add(new DataRecord("k2", "v2", HISTORY_RETENTION + 9 - GRACE_PERIOD)); // grace period has elapsed, so this record should not be restored

Review Comment:
   Cf. comment above. The question seems to be "when" the original `put()` happened with regard to stream-time?
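To make the "when" question concrete, the following sketch replays the two new test batches, judging each record against the stream time reached so far in the batch (assumed to start at a hypothetical -1). `HISTORY_RETENTION` and `GRACE_PERIOD` are placeholder values, since the test class's real constants are not shown in the diff; `RestoreBatchReplay` and `Rec` are hypothetical helpers.

```java
import java.util.ArrayList;
import java.util.List;

final class RestoreBatchReplay {
    // Placeholder constants; the real values in RocksDBVersionedStoreTest are not shown here.
    static final long HISTORY_RETENTION = 1_000L;
    static final long GRACE_PERIOD = 100L;

    /** Minimal stand-in for a changelog record: just a key and a timestamp. */
    static final class Rec {
        final String key;
        final long timestamp;

        Rec(final String key, final long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    /** Keys that survive restore when each record is judged against the stream time seen so far. */
    static List<String> restoredKeys(final List<Rec> batch, final long initialStreamTime) {
        final List<String> restored = new ArrayList<>();
        long streamTimeForRestore = initialStreamTime;
        for (final Rec record : batch) {
            if (record.timestamp < streamTimeForRestore - GRACE_PERIOD) {
                continue; // older than the grace period at this point in the batch
            }
            restored.add(record.key);
            streamTimeForRestore = Math.max(streamTimeForRestore, record.timestamp);
        }
        return restored;
    }

    public static void main(final String[] args) {
        // shouldNotRestoreExpired: "k" advances stream time to HISTORY_RETENTION + 10.
        final List<Rec> batch1 = List.of(
            new Rec("k", HISTORY_RETENTION + 10),
            new Rec("k1", HISTORY_RETENTION + 10 - GRACE_PERIOD),  // exactly at the boundary: kept
            new Rec("k2", HISTORY_RETENTION + 9 - GRACE_PERIOD));  // one ms past the boundary: dropped
        System.out.println(restoredKeys(batch1, -1L)); // [k, k1]

        // shouldRestoreEvenIfRecordWouldBeExpiredByEndOfBatch: "k2" comes first, so it is judged
        // against the stream time at that point in the batch, not the batch's max timestamp.
        final List<Rec> batch2 = List.of(
            new Rec("k2", HISTORY_RETENTION - GRACE_PERIOD),
            new Rec("k", HISTORY_RETENTION + 10));
        System.out.println(restoredKeys(batch2, -1L)); // [k2, k]
    }
}
```

Passing a larger starting stream time, e.g. `restoredKeys(batch2, HISTORY_RETENTION + 10)`, drops `k2` (900 < 1010 - 100), which is the situation the first review comment asks about. Whether stream time is advanced before or after the check does not change the outcome here, since a record that raises stream time can never be older than the new stream time minus a non-negative grace period.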



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
