You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2023/02/03 05:49:06 UTC

[kafka] branch trunk updated: KAFKA-14491: [4/N] Improvements to segment value format for RocksDB versioned store (#13186)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b8e606355b7 KAFKA-14491: [4/N] Improvements to segment value format for RocksDB versioned store (#13186)
b8e606355b7 is described below

commit b8e606355b71528aa438e80ffeb3042d0d586998
Author: Victoria Xia <vi...@confluent.io>
AuthorDate: Thu Feb 2 21:48:40 2023 -0800

    KAFKA-14491: [4/N] Improvements to segment value format for RocksDB versioned store (#13186)
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>
---
 ...RocksDBVersionedStoreSegmentValueFormatter.java | 133 +++++++++++++--------
 ...sDBVersionedStoreSegmentValueFormatterTest.java |  12 +-
 2 files changed, 87 insertions(+), 58 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java
index ac6fbf3c1dc..7ea9cee354e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java
@@ -22,16 +22,17 @@ import java.util.List;
 
 /**
  * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}.
- * The value format is:
+ * All record versions for the same key (in the same segment) are collected into a single row.
+ * The value format for each row is:
  * <pre>
  *     <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <list of values, forward-sorted by timestamp>
  * </pre>
  * where:
  * <ul>
  * <li>{@code next_timestamp} is the validTo timestamp of the latest record version stored in this
- * segment,</li>
+ * row,</li>
  * <li>{@code min_timestamp} is the validFrom timestamp of the earliest record version stored
- * in this segment, and</li>
+ * in this row, and</li>
  * <li>Negative {@code value_size} is used to indicate that the value stored is a tombstone,
  * in order to distinguish from empty array which has {@code value_size} of zero. In practice,
  * {@code value_size} is always set to -1 for the tombstone case, though this need not be true
@@ -39,35 +40,54 @@ import java.util.List;
  * </ul>
  * <p>
  * Note that the value format above does not store the number of record versions contained in the
- * segment. It is not necessary to store this information separately because this information is
+ * row. It is not necessary to store this information separately because this information is
  * never required on its own. Record versions are always deserialized in order, and we can
  * determine when we have reached the end of the list based on whether the (validFrom) timestamp of
  * the record version equals the {@code min_timestamp}.
  * <p>
  * There is one edge case with regards to the segment value format described above, which is useful
  * to know for understanding the code in this file, but not relevant for callers of the class.
+ * Only continue reading if you want to understand details for reading the code in this file itself.
+ * <p>
  * In the typical case, all record (validFrom) timestamps and the {@code next_timestamp} of the
- * segment will form a strictly increasing sequence, i.e., it is not valid to have a record version
+ * segment row will form a strictly increasing sequence, i.e., it is not valid to have a record version
  * with validTo timestamp equal to (or less than) its validFrom timestamp. The one edge case /
- * exception is when the latest record version (for a particular key) is a tombstone, and the
- * segment in which this tombstone is to be stored contains currently no record versions.
- * This case will result in a "degenerate" segment containing the single tombstone, with both
+ * exception occurs when the store contains no record versions (for a particular key) and a tombstone
+ * is put. This case will result in a "degenerate" segment row containing the single tombstone, with both
  * {@code min_timestamp} and {@code next_timestamp} equal to the (validFrom) timestamp of the
- * tombstone. (It is valid to interpret this tombstone's validTo timestamp as being equal to its
- * validFrom timestamp, as querying for the latest record version as of a later timestamp will
- * correctly return that no record version is present.) Note also that after a "degenerate" segment
- * has formed, it's possible that the segment will remain degenerate even as newer record versions
- * are added. (For example, if additional puts happen with later timestamps such that those puts
- * only affect later segments, then the earlier degenerate segment will remain degenerate.)
+ * tombstone. (The reverse-sorted list of {@code <timestamp, value_size>} in the row format will
+ * still contain a single tombstone, as usual.)
+ * <p>
+ * The reason this happens is because tracking the tombstone is still required to maintain the correct
+ * history for the key in question (in case an out-of-order record inserted to the store later
+ * is deleted by this tombstone). As usual, this tombstone cannot be put into the latest value store,
+ * as the latest value store has no expiry mechanism and we risk tombstones accumulating indefinitely.
+ * What's different about this case is that usually we put the tombstone into a segment store as the
+ * validTo timestamp of another record (the previous contents of the latest value store), but there
+ * is no other record in this case. So, the tombstone is put into the segment as its own record,
+ * with a well-defined validFrom timestamp but an undefined validTo. Because we can't use a validTo
+ * of infinity to pick a segment, we instead use validTo as the tombstone's own timestamp as a
+ * surrogate to select a segment. This works because querying for the latest record version as of a
+ * later timestamp will correctly return that no record version is present
+ * <p>
+ * The same edge case also occurs if the store does not literally contain no existing record
+ * versions (for the particular key), so long as no record versions are relevant for the put process.
+ * Specifically, if the latest record version is already a tombstone and that tombstone is stored
+ * in an older segment than is relevant for the timestamp of the new tombstone being put, then the
+ * store "appears" empty during the put process for the new tombstone as the old segments are not
+ * searched for the newer put. Note also that after a "degenerate" segment row has formed, it's possible
+ * that the row will remain degenerate even as newer record versions are added. (For example,
+ * if additional puts happen with later timestamps such that those puts only affect later segments,
+ * then the earlier segment with the degenerate row will remain the same.)
  * <p>
- * Callers of this class need not concern themselves with this detail because all the exposed
- * methods function as expected, even in the degenerate segment case. All methods may still be
- * called, with the exception of {@link SegmentValue#find(long, boolean)} and those that depend
- * on it (i.e., {@link SegmentValue#updateRecord(long, byte[], int)} and
+ * Callers of this class need not concern themselves with this edge case / details of degenerate
+ * segment rows because all the exposed methods function as expected, even in the degenerate case.
+ * All methods may still be called, with the exception of {@link SegmentValue#find(long, boolean)}
+ * and those that depend on it (i.e., {@link SegmentValue#updateRecord(long, byte[], int)} and
  * {@link SegmentValue#insert(long, byte[], int)}). Missing support for calling these methods on
- * degenerate segments is not an issue because the same timestamp bounds restrictions required for
- * calling {@link SegmentValue#find(long, boolean)} on regular segments serve to prevent callers
- * from calling the method on degenerate segments as well.
+ * degenerate rows is not an issue because the same timestamp bounds restrictions required for
+ * calling {@link SegmentValue#find(long, boolean)} on regular segment rows serve to prevent callers
+ * from calling the method on degenerate rows as well.
  */
 final class RocksDBVersionedStoreSegmentValueFormatter {
     private static final int TIMESTAMP_SIZE = 8;
@@ -114,11 +134,10 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
     interface SegmentValue {
 
         /**
-         * Finds the latest record in this segment with (validFrom) timestamp not exceeding the
+         * Finds the latest record in this segment row with (validFrom) timestamp not exceeding the
          * provided timestamp bound. This method requires that the provided timestamp bound exists
-         * in this segment, i.e., that the provided timestamp bound is at least minTimestamp and
-         * is smaller than nextTimestamp. As a result of this requirement, it is not permitted to
-         * call this method on degenerate segments.
+         * in this segment, i.e., that the provided timestamp bound is at least {@code minTimestamp}
+         * and is smaller than {@code nextTimestamp}.
          *
          * @param timestamp the timestamp to find
          * @param includeValue whether the value of the found record should be returned with the result
@@ -128,14 +147,13 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
         SegmentSearchResult find(long timestamp, boolean includeValue);
 
         /**
-         * Inserts the provided record into the segment as the latest record in the segment.
-         * This operation is allowed even if the segment is degenerate.
+         * Inserts the provided record into the segment as the latest record in the segment row.
          * <p>
          * It is the caller's responsibility to ensure that this action is desirable. In the event
          * that the new record's (validFrom) timestamp is smaller than the current
-         * {@code nextTimestamp} of the segment, the operation will still be performed, and the
-         * segment's existing contents will be truncated to ensure consistency of timestamps within
-         * the segment. This truncation behavior helps reconcile inconsistencies between different
+         * {@code nextTimestamp} of the segment row, the operation will still be performed, and the
+         * row's existing contents will be truncated to ensure consistency of timestamps within
+         * the segment row. This truncation behavior helps reconcile inconsistencies between different
          * segments, or between a segment and the latest value store, of a
          * {@link RocksDBVersionedStore} instance.
          *
@@ -146,10 +164,10 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
         void insertAsLatest(long validFrom, long validTo, byte[] value);
 
         /**
-         * Inserts the provided record into the segment as the earliest record in the segment.
-         * This operation is allowed even if the segment is degenerate. It is the caller's responsibility
-         * to ensure that this action is valid, i.e., that record's (validFrom) timestamp is smaller
-         * than the current {@code minTimestamp} of the segment.
+         * Inserts the provided record into the segment row as the earliest record in the row.
+         * It is the caller's responsibility to ensure that this action is valid, i.e.,
+         * that record's (validFrom) timestamp is smaller than the current {@code minTimestamp}
+         * of the segment row.
          *
          * @param timestamp the (validFrom) timestamp of the record to insert
          * @param value the value of the record to insert
@@ -157,10 +175,9 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
         void insertAsEarliest(long timestamp, byte[] value);
 
         /**
-         * Inserts the provided record into the segment at the provided index. This operation
-         * requires that the segment is not degenerate, and that
-         * {@link SegmentValue#find(long, boolean)} has already been called in order to deserialize
-         * the relevant index (to insert into index n requires that index n-1 has already been deserialized).
+         * Inserts the provided record into the segment row at the provided index. This operation
+         * requires that {@link SegmentValue#find(long, boolean)} has already been called in order to deserialize
+         * the relevant index (to insert into index n requires that index n has already been deserialized).
          * <p>
          * It is the caller's responsibility to ensure that this action makes sense, i.e., that the
          * insertion index is correct for the (validFrom) timestamp of the record being inserted.
@@ -168,8 +185,8 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
          * @param timestamp the (validFrom) timestamp of the record to insert
          * @param value the value of the record to insert
          * @param index the index that the newly inserted record should occupy
-         * @throws IllegalArgumentException if the segment is degenerate, if the provided index is out of
-         *         bounds, or if {@code find()} has not been called to deserialize the relevant index.
+         * @throws IllegalArgumentException if the provided index is out of bounds, or if
+         *         {@code find()} has not been called to deserialize the relevant index.
          */
         void insert(long timestamp, byte[] value, int index);
 
@@ -177,10 +194,9 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
          * Updates the record at the provided index with the provided value and (validFrom)
          * timestamp. This operation requires that {@link SegmentValue#find(long, boolean)} has
          * already been called in order to deserialize the relevant index (i.e., the one being updated).
-         * (As a result, it is not valid to call this method on a degenerate segment.)
          * <p>
          * It is the caller's responsibility to ensure that this action makes sense, i.e., that the
-         * updated (validFrom) timestamp does not violate timestamp order within the segment.
+         * updated (validFrom) timestamp does not violate timestamp order within the segment row.
          *
          * @param timestamp the updated record (validFrom) timestamp
          * @param value the updated record value
@@ -268,6 +284,8 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
             if (timestamp >= nextTimestamp) {
                 throw new IllegalArgumentException("Timestamp is too large to be found in this segment.");
             }
+            // for degenerate segments, minTimestamp == nextTimestamp, and we will always throw an exception
+            // thus, we don't need to handle the degenerate case below
 
             long currNextTimestamp = nextTimestamp;
             long currTimestamp = -1L; // choose an invalid timestamp. if this is valid, this needs to be re-worked
@@ -319,7 +337,7 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
         public void insertAsLatest(final long validFrom, final long validTo, final byte[] valueOrNull) {
             final ValueAndValueSize value = new ValueAndValueSize(valueOrNull);
 
-            if (nextTimestamp > validFrom) {
+            if (validFrom < nextTimestamp) {
                 // detected inconsistency edge case where older segment has [a,b) while newer store
                 // has [a,c), due to [b,c) having failed to write to newer store.
                 // remove entries from this store until the overlap is resolved.
@@ -327,21 +345,21 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
                 throw new UnsupportedOperationException("case not yet implemented");
             }
 
-            if (nextTimestamp != validFrom) {
-                // move nextTimestamp into list as tombstone and add new record on top
+            if (nextTimestamp == validFrom) {
+                // nextTimestamp is moved into segment automatically as record is added on top
                 if (isDegenerate) {
-                    initializeWithRecord(new ValueAndValueSize(null), nextTimestamp, validFrom);
+                    initializeWithRecord(value, validFrom, validTo);
                 } else {
-                    insert(nextTimestamp, null, 0);
+                    doInsert(validFrom, value, 0);
                 }
-                doInsert(validFrom, value, 0);
             } else {
-                // nextTimestamp is moved into segment automatically as record is added on top
+                // move nextTimestamp into list as tombstone and add new record on top
                 if (isDegenerate) {
-                    initializeWithRecord(value, validFrom, validTo);
+                    initializeWithRecord(new ValueAndValueSize(null), nextTimestamp, validFrom);
                 } else {
-                    doInsert(validFrom, value, 0);
+                    doInsert(nextTimestamp, new ValueAndValueSize(null), 0);
                 }
+                doInsert(validFrom, value, 0);
             }
             // update nextTimestamp
             nextTimestamp = validTo;
@@ -362,13 +380,22 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
 
         @Override
         public void insert(final long timestamp, final byte[] valueOrNull, final int index) {
+            // public-facing method contains stricter index requirement than the internal helper
+            // method doInsert() below
+            if (index > deserIndex) {
+                throw new IllegalArgumentException("Must invoke find() to deserialize record before insert() at specific index.");
+            }
+
             final ValueAndValueSize value = new ValueAndValueSize(valueOrNull);
             doInsert(timestamp, value, index);
         }
 
         private void doInsert(final long timestamp, final ValueAndValueSize value, final int index) {
-            if (isDegenerate || index > deserIndex + 1 || index < 0) {
-                throw new IllegalArgumentException("Must invoke find() to deserialize record before insert() at specific index.");
+            if (index > deserIndex + 1) {
+                throw new IllegalStateException("Must invoke find() to deserialize record before insert() at specific index.");
+            }
+            if (isDegenerate || index < 0) {
+                throw new IllegalStateException("Cannot insert at negative index or into degenerate segment.");
             }
 
             final boolean needsMinTsUpdate = isLastIndex(index - 1);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatterTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatterTest.java
index 9b08197a061..1311aac4078 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatterTest.java
@@ -95,7 +95,7 @@ public class RocksDBVersionedStoreSegmentValueFormatterTest {
         }
 
         // test inserting at each possible index
-        for (int insertIdx = 0; insertIdx <= testCase.records.size(); insertIdx++) {
+        for (int insertIdx = 0; insertIdx <= testCase.records.size() - 1; insertIdx++) {
             // build record to insert
             final long newRecordTimestamp;
             if (insertIdx == 0) {
@@ -106,7 +106,7 @@ public class RocksDBVersionedStoreSegmentValueFormatterTest {
                 }
             } else {
                 newRecordTimestamp = testCase.records.get(insertIdx - 1).timestamp - 1;
-                if (newRecordTimestamp < 0 || (insertIdx < testCase.records.size() && newRecordTimestamp == testCase.records.get(insertIdx).timestamp)) {
+                if (newRecordTimestamp < 0 || (newRecordTimestamp == testCase.records.get(insertIdx).timestamp)) {
                     // cannot insert because timestamps of existing records are adjacent
                     continue;
                 }
@@ -116,9 +116,8 @@ public class RocksDBVersionedStoreSegmentValueFormatterTest {
             final SegmentValue segmentValue = buildSegmentWithInsertLatest(testCase);
 
             // insert() first requires a call to find()
-            if (insertIdx > 0) {
-                segmentValue.find(testCase.records.get(insertIdx - 1).timestamp, false);
-            }
+            segmentValue.find(testCase.records.get(insertIdx).timestamp, false);
+
             segmentValue.insert(newRecord.timestamp, newRecord.value, insertIdx);
 
             // create expected results
@@ -176,6 +175,9 @@ public class RocksDBVersionedStoreSegmentValueFormatterTest {
 
         // build expected mapping from timestamp -> record
         final Map<Long, Integer> expectedRecordIndices = new HashMap<>();
+        // it's important that this for-loop iterates backwards through the record indices, so that
+        // when adjacent records have adjacent timestamps, then the record with the later timestamp
+        // (i.e., the earlier index) takes precedence
         for (int recordIdx = testCase.records.size() - 1; recordIdx >= 0; recordIdx--) {
             if (recordIdx < testCase.records.size() - 1) {
                 expectedRecordIndices.put(testCase.records.get(recordIdx).timestamp - 1, recordIdx + 1);