Posted to commits@kafka.apache.org by mj...@apache.org on 2023/02/03 05:49:06 UTC
[kafka] branch trunk updated: KAFKA-14491: [4/N] Improvements to segment value format for RocksDB versioned store (#13186)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b8e606355b7 KAFKA-14491: [4/N] Improvements to segment value format for RocksDB versioned store (#13186)
b8e606355b7 is described below
commit b8e606355b71528aa438e80ffeb3042d0d586998
Author: Victoria Xia <vi...@confluent.io>
AuthorDate: Thu Feb 2 21:48:40 2023 -0800
KAFKA-14491: [4/N] Improvements to segment value format for RocksDB versioned store (#13186)
Reviewers: Matthias J. Sax <ma...@confluent.io>
---
...RocksDBVersionedStoreSegmentValueFormatter.java | 133 +++++++++++++--------
...sDBVersionedStoreSegmentValueFormatterTest.java | 12 +-
2 files changed, 87 insertions(+), 58 deletions(-)
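The class Javadoc in the diff describes the row layout only in prose. The following is a minimal, hypothetical Java sketch of that layout, not the formatter's actual code. `SegmentRowSketch`, its `Version` holder, and the `serialize`, `timestamps`, and `isDegenerate` helpers are illustrative names. The sketch assumes a 4-byte `value_size` field alongside the 8-byte timestamps (`TIMESTAMP_SIZE = 8` in the diff). It also shows how a reader can walk the `<timestamp, value_size>` list without a stored count by stopping at the entry whose timestamp equals `min_timestamp`, and what a degenerate row holding a single tombstone looks like.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the segment row layout:
// <next_timestamp> + <min_timestamp> + <(timestamp, value_size), latest first> + <values, earliest first>
final class SegmentRowSketch {
    private static final int TIMESTAMP_SIZE = 8;
    private static final int VALUE_SIZE = 4; // assumption: value_size is a 4-byte int

    static final class Version {
        final long timestamp; // validFrom
        final byte[] value;   // null represents a tombstone

        Version(final long timestamp, final byte[] value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // versions must be non-empty and ordered latest first (reverse-sorted by timestamp).
    static byte[] serialize(final long nextTimestamp, final List<Version> versions) {
        int size = 2 * TIMESTAMP_SIZE;
        for (final Version v : versions) {
            size += TIMESTAMP_SIZE + VALUE_SIZE + (v.value == null ? 0 : v.value.length);
        }
        final ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putLong(nextTimestamp);
        buf.putLong(versions.get(versions.size() - 1).timestamp); // min_timestamp = earliest validFrom
        for (final Version v : versions) {
            buf.putLong(v.timestamp);
            buf.putInt(v.value == null ? -1 : v.value.length); // -1 marks a tombstone
        }
        // values are written earliest first (forward-sorted by timestamp)
        for (int i = versions.size() - 1; i >= 0; i--) {
            final byte[] value = versions.get(i).value;
            if (value != null) {
                buf.put(value);
            }
        }
        return buf.array();
    }

    // Reads record timestamps without a stored count: stop once a timestamp equals min_timestamp.
    static List<Long> timestamps(final byte[] row) {
        final ByteBuffer buf = ByteBuffer.wrap(row);
        buf.getLong(); // skip next_timestamp
        final long minTimestamp = buf.getLong();
        final List<Long> result = new ArrayList<>();
        long timestamp;
        do {
            timestamp = buf.getLong();
            buf.getInt(); // skip value_size
            result.add(timestamp);
        } while (timestamp != minTimestamp);
        return result;
    }

    // A degenerate row has min_timestamp == next_timestamp.
    static boolean isDegenerate(final byte[] row) {
        final ByteBuffer buf = ByteBuffer.wrap(row);
        return buf.getLong(0) == buf.getLong(TIMESTAMP_SIZE);
    }
}
```

For example, a row with a 2-byte value at timestamp 20 and a tombstone at timestamp 10 serializes to 16 + 2 * 12 + 2 = 42 bytes under these assumptions, while a degenerate row holding only a tombstone at timestamp 5 has `next_timestamp` and `min_timestamp` both equal to 5.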
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java
index ac6fbf3c1dc..7ea9cee354e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java
@@ -22,16 +22,17 @@ import java.util.List;
/**
* Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}.
- * The value format is:
+ * All record versions for the same key (in the same segment) are collected into a single row.
+ * The value format for each row is:
* <pre>
* <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <list of values, forward-sorted by timestamp>
* </pre>
* where:
* <ul>
* <li>{@code next_timestamp} is the validTo timestamp of the latest record version stored in this
- * segment,</li>
+ * row,</li>
* <li>{@code min_timestamp} is the validFrom timestamp of the earliest record version stored
- * in this segment, and</li>
+ * in this row, and</li>
* <li>Negative {@code value_size} is used to indicate that the value stored is a tombstone,
* in order to distinguish from empty array which has {@code value_size} of zero. In practice,
* {@code value_size} is always set to -1 for the tombstone case, though this need not be true
@@ -39,35 +40,54 @@ import java.util.List;
* </ul>
* <p>
* Note that the value format above does not store the number of record versions contained in the
- * segment. It is not necessary to store this information separately because this information is
+ * row. It is not necessary to store this information separately because this information is
* never required on its own. Record versions are always deserialized in order, and we can
* determine when we have reached the end of the list based on whether the (validFrom) timestamp of
* the record version equals the {@code min_timestamp}.
* <p>
* There is one edge case with regards to the segment value format described above, which is useful
* to know for understanding the code in this file, but not relevant for callers of the class.
+ * Only continue reading if you want to understand details for reading the code in this file itself.
+ * <p>
* In the typical case, all record (validFrom) timestamps and the {@code next_timestamp} of the
- * segment will form a strictly increasing sequence, i.e., it is not valid to have a record version
+ * segment row will form a strictly increasing sequence, i.e., it is not valid to have a record version
* with validTo timestamp equal to (or less than) its validFrom timestamp. The one edge case /
- * exception is when the latest record version (for a particular key) is a tombstone, and the
- * segment in which this tombstone is to be stored contains currently no record versions.
- * This case will result in a "degenerate" segment containing the single tombstone, with both
+ * exception occurs when the store contains no record versions (for a particular key) and a tombstone
+ * is put. This case will result in a "degenerate" segment row containing the single tombstone, with both
* {@code min_timestamp} and {@code next_timestamp} equal to the (validFrom) timestamp of the
- * tombstone. (It is valid to interpret this tombstone's validTo timestamp as being equal to its
- * validFrom timestamp, as querying for the latest record version as of a later timestamp will
- * correctly return that no record version is present.) Note also that after a "degenerate" segment
- * has formed, it's possible that the segment will remain degenerate even as newer record versions
- * are added. (For example, if additional puts happen with later timestamps such that those puts
- * only affect later segments, then the earlier degenerate segment will remain degenerate.)
+ * tombstone. (The reverse-sorted list of {@code <timestamp, value_size>} in the row format will
+ * still contain a single tombstone, as usual.)
+ * <p>
+ * This happens because tracking the tombstone is still required to maintain the correct
+ * history for the key in question (in case an out-of-order record inserted to the store later
+ * is deleted by this tombstone). As usual, this tombstone cannot be put into the latest value store,
+ * as the latest value store has no expiry mechanism and we risk tombstones accumulating indefinitely.
+ * What's different about this case is that usually we put the tombstone into a segment store as the
+ * validTo timestamp of another record (the previous contents of the latest value store), but there
+ * is no other record in this case. So, the tombstone is put into the segment as its own record,
+ * with a well-defined validFrom timestamp but an undefined validTo. Because we can't use a validTo
+ * of infinity to pick a segment, we instead use validTo as the tombstone's own timestamp as a
+ * surrogate to select a segment. This works because querying for the latest record version as of a
+ * later timestamp will correctly return that no record version is present.
+ * <p>
+ * The same edge case also occurs if the store does contain existing record versions
+ * (for the particular key), so long as none of them are relevant for the put process.
+ * Specifically, if the latest record version is already a tombstone and that tombstone is stored
+ * in an older segment than is relevant for the timestamp of the new tombstone being put, then the
+ * store "appears" empty during the put process for the new tombstone as the old segments are not
+ * searched for the newer put. Note also that after a "degenerate" segment row has formed, it's possible
+ * that the row will remain degenerate even as newer record versions are added. (For example,
+ * if additional puts happen with later timestamps such that those puts only affect later segments,
+ * then the earlier segment with the degenerate row will remain the same.)
* <p>
- * Callers of this class need not concern themselves with this detail because all the exposed
- * methods function as expected, even in the degenerate segment case. All methods may still be
- * called, with the exception of {@link SegmentValue#find(long, boolean)} and those that depend
- * on it (i.e., {@link SegmentValue#updateRecord(long, byte[], int)} and
+ * Callers of this class need not concern themselves with this edge case / details of degenerate
+ * segment rows because all the exposed methods function as expected, even in the degenerate case.
+ * All methods may still be called, with the exception of {@link SegmentValue#find(long, boolean)}
+ * and those that depend on it (i.e., {@link SegmentValue#updateRecord(long, byte[], int)} and
* {@link SegmentValue#insert(long, byte[], int)}). Missing support for calling these methods on
- * degenerate segments is not an issue because the same timestamp bounds restrictions required for
- * calling {@link SegmentValue#find(long, boolean)} on regular segments serve to prevent callers
- * from calling the method on degenerate segments as well.
+ * degenerate rows is not an issue because the same timestamp bounds restrictions required for
+ * calling {@link SegmentValue#find(long, boolean)} on regular segment rows serve to prevent callers
+ * from calling the method on degenerate rows as well.
*/
final class RocksDBVersionedStoreSegmentValueFormatter {
private static final int TIMESTAMP_SIZE = 8;
@@ -114,11 +134,10 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
interface SegmentValue {
/**
- * Finds the latest record in this segment with (validFrom) timestamp not exceeding the
+ * Finds the latest record in this segment row with (validFrom) timestamp not exceeding the
* provided timestamp bound. This method requires that the provided timestamp bound exists
- * in this segment, i.e., that the provided timestamp bound is at least minTimestamp and
- * is smaller than nextTimestamp. As a result of this requirement, it is not permitted to
- * call this method on degenerate segments.
+ * in this segment, i.e., that the provided timestamp bound is at least {@code minTimestamp}
+ * and is smaller than {@code nextTimestamp}.
*
* @param timestamp the timestamp to find
* @param includeValue whether the value of the found record should be returned with the result
@@ -128,14 +147,13 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
SegmentSearchResult find(long timestamp, boolean includeValue);
/**
- * Inserts the provided record into the segment as the latest record in the segment.
- * This operation is allowed even if the segment is degenerate.
+ * Inserts the provided record into the segment as the latest record in the segment row.
* <p>
* It is the caller's responsibility to ensure that this action is desirable. In the event
* that the new record's (validFrom) timestamp is smaller than the current
- * {@code nextTimestamp} of the segment, the operation will still be performed, and the
- * segment's existing contents will be truncated to ensure consistency of timestamps within
- * the segment. This truncation behavior helps reconcile inconsistencies between different
+ * {@code nextTimestamp} of the segment row, the operation will still be performed, and the
+ * row's existing contents will be truncated to ensure consistency of timestamps within
+ * the segment row. This truncation behavior helps reconcile inconsistencies between different
* segments, or between a segment and the latest value store, of a
* {@link RocksDBVersionedStore} instance.
*
@@ -146,10 +164,10 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
void insertAsLatest(long validFrom, long validTo, byte[] value);
/**
- * Inserts the provided record into the segment as the earliest record in the segment.
- * This operation is allowed even if the segment is degenerate. It is the caller's responsibility
- * to ensure that this action is valid, i.e., that record's (validFrom) timestamp is smaller
- * than the current {@code minTimestamp} of the segment.
+ * Inserts the provided record into the segment row as the earliest record in the row.
+ * It is the caller's responsibility to ensure that this action is valid, i.e.,
+ * that record's (validFrom) timestamp is smaller than the current {@code minTimestamp}
+ * of the segment row.
*
* @param timestamp the (validFrom) timestamp of the record to insert
* @param value the value of the record to insert
@@ -157,10 +175,9 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
void insertAsEarliest(long timestamp, byte[] value);
/**
- * Inserts the provided record into the segment at the provided index. This operation
- * requires that the segment is not degenerate, and that
- * {@link SegmentValue#find(long, boolean)} has already been called in order to deserialize
- * the relevant index (to insert into index n requires that index n-1 has already been deserialized).
+ * Inserts the provided record into the segment row at the provided index. This operation
+ * requires that {@link SegmentValue#find(long, boolean)} has already been called in order to deserialize
+ * the relevant index (to insert into index n requires that index n has already been deserialized).
* <p>
* It is the caller's responsibility to ensure that this action makes sense, i.e., that the
* insertion index is correct for the (validFrom) timestamp of the record being inserted.
@@ -168,8 +185,8 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
* @param timestamp the (validFrom) timestamp of the record to insert
* @param value the value of the record to insert
* @param index the index that the newly inserted record should occupy
- * @throws IllegalArgumentException if the segment is degenerate, if the provided index is out of
- * bounds, or if {@code find()} has not been called to deserialize the relevant index.
+ * @throws IllegalArgumentException if the provided index is out of bounds, or if
+ * {@code find()} has not been called to deserialize the relevant index.
*/
void insert(long timestamp, byte[] value, int index);
@@ -177,10 +194,9 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
* Updates the record at the provided index with the provided value and (validFrom)
* timestamp. This operation requires that {@link SegmentValue#find(long, boolean)} has
* already been called in order to deserialize the relevant index (i.e., the one being updated).
- * (As a result, it is not valid to call this method on a degenerate segment.)
* <p>
* It is the caller's responsibility to ensure that this action makes sense, i.e., that the
- * updated (validFrom) timestamp does not violate timestamp order within the segment.
+ * updated (validFrom) timestamp does not violate timestamp order within the segment row.
*
* @param timestamp the updated record (validFrom) timestamp
* @param value the updated record value
@@ -268,6 +284,8 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
if (timestamp >= nextTimestamp) {
throw new IllegalArgumentException("Timestamp is too large to be found in this segment.");
}
+ // for degenerate segments, minTimestamp == nextTimestamp, and we will always throw an exception
+ // thus, we don't need to handle the degenerate case below
long currNextTimestamp = nextTimestamp;
long currTimestamp = -1L; // choose an invalid timestamp. if this is valid, this needs to be re-worked
@@ -319,7 +337,7 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
public void insertAsLatest(final long validFrom, final long validTo, final byte[] valueOrNull) {
final ValueAndValueSize value = new ValueAndValueSize(valueOrNull);
- if (nextTimestamp > validFrom) {
+ if (validFrom < nextTimestamp) {
// detected inconsistency edge case where older segment has [a,b) while newer store
// has [a,c), due to [b,c) having failed to write to newer store.
// remove entries from this store until the overlap is resolved.
@@ -327,21 +345,21 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
throw new UnsupportedOperationException("case not yet implemented");
}
- if (nextTimestamp != validFrom) {
- // move nextTimestamp into list as tombstone and add new record on top
+ if (nextTimestamp == validFrom) {
+ // nextTimestamp is moved into segment automatically as record is added on top
if (isDegenerate) {
- initializeWithRecord(new ValueAndValueSize(null), nextTimestamp, validFrom);
+ initializeWithRecord(value, validFrom, validTo);
} else {
- insert(nextTimestamp, null, 0);
+ doInsert(validFrom, value, 0);
}
- doInsert(validFrom, value, 0);
} else {
- // nextTimestamp is moved into segment automatically as record is added on top
+ // move nextTimestamp into list as tombstone and add new record on top
if (isDegenerate) {
- initializeWithRecord(value, validFrom, validTo);
+ initializeWithRecord(new ValueAndValueSize(null), nextTimestamp, validFrom);
} else {
- doInsert(validFrom, value, 0);
+ doInsert(nextTimestamp, new ValueAndValueSize(null), 0);
}
+ doInsert(validFrom, value, 0);
}
// update nextTimestamp
nextTimestamp = validTo;
@@ -362,13 +380,22 @@ final class RocksDBVersionedStoreSegmentValueFormatter {
@Override
public void insert(final long timestamp, final byte[] valueOrNull, final int index) {
+ // public-facing method contains stricter index requirement than the internal helper
+ // method doInsert() below
+ if (index > deserIndex) {
+ throw new IllegalArgumentException("Must invoke find() to deserialize record before insert() at specific index.");
+ }
+
final ValueAndValueSize value = new ValueAndValueSize(valueOrNull);
doInsert(timestamp, value, index);
}
private void doInsert(final long timestamp, final ValueAndValueSize value, final int index) {
- if (isDegenerate || index > deserIndex + 1 || index < 0) {
- throw new IllegalArgumentException("Must invoke find() to deserialize record before insert() at specific index.");
+ if (index > deserIndex + 1) {
+ throw new IllegalStateException("Must invoke find() to deserialize record before insert() at specific index.");
+ }
+ if (isDegenerate || index < 0) {
+ throw new IllegalStateException("Cannot insert at negative index or into degenerate segment.");
}
final boolean needsMinTsUpdate = isLastIndex(index - 1);
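The reordered branches in `insertAsLatest()` are easier to follow against a simplified model. The sketch below is hypothetical: `RowModel`, `Entry`, and `degenerate()` are illustrative names, and the row is held as an in-memory list (latest first) rather than as bytes. It mirrors the branch structure in the diff. When `validFrom` equals `nextTimestamp`, the record goes straight on top. Otherwise, a tombstone covering `[nextTimestamp, validFrom)` is inserted first. A degenerate row is replaced wholesale by `initializeWithRecord`, which this model assumes also clears the degenerate flag.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of a segment row, used only to illustrate insertAsLatest().
final class RowModel {
    static final class Entry {
        final long timestamp; // validFrom
        final byte[] value;   // null represents a tombstone

        Entry(final long timestamp, final byte[] value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    final List<Entry> entries = new ArrayList<>(); // latest first
    long nextTimestamp;
    boolean isDegenerate;

    // A degenerate row: a single tombstone whose timestamp is also nextTimestamp.
    static RowModel degenerate(final long tombstoneTimestamp) {
        final RowModel row = new RowModel();
        row.entries.add(new Entry(tombstoneTimestamp, null));
        row.nextTimestamp = tombstoneTimestamp;
        row.isDegenerate = true;
        return row;
    }

    // Replaces all contents with a single record (assumption: clears the degenerate flag).
    private void initializeWithRecord(final Entry entry, final long validTo) {
        entries.clear();
        entries.add(entry);
        nextTimestamp = validTo;
        isDegenerate = false;
    }

    void insertAsLatest(final long validFrom, final long validTo, final byte[] value) {
        if (validFrom < nextTimestamp) {
            throw new UnsupportedOperationException("case not yet implemented");
        }
        if (nextTimestamp == validFrom) {
            // no gap: the new record starts exactly where the previous latest record ends
            if (isDegenerate) {
                initializeWithRecord(new Entry(validFrom, value), validTo);
            } else {
                entries.add(0, new Entry(validFrom, value));
            }
        } else {
            // gap [nextTimestamp, validFrom): fill it with a tombstone, then add the record on top
            if (isDegenerate) {
                initializeWithRecord(new Entry(nextTimestamp, null), validFrom);
            } else {
                entries.add(0, new Entry(nextTimestamp, null));
            }
            entries.add(0, new Entry(validFrom, value));
        }
        nextTimestamp = validTo;
    }
}
```

In this model, a degenerate tombstone at 10 followed by a put at 15 leaves a tombstone at 10 and the new record at 15, with `nextTimestamp` moved to the put's `validTo`. A put at exactly the degenerate timestamp instead replaces the tombstone with the new record.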
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatterTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatterTest.java
index 9b08197a061..1311aac4078 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatterTest.java
@@ -95,7 +95,7 @@ public class RocksDBVersionedStoreSegmentValueFormatterTest {
}
// test inserting at each possible index
- for (int insertIdx = 0; insertIdx <= testCase.records.size(); insertIdx++) {
+ for (int insertIdx = 0; insertIdx <= testCase.records.size() - 1; insertIdx++) {
// build record to insert
final long newRecordTimestamp;
if (insertIdx == 0) {
@@ -106,7 +106,7 @@ public class RocksDBVersionedStoreSegmentValueFormatterTest {
}
} else {
newRecordTimestamp = testCase.records.get(insertIdx - 1).timestamp - 1;
- if (newRecordTimestamp < 0 || (insertIdx < testCase.records.size() && newRecordTimestamp == testCase.records.get(insertIdx).timestamp)) {
+ if (newRecordTimestamp < 0 || (newRecordTimestamp == testCase.records.get(insertIdx).timestamp)) {
// cannot insert because timestamps of existing records are adjacent
continue;
}
@@ -116,9 +116,8 @@ public class RocksDBVersionedStoreSegmentValueFormatterTest {
final SegmentValue segmentValue = buildSegmentWithInsertLatest(testCase);
// insert() first requires a call to find()
- if (insertIdx > 0) {
- segmentValue.find(testCase.records.get(insertIdx - 1).timestamp, false);
- }
+ segmentValue.find(testCase.records.get(insertIdx).timestamp, false);
+
segmentValue.insert(newRecord.timestamp, newRecord.value, insertIdx);
// create expected results
@@ -176,6 +175,9 @@ public class RocksDBVersionedStoreSegmentValueFormatterTest {
// build expected mapping from timestamp -> record
final Map<Long, Integer> expectedRecordIndices = new HashMap<>();
+ // it's important that this for-loop iterates backwards through the record indices, so that
+ // when adjacent records have adjacent timestamps, then the record with the later timestamp
+ // (i.e., the earlier index) takes precedence
for (int recordIdx = testCase.records.size() - 1; recordIdx >= 0; recordIdx--) {
if (recordIdx < testCase.records.size() - 1) {
expectedRecordIndices.put(testCase.records.get(recordIdx).timestamp - 1, recordIdx + 1);
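The test changes follow from the stricter check now in the public `insert()`: the target index must already have been deserialized by `find()` (`index <= deserIndex`), while the internal `doInsert()` still accepts `deserIndex + 1`. Below is a hypothetical model of the two checks, with a simplified `find()` that visits records latest first and records how far it got. `IndexCheckSketch`, `checkPublicInsert`, and `checkInternalInsert` are illustrative names, not the real API.

```java
// Hypothetical model of the insert() index preconditions; not the real formatter code.
final class IndexCheckSketch {
    private final long[] timestamps; // record (validFrom) timestamps, latest first
    private final long nextTimestamp;
    private int deserIndex = -1;     // highest index deserialized so far; -1 means none

    IndexCheckSketch(final long nextTimestamp, final long... timestamps) {
        this.nextTimestamp = nextTimestamp;
        this.timestamps = timestamps.clone();
    }

    // Returns the index of the latest record whose timestamp does not exceed bound,
    // "deserializing" records from index 0 onward.
    int find(final long bound) {
        final long minTimestamp = timestamps[timestamps.length - 1];
        if (bound < minTimestamp || bound >= nextTimestamp) {
            throw new IllegalArgumentException("Timestamp bound is outside this row.");
        }
        for (int i = 0; i < timestamps.length; i++) {
            deserIndex = Math.max(deserIndex, i);
            if (timestamps[i] <= bound) {
                return i;
            }
        }
        throw new IllegalStateException("unreachable: bound >= minTimestamp");
    }

    // Public-facing rule: the target index itself must already be deserialized.
    void checkPublicInsert(final int index) {
        if (index > deserIndex) {
            throw new IllegalArgumentException("Must invoke find() to deserialize record before insert() at specific index.");
        }
    }

    // Internal rule: may also target the index just past the deserialized prefix.
    void checkInternalInsert(final int index) {
        if (index > deserIndex + 1) {
            throw new IllegalStateException("Must invoke find() to deserialize record before insert() at specific index.");
        }
        if (index < 0) {
            throw new IllegalStateException("Cannot insert at negative index.");
        }
    }
}
```

Under this rule, inserting at index `records.size()` would require a deserialized record at that index, which cannot exist. That appears to be why the test loop now stops at `records.size() - 1` and calls `find()` on `records.get(insertIdx)` rather than on the previous record.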