You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "vcrfxia (via GitHub)" <gi...@apache.org> on 2023/03/28 19:09:57 UTC

[GitHub] [kafka] vcrfxia commented on a diff in pull request #13364: KAFKA-14491: [16/N] Add recovery logic for store inconsistency due to failed write

vcrfxia commented on code in PR #13364:
URL: https://github.com/apache/kafka/pull/13364#discussion_r1151049820


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -341,8 +345,10 @@ public void insertAsLatest(final long validFrom, final long validTo, final byte[
                 // detected inconsistency edge case where older segment has [a,b) while newer store
                 // has [a,c), due to [b,c) having failed to write to newer store.
                 // remove entries from this store until the overlap is resolved.

Review Comment:
   Yep, that's spot on. I will update the comment with your suggestions.
   
   After a partial write failure is encountered, when the store next resumes processing then the store should first reprocess records which it has already processed previously (if EOS is not enabled), including the one during which the failure occurred, before processing any newly seen records. In this case, we would only expect a single record to be truncated (and to be truncated in full) as part of the recovery process -- as you've outlined above. The recovery code is more general, though, in that it also supports truncating multiple records and/or partial records, in case my above understanding about the types of failures which can occur is not accurate or changes in the future.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -495,6 +501,41 @@ private boolean isLastIndex(final int index) {
             return unpackedReversedTimestampAndValueSizes.get(index).timestamp == minTimestamp;
         }
 
+        private void truncateRecordsToTimestamp(final long timestamp) {
+            if (timestamp <= minTimestamp) {
+                // delete everything in this current segment by replacing it with a degenerate segment
+                initializeWithRecord(new ValueAndValueSize(null), timestamp, timestamp);
+                return;
+            }
+
+            final SegmentSearchResult searchResult = find(timestamp, false);
+            // all records with later timestamps should be removed
+            int fullRecordsToTruncate = searchResult.index();
+            // additionally remove the current record as well, if its validFrom equals the
+            // timestamp to truncate to
+            if (searchResult.validFrom() == timestamp) {
+                fullRecordsToTruncate++;

Review Comment:
   Yes, `find()` is inclusive. That's also what "not exceeding" means (if the timestamps are equal, then neither exceeds the other), so I don't think there's a contradiction here? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org