Posted to jira@kafka.apache.org by "mjsax (via GitHub)" <gi...@apache.org> on 2023/02/01 21:18:46 UTC

[GitHub] [kafka] mjsax commented on a diff in pull request #13126: KAFKA-14491: [1/N] Add segment value format for RocksDB versioned store

mjsax commented on code in PR #13126:
URL: https://github.com/apache/kafka/pull/13126#discussion_r1084814140


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * <pre>
+ *     <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <list of values, forward-sorted by timestamp>
+ * </pre>
+ * where:
+ * <ul>
+ * <li>{@code next_timestamp} is the validTo timestamp of the latest record version stored in this
+ * segment,</li>

Review Comment:
   `stored in this row`



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * <pre>
+ *     <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <list of values, forward-sorted by timestamp>
+ * </pre>
+ * where:
+ * <ul>
+ * <li>{@code next_timestamp} is the validTo timestamp of the latest record version stored in this
+ * segment,</li>
+ * <li>{@code min_timestamp} is the validFrom timestamp of the earliest record version stored
+ * in this segment, and</li>

Review Comment:
   `in this row`



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}.
+ * The value format is:

Review Comment:
   `row format`



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * <pre>
+ *     <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <list of values, forward-sorted by timestamp>
+ * </pre>
+ * where:
+ * <ul>
+ * <li>{@code next_timestamp} is the validTo timestamp of the latest record version stored in this
+ * segment,</li>
+ * <li>{@code min_timestamp} is the validFrom timestamp of the earliest record version stored
+ * in this segment, and</li>
+ * <li>Negative {@code value_size} is used to indicate that the value stored is a tombstone,
+ * in order to distinguish it from an empty array, which has {@code value_size} of zero. In practice,
+ * {@code value_size} is always set to -1 for the tombstone case, though this need not be true
+ * in general.</li>
+ * </ul>
+ * <p>
+ * Note that the value format above does not store the number of record versions contained in the

Review Comment:
   `row format`
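
To make the documented row layout concrete, here is a minimal editor's sketch (not code from the PR) of how a row holding a single record version would be encoded. It assumes the big-endian encoding `ByteBuffer` uses by default, consistent with the `getLong(0)` / `getLong(TIMESTAMP_SIZE)` getters quoted further down; the class and method names are invented for illustration.

```java
import java.nio.ByteBuffer;

// Editor's sketch of the documented layout for a row with one record version:
// next_timestamp (8 bytes) + min_timestamp (8 bytes)
// + one (timestamp, value_size) pair (8 + 4 bytes) + the value bytes.
final class RowLayoutSketch {
    static byte[] encodeSingleRecordRow(final byte[] value, final long validFrom, final long validTo) {
        final int valueSize = value == null ? -1 : value.length; // negative size marks a tombstone
        final int valueBytes = value == null ? 0 : value.length;
        final ByteBuffer buf = ByteBuffer.allocate(8 + 8 + 8 + 4 + valueBytes);
        buf.putLong(validTo);   // next_timestamp: validTo of the latest (here: only) version
        buf.putLong(validFrom); // min_timestamp: validFrom of the earliest (here: only) version
        buf.putLong(validFrom); // the single (timestamp, value_size) pair
        buf.putInt(valueSize);
        if (value != null) {
            buf.put(value);     // values are appended forward-sorted by timestamp
        }
        return buf.array();
    }
}
```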



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}.

Review Comment:
   Maybe we can introduce the term `row` (or `segment row`, but I think just `row` is sufficient) to refer to the entry of a single key inside a segment? While "segment value" works sometimes, it may lead to confusion, as "value" could also be a "version of the record's value"; if we avoid overloading the term "value", it might be helpful.
   
   ```
   ... the bytes layout of a row (i.e., list of value versions for a single key) stored in segments ...
   ```



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * <pre>
+ *     <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <list of values, forward-sorted by timestamp>
+ * </pre>
+ * where:
+ * <ul>
+ * <li>{@code next_timestamp} is the validTo timestamp of the latest record version stored in this
+ * segment,</li>
+ * <li>{@code min_timestamp} is the validFrom timestamp of the earliest record version stored
+ * in this segment, and</li>
+ * <li>Negative {@code value_size} is used to indicate that the value stored is a tombstone,
+ * in order to distinguish it from an empty array, which has {@code value_size} of zero. In practice,
+ * {@code value_size} is always set to -1 for the tombstone case, though this need not be true
+ * in general.</li>
+ * </ul>
+ * <p>
+ * Note that the value format above does not store the number of record versions contained in the
+ * segment. It is not necessary to store this information separately because this information is
+ * never required on its own. Record versions are always deserialized in order, and we can
+ * determine when we have reached the end of the list based on whether the (validFrom) timestamp of
+ * the record version equals the {@code min_timestamp}.
+ * <p>
+ * There is one edge case with regards to the segment value format described above, which is useful
+ * to know for understanding the code in this file, but not relevant for callers of the class.
+ * In the typical case, all record (validFrom) timestamps and the {@code next_timestamp} of the
+ * segment will form a strictly increasing sequence, i.e., it is not valid to have a record version
+ * with validTo timestamp equal to (or less than) its validFrom timestamp. The one edge case /
+ * exception is when the latest record version (for a particular key) is a tombstone, and the
+ * segment in which this tombstone is to be stored currently contains no record versions.
+ * This case will result in a "degenerate" segment containing the single tombstone, with both
+ * {@code min_timestamp} and {@code next_timestamp} equal to the (validFrom) timestamp of the
+ * tombstone. (It is valid to interpret this tombstone's validTo timestamp as being equal to its
+ * validFrom timestamp, as querying for the latest record version as of a later timestamp will
+ * correctly return that no record version is present.) Note also that after a "degenerate" segment
+ * has formed, it's possible that the segment will remain degenerate even as newer record versions
+ * are added. (For example, if additional puts happen with later timestamps such that those puts
+ * only affect later segments, then the earlier degenerate segment will remain degenerate.)
+ * <p>
+ * Callers of this class need not concern themselves with this detail because all the exposed
+ * methods function as expected, even in the degenerate segment case. All methods may still be
+ * called, with the exception of {@link SegmentValue#find(long, boolean)} and those that depend
+ * on it (i.e., {@link SegmentValue#updateRecord(long, byte[], int)} and
+ * {@link SegmentValue#insert(long, byte[], int)}). Missing support for calling these methods on
+ * degenerate segments is not an issue because the same timestamp bounds restrictions required for
+ * calling {@link SegmentValue#find(long, boolean)} on regular segments serve to prevent callers
+ * from calling the method on degenerate segments as well.
+ */
+final class RocksDBVersionedStoreSegmentValueFormatter {
+    private static final int TIMESTAMP_SIZE = 8;
+    private static final int VALUE_SIZE = 4;
+
+    /**
+     * @return the validTo timestamp of the latest record in the provided segment
+     */
+    static long getNextTimestamp(final byte[] segmentValue) {
+        return ByteBuffer.wrap(segmentValue).getLong(0);
+    }
+
+    /**
+     * @return the (validFrom) timestamp of the earliest record in the provided segment.
+     */
+    static long getMinTimestamp(final byte[] segmentValue) {
+        return ByteBuffer.wrap(segmentValue).getLong(TIMESTAMP_SIZE);
+    }
+
+    /**
+     * @return the deserialized segment value
+     */
+    static SegmentValue deserialize(final byte[] segmentValue) {
+        return new PartiallyDeserializedSegmentValue(segmentValue);
+    }
+
+    /**
+     * Creates a new segment value that contains the provided record.
+     * <p>
+     * This method may also be used to create a "degenerate" segment with {@code null} value and
+     * {@code validFrom} timestamp equal to {@code validTo}. (For more on degenerate segments,
+     * see the main javadoc for this class.)
+     *
+     * @param value the record value
+     * @param validFrom the record's (validFrom) timestamp
+     * @param validTo the record's validTo timestamp
+     * @return the newly created segment value
+     */
+    static SegmentValue newSegmentValueWithRecord(
+        final byte[] value, final long validFrom, final long validTo) {
+        return new PartiallyDeserializedSegmentValue(value, validFrom, validTo);
+    }
+
+    interface SegmentValue {
+
+        /**
+         * Finds the latest record in this segment with (validFrom) timestamp not exceeding the
+         * provided timestamp bound. This method requires that the provided timestamp bound exists
+         * in this segment, i.e., that the provided timestamp bound is at least minTimestamp and
+         * is smaller than nextTimestamp. As a result of this requirement, it is not permitted to
+         * call this method on degenerate segments.

Review Comment:
   > As a result of this requirement, it is not permitted to call this method on degenerate segments.
   
   Do we need this? Above you explain that callers don't need to worry about the degenerate case. Would a user even be able to know whether a segment is degenerate or not? (My understanding is "no", so this sentence seems not to be useful.)
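
This also shows why the restriction is largely self-enforcing: a caller can only check the documented bound against the serialized row via the two getters from this class, and a degenerate row (where `min_timestamp == next_timestamp`) can never satisfy that bound. A hedged sketch under that assumption; the helper name is invented:

```java
// Illustrative only: the documented precondition for find() is
// minTimestamp <= bound < nextTimestamp. For a degenerate row,
// min_timestamp == next_timestamp, so no bound can pass this check.
static boolean boundIsContained(final byte[] segmentValue, final long bound) {
    final long min = RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(segmentValue);
    final long next = RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(segmentValue);
    return bound >= min && bound < next;
}
```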



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * <pre>
+ *     <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <list of values, forward-sorted by timestamp>
+ * </pre>
+ * where:
+ * <ul>
+ * <li>{@code next_timestamp} is the validTo timestamp of the latest record version stored in this
+ * segment,</li>
+ * <li>{@code min_timestamp} is the validFrom timestamp of the earliest record version stored
+ * in this segment, and</li>
+ * <li>Negative {@code value_size} is used to indicate that the value stored is a tombstone,
+ * in order to distinguish it from an empty array, which has {@code value_size} of zero. In practice,
+ * {@code value_size} is always set to -1 for the tombstone case, though this need not be true
+ * in general.</li>
+ * </ul>
+ * <p>
+ * Note that the value format above does not store the number of record versions contained in the
+ * segment. It is not necessary to store this information separately because this information is
+ * never required on its own. Record versions are always deserialized in order, and we can
+ * determine when we have reached the end of the list based on whether the (validFrom) timestamp of
+ * the record version equals the {@code min_timestamp}.
+ * <p>
+ * There is one edge case with regards to the segment value format described above, which is useful
+ * to know for understanding the code in this file, but not relevant for callers of the class.
+ * In the typical case, all record (validFrom) timestamps and the {@code next_timestamp} of the
+ * segment will form a strictly increasing sequence, i.e., it is not valid to have a record version
+ * with validTo timestamp equal to (or less than) its validFrom timestamp. The one edge case /
+ * exception is when the latest record version (for a particular key) is a tombstone, and the
+ * segment in which this tombstone is to be stored currently contains no record versions.
+ * This case will result in a "degenerate" segment containing the single tombstone, with both
+ * {@code min_timestamp} and {@code next_timestamp} equal to the (validFrom) timestamp of the
+ * tombstone. (It is valid to interpret this tombstone's validTo timestamp as being equal to its
+ * validFrom timestamp, as querying for the latest record version as of a later timestamp will
+ * correctly return that no record version is present.) Note also that after a "degenerate" segment
+ * has formed, it's possible that the segment will remain degenerate even as newer record versions
+ * are added. (For example, if additional puts happen with later timestamps such that those puts
+ * only affect later segments, then the earlier degenerate segment will remain degenerate.)
+ * <p>
+ * Callers of this class need not concern themselves with this detail because all the exposed
+ * methods function as expected, even in the degenerate segment case. All methods may still be
+ * called, with the exception of {@link SegmentValue#find(long, boolean)} and those that depend
+ * on it (i.e., {@link SegmentValue#updateRecord(long, byte[], int)} and
+ * {@link SegmentValue#insert(long, byte[], int)}). Missing support for calling these methods on
+ * degenerate segments is not an issue because the same timestamp bounds restrictions required for
+ * calling {@link SegmentValue#find(long, boolean)} on regular segments serve to prevent callers
+ * from calling the method on degenerate segments as well.
+ */
+final class RocksDBVersionedStoreSegmentValueFormatter {
+    private static final int TIMESTAMP_SIZE = 8;
+    private static final int VALUE_SIZE = 4;
+
+    /**
+     * @return the validTo timestamp of the latest record in the provided segment
+     */
+    static long getNextTimestamp(final byte[] segmentValue) {
+        return ByteBuffer.wrap(segmentValue).getLong(0);
+    }
+
+    /**
+     * @return the (validFrom) timestamp of the earliest record in the provided segment.
+     */
+    static long getMinTimestamp(final byte[] segmentValue) {
+        return ByteBuffer.wrap(segmentValue).getLong(TIMESTAMP_SIZE);
+    }
+
+    /**
+     * @return the deserialized segment value
+     */
+    static SegmentValue deserialize(final byte[] segmentValue) {
+        return new PartiallyDeserializedSegmentValue(segmentValue);
+    }
+
+    /**
+     * Creates a new segment value that contains the provided record.
+     * <p>
+     * This method may also be used to create a "degenerate" segment with {@code null} value and
+     * {@code validFrom} timestamp equal to {@code validTo}. (For more on degenerate segments,
+     * see the main javadoc for this class.)
+     *
+     * @param value the record value
+     * @param validFrom the record's (validFrom) timestamp
+     * @param validTo the record's validTo timestamp
+     * @return the newly created segment value
+     */
+    static SegmentValue newSegmentValueWithRecord(
+        final byte[] value, final long validFrom, final long validTo) {
+        return new PartiallyDeserializedSegmentValue(value, validFrom, validTo);
+    }
+
+    interface SegmentValue {
+
+        /**
+         * Finds the latest record in this segment with (validFrom) timestamp not exceeding the
+         * provided timestamp bound. This method requires that the provided timestamp bound exists
+         * in this segment, i.e., that the provided timestamp bound is at least minTimestamp and
+         * is smaller than nextTimestamp. As a result of this requirement, it is not permitted to
+         * call this method on degenerate segments.
+         *
+         * @param timestamp the timestamp to find
+         * @param includeValue whether the value of the found record should be returned with the result
+         * @return the record that is found
+         * @throws IllegalArgumentException if the provided timestamp is not contained within this segment
+         */
+        SegmentSearchResult find(long timestamp, boolean includeValue);
+
+        /**
+         * Inserts the provided record into the segment as the latest record in the segment.
+         * This operation is allowed even if the segment is degenerate.
+         * <p>
+         * It is the caller's responsibility to ensure that this action is desirable. In the event
+         * that the new record's (validFrom) timestamp is smaller than the current
+         * {@code nextTimestamp} of the segment, the operation will still be performed, and the
+         * segment's existing contents will be truncated to ensure consistency of timestamps within
+         * the segment. This truncation behavior helps reconcile inconsistencies between different
+         * segments, or between a segment and the latest value store, of a
+         * {@link RocksDBVersionedStore} instance.
+         *
+         * @param validFrom the (validFrom) timestamp of the record to insert
+         * @param validTo the validTo timestamp of the record to insert
+         * @param value the value of the record to insert
+         */
+        void insertAsLatest(long validFrom, long validTo, byte[] value);
+
+        /**
+         * Inserts the provided record into the segment as the earliest record in the segment.
+         * This operation is allowed even if the segment is degenerate. It is the caller's responsibility

Review Comment:
   As above.
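
For the truncation behavior described in the `insertAsLatest()` javadoc above, a short worked example may help; the timestamps and values are invented for illustration, and the truncation details follow the javadoc's wording rather than the (not yet shown) implementation:

```java
// Editor's illustration of the documented truncation (hypothetical values).
static void truncationExample() {
    final byte[] v1 = {1}, v2 = {2}, v3 = {3};
    final SegmentValue row = RocksDBVersionedStoreSegmentValueFormatter
        .newSegmentValueWithRecord(v1, 10L, 20L); // row holds [10,20); nextTimestamp = 20
    row.insertAsLatest(20L, 30L, v2);             // row holds [10,20), [20,30); nextTimestamp = 30
    row.insertAsLatest(15L, 40L, v3);             // validFrom 15 < nextTimestamp 30: per the javadoc,
                                                  // existing history from 15 onward is truncated before
                                                  // [15,40) becomes the latest; nextTimestamp = 40
}
```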



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * <pre>
+ *     <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <list of values, forward-sorted by timestamp>
+ * </pre>
+ * where:
+ * <ul>
+ * <li>{@code next_timestamp} is the validTo timestamp of the latest record version stored in this
+ * segment,</li>
+ * <li>{@code min_timestamp} is the validFrom timestamp of the earliest record version stored
+ * in this segment, and</li>
+ * <li>Negative {@code value_size} is used to indicate that the value stored is a tombstone,
+ * in order to distinguish it from an empty array, which has {@code value_size} of zero. In practice,
+ * {@code value_size} is always set to -1 for the tombstone case, though this need not be true
+ * in general.</li>
+ * </ul>
+ * <p>
+ * Note that the value format above does not store the number of record versions contained in the
+ * segment. It is not necessary to store this information separately because this information is
+ * never required on its own. Record versions are always deserialized in order, and we can
+ * determine when we have reached the end of the list based on whether the (validFrom) timestamp of
+ * the record version equals the {@code min_timestamp}.
+ * <p>
+ * There is one edge case with regards to the segment value format described above, which is useful
+ * to know for understanding the code in this file, but not relevant for callers of the class.
+ * In the typical case, all record (validFrom) timestamps and the {@code next_timestamp} of the
+ * segment will form a strictly increasing sequence, i.e., it is not valid to have a record version
+ * with validTo timestamp equal to (or less than) its validFrom timestamp. The one edge case /
+ * exception is when the latest record version (for a particular key) is a tombstone, and the
+ * segment in which this tombstone is to be stored currently contains no record versions.
+ * This case will result in a "degenerate" segment containing the single tombstone, with both
+ * {@code min_timestamp} and {@code next_timestamp} equal to the (validFrom) timestamp of the
+ * tombstone. (It is valid to interpret this tombstone's validTo timestamp as being equal to its
+ * validFrom timestamp, as querying for the latest record version as of a later timestamp will
+ * correctly return that no record version is present.) Note also that after a "degenerate" segment
+ * has formed, it's possible that the segment will remain degenerate even as newer record versions
+ * are added. (For example, if additional puts happen with later timestamps such that those puts
+ * only affect later segments, then the earlier degenerate segment will remain degenerate.)
+ * <p>
+ * Callers of this class need not concern themselves with this detail because all the exposed
+ * methods function as expected, even in the degenerate segment case. All methods may still be
+ * called, with the exception of {@link SegmentValue#find(long, boolean)} and those that depend
+ * on it (i.e., {@link SegmentValue#updateRecord(long, byte[], int)} and
+ * {@link SegmentValue#insert(long, byte[], int)}). Missing support for calling these methods on
+ * degenerate segments is not an issue because the same timestamp bounds restrictions required for
+ * calling {@link SegmentValue#find(long, boolean)} on regular segments serve to prevent callers
+ * from calling the method on degenerate segments as well.
+ */
+final class RocksDBVersionedStoreSegmentValueFormatter {
+    private static final int TIMESTAMP_SIZE = 8;
+    private static final int VALUE_SIZE = 4;
+
+    /**
+     * @return the validTo timestamp of the latest record in the provided segment
+     */
+    static long getNextTimestamp(final byte[] segmentValue) {
+        return ByteBuffer.wrap(segmentValue).getLong(0);
+    }
+
+    /**
+     * @return the (validFrom) timestamp of the earliest record in the provided segment.
+     */
+    static long getMinTimestamp(final byte[] segmentValue) {
+        return ByteBuffer.wrap(segmentValue).getLong(TIMESTAMP_SIZE);
+    }
+
+    /**
+     * @return the deserialized segment value
+     */
+    static SegmentValue deserialize(final byte[] segmentValue) {
+        return new PartiallyDeserializedSegmentValue(segmentValue);
+    }
+
+    /**
+     * Creates a new segment value that contains the provided record.
+     * <p>
+     * This method may also be used to create a "degenerate" segment with {@code null} value and
+     * {@code validFrom} timestamp equal to {@code validTo}. (For more on degenerate segments,
+     * see the main javadoc for this class.)
+     *
+     * @param value the record value
+     * @param validFrom the record's (validFrom) timestamp
+     * @param validTo the record's validTo timestamp
+     * @return the newly created segment value
+     */
+    static SegmentValue newSegmentValueWithRecord(
+        final byte[] value, final long validFrom, final long validTo) {
+        return new PartiallyDeserializedSegmentValue(value, validFrom, validTo);
+    }
+
+    interface SegmentValue {
+
+        /**
+         * Finds the latest record in this segment with (validFrom) timestamp not exceeding the
+         * provided timestamp bound. This method requires that the provided timestamp bound exists
+         * in this segment, i.e., that the provided timestamp bound is at least minTimestamp and
+         * is smaller than nextTimestamp. As a result of this requirement, it is not permitted to
+         * call this method on degenerate segments.
+         *
+         * @param timestamp the timestamp to find
+         * @param includeValue whether the value of the found record should be returned with the result
+         * @return the record that is found
+         * @throws IllegalArgumentException if the provided timestamp is not contained within this segment
+         */
+        SegmentSearchResult find(long timestamp, boolean includeValue);
+
+        /**
+         * Inserts the provided record into the segment as the latest record in the segment.
+         * This operation is allowed even if the segment is degenerate.
+         * <p>
+         * It is the caller's responsibility to ensure that this action is desirable. In the event
+         * that the new record's (validFrom) timestamp is smaller than the current
+         * {@code nextTimestamp} of the segment, the operation will still be performed, and the
+         * segment's existing contents will be truncated to ensure consistency of timestamps within
+         * the segment. This truncation behavior helps reconcile inconsistencies between different
+         * segments, or between a segment and the latest value store, of a
+         * {@link RocksDBVersionedStore} instance.
+         *
+         * @param validFrom the (validFrom) timestamp of the record to insert
+         * @param validTo the validTo timestamp of the record to insert
+         * @param value the value of the record to insert
+         */
+        void insertAsLatest(long validFrom, long validTo, byte[] value);
+
+        /**
+         * Inserts the provided record into the segment as the earliest record in the segment.
+         * This operation is allowed even if the segment is degenerate. It is the caller's responsibility
+         * to ensure that this action is valid, i.e., that record's (validFrom) timestamp is smaller
+         * than the current {@code minTimestamp} of the segment.
+         *
+         * @param timestamp the (validFrom) timestamp of the record to insert
+         * @param value the value of the record to insert
+         */
+        void insertAsEarliest(long timestamp, byte[] value);
+
+        /**
+         * Inserts the provided record into the segment at the provided index. This operation
+         * requires that the segment is not degenerate, and that
+         * {@link SegmentValue#find(long, boolean)} has already been called in order to deserialize
+         * the relevant index (to insert into index n requires that index n-1 has already been deserialized).
+         * <p>
+         * It is the caller's responsibility to ensure that this action makes sense, i.e., that the
+         * insertion index is correct for the (validFrom) timestamp of the record being inserted.
+         *
+         * @param timestamp the (validFrom) timestamp of the record to insert
+         * @param value the value of the record to insert
+         * @param index the index that the newly inserted record should occupy
+         * @throws IllegalArgumentException if the segment is degenerate, if the provided index is out of
+         *         bounds, or if {@code find()} has not been called to deserialize the relevant index.
+         */
+        void insert(long timestamp, byte[] value, int index);
+
+        /**
+         * Updates the record at the provided index with the provided value and (validFrom)
+         * timestamp. This operation requires that {@link SegmentValue#find(long, boolean)} has
+         * already been called in order to deserialize the relevant index (i.e., the one being updated).
+         * (As a result, it is not valid to call this method on a degenerate segment.)

Review Comment:
   As above. 
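
A hedged sketch of the call sequence these preconditions imply; the inputs are hypothetical and `SegmentSearchResult`'s `index()` accessor is a guess, since its API is not shown in this hunk:

```java
// Illustrative only (inputs hypothetical): find() must have deserialized the
// relevant index before insert() or updateRecord() may reference it.
static void findThenInsert(final byte[] rawRow) {
    final SegmentValue row = RocksDBVersionedStoreSegmentValueFormatter.deserialize(rawRow);
    final SegmentSearchResult found = row.find(17L, false); // deserializes up to the found index
    // Assuming the new version's validFrom (18) falls between the found version's
    // validFrom and the next-newer one; index() is a guessed accessor name.
    row.insert(18L, new byte[] {42}, found.index());
}
```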



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * <pre>
+ *     <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <list of values, forward-sorted by timestamp>
+ * </pre>
+ * where:
+ * <ul>
+ * <li>{@code next_timestamp} is the validTo timestamp of the latest record version stored in this
+ * segment,</li>
+ * <li>{@code min_timestamp} is the validFrom timestamp of the earliest record version stored
+ * in this segment, and</li>
+ * <li>Negative {@code value_size} is used to indicate that the value stored is a tombstone,
+ * in order to distinguish it from an empty array, which has {@code value_size} of zero. In practice,
+ * {@code value_size} is always set to -1 for the tombstone case, though this need not be true
+ * in general.</li>
+ * </ul>
+ * <p>
+ * Note that the value format above does not store the number of record versions contained in the
+ * segment. It is not necessary to store this information separately because this information is
+ * never required on its own. Record versions are always deserialized in order, and we can
+ * determine when we have reached the end of the list based on whether the (validFrom) timestamp of
+ * the record version equals the {@code min_timestamp}.
+ * <p>
+ * Additionally, there is one edge case / exception to the segment value format described above.
+ * If the latest record version (for a particular key) is a tombstone, and the segment in which

Review Comment:
   I was thinking about the question "when does the edge case apply" over and over again, and I am not sure if the current description is easy enough (nor whether it's 100% correct -- maybe it is correct, and I just don't understand it...)

   If we get an in-order delete, and the record exists in the "latest value store", we have the regular case.

   I would rephrase this paragraph as follows:

   > [...] described above: In general, we store the latest record version in the "latest value store" and store older versions in the "segment store". Entries in the segment store inherit their {@code validTo} timestamp from the latest entry's {@code validFrom} timestamp.

   However, if we have not seen a key yet (or all previous versions have already expired) and we receive a tombstone, there is nothing to be deleted from the "latest value store", but we still need to track the received tombstone (note: we cannot store the tombstone in the "latest value store", as a tombstone in the "latest value store" is just a delete). Tracking the tombstone is required to maintain the correct history for the key in question (in case a later out-of-order record would be deleted by this tombstone). Thus, we put the tombstone into the "segment store" eagerly, which implies that we don't have a {@code validTo} timestamp for it yet (there is no newer value for the key inserted after the tombstone yet; the tombstone's {@code validTo} timestamp is semantically infinity at this point). We cannot use infinity to pick a segment to store the tombstone, though. As a surrogate, we set {@code validTo := validFrom} to select a segment. We encode the row as {@code next_timestamp := validTo (= validFrom)} and leave {@code min_timestamp} blank. This format is basically a placeholder for the existence of a tombstone (and an unknown {@code validTo} timestamp), and we can "convert" it back to the regular row format in case a value for this key is inserted into the segment later. Because the tombstone is stored in a segment, it will expire eventually, as desired.
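
For comparison, under the format as the PR currently documents it (both header timestamps set to the tombstone's validFrom, rather than the blank `min_timestamp` suggested above), a degenerate row would encode roughly like this; an editor's sketch, not PR code:

```java
import java.nio.ByteBuffer;

// Editor's sketch: a degenerate row is a lone tombstone with
// min_timestamp == next_timestamp == validFrom and a negative value_size.
final class DegenerateRowSketch {
    static byte[] degenerateRow(final long validFrom) {
        final ByteBuffer buf = ByteBuffer.allocate(8 + 8 + 8 + 4);
        buf.putLong(validFrom); // next_timestamp (the surrogate validTo)
        buf.putLong(validFrom); // min_timestamp
        buf.putLong(validFrom); // the tombstone's (timestamp, value_size) pair
        buf.putInt(-1);         // negative size marks the tombstone; no value bytes follow
        return buf.array();
    }
}
```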



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * <pre>
+ *     <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <list of values, forward-sorted by timestamp>
+ * </pre>
+ * where:
+ * <ul>
+ * <li>{@code next_timestamp} is the validTo timestamp of the latest record version stored in this
+ * segment,</li>
+ * <li>{@code min_timestamp} is the validFrom timestamp of the earliest record version stored
+ * in this segment, and</li>
+ * <li>Negative {@code value_size} is used to indicate that the value stored is a tombstone,
+ * in order to distinguish it from an empty array, which has {@code value_size} of zero. In practice,
+ * {@code value_size} is always set to -1 for the tombstone case, though this need not be true
+ * in general.</li>
+ * </ul>
+ * <p>
+ * Note that the value format above does not store the number of record versions contained in the
+ * segment. It is not necessary to store this information separately because this information is
+ * never required on its own. Record versions are always deserialized in order, and we can
+ * determine when we have reached the end of the list based on whether the (validFrom) timestamp of
+ * the record version equals the {@code min_timestamp}.
+ * <p>
+ * There is one edge case with regards to the segment value format described above, which is useful
+ * to know for understanding the code in this file, but not relevant for callers of the class.
+ * In the typical case, all record (validFrom) timestamps and the {@code next_timestamp} of the
+ * segment will form a strictly increasing sequence, i.e., it is not valid to have a record version
+ * with validTo timestamp equal to (or less than) its validFrom timestamp. The one edge case /
+ * exception is when the latest record version (for a particular key) is a tombstone, and the
+ * segment in which this tombstone is to be stored currently contains no record versions.
+ * This case will result in a "degenerate" segment containing the single tombstone, with both
+ * {@code min_timestamp} and {@code next_timestamp} equal to the (validFrom) timestamp of the
+ * tombstone. (It is valid to interpret this tombstone's validTo timestamp as being equal to its
+ * validFrom timestamp, as querying for the latest record version as of a later timestamp will
+ * correctly return that no record version is present.) Note also that after a "degenerate" segment
+ * has formed, it's possible that the segment will remain degenerate even as newer record versions
+ * are added. (For example, if additional puts happen with later timestamps such that those puts
+ * only affect later segments, then the earlier degenerate segment will remain degenerate.)
+ * <p>
+ * Callers of this class need not concern themselves with this detail because all the exposed
+ * methods function as expected, even in the degenerate segment case. All methods may still be
+ * called, with the exception of {@link SegmentValue#find(long, boolean)} and those that depend
+ * on it (i.e., {@link SegmentValue#updateRecord(long, byte[], int)} and
+ * {@link SegmentValue#insert(long, byte[], int)}). Missing support for calling these methods on
+ * degenerate segments is not an issue because the same timestamp bounds restrictions required for
+ * calling {@link SegmentValue#find(long, boolean)} on regular segments serve to prevent callers
+ * from calling the method on degenerate segments as well.
+ */
+final class RocksDBVersionedStoreSegmentValueFormatter {
+    private static final int TIMESTAMP_SIZE = 8;
+    private static final int VALUE_SIZE = 4;
+
+    /**
+     * @return the validTo timestamp of the latest record in the provided segment
+     */
+    static long getNextTimestamp(final byte[] segmentValue) {
+        return ByteBuffer.wrap(segmentValue).getLong(0);
+    }
+
+    /**
+     * @return the (validFrom) timestamp of the earliest record in the provided segment.
+     */
+    static long getMinTimestamp(final byte[] segmentValue) {
+        return ByteBuffer.wrap(segmentValue).getLong(TIMESTAMP_SIZE);
+    }
+
+    /**
+     * @return the deserialized segment value
+     */
+    static SegmentValue deserialize(final byte[] segmentValue) {
+        return new PartiallyDeserializedSegmentValue(segmentValue);
+    }
+
+    /**
+     * Creates a new segment value that contains the provided record.
+     * <p>
+     * This method may also be used to create a "degenerate" segment with {@code null} value and
+     * {@code validFrom} timestamp equal to {@code validTo}. (For more on degenerate segments,
+     * see the main javadoc for this class.)
+     *
+     * @param value the record value
+     * @param validFrom the record's (validFrom) timestamp
+     * @param validTo the record's validTo timestamp
+     * @return the newly created segment value
+     */
+    static SegmentValue newSegmentValueWithRecord(
+        final byte[] value, final long validFrom, final long validTo) {
+        return new PartiallyDeserializedSegmentValue(value, validFrom, validTo);
+    }
+
+    interface SegmentValue {
+
+        /**
+         * Finds the latest record in this segment with (validFrom) timestamp not exceeding the
+         * provided timestamp bound. This method requires that the provided timestamp bound exists
+         * in this segment, i.e., that the provided timestamp bound is at least minTimestamp and
+         * is smaller than nextTimestamp. As a result of this requirement, it is not permitted to
+         * call this method on degenerate segments.
+         *
+         * @param timestamp the timestamp to find
+         * @param includeValue whether the value of the found record should be returned with the result
+         * @return the record that is found
+         * @throws IllegalArgumentException if the provided timestamp is not contained within this segment
+         */
+        SegmentSearchResult find(long timestamp, boolean includeValue);
+
+        /**
+         * Inserts the provided record into the segment as the latest record in the segment.
+         * This operation is allowed even if the segment is degenerate.

Review Comment:
   As above.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * <pre>
+ *     <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <list of values, forward-sorted by timestamp>
+ * </pre>
+ * where:
+ * <ul>
+ * <li>{@code next_timestamp} is the validTo timestamp of the latest record version stored in this
+ * segment,</li>
+ * <li>{@code min_timestamp} is the validFrom timestamp of the earliest record version stored
+ * in this segment, and</li>
+ * <li>Negative {@code value_size} is used to indicate that the value stored is a tombstone,
+ * in order to distinguish it from an empty array, which has {@code value_size} of zero. In practice,
+ * {@code value_size} is always set to -1 for the tombstone case, though this need not be true
+ * in general.</li>
+ * </ul>
+ * <p>
+ * Note that the value format above does not store the number of record versions contained in the
+ * segment. It is not necessary to store this information separately because this information is

Review Comment:
   `segment` -> `row`
   
   (I'll stop marking these here -- there are more occurrences below, in case you follow my suggestion.)



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * <pre>
+ *     <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <list of values, forward-sorted by timestamp>
+ * </pre>
+ * where:
+ * <ul>
+ * <li>{@code next_timestamp} is the validTo timestamp of the latest record version stored in this
+ * segment,</li>
+ * <li>{@code min_timestamp} is the validFrom timestamp of the earliest record version stored
+ * in this segment, and</li>
+ * <li>Negative {@code value_size} is used to indicate that the value stored is a tombstone,
+ * in order to distinguish it from an empty array, which has {@code value_size} of zero. In practice,
+ * {@code value_size} is always set to -1 for the tombstone case, though this need not be true
+ * in general.</li>
+ * </ul>
+ * <p>
+ * Note that the value format above does not store the number of record versions contained in the
+ * segment. It is not necessary to store this information separately because this information is
+ * never required on its own. Record versions are always deserialized in order, and we can
+ * determine when we have reached the end of the list based on whether the (validFrom) timestamp of

Review Comment:
   nit `{@code validFrom}`
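
The termination rule this paragraph describes -- no stored count; stop once a pair's validFrom reaches `min_timestamp` -- can be sketched as a simple scan over the header pairs. Editor's sketch only; the names are invented:

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Editor's sketch of the documented termination rule for the
// (timestamp, value_size) list: there is no explicit count; the scan
// ends when a pair's validFrom equals min_timestamp.
final class PairScanSketch {
    static List<long[]> scanPairs(final byte[] row) {
        final ByteBuffer buf = ByteBuffer.wrap(row);
        buf.getLong();                        // skip next_timestamp
        final long minTimestamp = buf.getLong();
        final List<long[]> pairs = new ArrayList<>();
        while (true) {
            final long validFrom = buf.getLong();
            final int valueSize = buf.getInt();
            pairs.add(new long[] {validFrom, valueSize});
            if (validFrom == minTimestamp) {  // earliest version reached
                return pairs;
            }
        }
    }
}
```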



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * <pre>
+ *     <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <list of values, forward-sorted by timestamp>
+ * </pre>
+ * where:
+ * <ul>
+ * <li>{@code next_timestamp} is the validTo timestamp of the latest record version stored in this
+ * segment,</li>
+ * <li>{@code min_timestamp} is the validFrom timestamp of the earliest record version stored
+ * in this segment, and</li>
+ * <li>Negative {@code value_size} is used to indicate that the value stored is a tombstone,
+ * in order to distinguish it from an empty array, which has {@code value_size} of zero. In practice,
+ * {@code value_size} is always set to -1 for the tombstone case, though this need not be true
+ * in general.</li>
+ * </ul>
+ * <p>
+ * Note that the value format above does not store the number of record versions contained in the
+ * segment. It is not necessary to store this information separately because this information is
+ * never required on its own. Record versions are always deserialized in order, and we can
+ * determine when we have reached the end of the list based on whether the (validFrom) timestamp of
+ * the record version equals the {@code min_timestamp}.
+ * <p>
+ * There is one edge case with regards to the segment value format described above, which is useful
+ * to know for understanding the code in this file, but not relevant for callers of the class.
+ * In the typical case, all record (validFrom) timestamps and the {@code next_timestamp} of the
+ * segment will form a strictly increasing sequence, i.e., it is not valid to have a record version
+ * with validTo timestamp equal to (or less than) its validFrom timestamp. The one edge case /
+ * exception is when the latest record version (for a particular key) is a tombstone, and the
+ * segment in which this tombstone is to be stored currently contains no record versions.
+ * This case will result in a "degenerate" segment containing the single tombstone, with both
+ * {@code min_timestamp} and {@code next_timestamp} equal to the (validFrom) timestamp of the
+ * tombstone. (It is valid to interpret this tombstone's validTo timestamp as being equal to its
+ * validFrom timestamp, as querying for the latest record version as of a later timestamp will
+ * correctly return that no record version is present.) Note also that after a "degenerate" segment
+ * has formed, it's possible that the segment will remain degenerate even as newer record versions
+ * are added. (For example, if additional puts happen with later timestamps such that those puts
+ * only affect later segments, then the earlier degenerate segment will remain degenerate.)
+ * <p>
+ * Callers of this class need not concern themselves with this detail because all the exposed
+ * methods function as expected, even in the degenerate segment case. All methods may still be
+ * called, with the exception of {@link SegmentValue#find(long, boolean)} and those that depend
+ * on it (i.e., {@link SegmentValue#updateRecord(long, byte[], int)} and
+ * {@link SegmentValue#insert(long, byte[], int)}). Missing support for calling these methods on
+ * degenerate segments is not an issue because the same timestamp bounds restrictions required for
+ * calling {@link SegmentValue#find(long, boolean)} on regular segments serve to prevent callers
+ * from calling the method on degenerate segments as well.
+ */
+final class RocksDBVersionedStoreSegmentValueFormatter {
+    private static final int TIMESTAMP_SIZE = 8;
+    private static final int VALUE_SIZE = 4;
+
+    /**
+     * @return the validTo timestamp of the latest record in the provided segment
+     */
+    static long getNextTimestamp(final byte[] segmentValue) {
+        return ByteBuffer.wrap(segmentValue).getLong(0);
+    }
+
+    /**
+     * @return the (validFrom) timestamp of the earliest record in the provided segment.
+     */
+    static long getMinTimestamp(final byte[] segmentValue) {
+        return ByteBuffer.wrap(segmentValue).getLong(TIMESTAMP_SIZE);
+    }
+
+    /**
+     * @return the deserialized segment value
+     */
+    static SegmentValue deserialize(final byte[] segmentValue) {
+        return new PartiallyDeserializedSegmentValue(segmentValue);
+    }
+
+    /**
+     * Creates a new segment value that contains the provided record.
+     * <p>
+     * This method may also be used to create a "degenerate" segment with {@code null} value and
+     * {@code validFrom} timestamp equal to {@code validTo}. (For more on degenerate segments,
+     * see the main javadoc for this class.)
+     *
+     * @param value the record value
+     * @param validFrom the record's (validFrom) timestamp
+     * @param validTo the record's validTo timestamp
+     * @return the newly created segment value
+     */
+    static SegmentValue newSegmentValueWithRecord(
+        final byte[] value, final long validFrom, final long validTo) {
+        return new PartiallyDeserializedSegmentValue(value, validFrom, validTo);
+    }
+
+    interface SegmentValue {
+
+        /**
+         * Finds the latest record in this segment with (validFrom) timestamp not exceeding the
+         * provided timestamp bound. This method requires that the provided timestamp bound exists
+         * in this segment, i.e., that the provided timestamp bound is at least minTimestamp and
+         * is smaller than nextTimestamp. As a result of this requirement, it is not permitted to
+         * call this method on degenerate segments.
+         *
+         * @param timestamp the timestamp to find
+         * @param includeValue whether the value of the found record should be returned with the result
+         * @return the record that is found
+         * @throws IllegalArgumentException if the provided timestamp is not contained within this segment
+         */
+        SegmentSearchResult find(long timestamp, boolean includeValue);
+
+        /**
+         * Inserts the provided record into the segment as the latest record in the segment.
+         * This operation is allowed even if the segment is degenerate.
+         * <p>
+         * It is the caller's responsibility to ensure that this action is desirable. In the event
+         * that the new record's (validFrom) timestamp is smaller than the current
+         * {@code nextTimestamp} of the segment, the operation will still be performed, and the
+         * segment's existing contents will be truncated to ensure consistency of timestamps within
+         * the segment. This truncation behavior helps reconcile inconsistencies between different
+         * segments, or between a segment and the latest value store, of a
+         * {@link RocksDBVersionedStore} instance.
+         *
+         * @param validFrom the (validFrom) timestamp of the record to insert
+         * @param validTo the validTo timestamp of the record to insert
+         * @param value the value of the record to insert
+         */
+        void insertAsLatest(long validFrom, long validTo, byte[] value);
+
+        /**
+         * Inserts the provided record into the segment as the earliest record in the segment.
+         * This operation is allowed even if the segment is degenerate. It is the caller's responsibility
+         * to ensure that this action is valid, i.e., that the record's (validFrom) timestamp is smaller
+         * than the current {@code minTimestamp} of the segment.
+         *
+         * @param timestamp the (validFrom) timestamp of the record to insert
+         * @param value the value of the record to insert
+         */
+        void insertAsEarliest(long timestamp, byte[] value);
+
+        /**
+         * Inserts the provided record into the segment at the provided index. This operation
+         * requires that the segment is not degenerate, and that

Review Comment:
   As above.
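
For illustration, here is a minimal sketch of how the fixed-width header of the layout above can be decoded. It assumes the `TIMESTAMP_SIZE` and `VALUE_SIZE` constants from the diff; the row contents are hypothetical.

```java
import java.nio.ByteBuffer;

public class SegmentRowSketch {
    private static final int TIMESTAMP_SIZE = 8; // a long, per the diff
    private static final int VALUE_SIZE = 4;     // an int, per the diff

    public static void main(final String[] args) {
        // Hypothetical row holding one record version: value "foo",
        // validFrom = 3 (min_timestamp), validTo = 10 (next_timestamp).
        final byte[] value = "foo".getBytes();
        final ByteBuffer buf = ByteBuffer.allocate(
            3 * TIMESTAMP_SIZE + VALUE_SIZE + value.length);
        buf.putLong(10L);         // next_timestamp
        buf.putLong(3L);          // min_timestamp
        buf.putLong(3L);          // <timestamp, value_size> list: timestamp
        buf.putInt(value.length); // ...value_size; -1 would mark a tombstone
        buf.put(value);           // values, forward-sorted by timestamp
        final byte[] row = buf.array();

        // Header access mirrors getNextTimestamp()/getMinTimestamp() above.
        System.out.println(ByteBuffer.wrap(row).getLong(0));              // 10
        System.out.println(ByteBuffer.wrap(row).getLong(TIMESTAMP_SIZE)); // 3
    }
}
```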



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * <pre>
+ *     <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <list of values, forward-sorted by timestamp>
+ * </pre>
+ * where:
+ * <ul>
+ * <li>{@code next_timestamp} is the validTo timestamp of the latest record version stored in this
+ * segment,</li>
+ * <li>{@code min_timestamp} is the validFrom timestamp of the earliest record version stored
+ * in this segment, and</li>
+ * <li>Negative {@code value_size} is used to indicate that the value stored is a tombstone,
+ * in order to distinguish it from an empty array, which has {@code value_size} of zero. In practice,
+ * {@code value_size} is always set to -1 for the tombstone case, though this need not be true
+ * in general.</li>
+ * </ul>
+ * <p>
+ * Note that the value format above does not store the number of record versions contained in the
+ * segment. It is not necessary to store this information separately because this information is
+ * never required on its own. Record versions are always deserialized in order, and we can
+ * determine when we have reached the end of the list based on whether the (validFrom) timestamp of
+ * the record version equals the {@code min_timestamp}.
+ * <p>
+ * Additionally, there is one edge case / exception to the segment value format described above.
+ * If the latest record version (for a particular key) is a tombstone, and the segment in which
+ * this tombstone is to be stored currently contains no record versions, then this will result in
+ * an "empty" segment -- i.e., the segment will contain only a single tombstone with no validTo
+ * timestamp associated with it. When this happens, the serialized segment will contain the
+ * tombstone's (validFrom) timestamp and nothing else. Upon deserializing an empty segment, the
+ * tombstone's timestamp can be fetched as the {@code next_timestamp} of the segment. (An empty
+ * segment can be thought of as having {@code min_timestamp} and {@code next_timestamp} both equal
+ * to the timestamp of the single tombstone record version. To avoid the redundancy of serializing
+ * the same timestamp twice, it is only serialized once and stored as the first timestamp of the
+ * segment, which happens to be {@code next_timestamp}.)
+ */
+final class RocksDBVersionedStoreSegmentValueFormatter {
+    private static final int TIMESTAMP_SIZE = 8;
+    private static final int VALUE_SIZE = 4;
+
+    /**
+     * @return the validTo timestamp of the latest record in the provided segment
+     */
+    static long getNextTimestamp(final byte[] segmentValue) {

Review Comment:
   `segmentValue` -> `row` ? (also elsewhere)



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * <pre>
+ *     <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <list of values, forward-sorted by timestamp>
+ * </pre>
+ * where:
+ * <ul>
+ * <li>{@code next_timestamp} is the validTo timestamp of the latest record version stored in this
+ * segment,</li>
+ * <li>{@code min_timestamp} is the validFrom timestamp of the earliest record version stored
+ * in this segment, and</li>
+ * <li>Negative {@code value_size} is used to indicate that the value stored is a tombstone,
+ * in order to distinguish it from an empty array, which has {@code value_size} of zero. In practice,
+ * {@code value_size} is always set to -1 for the tombstone case, though this need not be true
+ * in general.</li>
+ * </ul>
+ * <p>
+ * Note that the value format above does not store the number of record versions contained in the
+ * segment. It is not necessary to store this information separately because this information is
+ * never required on its own. Record versions are always deserialized in order, and we can
+ * determine when we have reached the end of the list based on whether the (validFrom) timestamp of
+ * the record version equals the {@code min_timestamp}.
+ * <p>
+ * Additionally, there is one edge case / exception to the segment value format described above.
+ * If the latest record version (for a particular key) is a tombstone, and the segment in which
+ * this tombstone is to be stored currently contains no record versions, then this will result in
+ * an "empty" segment -- i.e., the segment will contain only a single tombstone with no validTo
+ * timestamp associated with it. When this happens, the serialized segment will contain the
+ * tombstone's (validFrom) timestamp and nothing else. Upon deserializing an empty segment, the
+ * tombstone's timestamp can be fetched as the {@code next_timestamp} of the segment. (An empty
+ * segment can be thought of as having {@code min_timestamp} and {@code next_timestamp} both equal
+ * to the timestamp of the single tombstone record version. To avoid the redundancy of serializing
+ * the same timestamp twice, it is only serialized once and stored as the first timestamp of the
+ * segment, which happens to be {@code next_timestamp}.)
+ */
+final class RocksDBVersionedStoreSegmentValueFormatter {
+    private static final int TIMESTAMP_SIZE = 8;
+    private static final int VALUE_SIZE = 4;
+
+    /**
+     * @return the validTo timestamp of the latest record in the provided segment
+     */
+    static long getNextTimestamp(final byte[] segmentValue) {
+        return ByteBuffer.wrap(segmentValue).getLong(0);
+    }
+
+    /**
+     * Returns whether the provided segment is "empty." An empty segment is one that
+     * contains only a single tombstone with no validTo timestamp specified. In this case,
+     * the serialized segment contains only the timestamp of the tombstone (stored as the segment's
+     * {@code nextTimestamp}) and nothing else.
+     * <p>
+     * This can happen if, e.g., the only record inserted for a particular key is
+     * a tombstone. In this case, the tombstone must be stored in a segment
+     * (as the latest value store does not store tombstones), but also has no validTo
+     * timestamp associated with it.
+     *
+     * @return whether the segment is "empty"
+     */
+    static boolean isEmpty(final byte[] segmentValue) {
+        return segmentValue.length <= TIMESTAMP_SIZE;
+    }
+
+    /**
+     * Requires that the segment is not empty. Caller is responsible for verifying that this
+     * is the case before calling this method.
+     *
+     * @return the (validFrom) timestamp of the earliest record in the provided segment.
+     */
+    static long getMinTimestamp(final byte[] segmentValue) {
+        return ByteBuffer.wrap(segmentValue).getLong(TIMESTAMP_SIZE);
+    }
+
+    /**
+     * @return the deserialized segment value
+     */
+    static SegmentValue deserialize(final byte[] segmentValue) {

Review Comment:
   Rename class `SegmentValue` to `Row` ?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * <pre>
+ *     <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <list of values, forward-sorted by timestamp>
+ * </pre>
+ * where:
+ * <ul>
+ * <li>{@code next_timestamp} is the validTo timestamp of the latest record version stored in this
+ * segment,</li>
+ * <li>{@code min_timestamp} is the validFrom timestamp of the earliest record version stored
+ * in this segment, and</li>
+ * <li>Negative {@code value_size} is used to indicate that the value stored is a tombstone,
+ * in order to distinguish it from an empty array, which has {@code value_size} of zero. In practice,
+ * {@code value_size} is always set to -1 for the tombstone case, though this need not be true
+ * in general.</li>
+ * </ul>
+ * <p>
+ * Note that the value format above does not store the number of record versions contained in the
+ * segment. It is not necessary to store this information separately because this information is
+ * never required on its own. Record versions are always deserialized in order, and we can
+ * determine when we have reached the end of the list based on whether the (validFrom) timestamp of
+ * the record version equals the {@code min_timestamp}.
+ * <p>
+ * Additionally, there is one edge case / exception to the segment value format described above.
+ * If the latest record version (for a particular key) is a tombstone, and the segment in which
+ * this tombstone is to be stored currently contains no record versions, then this will result in
+ * an "empty" segment -- i.e., the segment will contain only a single tombstone with no validTo
+ * timestamp associated with it. When this happens, the serialized segment will contain the
+ * tombstone's (validFrom) timestamp and nothing else. Upon deserializing an empty segment, the
+ * tombstone's timestamp can be fetched as the {@code next_timestamp} of the segment. (An empty
+ * segment can be thought of as having {@code min_timestamp} and {@code next_timestamp} both equal
+ * to the timestamp of the single tombstone record version. To avoid the redundancy of serializing
+ * the same timestamp twice, it is only serialized once and stored as the first timestamp of the
+ * segment, which happens to be {@code next_timestamp}.)
+ */
+final class RocksDBVersionedStoreSegmentValueFormatter {
+    private static final int TIMESTAMP_SIZE = 8;
+    private static final int VALUE_SIZE = 4;
+
+    /**
+     * @return the validTo timestamp of the latest record in the provided segment
+     */
+    static long getNextTimestamp(final byte[] segmentValue) {
+        return ByteBuffer.wrap(segmentValue).getLong(0);
+    }
+
+    /**
+     * Returns whether the provided segment is "empty." An empty segment is one that
+     * contains only a single tombstone with no validTo timestamp specified. In this case,
+     * the serialized segment contains only the timestamp of the tombstone (stored as the segment's
+     * {@code nextTimestamp}) and nothing else.
+     * <p>
+     * This can happen if, e.g., the only record inserted for a particular key is
+     * a tombstone. In this case, the tombstone must be stored in a segment
+     * (as the latest value store does not store tombstones), but also has no validTo
+     * timestamp associated with it.
+     *
+     * @return whether the segment is "empty"
+     */
+    static boolean isEmpty(final byte[] segmentValue) {
+        return segmentValue.length <= TIMESTAMP_SIZE;
+    }
+
+    /**
+     * Requires that the segment is not empty. Caller is responsible for verifying that this
+     * is the case before calling this method.
+     *
+     * @return the (validFrom) timestamp of the earliest record in the provided segment.
+     */
+    static long getMinTimestamp(final byte[] segmentValue) {
+        return ByteBuffer.wrap(segmentValue).getLong(TIMESTAMP_SIZE);
+    }
+
+    /**
+     * @return the deserialized segment value
+     */
+    static SegmentValue deserialize(final byte[] segmentValue) {
+        return new PartiallyDeserializedSegmentValue(segmentValue);
+    }
+
+    /**
+     * Creates a new segment value that contains the provided record.
+     *
+     * @param value the record value
+     * @param validFrom the record's (validFrom) timestamp
+     * @param validTo the record's validTo timestamp
+     * @return the newly created segment value
+     */
+    static SegmentValue newSegmentValueWithRecord(
+        final byte[] value, final long validFrom, final long validTo) {
+        return new PartiallyDeserializedSegmentValue(value, validFrom, validTo);
+    }
+
+    /**
+     * Creates a new empty segment value.
+     *
+     * @param timestamp the (validFrom) timestamp of the tombstone for this empty segment value
+     * @return the newly created segment value
+     */
+    static SegmentValue newSegmentValueWithTombstone(final long timestamp) {
+        return new PartiallyDeserializedSegmentValue(timestamp);
+    }
+
+    interface SegmentValue {
+
+        /**
+         * @return whether the segment is empty. See
+         * {@link RocksDBVersionedStoreSegmentValueFormatter#isEmpty(byte[])} for details.
+         */
+        boolean isEmpty();
+
+        /**
+         * Finds the latest record in this segment with (validFrom) timestamp not exceeding the
+         * provided timestamp bound. This method requires that the provided timestamp bound exists
+         * in this segment, i.e., the segment is not empty, and the provided timestamp bound is
+         * at least minTimestamp and is smaller than nextTimestamp.
+         *
+         * @param timestamp the timestamp to find
+         * @param includeValue whether the value of the found record should be returned with the result
+         * @return the record that is found
+         * @throws IllegalArgumentException if the segment is empty, or if the provided timestamp
+         *         is not contained within this segment
+         */
+        SegmentSearchResult find(long timestamp, boolean includeValue);
+
+        /**
+         * Inserts the provided record into the segment as the latest record in the segment.
+         * This operation is allowed even if the segment is empty.
+         * <p>
+         * It is the caller's responsibility to ensure that this action is desirable. In the event
+         * that the new record's (validFrom) timestamp is smaller than the current
+         * {@code nextTimestamp} of the segment, the operation will still be performed, and the
+         * segment's existing contents will be truncated to ensure consistency of timestamps within
+         * the segment. This truncation behavior helps reconcile inconsistencies between different
+         * segments, or between a segment and the latest value store, of a
+         * {@link RocksDBVersionedStore} instance.
+         *
+         * @param validFrom the (validFrom) timestamp of the record to insert
+         * @param validTo the validTo timestamp of the record to insert
+         * @param value the value of the record to insert
+         */
+        void insertAsLatest(long validFrom, long validTo, byte[] value);
+
+        /**
+         * Inserts the provided record into the segment as the earliest record in the segment.
+         * This operation is allowed even if the segment is empty. It is the caller's responsibility
+         * to ensure that this action is valid, i.e., that the record's (validFrom) timestamp is smaller
+         * than the current {@code minTimestamp} of the segment, or the segment is empty.
+         *
+         * @param timestamp the (validFrom) timestamp of the record to insert
+         * @param value the value of the record to insert
+         */
+        void insertAsEarliest(long timestamp, byte[] value);
+
+        /**
+         * Inserts the provided record into the segment at the provided index. This operation
+         * requires that the segment is not empty, and that {@link SegmentValue#find(long, boolean)}
+         * has already been called in order to deserialize the relevant index (to insert into
+         * index n requires that index n-1 has already been deserialized).
+         * <p>
+         * It is the caller's responsibility to ensure that this action makes sense, i.e., that the
+         * insertion index is correct for the (validFrom) timestamp of the record being inserted.
+         *
+         * @param timestamp the (validFrom) timestamp of the record to insert
+         * @param value the value of the record to insert
+         * @param index the index that the newly inserted record should occupy
+         * @throws IllegalArgumentException if the segment is empty, if the provided index is out of
+         *         bounds, or if {@code find()} has not been called to deserialize the relevant index.
+         */
+        void insert(long timestamp, byte[] value, int index);
+
+        /**
+         * Updates the record at the provided index with the provided value and (validFrom)
+         * timestamp. This operation requires that the segment is not empty, and that
+         * {@link SegmentValue#find(long, boolean)} has already been called in order to deserialize
+         * the relevant index (i.e., the one being updated).
+         * <p>
+         * It is the caller's responsibility to ensure that this action makes sense, i.e., that the
+         * updated (validFrom) timestamp does not violate timestamp order within the segment.
+         *
+         * @param timestamp the updated record (validFrom) timestamp
+         * @param value the updated record value
+         * @param index the index of the record to update
+         */
+        void updateRecord(long timestamp, byte[] value, int index);
+
+        /**
+         * @return the bytes serialization for this segment value
+         */
+        byte[] serialize();
+
+        class SegmentSearchResult {

Review Comment:
   `RowSearchResult` ?
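
As a side note on the contract documented above: `insert()` at index n requires that `find()` has already deserialized index n - 1. Below is a hedged sketch of a compliant call sequence against the `SegmentValue` interface from this diff; the timestamps and value are made up, and the snippet assumes it lives in the same package as the formatter.

```java
// Suppose the row holds record versions with validFrom timestamps 8 and 3,
// and we want to insert a new version with validFrom 5 at index 1.
static void insertAtIndexOne(final SegmentValue segmentValue) {
    segmentValue.find(8L, false);                 // deserializes index 0
    segmentValue.insert(5L, "new".getBytes(), 1); // index 0 is known, so this is legal
}
```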



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * <pre>
+ *     <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <list of values, forward-sorted by timestamp>
+ * </pre>
+ * where:
+ * <ul>
+ * <li>{@code next_timestamp} is the validTo timestamp of the latest record version stored in this
+ * segment,</li>
+ * <li>{@code min_timestamp} is the validFrom timestamp of the earliest record version stored
+ * in this segment, and</li>
+ * <li>Negative {@code value_size} is used to indicate that the value stored is a tombstone,
+ * in order to distinguish it from an empty array, which has {@code value_size} of zero. In practice,
+ * {@code value_size} is always set to -1 for the tombstone case, though this need not be true
+ * in general.</li>
+ * </ul>
+ * <p>
+ * Note that the value format above does not store the number of record versions contained in the
+ * segment. It is not necessary to store this information separately because this information is
+ * never required on its own. Record versions are always deserialized in order, and we can
+ * determine when we have reached the end of the list based on whether the (validFrom) timestamp of
+ * the record version equals the {@code min_timestamp}.
+ * <p>
+ * There is one edge case with regard to the segment value format described above, which is useful
+ * to know for understanding the code in this file, but not relevant for callers of the class.
+ * In the typical case, all record (validFrom) timestamps and the {@code next_timestamp} of the

Review Comment:
   nit: {@code validFrom}. I actually don't care whether we use the {@code} markup, but currently it's a mix of both, so please use it everywhere or nowhere. (I will stop marking it; I assume there is more of it below.)



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * <pre>
+ *     <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <list of values, forward-sorted by timestamp>
+ * </pre>
+ * where:
+ * <ul>
+ * <li>{@code next_timestamp} is the validTo timestamp of the latest record version stored in this
+ * segment,</li>
+ * <li>{@code min_timestamp} is the validFrom timestamp of the earliest record version stored
+ * in this segment, and</li>
+ * <li>Negative {@code value_size} is used to indicate that the value stored is a tombstone,
+ * in order to distinguish it from an empty array, which has {@code value_size} of zero. In practice,
+ * {@code value_size} is always set to -1 for the tombstone case, though this need not be true
+ * in general.</li>
+ * </ul>
+ * <p>
+ * Note that the value format above does not store the number of record versions contained in the
+ * segment. It is not necessary to store this information separately because this information is
+ * never required on its own. Record versions are always deserialized in order, and we can
+ * determine when we have reached the end of the list based on whether the (validFrom) timestamp of
+ * the record version equals the {@code min_timestamp}.
+ * <p>
+ * There is one edge case with regards to the segment value format described above, which is useful
+ * to know for understanding the code in this file, but not relevant for callers of the class.
+ * In the typical case, all record (validFrom) timestamps and the {@code next_timestamp} of the
+ * segment will form a strictly increasing sequence, i.e., it is not valid to have a record version
+ * with validTo timestamp equal to (or less than) its validFrom timestamp. The one edge case /
+ * exception is when the latest record version (for a particular key) is a tombstone, and the
+ * segment in which this tombstone is to be stored currently contains no record versions.
+ * This case will result in a "degenerate" segment containing the single tombstone, with both
+ * {@code min_timestamp} and {@code next_timestamp} equal to the (validFrom) timestamp of the
+ * tombstone. (It is valid to interpret this tombstone's validTo timestamp as being equal to its

Review Comment:
   Add
   ```
   tombstone; the list of {@code <timestamp,value-size>} will contain a regular tombstone entry (more details in other comments below). (It is valid...
   ```
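
Under that suggested wording, a degenerate row for a tombstone with (validFrom) timestamp t would serialize roughly as below. This is a hedged sketch of the byte layout only, following the format at the top of the file; the timestamp is hypothetical.

```java
final long t = 10L; // the tombstone's validFrom timestamp (hypothetical)
final ByteBuffer degenerate = ByteBuffer.allocate(3 * 8 + 4);
degenerate.putLong(t);  // next_timestamp == t
degenerate.putLong(t);  // min_timestamp == t
degenerate.putLong(t);  // the tombstone's own <timestamp, ...> entry
degenerate.putInt(-1);  // value_size = -1 marks the tombstone; no value bytes follow
```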



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * <pre>
+ *     <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <list of values, forward-sorted by timestamp>
+ * </pre>
+ * where:
+ * <ul>
+ * <li>{@code next_timestamp} is the validTo timestamp of the latest record version stored in this
+ * segment,</li>
+ * <li>{@code min_timestamp} is the validFrom timestamp of the earliest record version stored
+ * in this segment, and</li>
+ * <li>Negative {@code value_size} is used to indicate that the value stored is a tombstone,
+ * in order to distinguish it from an empty array, which has {@code value_size} of zero. In practice,
+ * {@code value_size} is always set to -1 for the tombstone case, though this need not be true
+ * in general.</li>
+ * </ul>
+ * <p>
+ * Note that the value format above does not store the number of record versions contained in the
+ * segment. It is not necessary to store this information separately because this information is
+ * never required on its own. Record versions are always deserialized in order, and we can
+ * determine when we have reached the end of the list based on whether the (validFrom) timestamp of
+ * the record version equals the {@code min_timestamp}.
+ * <p>
+ * Additionally, there is one edge case / exception to the segment value format described above.
+ * If the latest record version (for a particular key) is a tombstone, and the segment in which
+ * this tombstone is to be stored currently contains no record versions, then this will result in
+ * an "empty" segment -- i.e., the segment will contain only a single tombstone with no validTo
+ * timestamp associated with it. When this happens, the serialized segment will contain the
+ * tombstone's (validFrom) timestamp and nothing else. Upon deserializing an empty segment, the
+ * tombstone's timestamp can be fetched as the {@code next_timestamp} of the segment. (An empty
+ * segment can be thought of as having {@code min_timestamp} and {@code next_timestamp} both equal
+ * to the timestamp of the single tombstone record version. To avoid the redundancy of serializing
+ * the same timestamp twice, it is only serialized once and stored as the first timestamp of the
+ * segment, which happens to be {@code next_timestamp}.)
+ */
+final class RocksDBVersionedStoreSegmentValueFormatter {
+    private static final int TIMESTAMP_SIZE = 8;
+    private static final int VALUE_SIZE = 4;
+
+    /**
+     * @return the validTo timestamp of the latest record in the provided segment
+     */
+    static long getNextTimestamp(final byte[] segmentValue) {
+        return ByteBuffer.wrap(segmentValue).getLong(0);
+    }
+
+    /**
+     * Returns whether the provided segment is "empty." An empty segment is one that
+     * contains only a single tombstone with no validTo timestamp specified. In this case,
+     * the serialized segment contains only the timestamp of the tombstone (stored as the segment's
+     * {@code nextTimestamp}) and nothing else.
+     * <p>
+     * This can happen if, e.g., the only record inserted for a particular key is
+     * a tombstone. In this case, the tombstone must be stored in a segment
+     * (as the latest value store does not store tombstones), but also has no validTo
+     * timestamp associated with it.
+     *
+     * @return whether the segment is "empty"
+     */
+    static boolean isEmpty(final byte[] segmentValue) {
+        return segmentValue.length <= TIMESTAMP_SIZE;
+    }
+
+    /**
+     * Requires that the segment is not empty. Caller is responsible for verifying that this
+     * is the case before calling this method.
+     *
+     * @return the (validFrom) timestamp of the earliest record in the provided segment.
+     */
+    static long getMinTimestamp(final byte[] segmentValue) {
+        return ByteBuffer.wrap(segmentValue).getLong(TIMESTAMP_SIZE);
+    }
+
+    /**
+     * @return the deserialized segment value
+     */
+    static SegmentValue deserialize(final byte[] segmentValue) {
+        return new PartiallyDeserializedSegmentValue(segmentValue);
+    }
+
+    /**
+     * Creates a new segment value that contains the provided record.
+     *
+     * @param value the record value
+     * @param validFrom the record's (validFrom) timestamp
+     * @param validTo the record's validTo timestamp
+     * @return the newly created segment value
+     */
+    static SegmentValue newSegmentValueWithRecord(

Review Comment:
   rename to `newRowWithValue(...)` ?
   
   We usually use `Record` to refer to a Kafka message.
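
Whatever the final names, a short usage sketch of the factory methods quoted in this revision of the diff may help. The values and timestamps here are hypothetical, and the `isEmpty()` expectation follows the javadoc above (an empty row serializes to just the tombstone's timestamp); the snippet assumes same-package access.

```java
// Row with one record version: value "v1", validFrom = 5, validTo = 10.
final SegmentValue withRecord =
    RocksDBVersionedStoreSegmentValueFormatter.newSegmentValueWithRecord(
        "v1".getBytes(), 5L, 10L);
final byte[] serialized = withRecord.serialize();

// "Empty" row holding only a tombstone with validFrom = 7.
final SegmentValue empty =
    RocksDBVersionedStoreSegmentValueFormatter.newSegmentValueWithTombstone(7L);
assert RocksDBVersionedStoreSegmentValueFormatter.isEmpty(empty.serialize());
```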



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * <pre>
+ *     <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <list of values, forward-sorted by timestamp>
+ * </pre>
+ * where:
+ * <ul>
+ * <li>{@code next_timestamp} is the validTo timestamp of the latest record version stored in this
+ * segment,</li>
+ * <li>{@code min_timestamp} is the validFrom timestamp of the earliest record version stored
+ * in this segment, and</li>
+ * <li>Negative {@code value_size} is used to indicate that the value stored is a tombstone,
+ * in order to distinguish it from an empty array, which has {@code value_size} of zero. In practice,
+ * {@code value_size} is always set to -1 for the tombstone case, though this need not be true
+ * in general.</li>
+ * </ul>
+ * <p>
+ * Note that the value format above does not store the number of record versions contained in the
+ * segment. It is not necessary to store this information separately because this information is
+ * never required on its own. Record versions are always deserialized in order, and we can
+ * determine when we have reached the end of the list based on whether the (validFrom) timestamp of
+ * the record version equals the {@code min_timestamp}.
+ * <p>
+ * Additionally, there is one edge case / exception to the segment value format described above.
+ * If the latest record version (for a particular key) is a tombstone, and the segment in which
+ * this tombstone is to be stored currently contains no record versions, then this will result in
+ * an "empty" segment -- i.e., the segment will contain only a single tombstone with no validTo
+ * timestamp associated with it. When this happens, the serialized segment will contain the
+ * tombstone's (validFrom) timestamp and nothing else. Upon deserializing an empty segment, the
+ * tombstone's timestamp can be fetched as the {@code next_timestamp} of the segment. (An empty
+ * segment can be thought of as having {@code min_timestamp} and {@code next_timestamp} both equal
+ * to the timestamp of the single tombstone record version. To avoid the redundancy of serializing
+ * the same timestamp twice, it is only serialized once and stored as the first timestamp of the
+ * segment, which happens to be {@code next_timestamp}.)
+ */
+final class RocksDBVersionedStoreSegmentValueFormatter {
+    private static final int TIMESTAMP_SIZE = 8;
+    private static final int VALUE_SIZE = 4;
+
+    /**
+     * @return the validTo timestamp of the latest record in the provided segment
+     */
+    static long getNextTimestamp(final byte[] segmentValue) {
+        return ByteBuffer.wrap(segmentValue).getLong(0);
+    }
+
+    /**
+     * Returns whether the provided segment is "empty." An empty segment is one that
+     * contains only a single tombstone with no validTo timestamp specified. In this case,
+     * the serialized segment contains only the timestamp of the tombstone (stored as the segment's
+     * {@code nextTimestamp}) and nothing else.
+     * <p>
+     * This can happen if, e.g., the only record inserted for a particular key is
+     * a tombstone. In this case, the tombstone must be stored in a segment
+     * (as the latest value store does not store tombstones), but also has no validTo
+     * timestamp associated with it.
+     *
+     * @return whether the segment is "empty"
+     */
+    static boolean isEmpty(final byte[] segmentValue) {
+        return segmentValue.length <= TIMESTAMP_SIZE;
+    }
+
+    /**
+     * Requires that the segment is not empty. Caller is responsible for verifying that this
+     * is the case before calling this method.
+     *
+     * @return the (validFrom) timestamp of the earliest record in the provided segment.
+     */
+    static long getMinTimestamp(final byte[] segmentValue) {
+        return ByteBuffer.wrap(segmentValue).getLong(TIMESTAMP_SIZE);
+    }
+
+    /**
+     * @return the deserialized segment value
+     */
+    static SegmentValue deserialize(final byte[] segmentValue) {
+        return new PartiallyDeserializedSegmentValue(segmentValue);
+    }
+
+    /**
+     * Creates a new segment value that contains the provided record.
+     *
+     * @param value the record value
+     * @param validFrom the record's (validFrom) timestamp
+     * @param validTo the record's validTo timestamp
+     * @return the newly created segment value
+     */
+    static SegmentValue newSegmentValueWithRecord(
+        final byte[] value, final long validFrom, final long validTo) {
+        return new PartiallyDeserializedSegmentValue(value, validFrom, validTo);
+    }
+
+    /**
+     * Creates a new empty segment value.
+     *
+     * @param timestamp the (validFrom) timestamp of the tombstone for this empty segment value
+     * @return the newly created segment value
+     */
+    static SegmentValue newSegmentValueWithTombstone(final long timestamp) {
+        return new PartiallyDeserializedSegmentValue(timestamp);
+    }
+
+    interface SegmentValue {
+
+        /**
+         * @return whether the segment is empty. See
+         * {@link RocksDBVersionedStoreSegmentValueFormatter#isEmpty(byte[])} for details.
+         */
+        boolean isEmpty();
+
+        /**
+         * Finds the latest record in this segment with (validFrom) timestamp not exceeding the
+         * provided timestamp bound. This method requires that the provided timestamp bound exists
+         * in this segment, i.e., the segment is not empty, and the provided timestamp bound is
+         * at least minTimestamp and is smaller than nextTimestamp.
+         *
+         * @param timestamp the timestamp to find
+         * @param includeValue whether the value of the found record should be returned with the result
+         * @return the record that is found
+         * @throws IllegalArgumentException if the segment is empty, or if the provided timestamp
+         *         is not contained within this segment
+         */
+        SegmentSearchResult find(long timestamp, boolean includeValue);
+
+        /**
+         * Inserts the provided record into the segment as the latest record in the segment.
+         * This operation is allowed even if the segment is empty.
+         * <p>
+         * It is the caller's responsibility to ensure that this action is desirable. In the event
+         * that the new record's (validFrom) timestamp is smaller than the current
+         * {@code nextTimestamp} of the segment, the operation will still be performed, and the
+         * segment's existing contents will be truncated to ensure consistency of timestamps within
+         * the segment. This truncation behavior helps reconcile inconsistencies between different
+         * segments, or between a segment and the latest value store, of a
+         * {@link RocksDBVersionedStore} instance.
+         *
+         * @param validFrom the (validFrom) timestamp of the record to insert
+         * @param validTo the validTo timestamp of the record to insert
+         * @param value the value of the record to insert
+         */
+        void insertAsLatest(long validFrom, long validTo, byte[] value);
+
+        /**
+         * Inserts the provided record into the segment as the earliest record in the segment.
+         * This operation is allowed even if the segment is empty. It is the caller's responsibility
+         * to ensure that this action is valid, i.e., that the record's (validFrom) timestamp is smaller
+         * than the current {@code minTimestamp} of the segment, or the segment is empty.
+         *
+         * @param timestamp the (validFrom) timestamp of the record to insert
+         * @param value the value of the record to insert
+         */
+        void insertAsEarliest(long timestamp, byte[] value);
+
+        /**
+         * Inserts the provided record into the segment at the provided index. This operation
+         * requires that the segment is not empty, and that {@link SegmentValue#find(long, boolean)}
+         * has already been called in order to deserialize the relevant index (to insert into
+         * index n requires that index n-1 has already been deserialized).
+         * <p>
+         * It is the caller's responsibility to ensure that this action makes sense, i.e., that the
+         * insertion index is correct for the (validFrom) timestamp of the record being inserted.
+         *
+         * @param timestamp the (validFrom) timestamp of the record to insert
+         * @param value the value of the record to insert
+         * @param index the index that the newly inserted record should occupy
+         * @throws IllegalArgumentException if the segment is empty, if the provided index is out of
+         *         bounds, or if {@code find()} has not been called to deserialize the relevant index.
+         */
+        void insert(long timestamp, byte[] value, int index);
+
+        /**
+         * Updates the record at the provided index with the provided value and (validFrom)
+         * timestamp. This operation requires that the segment is not empty, and that
+         * {@link SegmentValue#find(long, boolean)} has already been called in order to deserialize
+         * the relevant index (i.e., the one being updated).
+         * <p>
+         * It is the caller's responsibility to ensure that this action makes sense, i.e., that the
+         * updated (validFrom) timestamp does not violate timestamp order within the segment.
+         *
+         * @param timestamp the updated record (validFrom) timestamp
+         * @param value the updated record value
+         * @param index the index of the record to update
+         */
+        void updateRecord(long timestamp, byte[] value, int index);
+
+        /**
+         * @return the bytes serialization for this segment value
+         */
+        byte[] serialize();
+
+        class SegmentSearchResult {
+            private final int index;
+            private final long validFrom;
+            private final long validTo;
+            private final byte[] value;
+
+            SegmentSearchResult(final int index, final long validFrom, final long validTo) {
+                this(index, validFrom, validTo, null);
+            }
+
+            SegmentSearchResult(final int index, final long validFrom, final long validTo,
+                                final byte[] value) {
+                this.index = index;
+                this.validFrom = validFrom;
+                this.validTo = validTo;
+                this.value = value;
+            }
+
+            int index() {
+                return index;
+            }
+
+            long validFrom() {
+                return validFrom;
+            }
+
+            long validTo() {
+                return validTo;
+            }
+
+            /**
+             * This value will be null if the caller did not specify that the value should be
+             * included with the return result from {@link SegmentValue#find(long, boolean)}.
+             * In this case, it is up to the caller to not call this method, as the "value"
+             * returned will not be meaningful.
+             */
+            byte[] value() {
+                return value;
+            }
+        }
+    }
+
+    private static class PartiallyDeserializedSegmentValue implements SegmentValue {

Review Comment:
   `PartiallyDeserializedRow` ?
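
Regardless of the name, the accessor contract above is worth illustrating: `value()` is only meaningful when `find()` was called with `includeValue = true`. A hedged sketch (made-up timestamp, same-package assumption):

```java
static void readResult(final SegmentValue segmentValue) {
    final SegmentSearchResult result = segmentValue.find(8L, true);
    final int index = result.index();         // position of the found record version
    final long validFrom = result.validFrom();
    final long validTo = result.validTo();
    // value() is only meaningful because includeValue was true above;
    // with includeValue = false it would be null and must not be read.
    final byte[] value = result.value();
}
```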



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * <pre>
+ *     <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <list of values, forward-sorted by timestamp>
+ * </pre>
+ * where:
+ * <ul>
+ * <li>{@code next_timestamp} is the validTo timestamp of the latest record version stored in this
+ * segment,</li>
+ * <li>{@code min_timestamp} is the validFrom timestamp of the earliest record version stored
+ * in this segment, and</li>
+ * <li>Negative {@code value_size} is used to indicate that the value stored is a tombstone,
+ * in order to distinguish it from an empty array, which has {@code value_size} of zero. In practice,
+ * {@code value_size} is always set to -1 for the tombstone case, though this need not be true
+ * in general.</li>
+ * </ul>
+ * <p>
+ * Note that the value format above does not store the number of record versions contained in the
+ * segment. It is not necessary to store this information separately because this information is
+ * never required on its own. Record versions are always deserialized in order, and we can
+ * determine when we have reached the end of the list based on whether the (validFrom) timestamp of
+ * the record version equals the {@code min_timestamp}.
+ * <p>
+ * There is one edge case with regard to the segment value format described above, which is useful
+ * to know for understanding the code in this file, but not relevant for callers of the class.
+ * In the typical case, all record (validFrom) timestamps and the {@code next_timestamp} of the
+ * segment will form a strictly increasing sequence, i.e., it is not valid to have a record version
+ * with validTo timestamp equal to (or less than) its validFrom timestamp. The one edge case /
+ * exception is when the latest record version (for a particular key) is a tombstone, and the
+ * segment in which this tombstone is to be stored currently contains no record versions.
+ * This case will result in a "degenerate" segment containing the single tombstone, with both
+ * {@code min_timestamp} and {@code next_timestamp} equal to the (validFrom) timestamp of the
+ * tombstone. (It is valid to interpret this tombstone's validTo timestamp as being equal to its
+ * validFrom timestamp, as querying for the latest record version as of a later timestamp will
+ * correctly return that no record version is present.) Note also that after a "degenerate" segment
+ * has formed, it's possible that the segment will remain degenerate even as newer record versions
+ * are added. (For example, if additional puts happen with later timestamps such that those puts
+ * only affect later segments, then the earlier degenerate segment will remain degenerate.)
+ * <p>
+ * Callers of this class need not concern themselves with this detail because all the exposed
+ * methods function as expected, even in the degenerate segment case. All methods may still be
+ * called, with the exception of {@link SegmentValue#find(long, boolean)} and those that depend

Review Comment:
   Why mention an exception if you later say it's not really an exception, given that making the call is not allowed to begin with?
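
For what it's worth, the guard the javadoc alludes to can be made explicit on the caller side. A hedged sketch using only the static helpers from this diff (same-package assumption): on a degenerate row, min_timestamp equals next_timestamp, so no timestamp can satisfy the bound that find() requires.

```java
// True iff find(ts, ...) may be called on this row per the documented bounds;
// always false for a degenerate row, where the two header timestamps coincide.
static boolean mayCallFind(final byte[] row, final long ts) {
    return ts >= RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(row)
        && ts < RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(row);
}
```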



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatterTest.java:
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class RocksDBVersionedStoreSegmentValueFormatterTest {
+
+    private static final List<TestCase> TEST_CASES = new ArrayList<>();
+    static {
+        // test cases are expected to have timestamps in strictly decreasing order (except for the degenerate case)
+        TEST_CASES.add(new TestCase("degenerate", 10, new TestRecord(null, 10)));
+        TEST_CASES.add(new TestCase("single record", 10, new TestRecord("foo".getBytes(), 1)));
+        TEST_CASES.add(new TestCase("multiple records", 10, new TestRecord("foo".getBytes(), 8), new TestRecord("bar".getBytes(), 3), new TestRecord("baz".getBytes(), 0)));
+        TEST_CASES.add(new TestCase("single tombstone", 10, new TestRecord(null, 1)));
+        TEST_CASES.add(new TestCase("multiple tombstone", 10, new TestRecord(null, 4), new TestRecord(null, 1)));
+        TEST_CASES.add(new TestCase("tombstones and records (r, t, r)", 10, new TestRecord("foo".getBytes(), 5), new TestRecord(null, 2), new TestRecord("bar".getBytes(), 1)));
+        TEST_CASES.add(new TestCase("tombstones and records (t, r, t)", 10, new TestRecord(null, 5), new TestRecord("foo".getBytes(), 2), new TestRecord(null, 1)));
+        TEST_CASES.add(new TestCase("tombstones and records (r, r, t, t)", 10, new TestRecord("foo".getBytes(), 6), new TestRecord("bar".getBytes(), 5), new TestRecord(null, 2), new TestRecord(null, 1)));
+        TEST_CASES.add(new TestCase("tombstones and records (t, t, r, r)", 10, new TestRecord(null, 7), new TestRecord(null, 6), new TestRecord("foo".getBytes(), 2), new TestRecord("bar".getBytes(), 1)));
+        TEST_CASES.add(new TestCase("record with empty bytes", 10, new TestRecord(new byte[0], 1)));
+        TEST_CASES.add(new TestCase("records with empty bytes (r, e)", 10, new TestRecord("foo".getBytes(), 4), new TestRecord(new byte[0], 1)));
+        TEST_CASES.add(new TestCase("records with empty bytes (e, e, r)", 10, new TestRecord(new byte[0], 8), new TestRecord(new byte[0], 2), new TestRecord("foo".getBytes(), 1)));
+    }
+
+    private final TestCase testCase;
+
+    public RocksDBVersionedStoreSegmentValueFormatterTest(final TestCase testCase) {
+        this.testCase = testCase;
+    }
+
+    @Parameterized.Parameters(name = "{0}")
+    public static Collection<TestCase> data() {
+        return TEST_CASES;
+    }
+
+    @Test
+    public void shouldSerializeAndDeserialize() {
+        final SegmentValue segmentValue = buildSegmentWithInsertLatest(testCase);
+
+        final byte[] serialized = segmentValue.serialize();
+        final SegmentValue deserialized = RocksDBVersionedStoreSegmentValueFormatter.deserialize(serialized);
+
+        verifySegmentContents(deserialized, testCase);
+    }
+
+    @Test
+    public void shouldBuildWithInsertLatest() {
+        final SegmentValue segmentValue = buildSegmentWithInsertLatest(testCase);
+
+        verifySegmentContents(segmentValue, testCase);
+    }
+
+    @Test
+    public void shouldBuildWithInsertEarliest() {
+        final SegmentValue segmentValue = buildSegmentWithInsertEarliest(testCase);
+
+        verifySegmentContents(segmentValue, testCase);
+    }
+
+    @Test
+    public void shouldInsertAtIndex() {
+        if (testCase.isDegenerate) {
+            // cannot insert into degenerate segment
+            return;
+        }
+
+        // test inserting at each possible index
+        for (int insertIdx = 0; insertIdx <= testCase.records.size(); insertIdx++) {
+            // build record to insert
+            final long newRecordTimestamp;
+            if (insertIdx == 0) {
+                newRecordTimestamp = testCase.records.get(0).timestamp + 1;
+                if (newRecordTimestamp == testCase.nextTimestamp) {
+                    // cannot insert because no timestamp exists between last record and nextTimestamp
+                    continue;
+                }
+            } else {
+                newRecordTimestamp = testCase.records.get(insertIdx - 1).timestamp - 1;
+                if (newRecordTimestamp < 0 || (insertIdx < testCase.records.size() && newRecordTimestamp == testCase.records.get(insertIdx).timestamp)) {
+                    // cannot insert because timestamps of existing records are adjacent
+                    continue;
+                }
+            }
+            final TestRecord newRecord = new TestRecord("new".getBytes(), newRecordTimestamp);
+
+            final SegmentValue segmentValue = buildSegmentWithInsertLatest(testCase);
+
+            // insert() first requires a call to find()
+            if (insertIdx > 0) {
+                segmentValue.find(testCase.records.get(insertIdx - 1).timestamp, false);
+            }
+            segmentValue.insert(newRecord.timestamp, newRecord.value, insertIdx);
+
+            // create expected results
+            final List<TestRecord> expectedRecords = new ArrayList<>(testCase.records);
+            expectedRecords.add(insertIdx, newRecord);
+
+            verifySegmentContents(segmentValue, new TestCase("expected", testCase.nextTimestamp, expectedRecords));
+        }
+    }
+
+    @Test
+    public void shouldUpdateAtIndex() {
+        if (testCase.isDegenerate) {
+            // cannot update degenerate segment
+            return;
+        }
+
+        // test updating at each possible index
+        for (int updateIdx = 0; updateIdx < testCase.records.size(); updateIdx++) {
+            // build updated record
+            long updatedRecordTimestamp = testCase.records.get(updateIdx).timestamp - 1;

Review Comment:
   If I understand the test correctly, we try to update the timestamp and fall back to using the same timestamp if there is no "gap". It seems we don't test both cases (increasing and decreasing the timestamp): we only decrease, and "fall back" to increasing only when decreasing does not work. Should we test both cases instead?
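
   A hedged sketch of what covering both directions might look like; `pickUpdatedTimestamp` and the bare timestamp list are hypothetical stand-ins for the test's `TestCase`/`TestRecord` helpers:

```java
import java.util.List;

public class UpdateTimestampSketch {
    // Hypothetical helper: try to shift the timestamp at updateIdx in the given
    // direction (-1 = decrease, +1 = increase) and fall back to the original
    // timestamp when the neighbor in that direction leaves no gap.
    static long pickUpdatedTimestamp(final List<Long> timestamps, final int updateIdx, final int direction) {
        final long original = timestamps.get(updateIdx);
        final long candidate = original + direction;
        final boolean clashesPrev = updateIdx > 0 && candidate == timestamps.get(updateIdx - 1);
        final boolean clashesNext = updateIdx < timestamps.size() - 1 && candidate == timestamps.get(updateIdx + 1);
        if (candidate < 0 || clashesPrev || clashesNext) {
            return original; // no gap in this direction; keep the original timestamp
        }
        return candidate;
    }

    public static void main(final String[] args) {
        final List<Long> timestamps = List.of(8L, 3L, 0L); // reverse-sorted, as in the test cases
        for (final int direction : new int[] {-1, 1}) {
            System.out.println("direction=" + direction
                + " -> " + pickUpdatedTimestamp(timestamps, 1, direction));
        }
    }
}
```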



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatterTest.java:
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class RocksDBVersionedStoreSegmentValueFormatterTest {
+
+    private static final List<TestCase> TEST_CASES = new ArrayList<>();
+    static {
+        // test cases are expected to have timestamps in strictly decreasing order (except for the degenerate case)
+        TEST_CASES.add(new TestCase("degenerate", 10, new TestRecord(null, 10)));
+        TEST_CASES.add(new TestCase("single record", 10, new TestRecord("foo".getBytes(), 1)));
+        TEST_CASES.add(new TestCase("multiple records", 10, new TestRecord("foo".getBytes(), 8), new TestRecord("bar".getBytes(), 3), new TestRecord("baz".getBytes(), 0)));
+        TEST_CASES.add(new TestCase("single tombstone", 10, new TestRecord(null, 1)));
+        TEST_CASES.add(new TestCase("multiple tombstone", 10, new TestRecord(null, 4), new TestRecord(null, 1)));
+        TEST_CASES.add(new TestCase("tombstones and records (r, t, r)", 10, new TestRecord("foo".getBytes(), 5), new TestRecord(null, 2), new TestRecord("bar".getBytes(), 1)));
+        TEST_CASES.add(new TestCase("tombstones and records (t, r, t)", 10, new TestRecord(null, 5), new TestRecord("foo".getBytes(), 2), new TestRecord(null, 1)));
+        TEST_CASES.add(new TestCase("tombstones and records (r, r, t, t)", 10, new TestRecord("foo".getBytes(), 6), new TestRecord("bar".getBytes(), 5), new TestRecord(null, 2), new TestRecord(null, 1)));
+        TEST_CASES.add(new TestCase("tombstones and records (t, t, r, r)", 10, new TestRecord(null, 7), new TestRecord(null, 6), new TestRecord("foo".getBytes(), 2), new TestRecord("bar".getBytes(), 1)));
+        TEST_CASES.add(new TestCase("record with empty bytes", 10, new TestRecord(new byte[0], 1)));
+        TEST_CASES.add(new TestCase("records with empty bytes (r, e)", 10, new TestRecord("foo".getBytes(), 4), new TestRecord(new byte[0], 1)));
+        TEST_CASES.add(new TestCase("records with empty bytes (e, e, r)", 10, new TestRecord(new byte[0], 8), new TestRecord(new byte[0], 2), new TestRecord("foo".getBytes(), 1)));
+    }
+
+    private final TestCase testCase;
+
+    public RocksDBVersionedStoreSegmentValueFormatterTest(final TestCase testCase) {
+        this.testCase = testCase;
+    }
+
+    @Parameterized.Parameters(name = "{0}")
+    public static Collection<TestCase> data() {
+        return TEST_CASES;
+    }
+
+    @Test
+    public void shouldSerializeAndDeserialize() {
+        final SegmentValue segmentValue = buildSegmentWithInsertLatest(testCase);
+
+        final byte[] serialized = segmentValue.serialize();
+        final SegmentValue deserialized = RocksDBVersionedStoreSegmentValueFormatter.deserialize(serialized);
+
+        verifySegmentContents(deserialized, testCase);
+    }
+
+    @Test
+    public void shouldBuildWithInsertLatest() {
+        final SegmentValue segmentValue = buildSegmentWithInsertLatest(testCase);
+
+        verifySegmentContents(segmentValue, testCase);
+    }
+
+    @Test
+    public void shouldBuildWithInsertEarliest() {
+        final SegmentValue segmentValue = buildSegmentWithInsertEarliest(testCase);
+
+        verifySegmentContents(segmentValue, testCase);
+    }
+
+    @Test
+    public void shouldInsertAtIndex() {
+        if (testCase.isDegenerate) {
+            // cannot insert into degenerate segment
+            return;
+        }
+
+        // test inserting at each possible index
+        for (int insertIdx = 0; insertIdx <= testCase.records.size(); insertIdx++) {
+            // build record to insert
+            final long newRecordTimestamp;
+            if (insertIdx == 0) {
+                newRecordTimestamp = testCase.records.get(0).timestamp + 1;
+                if (newRecordTimestamp == testCase.nextTimestamp) {
+                    // cannot insert because no timestamp exists between last record and nextTimestamp
+                    continue;
+                }
+            } else {
+                newRecordTimestamp = testCase.records.get(insertIdx - 1).timestamp - 1;
+                if (newRecordTimestamp < 0 || (insertIdx < testCase.records.size() && newRecordTimestamp == testCase.records.get(insertIdx).timestamp)) {
+                    // cannot insert because timestamps of existing records are adjacent
+                    continue;
+                }
+            }
+            final TestRecord newRecord = new TestRecord("new".getBytes(), newRecordTimestamp);
+
+            final SegmentValue segmentValue = buildSegmentWithInsertLatest(testCase);
+
+            // insert() first requires a call to find()
+            if (insertIdx > 0) {
+                segmentValue.find(testCase.records.get(insertIdx - 1).timestamp, false);
+            }
+            segmentValue.insert(newRecord.timestamp, newRecord.value, insertIdx);
+
+            // create expected results
+            final List<TestRecord> expectedRecords = new ArrayList<>(testCase.records);
+            expectedRecords.add(insertIdx, newRecord);
+
+            verifySegmentContents(segmentValue, new TestCase("expected", testCase.nextTimestamp, expectedRecords));
+        }
+    }
+
+    @Test
+    public void shouldUpdateAtIndex() {
+        if (testCase.isDegenerate) {
+            // cannot update degenerate segment
+            return;
+        }
+
+        // test updating at each possible index
+        for (int updateIdx = 0; updateIdx < testCase.records.size(); updateIdx++) {
+            // build updated record
+            long updatedRecordTimestamp = testCase.records.get(updateIdx).timestamp - 1;
+            if (updatedRecordTimestamp < 0 || (updateIdx < testCase.records.size() - 1 && updatedRecordTimestamp == testCase.records.get(updateIdx + 1).timestamp)) {
+                // found timestamp conflict. try again
+                updatedRecordTimestamp = testCase.records.get(updateIdx).timestamp + 1;
+                if (updateIdx > 0 && updatedRecordTimestamp == testCase.records.get(updateIdx - 1).timestamp) {
+                    // found timestamp conflict. use original timestamp
+                    updatedRecordTimestamp = testCase.records.get(updateIdx).timestamp;
+                }
+            }
+            final TestRecord updatedRecord = new TestRecord("updated".getBytes(), updatedRecordTimestamp);
+
+            final SegmentValue segmentValue = buildSegmentWithInsertLatest(testCase);
+
+            // updateRecord() first requires a call to find()
+            segmentValue.find(testCase.records.get(updateIdx).timestamp, false);
+            segmentValue.updateRecord(updatedRecord.timestamp, updatedRecord.value, updateIdx);
+
+            // create expected results
+            final List<TestRecord> expectedRecords = new ArrayList<>(testCase.records);
+            expectedRecords.remove(updateIdx);
+            expectedRecords.add(updateIdx, updatedRecord);
+
+            verifySegmentContents(segmentValue, new TestCase("expected", testCase.nextTimestamp, expectedRecords));
+        }
+    }
+
+    @Test
+    public void shouldFindByTimestamp() {
+        if (testCase.isDegenerate) {
+            // cannot find() on degenerate segment
+            return;
+        }
+
+        final SegmentValue segmentValue = buildSegmentWithInsertLatest(testCase);
+
+        // build expected mapping from timestamp -> record
+        final Map<Long, Integer> expectedRecordIndices = new HashMap<>();
+        for (int recordIdx = testCase.records.size() - 1; recordIdx >= 0; recordIdx--) {
+            if (recordIdx < testCase.records.size() - 1) {
+                expectedRecordIndices.put(testCase.records.get(recordIdx).timestamp - 1, recordIdx + 1);
+            }
+            if (recordIdx > 0) {
+                expectedRecordIndices.put(testCase.records.get(recordIdx).timestamp + 1, recordIdx);
+            }
+            expectedRecordIndices.put(testCase.records.get(recordIdx).timestamp, recordIdx);
+        }
+
+        // verify results
+        for (final Map.Entry<Long, Integer> entry : expectedRecordIndices.entrySet()) {
+            final TestRecord expectedRecord = testCase.records.get(entry.getValue());
+            final long expectedValidTo = entry.getValue() == 0 ? testCase.nextTimestamp : testCase.records.get(entry.getValue() - 1).timestamp;
+
+            final SegmentSearchResult result = segmentValue.find(entry.getKey(), true);
+
+            assertThat(result.index(), equalTo(entry.getValue()));
+            assertThat(result.value(), equalTo(expectedRecord.value));
+            assertThat(result.validFrom(), equalTo(expectedRecord.timestamp));
+            assertThat(result.validTo(), equalTo(expectedValidTo));
+        }
+
+        // verify exception when timestamp is out of range
+        assertThrows(IllegalArgumentException.class, () -> segmentValue.find(testCase.nextTimestamp, false));
+        assertThrows(IllegalArgumentException.class, () -> segmentValue.find(testCase.nextTimestamp + 1, false));

Review Comment:
   Why do we need to test this one? It seems the line above using `nextTimestamp` is already sufficient.
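
   For reference, both bounds hit the same guard, assuming the check quoted later in this review (`timestamp >= nextTimestamp`). A minimal sketch with illustrative values (not from the PR):

```java
public class UpperBoundSketch {
    public static void main(String[] args) {
        final long nextTimestamp = 10L;
        // find() rejects any query with query >= nextTimestamp, so nextTimestamp
        // and nextTimestamp + 1 exercise the same branch
        for (long query : new long[] {nextTimestamp, nextTimestamp + 1}) {
            System.out.println("query=" + query + " rejected=" + (query >= nextTimestamp));
        }
    }
}
```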



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * <pre>
+ *     <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <list of values, forward-sorted by timestamp>
+ * </pre>
+ * Negative {@code value_size} is used to indicate that the value stored is a tombstone, in order to
+ * distinguish from empty array which has {@code value_size} of zero. In practice, {@code value_size}
+ * is always set to -1 for the tombstone case, though this need not be true in general.
+ */
+final class RocksDBVersionedStoreSegmentValueFormatter {
+    private static final int TIMESTAMP_SIZE = 8;
+    private static final int VALUE_SIZE = 4;
+
+    /**
+     * @return the validTo timestamp of the latest record in the provided segment
+     */
+    static long getNextTimestamp(final byte[] segmentValue) {
+        return ByteBuffer.wrap(segmentValue).getLong(0);
+    }
+
+    /**
+     * Returns whether the provided segment is "empty." An empty segment is one that
+     * contains only a single tombstone with no validTo timestamp specified. In this case,
+     * the serialized segment contains only the timestamp of the tombstone (stored as the segment's
+     * {@code nextTimestamp}) and nothing else.
+     * <p>
+     * This can happen if, e.g., the only record inserted for a particular key is
+     * a tombstone. In this case, the tombstone must be stored in a segment
+     * (as the latest value store does not store tombstones), but also has no validTo
+     * timestamp associated with it.

Review Comment:
   Thanks, also for the in-person sync. I think I got it now. Cf. my other comments.
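
   As a concrete illustration of the "empty" segment described in the javadoc above, a standalone sketch under the stated assumption that only the tombstone timestamp is serialized (the timestamp value is illustrative):

```java
import java.nio.ByteBuffer;

public class EmptySegmentSketch {
    public static void main(String[] args) {
        final long tombstoneTimestamp = 10L;
        // per the javadoc above, only the tombstone timestamp is stored,
        // occupying the segment's nextTimestamp slot
        final byte[] segmentValue = ByteBuffer.allocate(Long.BYTES)
            .putLong(tombstoneTimestamp)
            .array();
        System.out.println("length=" + segmentValue.length);                             // 8
        System.out.println("nextTimestamp=" + ByteBuffer.wrap(segmentValue).getLong(0)); // 10
    }
}
```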



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * <pre>
+ *     <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <list of values, forward-sorted by timestamp>
+ * </pre>
+ * where:
+ * <ul>
+ * <li>{@code next_timestamp} is the validTo timestamp of the latest record version stored in this
+ * segment,</li>
+ * <li>{@code min_timestamp} is the validFrom timestamp of the earliest record version stored
+ * in this segment, and</li>
+ * <li>Negative {@code value_size} is used to indicate that the value stored is a tombstone,
+ * in order to distinguish from empty array which has {@code value_size} of zero. In practice,
+ * {@code value_size} is always set to -1 for the tombstone case, though this need not be true
+ * in general.</li>
+ * </ul>
+ * <p>
+ * Note that the value format above does not store the number of record versions contained in the
+ * segment. It is not necessary to store this information separately because this information is
+ * never required on its own. Record versions are always deserialized in order, and we can
+ * determine when we have reached the end of the list based on whether the (validFrom) timestamp of
+ * the record version equals the {@code min_timestamp}.
+ * <p>
+ * There is one edge case with regards to the segment value format described above, which is useful
+ * to know for understanding the code in this file, but not relevant for callers of the class.
+ * In the typical case, all record (validFrom) timestamps and the {@code next_timestamp} of the
+ * segment will form a strictly increasing sequence, i.e., it is not valid to have a record version
+ * with validTo timestamp equal to (or less than) its validFrom timestamp. The one edge case /
+ * exception is when the latest record version (for a particular key) is a tombstone, and the
+ * segment in which this tombstone is to be stored contains currently no record versions.
+ * This case will result in a "degenerate" segment containing the single tombstone, with both
+ * {@code min_timestamp} and {@code next_timestamp} equal to the (validFrom) timestamp of the
+ * tombstone. (It is valid to interpret this tombstone's validTo timestamp as being equal to its
+ * validFrom timestamp, as querying for the latest record version as of a later timestamp will
+ * correctly return that no record version is present.) Note also that after a "degenerate" segment
+ * has formed, it's possible that the segment will remain degenerate even as newer record versions
+ * are added. (For example, if additional puts happen with later timestamps such that those puts
+ * only affect later segments, then the earlier degenerate segment will remain degenerate.)
+ * <p>
+ * Callers of this class need not concern themselves with this detail because all the exposed
+ * methods function as expected, even in the degenerate segment case. All methods may still be
+ * called, with the exception of {@link SegmentValue#find(long, boolean)} and those that depend
+ * on it (i.e., {@link SegmentValue#updateRecord(long, byte[], int)} and
+ * {@link SegmentValue#insert(long, byte[], int)}). Missing support for calling these methods on
+ * degenerate segments is not an issue because the same timestamp bounds restrictions required for
+ * calling {@link SegmentValue#find(long, boolean)} on regular segments serve to prevent callers
+ * from calling the method on degenerate segments as well.
+ */
+final class RocksDBVersionedStoreSegmentValueFormatter {
+    private static final int TIMESTAMP_SIZE = 8;
+    private static final int VALUE_SIZE = 4;
+
+    /**
+     * @return the validTo timestamp of the latest record in the provided segment
+     */
+    static long getNextTimestamp(final byte[] segmentValue) {
+        return ByteBuffer.wrap(segmentValue).getLong(0);
+    }
+
+    /**
+     * @return the (validFrom) timestamp of the earliest record in the provided segment.
+     */
+    static long getMinTimestamp(final byte[] segmentValue) {
+        return ByteBuffer.wrap(segmentValue).getLong(TIMESTAMP_SIZE);
+    }
+
+    /**
+     * @return the deserialized segment value
+     */
+    static SegmentValue deserialize(final byte[] segmentValue) {
+        return new PartiallyDeserializedSegmentValue(segmentValue);
+    }
+
+    /**
+     * Creates a new segment value that contains the provided record.
+     * <p>
+     * This method may also be used to create a "degenerate" segment with {@code null} value and
+     * {@code validFrom} timestamp equal to {@code validTo}. (For more on degenerate segments,
+     * see the main javadoc for this class.)
+     *
+     * @param value the record value
+     * @param validFrom the record's (validFrom) timestamp
+     * @param validTo the record's validTo timestamp
+     * @return the newly created segment value
+     */
+    static SegmentValue newSegmentValueWithRecord(
+        final byte[] value, final long validFrom, final long validTo) {
+        return new PartiallyDeserializedSegmentValue(value, validFrom, validTo);
+    }
+
+    interface SegmentValue {
+
+        /**
+         * Finds the latest record in this segment with (validFrom) timestamp not exceeding the
+         * provided timestamp bound. This method requires that the provided timestamp bound exists
+         * in this segment, i.e., that the provided timestamp bound is at least minTimestamp and
+         * is smaller than nextTimestamp. As a result of this requirement, it is not permitted to
+         * call this method on degenerate segments.
+         *
+         * @param timestamp the timestamp to find
+         * @param includeValue whether the value of the found record should be returned with the result
+         * @return the record that is found
+         * @throws IllegalArgumentException if the provided timestamp is not contained within this segment
+         */
+        SegmentSearchResult find(long timestamp, boolean includeValue);
+
+        /**
+         * Inserts the provided record into the segment as the latest record in the segment.
+         * This operation is allowed even if the segment is degenerate.
+         * <p>
+         * It is the caller's responsibility to ensure that this action is desirable. In the event
+         * that the new record's (validFrom) timestamp is smaller than the current
+         * {@code nextTimestamp} of the segment, the operation will still be performed, and the
+         * segment's existing contents will be truncated to ensure consistency of timestamps within
+         * the segment. This truncation behavior helps reconcile inconsistencies between different
+         * segments, or between a segment and the latest value store, of a
+         * {@link RocksDBVersionedStore} instance.
+         *
+         * @param validFrom the (validFrom) timestamp of the record to insert
+         * @param validTo the validTo timestamp of the record to insert
+         * @param value the value of the record to insert
+         */
+        void insertAsLatest(long validFrom, long validTo, byte[] value);
+
+        /**
+         * Inserts the provided record into the segment as the earliest record in the segment.
+         * This operation is allowed even if the segment is degenerate. It is the caller's responsibility
+         * to ensure that this action is valid, i.e., that record's (validFrom) timestamp is smaller
+         * than the current {@code minTimestamp} of the segment.
+         *
+         * @param timestamp the (validFrom) timestamp of the record to insert
+         * @param value the value of the record to insert
+         */
+        void insertAsEarliest(long timestamp, byte[] value);
+
+        /**
+         * Inserts the provided record into the segment at the provided index. This operation
+         * requires that the segment is not degenerate, and that
+         * {@link SegmentValue#find(long, boolean)} has already been called in order to deserialize
+         * the relevant index (to insert into index n requires that index n-1 has already been deserialized).
+         * <p>
+         * It is the caller's responsibility to ensure that this action makes sense, i.e., that the
+         * insertion index is correct for the (validFrom) timestamp of the record being inserted.
+         *
+         * @param timestamp the (validFrom) timestamp of the record to insert
+         * @param value the value of the record to insert
+         * @param index the index that the newly inserted record should occupy
+         * @throws IllegalArgumentException if the segment is degenerate, if the provided index is out of
+         *         bounds, or if {@code find()} has not been called to deserialize the relevant index.
+         */
+        void insert(long timestamp, byte[] value, int index);
+
+        /**
+         * Updates the record at the provided index with the provided value and (validFrom)
+         * timestamp. This operation requires that {@link SegmentValue#find(long, boolean)} has
+         * already been called in order to deserialize the relevant index (i.e., the one being updated).
+         * (As a result, it is not valid to call this method on a degenerate segment.)
+         * <p>
+         * It is the caller's responsibility to ensure that this action makes sense, i.e., that the
+         * updated (validFrom) timestamp does not violate timestamp order within the segment.
+         *
+         * @param timestamp the updated record (validFrom) timestamp
+         * @param value the updated record value
+         * @param index the index of the record to update
+         */
+        void updateRecord(long timestamp, byte[] value, int index);
+
+        /**
+         * @return the bytes serialization for this segment value
+         */
+        byte[] serialize();
+
+        class SegmentSearchResult {
+            private final int index;
+            private final long validFrom;
+            private final long validTo;
+            private final byte[] value;
+
+            SegmentSearchResult(final int index, final long validFrom, final long validTo) {
+                this(index, validFrom, validTo, null);
+            }
+
+            SegmentSearchResult(final int index, final long validFrom, final long validTo,
+                                final byte[] value) {
+                this.index = index;
+                this.validFrom = validFrom;
+                this.validTo = validTo;
+                this.value = value;
+            }
+
+            int index() {
+                return index;
+            }
+
+            long validFrom() {
+                return validFrom;
+            }
+
+            long validTo() {
+                return validTo;
+            }
+
+            /**
+             * This value will be null if the caller did not specify that the value should be
+             * included with the return result from {@link SegmentValue#find(long, boolean)}.
+             * In this case, it is up to the caller to not call this method, as the "value"
+             * returned will not be meaningful.
+             */
+            byte[] value() {
+                return value;
+            }
+        }
+    }
+
+    private static class PartiallyDeserializedSegmentValue implements SegmentValue {
+        private byte[] segmentValue;
+        private long nextTimestamp;
+        private long minTimestamp;
+        private boolean isDegenerate;
+
+        private int deserIndex = -1; // index up through which this segment has been deserialized (inclusive)
+        private List<TimestampAndValueSize> unpackedReversedTimestampAndValueSizes;
+        private List<Integer> cumulativeValueSizes; // ordered same as timestamp and value sizes (reverse time-sorted)
+
+        private PartiallyDeserializedSegmentValue(final byte[] segmentValue) {
+            this.segmentValue = segmentValue;
+            this.nextTimestamp =
+                RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(segmentValue);
+            this.minTimestamp =
+                RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(segmentValue);
+            this.isDegenerate = nextTimestamp == minTimestamp;
+            resetDeserHelpers();
+        }
+
+        private PartiallyDeserializedSegmentValue(
+            final byte[] valueOrNull, final long validFrom, final long validTo) {
+            initializeWithRecord(new ValueAndValueSize(valueOrNull), validFrom, validTo);
+        }
+
+        @Override
+        public SegmentSearchResult find(final long timestamp, final boolean includeValue) {
+            if (timestamp < minTimestamp) {

Review Comment:
   Add comment:
   ```
   // for degenerate segments, minTimestamp == nextTimestamp, and we will always throw an exception
   // thus, we don't need to handle the degenerate case below
   ```



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * <pre>
+ *     <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <list of values, forward-sorted by timestamp>
+ * </pre>
+ * where:
+ * <ul>
+ * <li>{@code next_timestamp} is the validTo timestamp of the latest record version stored in this
+ * segment,</li>
+ * <li>{@code min_timestamp} is the validFrom timestamp of the earliest record version stored
+ * in this segment, and</li>
+ * <li>Negative {@code value_size} is used to indicate that the value stored is a tombstone,
+ * in order to distinguish from empty array which has {@code value_size} of zero. In practice,
+ * {@code value_size} is always set to -1 for the tombstone case, though this need not be true
+ * in general.</li>
+ * </ul>
+ * <p>
+ * Note that the value format above does not store the number of record versions contained in the
+ * segment. It is not necessary to store this information separately because this information is
+ * never required on its own. Record versions are always deserialized in order, and we can
+ * determine when we have reached the end of the list based on whether the (validFrom) timestamp of
+ * the record version equals the {@code min_timestamp}.
+ * <p>
+ * There is one edge case with regards to the segment value format described above, which is useful
+ * to know for understanding the code in this file, but not relevant for callers of the class.
+ * In the typical case, all record (validFrom) timestamps and the {@code next_timestamp} of the
+ * segment will form a strictly increasing sequence, i.e., it is not valid to have a record version
+ * with validTo timestamp equal to (or less than) its validFrom timestamp. The one edge case /
+ * exception is when the latest record version (for a particular key) is a tombstone, and the
+ * segment in which this tombstone is to be stored contains currently no record versions.
+ * This case will result in a "degenerate" segment containing the single tombstone, with both
+ * {@code min_timestamp} and {@code next_timestamp} equal to the (validFrom) timestamp of the
+ * tombstone. (It is valid to interpret this tombstone's validTo timestamp as being equal to its
+ * validFrom timestamp, as querying for the latest record version as of a later timestamp will
+ * correctly return that no record version is present.) Note also that after a "degenerate" segment
+ * has formed, it's possible that the segment will remain degenerate even as newer record versions
+ * are added. (For example, if additional puts happen with later timestamps such that those puts
+ * only affect later segments, then the earlier degenerate segment will remain degenerate.)
+ * <p>
+ * Callers of this class need not concern themselves with this detail because all the exposed
+ * methods function as expected, even in the degenerate segment case. All methods may still be
+ * called, with the exception of {@link SegmentValue#find(long, boolean)} and those that depend
+ * on it (i.e., {@link SegmentValue#updateRecord(long, byte[], int)} and
+ * {@link SegmentValue#insert(long, byte[], int)}). Missing support for calling these methods on
+ * degenerate segments is not an issue because the same timestamp bounds restrictions required for
+ * calling {@link SegmentValue#find(long, boolean)} on regular segments serve to prevent callers
+ * from calling the method on degenerate segments as well.
+ */
+final class RocksDBVersionedStoreSegmentValueFormatter {
+    private static final int TIMESTAMP_SIZE = 8;
+    private static final int VALUE_SIZE = 4;
+
+    /**
+     * @return the validTo timestamp of the latest record in the provided segment
+     */
+    static long getNextTimestamp(final byte[] segmentValue) {
+        return ByteBuffer.wrap(segmentValue).getLong(0);
+    }
+
+    /**
+     * @return the (validFrom) timestamp of the earliest record in the provided segment.
+     */
+    static long getMinTimestamp(final byte[] segmentValue) {
+        return ByteBuffer.wrap(segmentValue).getLong(TIMESTAMP_SIZE);
+    }
+
+    /**
+     * @return the deserialized segment value
+     */
+    static SegmentValue deserialize(final byte[] segmentValue) {
+        return new PartiallyDeserializedSegmentValue(segmentValue);
+    }
+
+    /**
+     * Creates a new segment value that contains the provided record.
+     * <p>
+     * This method may also be used to create a "degenerate" segment with {@code null} value and
+     * {@code validFrom} timestamp equal to {@code validTo}. (For more on degenerate segments,
+     * see the main javadoc for this class.)
+     *
+     * @param value the record value
+     * @param validFrom the record's (validFrom) timestamp
+     * @param validTo the record's validTo timestamp
+     * @return the newly created segment value
+     */
+    static SegmentValue newSegmentValueWithRecord(
+        final byte[] value, final long validFrom, final long validTo) {
+        return new PartiallyDeserializedSegmentValue(value, validFrom, validTo);
+    }
+
+    interface SegmentValue {
+
+        /**
+         * Finds the latest record in this segment with (validFrom) timestamp not exceeding the
+         * provided timestamp bound. This method requires that the provided timestamp bound exists
+         * in this segment, i.e., that the provided timestamp bound is at least minTimestamp and
+         * is smaller than nextTimestamp. As a result of this requirement, it is not permitted to
+         * call this method on degenerate segments.
+         *
+         * @param timestamp the timestamp to find
+         * @param includeValue whether the value of the found record should be returned with the result
+         * @return the record that is found
+         * @throws IllegalArgumentException if the provided timestamp is not contained within this segment
+         */
+        SegmentSearchResult find(long timestamp, boolean includeValue);
+
+        /**
+         * Inserts the provided record into the segment as the latest record in the segment.
+         * This operation is allowed even if the segment is degenerate.
+         * <p>
+         * It is the caller's responsibility to ensure that this action is desirable. In the event
+         * that the new record's (validFrom) timestamp is smaller than the current
+         * {@code nextTimestamp} of the segment, the operation will still be performed, and the
+         * segment's existing contents will be truncated to ensure consistency of timestamps within
+         * the segment. This truncation behavior helps reconcile inconsistencies between different
+         * segments, or between a segment and the latest value store, of a
+         * {@link RocksDBVersionedStore} instance.
+         *
+         * @param validFrom the (validFrom) timestamp of the record to insert
+         * @param validTo the validTo timestamp of the record to insert
+         * @param value the value of the record to insert
+         */
+        void insertAsLatest(long validFrom, long validTo, byte[] value);
+
+        /**
+         * Inserts the provided record into the segment as the earliest record in the segment.
+         * This operation is allowed even if the segment is degenerate. It is the caller's responsibility
+         * to ensure that this action is valid, i.e., that record's (validFrom) timestamp is smaller
+         * than the current {@code minTimestamp} of the segment.
+         *
+         * @param timestamp the (validFrom) timestamp of the record to insert
+         * @param value the value of the record to insert
+         */
+        void insertAsEarliest(long timestamp, byte[] value);
+
+        /**
+         * Inserts the provided record into the segment at the provided index. This operation
+         * requires that the segment is not degenerate, and that
+         * {@link SegmentValue#find(long, boolean)} has already been called in order to deserialize
+         * the relevant index (to insert into index n requires that index n-1 has already been deserialized).
+         * <p>
+         * It is the caller's responsibility to ensure that this action makes sense, i.e., that the
+         * insertion index is correct for the (validFrom) timestamp of the record being inserted.
+         *
+         * @param timestamp the (validFrom) timestamp of the record to insert
+         * @param value the value of the record to insert
+         * @param index the index that the newly inserted record should occupy
+         * @throws IllegalArgumentException if the segment is degenerate, if the provided index is out of
+         *         bounds, or if {@code find()} has not been called to deserialize the relevant index.
+         */
+        void insert(long timestamp, byte[] value, int index);
+
+        /**
+         * Updates the record at the provided index with the provided value and (validFrom)
+         * timestamp. This operation requires that {@link SegmentValue#find(long, boolean)} has
+         * already been called in order to deserialize the relevant index (i.e., the one being updated).
+         * (As a result, it is not valid to call this method on a degenerate segment.)
+         * <p>
+         * It is the caller's responsibility to ensure that this action makes sense, i.e., that the
+         * updated (validFrom) timestamp does not violate timestamp order within the segment.
+         *
+         * @param timestamp the updated record (validFrom) timestamp
+         * @param value the updated record value
+         * @param index the index of the record to update
+         */
+        void updateRecord(long timestamp, byte[] value, int index);
+
+        /**
+         * @return the bytes serialization for this segment value
+         */
+        byte[] serialize();
+
+        class SegmentSearchResult {
+            private final int index;
+            private final long validFrom;
+            private final long validTo;
+            private final byte[] value;
+
+            SegmentSearchResult(final int index, final long validFrom, final long validTo) {
+                this(index, validFrom, validTo, null);
+            }
+
+            SegmentSearchResult(final int index, final long validFrom, final long validTo,
+                                final byte[] value) {
+                this.index = index;
+                this.validFrom = validFrom;
+                this.validTo = validTo;
+                this.value = value;
+            }
+
+            int index() {
+                return index;
+            }
+
+            long validFrom() {
+                return validFrom;
+            }
+
+            long validTo() {
+                return validTo;
+            }
+
+            /**
+             * This value will be null if the caller did not specify that the value should be
+             * included with the return result from {@link SegmentValue#find(long, boolean)}.
+             * In this case, it is up to the caller to not call this method, as the "value"
+             * returned will not be meaningful.
+             */
+            byte[] value() {
+                return value;
+            }
+        }
+    }
+
+    private static class PartiallyDeserializedSegmentValue implements SegmentValue {
+        private byte[] segmentValue;
+        private long nextTimestamp;
+        private long minTimestamp;
+        private boolean isDegenerate;
+
+        private int deserIndex = -1; // index up through which this segment has been deserialized (inclusive)
+        private List<TimestampAndValueSize> unpackedReversedTimestampAndValueSizes;
+        private List<Integer> cumulativeValueSizes; // ordered same as timestamp and value sizes (reverse time-sorted)
+
+        private PartiallyDeserializedSegmentValue(final byte[] segmentValue) {
+            this.segmentValue = segmentValue;
+            this.nextTimestamp =
+                RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(segmentValue);
+            this.minTimestamp =
+                RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(segmentValue);
+            this.isDegenerate = nextTimestamp == minTimestamp;
+            resetDeserHelpers();
+        }
+
+        private PartiallyDeserializedSegmentValue(
+            final byte[] valueOrNull, final long validFrom, final long validTo) {
+            initializeWithRecord(new ValueAndValueSize(valueOrNull), validFrom, validTo);
+        }
+
+        @Override
+        public SegmentSearchResult find(final long timestamp, final boolean includeValue) {
+            if (timestamp < minTimestamp) {
+                throw new IllegalArgumentException("Timestamp is too small to be found in this segment.");
+            }
+            if (timestamp >= nextTimestamp) {
+                throw new IllegalArgumentException("Timestamp is too large to be found in this segment.");
+            }
+
+            long currNextTimestamp = nextTimestamp;
+            long currTimestamp = -1L; // choose an invalid timestamp. if this is valid, this needs to be re-worked
+            int currValueSize;
+            int currIndex = 0;
+            int cumValueSize = 0;
+            while (currTimestamp != minTimestamp) {
+                if (currIndex <= deserIndex) {
+                    final TimestampAndValueSize curr = unpackedReversedTimestampAndValueSizes.get(currIndex);
+                    currTimestamp = curr.timestamp;
+                    currValueSize = curr.valueSize;
+                    cumValueSize = cumulativeValueSizes.get(currIndex);
+                } else {
+                    final int timestampSegmentIndex = 2 * TIMESTAMP_SIZE + currIndex * (TIMESTAMP_SIZE + VALUE_SIZE);
+                    currTimestamp = ByteBuffer.wrap(segmentValue).getLong(timestampSegmentIndex);
+                    currValueSize = ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE);
+                    cumValueSize += Math.max(currValueSize, 0);
+
+                    deserIndex = currIndex;
+                    unpackedReversedTimestampAndValueSizes.add(new TimestampAndValueSize(currTimestamp, currValueSize));
+                    cumulativeValueSizes.add(cumValueSize);
+                }
+
+                if (currTimestamp <= timestamp) {
+                    // found result
+                    if (includeValue) {
+                        if (currValueSize >= 0) {
+                            final byte[] value = new byte[currValueSize];
+                            final int valueSegmentIndex = segmentValue.length - cumValueSize;
+                            System.arraycopy(segmentValue, valueSegmentIndex, value, 0, currValueSize);
+                            return new SegmentSearchResult(currIndex, currTimestamp, currNextTimestamp, value);
+                        } else {
+                            return new SegmentSearchResult(currIndex, currTimestamp, currNextTimestamp, null);
+                        }
+                    } else {
+                        return new SegmentSearchResult(currIndex, currTimestamp, currNextTimestamp);
+                    }
+                }
+
+                // prep for next iteration
+                currNextTimestamp = currTimestamp;
+                currIndex++;
+            }
+
+            throw new IllegalStateException("Search in segment expected to find result but did not.");
+        }
+
+        @Override
+        public void insertAsLatest(final long validFrom, final long validTo, final byte[] valueOrNull) {
+            final ValueAndValueSize value = new ValueAndValueSize(valueOrNull);
+
+            if (nextTimestamp > validFrom) {

Review Comment:
   nit: can we flip this to `validFrom < nextTimestamp` (seems easier to read -- at least to me).
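
   Separately, since the `find()` loop quoted above locates the i-th `(timestamp, value_size)` pair at offset `2 * TIMESTAMP_SIZE + i * (TIMESTAMP_SIZE + VALUE_SIZE)`, here is a standalone sketch of the documented layout and that offset arithmetic (record contents and timestamps are illustrative, not from the PR):

```java
import java.nio.ByteBuffer;

public class SegmentLayoutSketch {
    public static void main(String[] args) {
        // documented layout: <next_ts(8)> <min_ts(8)>
        //   <(timestamp, value_size) pairs, newest first> <values, oldest first>
        final byte[] foo = "foo".getBytes(); // newer record, validFrom = 5
        final byte[] bar = "bar".getBytes(); // older record, validFrom = 1
        final ByteBuffer buf = ByteBuffer.allocate(2 * 8 + 2 * (8 + 4) + foo.length + bar.length);
        buf.putLong(10L).putLong(1L);       // next_timestamp, min_timestamp
        buf.putLong(5L).putInt(foo.length); // newest (timestamp, value_size) first
        buf.putLong(1L).putInt(bar.length);
        buf.put(bar).put(foo);              // values oldest first
        final byte[] segmentValue = buf.array();

        // same offset arithmetic as the quoted loop (TIMESTAMP_SIZE = 8, VALUE_SIZE = 4)
        for (int i = 0; i < 2; i++) {
            final int tsIndex = 2 * 8 + i * (8 + 4);
            final long validFrom = ByteBuffer.wrap(segmentValue).getLong(tsIndex);
            final int valueSize = ByteBuffer.wrap(segmentValue).getInt(tsIndex + 8);
            System.out.println("index=" + i + " validFrom=" + validFrom + " valueSize=" + valueSize);
        }
    }
}
```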



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper utility for managing the bytes layout of the value stored in segments of the {@link RocksDBVersionedStore}.
+ * The value format is:
+ * <pre>
+ *     <next_timestamp> + <min_timestamp> + <list of <timestamp, value_size>, reverse-sorted by timestamp> + <list of values, forward-sorted by timestamp>
+ * </pre>
+ * where:
+ * <ul>
+ * <li>{@code next_timestamp} is the validTo timestamp of the latest record version stored in this
+ * segment,</li>
+ * <li>{@code min_timestamp} is the validFrom timestamp of the earliest record version stored
+ * in this segment, and</li>
+ * <li>Negative {@code value_size} is used to indicate that the value stored is a tombstone,
+ * in order to distinguish from empty array which has {@code value_size} of zero. In practice,
+ * {@code value_size} is always set to -1 for the tombstone case, though this need not be true
+ * in general.</li>
+ * </ul>
+ * <p>
+ * Note that the value format above does not store the number of record versions contained in the
+ * segment. It is not necessary to store this information separately because this information is
+ * never required on its own. Record versions are always deserialized in order, and we can
+ * determine when we have reached the end of the list based on whether the (validFrom) timestamp of
+ * the record version equals the {@code min_timestamp}.
+ * <p>
+ * There is one edge case with regards to the segment value format described above, which is useful
+ * to know for understanding the code in this file, but not relevant for callers of the class.
+ * In the typical case, all record (validFrom) timestamps and the {@code next_timestamp} of the
+ * segment will form a strictly increasing sequence, i.e., it is not valid to have a record version
+ * with validTo timestamp equal to (or less than) its validFrom timestamp. The one edge case /
+ * exception is when the latest record version (for a particular key) is a tombstone, and the
+ * segment in which this tombstone is to be stored contains currently no record versions.
+ * This case will result in a "degenerate" segment containing the single tombstone, with both
+ * {@code min_timestamp} and {@code next_timestamp} equal to the (validFrom) timestamp of the
+ * tombstone. (It is valid to interpret this tombstone's validTo timestamp as being equal to its
+ * validFrom timestamp, as querying for the latest record version as of a later timestamp will
+ * correctly return that no record version is present.) Note also that after a "degenerate" segment
+ * has formed, it's possible that the segment will remain degenerate even as newer record versions
+ * are added. (For example, if additional puts happen with later timestamps such that those puts
+ * only affect later segments, then the earlier degenerate segment will remain degenerate.)
+ * <p>
+ * Callers of this class need not concern themselves with this detail because all the exposed
+ * methods function as expected, even in the degenerate segment case. All methods may still be
+ * called, with the exception of {@link SegmentValue#find(long, boolean)} and those that depend
+ * on it (i.e., {@link SegmentValue#updateRecord(long, byte[], int)} and
+ * {@link SegmentValue#insert(long, byte[], int)}). Missing support for calling these methods on
+ * degenerate segments is not an issue because the same timestamp bounds restrictions required for
+ * calling {@link SegmentValue#find(long, boolean)} on regular segments serve to prevent callers
+ * from calling the method on degenerate segments as well.
+ */
+final class RocksDBVersionedStoreSegmentValueFormatter {
+    private static final int TIMESTAMP_SIZE = 8;
+    private static final int VALUE_SIZE = 4;
+
+    /**
+     * @return the validTo timestamp of the latest record in the provided segment
+     */
+    static long getNextTimestamp(final byte[] segmentValue) {
+        return ByteBuffer.wrap(segmentValue).getLong(0);
+    }
+
+    /**
+     * @return the (validFrom) timestamp of the earliest record in the provided segment.
+     */
+    static long getMinTimestamp(final byte[] segmentValue) {
+        return ByteBuffer.wrap(segmentValue).getLong(TIMESTAMP_SIZE);
+    }
+
+    /**
+     * @return the deserialized segment value
+     */
+    static SegmentValue deserialize(final byte[] segmentValue) {
+        return new PartiallyDeserializedSegmentValue(segmentValue);
+    }
+
+    /**
+     * Creates a new segment value that contains the provided record.
+     * <p>
+     * This method may also be used to create a "degenerate" segment with {@code null} value and
+     * {@code validFrom} timestamp equal to {@code validTo}. (For more on degenerate segments,
+     * see the main javadoc for this class.)
+     *
+     * @param value the record value
+     * @param validFrom the record's (validFrom) timestamp
+     * @param validTo the record's validTo timestamp
+     * @return the newly created segment value
+     */
+    static SegmentValue newSegmentValueWithRecord(
+        final byte[] value, final long validFrom, final long validTo) {
+        return new PartiallyDeserializedSegmentValue(value, validFrom, validTo);
+    }
+
+    interface SegmentValue {
+
+        /**
+         * Finds the latest record in this segment with (validFrom) timestamp not exceeding the
+         * provided timestamp bound. This method requires that the provided timestamp bound exists
+         * in this segment, i.e., that the provided timestamp bound is at least minTimestamp and
+         * is smaller than nextTimestamp. As a result of this requirement, it is not permitted to
+         * call this method on degenerate segments.
+         *
+         * @param timestamp the timestamp to find
+         * @param includeValue whether the value of the found record should be returned with the result
+         * @return the record that is found
+         * @throws IllegalArgumentException if the provided timestamp is not contained within this segment
+         */
+        SegmentSearchResult find(long timestamp, boolean includeValue);
+
+        /**
+         * Inserts the provided record into the segment as the latest record in the segment.
+         * This operation is allowed even if the segment is degenerate.
+         * <p>
+         * It is the caller's responsibility to ensure that this action is desirable. In the event
+         * that the new record's (validFrom) timestamp is smaller than the current
+         * {@code nextTimestamp} of the segment, the operation will still be performed, and the
+         * segment's existing contents will be truncated to ensure consistency of timestamps within
+         * the segment. This truncation behavior helps reconcile inconsistencies between different
+         * segments, or between a segment and the latest value store, of a
+         * {@link RocksDBVersionedStore} instance.
+         *
+         * @param validFrom the (validFrom) timestamp of the record to insert
+         * @param validTo the validTo timestamp of the record to insert
+         * @param value the value of the record to insert
+         */
+        void insertAsLatest(long validFrom, long validTo, byte[] value);
+
+        /**
+         * Inserts the provided record into the segment as the earliest record in the segment.
+         * This operation is allowed even if the segment is degenerate. It is the caller's responsibility
+         * to ensure that this action is valid, i.e., that the record's (validFrom) timestamp is smaller
+         * than the current {@code minTimestamp} of the segment.
+         *
+         * @param timestamp the (validFrom) timestamp of the record to insert
+         * @param value the value of the record to insert
+         */
+        void insertAsEarliest(long timestamp, byte[] value);
+
+        /**
+         * Inserts the provided record into the segment at the provided index. This operation
+         * requires that the segment is not degenerate, and that
+         * {@link SegmentValue#find(long, boolean)} has already been called in order to deserialize
+         * the relevant index (inserting at index n requires that index n-1 has already been deserialized).
+         * <p>
+         * It is the caller's responsibility to ensure that this action makes sense, i.e., that the
+         * insertion index is correct for the (validFrom) timestamp of the record being inserted.
+         *
+         * @param timestamp the (validFrom) timestamp of the record to insert
+         * @param value the value of the record to insert
+         * @param index the index that the newly inserted record should occupy
+         * @throws IllegalArgumentException if the segment is degenerate, if the provided index is out of
+         *         bounds, or if {@code find()} has not been called to deserialize the relevant index.
+         */
+        void insert(long timestamp, byte[] value, int index);
+
+        /**
+         * Updates the record at the provided index with the provided value and (validFrom)
+         * timestamp. This operation requires that {@link SegmentValue#find(long, boolean)} has
+         * already been called in order to deserialize the relevant index (i.e., the one being updated).
+         * (As a result, it is not valid to call this method on a degenerate segment.)
+         * <p>
+         * It is the caller's responsibility to ensure that this action makes sense, i.e., that the
+         * updated (validFrom) timestamp does not violate timestamp order within the segment.
+         *
+         * @param timestamp the updated record (validFrom) timestamp
+         * @param value the updated record value
+         * @param index the index of the record to update
+         */
+        void updateRecord(long timestamp, byte[] value, int index);
+
+        /**
+         * @return the bytes serialization for this segment value
+         */
+        byte[] serialize();
+
+        class SegmentSearchResult {
+            private final int index;
+            private final long validFrom;
+            private final long validTo;
+            private final byte[] value;
+
+            SegmentSearchResult(final int index, final long validFrom, final long validTo) {
+                this(index, validFrom, validTo, null);
+            }
+
+            SegmentSearchResult(final int index, final long validFrom, final long validTo,
+                                final byte[] value) {
+                this.index = index;
+                this.validFrom = validFrom;
+                this.validTo = validTo;
+                this.value = value;
+            }
+
+            int index() {
+                return index;
+            }
+
+            long validFrom() {
+                return validFrom;
+            }
+
+            long validTo() {
+                return validTo;
+            }
+
+            /**
+             * This value will be null if the caller did not specify that the value should be
+             * included with the return result from {@link SegmentValue#find(long, boolean)}.
+             * In this case, it is up to the caller not to call this method, as the "value"
+             * returned will not be meaningful.
+             */
+            byte[] value() {
+                return value;
+            }
+        }
+    }
+
+    private static class PartiallyDeserializedSegmentValue implements SegmentValue {
+        private byte[] segmentValue;
+        private long nextTimestamp;
+        private long minTimestamp;
+        private boolean isDegenerate;
+
+        private int deserIndex = -1; // index up through which this segment has been deserialized (inclusive)
+        private List<TimestampAndValueSize> unpackedReversedTimestampAndValueSizes;
+        private List<Integer> cumulativeValueSizes; // ordered same as timestamp and value sizes (reverse time-sorted)
+
+        private PartiallyDeserializedSegmentValue(final byte[] segmentValue) {
+            this.segmentValue = segmentValue;
+            this.nextTimestamp =
+                RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(segmentValue);
+            this.minTimestamp =
+                RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(segmentValue);
+            this.isDegenerate = nextTimestamp == minTimestamp;
+            resetDeserHelpers();
+        }
+
+        private PartiallyDeserializedSegmentValue(
+            final byte[] valueOrNull, final long validFrom, final long validTo) {
+            initializeWithRecord(new ValueAndValueSize(valueOrNull), validFrom, validTo);
+        }
+
+        @Override
+        public SegmentSearchResult find(final long timestamp, final boolean includeValue) {
+            if (timestamp < minTimestamp) {
+                throw new IllegalArgumentException("Timestamp is too small to be found in this segment.");
+            }
+            if (timestamp >= nextTimestamp) {
+                throw new IllegalArgumentException("Timestamp is too large to be found in this segment.");
+            }
+
+            long currNextTimestamp = nextTimestamp;
+            long currTimestamp = -1L; // sentinel; assumes -1 is never a valid timestamp. if it can be, this needs to be re-worked
+            int currValueSize;
+            int currIndex = 0;
+            int cumValueSize = 0;
+            while (currTimestamp != minTimestamp) {
+                if (currIndex <= deserIndex) {
+                    final TimestampAndValueSize curr = unpackedReversedTimestampAndValueSizes.get(currIndex);
+                    currTimestamp = curr.timestamp;
+                    currValueSize = curr.valueSize;
+                    cumValueSize = cumulativeValueSizes.get(currIndex);
+                } else {
+                    final int timestampSegmentIndex = 2 * TIMESTAMP_SIZE + currIndex * (TIMESTAMP_SIZE + VALUE_SIZE);
+                    currTimestamp = ByteBuffer.wrap(segmentValue).getLong(timestampSegmentIndex);
+                    currValueSize = ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE);
+                    cumValueSize += Math.max(currValueSize, 0);
+
+                    deserIndex = currIndex;
+                    unpackedReversedTimestampAndValueSizes.add(new TimestampAndValueSize(currTimestamp, currValueSize));
+                    cumulativeValueSizes.add(cumValueSize);
+                }
+
+                if (currTimestamp <= timestamp) {
+                    // found result
+                    if (includeValue) {
+                        if (currValueSize >= 0) {
+                            final byte[] value = new byte[currValueSize];
+                            final int valueSegmentIndex = segmentValue.length - cumValueSize;
+                            System.arraycopy(segmentValue, valueSegmentIndex, value, 0, currValueSize);
+                            return new SegmentSearchResult(currIndex, currTimestamp, currNextTimestamp, value);
+                        } else {
+                            return new SegmentSearchResult(currIndex, currTimestamp, currNextTimestamp, null);
+                        }
+                    } else {
+                        return new SegmentSearchResult(currIndex, currTimestamp, currNextTimestamp);
+                    }
+                }
+
+                // prep for next iteration
+                currNextTimestamp = currTimestamp;
+                currIndex++;
+            }
+
+            throw new IllegalStateException("Search in segment expected to find result but did not.");
+        }
+
+        @Override
+        public void insertAsLatest(final long validFrom, final long validTo, final byte[] valueOrNull) {
+            final ValueAndValueSize value = new ValueAndValueSize(valueOrNull);
+
+            if (nextTimestamp > validFrom) {
+                // detected inconsistency edge case where older segment has [a,b) while newer store
+                // has [a,c), due to [b,c) having failed to write to newer store.
+                // remove entries from this store until the overlap is resolved.
+                // TODO: will be implemented in a follow-up PR
+                throw new UnsupportedOperationException("case not yet implemented");
+            }
+
+            if (nextTimestamp != validFrom) {

Review Comment:
   Can we flip the if/else and put `if (validFrom == nextTimestamp)` here, to get a "logical flow" through the three cases `validFrom < ...`, `validFrom == ...`, `validFrom > ...`? Given the check above, the current `nextTimestamp != validFrom` effectively means `nextTimestamp < validFrom`, which is hard to read as written, from my POV.
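
   A minimal sketch of one possible restructuring (hypothetical; the branch bodies are placeholders rather than the PR's actual logic):

   ```java
   // order the three cases by how validFrom compares to nextTimestamp
   if (validFrom < nextTimestamp) {
       // inconsistency edge case described above; deferred to a follow-up PR
       throw new UnsupportedOperationException("case not yet implemented");
   } else if (validFrom == nextTimestamp) {
       // the new record directly extends the segment; no gap to account for
   } else { // validFrom > nextTimestamp
       // gap between the segment's nextTimestamp and the new record's validFrom
   }
   ```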



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatterTest.java:
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThrows;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class RocksDBVersionedStoreSegmentValueFormatterTest {
+
+    private static final List<TestCase> TEST_CASES = new ArrayList<>();
+    static {
+        // test cases are expected to have timestamps in strictly decreasing order (except for the degenerate case)
+        TEST_CASES.add(new TestCase("degenerate", 10, new TestRecord(null, 10)));
+        TEST_CASES.add(new TestCase("single record", 10, new TestRecord("foo".getBytes(), 1)));
+        TEST_CASES.add(new TestCase("multiple records", 10, new TestRecord("foo".getBytes(), 8), new TestRecord("bar".getBytes(), 3), new TestRecord("baz".getBytes(), 0)));
+        TEST_CASES.add(new TestCase("single tombstone", 10, new TestRecord(null, 1)));
+        TEST_CASES.add(new TestCase("multiple tombstone", 10, new TestRecord(null, 4), new TestRecord(null, 1)));
+        TEST_CASES.add(new TestCase("tombstones and records (r, t, r)", 10, new TestRecord("foo".getBytes(), 5), new TestRecord(null, 2), new TestRecord("bar".getBytes(), 1)));
+        TEST_CASES.add(new TestCase("tombstones and records (t, r, t)", 10, new TestRecord(null, 5), new TestRecord("foo".getBytes(), 2), new TestRecord(null, 1)));
+        TEST_CASES.add(new TestCase("tombstones and records (r, r, t, t)", 10, new TestRecord("foo".getBytes(), 6), new TestRecord("bar".getBytes(), 5), new TestRecord(null, 2), new TestRecord(null, 1)));
+        TEST_CASES.add(new TestCase("tombstones and records (t, t, r, r)", 10, new TestRecord(null, 7), new TestRecord(null, 6), new TestRecord("foo".getBytes(), 2), new TestRecord("bar".getBytes(), 1)));
+        TEST_CASES.add(new TestCase("record with empty bytes", 10, new TestRecord(new byte[0], 1)));
+        TEST_CASES.add(new TestCase("records with empty bytes (r, e)", 10, new TestRecord("foo".getBytes(), 4), new TestRecord(new byte[0], 1)));
+        TEST_CASES.add(new TestCase("records with empty bytes (e, e, r)", 10, new TestRecord(new byte[0], 8), new TestRecord(new byte[0], 2), new TestRecord("foo".getBytes(), 1)));
+    }
+
+    private final TestCase testCase;
+
+    public RocksDBVersionedStoreSegmentValueFormatterTest(final TestCase testCase) {
+        this.testCase = testCase;
+    }
+
+    @Parameterized.Parameters(name = "{0}")
+    public static Collection<TestCase> data() {
+        return TEST_CASES;
+    }
+
+    @Test
+    public void shouldSerializeAndDeserialize() {
+        final SegmentValue segmentValue = buildSegmentWithInsertLatest(testCase);
+
+        final byte[] serialized = segmentValue.serialize();
+        final SegmentValue deserialized = RocksDBVersionedStoreSegmentValueFormatter.deserialize(serialized);
+
+        verifySegmentContents(deserialized, testCase);
+    }
+
+    @Test
+    public void shouldBuildWithInsertLatest() {
+        final SegmentValue segmentValue = buildSegmentWithInsertLatest(testCase);
+
+        verifySegmentContents(segmentValue, testCase);
+    }
+
+    @Test
+    public void shouldBuildWithInsertEarliest() {
+        final SegmentValue segmentValue = buildSegmentWithInsertEarliest(testCase);
+
+        verifySegmentContents(segmentValue, testCase);
+    }
+
+    @Test
+    public void shouldInsertAtIndex() {
+        if (testCase.isDegenerate) {
+            // cannot insert into degenerate segment
+            return;
+        }
+
+        // test inserting at each possible index
+        for (int insertIdx = 0; insertIdx <= testCase.records.size(); insertIdx++) {
+            // build record to insert
+            final long newRecordTimestamp;
+            if (insertIdx == 0) {
+                newRecordTimestamp = testCase.records.get(0).timestamp + 1;
+                if (newRecordTimestamp == testCase.nextTimestamp) {
+                    // cannot insert because no timestamp exists between last record and nextTimestamp
+                    continue;
+                }
+            } else {
+                newRecordTimestamp = testCase.records.get(insertIdx - 1).timestamp - 1;
+                if (newRecordTimestamp < 0 || (insertIdx < testCase.records.size() && newRecordTimestamp == testCase.records.get(insertIdx).timestamp)) {
+                    // cannot insert because timestamps of existing records are adjacent
+                    continue;
+                }
+            }
+            final TestRecord newRecord = new TestRecord("new".getBytes(), newRecordTimestamp);
+
+            final SegmentValue segmentValue = buildSegmentWithInsertLatest(testCase);
+
+            // insert() first requires a call to find()
+            if (insertIdx > 0) {
+                segmentValue.find(testCase.records.get(insertIdx - 1).timestamp, false);
+            }
+            segmentValue.insert(newRecord.timestamp, newRecord.value, insertIdx);
+
+            // create expected results
+            final List<TestRecord> expectedRecords = new ArrayList<>(testCase.records);
+            expectedRecords.add(insertIdx, newRecord);
+
+            verifySegmentContents(segmentValue, new TestCase("expected", testCase.nextTimestamp, expectedRecords));
+        }
+    }
+
+    @Test
+    public void shouldUpdateAtIndex() {
+        if (testCase.isDegenerate) {
+            // cannot update degenerate segment
+            return;
+        }
+
+        // test updating at each possible index
+        for (int updateIdx = 0; updateIdx < testCase.records.size(); updateIdx++) {
+            // build updated record
+            long updatedRecordTimestamp = testCase.records.get(updateIdx).timestamp - 1;
+            if (updatedRecordTimestamp < 0 || (updateIdx < testCase.records.size() - 1 && updatedRecordTimestamp == testCase.records.get(updateIdx + 1).timestamp)) {
+                // found timestamp conflict. try again
+                updatedRecordTimestamp = testCase.records.get(updateIdx).timestamp + 1;
+                if (updateIdx > 0 && updatedRecordTimestamp == testCase.records.get(updateIdx - 1).timestamp) {
+                    // found timestamp conflict. use original timestamp
+                    updatedRecordTimestamp = testCase.records.get(updateIdx).timestamp;
+                }
+            }
+            final TestRecord updatedRecord = new TestRecord("updated".getBytes(), updatedRecordTimestamp);
+
+            final SegmentValue segmentValue = buildSegmentWithInsertLatest(testCase);
+
+            // updateRecord() first requires a call to find()
+            segmentValue.find(testCase.records.get(updateIdx).timestamp, false);
+            segmentValue.updateRecord(updatedRecord.timestamp, updatedRecord.value, updateIdx);
+
+            // create expected results
+            final List<TestRecord> expectedRecords = new ArrayList<>(testCase.records);
+            expectedRecords.remove(updateIdx);
+            expectedRecords.add(updateIdx, updatedRecord);
+
+            verifySegmentContents(segmentValue, new TestCase("expected", testCase.nextTimestamp, expectedRecords));
+        }
+    }
+
+    @Test
+    public void shouldFindByTimestamp() {
+        if (testCase.isDegenerate) {
+            // cannot find() on degenerate segment
+            return;
+        }
+
+        final SegmentValue segmentValue = buildSegmentWithInsertLatest(testCase);
+
+        // build expected mapping from timestamp -> record
+        final Map<Long, Integer> expectedRecordIndices = new HashMap<>();
+        for (int recordIdx = testCase.records.size() - 1; recordIdx >= 0; recordIdx--) {
+            if (recordIdx < testCase.records.size() - 1) {
+                expectedRecordIndices.put(testCase.records.get(recordIdx).timestamp - 1, recordIdx + 1);
+            }
+            if (recordIdx > 0) {
+                expectedRecordIndices.put(testCase.records.get(recordIdx).timestamp + 1, recordIdx);

Review Comment:
   Not sure I understand this case? If we search at `timestamp + 1`, we could also find `recordIdx - 1` if the test record has the interval `[t, t+1)`, right? Maybe we don't have a test record like this, but then the question is: should we add one?
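
   For concreteness, a minimal sketch of such a test record with interval `[t, t+1)` (hypothetical data, not part of the PR's test cases):

   ```java
   // hypothetical: "bar" is valid over [1, 2) only, so find(1 + 1) = find(2)
   // should return "foo" (index 0) rather than "bar" (index 1)
   TEST_CASES.add(new TestCase("adjacent timestamps", 10,
       new TestRecord("foo".getBytes(), 2), new TestRecord("bar".getBytes(), 1)));
   ```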


