You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "vcrfxia (via GitHub)" <gi...@apache.org> on 2023/02/14 01:06:41 UTC

[GitHub] [kafka] vcrfxia opened a new pull request, #13243: KAFKA-14491: [7/N] Enforce strict grace period for versioned stores

vcrfxia opened a new pull request, #13243:
URL: https://github.com/apache/kafka/pull/13243

   (This PR is stacked on https://github.com/apache/kafka/pull/13189. Only the last commit needs to be reviewed separately.)
   
   The RocksDB-based implementation for versioned key-value stores introduced in https://github.com/apache/kafka/pull/13188 has a well-defined "history retention" parameter which specifies how far back in time (relative to the current observed stream time) reads may take place, but there is no well-defined equivalent (aka "grace period") for how far back in time writes will be accepted. Instead, there is an implicit grace period whereby the store accepts all writes which affect valid reads. This doesn't quite work, though, because it requires infinite tombstone retention when the latest value for a particular key is a tombstone -- if the latest value for a key is a very old tombstone, we can’t expire it because if there’s an even older non-null put to store later, then without the tombstone we’ll accept this write as the latest value for the key, even though it isn't.
   
   In light of this, this PR changes the versioned store semantics to define an explicit "grace period" property. ([KIP-889](https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores) has been updated accordingly.) For now, grace period will always be equal to the history retention, though in the future we can introduce a new KIP to expose options to configure grace period separately. 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vcrfxia commented on a diff in pull request #13243: KAFKA-14491: [7/N] Enforce strict grace period for versioned stores

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13243:
URL: https://github.com/apache/kafka/pull/13243#discussion_r1107567122


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -283,7 +293,12 @@ public void init(final StateStoreContext context, final StateStore root) {
 
     // VisibleForTesting
     void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
-        // advance stream time to the max timestamp in the batch
+        // copy the observed stream time, for use in deciding whether to drop records during restore,
+        // when records have exceeded the store's grace period.
+        long streamTimeForRestore = observedStreamTime;

Review Comment:
   > I guess the question is, what is the value of `observedStreamTime` when we start the restore? Are you saying it's `-1` and we basically "replay" `observedStreamTime` during restore?
   
   Yes, that's exactly right. `observedStreamTime` is tracked locally per store. It is initialized to `-1` and only updated on `put()` or during restore. (This is the same as the existing behavior for window stores today.)
   
   > Maybe best to update some variable names?
   
   Are you proposing that `doPut()` takes stream time as a parameter, so that during normal `put()` operation we pass `observedStreamTime` and during restore we pass `endOfBatchStreamTime`, which means we can rename `streamTimeForRestore` to be `observedStreamTime` instead? This SGTM, just want to check whether that's also what you have in mind, since we removed a number of parameters from `doPut()` in a previous PR revision in order to keep the parameter list small.
   
   > I guess follow up work (independent for this KIP) might be, to actually make use of KS runtime streamTime instead of tracking inside the store, and thus won't need `observedStreamTime` any longer, as we could look ahead to the "end-of-restore stream-time" (not just "end-of batch").
   
   What's the scope of the "streamTime" which is tracked by the KS runtime? Is it per-task? Per-processor? Global? I'm wondering how this would work in situations with multiple partitions, or with multiple processors where some processors are expected to see new data earlier than other (downstream) processors.
   
   I guess we'd also need to implement the change from your other comment about not writing records which are expired (based on grace period) into the changelog topic first before we can make this change, otherwise we would not have a way to determine during restore whether records are expired or not.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mjsax commented on a diff in pull request #13243: KAFKA-14491: [7/N] Enforce strict grace period for versioned stores

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13243:
URL: https://github.com/apache/kafka/pull/13243#discussion_r1106482112


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -283,7 +293,12 @@ public void init(final StateStoreContext context, final StateStore root) {
 
     // VisibleForTesting
     void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
-        // advance stream time to the max timestamp in the batch
+        // copy the observed stream time, for use in deciding whether to drop records during restore,
+        // when records have exceeded the store's grace period.
+        long streamTimeForRestore = observedStreamTime;

Review Comment:
   Wondering if this would be correct?
   
   If we have `st = 100`, `grace=10` and we do `put(k,v,95)` the put is correct. If we restore at `st=110`, the would still need to keep `k,v` and not drop it, even if it's timestamp 95 is now "too old"?



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java:
##########
@@ -523,6 +537,63 @@ public void shouldRestoreMultipleBatches() {
         verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 15);
     }
 
+    @Test
+    public void shouldNotRestoreExpired() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10));
+        records.add(new DataRecord("k1", "v1", HISTORY_RETENTION + 10 - GRACE_PERIOD)); // grace period has not elapsed
+        records.add(new DataRecord("k2", "v2", HISTORY_RETENTION + 9 - GRACE_PERIOD)); // grace period has elapsed, so this record should not be restored
+
+        store.restoreBatch(getChangelogRecords(records));
+
+        verifyGetValueFromStore("k", "v", HISTORY_RETENTION + 10);
+        verifyGetValueFromStore("k1", "v1", HISTORY_RETENTION + 10 - GRACE_PERIOD);
+        verifyGetNullFromStore("k2");
+    }
+
+    @Test
+    public void shouldRestoreEvenIfRecordWouldBeExpiredByEndOfBatch() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k2", "v2", HISTORY_RETENTION - GRACE_PERIOD)); // this record will be older than grace period by the end of the batch, but should still be restored
+        records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10));
+
+        store.restoreBatch(getChangelogRecords(records));
+
+        verifyGetValueFromStore("k2", "v2", HISTORY_RETENTION - GRACE_PERIOD);
+        verifyGetValueFromStore("k", "v", HISTORY_RETENTION + 10);
+    }
+
+    @Test
+    public void shouldAllowZeroHistoryRetention() {
+        // recreate store with zero history retention
+        store.close();
+        store = new RocksDBVersionedStore(STORE_NAME, METRICS_SCOPE, 0L, SEGMENT_INTERVAL);
+        store.init((StateStoreContext) context, store);
+
+        // put, get, and delete
+        putToStore("k", "v", BASE_TIMESTAMP);
+        verifyGetValueFromStore("k", "v", BASE_TIMESTAMP);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "v", BASE_TIMESTAMP);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 1, "v", BASE_TIMESTAMP); // query in "future" is allowed
+
+        // update existing record at same timestamp
+        putToStore("k", "updated", BASE_TIMESTAMP);
+        verifyGetValueFromStore("k", "updated", BASE_TIMESTAMP);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "updated", BASE_TIMESTAMP);
+
+        // put new record version
+        putToStore("k", "v2", BASE_TIMESTAMP + 2);
+        verifyGetValueFromStore("k", "v2", BASE_TIMESTAMP + 2);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 2, "v2", BASE_TIMESTAMP + 2);
+
+        // query in past (history retention expired) returns null
+        verifyTimestampedGetNullFromStore("k", BASE_TIMESTAMP + 1);
+
+        // put in past (grace period expired) does not update the store

Review Comment:
   Should we also test put-in-past-for-existing record?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -297,6 +312,12 @@ void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
         // records into memory. how high this memory amplification will be is very much dependent
         // on the specific workload and the value of the "segment interval" parameter.
         for (final ConsumerRecord<byte[], byte[]> record : records) {
+            if (record.timestamp() < streamTimeForRestore - gracePeriod) {
+                // record is older than grace period and was therefore never written to the store

Review Comment:
   If it was never written to the store, if should also not be in the changelog topic?
   
   This might still be useful if we read from the input topic for a KTable I guess? But we might want to update the JavaDoc for to mention this case?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -615,11 +631,10 @@ private <T extends VersionedStoreSegment> PutStatus maybePutToSegments(
                 }
 
                 if (foundMinTs < observedStreamTime - historyRetention) {
-                    // the record being inserted does not affect version history. discard and return
-                    if (expiredRecordSensor.isPresent()) {
-                        expiredRecordSensor.get().record(1.0d, context.currentSystemTimeMs());
-                        LOG.warn("Skipping record for expired put.");
-                    }
+                    // the record being inserted does not affect version history. discard and return.

Review Comment:
   Not sure if I can follow. Why did we record this in the sensor first, but not any longer?
   
   Same below (2x).



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java:
##########
@@ -523,6 +537,63 @@ public void shouldRestoreMultipleBatches() {
         verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 15);
     }
 
+    @Test
+    public void shouldNotRestoreExpired() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10));
+        records.add(new DataRecord("k1", "v1", HISTORY_RETENTION + 10 - GRACE_PERIOD)); // grace period has not elapsed
+        records.add(new DataRecord("k2", "v2", HISTORY_RETENTION + 9 - GRACE_PERIOD)); // grace period has elapsed, so this record should not be restored

Review Comment:
   Cf comment above. The question seems to be "when" the original `put()` happened with regard to stream-time?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vcrfxia commented on a diff in pull request #13243: KAFKA-14491: [7/N] Enforce strict grace period for versioned stores

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13243:
URL: https://github.com/apache/kafka/pull/13243#discussion_r1107595402


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -297,6 +312,12 @@ void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
         // records into memory. how high this memory amplification will be is very much dependent
         // on the specific workload and the value of the "segment interval" parameter.
         for (final ConsumerRecord<byte[], byte[]> record : records) {
+            if (record.timestamp() < streamTimeForRestore - gracePeriod) {
+                // record is older than grace period and was therefore never written to the store

Review Comment:
   Sounds good. Here's the ticket: https://issues.apache.org/jira/browse/KAFKA-14723



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vcrfxia commented on a diff in pull request #13243: KAFKA-14491: [7/N] Enforce strict grace period for versioned stores

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13243:
URL: https://github.com/apache/kafka/pull/13243#discussion_r1107672610


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -283,7 +293,12 @@ public void init(final StateStoreContext context, final StateStore root) {
 
     // VisibleForTesting
     void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
-        // advance stream time to the max timestamp in the batch
+        // copy the observed stream time, for use in deciding whether to drop records during restore,
+        // when records have exceeded the store's grace period.
+        long streamTimeForRestore = observedStreamTime;

Review Comment:
   > Are you proposing that doPut() takes stream time as a parameter, so that during normal put() operation we pass observedStreamTime and during restore we pass endOfBatchStreamTime, which means we can rename streamTimeForRestore to be observedStreamTime instead?
   
   Went ahead and made this update in the latest commit. Can revise if it's not what you had envisioned.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mjsax commented on a diff in pull request #13243: KAFKA-14491: [7/N] Enforce strict grace period for versioned stores

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13243:
URL: https://github.com/apache/kafka/pull/13243#discussion_r1107711967


##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java:
##########
@@ -523,6 +537,63 @@ public void shouldRestoreMultipleBatches() {
         verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 15);
     }
 
+    @Test
+    public void shouldNotRestoreExpired() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10));
+        records.add(new DataRecord("k1", "v1", HISTORY_RETENTION + 10 - GRACE_PERIOD)); // grace period has not elapsed
+        records.add(new DataRecord("k2", "v2", HISTORY_RETENTION + 9 - GRACE_PERIOD)); // grace period has elapsed, so this record should not be restored
+
+        store.restoreBatch(getChangelogRecords(records));
+
+        verifyGetValueFromStore("k", "v", HISTORY_RETENTION + 10);
+        verifyGetValueFromStore("k1", "v1", HISTORY_RETENTION + 10 - GRACE_PERIOD);
+        verifyGetNullFromStore("k2");
+    }
+
+    @Test
+    public void shouldRestoreEvenIfRecordWouldBeExpiredByEndOfBatch() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k2", "v2", HISTORY_RETENTION - GRACE_PERIOD)); // this record will be older than grace period by the end of the batch, but should still be restored
+        records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10));
+
+        store.restoreBatch(getChangelogRecords(records));
+
+        verifyGetValueFromStore("k2", "v2", HISTORY_RETENTION - GRACE_PERIOD);
+        verifyGetValueFromStore("k", "v", HISTORY_RETENTION + 10);
+    }
+
+    @Test
+    public void shouldAllowZeroHistoryRetention() {
+        // recreate store with zero history retention
+        store.close();
+        store = new RocksDBVersionedStore(STORE_NAME, METRICS_SCOPE, 0L, SEGMENT_INTERVAL);
+        store.init((StateStoreContext) context, store);
+
+        // put, get, and delete
+        putToStore("k", "v", BASE_TIMESTAMP);
+        verifyGetValueFromStore("k", "v", BASE_TIMESTAMP);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "v", BASE_TIMESTAMP);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 1, "v", BASE_TIMESTAMP); // query in "future" is allowed
+
+        // update existing record at same timestamp
+        putToStore("k", "updated", BASE_TIMESTAMP);
+        verifyGetValueFromStore("k", "updated", BASE_TIMESTAMP);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "updated", BASE_TIMESTAMP);
+
+        // put new record version
+        putToStore("k", "v2", BASE_TIMESTAMP + 2);
+        verifyGetValueFromStore("k", "v2", BASE_TIMESTAMP + 2);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 2, "v2", BASE_TIMESTAMP + 2);
+
+        // query in past (history retention expired) returns null
+        verifyTimestampedGetNullFromStore("k", BASE_TIMESTAMP + 1);
+
+        // put in past (grace period expired) does not update the store

Review Comment:
   Just see you added the test. Does not hurt to keep it. (We should not write test base on knowing how the implemenation works, but rather treat it as a "black box").



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mjsax merged pull request #13243: KAFKA-14491: [7/N] Enforce strict grace period for versioned stores

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax merged PR #13243:
URL: https://github.com/apache/kafka/pull/13243


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vcrfxia commented on a diff in pull request #13243: KAFKA-14491: [7/N] Enforce strict grace period for versioned stores

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13243:
URL: https://github.com/apache/kafka/pull/13243#discussion_r1106532591


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -297,6 +312,12 @@ void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
         // records into memory. how high this memory amplification will be is very much dependent
         // on the specific workload and the value of the "segment interval" parameter.
         for (final ConsumerRecord<byte[], byte[]> record : records) {
+            if (record.timestamp() < streamTimeForRestore - gracePeriod) {
+                // record is older than grace period and was therefore never written to the store

Review Comment:
   > If it was never written to the store, if should also not be in the changelog topic?
   
   Ideally, but unfortunately no. Only the inner layer (RocksDBVersionedStore) contains logic for deciding when grace period has elapsed and a call to `put()` should return without updating the store. The changelogging layer wrapped around this inner layer does not know about grace period, nor do any of the other outer layers. The changelogging layer does call `put()` before calling `log()`, but because `put()` has no return type, it does not convey information about whether an update was actually made or if `put()` simply returned without doing anything. So, the changelogging layer calls `log()` in either case.
   
   This is the existing behavior for window stores, and what I had planned to replicate for versioned stores as well. If we don't want this, we could:
   * update `put()` to return a boolean, indicating whether the update was actually performed, or
   * track observed stream time and grace period at an outer store layer, in order to not call `log()` at the changelogging layer if it's not needed.
   
   I don't particularly like either option. Curious to hear your thoughts.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -615,11 +631,10 @@ private <T extends VersionedStoreSegment> PutStatus maybePutToSegments(
                 }
 
                 if (foundMinTs < observedStreamTime - historyRetention) {
-                    // the record being inserted does not affect version history. discard and return
-                    if (expiredRecordSensor.isPresent()) {
-                        expiredRecordSensor.get().record(1.0d, context.currentSystemTimeMs());
-                        LOG.warn("Skipping record for expired put.");
-                    }
+                    // the record being inserted does not affect version history. discard and return.

Review Comment:
   With the changes in this PR, it is only possible to hit this case during restore now. Previously, we passed `Optional.empty()` for the expiredRecordSensor anyway, because we don't want to call the sensor during restore. So I've simplified the code by removing it entirely.
   
   The reason it is not possible to hit this case during non-restore is because `doPut()` is not called if the record being put is older than grace period, and history retention is always at least as large as grace period. (See my comment above for why it is still possible to hit this case during restore.)



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java:
##########
@@ -523,6 +537,63 @@ public void shouldRestoreMultipleBatches() {
         verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 15);
     }
 
+    @Test
+    public void shouldNotRestoreExpired() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10));
+        records.add(new DataRecord("k1", "v1", HISTORY_RETENTION + 10 - GRACE_PERIOD)); // grace period has not elapsed
+        records.add(new DataRecord("k2", "v2", HISTORY_RETENTION + 9 - GRACE_PERIOD)); // grace period has elapsed, so this record should not be restored
+
+        store.restoreBatch(getChangelogRecords(records));
+
+        verifyGetValueFromStore("k", "v", HISTORY_RETENTION + 10);
+        verifyGetValueFromStore("k1", "v1", HISTORY_RETENTION + 10 - GRACE_PERIOD);
+        verifyGetNullFromStore("k2");
+    }
+
+    @Test
+    public void shouldRestoreEvenIfRecordWouldBeExpiredByEndOfBatch() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k2", "v2", HISTORY_RETENTION - GRACE_PERIOD)); // this record will be older than grace period by the end of the batch, but should still be restored
+        records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10));
+
+        store.restoreBatch(getChangelogRecords(records));
+
+        verifyGetValueFromStore("k2", "v2", HISTORY_RETENTION - GRACE_PERIOD);
+        verifyGetValueFromStore("k", "v", HISTORY_RETENTION + 10);
+    }
+
+    @Test
+    public void shouldAllowZeroHistoryRetention() {
+        // recreate store with zero history retention
+        store.close();
+        store = new RocksDBVersionedStore(STORE_NAME, METRICS_SCOPE, 0L, SEGMENT_INTERVAL);
+        store.init((StateStoreContext) context, store);
+
+        // put, get, and delete
+        putToStore("k", "v", BASE_TIMESTAMP);
+        verifyGetValueFromStore("k", "v", BASE_TIMESTAMP);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "v", BASE_TIMESTAMP);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 1, "v", BASE_TIMESTAMP); // query in "future" is allowed
+
+        // update existing record at same timestamp
+        putToStore("k", "updated", BASE_TIMESTAMP);
+        verifyGetValueFromStore("k", "updated", BASE_TIMESTAMP);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "updated", BASE_TIMESTAMP);
+
+        // put new record version
+        putToStore("k", "v2", BASE_TIMESTAMP + 2);
+        verifyGetValueFromStore("k", "v2", BASE_TIMESTAMP + 2);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 2, "v2", BASE_TIMESTAMP + 2);
+
+        // query in past (history retention expired) returns null
+        verifyTimestampedGetNullFromStore("k", BASE_TIMESTAMP + 1);
+
+        // put in past (grace period expired) does not update the store

Review Comment:
   Sure, I can add that. I was worried that the test case was already getting a bit long :)



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java:
##########
@@ -523,6 +537,63 @@ public void shouldRestoreMultipleBatches() {
         verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 15);
     }
 
+    @Test
+    public void shouldNotRestoreExpired() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10));
+        records.add(new DataRecord("k1", "v1", HISTORY_RETENTION + 10 - GRACE_PERIOD)); // grace period has not elapsed
+        records.add(new DataRecord("k2", "v2", HISTORY_RETENTION + 9 - GRACE_PERIOD)); // grace period has elapsed, so this record should not be restored

Review Comment:
   That's correct. This test case uses the same data as `shouldNotPutExpired()` above. This third record is expired even during normal put operations.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -283,7 +293,12 @@ public void init(final StateStoreContext context, final StateStore root) {
 
     // VisibleForTesting
     void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
-        // advance stream time to the max timestamp in the batch
+        // copy the observed stream time, for use in deciding whether to drop records during restore,
+        // when records have exceeded the store's grace period.
+        long streamTimeForRestore = observedStreamTime;

Review Comment:
   Yeah this logic is pretty nuanced. (I tried to clarify in the comments but evidently not successfully.)
   
   The `doPut()` method is not responsible for deciding when a put is too old (according to grace period); that check happens before `doPut()` is called. Inside the `doPut()` method, however, `observedStreamTime` is still used to decide when old records have fallen out of history retention. If a record has fallen out of history retention, then we don't need to keep it in the store, and therefore `doPut()` returns.
   
   In this restore logic here, `streamTimeForRestore` is used to perform the grace period check. It would be incorrect to advance `streamTimeForRestore` at once for the entire batch, for the reason you gave above. In your example, we do still want to call `doPut()` for the record with `ts=95`. Assuming that is the first record in the restore batch, then `streamTimeForRestore=100` so `ts=95` and we call `doPut()` as we should. Only once we reach the later records in the restore batch will `streamTimeForRestore` be advanced past 100. 
   
   OTOH, `observedStreamTime` can be advanced to the end of the batch right away. This allows us to optimize situations where, for example, a record near the beginning of the restore batch which we would put into the store would be immediately expired (based on history retention) by the end of the restore batch, and therefore we can skip putting it in inside `doPut()`. Here's an example:
   * stream time is 50 at the start of the restore batch
   * segment interval is 25
   * stream time will be 100 by the end of the restore batch
   * restore batch contains a record `(k, v, 50)` and also `(k, v, 60)`.
   
   During restore when we see `(k, v, 50)`, we have to put it into the store (it's the latest value for the key so far). Then when we see `(k, v, 60)`, we also have to put it into the store (it's the new latest value) but we do NOT have to move `(k, v, 50)` into a segment store, because the segment that it would be moved into will be expired by the end of the restore process.
   
   Here's another example: exact same as above, but the restore batch contains `(k, v, 60)` before `(k, v, 50)`, instead of after. When we see `(k, v, 60)` we have to put it into the store. When we see `(k, v, 50)`, we still call `doPut()` because it's not expired based on grace period, but `doPut()` will see that it is expired based on history retention (using `observedStreamTime=100`, the value it will be by the end of the restore batch) and therefore `doPut()`returns without inserting into the store.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vcrfxia commented on a diff in pull request #13243: KAFKA-14491: [7/N] Enforce strict grace period for versioned stores

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13243:
URL: https://github.com/apache/kafka/pull/13243#discussion_r1107613538


##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java:
##########
@@ -523,6 +537,63 @@ public void shouldRestoreMultipleBatches() {
         verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 15);
     }
 
+    @Test
+    public void shouldNotRestoreExpired() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10));
+        records.add(new DataRecord("k1", "v1", HISTORY_RETENTION + 10 - GRACE_PERIOD)); // grace period has not elapsed
+        records.add(new DataRecord("k2", "v2", HISTORY_RETENTION + 9 - GRACE_PERIOD)); // grace period has elapsed, so this record should not be restored
+
+        store.restoreBatch(getChangelogRecords(records));
+
+        verifyGetValueFromStore("k", "v", HISTORY_RETENTION + 10);
+        verifyGetValueFromStore("k1", "v1", HISTORY_RETENTION + 10 - GRACE_PERIOD);
+        verifyGetNullFromStore("k2");
+    }
+
+    @Test
+    public void shouldRestoreEvenIfRecordWouldBeExpiredByEndOfBatch() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k2", "v2", HISTORY_RETENTION - GRACE_PERIOD)); // this record will be older than grace period by the end of the batch, but should still be restored
+        records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10));
+
+        store.restoreBatch(getChangelogRecords(records));
+
+        verifyGetValueFromStore("k2", "v2", HISTORY_RETENTION - GRACE_PERIOD);
+        verifyGetValueFromStore("k", "v", HISTORY_RETENTION + 10);
+    }
+
+    @Test
+    public void shouldAllowZeroHistoryRetention() {
+        // recreate store with zero history retention
+        store.close();
+        store = new RocksDBVersionedStore(STORE_NAME, METRICS_SCOPE, 0L, SEGMENT_INTERVAL);
+        store.init((StateStoreContext) context, store);
+
+        // put, get, and delete
+        putToStore("k", "v", BASE_TIMESTAMP);
+        verifyGetValueFromStore("k", "v", BASE_TIMESTAMP);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "v", BASE_TIMESTAMP);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 1, "v", BASE_TIMESTAMP); // query in "future" is allowed
+
+        // update existing record at same timestamp
+        putToStore("k", "updated", BASE_TIMESTAMP);
+        verifyGetValueFromStore("k", "updated", BASE_TIMESTAMP);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "updated", BASE_TIMESTAMP);
+
+        // put new record version
+        putToStore("k", "v2", BASE_TIMESTAMP + 2);
+        verifyGetValueFromStore("k", "v2", BASE_TIMESTAMP + 2);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 2, "v2", BASE_TIMESTAMP + 2);
+
+        // query in past (history retention expired) returns null
+        verifyTimestampedGetNullFromStore("k", BASE_TIMESTAMP + 1);
+
+        // put in past (grace period expired) does not update the store

Review Comment:
   Hm, just realized it's not possible to add this case in a meaningful way. Suppose observed stream time is `t` and we put-in-past for an existing key at time `t-1`. We cannot query for the value of the key at time `t-1` because that is outside history retention. And if we query for the latest value of the key, then we'll get the record at time `t` regardless of whether the put at time `t-1` was properly rejected or not.
   
   We'd have to query the inner store in order to perform this check, which feels like overkill. WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vcrfxia commented on a diff in pull request #13243: KAFKA-14491: [7/N] Enforce strict grace period for versioned stores

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13243:
URL: https://github.com/apache/kafka/pull/13243#discussion_r1105189382


##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java:
##########
@@ -523,6 +537,63 @@ public void shouldRestoreMultipleBatches() {
         verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 15);
     }
 
+    @Test
+    public void shouldNotRestoreExpired() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10));
+        records.add(new DataRecord("k1", "v1", HISTORY_RETENTION + 10 - GRACE_PERIOD)); // grace period has not elapsed
+        records.add(new DataRecord("k2", "v2", HISTORY_RETENTION + 9 - GRACE_PERIOD)); // grace period has elapsed, so this record should not be restored
+
+        store.restoreBatch(getChangelogRecords(records));
+
+        verifyGetValueFromStore("k", "v", HISTORY_RETENTION + 10);
+        verifyGetValueFromStore("k1", "v1", HISTORY_RETENTION + 10 - GRACE_PERIOD);
+        verifyGetNullFromStore("k2");
+    }
+
+    @Test
+    public void shouldRestoreEvenIfRecordWouldBeExpiredByEndOfBatch() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k2", "v2", HISTORY_RETENTION - GRACE_PERIOD)); // this record will be older than grace period by the end of the batch, but should still be restored
+        records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10));
+
+        store.restoreBatch(getChangelogRecords(records));
+
+        verifyGetValueFromStore("k2", "v2", HISTORY_RETENTION - GRACE_PERIOD);
+        verifyGetValueFromStore("k", "v", HISTORY_RETENTION + 10);
+    }
+
+    @Test
+    public void shouldAllowZeroHistoryRetention() {

Review Comment:
   Added this extra test in response to a previous PR review comment. 
   
   This is an interesting edge case in that if history retention = grace period = 0, then we don't actually need the segments store because grace period = 0 means we don't need to store tombstones. (Even if a tombstone is the latest value for a given key, the store will never accept earlier writes so the store doesn't need to keep the tombstone after clearing the current value for the key.)
   
   Is it worth it to add extra code to remove the segments store in this case? My instinct says no because this case does not seem very practical. For a user to use grace period = 0 requires that they are confident that all records within a partition, even across keys, are produced in ascending (technically, non-decreasing) timestamp order. I'm not sure how many use cases meet this criterion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mjsax commented on a diff in pull request #13243: KAFKA-14491: [7/N] Enforce strict grace period for versioned stores

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13243:
URL: https://github.com/apache/kafka/pull/13243#discussion_r1106567105


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -283,7 +293,12 @@ public void init(final StateStoreContext context, final StateStore root) {
 
     // VisibleForTesting
     void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
-        // advance stream time to the max timestamp in the batch
+        // copy the observed stream time, for use in deciding whether to drop records during restore,
+        // when records have exceeded the store's grace period.
+        long streamTimeForRestore = observedStreamTime;

Review Comment:
   I guess the question is, what is the value of `observedStreamTime` when we start the restore? Are you saying it's `-1` and we basically "reply" `observedStreamTime` during restore? I guess I got confused with "streamTime" that is tracked by KS runtime and preserved across restarts; but the store does not use it (IIRC), but rather tracks its own time, right?
   
   Maybe best to update some variable names? In the end, we do a "real reply" of stream-time for "grace period", and we apply an optimization for "history retention" by looking ahead (to the end of the batch) -> `endOfBatchStreamTime`. -- I guess follow up work (independent for this KIP) might be, to actually make use of KS runtime streamTime instead of tracking inside the store, and thus won't need `observedStreamTime` any longer, as we could look ahead to the "end-of-restore stream-time" (not just "end-of batch").



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mjsax commented on a diff in pull request #13243: KAFKA-14491: [7/N] Enforce strict grace period for versioned stores

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13243:
URL: https://github.com/apache/kafka/pull/13243#discussion_r1106568364


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -297,6 +312,12 @@ void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
         // records into memory. how high this memory amplification will be is very much dependent
         // on the specific workload and the value of the "segment interval" parameter.
         for (final ConsumerRecord<byte[], byte[]> record : records) {
+            if (record.timestamp() < streamTimeForRestore - gracePeriod) {
+                // record is older than grace period and was therefore never written to the store

Review Comment:
   Thanks. Makes sense.
   
   I think it's ok to leave it as-is for now. But could you maybe file a Jira ticket (with all the glory details) for tracking? Might be worth to do some follow up work later to change it (but not worth to delay the KIP implemenation at this point).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mjsax commented on a diff in pull request #13243: KAFKA-14491: [7/N] Enforce strict grace period for versioned stores

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13243:
URL: https://github.com/apache/kafka/pull/13243#discussion_r1106569321


##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java:
##########
@@ -523,6 +537,63 @@ public void shouldRestoreMultipleBatches() {
         verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 15);
     }
 
+    @Test
+    public void shouldNotRestoreExpired() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10));
+        records.add(new DataRecord("k1", "v1", HISTORY_RETENTION + 10 - GRACE_PERIOD)); // grace period has not elapsed
+        records.add(new DataRecord("k2", "v2", HISTORY_RETENTION + 9 - GRACE_PERIOD)); // grace period has elapsed, so this record should not be restored

Review Comment:
   Thx. After you pointed out the "store hierarchy" in your above reply and that the record would go into the changelog, the test makes sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mjsax commented on a diff in pull request #13243: KAFKA-14491: [7/N] Enforce strict grace period for versioned stores

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13243:
URL: https://github.com/apache/kafka/pull/13243#discussion_r1107705172


##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java:
##########
@@ -523,6 +537,63 @@ public void shouldRestoreMultipleBatches() {
         verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 15);
     }
 
+    @Test
+    public void shouldNotRestoreExpired() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10));
+        records.add(new DataRecord("k1", "v1", HISTORY_RETENTION + 10 - GRACE_PERIOD)); // grace period has not elapsed
+        records.add(new DataRecord("k2", "v2", HISTORY_RETENTION + 9 - GRACE_PERIOD)); // grace period has elapsed, so this record should not be restored
+
+        store.restoreBatch(getChangelogRecords(records));
+
+        verifyGetValueFromStore("k", "v", HISTORY_RETENTION + 10);
+        verifyGetValueFromStore("k1", "v1", HISTORY_RETENTION + 10 - GRACE_PERIOD);
+        verifyGetNullFromStore("k2");
+    }
+
+    @Test
+    public void shouldRestoreEvenIfRecordWouldBeExpiredByEndOfBatch() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k2", "v2", HISTORY_RETENTION - GRACE_PERIOD)); // this record will be older than grace period by the end of the batch, but should still be restored
+        records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10));
+
+        store.restoreBatch(getChangelogRecords(records));
+
+        verifyGetValueFromStore("k2", "v2", HISTORY_RETENTION - GRACE_PERIOD);
+        verifyGetValueFromStore("k", "v", HISTORY_RETENTION + 10);
+    }
+
+    @Test
+    public void shouldAllowZeroHistoryRetention() {
+        // recreate store with zero history retention
+        store.close();
+        store = new RocksDBVersionedStore(STORE_NAME, METRICS_SCOPE, 0L, SEGMENT_INTERVAL);
+        store.init((StateStoreContext) context, store);
+
+        // put, get, and delete
+        putToStore("k", "v", BASE_TIMESTAMP);
+        verifyGetValueFromStore("k", "v", BASE_TIMESTAMP);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "v", BASE_TIMESTAMP);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 1, "v", BASE_TIMESTAMP); // query in "future" is allowed
+
+        // update existing record at same timestamp
+        putToStore("k", "updated", BASE_TIMESTAMP);
+        verifyGetValueFromStore("k", "updated", BASE_TIMESTAMP);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "updated", BASE_TIMESTAMP);
+
+        // put new record version
+        putToStore("k", "v2", BASE_TIMESTAMP + 2);
+        verifyGetValueFromStore("k", "v2", BASE_TIMESTAMP + 2);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 2, "v2", BASE_TIMESTAMP + 2);
+
+        // query in past (history retention expired) returns null
+        verifyTimestampedGetNullFromStore("k", BASE_TIMESTAMP + 1);
+
+        // put in past (grace period expired) does not update the store

Review Comment:
   Was just an idea. Not a big deal to not have the test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mjsax commented on a diff in pull request #13243: KAFKA-14491: [7/N] Enforce strict grace period for versioned stores

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13243:
URL: https://github.com/apache/kafka/pull/13243#discussion_r1107704159


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -283,7 +293,12 @@ public void init(final StateStoreContext context, final StateStore root) {
 
     // VisibleForTesting
     void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
-        // advance stream time to the max timestamp in the batch
+        // copy the observed stream time, for use in deciding whether to drop records during restore,
+        // when records have exceeded the store's grace period.
+        long streamTimeForRestore = observedStreamTime;

Review Comment:
   Did not have a concrete proposal. Should be fine I guess.
   
   Currently, `streamTime` is tracked per task (based on input records over all partitions). And yes, there is all kind of tricky things that you call out. Even if we have a filter() downstream processors see only a subset of data and their "internal stream-time (if they have any)" could be different (ie lagging). Caching has a similar effect.
   
   There is a proposal to let KS track streamTime per processor, too.
   
   Bottom line: it's complicated and need proper design and a KIP by itself...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vcrfxia commented on pull request #13243: KAFKA-14491: [7/N] Enforce strict grace period for versioned stores

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on PR #13243:
URL: https://github.com/apache/kafka/pull/13243#issuecomment-1431970157

   > One more thought: should we add verification about the "droppedRecordSensor" into all unit tests that drop records?
   
   Included this test update in the latest commit. I believe I've addressed/responded to all outstanding comments with the latest commit.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mjsax commented on a diff in pull request #13243: KAFKA-14491: [7/N] Enforce strict grace period for versioned stores

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13243:
URL: https://github.com/apache/kafka/pull/13243#discussion_r1107707006


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -458,8 +460,22 @@ public void writeLatestValues(final WriteBatch batch) throws RocksDBException {
         }
     }
 
+    /**
+     * Helper method shared between put and restore.
+     * <p>
+     * This method does not check whether the record being put is expired based on grace period
+     * or not; that is the caller's responsibility. This method does, however, check whether the
+     * record is expired based on history retention, by using the current
+     * {@code observedStreamTime}, and returns without inserting into the store if so. It can be
+     * possible that a record is not expired based on grace period but is expired based on
+     * history retention, even though history retention is always at least the grace period,
+     * during restore because restore advances {@code observedStreamTime} to the largest timestamp
+     * in the entire restore batch at the beginning of restore, in order to optimize for not
+     * putting records into the store which will have expired by the end of the restore.

Review Comment:
   Thanks for adding this! Great addition!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org