Posted to jira@kafka.apache.org by "vcrfxia (via GitHub)" <gi...@apache.org> on 2023/02/02 19:42:03 UTC

[GitHub] [kafka] vcrfxia opened a new pull request, #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog

vcrfxia opened a new pull request, #13189:
URL: https://github.com/apache/kafka/pull/13189

   (This PR is stacked on https://github.com/apache/kafka/pull/13188, which in turn is stacked on https://github.com/apache/kafka/pull/13143. Only the last commit (`add restore`) needs to be reviewed as part of this PR.)
   
   This PR builds on the new RocksDB-based versioned store implementation (see [KIP-889](https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores)) introduced in https://github.com/apache/kafka/pull/13188 by adding code for restoring from changelog. The changelog topic format is the same as for regular timestamped key-value stores: record keys, values, and timestamps are stored in the Kafka message key, value, and timestamp, respectively. The code for actually writing to this changelog will come in a follow-up PR.
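   
   As a rough illustration of that format, here is a hedged sketch (a hypothetical helper, not this PR's actual restore path) of how a batch of changelog records maps back onto versioned puts: the message key, value, and timestamp become the store key, value, and record timestamp, and a null value is a tombstone.
   
   ```java
   import java.util.Collection;

   import org.apache.kafka.clients.consumer.ConsumerRecord;
   import org.apache.kafka.common.utils.Bytes;
   import org.apache.kafka.streams.state.VersionedKeyValueStore;

   final class ChangelogFormatSketch {
       // Hypothetical helper for illustration only; not part of this PR.
       static void applyChangelogBatch(final Collection<ConsumerRecord<byte[], byte[]>> records,
                                       final VersionedKeyValueStore<Bytes, byte[]> store) {
           for (final ConsumerRecord<byte[], byte[]> record : records) {
               store.put(
                   Bytes.wrap(record.key()),  // Kafka message key -> store key
                   record.value(),            // Kafka message value -> store value (null is a tombstone)
                   record.timestamp()         // Kafka message timestamp -> record timestamp
               );
           }
       }
   }
   ```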
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] vcrfxia commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13189:
URL: https://github.com/apache/kafka/pull/13189#discussion_r1101915894


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java:
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.BatchWritingVersionedStoreClient;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreClient;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreSegment;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A write buffer for use in restoring a {@link RocksDBVersionedStore} from its changelog. This
+ * class exposes a {@link VersionedStoreClient} to put records into the write buffer, which may
+ * then be flushed to the store via {@link WriteBatch}es, for improved write efficiency during
+ * restoration.
+ * <p>
+ * The structure of the internals of this write buffer mirrors the structure of the
+ * {@code RocksDBVersionedStore} itself, i.e., data for the latest value store and each of the
+ * segment stores is buffered in a separate object -- specifically, a map.
+ */
+public class RocksDBVersionedStoreRestoreWriteBuffer {
+
+    private static final Logger log = LoggerFactory.getLogger(RocksDBVersionedStoreRestoreWriteBuffer.class);
+
+    // write buffer for latest value store. value type is Optional in order to track tombstones
+    // which must be written to the underlying store.
+    private final Map<Bytes, Optional<byte[]>> latestValueWriteBuffer;
+    // map from segment id to write buffer. segments are stored in reverse-sorted order,
+    // so getReverseSegments() is more efficient
+    private final TreeMap<Long, WriteBufferSegmentWithDbFallback> segmentsWriteBuffer;
+    private final BatchWritingVersionedStoreClient<LogicalKeyValueSegment> dbClient;
+    private final RocksDBVersionedStoreRestoreClient restoreClient;
+
+    /**
+     * Creates a new write buffer.
+     * @param dbClient client for reading from and writing to the underlying persistent store
+     */
+    RocksDBVersionedStoreRestoreWriteBuffer(final BatchWritingVersionedStoreClient<LogicalKeyValueSegment> dbClient) {
+        this.dbClient = Objects.requireNonNull(dbClient);
+
+        this.latestValueWriteBuffer = new HashMap<>();
+        // store in reverse-sorted order, to make getReverseSegments() more efficient
+        this.segmentsWriteBuffer = new TreeMap<>((x, y) -> Long.compare(y, x));
+        this.restoreClient = new RocksDBVersionedStoreRestoreClient();
+    }
+
+    /**
+     * @return client for writing to (and reading from) the write buffer
+     */
+    VersionedStoreClient<?> getClient() {
+        return restoreClient;
+    }
+
+    /**
+     * Flushes the contents of the write buffer into the persistent store, and clears the write
+     * buffer in the process.
+     * @throws RocksDBException if a failure occurs adding to or writing a {@link WriteBatch}
+     */
+    void flush() throws RocksDBException {
+
+        // flush segments first, as this is consistent with the store always writing to
+        // older segments/stores before later ones
+        try (final WriteBatch segmentsBatch = new WriteBatch()) {
+            final List<WriteBufferSegmentWithDbFallback> allSegments = restoreClient.getReverseSegments(Long.MIN_VALUE);
+            if (allSegments.size() > 0) {
+                // collect entries into write batch
+                for (final WriteBufferSegmentWithDbFallback bufferSegment : allSegments) {
+                    final LogicalKeyValueSegment dbSegment = bufferSegment.dbSegment();
+                    for (final Map.Entry<Bytes, byte[]> segmentEntry : bufferSegment.getAll()) {
+                        dbSegment.addToBatch(
+                            new KeyValue<>(segmentEntry.getKey().get(), segmentEntry.getValue()),
+                            segmentsBatch);
+                    }
+                }
+
+                // write to db. all the logical segments share the same physical store,
+                // so we can use any segment to perform the write
+                allSegments.get(0).dbSegment().write(segmentsBatch);
+            }
+        } catch (final RocksDBException e) {
+            log.error("Error restoring batch to RocksDBVersionedStore segments store.");
+            throw e;
+        }
+        segmentsWriteBuffer.clear();
+
+        // flush latest value store
+        try (final WriteBatch latestValueBatch = new WriteBatch()) {
+            // collect entries into write batch
+            for (final Map.Entry<Bytes, Optional<byte[]>> latestValueEntry : latestValueWriteBuffer.entrySet()) {
+                final byte[] value = latestValueEntry.getValue().orElse(null);
+                dbClient.addToLatestValueBatch(
+                    new KeyValue<>(latestValueEntry.getKey().get(), value),
+                    latestValueBatch);
+            }
+
+            // write to db
+            dbClient.writeLatestValues(latestValueBatch);
+        } catch (final RocksDBException e) {
+            log.error("Error restoring batch to RocksDBVersionedStore latest value store.");
+            throw e;
+        }
+        latestValueWriteBuffer.clear();
+    }
+
+    /**
+     * The object representation of the write buffer corresponding to a single segment store.
+     * Contains the write buffer itself (a simple hash map) and also a reference to the underlying
+     * persistent segment store.
+     */
+    private class WriteBufferSegmentWithDbFallback implements VersionedStoreSegment {
+
+        private final long id;
+        private final Map<Bytes, byte[]> data;
+        private final LogicalKeyValueSegment dbSegment;
+
+        WriteBufferSegmentWithDbFallback(final LogicalKeyValueSegment dbSegment) {
+            this.dbSegment = Objects.requireNonNull(dbSegment);
+            this.id = dbSegment.id();
+            this.data = new HashMap<>();
+
+            // register segment with segments store
+            segmentsWriteBuffer.put(id, this);
+        }
+
+        LogicalKeyValueSegment dbSegment() {
+            return dbSegment;
+        }
+
+        @Override
+        public long id() {
+            return id;
+        }
+
+        @Override
+        public void put(final Bytes key, final byte[] value) {
+            // all writes go to the write buffer
+            data.put(key, value);
+        }
+
+        @Override
+        public byte[] get(final Bytes key) {
+            final byte[] bufferValue = data.get(key);
+            if (bufferValue != null) {
+                return bufferValue;
+            }
+            return dbSegment.get(key);
+        }
+
+        Set<Entry<Bytes, byte[]>> getAll() {

Review Comment:
   Minor optimization so that we don't need to copy the map. (If we were to return a map, I don't think the return value should be modifiable.) If you'd prefer copying the map for readability, we can do that too.
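   
   For reference, here is a small self-contained sketch of the two options being weighed (stand-alone helpers, not the PR's code). Returning the entry set is a zero-copy view of the buffer, while the copy-based variant trades an allocation per call for an independent result:
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Map.Entry;
   import java.util.Set;

   import org.apache.kafka.common.utils.Bytes;

   final class GetAllOptionsSketch {
       // What the diff above does: expose the live entry set, no per-call copy.
       static Set<Entry<Bytes, byte[]>> viewEntries(final Map<Bytes, byte[]> data) {
           return data.entrySet();
       }

       // Copy-based alternative mentioned in the comment: extra allocation, but the caller
       // gets a map it can hold onto (or modify) without touching the buffer.
       static Map<Bytes, byte[]> copyEntries(final Map<Bytes, byte[]> data) {
           return new HashMap<>(data);
       }
   }
   ```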





[GitHub] [kafka] mjsax commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13189:
URL: https://github.com/apache/kafka/pull/13189#discussion_r1103303535


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -0,0 +1,877 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
+import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
+import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A persistent, versioned key-value store based on RocksDB.
+ * <p>
+ * This store implementation consists of a "latest value store" and "segment stores." The latest
+ * record version for each key is stored in the latest value store, while older record versions
+ * are stored in the segment stores. Conceptually, each record version has two associated
+ * timestamps:
+ * <ul>
+ *     <li>a {@code validFrom} timestamp. This timestamp is explicitly associated with the record
+ *     as part of the {@link VersionedKeyValueStore#put(Object, Object, long)}} call to the store;
+ *     i.e., this is the record's timestamp.</li>
+ *     <li>a {@code validTo} timestamp. This is the timestamp of the next record (or deletion)
+ *     associated with the same key, and is implicitly associated with the record. This timestamp
+ *     can change as new records are inserted into the store.</li>
+ * </ul>
+ * The validity interval of a record is from validFrom (inclusive) to validTo (exclusive), and
+ * can change as new record versions are inserted into the store (and validTo changes as a result).
+ * <p>
+ * Old record versions are stored in segment stores according to their validTo timestamps. This
+ * allows for efficient expiry of old record versions, as entire segments can be dropped from the
+ * store at a time, once the records contained in the segment are no longer relevant based on the
+ * store's history retention (for an explanation of "history retention", see
+ * {@link VersionedKeyValueStore}). Multiple record versions (for the same key) within a single
+ * segment are stored together using the format specified in {@link RocksDBVersionedStoreSegmentValueFormatter}.
+ */
+public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte[]> {
+    private static final Logger LOG = LoggerFactory.getLogger(RocksDBVersionedStore.class);
+    // a marker to indicate that no record version has yet been found as part of an ongoing
+    // put() procedure. any value which is not a valid record timestamp will do.
+    private static final long SENTINEL_TIMESTAMP = Long.MIN_VALUE;
+
+    private final String name;
+    private final long historyRetention;
+    private final RocksDBMetricsRecorder metricsRecorder;
+
+    private final RocksDBStore latestValueStore;
+    private final LogicalKeyValueSegments segmentStores;
+    private final LatestValueSchema latestValueSchema;
+    private final SegmentValueSchema segmentValueSchema;
+    private final BatchWritingVersionedStoreClient<LogicalKeyValueSegment> versionedStoreClient;
+    private final RocksDBVersionedStoreRestoreWriteBuffer restoreWriteBuffer;
+
+    private ProcessorContext context;
+    private StateStoreContext stateStoreContext;
+    private Sensor expiredRecordSensor;
+    private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+    private boolean consistencyEnabled = false;
+    private Position position;
+    private OffsetCheckpoint positionCheckpoint;
+    private volatile boolean open;
+
+    RocksDBVersionedStore(final String name, final String metricsScope, final long historyRetention, final long segmentInterval) {
+        this.name = name;
+        this.historyRetention = historyRetention;
+        this.metricsRecorder = new RocksDBMetricsRecorder(metricsScope, name);
+        this.latestValueStore = new RocksDBStore(lvsName(name), name, metricsRecorder);
+        this.segmentStores = new LogicalKeyValueSegments(segmentsName(name), name, historyRetention, segmentInterval, metricsRecorder);
+        this.latestValueSchema = new LatestValueSchema();
+        this.segmentValueSchema = new SegmentValueSchema();
+        this.versionedStoreClient = new RocksDBVersionedStoreClient();
+        this.restoreWriteBuffer = new RocksDBVersionedStoreRestoreWriteBuffer(versionedStoreClient);
+    }
+
+    @Override
+    public void put(final Bytes key, final byte[] value, final long timestamp) {
+
+        doPut(
+            latestValueSchema,
+            segmentValueSchema,
+            versionedStoreClient,
+            Optional.of(expiredRecordSensor),
+            context,
+            observedStreamTime,
+            historyRetention,
+            key,
+            value,
+            timestamp
+        );
+
+        observedStreamTime = Math.max(observedStreamTime, timestamp);
+    }
+
+    @Override
+    public VersionedRecord<byte[]> delete(final Bytes key, final long timestamp) {
+        final VersionedRecord<byte[]> existingRecord = get(key, timestamp);
+        put(key, null, timestamp);
+        return existingRecord;
+    }
+
+    @Override
+    public VersionedRecord<byte[]> get(final Bytes key) {
+        // latest value (if present) is guaranteed to be in the latest value store
+        final byte[] latestValue = latestValueStore.get(key);
+        if (latestValue != null) {
+            return new VersionedRecord<>(
+                latestValueSchema.getValue(latestValue),
+                latestValueSchema.getTimestamp(latestValue)
+            );
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public VersionedRecord<byte[]> get(final Bytes key, final long asOfTimestamp) {
+
+        if (asOfTimestamp < observedStreamTime - historyRetention) {
+            // history retention has elapsed. return null for predictability, even if data
+            // is still present in store.
+            return null;
+        }
+
+        // first check the latest value store
+        final byte[] latestValue = latestValueStore.get(key);
+        if (latestValue != null) {
+            final long latestTimestamp = latestValueSchema.getTimestamp(latestValue);
+            if (latestTimestamp <= asOfTimestamp) {
+                return new VersionedRecord<>(latestValueSchema.getValue(latestValue), latestTimestamp);
+            }
+        }
+
+        // check segment stores
+        final List<LogicalKeyValueSegment> segments = segmentStores.segments(asOfTimestamp, Long.MAX_VALUE, false);
+        for (final LogicalKeyValueSegment segment : segments) {
+            final byte[] segmentValue = segment.get(key);
+            if (segmentValue != null) {
+                final long nextTs = segmentValueSchema.getNextTimestamp(segmentValue);
+                if (nextTs <= asOfTimestamp) {
+                    // this segment contains no data for the queried timestamp, so earlier segments
+                    // cannot either
+                    return null;
+                }
+
+                if (segmentValueSchema.getMinTimestamp(segmentValue) > asOfTimestamp) {
+                    // the segment only contains data for after the queried timestamp. skip and
+                    // continue the search to earlier segments. as an optimization, this code
+                    // could be updated to skip forward to the segment containing the minTimestamp
+                    // in the if-condition above.
+                    continue;
+                }
+
+                // the desired result is contained in this segment
+                final SegmentSearchResult searchResult =
+                    segmentValueSchema.deserialize(segmentValue).find(asOfTimestamp, true);
+                if (searchResult.value() != null) {
+                    return new VersionedRecord<>(searchResult.value(), searchResult.validFrom());
+                } else {
+                    return null;
+                }
+            }
+        }
+
+        // checked all segments and no results found
+        return null;
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public void flush() {
+        // order shouldn't matter since failure to flush is a fatal exception
+        segmentStores.flush();
+        latestValueStore.flush();
+    }
+
+    @Override
+    public void close() {
+        open = false;
+
+        // close latest value store first so that calls to get() immediately begin to fail with
+        // store not open, as all calls to get() first get() from latest value store
+        latestValueStore.close();
+        segmentStores.close();
+    }
+
+    @Override
+    public boolean persistent() {
+        return true;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return open;
+    }
+
+    @Override
+    public Position getPosition() {
+        return position;
+    }
+
+    @Deprecated
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        this.context = context;
+
+        final StreamsMetricsImpl metrics = ProcessorContextUtils.getMetricsImpl(context);
+        final String threadId = Thread.currentThread().getName();
+        final String taskName = context.taskId().toString();
+
+        expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
+            threadId,
+            taskName,
+            metrics
+        );
+
+        metricsRecorder.init(ProcessorContextUtils.getMetricsImpl(context), context.taskId());
+
+        latestValueStore.openDB(context.appConfigs(), context.stateDir());
+        segmentStores.openExisting(context, observedStreamTime);
+
+        final File positionCheckpointFile = new File(context.stateDir(), name() + ".position");
+        this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
+        this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
+
+        // register and possibly restore the state from the logs
+        stateStoreContext.register(
+            root,
+            (RecordBatchingStateRestoreCallback) RocksDBVersionedStore.this::restoreBatch,
+            () -> StoreQueryUtils.checkpointPosition(positionCheckpoint, position)
+        );
+
+        open = true;
+
+        consistencyEnabled = StreamsConfig.InternalConfig.getBoolean(
+            context.appConfigs(),
+            IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED,
+            false
+        );
+    }
+
+    @Override
+    public void init(final StateStoreContext context, final StateStore root) {
+        this.stateStoreContext = context;
+        init(StoreToProcessorContextAdapter.adapt(context), root);
+    }
+
+    // VisibleForTesting
+    void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
+        // advance stream time to the max timestamp in the batch
+        for (final ConsumerRecord<byte[], byte[]> record : records) {
+            observedStreamTime = Math.max(observedStreamTime, record.timestamp());
+        }
+
+        final VersionedStoreClient<?> restoreClient = restoreWriteBuffer.getClient();
+
+        // TODO: handle potential out of memory in this process

Review Comment:
   At least there's no reason to block this PR. \cc @guozhangwang WDYT about this question?





[GitHub] [kafka] vcrfxia commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13189:
URL: https://github.com/apache/kafka/pull/13189#discussion_r1101914047


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -333,10 +384,34 @@ interface VersionedStoreSegment {
         long segmentIdForTimestamp(long timestamp);
     }
 
+    /**
+     * A {@link VersionedStoreClient} which additionally supports batch writes into its latest
+     * value store.
+     *
+     * @param <T> the segment type used by this client
+     */
+    interface BatchWritingVersionedStoreClient<T extends VersionedStoreSegment> extends VersionedStoreClient<T> {

Review Comment:
   We could, yeah. It just means we'd need to make RocksDBVersionedStoreClient non-private and require that RocksDBVersionedStoreRestoreWriteBuffer is specifically passed an instance of RocksDBVersionedStoreClient rather than any BatchWritingVersionedStoreClient. I'll make the change.
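   
   For illustration, a rough, self-contained analogue of that change (all names here are placeholders, not the real Streams classes): the buffer's constructor narrows from the wider interface to the concrete client, which therefore can no longer stay private.
   
   ```java
   import java.util.Objects;

   // Placeholder for BatchWritingVersionedStoreClient in this sketch.
   interface BatchWritingClient { }

   // Placeholder for RocksDBVersionedStoreClient; package-visible rather than private
   // so the write buffer can name it directly.
   class ConcreteRocksClient implements BatchWritingClient { }

   // Placeholder for RocksDBVersionedStoreRestoreWriteBuffer.
   final class RestoreBufferSketch {
       private final ConcreteRocksClient dbClient;  // concrete type instead of the interface

       RestoreBufferSketch(final ConcreteRocksClient dbClient) {
           this.dbClient = Objects.requireNonNull(dbClient);
       }
   }
   ```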





[GitHub] [kafka] mjsax commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13189:
URL: https://github.com/apache/kafka/pull/13189#discussion_r1100902144


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -0,0 +1,877 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
+import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
+import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A persistent, versioned key-value store based on RocksDB.
+ * <p>
+ * This store implementation consists of a "latest value store" and "segment stores." The latest
+ * record version for each key is stored in the latest value store, while older record versions
+ * are stored in the segment stores. Conceptually, each record version has two associated
+ * timestamps:
+ * <ul>
+ *     <li>a {@code validFrom} timestamp. This timestamp is explicitly associated with the record
+ *     as part of the {@link VersionedKeyValueStore#put(Object, Object, long)}} call to the store;
+ *     i.e., this is the record's timestamp.</li>
+ *     <li>a {@code validTo} timestamp. This is the timestamp of the next record (or deletion)
+ *     associated with the same key, and is implicitly associated with the record. This timestamp
+ *     can change as new records are inserted into the store.</li>
+ * </ul>
+ * The validity interval of a record is from validFrom (inclusive) to validTo (exclusive), and
+ * can change as new record versions are inserted into the store (and validTo changes as a result).
+ * <p>
+ * Old record versions are stored in segment stores according to their validTo timestamps. This
+ * allows for efficient expiry of old record versions, as entire segments can be dropped from the
+ * store at a time, once the records contained in the segment are no longer relevant based on the
+ * store's history retention (for an explanation of "history retention", see
+ * {@link VersionedKeyValueStore}). Multiple record versions (for the same key) within a single
+ * segment are stored together using the format specified in {@link RocksDBVersionedStoreSegmentValueFormatter}.
+ */
+public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte[]> {
+    private static final Logger LOG = LoggerFactory.getLogger(RocksDBVersionedStore.class);
+    // a marker to indicate that no record version has yet been found as part of an ongoing
+    // put() procedure. any value which is not a valid record timestamp will do.
+    private static final long SENTINEL_TIMESTAMP = Long.MIN_VALUE;
+
+    private final String name;
+    private final long historyRetention;
+    private final RocksDBMetricsRecorder metricsRecorder;
+
+    private final RocksDBStore latestValueStore;
+    private final LogicalKeyValueSegments segmentStores;
+    private final LatestValueSchema latestValueSchema;
+    private final SegmentValueSchema segmentValueSchema;
+    private final BatchWritingVersionedStoreClient<LogicalKeyValueSegment> versionedStoreClient;
+    private final RocksDBVersionedStoreRestoreWriteBuffer restoreWriteBuffer;
+
+    private ProcessorContext context;
+    private StateStoreContext stateStoreContext;
+    private Sensor expiredRecordSensor;
+    private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+    private boolean consistencyEnabled = false;
+    private Position position;
+    private OffsetCheckpoint positionCheckpoint;
+    private volatile boolean open;
+
+    RocksDBVersionedStore(final String name, final String metricsScope, final long historyRetention, final long segmentInterval) {
+        this.name = name;
+        this.historyRetention = historyRetention;
+        this.metricsRecorder = new RocksDBMetricsRecorder(metricsScope, name);
+        this.latestValueStore = new RocksDBStore(lvsName(name), name, metricsRecorder);
+        this.segmentStores = new LogicalKeyValueSegments(segmentsName(name), name, historyRetention, segmentInterval, metricsRecorder);
+        this.latestValueSchema = new LatestValueSchema();
+        this.segmentValueSchema = new SegmentValueSchema();
+        this.versionedStoreClient = new RocksDBVersionedStoreClient();
+        this.restoreWriteBuffer = new RocksDBVersionedStoreRestoreWriteBuffer(versionedStoreClient);
+    }
+
+    @Override
+    public void put(final Bytes key, final byte[] value, final long timestamp) {
+
+        doPut(
+            latestValueSchema,
+            segmentValueSchema,
+            versionedStoreClient,
+            Optional.of(expiredRecordSensor),
+            context,
+            observedStreamTime,
+            historyRetention,
+            key,
+            value,
+            timestamp
+        );
+
+        observedStreamTime = Math.max(observedStreamTime, timestamp);
+    }
+
+    @Override
+    public VersionedRecord<byte[]> delete(final Bytes key, final long timestamp) {
+        final VersionedRecord<byte[]> existingRecord = get(key, timestamp);
+        put(key, null, timestamp);
+        return existingRecord;
+    }
+
+    @Override
+    public VersionedRecord<byte[]> get(final Bytes key) {
+        // latest value (if present) is guaranteed to be in the latest value store
+        final byte[] latestValue = latestValueStore.get(key);
+        if (latestValue != null) {
+            return new VersionedRecord<>(
+                latestValueSchema.getValue(latestValue),
+                latestValueSchema.getTimestamp(latestValue)
+            );
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public VersionedRecord<byte[]> get(final Bytes key, final long asOfTimestamp) {
+
+        if (asOfTimestamp < observedStreamTime - historyRetention) {
+            // history retention has elapsed. return null for predictability, even if data
+            // is still present in store.
+            return null;
+        }
+
+        // first check the latest value store
+        final byte[] latestValue = latestValueStore.get(key);
+        if (latestValue != null) {
+            final long latestTimestamp = latestValueSchema.getTimestamp(latestValue);
+            if (latestTimestamp <= asOfTimestamp) {
+                return new VersionedRecord<>(latestValueSchema.getValue(latestValue), latestTimestamp);
+            }
+        }
+
+        // check segment stores
+        final List<LogicalKeyValueSegment> segments = segmentStores.segments(asOfTimestamp, Long.MAX_VALUE, false);
+        for (final LogicalKeyValueSegment segment : segments) {
+            final byte[] segmentValue = segment.get(key);
+            if (segmentValue != null) {
+                final long nextTs = segmentValueSchema.getNextTimestamp(segmentValue);
+                if (nextTs <= asOfTimestamp) {
+                    // this segment contains no data for the queried timestamp, so earlier segments
+                    // cannot either
+                    return null;
+                }
+
+                if (segmentValueSchema.getMinTimestamp(segmentValue) > asOfTimestamp) {
+                    // the segment only contains data for after the queried timestamp. skip and
+                    // continue the search to earlier segments. as an optimization, this code
+                    // could be updated to skip forward to the segment containing the minTimestamp
+                    // in the if-condition above.
+                    continue;
+                }
+
+                // the desired result is contained in this segment
+                final SegmentSearchResult searchResult =
+                    segmentValueSchema.deserialize(segmentValue).find(asOfTimestamp, true);
+                if (searchResult.value() != null) {
+                    return new VersionedRecord<>(searchResult.value(), searchResult.validFrom());
+                } else {
+                    return null;
+                }
+            }
+        }
+
+        // checked all segments and no results found
+        return null;
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public void flush() {
+        // order shouldn't matter since failure to flush is a fatal exception
+        segmentStores.flush();
+        latestValueStore.flush();
+    }
+
+    @Override
+    public void close() {
+        open = false;
+
+        // close latest value store first so that calls to get() immediately begin to fail with
+        // store not open, as all calls to get() first get() from latest value store
+        latestValueStore.close();
+        segmentStores.close();
+    }
+
+    @Override
+    public boolean persistent() {
+        return true;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return open;
+    }
+
+    @Override
+    public Position getPosition() {
+        return position;
+    }
+
+    @Deprecated
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        this.context = context;
+
+        final StreamsMetricsImpl metrics = ProcessorContextUtils.getMetricsImpl(context);
+        final String threadId = Thread.currentThread().getName();
+        final String taskName = context.taskId().toString();
+
+        expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
+            threadId,
+            taskName,
+            metrics
+        );
+
+        metricsRecorder.init(ProcessorContextUtils.getMetricsImpl(context), context.taskId());
+
+        latestValueStore.openDB(context.appConfigs(), context.stateDir());
+        segmentStores.openExisting(context, observedStreamTime);
+
+        final File positionCheckpointFile = new File(context.stateDir(), name() + ".position");
+        this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
+        this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
+
+        // register and possibly restore the state from the logs
+        stateStoreContext.register(
+            root,
+            (RecordBatchingStateRestoreCallback) RocksDBVersionedStore.this::restoreBatch,
+            () -> StoreQueryUtils.checkpointPosition(positionCheckpoint, position)
+        );
+
+        open = true;
+
+        consistencyEnabled = StreamsConfig.InternalConfig.getBoolean(
+            context.appConfigs(),
+            IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED,
+            false
+        );
+    }
+
+    @Override
+    public void init(final StateStoreContext context, final StateStore root) {
+        this.stateStoreContext = context;
+        init(StoreToProcessorContextAdapter.adapt(context), root);
+    }
+
+    // VisibleForTesting
+    void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
+        // advance stream time to the max timestamp in the batch
+        for (final ConsumerRecord<byte[], byte[]> record : records) {
+            observedStreamTime = Math.max(observedStreamTime, record.timestamp());
+        }
+
+        final VersionedStoreClient<?> restoreClient = restoreWriteBuffer.getClient();
+
+        // TODO: handle potential out of memory in this process

Review Comment:
   Can we do this? How?





[GitHub] [kafka] mjsax commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13189:
URL: https://github.com/apache/kafka/pull/13189#discussion_r1100920025


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java:
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.BatchWritingVersionedStoreClient;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreClient;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreSegment;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A write buffer for use in restoring a {@link RocksDBVersionedStore} from its changelog. This
+ * class exposes a {@link VersionedStoreClient} to put records into the write buffer, which may
+ * then be flushed to the store via {@link WriteBatch}es, for improved write efficiency during
+ * restoration.
+ * <p>
+ * The structure of the internals of this write buffer mirrors the structure of the
+ * {@code RocksDBVersionedStore} itself, i.e., data for the latest value store and each of the
+ * segment stores is buffered in a separate object -- specifically, a map.
+ */
+public class RocksDBVersionedStoreRestoreWriteBuffer {
+
+    private static final Logger log = LoggerFactory.getLogger(RocksDBVersionedStoreRestoreWriteBuffer.class);
+
+    // write buffer for latest value store. value type is Optional in order to track tombstones
+    // which must be written to the underlying store.
+    private final Map<Bytes, Optional<byte[]>> latestValueWriteBuffer;
+    // map from segment id to write buffer. segments are stored in reverse-sorted order,
+    // so getReverseSegments() is more efficient
+    private final TreeMap<Long, WriteBufferSegmentWithDbFallback> segmentsWriteBuffer;
+    private final BatchWritingVersionedStoreClient<LogicalKeyValueSegment> dbClient;
+    private final RocksDBVersionedStoreRestoreClient restoreClient;
+
+    /**
+     * Creates a new write buffer.
+     * @param dbClient client for reading from and writing to the underlying persistent store
+     */
+    RocksDBVersionedStoreRestoreWriteBuffer(final BatchWritingVersionedStoreClient<LogicalKeyValueSegment> dbClient) {
+        this.dbClient = Objects.requireNonNull(dbClient);
+
+        this.latestValueWriteBuffer = new HashMap<>();
+        // store in reverse-sorted order, to make getReverseSegments() more efficient
+        this.segmentsWriteBuffer = new TreeMap<>((x, y) -> Long.compare(y, x));
+        this.restoreClient = new RocksDBVersionedStoreRestoreClient();
+    }
+
+    /**
+     * @return client for writing to (and reading from) the write buffer
+     */
+    VersionedStoreClient<?> getClient() {
+        return restoreClient;
+    }
+
+    /**
+     * Flushes the contents of the write buffer into the persistent store, and clears the write
+     * buffer in the process.
+     * @throws RocksDBException if a failure occurs adding to or writing a {@link WriteBatch}
+     */
+    void flush() throws RocksDBException {
+
+        // flush segments first, as this is consistent with the store always writing to
+        // older segments/stores before later ones
+        try (final WriteBatch segmentsBatch = new WriteBatch()) {
+            final List<WriteBufferSegmentWithDbFallback> allSegments = restoreClient.getReverseSegments(Long.MIN_VALUE);
+            if (allSegments.size() > 0) {
+                // collect entries into write batch
+                for (final WriteBufferSegmentWithDbFallback bufferSegment : allSegments) {
+                    final LogicalKeyValueSegment dbSegment = bufferSegment.dbSegment();
+                    for (final Map.Entry<Bytes, byte[]> segmentEntry : bufferSegment.getAll()) {
+                        dbSegment.addToBatch(
+                            new KeyValue<>(segmentEntry.getKey().get(), segmentEntry.getValue()),
+                            segmentsBatch);
+                    }
+                }
+
+                // write to db. all the logical segments share the same physical store,
+                // so we can use any segment to perform the write
+                allSegments.get(0).dbSegment().write(segmentsBatch);
+            }
+        } catch (final RocksDBException e) {
+            log.error("Error restoring batch to RocksDBVersionedStore segments store.");
+            throw e;
+        }
+        segmentsWriteBuffer.clear();
+
+        // flush latest value store
+        try (final WriteBatch latestValueBatch = new WriteBatch()) {
+            // collect entries into write batch
+            for (final Map.Entry<Bytes, Optional<byte[]>> latestValueEntry : latestValueWriteBuffer.entrySet()) {
+                final byte[] value = latestValueEntry.getValue().orElse(null);
+                dbClient.addToLatestValueBatch(
+                    new KeyValue<>(latestValueEntry.getKey().get(), value),
+                    latestValueBatch);
+            }
+
+            // write to db
+            dbClient.writeLatestValues(latestValueBatch);
+        } catch (final RocksDBException e) {
+            log.error("Error restoring batch to RocksDBVersionedStore latest value store.");
+            throw e;
+        }
+        latestValueWriteBuffer.clear();
+    }
+
+    /**
+     * The object representation of the write buffer corresponding to a single segment store.
+     * Contains the write buffer itself (a simple hash map) and also a reference to the underlying
+     * persistent segment store.
+     */
+    private class WriteBufferSegmentWithDbFallback implements VersionedStoreSegment {
+
+        private final long id;
+        private final Map<Bytes, byte[]> data;
+        private final LogicalKeyValueSegment dbSegment;
+
+        WriteBufferSegmentWithDbFallback(final LogicalKeyValueSegment dbSegment) {
+            this.dbSegment = Objects.requireNonNull(dbSegment);
+            this.id = dbSegment.id();
+            this.data = new HashMap<>();
+
+            // register segment with segments store
+            segmentsWriteBuffer.put(id, this);
+        }
+
+        LogicalKeyValueSegment dbSegment() {
+            return dbSegment;
+        }
+
+        @Override
+        public long id() {
+            return id;
+        }
+
+        @Override
+        public void put(final Bytes key, final byte[] value) {
+            // all writes go to the write buffer
+            data.put(key, value);
+        }
+
+        @Override
+        public byte[] get(final Bytes key) {
+            final byte[] bufferValue = data.get(key);
+            if (bufferValue != null) {
+                return bufferValue;
+            }
+            return dbSegment.get(key);
+        }
+
+        Set<Entry<Bytes, byte[]>> getAll() {
+            return data.entrySet();
+        }
+    }
+
+    /**
+     * Client for writing to (and reading from) the write buffer as part of restore.
+     */
+    private class RocksDBVersionedStoreRestoreClient implements VersionedStoreClient<WriteBufferSegmentWithDbFallback> {
+
+        @Override
+        public byte[] getLatestValue(final Bytes key) {
+            final Optional<byte[]> bufferValue = latestValueWriteBuffer.get(key);
+            if (bufferValue != null) {
+                return bufferValue.orElse(null);
+            }
+            return dbClient.getLatestValue(key);
+        }
+
+        @Override
+        public void putLatestValue(final Bytes key, final byte[] value) {
+            // all writes go to write buffer
+            latestValueWriteBuffer.put(key, Optional.ofNullable(value));
+        }
+
+        @Override
+        public void deleteLatestValue(final Bytes key) {
+            putLatestValue(key, null);
+        }
+
+        @Override
+        public WriteBufferSegmentWithDbFallback getOrCreateSegmentIfLive(final long segmentId, final ProcessorContext context, final long streamTime) {
+            if (!segmentsWriteBuffer.containsKey(segmentId)) {

Review Comment:
   Can we change the flow (easier to read):
   ```
   if (segmentsWriteBuffer.containsKey(segmentId)) {
     return segmentsWriteBuffer.get(segmentId);
   }
   
   // do what lives in "then" path right now.
   ```





[GitHub] [kafka] mjsax commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13189:
URL: https://github.com/apache/kafka/pull/13189#discussion_r1102259793


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java:
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.BatchWritingVersionedStoreClient;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreClient;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreSegment;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A write buffer for use in restoring a {@link RocksDBVersionedStore} from its changelog. This
+ * class exposes a {@link VersionedStoreClient} to put records into the write buffer, which may
+ * then be flushed to the store via {@link WriteBatch}es, for improved write efficiency during
+ * restoration.
+ * <p>
+ * The structure of the internals of this write buffer mirrors the structure of the
+ * {@code RocksDBVersionedStore} itself, i.e., data for the latest value store and each of the
+ * segment stores is buffered in a separate object -- specifically, a map.
+ */
+public class RocksDBVersionedStoreRestoreWriteBuffer {
+
+    private static final Logger log = LoggerFactory.getLogger(RocksDBVersionedStoreRestoreWriteBuffer.class);
+
+    // write buffer for latest value store. value type is Optional in order to track tombstones
+    // which must be written to the underlying store.
+    private final Map<Bytes, Optional<byte[]>> latestValueWriteBuffer;
+    // map from segment id to write buffer. segments are stored in reverse-sorted order,
+    // so getReverseSegments() is more efficient
+    private final TreeMap<Long, WriteBufferSegmentWithDbFallback> segmentsWriteBuffer;
+    private final BatchWritingVersionedStoreClient<LogicalKeyValueSegment> dbClient;
+    private final RocksDBVersionedStoreRestoreClient restoreClient;
+
+    /**
+     * Creates a new write buffer.
+     * @param dbClient client for reading from and writing to the underlying persistent store
+     */
+    RocksDBVersionedStoreRestoreWriteBuffer(final BatchWritingVersionedStoreClient<LogicalKeyValueSegment> dbClient) {
+        this.dbClient = Objects.requireNonNull(dbClient);
+
+        this.latestValueWriteBuffer = new HashMap<>();
+        // store in reverse-sorted order, to make getReverseSegments() more efficient
+        this.segmentsWriteBuffer = new TreeMap<>((x, y) -> Long.compare(y, x));
+        this.restoreClient = new RocksDBVersionedStoreRestoreClient();
+    }
+
+    /**
+     * @return client for writing to (and reading from) the write buffer
+     */
+    VersionedStoreClient<?> getClient() {
+        return restoreClient;
+    }
+
+    /**
+     * Flushes the contents of the write buffer into the persistent store, and clears the write
+     * buffer in the process.
+     * @throws RocksDBException if a failure occurs adding to or writing a {@link WriteBatch}
+     */
+    void flush() throws RocksDBException {
+
+        // flush segments first, as this is consistent with the store always writing to
+        // older segments/stores before later ones
+        try (final WriteBatch segmentsBatch = new WriteBatch()) {
+            final List<WriteBufferSegmentWithDbFallback> allSegments = restoreClient.getReverseSegments(Long.MIN_VALUE);
+            if (allSegments.size() > 0) {
+                // collect entries into write batch
+                for (final WriteBufferSegmentWithDbFallback bufferSegment : allSegments) {
+                    final LogicalKeyValueSegment dbSegment = bufferSegment.dbSegment();
+                    for (final Map.Entry<Bytes, byte[]> segmentEntry : bufferSegment.getAll()) {
+                        dbSegment.addToBatch(
+                            new KeyValue<>(segmentEntry.getKey().get(), segmentEntry.getValue()),
+                            segmentsBatch);
+                    }
+                }
+
+                // write to db. all the logical segments share the same physical store,
+                // so we can use any segment to perform the write
+                allSegments.get(0).dbSegment().write(segmentsBatch);
+            }
+        } catch (final RocksDBException e) {
+            log.error("Error restoring batch to RocksDBVersionedStore segments store.");
+            throw e;
+        }
+        segmentsWriteBuffer.clear();
+
+        // flush latest value store
+        try (final WriteBatch latestValueBatch = new WriteBatch()) {
+            // collect entries into write batch
+            for (final Map.Entry<Bytes, Optional<byte[]>> latestValueEntry : latestValueWriteBuffer.entrySet()) {
+                final byte[] value = latestValueEntry.getValue().orElse(null);
+                dbClient.addToLatestValueBatch(
+                    new KeyValue<>(latestValueEntry.getKey().get(), value),
+                    latestValueBatch);
+            }
+
+            // write to db
+            dbClient.writeLatestValues(latestValueBatch);
+        } catch (final RocksDBException e) {
+            log.error("Error restoring batch to RocksDBVersionedStore latest value store.");
+            throw e;
+        }
+        latestValueWriteBuffer.clear();
+    }
+
+    /**
+     * The object representation of the write buffer corresponding to a single segment store.
+     * Contains the write buffer itself (a simple hash map) and also a reference to the underlying
+     * persistent segment store.
+     */
+    private class WriteBufferSegmentWithDbFallback implements VersionedStoreSegment {
+
+        private final long id;
+        private final Map<Bytes, byte[]> data;
+        private final LogicalKeyValueSegment dbSegment;
+
+        WriteBufferSegmentWithDbFallback(final LogicalKeyValueSegment dbSegment) {
+            this.dbSegment = Objects.requireNonNull(dbSegment);
+            this.id = dbSegment.id();
+            this.data = new HashMap<>();
+
+            // register segment with segments store
+            segmentsWriteBuffer.put(id, this);
+        }
+
+        LogicalKeyValueSegment dbSegment() {
+            return dbSegment;
+        }
+
+        @Override
+        public long id() {
+            return id;
+        }
+
+        @Override
+        public void put(final Bytes key, final byte[] value) {
+            // all writes go to the write buffer
+            data.put(key, value);
+        }
+
+        @Override
+        public byte[] get(final Bytes key) {
+            final byte[] bufferValue = data.get(key);
+            if (bufferValue != null) {
+                return bufferValue;
+            }
+            return dbSegment.get(key);
+        }
+
+        Set<Entry<Bytes, byte[]>> getAll() {

Review Comment:
   I don't think that a deep copy is required to guard the map from modification (`return Collections.unmodifiableMap(data);` would do the trick). Given the limited scope of who calls `getAll()`, it should be fine to just put the guard in the implementation without making it a contract of the API (we do this often inside the KS code base).
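
   A minimal sketch of that guard, reusing the `data` field from the diff above. Note it changes the return type to `Map`, as the snippet implies, and would also need a `java.util.Collections` import:

       // read-only view instead of a deep copy; callers can iterate but not mutate
       Map<Bytes, byte[]> getAll() {
           return Collections.unmodifiableMap(data);
       }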





[GitHub] [kafka] vcrfxia commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13189:
URL: https://github.com/apache/kafka/pull/13189#discussion_r1103148828


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -0,0 +1,877 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
+import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
+import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A persistent, versioned key-value store based on RocksDB.
+ * <p>
+ * This store implementation consists of a "latest value store" and "segment stores." The latest
+ * record version for each key is stored in the latest value store, while older record versions
+ * are stored in the segment stores. Conceptually, each record version has two associated
+ * timestamps:
+ * <ul>
+ *     <li>a {@code validFrom} timestamp. This timestamp is explicitly associated with the record
+ *     as part of the {@link VersionedKeyValueStore#put(Object, Object, long)}} call to the store;
+ *     i.e., this is the record's timestamp.</li>
+ *     <li>a {@code validTo} timestamp. This is the timestamp of the next record (or deletion)
+ *     associated with the same key, and is implicitly associated with the record. This timestamp
+ *     can change as new records are inserted into the store.</li>
+ * </ul>
+ * The validity interval of a record is from validFrom (inclusive) to validTo (exclusive), and
+ * can change as new record versions are inserted into the store (and validTo changes as a result).
+ * <p>
+ * Old record versions are stored in segment stores according to their validTo timestamps. This
+ * allows for efficient expiry of old record versions, as entire segments can be dropped from the
+ * store at a time, once the records contained in the segment are no longer relevant based on the
+ * store's history retention (for an explanation of "history retention", see
+ * {@link VersionedKeyValueStore}). Multiple record versions (for the same key) within a single
+ * segment are stored together using the format specified in {@link RocksDBVersionedStoreSegmentValueFormatter}.
+ */
+public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte[]> {
+    private static final Logger LOG = LoggerFactory.getLogger(RocksDBVersionedStore.class);
+    // a marker to indicate that no record version has yet been found as part of an ongoing
+    // put() procedure. any value which is not a valid record timestamp will do.
+    private static final long SENTINEL_TIMESTAMP = Long.MIN_VALUE;
+
+    private final String name;
+    private final long historyRetention;
+    private final RocksDBMetricsRecorder metricsRecorder;
+
+    private final RocksDBStore latestValueStore;
+    private final LogicalKeyValueSegments segmentStores;
+    private final LatestValueSchema latestValueSchema;
+    private final SegmentValueSchema segmentValueSchema;
+    private final BatchWritingVersionedStoreClient<LogicalKeyValueSegment> versionedStoreClient;
+    private final RocksDBVersionedStoreRestoreWriteBuffer restoreWriteBuffer;
+
+    private ProcessorContext context;
+    private StateStoreContext stateStoreContext;
+    private Sensor expiredRecordSensor;
+    private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+    private boolean consistencyEnabled = false;
+    private Position position;
+    private OffsetCheckpoint positionCheckpoint;
+    private volatile boolean open;
+
+    RocksDBVersionedStore(final String name, final String metricsScope, final long historyRetention, final long segmentInterval) {
+        this.name = name;
+        this.historyRetention = historyRetention;
+        this.metricsRecorder = new RocksDBMetricsRecorder(metricsScope, name);
+        this.latestValueStore = new RocksDBStore(lvsName(name), name, metricsRecorder);
+        this.segmentStores = new LogicalKeyValueSegments(segmentsName(name), name, historyRetention, segmentInterval, metricsRecorder);
+        this.latestValueSchema = new LatestValueSchema();
+        this.segmentValueSchema = new SegmentValueSchema();
+        this.versionedStoreClient = new RocksDBVersionedStoreClient();
+        this.restoreWriteBuffer = new RocksDBVersionedStoreRestoreWriteBuffer(versionedStoreClient);
+    }
+
+    @Override
+    public void put(final Bytes key, final byte[] value, final long timestamp) {
+
+        doPut(
+            latestValueSchema,
+            segmentValueSchema,
+            versionedStoreClient,
+            Optional.of(expiredRecordSensor),
+            context,
+            observedStreamTime,
+            historyRetention,
+            key,
+            value,
+            timestamp
+        );
+
+        observedStreamTime = Math.max(observedStreamTime, timestamp);
+    }
+
+    @Override
+    public VersionedRecord<byte[]> delete(final Bytes key, final long timestamp) {
+        final VersionedRecord<byte[]> existingRecord = get(key, timestamp);
+        put(key, null, timestamp);
+        return existingRecord;
+    }
+
+    @Override
+    public VersionedRecord<byte[]> get(final Bytes key) {
+        // latest value (if present) is guaranteed to be in the latest value store
+        final byte[] latestValue = latestValueStore.get(key);
+        if (latestValue != null) {
+            return new VersionedRecord<>(
+                latestValueSchema.getValue(latestValue),
+                latestValueSchema.getTimestamp(latestValue)
+            );
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public VersionedRecord<byte[]> get(final Bytes key, final long asOfTimestamp) {
+
+        if (asOfTimestamp < observedStreamTime - historyRetention) {
+            // history retention has elapsed. return null for predictability, even if data
+            // is still present in store.
+            return null;
+        }
+
+        // first check the latest value store
+        final byte[] latestValue = latestValueStore.get(key);
+        if (latestValue != null) {
+            final long latestTimestamp = latestValueSchema.getTimestamp(latestValue);
+            if (latestTimestamp <= asOfTimestamp) {
+                return new VersionedRecord<>(latestValueSchema.getValue(latestValue), latestTimestamp);
+            }
+        }
+
+        // check segment stores
+        final List<LogicalKeyValueSegment> segments = segmentStores.segments(asOfTimestamp, Long.MAX_VALUE, false);
+        for (final LogicalKeyValueSegment segment : segments) {
+            final byte[] segmentValue = segment.get(key);
+            if (segmentValue != null) {
+                final long nextTs = segmentValueSchema.getNextTimestamp(segmentValue);
+                if (nextTs <= asOfTimestamp) {
+                    // this segment contains no data for the queried timestamp, so earlier segments
+                    // cannot either
+                    return null;
+                }
+
+                if (segmentValueSchema.getMinTimestamp(segmentValue) > asOfTimestamp) {
+                    // the segment only contains data for after the queried timestamp. skip and
+                    // continue the search to earlier segments. as an optimization, this code
+                    // could be updated to skip forward to the segment containing the minTimestamp
+                    // in the if-condition above.
+                    continue;
+                }
+
+                // the desired result is contained in this segment
+                final SegmentSearchResult searchResult =
+                    segmentValueSchema.deserialize(segmentValue).find(asOfTimestamp, true);
+                if (searchResult.value() != null) {
+                    return new VersionedRecord<>(searchResult.value(), searchResult.validFrom());
+                } else {
+                    return null;
+                }
+            }
+        }
+
+        // checked all segments and no results found
+        return null;
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public void flush() {
+        // order shouldn't matter since failure to flush is a fatal exception
+        segmentStores.flush();
+        latestValueStore.flush();
+    }
+
+    @Override
+    public void close() {
+        open = false;
+
+        // close latest value store first so that calls to get() immediately begin to fail with
+        // store not open, as all calls to get() first get() from latest value store
+        latestValueStore.close();
+        segmentStores.close();
+    }
+
+    @Override
+    public boolean persistent() {
+        return true;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return open;
+    }
+
+    @Override
+    public Position getPosition() {
+        return position;
+    }
+
+    @Deprecated
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        this.context = context;
+
+        final StreamsMetricsImpl metrics = ProcessorContextUtils.getMetricsImpl(context);
+        final String threadId = Thread.currentThread().getName();
+        final String taskName = context.taskId().toString();
+
+        expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
+            threadId,
+            taskName,
+            metrics
+        );
+
+        metricsRecorder.init(ProcessorContextUtils.getMetricsImpl(context), context.taskId());
+
+        latestValueStore.openDB(context.appConfigs(), context.stateDir());
+        segmentStores.openExisting(context, observedStreamTime);
+
+        final File positionCheckpointFile = new File(context.stateDir(), name() + ".position");
+        this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
+        this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
+
+        // register and possibly restore the state from the logs
+        stateStoreContext.register(
+            root,
+            (RecordBatchingStateRestoreCallback) RocksDBVersionedStore.this::restoreBatch,
+            () -> StoreQueryUtils.checkpointPosition(positionCheckpoint, position)
+        );
+
+        open = true;
+
+        consistencyEnabled = StreamsConfig.InternalConfig.getBoolean(
+            context.appConfigs(),
+            IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED,
+            false
+        );
+    }
+
+    @Override
+    public void init(final StateStoreContext context, final StateStore root) {
+        this.stateStoreContext = context;
+        init(StoreToProcessorContextAdapter.adapt(context), root);
+    }
+
+    // VisibleForTesting
+    void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
+        // advance stream time to the max timestamp in the batch
+        for (final ConsumerRecord<byte[], byte[]> record : records) {
+            observedStreamTime = Math.max(observedStreamTime, record.timestamp());
+        }
+
+        final VersionedStoreClient<?> restoreClient = restoreWriteBuffer.getClient();
+
+        // TODO: handle potential out of memory in this process

Review Comment:
   Because the versioned store implementation stores multiple record versions (for the same key) together in a single RocksDB "segment" entry, the chance that the new store hits an OOM could be significantly higher than for existing stores: restoring a single changelog entry could require loading multiple record versions into memory. How high this memory amplification is depends heavily on the specific workload and the value of the segment interval parameter.
   
   In light of this, it feels like there's a case to be made that we should handle OOM for versioned stores, but if you don't feel it's urgent I'm happy to defer.
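
   A purely hypothetical sketch of one way the restore path could be bounded (the class, threshold, and helper names below are illustrative and not part of this PR; it assumes the usual `RocksDBException` and `ProcessorStateException` imports):

       // Sketch only: flush the restore write buffer after a fixed number of buffered
       // entries so a single restore batch cannot grow without bound in memory.
       final class BoundedRestoreFlusher {
           private static final int MAX_BUFFERED_ENTRIES = 10_000; // assumed cap, not a real config
           private int buffered = 0;

           void onEntryBuffered(final RocksDBVersionedStoreRestoreWriteBuffer writeBuffer) {
               if (++buffered >= MAX_BUFFERED_ENTRIES) {
                   try {
                       writeBuffer.flush(); // the same flush() that restoreBatch() calls at the end
                   } catch (final RocksDBException e) {
                       throw new ProcessorStateException("Error flushing versioned store restore buffer", e);
                   }
                   buffered = 0;
               }
           }
       }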





[GitHub] [kafka] mjsax commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13189:
URL: https://github.com/apache/kafka/pull/13189#discussion_r1100910560


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java:
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.BatchWritingVersionedStoreClient;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreClient;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreSegment;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A write buffer for use in restoring a {@link RocksDBVersionedStore} from its changelog. This
+ * class exposes a {@link VersionedStoreClient} to put records into the write buffer, which may
+ * then be flushed to the store via {@link WriteBatch}es, for improved write efficiency during
+ * restoration.
+ * <p>
+ * The structure of the internals of this write buffer mirrors the structure of the
+ * {@code RocksDBVersionedStore} itself, i.e., data for the latest value store and each of the
+ * segment stores is buffered in a separate object -- specifically, a map.
+ */
+public class RocksDBVersionedStoreRestoreWriteBuffer {
+
+    private static final Logger log = LoggerFactory.getLogger(RocksDBVersionedStoreRestoreWriteBuffer.class);
+
+    // write buffer for latest value store. value type is Optional in order to track tombstones
+    // which must be written to the underlying store.
+    private final Map<Bytes, Optional<byte[]>> latestValueWriteBuffer;
+    // map from segment id to write buffer. segments are stored in reverse-sorted order,
+    // so getReverseSegments() is more efficient
+    private final TreeMap<Long, WriteBufferSegmentWithDbFallback> segmentsWriteBuffer;
+    private final BatchWritingVersionedStoreClient<LogicalKeyValueSegment> dbClient;
+    private final RocksDBVersionedStoreRestoreClient restoreClient;
+
+    /**
+     * Creates a new write buffer.
+     * @param dbClient client for reading from and writing to the underlying persistent store
+     */
+    RocksDBVersionedStoreRestoreWriteBuffer(final BatchWritingVersionedStoreClient<LogicalKeyValueSegment> dbClient) {
+        this.dbClient = Objects.requireNonNull(dbClient);
+
+        this.latestValueWriteBuffer = new HashMap<>();
+        // store in reverse-sorted order, to make getReverseSegments() more efficient
+        this.segmentsWriteBuffer = new TreeMap<>((x, y) -> Long.compare(y, x));
+        this.restoreClient = new RocksDBVersionedStoreRestoreClient();
+    }
+
+    /**
+     * @return client for writing to (and reading from) the write buffer
+     */
+    VersionedStoreClient<?> getClient() {
+        return restoreClient;
+    }
+
+    /**
+     * Flushes the contents of the write buffer into the persistent store, and clears the write
+     * buffer in the process.
+     * @throws RocksDBException if a failure occurs adding to or writing a {@link WriteBatch}
+     */
+    void flush() throws RocksDBException {
+
+        // flush segments first, as this is consistent with the store always writing to
+        // older segments/stores before later ones
+        try (final WriteBatch segmentsBatch = new WriteBatch()) {
+            final List<WriteBufferSegmentWithDbFallback> allSegments = restoreClient.getReverseSegments(Long.MIN_VALUE);
+            if (allSegments.size() > 0) {
+                // collect entries into write batch
+                for (final WriteBufferSegmentWithDbFallback bufferSegment : allSegments) {
+                    final LogicalKeyValueSegment dbSegment = bufferSegment.dbSegment();
+                    for (final Map.Entry<Bytes, byte[]> segmentEntry : bufferSegment.getAll()) {
+                        dbSegment.addToBatch(
+                            new KeyValue<>(segmentEntry.getKey().get(), segmentEntry.getValue()),
+                            segmentsBatch);
+                    }
+                }
+
+                // write to db. all the logical segments share the same physical store,
+                // so we can use any segment to perform the write
+                allSegments.get(0).dbSegment().write(segmentsBatch);
+            }
+        } catch (final RocksDBException e) {
+            log.error("Error restoring batch to RocksDBVersionedStore segments store.");
+            throw e;
+        }
+        segmentsWriteBuffer.clear();
+
+        // flush latest value store
+        try (final WriteBatch latestValueBatch = new WriteBatch()) {
+            // collect entries into write batch
+            for (final Map.Entry<Bytes, Optional<byte[]>> latestValueEntry : latestValueWriteBuffer.entrySet()) {
+                final byte[] value = latestValueEntry.getValue().orElse(null);
+                dbClient.addToLatestValueBatch(
+                    new KeyValue<>(latestValueEntry.getKey().get(), value),
+                    latestValueBatch);
+            }
+
+            // write to db
+            dbClient.writeLatestValues(latestValueBatch);
+        } catch (final RocksDBException e) {
+            log.error("Error restoring batch to RocksDBVersionedStore latest value store.");
+            throw e;
+        }
+        latestValueWriteBuffer.clear();
+    }
+
+    /**
+     * The object representation of the write buffer corresponding to a single segment store.
+     * Contains the write buffer itself (a simple hash map) and also a reference to the underlying
+     * persistent segment store.
+     */
+    private class WriteBufferSegmentWithDbFallback implements VersionedStoreSegment {
+
+        private final long id;
+        private final Map<Bytes, byte[]> data;
+        private final LogicalKeyValueSegment dbSegment;
+
+        WriteBufferSegmentWithDbFallback(final LogicalKeyValueSegment dbSegment) {
+            this.dbSegment = Objects.requireNonNull(dbSegment);
+            this.id = dbSegment.id();
+            this.data = new HashMap<>();
+
+            // register segment with segments store
+            segmentsWriteBuffer.put(id, this);
+        }
+
+        LogicalKeyValueSegment dbSegment() {
+            return dbSegment;
+        }
+
+        @Override
+        public long id() {
+            return id;
+        }
+
+        @Override
+        public void put(final Bytes key, final byte[] value) {
+            // all writes go to the write buffer
+            data.put(key, value);
+        }
+
+        @Override
+        public byte[] get(final Bytes key) {
+            final byte[] bufferValue = data.get(key);
+            if (bufferValue != null) {
+                return bufferValue;
+            }
+            return dbSegment.get(key);
+        }
+
+        Set<Entry<Bytes, byte[]>> getAll() {

Review Comment:
   Why return `Set<Entry>` here and not just the `data` `Map`?





[GitHub] [kafka] mjsax commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13189:
URL: https://github.com/apache/kafka/pull/13189#discussion_r1102257544


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -0,0 +1,877 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
+import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
+import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A persistent, versioned key-value store based on RocksDB.
+ * <p>
+ * This store implementation consists of a "latest value store" and "segment stores." The latest
+ * record version for each key is stored in the latest value store, while older record versions
+ * are stored in the segment stores. Conceptually, each record version has two associated
+ * timestamps:
+ * <ul>
+ *     <li>a {@code validFrom} timestamp. This timestamp is explicitly associated with the record
+ *     as part of the {@link VersionedKeyValueStore#put(Object, Object, long)}} call to the store;
+ *     i.e., this is the record's timestamp.</li>
+ *     <li>a {@code validTo} timestamp. This is the timestamp of the next record (or deletion)
+ *     associated with the same key, and is implicitly associated with the record. This timestamp
+ *     can change as new records are inserted into the store.</li>
+ * </ul>
+ * The validity interval of a record is from validFrom (inclusive) to validTo (exclusive), and
+ * can change as new record versions are inserted into the store (and validTo changes as a result).
+ * <p>
+ * Old record versions are stored in segment stores according to their validTo timestamps. This
+ * allows for efficient expiry of old record versions, as entire segments can be dropped from the
+ * store at a time, once the records contained in the segment are no longer relevant based on the
+ * store's history retention (for an explanation of "history retention", see
+ * {@link VersionedKeyValueStore}). Multiple record versions (for the same key) within a single
+ * segment are stored together using the format specified in {@link RocksDBVersionedStoreSegmentValueFormatter}.
+ */
+public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte[]> {
+    private static final Logger LOG = LoggerFactory.getLogger(RocksDBVersionedStore.class);
+    // a marker to indicate that no record version has yet been found as part of an ongoing
+    // put() procedure. any value which is not a valid record timestamp will do.
+    private static final long SENTINEL_TIMESTAMP = Long.MIN_VALUE;
+
+    private final String name;
+    private final long historyRetention;
+    private final RocksDBMetricsRecorder metricsRecorder;
+
+    private final RocksDBStore latestValueStore;
+    private final LogicalKeyValueSegments segmentStores;
+    private final LatestValueSchema latestValueSchema;
+    private final SegmentValueSchema segmentValueSchema;
+    private final BatchWritingVersionedStoreClient<LogicalKeyValueSegment> versionedStoreClient;
+    private final RocksDBVersionedStoreRestoreWriteBuffer restoreWriteBuffer;
+
+    private ProcessorContext context;
+    private StateStoreContext stateStoreContext;
+    private Sensor expiredRecordSensor;
+    private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+    private boolean consistencyEnabled = false;
+    private Position position;
+    private OffsetCheckpoint positionCheckpoint;
+    private volatile boolean open;
+
+    RocksDBVersionedStore(final String name, final String metricsScope, final long historyRetention, final long segmentInterval) {
+        this.name = name;
+        this.historyRetention = historyRetention;
+        this.metricsRecorder = new RocksDBMetricsRecorder(metricsScope, name);
+        this.latestValueStore = new RocksDBStore(lvsName(name), name, metricsRecorder);
+        this.segmentStores = new LogicalKeyValueSegments(segmentsName(name), name, historyRetention, segmentInterval, metricsRecorder);
+        this.latestValueSchema = new LatestValueSchema();
+        this.segmentValueSchema = new SegmentValueSchema();
+        this.versionedStoreClient = new RocksDBVersionedStoreClient();
+        this.restoreWriteBuffer = new RocksDBVersionedStoreRestoreWriteBuffer(versionedStoreClient);
+    }
+
+    @Override
+    public void put(final Bytes key, final byte[] value, final long timestamp) {
+
+        doPut(
+            latestValueSchema,
+            segmentValueSchema,
+            versionedStoreClient,
+            Optional.of(expiredRecordSensor),
+            context,
+            observedStreamTime,
+            historyRetention,
+            key,
+            value,
+            timestamp
+        );
+
+        observedStreamTime = Math.max(observedStreamTime, timestamp);
+    }
+
+    @Override
+    public VersionedRecord<byte[]> delete(final Bytes key, final long timestamp) {
+        final VersionedRecord<byte[]> existingRecord = get(key, timestamp);
+        put(key, null, timestamp);
+        return existingRecord;
+    }
+
+    @Override
+    public VersionedRecord<byte[]> get(final Bytes key) {
+        // latest value (if present) is guaranteed to be in the latest value store
+        final byte[] latestValue = latestValueStore.get(key);
+        if (latestValue != null) {
+            return new VersionedRecord<>(
+                latestValueSchema.getValue(latestValue),
+                latestValueSchema.getTimestamp(latestValue)
+            );
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public VersionedRecord<byte[]> get(final Bytes key, final long asOfTimestamp) {
+
+        if (asOfTimestamp < observedStreamTime - historyRetention) {
+            // history retention has elapsed. return null for predictability, even if data
+            // is still present in store.
+            return null;
+        }
+
+        // first check the latest value store
+        final byte[] latestValue = latestValueStore.get(key);
+        if (latestValue != null) {
+            final long latestTimestamp = latestValueSchema.getTimestamp(latestValue);
+            if (latestTimestamp <= asOfTimestamp) {
+                return new VersionedRecord<>(latestValueSchema.getValue(latestValue), latestTimestamp);
+            }
+        }
+
+        // check segment stores
+        final List<LogicalKeyValueSegment> segments = segmentStores.segments(asOfTimestamp, Long.MAX_VALUE, false);
+        for (final LogicalKeyValueSegment segment : segments) {
+            final byte[] segmentValue = segment.get(key);
+            if (segmentValue != null) {
+                final long nextTs = segmentValueSchema.getNextTimestamp(segmentValue);
+                if (nextTs <= asOfTimestamp) {
+                    // this segment contains no data for the queried timestamp, so earlier segments
+                    // cannot either
+                    return null;
+                }
+
+                if (segmentValueSchema.getMinTimestamp(segmentValue) > asOfTimestamp) {
+                    // the segment only contains data for after the queried timestamp. skip and
+                    // continue the search to earlier segments. as an optimization, this code
+                    // could be updated to skip forward to the segment containing the minTimestamp
+                    // in the if-condition above.
+                    continue;
+                }
+
+                // the desired result is contained in this segment
+                final SegmentSearchResult searchResult =
+                    segmentValueSchema.deserialize(segmentValue).find(asOfTimestamp, true);
+                if (searchResult.value() != null) {
+                    return new VersionedRecord<>(searchResult.value(), searchResult.validFrom());
+                } else {
+                    return null;
+                }
+            }
+        }
+
+        // checked all segments and no results found
+        return null;
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public void flush() {
+        // order shouldn't matter since failure to flush is a fatal exception
+        segmentStores.flush();
+        latestValueStore.flush();
+    }
+
+    @Override
+    public void close() {
+        open = false;
+
+        // close latest value store first so that calls to get() immediately begin to fail with
+        // store not open, as all calls to get() first get() from latest value store
+        latestValueStore.close();
+        segmentStores.close();
+    }
+
+    @Override
+    public boolean persistent() {
+        return true;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return open;
+    }
+
+    @Override
+    public Position getPosition() {
+        return position;
+    }
+
+    @Deprecated
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        this.context = context;
+
+        final StreamsMetricsImpl metrics = ProcessorContextUtils.getMetricsImpl(context);
+        final String threadId = Thread.currentThread().getName();
+        final String taskName = context.taskId().toString();
+
+        expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
+            threadId,
+            taskName,
+            metrics
+        );
+
+        metricsRecorder.init(ProcessorContextUtils.getMetricsImpl(context), context.taskId());
+
+        latestValueStore.openDB(context.appConfigs(), context.stateDir());
+        segmentStores.openExisting(context, observedStreamTime);
+
+        final File positionCheckpointFile = new File(context.stateDir(), name() + ".position");
+        this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
+        this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
+
+        // register and possibly restore the state from the logs
+        stateStoreContext.register(
+            root,
+            (RecordBatchingStateRestoreCallback) RocksDBVersionedStore.this::restoreBatch,
+            () -> StoreQueryUtils.checkpointPosition(positionCheckpoint, position)
+        );
+
+        open = true;
+
+        consistencyEnabled = StreamsConfig.InternalConfig.getBoolean(
+            context.appConfigs(),
+            IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED,
+            false
+        );
+    }
+
+    @Override
+    public void init(final StateStoreContext context, final StateStore root) {
+        this.stateStoreContext = context;
+        init(StoreToProcessorContextAdapter.adapt(context), root);
+    }
+
+    // VisibleForTesting
+    void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
+        // advance stream time to the max timestamp in the batch
+        for (final ConsumerRecord<byte[], byte[]> record : records) {
+            observedStreamTime = Math.max(observedStreamTime, record.timestamp());
+        }
+
+        final VersionedStoreClient<?> restoreClient = restoreWriteBuffer.getClient();
+
+        // TODO: handle potential out of memory in this process

Review Comment:
   We don't try to handle `OutOfMemoryError` anywhere so far, so I think it's ok not to start here (and if we do want to start, we should make a plan and do it everywhere).





[GitHub] [kafka] mjsax commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13189:
URL: https://github.com/apache/kafka/pull/13189#discussion_r1100920982


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java:
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.BatchWritingVersionedStoreClient;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreClient;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreSegment;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A write buffer for use in restoring a {@link RocksDBVersionedStore} from its changelog. This
+ * class exposes a {@link VersionedStoreClient} to put records into the write buffer, which may
+ * then be flushed to the store via {@link WriteBatch}es, for improved write efficiency during
+ * restoration.
+ * <p>
+ * The structure of the internals of this write buffer mirrors the structure of the
+ * {@code RocksDBVersionedStore} itself, i.e., data for the latest value store and each of the
+ * segment stores is buffered in a separate object -- specifically, a map.
+ */
+public class RocksDBVersionedStoreRestoreWriteBuffer {
+
+    private static final Logger log = LoggerFactory.getLogger(RocksDBVersionedStoreRestoreWriteBuffer.class);
+
+    // write buffer for latest value store. value type is Optional in order to track tombstones
+    // which must be written to the underlying store.
+    private final Map<Bytes, Optional<byte[]>> latestValueWriteBuffer;
+    // map from segment id to write buffer. segments are stored in reverse-sorted order,
+    // so getReverseSegments() is more efficient
+    private final TreeMap<Long, WriteBufferSegmentWithDbFallback> segmentsWriteBuffer;
+    private final BatchWritingVersionedStoreClient<LogicalKeyValueSegment> dbClient;
+    private final RocksDBVersionedStoreRestoreClient restoreClient;
+
+    /**
+     * Creates a new write buffer.
+     * @param dbClient client for reading from and writing to the underlying persistent store
+     */
+    RocksDBVersionedStoreRestoreWriteBuffer(final BatchWritingVersionedStoreClient<LogicalKeyValueSegment> dbClient) {
+        this.dbClient = Objects.requireNonNull(dbClient);
+
+        this.latestValueWriteBuffer = new HashMap<>();
+        // store in reverse-sorted order, to make getReverseSegments() more efficient
+        this.segmentsWriteBuffer = new TreeMap<>((x, y) -> Long.compare(y, x));
+        this.restoreClient = new RocksDBVersionedStoreRestoreClient();
+    }
+
+    /**
+     * @return client for writing to (and reading from) the write buffer
+     */
+    VersionedStoreClient<?> getClient() {
+        return restoreClient;
+    }
+
+    /**
+     * Flushes the contents of the write buffer into the persistent store, and clears the write
+     * buffer in the process.
+     * @throws RocksDBException if a failure occurs adding to or writing a {@link WriteBatch}
+     */
+    void flush() throws RocksDBException {
+
+        // flush segments first, as this is consistent with the store always writing to
+        // older segments/stores before later ones
+        try (final WriteBatch segmentsBatch = new WriteBatch()) {
+            final List<WriteBufferSegmentWithDbFallback> allSegments = restoreClient.getReverseSegments(Long.MIN_VALUE);
+            if (allSegments.size() > 0) {
+                // collect entries into write batch
+                for (final WriteBufferSegmentWithDbFallback bufferSegment : allSegments) {
+                    final LogicalKeyValueSegment dbSegment = bufferSegment.dbSegment();
+                    for (final Map.Entry<Bytes, byte[]> segmentEntry : bufferSegment.getAll()) {
+                        dbSegment.addToBatch(
+                            new KeyValue<>(segmentEntry.getKey().get(), segmentEntry.getValue()),
+                            segmentsBatch);
+                    }
+                }
+
+                // write to db. all the logical segments share the same physical store,
+                // so we can use any segment to perform the write
+                allSegments.get(0).dbSegment().write(segmentsBatch);
+            }
+        } catch (final RocksDBException e) {
+            log.error("Error restoring batch to RocksDBVersionedStore segments store.");
+            throw e;
+        }
+        segmentsWriteBuffer.clear();
+
+        // flush latest value store
+        try (final WriteBatch latestValueBatch = new WriteBatch()) {
+            // collect entries into write batch
+            for (final Map.Entry<Bytes, Optional<byte[]>> latestValueEntry : latestValueWriteBuffer.entrySet()) {
+                final byte[] value = latestValueEntry.getValue().orElse(null);
+                dbClient.addToLatestValueBatch(
+                    new KeyValue<>(latestValueEntry.getKey().get(), value),
+                    latestValueBatch);
+            }
+
+            // write to db
+            dbClient.writeLatestValues(latestValueBatch);
+        } catch (final RocksDBException e) {
+            log.error("Error restoring batch to RocksDBVersionedStore latest value store.");
+            throw e;
+        }
+        latestValueWriteBuffer.clear();
+    }
+
+    /**
+     * The object representation of the write buffer corresponding to a single segment store.
+     * Contains the write buffer itself (a simple hash map) and also a reference to the underlying
+     * persistent segment store.
+     */
+    private class WriteBufferSegmentWithDbFallback implements VersionedStoreSegment {
+
+        private final long id;
+        private final Map<Bytes, byte[]> data;
+        private final LogicalKeyValueSegment dbSegment;
+
+        WriteBufferSegmentWithDbFallback(final LogicalKeyValueSegment dbSegment) {
+            this.dbSegment = Objects.requireNonNull(dbSegment);
+            this.id = dbSegment.id();
+            this.data = new HashMap<>();
+
+            // register segment with segments store
+            segmentsWriteBuffer.put(id, this);
+        }
+
+        LogicalKeyValueSegment dbSegment() {
+            return dbSegment;
+        }
+
+        @Override
+        public long id() {
+            return id;
+        }
+
+        @Override
+        public void put(final Bytes key, final byte[] value) {
+            // all writes go to the write buffer
+            data.put(key, value);
+        }
+
+        @Override
+        public byte[] get(final Bytes key) {
+            final byte[] bufferValue = data.get(key);
+            if (bufferValue != null) {
+                return bufferValue;
+            }
+            return dbSegment.get(key);
+        }
+
+        Set<Entry<Bytes, byte[]>> getAll() {
+            return data.entrySet();
+        }
+    }
+
+    /**
+     * Client for writing to (and reading from) the write buffer as part of restore.
+     */
+    private class RocksDBVersionedStoreRestoreClient implements VersionedStoreClient<WriteBufferSegmentWithDbFallback> {
+
+        @Override
+        public byte[] getLatestValue(final Bytes key) {
+            final Optional<byte[]> bufferValue = latestValueWriteBuffer.get(key);
+            if (bufferValue != null) {
+                return bufferValue.orElse(null);
+            }
+            return dbClient.getLatestValue(key);
+        }
+
+        @Override
+        public void putLatestValue(final Bytes key, final byte[] value) {
+            // all writes go to write buffer
+            latestValueWriteBuffer.put(key, Optional.ofNullable(value));
+        }
+
+        @Override
+        public void deleteLatestValue(final Bytes key) {
+            putLatestValue(key, null);
+        }
+
+        @Override
+        public WriteBufferSegmentWithDbFallback getOrCreateSegmentIfLive(final long segmentId, final ProcessorContext context, final long streamTime) {
+            if (!segmentsWriteBuffer.containsKey(segmentId)) {
+                final LogicalKeyValueSegment dbSegment = dbClient.getOrCreateSegmentIfLive(segmentId, context, streamTime);
+                if (dbSegment == null) {
+                    // segment is not live
+                    return null;
+                }
+                // creating a new segment automatically registers it with the segments store
+                return new WriteBufferSegmentWithDbFallback(dbSegment);
+            }
+            return segmentsWriteBuffer.get(segmentId);
+        }
+
+        @Override
+        public List<WriteBufferSegmentWithDbFallback> getReverseSegments(final long timestampFrom) {
+            // head and not tail because the map is sorted in reverse order
+            final long segmentFrom = segmentIdForTimestamp(timestampFrom);
+            final List<WriteBufferSegmentWithDbFallback> bufferSegments =
+                new ArrayList<>(segmentsWriteBuffer.headMap(segmentFrom, true).values());
+
+            final List<LogicalKeyValueSegment> dbSegments = dbClient.getReverseSegments(timestampFrom);
+
+            // merge segments from db with segments from write buffer
+            final List<WriteBufferSegmentWithDbFallback> allSegments = new ArrayList<>();
+            int dbInd = 0;

Review Comment:
   `Ind` == `Index` (as always: avoid abbreviations)



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java:
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.BatchWritingVersionedStoreClient;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreClient;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreSegment;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A write buffer for use in restoring a {@link RocksDBVersionedStore} from its changelog. This
+ * class exposes a {@link VersionedStoreClient} to put records into the write buffer, which may
+ * then be flushed to the store via {@link WriteBatch}es, for improved write efficiency during
+ * restoration.
+ * <p>
+ * The structure of the internals of this write buffer mirrors the structure of the
+ * {@code RocksDBVersionedStore} itself, i.e., data for the latest value store and each of the
+ * segment stores is buffered in a separate object -- specifically, a map.
+ */
+public class RocksDBVersionedStoreRestoreWriteBuffer {
+
+    private static final Logger log = LoggerFactory.getLogger(RocksDBVersionedStoreRestoreWriteBuffer.class);
+
+    // write buffer for latest value store. value type is Optional in order to track tombstones
+    // which must be written to the underlying store.
+    private final Map<Bytes, Optional<byte[]>> latestValueWriteBuffer;
+    // map from segment id to write buffer. segments are stored in reverse-sorted order,
+    // so getReverseSegments() is more efficient
+    private final TreeMap<Long, WriteBufferSegmentWithDbFallback> segmentsWriteBuffer;
+    private final BatchWritingVersionedStoreClient<LogicalKeyValueSegment> dbClient;
+    private final RocksDBVersionedStoreRestoreClient restoreClient;
+
+    /**
+     * Creates a new write buffer.
+     * @param dbClient client for reading from and writing to the underlying persistent store
+     */
+    RocksDBVersionedStoreRestoreWriteBuffer(final BatchWritingVersionedStoreClient<LogicalKeyValueSegment> dbClient) {
+        this.dbClient = Objects.requireNonNull(dbClient);
+
+        this.latestValueWriteBuffer = new HashMap<>();
+        // store in reverse-sorted order, to make getReverseSegments() more efficient
+        this.segmentsWriteBuffer = new TreeMap<>((x, y) -> Long.compare(y, x));
+        this.restoreClient = new RocksDBVersionedStoreRestoreClient();
+    }
+
+    /**
+     * @return client for writing to (and reading from) the write buffer
+     */
+    VersionedStoreClient<?> getClient() {
+        return restoreClient;
+    }
+
+    /**
+     * Flushes the contents of the write buffer into the persistent store, and clears the write
+     * buffer in the process.
+     * @throws RocksDBException if a failure occurs adding to or writing a {@link WriteBatch}
+     */
+    void flush() throws RocksDBException {
+
+        // flush segments first, as this is consistent with the store always writing to
+        // older segments/stores before later ones
+        try (final WriteBatch segmentsBatch = new WriteBatch()) {
+            final List<WriteBufferSegmentWithDbFallback> allSegments = restoreClient.getReverseSegments(Long.MIN_VALUE);
+            if (allSegments.size() > 0) {
+                // collect entries into write batch
+                for (final WriteBufferSegmentWithDbFallback bufferSegment : allSegments) {
+                    final LogicalKeyValueSegment dbSegment = bufferSegment.dbSegment();
+                    for (final Map.Entry<Bytes, byte[]> segmentEntry : bufferSegment.getAll()) {
+                        dbSegment.addToBatch(
+                            new KeyValue<>(segmentEntry.getKey().get(), segmentEntry.getValue()),
+                            segmentsBatch);
+                    }
+                }
+
+                // write to db. all the logical segments share the same physical store,
+                // so we can use any segment to perform the write
+                allSegments.get(0).dbSegment().write(segmentsBatch);
+            }
+        } catch (final RocksDBException e) {
+            log.error("Error restoring batch to RocksDBVersionedStore segments store.");
+            throw e;
+        }
+        segmentsWriteBuffer.clear();
+
+        // flush latest value store
+        try (final WriteBatch latestValueBatch = new WriteBatch()) {
+            // collect entries into write batch
+            for (final Map.Entry<Bytes, Optional<byte[]>> latestValueEntry : latestValueWriteBuffer.entrySet()) {
+                final byte[] value = latestValueEntry.getValue().orElse(null);
+                dbClient.addToLatestValueBatch(
+                    new KeyValue<>(latestValueEntry.getKey().get(), value),
+                    latestValueBatch);
+            }
+
+            // write to db
+            dbClient.writeLatestValues(latestValueBatch);
+        } catch (final RocksDBException e) {
+            log.error("Error restoring batch to RocksDBVersionedStore latest value store.");
+            throw e;
+        }
+        latestValueWriteBuffer.clear();
+    }
+
+    /**
+     * The object representation of the write buffer corresponding to a single segment store.
+     * Contains the write buffer itself (a simple hash map) and also a reference to the underlying
+     * persistent segment store.
+     */
+    private class WriteBufferSegmentWithDbFallback implements VersionedStoreSegment {
+
+        private final long id;
+        private final Map<Bytes, byte[]> data;
+        private final LogicalKeyValueSegment dbSegment;
+
+        WriteBufferSegmentWithDbFallback(final LogicalKeyValueSegment dbSegment) {
+            this.dbSegment = Objects.requireNonNull(dbSegment);
+            this.id = dbSegment.id();
+            this.data = new HashMap<>();
+
+            // register segment with segments store
+            segmentsWriteBuffer.put(id, this);
+        }
+
+        LogicalKeyValueSegment dbSegment() {
+            return dbSegment;
+        }
+
+        @Override
+        public long id() {
+            return id;
+        }
+
+        @Override
+        public void put(final Bytes key, final byte[] value) {
+            // all writes go to the write buffer
+            data.put(key, value);
+        }
+
+        @Override
+        public byte[] get(final Bytes key) {
+            final byte[] bufferValue = data.get(key);
+            if (bufferValue != null) {
+                return bufferValue;
+            }
+            return dbSegment.get(key);
+        }
+
+        Set<Entry<Bytes, byte[]>> getAll() {
+            return data.entrySet();
+        }
+    }
+
+    /**
+     * Client for writing to (and reading from) the write buffer as part of restore.
+     */
+    private class RocksDBVersionedStoreRestoreClient implements VersionedStoreClient<WriteBufferSegmentWithDbFallback> {
+
+        @Override
+        public byte[] getLatestValue(final Bytes key) {
+            final Optional<byte[]> bufferValue = latestValueWriteBuffer.get(key);
+            if (bufferValue != null) {
+                return bufferValue.orElse(null);
+            }
+            return dbClient.getLatestValue(key);
+        }
+
+        @Override
+        public void putLatestValue(final Bytes key, final byte[] value) {
+            // all writes go to write buffer
+            latestValueWriteBuffer.put(key, Optional.ofNullable(value));
+        }
+
+        @Override
+        public void deleteLatestValue(final Bytes key) {
+            putLatestValue(key, null);
+        }
+
+        @Override
+        public WriteBufferSegmentWithDbFallback getOrCreateSegmentIfLive(final long segmentId, final ProcessorContext context, final long streamTime) {
+            if (!segmentsWriteBuffer.containsKey(segmentId)) {
+                final LogicalKeyValueSegment dbSegment = dbClient.getOrCreateSegmentIfLive(segmentId, context, streamTime);
+                if (dbSegment == null) {
+                    // segment is not live
+                    return null;
+                }
+                // creating a new segment automatically registers it with the segments store
+                return new WriteBufferSegmentWithDbFallback(dbSegment);
+            }
+            return segmentsWriteBuffer.get(segmentId);
+        }
+
+        @Override
+        public List<WriteBufferSegmentWithDbFallback> getReverseSegments(final long timestampFrom) {
+            // head and not tail because the map is sorted in reverse order
+            final long segmentFrom = segmentIdForTimestamp(timestampFrom);
+            final List<WriteBufferSegmentWithDbFallback> bufferSegments =
+                new ArrayList<>(segmentsWriteBuffer.headMap(segmentFrom, true).values());
+
+            final List<LogicalKeyValueSegment> dbSegments = dbClient.getReverseSegments(timestampFrom);
+
+            // merge segments from db with segments from write buffer
+            final List<WriteBufferSegmentWithDbFallback> allSegments = new ArrayList<>();
+            int dbInd = 0;

Review Comment:
   `Ind` == `Index`? (as always: avoid abbreviations)





[GitHub] [kafka] mjsax commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13189:
URL: https://github.com/apache/kafka/pull/13189#discussion_r1100936116


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java:
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.BatchWritingVersionedStoreClient;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreClient;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreSegment;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A write buffer for use in restoring a {@link RocksDBVersionedStore} from its changelog. This
+ * class exposes a {@link VersionedStoreClient} to put records into the write buffer, which may
+ * then be flushed to the store via {@link WriteBatch}es, for improved write efficiency during
+ * restoration.
+ * <p>
+ * The structure of the internals of this write buffer mirrors the structure of the
+ * {@code RocksDBVersionedStore} itself, i.e., data for the latest value store and each of the
+ * segment stores is buffered in a separate object -- specifically, a map.
+ */
+public class RocksDBVersionedStoreRestoreWriteBuffer {
+
+    private static final Logger log = LoggerFactory.getLogger(RocksDBVersionedStoreRestoreWriteBuffer.class);
+
+    // write buffer for latest value store. value type is Optional in order to track tombstones
+    // which must be written to the underlying store.
+    private final Map<Bytes, Optional<byte[]>> latestValueWriteBuffer;
+    // map from segment id to write buffer. segments are stored in reverse-sorted order,
+    // so getReverseSegments() is more efficient
+    private final TreeMap<Long, WriteBufferSegmentWithDbFallback> segmentsWriteBuffer;
+    private final BatchWritingVersionedStoreClient<LogicalKeyValueSegment> dbClient;
+    private final RocksDBVersionedStoreRestoreClient restoreClient;
+
+    /**
+     * Creates a new write buffer.
+     * @param dbClient client for reading from and writing to the underlying persistent store
+     */
+    RocksDBVersionedStoreRestoreWriteBuffer(final BatchWritingVersionedStoreClient<LogicalKeyValueSegment> dbClient) {
+        this.dbClient = Objects.requireNonNull(dbClient);
+
+        this.latestValueWriteBuffer = new HashMap<>();
+        // store in reverse-sorted order, to make getReverseSegments() more efficient
+        this.segmentsWriteBuffer = new TreeMap<>((x, y) -> Long.compare(y, x));
+        this.restoreClient = new RocksDBVersionedStoreRestoreClient();
+    }
+
+    /**
+     * @return client for writing to (and reading from) the write buffer
+     */
+    VersionedStoreClient<?> getClient() {

Review Comment:
   Why not typed?





[GitHub] [kafka] vcrfxia commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13189:
URL: https://github.com/apache/kafka/pull/13189#discussion_r1101910583


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -0,0 +1,877 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
+import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
+import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A persistent, versioned key-value store based on RocksDB.
+ * <p>
+ * This store implementation consists of a "latest value store" and "segment stores." The latest
+ * record version for each key is stored in the latest value store, while older record versions
+ * are stored in the segment stores. Conceptually, each record version has two associated
+ * timestamps:
+ * <ul>
+ *     <li>a {@code validFrom} timestamp. This timestamp is explicitly associated with the record
+ *     as part of the {@link VersionedKeyValueStore#put(Object, Object, long)}} call to the store;
+ *     i.e., this is the record's timestamp.</li>
+ *     <li>a {@code validTo} timestamp. This is the timestamp of the next record (or deletion)
+ *     associated with the same key, and is implicitly associated with the record. This timestamp
+ *     can change as new records are inserted into the store.</li>
+ * </ul>
+ * The validity interval of a record is from validFrom (inclusive) to validTo (exclusive), and
+ * can change as new record versions are inserted into the store (and validTo changes as a result).
+ * <p>
+ * Old record versions are stored in segment stores according to their validTo timestamps. This
+ * allows for efficient expiry of old record versions, as entire segments can be dropped from the
+ * store at a time, once the records contained in the segment are no longer relevant based on the
+ * store's history retention (for an explanation of "history retention", see
+ * {@link VersionedKeyValueStore}). Multiple record versions (for the same key) within a single
+ * segment are stored together using the format specified in {@link RocksDBVersionedStoreSegmentValueFormatter}.
+ */
+public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte[]> {
+    private static final Logger LOG = LoggerFactory.getLogger(RocksDBVersionedStore.class);
+    // a marker to indicate that no record version has yet been found as part of an ongoing
+    // put() procedure. any value which is not a valid record timestamp will do.
+    private static final long SENTINEL_TIMESTAMP = Long.MIN_VALUE;
+
+    private final String name;
+    private final long historyRetention;
+    private final RocksDBMetricsRecorder metricsRecorder;
+
+    private final RocksDBStore latestValueStore;
+    private final LogicalKeyValueSegments segmentStores;
+    private final LatestValueSchema latestValueSchema;
+    private final SegmentValueSchema segmentValueSchema;
+    private final BatchWritingVersionedStoreClient<LogicalKeyValueSegment> versionedStoreClient;
+    private final RocksDBVersionedStoreRestoreWriteBuffer restoreWriteBuffer;
+
+    private ProcessorContext context;
+    private StateStoreContext stateStoreContext;
+    private Sensor expiredRecordSensor;
+    private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+    private boolean consistencyEnabled = false;
+    private Position position;
+    private OffsetCheckpoint positionCheckpoint;
+    private volatile boolean open;
+
+    RocksDBVersionedStore(final String name, final String metricsScope, final long historyRetention, final long segmentInterval) {
+        this.name = name;
+        this.historyRetention = historyRetention;
+        this.metricsRecorder = new RocksDBMetricsRecorder(metricsScope, name);
+        this.latestValueStore = new RocksDBStore(lvsName(name), name, metricsRecorder);
+        this.segmentStores = new LogicalKeyValueSegments(segmentsName(name), name, historyRetention, segmentInterval, metricsRecorder);
+        this.latestValueSchema = new LatestValueSchema();
+        this.segmentValueSchema = new SegmentValueSchema();
+        this.versionedStoreClient = new RocksDBVersionedStoreClient();
+        this.restoreWriteBuffer = new RocksDBVersionedStoreRestoreWriteBuffer(versionedStoreClient);
+    }
+
+    @Override
+    public void put(final Bytes key, final byte[] value, final long timestamp) {
+
+        doPut(
+            latestValueSchema,
+            segmentValueSchema,
+            versionedStoreClient,
+            Optional.of(expiredRecordSensor),
+            context,
+            observedStreamTime,
+            historyRetention,
+            key,
+            value,
+            timestamp
+        );
+
+        observedStreamTime = Math.max(observedStreamTime, timestamp);
+    }
+
+    @Override
+    public VersionedRecord<byte[]> delete(final Bytes key, final long timestamp) {
+        final VersionedRecord<byte[]> existingRecord = get(key, timestamp);
+        put(key, null, timestamp);
+        return existingRecord;
+    }
+
+    @Override
+    public VersionedRecord<byte[]> get(final Bytes key) {
+        // latest value (if present) is guaranteed to be in the latest value store
+        final byte[] latestValue = latestValueStore.get(key);
+        if (latestValue != null) {
+            return new VersionedRecord<>(
+                latestValueSchema.getValue(latestValue),
+                latestValueSchema.getTimestamp(latestValue)
+            );
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public VersionedRecord<byte[]> get(final Bytes key, final long asOfTimestamp) {
+
+        if (asOfTimestamp < observedStreamTime - historyRetention) {
+            // history retention has elapsed. return null for predictability, even if data
+            // is still present in store.
+            return null;
+        }
+
+        // first check the latest value store
+        final byte[] latestValue = latestValueStore.get(key);
+        if (latestValue != null) {
+            final long latestTimestamp = latestValueSchema.getTimestamp(latestValue);
+            if (latestTimestamp <= asOfTimestamp) {
+                return new VersionedRecord<>(latestValueSchema.getValue(latestValue), latestTimestamp);
+            }
+        }
+
+        // check segment stores
+        final List<LogicalKeyValueSegment> segments = segmentStores.segments(asOfTimestamp, Long.MAX_VALUE, false);
+        for (final LogicalKeyValueSegment segment : segments) {
+            final byte[] segmentValue = segment.get(key);
+            if (segmentValue != null) {
+                final long nextTs = segmentValueSchema.getNextTimestamp(segmentValue);
+                if (nextTs <= asOfTimestamp) {
+                    // this segment contains no data for the queried timestamp, so earlier segments
+                    // cannot either
+                    return null;
+                }
+
+                if (segmentValueSchema.getMinTimestamp(segmentValue) > asOfTimestamp) {
+                    // the segment only contains data for after the queried timestamp. skip and
+                    // continue the search to earlier segments. as an optimization, this code
+                    // could be updated to skip forward to the segment containing the minTimestamp
+                    // in the if-condition above.
+                    continue;
+                }
+
+                // the desired result is contained in this segment
+                final SegmentSearchResult searchResult =
+                    segmentValueSchema.deserialize(segmentValue).find(asOfTimestamp, true);
+                if (searchResult.value() != null) {
+                    return new VersionedRecord<>(searchResult.value(), searchResult.validFrom());
+                } else {
+                    return null;
+                }
+            }
+        }
+
+        // checked all segments and no results found
+        return null;
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public void flush() {
+        // order shouldn't matter since failure to flush is a fatal exception
+        segmentStores.flush();
+        latestValueStore.flush();
+    }
+
+    @Override
+    public void close() {
+        open = false;
+
+        // close latest value store first so that calls to get() immediately begin to fail with
+        // store not open, as all calls to get() first get() from latest value store
+        latestValueStore.close();
+        segmentStores.close();
+    }
+
+    @Override
+    public boolean persistent() {
+        return true;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return open;
+    }
+
+    @Override
+    public Position getPosition() {
+        return position;
+    }
+
+    @Deprecated
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        this.context = context;
+
+        final StreamsMetricsImpl metrics = ProcessorContextUtils.getMetricsImpl(context);
+        final String threadId = Thread.currentThread().getName();
+        final String taskName = context.taskId().toString();
+
+        expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
+            threadId,
+            taskName,
+            metrics
+        );
+
+        metricsRecorder.init(ProcessorContextUtils.getMetricsImpl(context), context.taskId());
+
+        latestValueStore.openDB(context.appConfigs(), context.stateDir());
+        segmentStores.openExisting(context, observedStreamTime);
+
+        final File positionCheckpointFile = new File(context.stateDir(), name() + ".position");
+        this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
+        this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
+
+        // register and possibly restore the state from the logs
+        stateStoreContext.register(
+            root,
+            (RecordBatchingStateRestoreCallback) RocksDBVersionedStore.this::restoreBatch,
+            () -> StoreQueryUtils.checkpointPosition(positionCheckpoint, position)
+        );
+
+        open = true;
+
+        consistencyEnabled = StreamsConfig.InternalConfig.getBoolean(
+            context.appConfigs(),
+            IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED,
+            false
+        );
+    }
+
+    @Override
+    public void init(final StateStoreContext context, final StateStore root) {
+        this.stateStoreContext = context;
+        init(StoreToProcessorContextAdapter.adapt(context), root);
+    }
+
+    // VisibleForTesting
+    void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
+        // advance stream time to the max timestamp in the batch
+        for (final ConsumerRecord<byte[], byte[]> record : records) {
+            observedStreamTime = Math.max(observedStreamTime, record.timestamp());
+        }
+
+        final VersionedStoreClient<?> restoreClient = restoreWriteBuffer.getClient();
+
+        // TODO: handle potential out of memory in this process

Review Comment:
   Heh, I was hoping to ask you about this myself :)
   
   Currently the code restores an entire batch at a time, by reading the relevant values into memory and then writing them all at once at the end. If we were able to detect (or catch) a potential out-of-memory condition in this process, we could flush the write buffer partway through the batch to skirt the issue.
   
   From reading online, most sources do not recommend trying to catch OutOfMemoryError, though others say it can be acceptable if there is a graceful way to recover (which we have in this case). Another option is to set a limit on the amount of memory available to the restore write buffer, similar to what's done for the Streams ThreadCache today, and flush whenever the limit is exceeded (before hitting an actual OOM). This is probably the better/safer option, but figuring out a good default value will be tricky, and we'd probably also want to expose it as a config for advanced users / users with atypical setups. WDYT?
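
A minimal, self-contained sketch of the second option described above (cap the restore write buffer and flush early). The class name, the cap, the size accounting, and the flush callback are all invented for illustration and are not part of the PR under review:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.utils.Bytes;

// Hypothetical sketch only: a restore buffer that flushes once its approximate size
// exceeds a configurable cap, instead of waiting for the end of the restore batch.
public class BoundedRestoreBuffer {

    private final long maxBufferBytes;    // analogous in spirit to a cache size config
    private final Runnable flushToStore;  // would write the buffered entries via a WriteBatch
    private final Map<Bytes, byte[]> buffer = new HashMap<>();
    private long approximateBytes = 0;

    public BoundedRestoreBuffer(final long maxBufferBytes, final Runnable flushToStore) {
        this.maxBufferBytes = maxBufferBytes;
        this.flushToStore = flushToStore;
    }

    public void put(final Bytes key, final byte[] value) {
        buffer.put(key, value);
        // rough accounting: key and value sizes only, ignoring per-entry map overhead
        approximateBytes += key.get().length + (value == null ? 0 : value.length);
        if (approximateBytes > maxBufferBytes) {
            // flush partway through the batch, before an actual OOM can occur
            flushToStore.run();
            buffer.clear();
            approximateBytes = 0;
        }
    }
}
```

The same idea could be applied inside the restore client's put path, with the cap exposed as a config, as suggested in the comment above.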





[GitHub] [kafka] mjsax commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13189:
URL: https://github.com/apache/kafka/pull/13189#discussion_r1100920384


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java:
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.BatchWritingVersionedStoreClient;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreClient;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreSegment;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A write buffer for use in restoring a {@link RocksDBVersionedStore} from its changelog. This
+ * class exposes a {@link VersionedStoreClient} to put records into the write buffer, which may
+ * then be flushed to the store via {@link WriteBatch}es, for improved write efficiency during
+ * restoration.
+ * <p>
+ * The structure of the internals of this write buffer mirrors the structure of the
+ * {@code RocksDBVersionedStore} itself, i.e., data for the latest value store and each of the
+ * segment stores is buffered in a separate object -- specifically, a map.
+ */
+public class RocksDBVersionedStoreRestoreWriteBuffer {
+
+    private static final Logger log = LoggerFactory.getLogger(RocksDBVersionedStoreRestoreWriteBuffer.class);
+
+    // write buffer for latest value store. value type is Optional in order to track tombstones
+    // which must be written to the underlying store.
+    private final Map<Bytes, Optional<byte[]>> latestValueWriteBuffer;
+    // map from segment id to write buffer. segments are stored in reverse-sorted order,
+    // so getReverseSegments() is more efficient
+    private final TreeMap<Long, WriteBufferSegmentWithDbFallback> segmentsWriteBuffer;
+    private final BatchWritingVersionedStoreClient<LogicalKeyValueSegment> dbClient;
+    private final RocksDBVersionedStoreRestoreClient restoreClient;
+
+    /**
+     * Creates a new write buffer.
+     * @param dbClient client for reading from and writing to the underlying persistent store
+     */
+    RocksDBVersionedStoreRestoreWriteBuffer(final BatchWritingVersionedStoreClient<LogicalKeyValueSegment> dbClient) {
+        this.dbClient = Objects.requireNonNull(dbClient);
+
+        this.latestValueWriteBuffer = new HashMap<>();
+        // store in reverse-sorted order, to make getReverseSegments() more efficient
+        this.segmentsWriteBuffer = new TreeMap<>((x, y) -> Long.compare(y, x));
+        this.restoreClient = new RocksDBVersionedStoreRestoreClient();
+    }
+
+    /**
+     * @return client for writing to (and reading from) the write buffer
+     */
+    VersionedStoreClient<?> getClient() {
+        return restoreClient;
+    }
+
+    /**
+     * Flushes the contents of the write buffer into the persistent store, and clears the write
+     * buffer in the process.
+     * @throws RocksDBException if a failure occurs adding to or writing a {@link WriteBatch}
+     */
+    void flush() throws RocksDBException {
+
+        // flush segments first, as this is consistent with the store always writing to
+        // older segments/stores before later ones
+        try (final WriteBatch segmentsBatch = new WriteBatch()) {
+            final List<WriteBufferSegmentWithDbFallback> allSegments = restoreClient.getReverseSegments(Long.MIN_VALUE);
+            if (allSegments.size() > 0) {
+                // collect entries into write batch
+                for (final WriteBufferSegmentWithDbFallback bufferSegment : allSegments) {
+                    final LogicalKeyValueSegment dbSegment = bufferSegment.dbSegment();
+                    for (final Map.Entry<Bytes, byte[]> segmentEntry : bufferSegment.getAll()) {
+                        dbSegment.addToBatch(
+                            new KeyValue<>(segmentEntry.getKey().get(), segmentEntry.getValue()),
+                            segmentsBatch);
+                    }
+                }
+
+                // write to db. all the logical segments share the same physical store,
+                // so we can use any segment to perform the write
+                allSegments.get(0).dbSegment().write(segmentsBatch);
+            }
+        } catch (final RocksDBException e) {
+            log.error("Error restoring batch to RocksDBVersionedStore segments store.");
+            throw e;
+        }
+        segmentsWriteBuffer.clear();
+
+        // flush latest value store
+        try (final WriteBatch latestValueBatch = new WriteBatch()) {
+            // collect entries into write batch
+            for (final Map.Entry<Bytes, Optional<byte[]>> latestValueEntry : latestValueWriteBuffer.entrySet()) {
+                final byte[] value = latestValueEntry.getValue().orElse(null);
+                dbClient.addToLatestValueBatch(
+                    new KeyValue<>(latestValueEntry.getKey().get(), value),
+                    latestValueBatch);
+            }
+
+            // write to db
+            dbClient.writeLatestValues(latestValueBatch);
+        } catch (final RocksDBException e) {
+            log.error("Error restoring batch to RocksDBVersionedStore latest value store.");
+            throw e;
+        }
+        latestValueWriteBuffer.clear();
+    }
+
+    /**
+     * The object representation of the write buffer corresponding to a single segment store.
+     * Contains the write buffer itself (a simple hash map) and also a reference to the underlying
+     * persistent segment store.
+     */
+    private class WriteBufferSegmentWithDbFallback implements VersionedStoreSegment {
+
+        private final long id;
+        private final Map<Bytes, byte[]> data;
+        private final LogicalKeyValueSegment dbSegment;
+
+        WriteBufferSegmentWithDbFallback(final LogicalKeyValueSegment dbSegment) {
+            this.dbSegment = Objects.requireNonNull(dbSegment);
+            this.id = dbSegment.id();
+            this.data = new HashMap<>();
+
+            // register segment with segments store
+            segmentsWriteBuffer.put(id, this);
+        }
+
+        LogicalKeyValueSegment dbSegment() {
+            return dbSegment;
+        }
+
+        @Override
+        public long id() {
+            return id;
+        }
+
+        @Override
+        public void put(final Bytes key, final byte[] value) {
+            // all writes go to the write buffer
+            data.put(key, value);
+        }
+
+        @Override
+        public byte[] get(final Bytes key) {
+            final byte[] bufferValue = data.get(key);
+            if (bufferValue != null) {
+                return bufferValue;
+            }
+            return dbSegment.get(key);
+        }
+
+        Set<Entry<Bytes, byte[]>> getAll() {
+            return data.entrySet();
+        }
+    }
+
+    /**
+     * Client for writing to (and reading from) the write buffer as part of restore.
+     */
+    private class RocksDBVersionedStoreRestoreClient implements VersionedStoreClient<WriteBufferSegmentWithDbFallback> {
+
+        @Override
+        public byte[] getLatestValue(final Bytes key) {
+            final Optional<byte[]> bufferValue = latestValueWriteBuffer.get(key);
+            if (bufferValue != null) {
+                return bufferValue.orElse(null);
+            }
+            return dbClient.getLatestValue(key);
+        }
+
+        @Override
+        public void putLatestValue(final Bytes key, final byte[] value) {
+            // all writes go to write buffer
+            latestValueWriteBuffer.put(key, Optional.ofNullable(value));
+        }
+
+        @Override
+        public void deleteLatestValue(final Bytes key) {
+            putLatestValue(key, null);
+        }
+
+        @Override
+        public WriteBufferSegmentWithDbFallback getOrCreateSegmentIfLive(final long segmentId, final ProcessorContext context, final long streamTime) {
+            if (!segmentsWriteBuffer.containsKey(segmentId)) {
+                final LogicalKeyValueSegment dbSegment = dbClient.getOrCreateSegmentIfLive(segmentId, context, streamTime);
+                if (dbSegment == null) {
+                    // segment is not live
+                    return null;
+                }
+                // creating a new segment automatically registers it with the segments store
+                return new WriteBufferSegmentWithDbFallback(dbSegment);
+            }
+            return segmentsWriteBuffer.get(segmentId);
+        }
+
+        @Override
+        public List<WriteBufferSegmentWithDbFallback> getReverseSegments(final long timestampFrom) {
+            // head and not tail because the map is sorted in reverse order

Review Comment:
   great comment!
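
The comment being praised ("head and not tail because the map is sorted in reverse order") can be checked with a few lines of plain JDK code; this demo is standalone and unrelated to the Kafka classes:

```java
import java.util.TreeMap;

public class ReverseHeadMapDemo {
    public static void main(final String[] args) {
        // same comparator as the segments write buffer: sort segment ids in reverse order
        final TreeMap<Long, String> segments = new TreeMap<>((x, y) -> Long.compare(y, x));
        segments.put(1L, "segment-1");
        segments.put(2L, "segment-2");
        segments.put(3L, "segment-3");

        // with natural (ascending) ordering this would require tailMap(); because the map is
        // reverse-sorted, headMap(2, true) prints {3=segment-3, 2=segment-2}, i.e. the
        // segments at or above the bound, already in newest-first order
        System.out.println(segments.headMap(2L, true));
    }
}
```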





[GitHub] [kafka] vcrfxia commented on pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on PR #13189:
URL: https://github.com/apache/kafka/pull/13189#issuecomment-1424701063

   Thanks for your review, @mjsax! I've responded to your comments inline and also pushed a commit just now containing the requested changes.




[GitHub] [kafka] mjsax commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13189:
URL: https://github.com/apache/kafka/pull/13189#discussion_r1102260418


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -273,7 +290,41 @@ public void init(final StateStoreContext context, final StateStore root) {
 
     // VisibleForTesting
     void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
-        throw new UnsupportedOperationException("not yet implemented");
+        // advance stream time to the max timestamp in the batch
+        for (final ConsumerRecord<byte[], byte[]> record : records) {
+            observedStreamTime = Math.max(observedStreamTime, record.timestamp());
+        }
+
+        final VersionedStoreClient<?> restoreClient = restoreWriteBuffer.getClient();

Review Comment:
   I see -- fair enough (in general, I am a fan of strong typing whenever possible, but it's ok I guess).
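
For readers following the typing question, a self-contained sketch of the wildcard-capture pattern that makes a `VersionedStoreClient<?>` return type workable: a private generic helper names the unknown segment type, so segments returned by the client can be passed back into the same client. `Segment` and `StoreClient` here are stand-ins invented for illustration, not the interfaces from the PR:

```java
// Illustrative only: Segment and StoreClient are stand-ins for the PR's interfaces.
interface Segment {
    long id();
}

interface StoreClient<T extends Segment> {
    T getOrCreateSegment(long segmentId);
    void putToSegment(T segment, byte[] key, byte[] value);
}

final class WildcardCaptureExample {

    // the caller only sees StoreClient<?>, mirroring VersionedStoreClient<?> in restoreBatch()
    static void restore(final StoreClient<?> client, final long segmentId, final byte[] key, final byte[] value) {
        restoreTyped(client, segmentId, key, value);  // the compiler captures ? as T here
    }

    // naming the type parameter lets the segment returned by the client flow back into the client
    private static <T extends Segment> void restoreTyped(final StoreClient<T> client,
                                                         final long segmentId,
                                                         final byte[] key,
                                                         final byte[] value) {
        final T segment = client.getOrCreateSegment(segmentId);
        if (segment != null) {
            client.putToSegment(segment, key, value);
        }
    }

    private WildcardCaptureExample() { }
}
```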





[GitHub] [kafka] mjsax merged pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax merged PR #13189:
URL: https://github.com/apache/kafka/pull/13189




[GitHub] [kafka] mjsax commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13189:
URL: https://github.com/apache/kafka/pull/13189#discussion_r1100908054


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -333,10 +384,34 @@ interface VersionedStoreSegment {
         long segmentIdForTimestamp(long timestamp);
     }
 
+    /**
+     * A {@link VersionedStoreClient} which additionally supports batch writes into its latest
+     * value store.
+     *
+     * @param <T> the segment type used by this client
+     */
+    interface BatchWritingVersionedStoreClient<T extends VersionedStoreSegment> extends VersionedStoreClient<T> {

Review Comment:
   Why do we need this interface? Can't we just add both methods to `RocksDBVersionedStoreClient` directly?
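
For context, roughly what the interface in question declares, reconstructed from the declaration and the calls quoted elsewhere in this thread (`addToLatestValueBatch` / `writeLatestValues`); treat the exact signatures as an approximation rather than the merged code:

```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreClient;
import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreSegment;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;

// Approximate reconstruction; VersionedStoreClient and VersionedStoreSegment are the
// nested interfaces of RocksDBVersionedStore quoted earlier in this thread.
interface BatchWritingVersionedStoreClient<T extends VersionedStoreSegment> extends VersionedStoreClient<T> {

    // stage a latest-value record (a null value represents a tombstone) into the given batch
    void addToLatestValueBatch(KeyValue<byte[], byte[]> record, WriteBatch batch) throws RocksDBException;

    // write all staged latest-value records to RocksDB in one atomic batch
    void writeLatestValues(WriteBatch batch) throws RocksDBException;
}
```

Splitting the batch-write methods into their own interface keeps the restore write buffer coupled only to the batching capability rather than to the concrete client class, at the cost of one extra type.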





[GitHub] [kafka] mjsax commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13189:
URL: https://github.com/apache/kafka/pull/13189#discussion_r1100927416


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -273,7 +290,41 @@ public void init(final StateStoreContext context, final StateStore root) {
 
     // VisibleForTesting
     void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
-        throw new UnsupportedOperationException("not yet implemented");
+        // advance stream time to the max timestamp in the batch
+        for (final ConsumerRecord<byte[], byte[]> record : records) {
+            observedStreamTime = Math.max(observedStreamTime, record.timestamp());
+        }
+
+        final VersionedStoreClient<?> restoreClient = restoreWriteBuffer.getClient();

Review Comment:
   Why is the generic not typed?





[GitHub] [kafka] vcrfxia commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13189:
URL: https://github.com/apache/kafka/pull/13189#discussion_r1103149123


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java:
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.BatchWritingVersionedStoreClient;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreClient;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreSegment;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A write buffer for use in restoring a {@link RocksDBVersionedStore} from its changelog. This
+ * class exposes a {@link VersionedStoreClient} to put records into the write buffer, which may
+ * then be flushed to the store via {@link WriteBatch}es, for improved write efficiency during
+ * restoration.
+ * <p>
+ * The structure of the internals of this write buffer mirrors the structure of the
+ * {@code RocksDBVersionedStore} itself, i.e., data for the latest value store and each of the
+ * segment stores is buffered in a separate object -- specifically, a map.
+ */
+public class RocksDBVersionedStoreRestoreWriteBuffer {
+
+    private static final Logger log = LoggerFactory.getLogger(RocksDBVersionedStoreRestoreWriteBuffer.class);
+
+    // write buffer for latest value store. value type is Optional in order to track tombstones
+    // which must be written to the underlying store.
+    private final Map<Bytes, Optional<byte[]>> latestValueWriteBuffer;
+    // map from segment id to write buffer. segments are stored in reverse-sorted order,
+    // so getReverseSegments() is more efficient
+    private final TreeMap<Long, WriteBufferSegmentWithDbFallback> segmentsWriteBuffer;
+    private final BatchWritingVersionedStoreClient<LogicalKeyValueSegment> dbClient;
+    private final RocksDBVersionedStoreRestoreClient restoreClient;
+
+    /**
+     * Creates a new write buffer.
+     * @param dbClient client for reading from and writing to the underlying persistent store
+     */
+    RocksDBVersionedStoreRestoreWriteBuffer(final BatchWritingVersionedStoreClient<LogicalKeyValueSegment> dbClient) {
+        this.dbClient = Objects.requireNonNull(dbClient);
+
+        this.latestValueWriteBuffer = new HashMap<>();
+        // store in reverse-sorted order, to make getReverseSegments() more efficient
+        this.segmentsWriteBuffer = new TreeMap<>((x, y) -> Long.compare(y, x));
+        this.restoreClient = new RocksDBVersionedStoreRestoreClient();
+    }
+
+    /**
+     * @return client for writing to (and reading from) the write buffer
+     */
+    VersionedStoreClient<?> getClient() {
+        return restoreClient;
+    }
+
+    /**
+     * Flushes the contents of the write buffer into the persistent store, and clears the write
+     * buffer in the process.
+     * @throws RocksDBException if a failure occurs adding to or writing a {@link WriteBatch}
+     */
+    void flush() throws RocksDBException {
+
+        // flush segments first, as this is consistent with the store always writing to
+        // older segments/stores before later ones
+        try (final WriteBatch segmentsBatch = new WriteBatch()) {
+            final List<WriteBufferSegmentWithDbFallback> allSegments = restoreClient.getReverseSegments(Long.MIN_VALUE);
+            if (allSegments.size() > 0) {
+                // collect entries into write batch
+                for (final WriteBufferSegmentWithDbFallback bufferSegment : allSegments) {
+                    final LogicalKeyValueSegment dbSegment = bufferSegment.dbSegment();
+                    for (final Map.Entry<Bytes, byte[]> segmentEntry : bufferSegment.getAll()) {
+                        dbSegment.addToBatch(
+                            new KeyValue<>(segmentEntry.getKey().get(), segmentEntry.getValue()),
+                            segmentsBatch);
+                    }
+                }
+
+                // write to db. all the logical segments share the same physical store,
+                // so we can use any segment to perform the write
+                allSegments.get(0).dbSegment().write(segmentsBatch);
+            }
+        } catch (final RocksDBException e) {
+            log.error("Error restoring batch to RocksDBVersionedStore segments store.");
+            throw e;
+        }
+        segmentsWriteBuffer.clear();
+
+        // flush latest value store
+        try (final WriteBatch latestValueBatch = new WriteBatch()) {
+            // collect entries into write batch
+            for (final Map.Entry<Bytes, Optional<byte[]>> latestValueEntry : latestValueWriteBuffer.entrySet()) {
+                final byte[] value = latestValueEntry.getValue().orElse(null);
+                dbClient.addToLatestValueBatch(
+                    new KeyValue<>(latestValueEntry.getKey().get(), value),
+                    latestValueBatch);
+            }
+
+            // write to db
+            dbClient.writeLatestValues(latestValueBatch);
+        } catch (final RocksDBException e) {
+            log.error("Error restoring batch to RocksDBVersionedStore latest value store.");
+            throw e;
+        }
+        latestValueWriteBuffer.clear();
+    }
+
+    /**
+     * The object representation of the write buffer corresponding to a single segment store.
+     * Contains the write buffer itself (a simple hash map) and also a reference to the underlying
+     * persistent segment store.
+     */
+    private class WriteBufferSegmentWithDbFallback implements VersionedStoreSegment {
+
+        private final long id;
+        private final Map<Bytes, byte[]> data;
+        private final LogicalKeyValueSegment dbSegment;
+
+        WriteBufferSegmentWithDbFallback(final LogicalKeyValueSegment dbSegment) {
+            this.dbSegment = Objects.requireNonNull(dbSegment);
+            this.id = dbSegment.id();
+            this.data = new HashMap<>();
+
+            // register segment with segments store
+            segmentsWriteBuffer.put(id, this);
+        }
+
+        LogicalKeyValueSegment dbSegment() {
+            return dbSegment;
+        }
+
+        @Override
+        public long id() {
+            return id;
+        }
+
+        @Override
+        public void put(final Bytes key, final byte[] value) {
+            // all writes go to the write buffer
+            data.put(key, value);
+        }
+
+        @Override
+        public byte[] get(final Bytes key) {
+            final byte[] bufferValue = data.get(key);
+            if (bufferValue != null) {
+                return bufferValue;
+            }
+            return dbSegment.get(key);
+        }
+
+        Set<Entry<Bytes, byte[]>> getAll() {

Review Comment:
   OK, I'll make the update.





[GitHub] [kafka] guozhangwang commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on code in PR #13189:
URL: https://github.com/apache/kafka/pull/13189#discussion_r1103405393


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -0,0 +1,877 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
+import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
+import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A persistent, versioned key-value store based on RocksDB.
+ * <p>
+ * This store implementation consists of a "latest value store" and "segment stores." The latest
+ * record version for each key is stored in the latest value store, while older record versions
+ * are stored in the segment stores. Conceptually, each record version has two associated
+ * timestamps:
+ * <ul>
+ *     <li>a {@code validFrom} timestamp. This timestamp is explicitly associated with the record
+ *     as part of the {@link VersionedKeyValueStore#put(Object, Object, long)}} call to the store;
+ *     i.e., this is the record's timestamp.</li>
+ *     <li>a {@code validTo} timestamp. This is the timestamp of the next record (or deletion)
+ *     associated with the same key, and is implicitly associated with the record. This timestamp
+ *     can change as new records are inserted into the store.</li>
+ * </ul>
+ * The validity interval of a record is from validFrom (inclusive) to validTo (exclusive), and
+ * can change as new record versions are inserted into the store (and validTo changes as a result).
+ * <p>
+ * Old record versions are stored in segment stores according to their validTo timestamps. This
+ * allows for efficient expiry of old record versions, as entire segments can be dropped from the
+ * store at a time, once the records contained in the segment are no longer relevant based on the
+ * store's history retention (for an explanation of "history retention", see
+ * {@link VersionedKeyValueStore}). Multiple record versions (for the same key) within a single
+ * segment are stored together using the format specified in {@link RocksDBVersionedStoreSegmentValueFormatter}.
+ */
+public class RocksDBVersionedStore implements VersionedKeyValueStore<Bytes, byte[]> {
+    private static final Logger LOG = LoggerFactory.getLogger(RocksDBVersionedStore.class);
+    // a marker to indicate that no record version has yet been found as part of an ongoing
+    // put() procedure. any value which is not a valid record timestamp will do.
+    private static final long SENTINEL_TIMESTAMP = Long.MIN_VALUE;
+
+    private final String name;
+    private final long historyRetention;
+    private final RocksDBMetricsRecorder metricsRecorder;
+
+    private final RocksDBStore latestValueStore;
+    private final LogicalKeyValueSegments segmentStores;
+    private final LatestValueSchema latestValueSchema;
+    private final SegmentValueSchema segmentValueSchema;
+    private final BatchWritingVersionedStoreClient<LogicalKeyValueSegment> versionedStoreClient;
+    private final RocksDBVersionedStoreRestoreWriteBuffer restoreWriteBuffer;
+
+    private ProcessorContext context;
+    private StateStoreContext stateStoreContext;
+    private Sensor expiredRecordSensor;
+    private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+    private boolean consistencyEnabled = false;
+    private Position position;
+    private OffsetCheckpoint positionCheckpoint;
+    private volatile boolean open;
+
+    RocksDBVersionedStore(final String name, final String metricsScope, final long historyRetention, final long segmentInterval) {
+        this.name = name;
+        this.historyRetention = historyRetention;
+        this.metricsRecorder = new RocksDBMetricsRecorder(metricsScope, name);
+        this.latestValueStore = new RocksDBStore(lvsName(name), name, metricsRecorder);
+        this.segmentStores = new LogicalKeyValueSegments(segmentsName(name), name, historyRetention, segmentInterval, metricsRecorder);
+        this.latestValueSchema = new LatestValueSchema();
+        this.segmentValueSchema = new SegmentValueSchema();
+        this.versionedStoreClient = new RocksDBVersionedStoreClient();
+        this.restoreWriteBuffer = new RocksDBVersionedStoreRestoreWriteBuffer(versionedStoreClient);
+    }
+
+    @Override
+    public void put(final Bytes key, final byte[] value, final long timestamp) {
+
+        doPut(
+            latestValueSchema,
+            segmentValueSchema,
+            versionedStoreClient,
+            Optional.of(expiredRecordSensor),
+            context,
+            observedStreamTime,
+            historyRetention,
+            key,
+            value,
+            timestamp
+        );
+
+        observedStreamTime = Math.max(observedStreamTime, timestamp);
+    }
+
+    @Override
+    public VersionedRecord<byte[]> delete(final Bytes key, final long timestamp) {
+        final VersionedRecord<byte[]> existingRecord = get(key, timestamp);
+        put(key, null, timestamp);
+        return existingRecord;
+    }
+
+    @Override
+    public VersionedRecord<byte[]> get(final Bytes key) {
+        // latest value (if present) is guaranteed to be in the latest value store
+        final byte[] latestValue = latestValueStore.get(key);
+        if (latestValue != null) {
+            return new VersionedRecord<>(
+                latestValueSchema.getValue(latestValue),
+                latestValueSchema.getTimestamp(latestValue)
+            );
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public VersionedRecord<byte[]> get(final Bytes key, final long asOfTimestamp) {
+
+        if (asOfTimestamp < observedStreamTime - historyRetention) {
+            // history retention has elapsed. return null for predictability, even if data
+            // is still present in store.
+            return null;
+        }
+
+        // first check the latest value store
+        final byte[] latestValue = latestValueStore.get(key);
+        if (latestValue != null) {
+            final long latestTimestamp = latestValueSchema.getTimestamp(latestValue);
+            if (latestTimestamp <= asOfTimestamp) {
+                return new VersionedRecord<>(latestValueSchema.getValue(latestValue), latestTimestamp);
+            }
+        }
+
+        // check segment stores
+        final List<LogicalKeyValueSegment> segments = segmentStores.segments(asOfTimestamp, Long.MAX_VALUE, false);
+        for (final LogicalKeyValueSegment segment : segments) {
+            final byte[] segmentValue = segment.get(key);
+            if (segmentValue != null) {
+                final long nextTs = segmentValueSchema.getNextTimestamp(segmentValue);
+                if (nextTs <= asOfTimestamp) {
+                    // this segment contains no data for the queried timestamp, so earlier segments
+                    // cannot either
+                    return null;
+                }
+
+                if (segmentValueSchema.getMinTimestamp(segmentValue) > asOfTimestamp) {
+                    // the segment only contains data for after the queried timestamp. skip and
+                    // continue the search to earlier segments. as an optimization, this code
+                    // could be updated to skip forward to the segment containing the minTimestamp
+                    // in the if-condition above.
+                    continue;
+                }
+
+                // the desired result is contained in this segment
+                final SegmentSearchResult searchResult =
+                    segmentValueSchema.deserialize(segmentValue).find(asOfTimestamp, true);
+                if (searchResult.value() != null) {
+                    return new VersionedRecord<>(searchResult.value(), searchResult.validFrom());
+                } else {
+                    return null;
+                }
+            }
+        }
+
+        // checked all segments and no results found
+        return null;
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public void flush() {
+        // order shouldn't matter since failure to flush is a fatal exception
+        segmentStores.flush();
+        latestValueStore.flush();
+    }
+
+    @Override
+    public void close() {
+        open = false;
+
+        // close latest value store first so that calls to get() immediately begin to fail with
+        // store not open, as all calls to get() first get() from latest value store
+        latestValueStore.close();
+        segmentStores.close();
+    }
+
+    @Override
+    public boolean persistent() {
+        return true;
+    }
+
+    @Override
+    public boolean isOpen() {
+        return open;
+    }
+
+    @Override
+    public Position getPosition() {
+        return position;
+    }
+
+    @Deprecated
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        this.context = context;
+
+        final StreamsMetricsImpl metrics = ProcessorContextUtils.getMetricsImpl(context);
+        final String threadId = Thread.currentThread().getName();
+        final String taskName = context.taskId().toString();
+
+        expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
+            threadId,
+            taskName,
+            metrics
+        );
+
+        metricsRecorder.init(ProcessorContextUtils.getMetricsImpl(context), context.taskId());
+
+        latestValueStore.openDB(context.appConfigs(), context.stateDir());
+        segmentStores.openExisting(context, observedStreamTime);
+
+        final File positionCheckpointFile = new File(context.stateDir(), name() + ".position");
+        this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
+        this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
+
+        // register and possibly restore the state from the logs
+        stateStoreContext.register(
+            root,
+            (RecordBatchingStateRestoreCallback) RocksDBVersionedStore.this::restoreBatch,
+            () -> StoreQueryUtils.checkpointPosition(positionCheckpoint, position)
+        );
+
+        open = true;
+
+        consistencyEnabled = StreamsConfig.InternalConfig.getBoolean(
+            context.appConfigs(),
+            IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED,
+            false
+        );
+    }
+
+    @Override
+    public void init(final StateStoreContext context, final StateStore root) {
+        this.stateStoreContext = context;
+        init(StoreToProcessorContextAdapter.adapt(context), root);
+    }
+
+    // VisibleForTesting
+    void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
+        // advance stream time to the max timestamp in the batch
+        for (final ConsumerRecord<byte[], byte[]> record : records) {
+            observedStreamTime = Math.max(observedStreamTime, record.timestamp());
+        }
+
+        final VersionedStoreClient<?> restoreClient = restoreWriteBuffer.getClient();
+
+        // TODO: handle potential out of memory in this process

Review Comment:
   Yeah, we were aware of the risk of OOM, since during restoration we potentially need to buffer some records, and made a decision during the design phase to live with that. So I think it's still okay not to capture OOM explicitly, though admittedly the likelihood is much larger now.
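
A minimal sketch of the kind of bound alluded to above -- flushing the restore write buffer early once it holds too many entries -- could look like the following. The BoundableRestoreBuffer interface, approximateEntryCount(), and maxBufferedEntries are illustrative assumptions only; the actual write buffer in this PR has a different API and is intentionally left unbounded.

    import java.util.Collection;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.utils.Bytes;
    import org.rocksdb.RocksDBException;

    // Hypothetical buffer interface for this sketch; the class in the PR is
    // RocksDBVersionedStoreRestoreWriteBuffer, whose API differs.
    interface BoundableRestoreBuffer {
        void put(Bytes key, byte[] value, long timestamp);
        int approximateEntryCount();
        void flush() throws RocksDBException;
    }

    final class BoundedRestoreSketch {
        // Flush whenever the buffer grows past a threshold, trading extra RocksDB
        // write batches for a hard cap on the number of buffered entries.
        static void restoreBatchBounded(final Collection<ConsumerRecord<byte[], byte[]>> records,
                                        final BoundableRestoreBuffer buffer,
                                        final int maxBufferedEntries) throws RocksDBException {
            for (final ConsumerRecord<byte[], byte[]> record : records) {
                buffer.put(Bytes.wrap(record.key()), record.value(), record.timestamp());
                if (buffer.approximateEntryCount() >= maxBufferedEntries) {
                    buffer.flush();
                }
            }
            buffer.flush(); // write out whatever remains at the end of the batch
        }
    }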





[GitHub] [kafka] vcrfxia commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13189:
URL: https://github.com/apache/kafka/pull/13189#discussion_r1101924540


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -273,7 +290,41 @@ public void init(final StateStoreContext context, final StateStore root) {
 
     // VisibleForTesting
     void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
-        throw new UnsupportedOperationException("not yet implemented");
+        // advance stream time to the max timestamp in the batch
+        for (final ConsumerRecord<byte[], byte[]> record : records) {
+            observedStreamTime = Math.max(observedStreamTime, record.timestamp());
+        }
+
+        final VersionedStoreClient<?> restoreClient = restoreWriteBuffer.getClient();

Review Comment:
   The VersionedStoreSegment implementation used by the restore client (WriteBufferSegmentWithDbFallback) is currently private to the write buffer class, since the RocksDBVersionedStore doesn't care what the type is; all the outer class needs are the methods provided by the RocksDBVersionedStoreClient interface itself. So I've left the type out in order to avoid polluting the outer class with extra info it doesn't need.
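
To make the design choice concrete, here is a small sketch (with simplified stand-in names, not the actual signatures from the PR) of the wildcard-capture pattern being described: callers hold a client of unknown segment type, and a generic helper captures the type parameter where the segment type actually matters.

    import org.apache.kafka.common.utils.Bytes;

    // Simplified stand-ins for VersionedStoreSegment and VersionedStoreClient.
    interface Segment {
        void put(Bytes key, byte[] value);
    }

    interface StoreClient<T extends Segment> {
        T getOrCreateSegment(long segmentId);
    }

    final class WildcardCaptureSketch {
        // The caller only needs the interface methods, so a wildcard type suffices ...
        static void restoreRecord(final StoreClient<?> client, final Bytes key,
                                  final byte[] value, final long segmentId) {
            putToSegment(client, key, value, segmentId);
        }

        // ... and a generic helper captures the wildcard where the segment type is needed.
        private static <T extends Segment> void putToSegment(final StoreClient<T> client,
                                                             final Bytes key,
                                                             final byte[] value,
                                                             final long segmentId) {
            final T segment = client.getOrCreateSegment(segmentId);
            segment.put(key, value);
        }
    }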



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.java:
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.BatchWritingVersionedStoreClient;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreClient;
+import org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreSegment;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A write buffer for use in restoring a {@link RocksDBVersionedStore} from its changelog. This
+ * class exposes a {@link VersionedStoreClient} to put records into the write buffer, which may
+ * then be flushed to the store via {@link WriteBatch}es, for improved write efficiency during
+ * restoration.
+ * <p>
+ * The structure of the internals of this write buffer mirrors the structure of the
+ * {@code RocksDBVersionedStore} itself, i.e., data for the latest value store and each of the
+ * segment stores is buffered in a separate object -- specifically, a map.
+ */
+public class RocksDBVersionedStoreRestoreWriteBuffer {
+
+    private static final Logger log = LoggerFactory.getLogger(RocksDBVersionedStoreRestoreWriteBuffer.class);
+
+    // write buffer for latest value store. value type is Optional in order to track tombstones
+    // which must be written to the underlying store.
+    private final Map<Bytes, Optional<byte[]>> latestValueWriteBuffer;
+    // map from segment id to write buffer. segments are stored in reverse-sorted order,
+    // so getReverseSegments() is more efficient
+    private final TreeMap<Long, WriteBufferSegmentWithDbFallback> segmentsWriteBuffer;
+    private final BatchWritingVersionedStoreClient<LogicalKeyValueSegment> dbClient;
+    private final RocksDBVersionedStoreRestoreClient restoreClient;
+
+    /**
+     * Creates a new write buffer.
+     * @param dbClient client for reading from and writing to the underlying persistent store
+     */
+    RocksDBVersionedStoreRestoreWriteBuffer(final BatchWritingVersionedStoreClient<LogicalKeyValueSegment> dbClient) {
+        this.dbClient = Objects.requireNonNull(dbClient);
+
+        this.latestValueWriteBuffer = new HashMap<>();
+        // store in reverse-sorted order, to make getReverseSegments() more efficient
+        this.segmentsWriteBuffer = new TreeMap<>((x, y) -> Long.compare(y, x));
+        this.restoreClient = new RocksDBVersionedStoreRestoreClient();
+    }
+
+    /**
+     * @return client for writing to (and reading from) the write buffer
+     */
+    VersionedStoreClient<?> getClient() {

Review Comment:
   See above.
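
As a rough illustration of the "flush via WriteBatches" idea from the Javadoc above, the following standalone sketch converts one buffered map into a RocksDB WriteBatch, with Optional.empty() standing in for tombstones. The real write buffer flushes through the store's BatchWritingVersionedStoreClient rather than building batches in isolation, so this helper is illustrative only.

    import java.util.Map;
    import java.util.Optional;
    import org.apache.kafka.common.utils.Bytes;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.WriteBatch;

    final class WriteBatchFlushSketch {
        // Collect buffered puts and tombstones (Optional.empty()) into a single
        // WriteBatch so they can be applied to RocksDB in one write.
        static WriteBatch toWriteBatch(final Map<Bytes, Optional<byte[]>> buffer) throws RocksDBException {
            final WriteBatch batch = new WriteBatch();
            for (final Map.Entry<Bytes, Optional<byte[]>> entry : buffer.entrySet()) {
                if (entry.getValue().isPresent()) {
                    batch.put(entry.getKey().get(), entry.getValue().get());
                } else {
                    batch.delete(entry.getKey().get());
                }
            }
            return batch; // caller applies the batch to RocksDB and closes it
        }
    }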





[GitHub] [kafka] mjsax commented on a diff in pull request #13189: KAFKA-14491: [6/N] Support restoring RocksDB versioned store from changelog

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13189:
URL: https://github.com/apache/kafka/pull/13189#discussion_r1102262205


##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java:
##########
@@ -383,6 +389,114 @@ public void shouldDistinguishEmptyAndNull() {
         verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 8);
     }
 
+    @Test
+    public void shouldRestore() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k", "vp20", SEGMENT_INTERVAL + 20));
+        records.add(new DataRecord("k", "vp10", SEGMENT_INTERVAL + 10));
+        records.add(new DataRecord("k", "vn10", SEGMENT_INTERVAL - 10));
+        records.add(new DataRecord("k", "vn2", SEGMENT_INTERVAL - 2));
+        records.add(new DataRecord("k", "vn1", SEGMENT_INTERVAL - 1));
+        records.add(new DataRecord("k", "vp1", SEGMENT_INTERVAL + 1));
+
+        store.restoreBatch(getChangelogRecords(records));
+
+        verifyGetValueFromStore("k", "vp20", SEGMENT_INTERVAL + 20);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 30, "vp20", SEGMENT_INTERVAL + 20);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 15, "vp10", SEGMENT_INTERVAL + 10);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 5, "vp1", SEGMENT_INTERVAL + 1);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL, "vn1", SEGMENT_INTERVAL - 1);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 1, "vn1", SEGMENT_INTERVAL - 1);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 2, "vn2", SEGMENT_INTERVAL - 2);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "vn10", SEGMENT_INTERVAL - 10);
+    }
+
+    @Test
+    public void shouldRestoreWithNulls() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 20));
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL - 1));
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 1));
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL - 10));
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 10));
+        records.add(new DataRecord("k", "vp5", SEGMENT_INTERVAL + 5));
+        records.add(new DataRecord("k", "vn5", SEGMENT_INTERVAL - 5));
+        records.add(new DataRecord("k", "vn6", SEGMENT_INTERVAL - 6));
+
+        store.restoreBatch(getChangelogRecords(records));
+
+        verifyGetNullFromStore("k");
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 30);
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 15);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL + 6, "vp5", SEGMENT_INTERVAL + 5);
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL + 2);
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL);
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 1);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 5, "vn5", SEGMENT_INTERVAL - 5);
+        verifyTimestampedGetValueFromStore("k", SEGMENT_INTERVAL - 6, "vn6", SEGMENT_INTERVAL - 6);
+        verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 8);
+    }
+
+    @Test
+    public void shouldRestoreWithNullsAndRepeatTimestamps() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k", "to_be_replaced", SEGMENT_INTERVAL + 20));
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL - 10));
+        records.add(new DataRecord("k", "to_be_replaced", SEGMENT_INTERVAL - 10));
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL - 10));
+        records.add(new DataRecord("k", "to_be_replaced", SEGMENT_INTERVAL - 1));
+        records.add(new DataRecord("k", "to_be_replaced", SEGMENT_INTERVAL + 1));
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL - 1));
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 1));
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 10));
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 5));
+        records.add(new DataRecord("k", "vp5", SEGMENT_INTERVAL + 5));
+        records.add(new DataRecord("k", "to_be_replaced", SEGMENT_INTERVAL - 5));
+        records.add(new DataRecord("k", "vn5", SEGMENT_INTERVAL - 5));
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 20));
+        records.add(new DataRecord("k", null, SEGMENT_INTERVAL + 20));
+        records.add(new DataRecord("k", "vn6", SEGMENT_INTERVAL - 6));

Review Comment:
   Similar to N/5 -- hard to keep track without writing it down. Can we add some comments?
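
One possible way to address this (purely illustrative; the exact wording would be up to the PR author) is to annotate each record with what it contributes to the final state, e.g. for the first few records of shouldRestoreWithNullsAndRepeatTimestamps:

    records.add(new DataRecord("k", "to_be_replaced", SEGMENT_INTERVAL + 20)); // overwritten by the later null at the same timestamp
    records.add(new DataRecord("k", null, SEGMENT_INTERVAL - 10));             // tombstone, superseded by the two later records at the same timestamp
    records.add(new DataRecord("k", "to_be_replaced", SEGMENT_INTERVAL - 10)); // replaced by the null that follows
    records.add(new DataRecord("k", null, SEGMENT_INTERVAL - 10));             // final (null) value for ts = SEGMENT_INTERVAL - 10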


