Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/07/08 12:13:22 UTC

[GitHub] [kafka] Gerrrr opened a new pull request, #12393: WIP: KAFKA-12549

Gerrrr opened a new pull request, #12393:
URL: https://github.com/apache/kafka/pull/12393

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] nicktelford commented on a diff in pull request #12393: WIP: KAFKA-12549 Prototype for transactional state stores

Posted by GitBox <gi...@apache.org>.
nicktelford commented on code in PR #12393:
URL: https://github.com/apache/kafka/pull/12393#discussion_r1003504997


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractTransactionalStore.java:
##########
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+
+public abstract class AbstractTransactionalStore<T extends KeyValueStore<Bytes, byte[]>> implements KeyValueStore<Bytes, byte[]> {
+    private static final byte MODIFICATION = 0x1;
+    private static final byte DELETION = 0x2;
+    private static final byte[] DELETION_VAL = {DELETION};
+
+    private StateStoreContext context;
+
+    static final String PREFIX = "transactional-";
+    //VisibleForTesting
+    public static final String TMP_SUFFIX = ".tmp";
+
+    private final Set<MergeKeyValueIterator> openIterators = Collections.synchronizedSet(new HashSet<>());
+
+    Map<String, Object> configs;
+    File stateDir;
+
+    private boolean consistencyEnabled = false;
+    private Position position;
+    protected OffsetCheckpoint positionCheckpoint;
+
+    KeyValueSegment createTmpStore(final String segmentName,
+                                   final String windowName,
+                                   final long segmentId,
+                                   final RocksDBMetricsRecorder metricsRecorder) {
+        return new KeyValueSegment(segmentName + TMP_SUFFIX,
+                                    windowName,
+                                    segmentId,
+                                    metricsRecorder);
+    }
+
+    public abstract T mainStore();
+
+    public abstract KeyValueSegment tmpStore();
+
+    @Deprecated
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        if (context instanceof StateStoreContext) {
+            init((StateStoreContext) context, root);
+        } else {
+            throw new UnsupportedOperationException(
+                "Use TransactionalKeyValueStore#init(StateStoreContext, StateStore) instead."
+            );
+        }
+    }
+
+    @Override
+    public void init(final StateStoreContext context, final StateStore root) {
+        this.context = context;
+
+        doInit(context.appConfigs(), context.stateDir());
+        ((RocksDBStore) mainStore()).openDB(configs, stateDir);
+
+        final File positionCheckpointFile = new File(context.stateDir(), name() + ".position");
+        this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
+        this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
+        tmpStore().consistencyEnabled = consistencyEnabled;
+
+        // register and possibly restore the state from the logs
+        context.register(
+            root,
+            (RecordBatchingStateRestoreCallback) this::restoreBatch,
+            () -> StoreQueryUtils.checkpointPosition(positionCheckpoint, position)
+        );
+
+        consistencyEnabled = StreamsConfig.InternalConfig.getBoolean(
+            context.appConfigs(),
+            IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED,
+            false);
+    }
+
+    private void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
+        final Collection<ConsumerRecord<byte[], byte[]>> changelogRecords = records
+            .stream()
+            .map(record -> new ConsumerRecord<>(
+                record.topic(),
+                record.partition(),
+                record.offset(),
+                record.key(),
+                toUncommittedValue(record.value())))
+            .collect(Collectors.toList());
+        tmpStore().restoreBatch(changelogRecords);
+        commit(null);
+    }
+
+    void doInit(final Map<String, Object> configs, final File stateDir) {
+        this.configs = configs;
+        this.stateDir = stateDir;
+        tmpStore().openDB(configs, stateDir);
+    }
+
+    @Override
+    public synchronized void close() {
+        final HashSet<KeyValueIterator<Bytes, byte[]>> iterators;
+        synchronized (openIterators) {
+            iterators = new HashSet<>(openIterators);
+        }
+        for (final KeyValueIterator<Bytes, byte[]> iterator : iterators) {
+            iterator.close();
+        }
+
+        tmpStore().close();
+        mainStore().close();
+    }
+
+    @Override
+    public void commit(final Long changelogOffset) {
+        tmpStore().commit(changelogOffset);
+        doCommit();
+    }
+
+    @Override
+    public boolean recover(final Long changelogOffset) {
+        truncateTmpStore();
+        return true;
+    }
+
+    private void truncateTmpStore() {
+        try {
+            tmpStore().close();
+            tmpStore().destroy();
+            tmpStore().openDB(configs, stateDir);
+        } catch (final IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public boolean persistent() {
+        return mainStore().persistent();
+    }
+
+    @Override
+    public boolean isOpen() {
+        return tmpStore().isOpen() && mainStore().isOpen();
+    }
+
+    @Override
+    public void put(final Bytes key, final byte[] value) {
+        StoreQueryUtils.updatePosition(position, context);
+        tmpStore().put(key, toUncommittedValue(value));
+    }
+
+    @Override
+    public byte[] putIfAbsent(final Bytes key, final byte[] value) {
+        final byte[] prev = get(key);
+        if (prev == null) {
+            StoreQueryUtils.updatePosition(position, context);
+            tmpStore().put(key, toUncommittedValue(value));
+        }
+        return prev;
+    }
+
+    @Override
+    public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+        StoreQueryUtils.updatePosition(position, context);
+        final List<KeyValue<Bytes, byte[]>> tmpEntries = entries
+            .stream()
+            .map(e -> new KeyValue<>(e.key, toUncommittedValue(e.value)))
+            .collect(Collectors.toList());
+        tmpStore().putAll(tmpEntries);
+    }
+
+    @Override
+    public byte[] delete(final Bytes key) {
+        final byte[] value = get(key);
+        tmpStore().put(key, DELETION_VAL);
+        return value;
+    }
+
+    @Override
+    public byte[] get(final Bytes key) {
+        final byte[] tmpValue = tmpStore().get(key);
+        if (tmpValue == null) {
+            return mainStore().get(key);
+        } else if (tmpValue[0] == DELETION) {
+            return null;
+        } else {
+            return fromUncommittedValue(tmpValue);
+        }
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) {
+        final MergeKeyValueIterator iterator = new MergeKeyValueIterator(
+            tmpStore().range(from, to), mainStore().range(from, to), openIterators);
+        openIterators.add(iterator);
+        return iterator;
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> reverseRange(final Bytes from, final Bytes to) {
+        final MergeKeyValueIterator iterator = new MergeKeyValueIterator(
+            tmpStore().reverseRange(from, to),
+            mainStore().reverseRange(from, to),
+            true,
+            openIterators);
+        openIterators.add(iterator);
+        return iterator;
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> all() {
+        final MergeKeyValueIterator iterator = new MergeKeyValueIterator(
+            tmpStore().all(), mainStore().all(), openIterators);
+        openIterators.add(iterator);
+        return iterator;
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> reverseAll() {
+        final MergeKeyValueIterator iterator = new MergeKeyValueIterator(
+            tmpStore().reverseAll(), mainStore().reverseAll(), true, openIterators);
+        openIterators.add(iterator);
+        return iterator;
+    }
+
+    @Override
+    public long approximateNumEntries() {
+        try {
+            return Math.addExact(tmpStore().approximateNumEntries(), mainStore().approximateNumEntries());
+        } catch (final ArithmeticException e) {
+            return Long.MAX_VALUE;
+        }
+    }
+
+    @Override
+    public Position getPosition() {
+        return position;
+    }
+
+    private void doCommit() {
+        try (final KeyValueIterator<Bytes, byte[]> it = tmpStore().all()) {
+            while (it.hasNext()) {
+                final KeyValue<Bytes, byte[]> kv = it.next();
+                mainStore().put(kv.key, fromUncommittedValue(kv.value));
+            }
+        }
+
+        truncateTmpStore();
+    }
+
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound,
+        final QueryConfig config) {
+        return StoreQueryUtils.handleBasicQueries(
+            query,
+            positionBound,
+            config,
+            this,
+            position,
+            context
+        );
+    }
+
+    @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix,
+                                                                                    final PS prefixKeySerializer) {
+        final MergeKeyValueIterator iterator = new MergeKeyValueIterator(
+            tmpStore().prefixScan(prefix, prefixKeySerializer),
+            mainStore().prefixScan(prefix, prefixKeySerializer),
+            openIterators);
+        openIterators.add(iterator);
+        return iterator;
+    }
+
+    private static KeyValue<Bytes, byte[]> fromUncommittedKV(final KeyValue<Bytes, byte[]> kv) {
+        if (kv.value[0] == DELETION) {
+            return null;
+        } else {
+            final byte[] value = new byte[kv.value.length - 1];
+            System.arraycopy(kv.value, 1, value, 0, value.length);
+            return new KeyValue<>(kv.key, value);
+        }
+    }
+
+    private static byte[] fromUncommittedValue(final byte[] value) {

Review Comment:
   Hmm, this is tricky. We could introduce a _third_ store, which stores uncommitted tombstones. For most use-cases, this store would be so lightweight that it could be in-memory. We could even use a heuristic to automatically choose the most appropriate type of store:
   
   - On startup, if an on-disk RocksDB tombstone store exists, use that.
   - Otherwise, use an in-memory store.
   - If the in-memory store exceeds a configurable threshold of keys, create a RocksDB store to replace it, populated with the in-memory store's keys.
   
   When querying, you'd query all 3 stores, using the tombstone store to omit results, provided the tombstones are newer than any values found in the other stores.
   
   I know it introduces _yet another store_, but I believe the majority of use-cases are so light on deletes that they'd comfortably fit in-memory, and therefore add very little overhead.
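   
   For illustration only, here's a rough sketch of that heuristic. Everything in it (the `SpillableTombstoneStore` name, the `"tombstones"` directory, the `openRocksDbTombstoneStore` helper) is a placeholder for the idea, not anything in this PR:
   
   ```java
   import java.io.File;
   import java.util.HashSet;
   import java.util.Set;
   import org.apache.kafka.common.utils.Bytes;
   import org.apache.kafka.streams.state.KeyValueStore;

   // Placeholder sketch of the spill-over heuristic described above.
   final class SpillableTombstoneStore {
       private final File storeDir;
       private final long spillThreshold;
       private Set<Bytes> inMemory = new HashSet<>();
       private KeyValueStore<Bytes, byte[]> onDisk; // created lazily once we spill

       SpillableTombstoneStore(final File storeDir, final long spillThreshold) {
           this.storeDir = storeDir;
           this.spillThreshold = spillThreshold;
           // On startup, prefer an existing on-disk tombstone store if one is present.
           if (new File(storeDir, "tombstones").exists()) {
               onDisk = openRocksDbTombstoneStore(storeDir);
           }
       }

       void addTombstone(final Bytes key) {
           if (onDisk == null && inMemory.size() >= spillThreshold) {
               // Threshold exceeded: create a RocksDB store and migrate the in-memory keys into it.
               onDisk = openRocksDbTombstoneStore(storeDir);
               for (final Bytes k : inMemory) {
                   onDisk.put(k, new byte[0]);
               }
               inMemory = null;
           }
           if (onDisk != null) {
               onDisk.put(key, new byte[0]);
           } else {
               inMemory.add(key);
           }
       }

       boolean isTombstoned(final Bytes key) {
           return onDisk != null ? onDisk.get(key) != null : inMemory.contains(key);
       }

       // Hypothetical helper; in practice this would open a RocksDB-backed store under storeDir.
       private KeyValueStore<Bytes, byte[]> openRocksDbTombstoneStore(final File dir) {
           throw new UnsupportedOperationException("placeholder");
       }
   }
   ```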





[GitHub] [kafka] nicktelford commented on a diff in pull request #12393: WIP: KAFKA-12549 Prototype for transactional state stores

Posted by GitBox <gi...@apache.org>.
nicktelford commented on code in PR #12393:
URL: https://github.com/apache/kafka/pull/12393#discussion_r1008188697


##########
streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java:
##########
@@ -73,16 +75,25 @@ public class EOSUncleanShutdownIntegrationTest {
 
     @SuppressWarnings("deprecation")
     @Parameterized.Parameters(name = "{0}")
-    public static Collection<String[]> data() {
-        return Arrays.asList(new String[][] {
-            {StreamsConfig.EXACTLY_ONCE},
-            {StreamsConfig.EXACTLY_ONCE_V2}
-        });
+    public static Collection<Object[]> data() {
+        final List<String> eosConfigs = Arrays.asList(
+            StreamsConfig.EXACTLY_ONCE,
+            StreamsConfig.EXACTLY_ONCE_V2
+        );
+        final List<Object[]> data = new ArrayList<>(eosConfigs.size() * 2);
+        for (final String eosConfig : eosConfigs) {
+            //data.add(new Object[] {eosConfig, false});
+            data.add(new Object[] {eosConfig, true});

Review Comment:
   Looks like you intended for this test to be parameterized for both transactional and non-transactional stores, but have not yet completed it?
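   
   i.e. presumably something like this, re-enabling the commented-out `false` case (just my guess at the intent):
   
   ```java
   for (final String eosConfig : eosConfigs) {
       data.add(new Object[] {eosConfig, false});
       data.add(new Object[] {eosConfig, true});
   }
   ```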



##########
streams/src/test/java/org/apache/kafka/streams/integration/EOSUncleanShutdownIntegrationTest.java:
##########
@@ -115,6 +126,9 @@ public void shouldWorkWithUncleanShutdownWipeOutStateStore() throws InterruptedE
         final String appId = "shouldWorkWithUncleanShutdownWipeOutStateStore";
         STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
         STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
+        if (stateStoreTransactional) {

Review Comment:
   Does this test make sense when the state stores are transactional, since transactional state isn't wiped on unclean shutdown?





[GitHub] [kafka] nicktelford commented on a diff in pull request #12393: WIP: KAFKA-12549 Prototype for transactional state stores

Posted by GitBox <gi...@apache.org>.
nicktelford commented on code in PR #12393:
URL: https://github.com/apache/kafka/pull/12393#discussion_r991332250


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java:
##########
@@ -115,7 +115,7 @@ static void closeStateManager(final Logger log,
                                   final StateDirectory stateDirectory,
                                   final TaskType taskType) {
         // if EOS is enabled, wipe out the whole state store for unclean close since it is now invalid
-        final boolean wipeStateStore = !closeClean && eosEnabled;
+        final boolean wipeStateStore = !closeClean && eosEnabled && !stateMgr.transactional();

Review Comment:
   If there are transactional and non-transactional stores in the same Task, any corruption in that Task will cause _all_ state in that Task to be wiped, including the transactional stores, because there's no other way to recover the non-transactional stores.
   
   For this reason, it's likely very important that every store within the same sub-topology is configured with the same transactionality.
   
   Maybe we should include a warning in `ProcessorStateManager#transactional()` (and `GlobalStateManager#transactional()`) when there's a mix of stores with different transactionality?
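   
   For illustration, a standalone sketch of what such a warning could look like. The class and method names below are made up for the example, and it assumes the state manager can tell, per store, whether that store is transactional:
   
   ```java
   import java.util.Map;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   // Illustrative only: warn when a task mixes transactional and non-transactional stores.
   final class TransactionalityCheck {
       private static final Logger LOG = LoggerFactory.getLogger(TransactionalityCheck.class);

       static boolean allTransactional(final Map<String, Boolean> transactionalByStore, final String taskId) {
           final long transactional = transactionalByStore.values().stream().filter(t -> t).count();
           if (transactional > 0 && transactional < transactionalByStore.size()) {
               LOG.warn("Task {} mixes transactional and non-transactional state stores; "
                   + "on unclean shutdown all of its state (including the transactional stores) will be wiped.",
                   taskId);
           }
           return transactional == transactionalByStore.size();
       }
   }
   ```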



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractTransactionalStore.java:
##########
+    @Override
+    public void commit(final Long changelogOffset) {
+        tmpStore().commit(changelogOffset);
+        doCommit();
+    }

Review Comment:
   On a related note: since flushing memtables is an expensive operation and must be tightly coupled with checkpointing, we may want to separate checkpointing from commit by introducing a new `checkpoint.interval.ms` (which must be greater than or equal to `commit.interval.ms`).
   
   This would enable users to keep a low `commit.interval.ms` (the default is 100ms today) while reducing the write amplification associated with a high memtable flush rate.
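   
   To make the idea concrete, here's a toy sketch of how the two intervals could interact. None of this is existing Streams code; `CommitScheduler`, the callbacks, and `checkpoint.interval.ms` itself are all hypothetical:
   
   ```java
   // Illustrative only: commit (cheap) every commit.interval.ms, but only flush memtables
   // and write the checkpoint (expensive) every checkpoint.interval.ms.
   class CommitScheduler {
       private final long commitIntervalMs;
       private final long checkpointIntervalMs; // must be >= commitIntervalMs
       private long lastCommitMs;
       private long lastCheckpointMs;

       CommitScheduler(final long commitIntervalMs, final long checkpointIntervalMs) {
           if (checkpointIntervalMs < commitIntervalMs) {
               throw new IllegalArgumentException("checkpoint.interval.ms must be >= commit.interval.ms");
           }
           this.commitIntervalMs = commitIntervalMs;
           this.checkpointIntervalMs = checkpointIntervalMs;
       }

       void maybeCommit(final long nowMs, final Runnable commit, final Runnable flushAndCheckpoint) {
           if (nowMs - lastCommitMs < commitIntervalMs) {
               return;
           }
           commit.run();                 // cheap: publish the transaction buffer, no memtable flush
           lastCommitMs = nowMs;
           if (nowMs - lastCheckpointMs >= checkpointIntervalMs) {
               flushAndCheckpoint.run(); // expensive: flush memtables, then write the checkpoint file
               lastCheckpointMs = nowMs;
           }
       }
   }
   ```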



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractTransactionalStore.java:
##########
+    @Override
+    public void init(final StateStoreContext context, final StateStore root) {
+        this.context = context;
+
+        doInit(context.appConfigs(), context.stateDir());
+        ((RocksDBStore) mainStore()).openDB(configs, stateDir);
+
+        final File positionCheckpointFile = new File(context.stateDir(), name() + ".position");
+        this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
+        this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
+        tmpStore().consistencyEnabled = consistencyEnabled;

Review Comment:
   Looks like `consistencyEnabled` is used _before_ the value is read from the config on line 114
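   
   e.g. simply reading the config before propagating it to the tmp store would fix the ordering (sketch):
   
   ```java
   consistencyEnabled = StreamsConfig.InternalConfig.getBoolean(
       context.appConfigs(),
       IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED,
       false);
   tmpStore().consistencyEnabled = consistencyEnabled;
   ```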



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractTransactionalStore.java:
##########
+    @Override
+    public void commit(final Long changelogOffset) {
+        tmpStore().commit(changelogOffset);
+        doCommit();
+    }

Review Comment:
   It looks like the `commit` path doesn't call `mainStore#commit()`? This means that the checkpointing (which is done immediately after `commit` returns) will checkpoint offsets for data that is only stored in `mainStore` memtables and has not yet been flushed to disk (SSTables).
   
   In the event of a crash, the data in those memtables would be lost, so data would be lost from the `mainStore` despite the checkpoint indicating that it is safe on disk.
   
   I think we need to call `mainStore.commit()` here, or find another way to ensure that the checkpoint file is only updated once the memtables are flushed to disk (perhaps using [EventListener](https://javadoc.io/static/org.rocksdb/rocksdbjni/6.20.3/org/rocksdb/EventListener.html)).
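   
   For what it's worth, here's a rough standalone sketch of the `EventListener` approach, assuming the rocksdbjni version we bundle exposes `Options#setListeners` / `AbstractEventListener`; the class name and the checkpointing step are placeholders, not Streams code:
   
   ```java
   import java.util.Collections;
   import java.util.concurrent.CountDownLatch;
   import org.rocksdb.AbstractEventListener;
   import org.rocksdb.FlushJobInfo;
   import org.rocksdb.FlushOptions;
   import org.rocksdb.Options;
   import org.rocksdb.RocksDB;

   public class FlushAwareCheckpointDemo {
       public static void main(final String[] args) throws Exception {
           RocksDB.loadLibrary();
           final CountDownLatch flushed = new CountDownLatch(1);
           final Options options = new Options()
               .setCreateIfMissing(true)
               .setListeners(Collections.singletonList(new AbstractEventListener() {
                   @Override
                   public void onFlushCompleted(final RocksDB db, final FlushJobInfo info) {
                       flushed.countDown(); // the memtable contents are now durable in an SSTable
                   }
               }));
           try (final RocksDB db = RocksDB.open(options, "/tmp/flush-aware-demo")) {
               db.put("k".getBytes(), "v".getBytes());
               db.flush(new FlushOptions().setWaitForFlush(false));
               flushed.await();
               // Only now would it be safe to write the checkpoint file for the flushed offset.
           }
       }
   }
   ```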



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractTransactionalStore.java:
##########
+    private static KeyValue<Bytes, byte[]> fromUncommittedKV(final KeyValue<Bytes, byte[]> kv) {
+        if (kv.value[0] == DELETION) {
+            return null;
+        } else {
+            final byte[] value = new byte[kv.value.length - 1];
+            System.arraycopy(kv.value, 1, value, 0, value.length);
+            return new KeyValue<>(kv.key, value);
+        }
+    }
+
+    private static byte[] fromUncommittedValue(final byte[] value) {

Review Comment:
   Instead of having to treat deletion tombstones specially like this, paying an extra copy on write and another on commit, could we make use of the `IngestExternalFile` RocksDB API to simply incorporate the SST files from the `tmpStore` directly into the `mainStore`?
   
   See:
   - https://issues.apache.org/jira/browse/KAFKA-13239
   - http://rocksdb.org/blog/2017/02/17/bulkoad-ingest-sst-file.html
   - https://github.com/facebook/rocksdb/wiki/Creating-and-Ingesting-SST-files
   - https://github.com/facebook/rocksdb/blob/main/include/rocksdb/db.h
   
   I don't think this is a deal-breaker though, and could be implemented as an optimization in the future instead.
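   
   As a strawman for that future optimization, the Java API usage would look roughly like this; the paths and keys are made up, and this is not what the PR does today:
   
   ```java
   import java.util.Collections;
   import org.rocksdb.EnvOptions;
   import org.rocksdb.IngestExternalFileOptions;
   import org.rocksdb.Options;
   import org.rocksdb.RocksDB;
   import org.rocksdb.SstFileWriter;

   public class SstIngestSketch {
       public static void main(final String[] args) throws Exception {
           RocksDB.loadLibrary();
           final Options options = new Options().setCreateIfMissing(true);
           final String sstPath = "/tmp/tmp-store-batch.sst";

           // 1. Write the commit batch to an external SST file (keys must be added in sorted order).
           try (final EnvOptions envOptions = new EnvOptions();
                final SstFileWriter writer = new SstFileWriter(envOptions, options)) {
               writer.open(sstPath);
               writer.put("a-key".getBytes(), "a-value".getBytes());
               writer.put("b-key".getBytes(), "b-value".getBytes());
               writer.finish();
           }

           // 2. Ingest the file into the main store in one call, moving rather than copying it.
           try (final RocksDB mainStore = RocksDB.open(options, "/tmp/main-store");
                final IngestExternalFileOptions ingestOptions =
                    new IngestExternalFileOptions().setMoveFiles(true)) {
               mainStore.ingestExternalFile(Collections.singletonList(sstPath), ingestOptions);
           }
       }
   }
   ```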





Re: [PR] WIP: KAFKA-12549 Prototype for transactional state stores [kafka]

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax closed pull request #12393: WIP: KAFKA-12549 Prototype for transactional state stores
URL: https://github.com/apache/kafka/pull/12393




[GitHub] [kafka] guozhangwang commented on pull request #12393: WIP: KAFKA-12549 Prototype for transactional state stores

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on PR #12393:
URL: https://github.com/apache/kafka/pull/12393#issuecomment-1249922054

   Hello @Gerrrr, just checking with you: is this PR ready for review now?
   
   Also, I'm wondering if we can break it up into smaller PRs (e.g. one with the public API + configs, one with integration tests, etc.) so that we can merge them more quickly: past experience shows that big PRs take longer to merge than the same changes split across smaller ones, so I'm thinking about any way we can get the KIP adopted sooner.
   




[GitHub] [kafka] nicktelford commented on a diff in pull request #12393: WIP: KAFKA-12549 Prototype for transactional state stores

Posted by GitBox <gi...@apache.org>.
nicktelford commented on code in PR #12393:
URL: https://github.com/apache/kafka/pull/12393#discussion_r1003497784


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractTransactionalStore.java:
##########
+    @Override
+    public void commit(final Long changelogOffset) {
+        tmpStore().commit(changelogOffset);
+        doCommit();
+    }

Review Comment:
   > How about we address it in a follow-up issue?

   Good idea. It introduces a new configuration option, which will require a KIP.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] nicktelford commented on a diff in pull request #12393: WIP: KAFKA-12549 Prototype for transactional state stores

Posted by GitBox <gi...@apache.org>.
nicktelford commented on code in PR #12393:
URL: https://github.com/apache/kafka/pull/12393#discussion_r989161053


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java:
##########
@@ -115,7 +115,7 @@ static void closeStateManager(final Logger log,
                                   final StateDirectory stateDirectory,
                                   final TaskType taskType) {
         // if EOS is enabled, wipe out the whole state store for unclean close since it is now invalid
-        final boolean wipeStateStore = !closeClean && eosEnabled;
+        final boolean wipeStateStore = !closeClean && eosEnabled && stateMgr.transactional();

Review Comment:
   Shouldn't this be `&& !stateMgr.transactional()`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] Gerrrr commented on a diff in pull request #12393: WIP: KAFKA-12549 Prototype for transactional state stores

Posted by GitBox <gi...@apache.org>.
Gerrrr commented on code in PR #12393:
URL: https://github.com/apache/kafka/pull/12393#discussion_r998928697


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java:
##########
@@ -115,7 +115,7 @@ static void closeStateManager(final Logger log,
                                   final StateDirectory stateDirectory,
                                   final TaskType taskType) {
         // if EOS is enabled, wipe out the whole state store for unclean close since it is now invalid
-        final boolean wipeStateStore = !closeClean && eosEnabled;
+        final boolean wipeStateStore = !closeClean && eosEnabled && !stateMgr.transactional();

Review Comment:
   Fixed in [a5e1fe3](https://github.com/apache/kafka/pull/12393/commits/a5e1fe3122280429f2eb1c5ce119dd756620ffb3).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] littlehorse-eng commented on a diff in pull request #12393: WIP: KAFKA-12549 Prototype for transactional state stores

Posted by GitBox <gi...@apache.org>.
littlehorse-eng commented on code in PR #12393:
URL: https://github.com/apache/kafka/pull/12393#discussion_r1002288689


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractTransactionalStore.java:
##########
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+
+public abstract class AbstractTransactionalStore<T extends KeyValueStore<Bytes, byte[]>> implements KeyValueStore<Bytes, byte[]> {
+    private static final byte MODIFICATION = 0x1;
+    private static final byte DELETION = 0x2;
+    private static final byte[] DELETION_VAL = {DELETION};
+
+    private StateStoreContext context;
+
+    static final String PREFIX = "transactional-";
+    //VisibleForTesting
+    public static final String TMP_SUFFIX = ".tmp";
+
+    private final Set<MergeKeyValueIterator> openIterators = Collections.synchronizedSet(new HashSet<>());
+
+    Map<String, Object> configs;
+    File stateDir;
+
+    private boolean consistencyEnabled = false;
+    private Position position;
+    protected OffsetCheckpoint positionCheckpoint;
+
+    KeyValueSegment createTmpStore(final String segmentName,
+                                   final String windowName,
+                                   final long segmentId,
+                                   final RocksDBMetricsRecorder metricsRecorder) {
+        return new KeyValueSegment(segmentName + TMP_SUFFIX,
+                                    windowName,
+                                    segmentId,
+                                    metricsRecorder);
+    }
+
+    public abstract T mainStore();
+
+    public abstract KeyValueSegment tmpStore();
+
+    @Deprecated
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        if (context instanceof StateStoreContext) {
+            init((StateStoreContext) context, root);
+        } else {
+            throw new UnsupportedOperationException(
+                "Use TransactionalKeyValueStore#init(StateStoreContext, StateStore) instead."
+            );
+        }
+    }
+
+    @Override
+    public void init(final StateStoreContext context, final StateStore root) {
+        this.context = context;
+
+        doInit(context.appConfigs(), context.stateDir());
+        ((RocksDBStore) mainStore()).openDB(configs, stateDir);
+
+        final File positionCheckpointFile = new File(context.stateDir(), name() + ".position");
+        this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
+        this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
+        tmpStore().consistencyEnabled = consistencyEnabled;
+
+        // register and possibly restore the state from the logs
+        context.register(
+            root,
+            (RecordBatchingStateRestoreCallback) this::restoreBatch,
+            () -> StoreQueryUtils.checkpointPosition(positionCheckpoint, position)
+        );
+
+        consistencyEnabled = StreamsConfig.InternalConfig.getBoolean(
+            context.appConfigs(),
+            IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED,
+            false);
+    }
+
+    private void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
+        final Collection<ConsumerRecord<byte[], byte[]>> changelogRecords = records
+            .stream()
+            .map(record -> new ConsumerRecord<>(
+                record.topic(),
+                record.partition(),
+                record.offset(),
+                record.key(),
+                toUncommittedValue(record.value())))
+            .collect(Collectors.toList());
+        tmpStore().restoreBatch(changelogRecords);
+        commit(null);
+    }
+
+    void doInit(final Map<String, Object> configs, final File stateDir) {
+        this.configs = configs;
+        this.stateDir = stateDir;
+        tmpStore().openDB(configs, stateDir);
+    }
+
+    @Override
+    public synchronized void close() {
+        final HashSet<KeyValueIterator<Bytes, byte[]>> iterators;
+        synchronized (openIterators) {
+            iterators = new HashSet<>(openIterators);
+        }
+        for (final KeyValueIterator<Bytes, byte[]> iterator : iterators) {
+            iterator.close();
+        }
+
+        tmpStore().close();
+        mainStore().close();
+    }
+
+    @Override
+    public void commit(final Long changelogOffset) {
+        tmpStore().commit(changelogOffset);
+        doCommit();
+    }

Review Comment:
   I like the idea of decoupling the RocksDB flush from the Kafka producer commit, because a slow commit/flush process would greatly reduce the feasibility of having a small commit interval (e.g. 50ms) in latency-sensitive applications.

   However, one interesting side effect of coupling `producer.commit()` with `tmpStore.commit()` is that it would enable a later KIP which allows Interactive Queries to read only committed values, by directing the IQ to the main store only. As it stands, IQ can read uncommitted values from the store (generally that's not a _huge_ problem since most processing is deterministic).
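
   A minimal sketch of how such a read-committed path could look on top of `AbstractTransactionalStore` (the `readCommitted` flag here is hypothetical and not part of this PR; the rest mirrors the `get` implementation quoted later in this thread):

       // Hypothetical read-committed lookup: serve reads from the main (committed) store only,
       // and keep the merged tmp+main view for the existing read-uncommitted behaviour.
       public byte[] get(final Bytes key, final boolean readCommitted) {
           if (readCommitted) {
               // only data that survived doCommit() lives in the main store
               return mainStore().get(key);
           }
           final byte[] tmpValue = tmpStore().get(key);
           if (tmpValue == null) {
               return mainStore().get(key);
           } else if (tmpValue[0] == DELETION) {
               return null;
           } else {
               return fromUncommittedValue(tmpValue);
           }
       }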



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] Gerrrr commented on a diff in pull request #12393: WIP: KAFKA-12549 Prototype for transactional state stores

Posted by GitBox <gi...@apache.org>.
Gerrrr commented on code in PR #12393:
URL: https://github.com/apache/kafka/pull/12393#discussion_r989362523


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java:
##########
@@ -115,7 +115,7 @@ static void closeStateManager(final Logger log,
                                   final StateDirectory stateDirectory,
                                   final TaskType taskType) {
         // if EOS is enabled, wipe out the whole state store for unclean close since it is now invalid
-        final boolean wipeStateStore = !closeClean && eosEnabled;
+        final boolean wipeStateStore = !closeClean && eosEnabled && stateMgr.transactional();

Review Comment:
   Excellent catch. Thank you!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] guozhangwang commented on a diff in pull request #12393: WIP: KAFKA-12549 Prototype for transactional state stores

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12393:
URL: https://github.com/apache/kafka/pull/12393#discussion_r939822059


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java:
##########
@@ -309,9 +344,17 @@ private <K, V1, V2> StoreBuilder<KeyValueStore<TimestampedKeyAndJoinSide<K>, Lef
         final TimestampedKeyAndJoinSideSerde<K> timestampedKeyAndJoinSideSerde = new TimestampedKeyAndJoinSideSerde<>(streamJoinedInternal.keySerde());
         final LeftOrRightValueSerde<V1, V2> leftOrRightValueSerde = new LeftOrRightValueSerde<>(streamJoinedInternal.valueSerde(), streamJoinedInternal.otherValueSerde());
 
+        final KeyValueBytesStoreSupplier storeSupplier;

Review Comment:
   This is a meta comment regarding "2. To determine if the persistent kv store in KStreamImplJoin should be
   transactional" in the KIP discussion: it seems that we are passing the transactional flag from `materialized` -> `storeSupplier` -> `KStreamImplJoin` (not sure why we'd need this one though, see my other comment), and then, depending on the flag, we create the corresponding stores here.

   What I'm thinking, though, is that instead of setting this flag on a per-store basis, we just set it in a global config, and this global config is only for compatibility purposes: i.e. in the long run we would consider all built-in stores transactional by default, unless users say otherwise. So for now this global config would default to "not transactional", while in some future version we would set the default to "transactional".

   Then we would not need to pass this flag all along this hierarchy, and, as we discussed on the KIP, when doing commit / recover we could have unified logic that does not need to check the store's transactional flag any more. What do you think?
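
   A rough sketch of how such a global, compatibility-only switch could be wired (the config name `statestore.transactional.enabled` and the surrounding variables `appConfigs`, `rocksDBStore`, `metricsRecorder` are hypothetical; the helper calls are taken from code quoted in this thread):

       // Hypothetical global flag instead of a per-store flag threaded through
       // Materialized -> StoreSupplier -> KStreamImplJoin.
       public static final String STATE_STORE_TRANSACTIONAL_ENABLED = "statestore.transactional.enabled";

       // Read once where the store is built; defaults to "false" for compatibility and
       // could flip to "true" in a later release.
       final boolean transactional = StreamsConfig.InternalConfig.getBoolean(
           appConfigs, STATE_STORE_TRANSACTIONAL_ENABLED, false);
       final KeyValueStore<Bytes, byte[]> store = transactional
           ? new TransactionalKeyValueStore(rocksDBStore, metricsRecorder)
           : rocksDBStore;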



##########
streams/src/main/java/org/apache/kafka/streams/kstream/StreamJoined.java:
##########
@@ -62,7 +64,8 @@ private StreamJoined(final Serde<K> keySerde,
                          final String name,
                          final String storeName,
                          final boolean loggingEnabled,
-                         final Map<String, String> topicConfig) {
+                         final Map<String, String> topicConfig,
+                         final boolean transactional) {

Review Comment:
   It seems here that we enforce both sides of the join to be transactional, or neither. If only one of the stores is defined as transactional via `materialized` while the other is not, then we just force both to be non-transactional. What's the rationale behind this?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -608,6 +618,11 @@ public void checkpoint() {
         // checkpoint those stores that are only logged and persistent to the checkpoint file
         final Map<TopicPartition, Long> checkpointingOffsets = new HashMap<>();
         for (final StateStoreMetadata storeMetadata : stores.values()) {
+            // do not checkpoint non-transactional state stores under EOS
+            if (eosEnabled && !storeMetadata.stateStore.transactional()) {

Review Comment:
   As we discussed in the KIP, I think we can just remove this branched logic as well and always checkpoint.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractTransactionalStore.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+public abstract class AbstractTransactionalStore implements KeyValueStore<Bytes, byte[]> {
+    static final String PREFIX = "transactional-";
+    private final ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+
+    private Path commitMarker;
+
+    final KeyValueSegment tmpStore;
+
+    Map<String, Object> configs;
+    File stateDir;
+
+    abstract KeyValueStore<Bytes, byte[]> mainStore();
+
+    AbstractTransactionalStore(final KeyValueSegment tmpStore) {
+        this.tmpStore = tmpStore;
+    }
+
+    @Deprecated
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        if (context instanceof StateStoreContext) {
+            init((StateStoreContext) context, root);
+        } else {
+            throw new UnsupportedOperationException(
+                "Use TransactionalKeyValueStore#init(StateStoreContext, StateStore) instead."
+            );
+        }
+    }
+
+    @Override
+    public void init(final StateStoreContext context, final StateStore root) {
+        doInit(context.appConfigs(), context.stateDir());
+        mainStore().init(context, root);
+    }
+
+    void doInit(final Map<String, Object> configs, final File stateDir) {
+        this.configs = configs;
+        this.stateDir = stateDir;
+        commitMarker = stateDir.toPath().resolve(PREFIX + name() + ".commit");
+        tmpStore.openDB(configs, stateDir);
+    }
+
+    @Override
+    public void close() {
+        tmpStore.close();
+        mainStore().close();
+    }
+
+    @Override
+    public void commit(final Long changelogOffset) {
+        tmpStore.commit(changelogOffset);
+        createCommitMarker(changelogOffset);
+        doCommit();
+    }
+
+    @Override
+    public Long recover(final Long changelogOffset) {
+        if (Files.exists(commitMarker)) {

Review Comment:
   If the marker file does exist but is corrupted, and hence we cannot read out the long value, do we really want to return `null` and let the caller wipe out the whole store?

   If this happens, can we be certain that we failed at "2. Create a commit marker with a changelog offset corresponding to the state we are committing.", and hence know that nothing from the tmp store was written, so we can just safely return the changelog offset?
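
   One possible defensive reading along those lines, as a sketch (the helper name `readCommitMarker` is made up; it assumes the marker only ever holds a single long written after the tmp store commit, so an unreadable marker means the copy into the main store never started):

       // Sketch: fall back to the caller-provided offset whenever the marker is missing
       // or does not contain a readable long.
       private Long readCommitMarker(final Long changelogOffset) {
           if (!Files.exists(commitMarker)) {
               return changelogOffset;
           }
           try {
               final byte[] bytes = Files.readAllBytes(commitMarker);
               if (bytes.length != Long.BYTES) {
                   // corrupted marker: nothing from the tmp store has been copied yet
                   return changelogOffset;
               }
               return ByteBuffer.wrap(bytes).getLong();
           } catch (final IOException e) {
               return changelogOffset;
           }
       }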



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractTransactionalStore.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+public abstract class AbstractTransactionalStore implements KeyValueStore<Bytes, byte[]> {
+    static final String PREFIX = "transactional-";
+    private final ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+
+    private Path commitMarker;
+
+    final KeyValueSegment tmpStore;
+
+    Map<String, Object> configs;
+    File stateDir;
+
+    abstract KeyValueStore<Bytes, byte[]> mainStore();
+
+    AbstractTransactionalStore(final KeyValueSegment tmpStore) {
+        this.tmpStore = tmpStore;
+    }
+
+    @Deprecated
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        if (context instanceof StateStoreContext) {
+            init((StateStoreContext) context, root);
+        } else {
+            throw new UnsupportedOperationException(
+                "Use TransactionalKeyValueStore#init(StateStoreContext, StateStore) instead."
+            );
+        }
+    }
+
+    @Override
+    public void init(final StateStoreContext context, final StateStore root) {
+        doInit(context.appConfigs(), context.stateDir());
+        mainStore().init(context, root);
+    }
+
+    void doInit(final Map<String, Object> configs, final File stateDir) {
+        this.configs = configs;
+        this.stateDir = stateDir;
+        commitMarker = stateDir.toPath().resolve(PREFIX + name() + ".commit");
+        tmpStore.openDB(configs, stateDir);
+    }
+
+    @Override
+    public void close() {
+        tmpStore.close();
+        mainStore().close();
+    }
+
+    @Override
+    public void commit(final Long changelogOffset) {
+        tmpStore.commit(changelogOffset);
+        createCommitMarker(changelogOffset);
+        doCommit();
+    }
+
+    @Override
+    public Long recover(final Long changelogOffset) {
+        if (Files.exists(commitMarker)) {

Review Comment:
   This is another comment: it seems that in the current proposal, we do not necessarily guarantee that the returned offset always strictly reflects the store's current snapshot; instead we rely on the hope that, since restoration is idempotent, all we need to make sure is that uncommitted data is not written to the store.

   In that case, I'm actually wondering whether the txn marker, plus the `recover()` function, is necessary: since we do this tmp -> actual store copying after the whole EOS commit transaction completes, upon failures we know that our actual store's image is no further ahead than the already-committed txn's changelog offset, so we can always just blindly start restoring from the checkpointed offset to the log-end offset (in read committed mode, of course). Right?
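
   In code, the recovery step would then reduce to roughly this sketch (`truncateTmpStore` stands for dropping and reopening the tmp store, as in a later revision of this class quoted further down this thread):

       // Sketch: no commit marker; drop whatever is staged in the tmp store and rely on
       // idempotent restoration from the checkpointed offset to the log-end offset.
       @Override
       public Long recover(final Long changelogOffset) {
           truncateTmpStore();
           return changelogOffset;
       }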



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -247,8 +247,22 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) {
                              store.changelogPartition, store.stateStore.name());
                 } else if (store.offset() == null) {
                     if (loadedCheckpoints.containsKey(store.changelogPartition)) {
-                        final Long offset = changelogOffsetFromCheckpointedOffset(loadedCheckpoints.remove(store.changelogPartition));
-                        store.setOffset(offset);
+                        final Long changelogOffset = changelogOffsetFromCheckpointedOffset(loadedCheckpoints.remove(store.changelogPartition));
+                        // StateStore#recover might roll back or forward a partially committed transaction
+                        final Long recoveredOffset;
+                        if (store.stateStore.transactional()) {

Review Comment:
   Please see my other comment: it seems that since this recover function is "best effort" anyway, we can just always restore from the checkpointed `changelogOffset`, and hence we do not need this branched logic?
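
   The un-branched variant would look roughly like the pre-existing code removed in this hunk (sketch only):

       // Sketch: always trust the checkpointed offset and let idempotent restoration
       // replay anything newer from the changelog.
       if (loadedCheckpoints.containsKey(store.changelogPartition)) {
           final Long changelogOffset =
               changelogOffsetFromCheckpointedOffset(loadedCheckpoints.remove(store.changelogPartition));
           store.setOffset(changelogOffset);
       }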



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] Gerrrr commented on pull request #12393: WIP: KAFKA-12549 Prototype for transactional state stores

Posted by GitBox <gi...@apache.org>.
Gerrrr commented on PR #12393:
URL: https://github.com/apache/kafka/pull/12393#issuecomment-1250710841

   Hey @guozhangwang !
   
   Yeah, I am still working on the PR - adding integration tests, running benchmarks, etc. I will ping you here and update the description once the PR is ready for review. We can decide on the best way to split it then.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] littlehorse-eng commented on a diff in pull request #12393: WIP: KAFKA-12549 Prototype for transactional state stores

Posted by GitBox <gi...@apache.org>.
littlehorse-eng commented on code in PR #12393:
URL: https://github.com/apache/kafka/pull/12393#discussion_r939561610


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/TransactionalTimestampedSegment.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.io.File;
+import java.util.Map;
+import org.apache.kafka.common.utils.Bytes;
+
+public class TransactionalTimestampedSegment extends AbstractTransactionalSegment {
+    TransactionalTimestampedSegment(final KeyValueSegment tmpStore,
+                                    final TimestampedSegment mainStore) {
+        super(tmpStore, mainStore);
+    }
+
+    @Override
+    public void deleteRange(final Bytes keyFrom, final Bytes keyTo) {
+        throw new UnsupportedOperationException();

Review Comment:
   This one will be tricky since you'll have to do a normal deleteRange on the tmpStore, then scan over the same range in the committed store and delete those keys in the tmp store as well.
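
   A sketch of that approach on top of the tmp/main pair (illustrative only; it assumes the tmp segment exposes `deleteRange` and borrows the `DELETION_VAL` tombstone marker used elsewhere in this PR, so it is not the PR's actual implementation):

       @Override
       public void deleteRange(final Bytes keyFrom, final Bytes keyTo) {
           // 1. drop anything already staged in the tmp store for this range
           tmpStore().deleteRange(keyFrom, keyTo);
           // 2. shadow the committed keys in the same range with tombstones in the tmp store,
           //    so reads see the delete and the next commit propagates it to the main store
           try (final KeyValueIterator<Bytes, byte[]> it = mainStore().range(keyFrom, keyTo)) {
               while (it.hasNext()) {
                   tmpStore().put(it.next().key, DELETION_VAL);
               }
           }
       }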



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] nicktelford commented on a diff in pull request #12393: WIP: KAFKA-12549 Prototype for transactional state stores

Posted by GitBox <gi...@apache.org>.
nicktelford commented on code in PR #12393:
URL: https://github.com/apache/kafka/pull/12393#discussion_r1018125919


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/TransactionalKeyValueStore.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+
+public class TransactionalKeyValueStore extends AbstractTransactionalStore<KeyValueStore<Bytes, byte[]>> {
+    private final RocksDBMetricsRecorder metricsRecorder;
+
+    private final KeyValueSegment tmpStore;
+    private final KeyValueStore<Bytes, byte[]> mainStore;
+
+    //VisibleForTesting
+    TransactionalKeyValueStore(final KeyValueStore<Bytes, byte[]> mainStore, final RocksDBMetricsRecorder metricsRecorder) {
+        this.metricsRecorder = metricsRecorder;
+        this.mainStore = mainStore;
+        this.tmpStore = createTmpStore(mainStore.name() + TMP_SUFFIX,

Review Comment:
   `createTmpStore` already appends the `TMP_SUFFIX` to the `segmentName`, so this results in a temporary store with a double suffix, like `/state/dir/2_13/rocksdb/foo/foo.tmp.tmp`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] WIP: KAFKA-12549 Prototype for transactional state stores [kafka]

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on PR #12393:
URL: https://github.com/apache/kafka/pull/12393#issuecomment-1960672595

   Closing this PR as stale. KIP-844 was discarded in favor of KIP-892.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] nicktelford commented on a diff in pull request #12393: WIP: KAFKA-12549 Prototype for transactional state stores

Posted by GitBox <gi...@apache.org>.
nicktelford commented on code in PR #12393:
URL: https://github.com/apache/kafka/pull/12393#discussion_r1005391821


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractTransactionalStore.java:
##########
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+
+public abstract class AbstractTransactionalStore<T extends KeyValueStore<Bytes, byte[]>> implements KeyValueStore<Bytes, byte[]> {
+    private static final byte MODIFICATION = 0x1;
+    private static final byte DELETION = 0x2;
+    private static final byte[] DELETION_VAL = {DELETION};
+
+    private StateStoreContext context;
+
+    static final String PREFIX = "transactional-";
+    //VisibleForTesting
+    public static final String TMP_SUFFIX = ".tmp";
+
+    private final Set<MergeKeyValueIterator> openIterators = Collections.synchronizedSet(new HashSet<>());
+
+    Map<String, Object> configs;
+    File stateDir;
+
+    private boolean consistencyEnabled = false;
+    private Position position;
+    protected OffsetCheckpoint positionCheckpoint;
+
+    KeyValueSegment createTmpStore(final String segmentName,
+                                   final String windowName,
+                                   final long segmentId,
+                                   final RocksDBMetricsRecorder metricsRecorder) {
+        return new KeyValueSegment(segmentName + TMP_SUFFIX,
+                                    windowName,
+                                    segmentId,
+                                    metricsRecorder);
+    }
+
+    public abstract T mainStore();
+
+    public abstract KeyValueSegment tmpStore();
+
+    @Deprecated
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        if (context instanceof StateStoreContext) {
+            init((StateStoreContext) context, root);
+        } else {
+            throw new UnsupportedOperationException(
+                "Use TransactionalKeyValueStore#init(StateStoreContext, StateStore) instead."
+            );
+        }
+    }
+
+    @Override
+    public void init(final StateStoreContext context, final StateStore root) {
+        this.context = context;
+
+        doInit(context.appConfigs(), context.stateDir());
+        ((RocksDBStore) mainStore()).openDB(configs, stateDir);
+
+        final File positionCheckpointFile = new File(context.stateDir(), name() + ".position");
+        this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
+        this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
+        tmpStore().consistencyEnabled = consistencyEnabled;
+
+        // register and possibly restore the state from the logs
+        context.register(
+            root,
+            (RecordBatchingStateRestoreCallback) this::restoreBatch,
+            () -> StoreQueryUtils.checkpointPosition(positionCheckpoint, position)
+        );
+
+        consistencyEnabled = StreamsConfig.InternalConfig.getBoolean(
+            context.appConfigs(),
+            IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED,
+            false);
+    }
+
+    private void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
+        final Collection<ConsumerRecord<byte[], byte[]>> changelogRecords = records
+            .stream()
+            .map(record -> new ConsumerRecord<>(
+                record.topic(),
+                record.partition(),
+                record.offset(),
+                record.key(),
+                toUncommittedValue(record.value())))
+            .collect(Collectors.toList());
+        tmpStore().restoreBatch(changelogRecords);
+        commit(null);
+    }
+
+    void doInit(final Map<String, Object> configs, final File stateDir) {
+        this.configs = configs;
+        this.stateDir = stateDir;
+        tmpStore().openDB(configs, stateDir);
+    }
+
+    @Override
+    public synchronized void close() {
+        final HashSet<KeyValueIterator<Bytes, byte[]>> iterators;
+        synchronized (openIterators) {
+            iterators = new HashSet<>(openIterators);
+        }
+        for (final KeyValueIterator<Bytes, byte[]> iterator : iterators) {
+            iterator.close();
+        }
+
+        tmpStore().close();
+        mainStore().close();
+    }
+
+    @Override
+    public void commit(final Long changelogOffset) {
+        tmpStore().commit(changelogOffset);
+        doCommit();
+    }

Review Comment:
   Thinking about it, since the goal is to reduce write-amplification from flushing the memtables on every commit, we should ideally only flush memtables when they become large enough to trigger the usual RocksDB flush criteria. In which case, instead of a `checkpoint.interval.ms`, perhaps we should have something like `checkpoint.dirty.records.threshold`, which sets the threshold of data written to `tmpStore` that triggers the checkpointing? This would essentially set a (loose) upper bound on the number of records (per store partition) that need to be restored in the event of a failure.
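
   Very roughly, such a threshold could be wired like this (a sketch that ignores the recovery and checkpoint bookkeeping it would also need; the config name and the `dirtyRecords` counter are hypothetical):

       // Hypothetical: keep staged writes in the tmp store across commits and only merge
       // them into the main store once enough dirty records have accumulated.
       private long dirtyRecords = 0;
       private final long dirtyRecordsThreshold = 10_000;   // e.g. "checkpoint.dirty.records.threshold"

       @Override
       public void commit(final Long changelogOffset) {
           tmpStore().commit(changelogOffset);
           if (dirtyRecords >= dirtyRecordsThreshold) {
               doCommit();          // copies tmp -> main and truncates the tmp store
               dirtyRecords = 0;
           }
       }
       // put()/putAll()/delete() would bump dirtyRecords for every staged record.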



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -220,6 +221,32 @@ void registerStateStores(final List<StateStore> allStores, final InternalProcess
             }
             log.trace("Registered state store {}", store.name());
         }
+        checkMixedTxnStores();
+    }
+
+    private void checkMixedTxnStores() {
+        final Set<String> nonTxnStores = new HashSet<>();
+        final Set<String> txnStores = new HashSet<>();
+        final Collection<StateStore> stateStores = stores
+            .values()
+            .stream()
+            .map(sm -> sm.stateStore)
+            .collect(Collectors.toList());
+        for (final StateStore store : stateStores) {
+            if (store.transactional()) {
+                txnStores.add(store.name());

Review Comment:
   This could more efficiently be done with boolean flags for `hasTransactionalStores` and `hasNonTransactionalStores`.
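
   For illustration, a single-pass variant with the two flags (the exception and its message are placeholders for whatever the current check does with the two name sets):

       private void checkMixedTxnStores() {
           boolean hasTransactionalStores = false;
           boolean hasNonTransactionalStores = false;
           for (final StateStoreMetadata metadata : stores.values()) {
               if (metadata.stateStore.transactional()) {
                   hasTransactionalStores = true;
               } else {
                   hasNonTransactionalStores = true;
               }
           }
           if (hasTransactionalStores && hasNonTransactionalStores) {
               throw new IllegalStateException(
                   "Transactional and non-transactional state stores cannot be mixed in the same task");
           }
       }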



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] Gerrrr commented on a diff in pull request #12393: WIP: KAFKA-12549 Prototype for transactional state stores

Posted by GitBox <gi...@apache.org>.
Gerrrr commented on code in PR #12393:
URL: https://github.com/apache/kafka/pull/12393#discussion_r998842122


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractTransactionalStore.java:
##########
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+
+public abstract class AbstractTransactionalStore<T extends KeyValueStore<Bytes, byte[]>> implements KeyValueStore<Bytes, byte[]> {
+    private static final byte MODIFICATION = 0x1;
+    private static final byte DELETION = 0x2;
+    private static final byte[] DELETION_VAL = {DELETION};
+
+    private StateStoreContext context;
+
+    static final String PREFIX = "transactional-";
+    //VisibleForTesting
+    public static final String TMP_SUFFIX = ".tmp";
+
+    private final Set<MergeKeyValueIterator> openIterators = Collections.synchronizedSet(new HashSet<>());
+
+    Map<String, Object> configs;
+    File stateDir;
+
+    private boolean consistencyEnabled = false;
+    private Position position;
+    protected OffsetCheckpoint positionCheckpoint;
+
+    KeyValueSegment createTmpStore(final String segmentName,
+                                   final String windowName,
+                                   final long segmentId,
+                                   final RocksDBMetricsRecorder metricsRecorder) {
+        return new KeyValueSegment(segmentName + TMP_SUFFIX,
+                                    windowName,
+                                    segmentId,
+                                    metricsRecorder);
+    }
+
+    public abstract T mainStore();
+
+    public abstract KeyValueSegment tmpStore();
+
+    @Deprecated
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        if (context instanceof StateStoreContext) {
+            init((StateStoreContext) context, root);
+        } else {
+            throw new UnsupportedOperationException(
+                "Use TransactionalKeyValueStore#init(StateStoreContext, StateStore) instead."
+            );
+        }
+    }
+
+    @Override
+    public void init(final StateStoreContext context, final StateStore root) {
+        this.context = context;
+
+        doInit(context.appConfigs(), context.stateDir());
+        ((RocksDBStore) mainStore()).openDB(configs, stateDir);
+
+        final File positionCheckpointFile = new File(context.stateDir(), name() + ".position");
+        this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
+        this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
+        tmpStore().consistencyEnabled = consistencyEnabled;
+
+        // register and possibly restore the state from the logs
+        context.register(
+            root,
+            (RecordBatchingStateRestoreCallback) this::restoreBatch,
+            () -> StoreQueryUtils.checkpointPosition(positionCheckpoint, position)
+        );
+
+        consistencyEnabled = StreamsConfig.InternalConfig.getBoolean(
+            context.appConfigs(),
+            IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED,
+            false);
+    }
+
+    private void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
+        final Collection<ConsumerRecord<byte[], byte[]>> changelogRecords = records
+            .stream()
+            .map(record -> new ConsumerRecord<>(
+                record.topic(),
+                record.partition(),
+                record.offset(),
+                record.key(),
+                toUncommittedValue(record.value())))
+            .collect(Collectors.toList());
+        tmpStore().restoreBatch(changelogRecords);
+        commit(null);
+    }
+
+    void doInit(final Map<String, Object> configs, final File stateDir) {
+        this.configs = configs;
+        this.stateDir = stateDir;
+        tmpStore().openDB(configs, stateDir);
+    }
+
+    @Override
+    public synchronized void close() {
+        final HashSet<KeyValueIterator<Bytes, byte[]>> iterators;
+        synchronized (openIterators) {
+            iterators = new HashSet<>(openIterators);
+        }
+        for (final KeyValueIterator<Bytes, byte[]> iterator : iterators) {
+            iterator.close();
+        }
+
+        tmpStore().close();
+        mainStore().close();
+    }
+
+    @Override
+    public void commit(final Long changelogOffset) {
+        tmpStore().commit(changelogOffset);
+        doCommit();
+    }
+
+    @Override
+    public boolean recover(final Long changelogOffset) {
+        truncateTmpStore();
+        return true;
+    }
+
+    private void truncateTmpStore() {
+        try {
+            tmpStore().close();
+            tmpStore().destroy();
+            tmpStore().openDB(configs, stateDir);
+        } catch (final IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public boolean persistent() {
+        return mainStore().persistent();
+    }
+
+    @Override
+    public boolean isOpen() {
+        return tmpStore().isOpen() && mainStore().isOpen();
+    }
+
+    @Override
+    public void put(final Bytes key, final byte[] value) {
+        StoreQueryUtils.updatePosition(position, context);
+        tmpStore().put(key, toUncommittedValue(value));
+    }
+
+    @Override
+    public byte[] putIfAbsent(final Bytes key, final byte[] value) {
+        final byte[] prev = get(key);
+        if (prev == null) {
+            StoreQueryUtils.updatePosition(position, context);
+            tmpStore().put(key, toUncommittedValue(value));
+        }
+        return prev;
+    }
+
+    @Override
+    public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+        StoreQueryUtils.updatePosition(position, context);
+        final List<KeyValue<Bytes, byte[]>> tmpEntries = entries
+            .stream()
+            .map(e -> new KeyValue<>(e.key, toUncommittedValue(e.value)))
+            .collect(Collectors.toList());
+        tmpStore().putAll(tmpEntries);
+    }
+
+    @Override
+    public byte[] delete(final Bytes key) {
+        final byte[] value = get(key);
+        tmpStore().put(key, DELETION_VAL);
+        return value;
+    }
+
+    @Override
+    public byte[] get(final Bytes key) {
+        final byte[] tmpValue = tmpStore().get(key);
+        if (tmpValue == null) {
+            return mainStore().get(key);
+        } else if (tmpValue[0] == DELETION) {
+            return null;
+        } else {
+            return fromUncommittedValue(tmpValue);
+        }
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) {
+        final MergeKeyValueIterator iterator = new MergeKeyValueIterator(
+            tmpStore().range(from, to), mainStore().range(from, to), openIterators);
+        openIterators.add(iterator);
+        return iterator;
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> reverseRange(final Bytes from, final Bytes to) {
+        final MergeKeyValueIterator iterator = new MergeKeyValueIterator(
+            tmpStore().reverseRange(from, to),
+            mainStore().reverseRange(from, to),
+            true,
+            openIterators);
+        openIterators.add(iterator);
+        return iterator;
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> all() {
+        final MergeKeyValueIterator iterator = new MergeKeyValueIterator(
+            tmpStore().all(), mainStore().all(), openIterators);
+        openIterators.add(iterator);
+        return iterator;
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> reverseAll() {
+        final MergeKeyValueIterator iterator = new MergeKeyValueIterator(
+            tmpStore().reverseAll(), mainStore().reverseAll(), true, openIterators);
+        openIterators.add(iterator);
+        return iterator;
+    }
+
+    @Override
+    public long approximateNumEntries() {
+        try {
+            return Math.addExact(tmpStore().approximateNumEntries(), mainStore().approximateNumEntries());
+        } catch (final ArithmeticException e) {
+            return Long.MAX_VALUE;
+        }
+    }
+
+    @Override
+    public Position getPosition() {
+        return position;
+    }
+
+    private void doCommit() {
+        try (final KeyValueIterator<Bytes, byte[]> it = tmpStore().all()) {
+            while (it.hasNext()) {
+                final KeyValue<Bytes, byte[]> kv = it.next();
+                mainStore().put(kv.key, fromUncommittedValue(kv.value));
+            }
+        }
+
+        truncateTmpStore();
+    }
+
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound,
+        final QueryConfig config) {
+        return StoreQueryUtils.handleBasicQueries(
+            query,
+            positionBound,
+            config,
+            this,
+            position,
+            context
+        );
+    }
+
+    @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix,
+                                                                                    final PS prefixKeySerializer) {
+        final MergeKeyValueIterator iterator = new MergeKeyValueIterator(
+            tmpStore().prefixScan(prefix, prefixKeySerializer),
+            mainStore().prefixScan(prefix, prefixKeySerializer),
+            openIterators);
+        openIterators.add(iterator);
+        return iterator;
+    }
+
+    private static KeyValue<Bytes, byte[]> fromUncommittedKV(final KeyValue<Bytes, byte[]> kv) {
+        if (kv.value[0] == DELETION) {
+            return null;
+        } else {
+            final byte[] value = new byte[kv.value.length - 1];
+            System.arraycopy(kv.value, 1, value, 0, value.length);
+            return new KeyValue<>(kv.key, value);
+        }
+    }
+
+    private static byte[] fromUncommittedValue(final byte[] value) {

Review Comment:
   Thank you for the suggestion! I see one issue with this approach that I am unsure how to fix. `AbstractTransactionalStore#get` should be able to differentiate between a key that is absent in the `tmpStore` and a key that is shadowed by a tombstone. Consider the following sequence:
   1.  `txnDB.put("foo", 1);`
   2.  `txnDB.commit(1L);`
   3.  `txnDB.delete("foo");`
   4.  `txnDB.get("foo");`
   
   The `get` operation at step 4 should return `null`. I am not sure whether it is possible to:
   1. Distinguish between a key that is absent in RocksDB and a key that is covered by a tombstone.
   2. Prevent RocksDB from purging tombstones. This is required to avoid situations where a compaction between steps 3 and 4 purges the tombstone for `foo`.
   
   Please let me know if you have ideas to overcome this issue.
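
   To make the failure mode concrete, here is roughly what steps 1-4 look like if the temporary store used plain RocksDB deletes instead of the explicit `DELETION` marker byte (a hypothetical sketch; `txnStore`, `key` and `value` are stand-ins for the actual store instance and serialization, not code from this PR):

   ```java
   // Hypothetical variant in which delete() issues a real RocksDB delete against the tmp store.
   txnStore.put(key("foo"), value(1));
   txnStore.commit(1L);            // "foo" -> 1 is copied to the main store, tmp store is truncated
   txnStore.delete(key("foo"));    // with a plain delete, the tmp store simply has no entry for "foo"

   // get() sees tmpStore().get(key) == null, cannot tell "never written" from "deleted",
   // falls through to the main store, and returns 1 instead of null.
   final byte[] result = txnStore.get(key("foo"));
   ```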



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] Gerrrr commented on a diff in pull request #12393: WIP: KAFKA-12549 Prototype for transactional state stores

Posted by GitBox <gi...@apache.org>.
Gerrrr commented on code in PR #12393:
URL: https://github.com/apache/kafka/pull/12393#discussion_r998929720


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractTransactionalStore.java:
##########
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+
+public abstract class AbstractTransactionalStore<T extends KeyValueStore<Bytes, byte[]>> implements KeyValueStore<Bytes, byte[]> {
+    private static final byte MODIFICATION = 0x1;
+    private static final byte DELETION = 0x2;
+    private static final byte[] DELETION_VAL = {DELETION};
+
+    private StateStoreContext context;
+
+    static final String PREFIX = "transactional-";
+    //VisibleForTesting
+    public static final String TMP_SUFFIX = ".tmp";
+
+    private final Set<MergeKeyValueIterator> openIterators = Collections.synchronizedSet(new HashSet<>());
+
+    Map<String, Object> configs;
+    File stateDir;
+
+    private boolean consistencyEnabled = false;
+    private Position position;
+    protected OffsetCheckpoint positionCheckpoint;
+
+    KeyValueSegment createTmpStore(final String segmentName,
+                                   final String windowName,
+                                   final long segmentId,
+                                   final RocksDBMetricsRecorder metricsRecorder) {
+        return new KeyValueSegment(segmentName + TMP_SUFFIX,
+                                    windowName,
+                                    segmentId,
+                                    metricsRecorder);
+    }
+
+    public abstract T mainStore();
+
+    public abstract KeyValueSegment tmpStore();
+
+    @Deprecated
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        if (context instanceof StateStoreContext) {
+            init((StateStoreContext) context, root);
+        } else {
+            throw new UnsupportedOperationException(
+                "Use TransactionalKeyValueStore#init(StateStoreContext, StateStore) instead."
+            );
+        }
+    }
+
+    @Override
+    public void init(final StateStoreContext context, final StateStore root) {
+        this.context = context;
+
+        doInit(context.appConfigs(), context.stateDir());
+        ((RocksDBStore) mainStore()).openDB(configs, stateDir);
+
+        final File positionCheckpointFile = new File(context.stateDir(), name() + ".position");
+        this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
+        this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
+        tmpStore().consistencyEnabled = consistencyEnabled;

Review Comment:
   Good catch! Fixed in [b283f40](https://github.com/apache/kafka/pull/12393/commits/b283f40f19e63fc25fdbff8ffbd43b4a1bfdb4a3).
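
   For reference, the problem was that `tmpStore().consistencyEnabled` was assigned before `consistencyEnabled` was read from the app configs, so the temporary store always saw the default `false`. The fix boils down to reordering the two steps (sketch below; see the commit for the exact change):

   ```java
   // Read the IQ consistency flag from the app configs first ...
   consistencyEnabled = StreamsConfig.InternalConfig.getBoolean(
       context.appConfigs(),
       IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED,
       false);
   // ... and only then propagate it to the temporary store.
   tmpStore().consistencyEnabled = consistencyEnabled;
   ```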



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] Gerrrr commented on a diff in pull request #12393: WIP: KAFKA-12549 Prototype for transactional state stores

Posted by GitBox <gi...@apache.org>.
Gerrrr commented on code in PR #12393:
URL: https://github.com/apache/kafka/pull/12393#discussion_r998928697


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java:
##########
@@ -115,7 +115,7 @@ static void closeStateManager(final Logger log,
                                   final StateDirectory stateDirectory,
                                   final TaskType taskType) {
         // if EOS is enabled, wipe out the whole state store for unclean close since it is now invalid
-        final boolean wipeStateStore = !closeClean && eosEnabled;
+        final boolean wipeStateStore = !closeClean && eosEnabled && !stateMgr.transactional();

Review Comment:
   FWIW I don't think this is a severe issue because the `default.dsl.store` config, which users should be using, controls the transactionality of all state stores. However, it is still possible to pass a custom state store, so it is a good idea to check for mixed transactional configurations. Fixed in [a5e1fe3](https://github.com/apache/kafka/pull/12393/commits/a5e1fe3122280429f2eb1c5ce119dd756620ffb3).
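
   The idea is that the state manager should only report itself as transactional when every registered store is, so a mixed configuration still gets wiped on unclean close. Roughly along these lines (a sketch only, assuming the stores expose a `transactional()` flag; `registeredStores()` is a hypothetical accessor, not the exact code in a5e1fe3):

   ```java
   // Sketch: the task's state counts as transactional only if *all* of its stores do.
   public boolean transactional() {
       for (final StateStore store : registeredStores()) {
           if (!store.transactional()) {
               return false;
           }
       }
       return true;
   }
   ```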



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] Gerrrr commented on a diff in pull request #12393: WIP: KAFKA-12549 Prototype for transactional state stores

Posted by GitBox <gi...@apache.org>.
Gerrrr commented on code in PR #12393:
URL: https://github.com/apache/kafka/pull/12393#discussion_r998932827


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractTransactionalStore.java:
##########
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+
+public abstract class AbstractTransactionalStore<T extends KeyValueStore<Bytes, byte[]>> implements KeyValueStore<Bytes, byte[]> {
+    private static final byte MODIFICATION = 0x1;
+    private static final byte DELETION = 0x2;
+    private static final byte[] DELETION_VAL = {DELETION};
+
+    private StateStoreContext context;
+
+    static final String PREFIX = "transactional-";
+    //VisibleForTesting
+    public static final String TMP_SUFFIX = ".tmp";
+
+    private final Set<MergeKeyValueIterator> openIterators = Collections.synchronizedSet(new HashSet<>());
+
+    Map<String, Object> configs;
+    File stateDir;
+
+    private boolean consistencyEnabled = false;
+    private Position position;
+    protected OffsetCheckpoint positionCheckpoint;
+
+    KeyValueSegment createTmpStore(final String segmentName,
+                                   final String windowName,
+                                   final long segmentId,
+                                   final RocksDBMetricsRecorder metricsRecorder) {
+        return new KeyValueSegment(segmentName + TMP_SUFFIX,
+                                    windowName,
+                                    segmentId,
+                                    metricsRecorder);
+    }
+
+    public abstract T mainStore();
+
+    public abstract KeyValueSegment tmpStore();
+
+    @Deprecated
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        if (context instanceof StateStoreContext) {
+            init((StateStoreContext) context, root);
+        } else {
+            throw new UnsupportedOperationException(
+                "Use TransactionalKeyValueStore#init(StateStoreContext, StateStore) instead."
+            );
+        }
+    }
+
+    @Override
+    public void init(final StateStoreContext context, final StateStore root) {
+        this.context = context;
+
+        doInit(context.appConfigs(), context.stateDir());
+        ((RocksDBStore) mainStore()).openDB(configs, stateDir);
+
+        final File positionCheckpointFile = new File(context.stateDir(), name() + ".position");
+        this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
+        this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
+        tmpStore().consistencyEnabled = consistencyEnabled;
+
+        // register and possibly restore the state from the logs
+        context.register(
+            root,
+            (RecordBatchingStateRestoreCallback) this::restoreBatch,
+            () -> StoreQueryUtils.checkpointPosition(positionCheckpoint, position)
+        );
+
+        consistencyEnabled = StreamsConfig.InternalConfig.getBoolean(
+            context.appConfigs(),
+            IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED,
+            false);
+    }
+
+    private void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
+        final Collection<ConsumerRecord<byte[], byte[]>> changelogRecords = records
+            .stream()
+            .map(record -> new ConsumerRecord<>(
+                record.topic(),
+                record.partition(),
+                record.offset(),
+                record.key(),
+                toUncommittedValue(record.value())))
+            .collect(Collectors.toList());
+        tmpStore().restoreBatch(changelogRecords);
+        commit(null);
+    }
+
+    void doInit(final Map<String, Object> configs, final File stateDir) {
+        this.configs = configs;
+        this.stateDir = stateDir;
+        tmpStore().openDB(configs, stateDir);
+    }
+
+    @Override
+    public synchronized void close() {
+        final HashSet<KeyValueIterator<Bytes, byte[]>> iterators;
+        synchronized (openIterators) {
+            iterators = new HashSet<>(openIterators);
+        }
+        for (final KeyValueIterator<Bytes, byte[]> iterator : iterators) {
+            iterator.close();
+        }
+
+        tmpStore().close();
+        mainStore().close();
+    }
+
+    @Override
+    public void commit(final Long changelogOffset) {
+        tmpStore().commit(changelogOffset);
+        doCommit();
+    }

Review Comment:
   Added missing `mainStore#commit` in [81f248b](https://github.com/apache/kafka/pull/12393/commits/81f248b19870e9438969b5fe7f28780f1bf1c538).
   
   I like your idea to decouple the checkpointing interval from the commit interval! How about we address it in a follow-up issue? This patch is getting rather big, and I would like to limit its scope at this point.
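
   For context, the resulting `commit` roughly looks like this (a sketch; the ordering relative to `doCommit` may differ from what 81f248b actually does):

   ```java
   @Override
   public void commit(final Long changelogOffset) {
       tmpStore().commit(changelogOffset);     // flush the uncommitted writes and record the offset
       doCommit();                             // copy the uncommitted entries into the main store
       mainStore().commit(changelogOffset);    // previously missing: commit the main store as well
   }
   ```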



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org