Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/11/18 14:19:11 UTC

[GitHub] [kafka] vpapavas opened a new pull request #11513: feat: Write offset position information of records to changelog

vpapavas opened a new pull request #11513:
URL: https://github.com/apache/kafka/pull/11513


   As part of the consistency work, a RocksDBStore has a consistency vector that contains the latest seen offset of every partition. We need this information on standby servers as well. To achieve this, we persist the offset alongside each record written to the changelog topic. This way, standby servers can re-create the consistency vector during restoration.
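
   For illustration, here is a minimal, hypothetical sketch of the idea; the class and method names below are assumptions for this description, not the actual `RocksDBStore` code. The store tracks, per topic and partition, the highest offset it has seen, and a serialized form of that map can ride along on each changelog record as a header:

   ```
   import java.nio.charset.StandardCharsets;
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical sketch of a consistency vector: topic -> partition -> latest seen offset.
   public class ConsistencyVectorSketch {
       private final Map<String, Map<Integer, Long>> offsets = new HashMap<>();

       // Called for every record the store processes.
       public void update(final String topic, final int partition, final long offset) {
           offsets.computeIfAbsent(topic, t -> new HashMap<>())
                  .merge(partition, offset, Math::max);
       }

       // Serialized form to attach as a changelog record header. The PR uses a
       // binary serde; this string form is purely illustrative.
       public byte[] toHeaderValue() {
           return offsets.toString().getBytes(StandardCharsets.UTF_8);
       }
   }
   ```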
   
   There are three tests: an integration test that checks that the offsets are written to the changelog and restored at the standby, a unit test that covers restoration, and a unit test that covers writing to the changelog.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] vpapavas commented on a change in pull request #11513: feat: Write and restore position to/from changelog

vpapavas commented on a change in pull request #11513:
URL: https://github.com/apache/kafka/pull/11513#discussion_r761883799



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRecordDeserializationHelper.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.header.internals.RecordHeader;
+
+/**
+ * Changelog records without any headers are considered old format.
+ * New format changelog records will have a version in their headers.
+ * Version 0: This indicates that the changelog records are under version control.
+ * Version 1: This indicates that the changelog records have consistency information.
+ */
+public class ChangelogRecordDeserializationHelper {
+    private static final byte[] V_0_CHANGELOG_VERSION_HEADER_VALUE = {(byte) 0};

Review comment:
       OK about the one version. I actually used two versions to decouple changelog record versioning from consistency. So, the rationale was that `V0` means versioning and `V1` means consistency. But it also works if we have one version.
   
   I wanted the consumer to check `consistencyEnabled` so that we can make it explicit to users doing rolling upgrades that consistency works only if all servers have consistency enabled. For instance, if the active has consistency enabled but the standby doesn't yet, I want to throw an exception. This way, we avoid weird consistency guarantees when querying, where consistency works before a rebalance but not after it, because the standby doesn't have the flag enabled.
   
   The other option is to ignore the position information at standbys when restoring, which will lead to the problem above during querying but won't force users to do upgrades on all servers.
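
   A rough sketch of the check being described; the names here are hypothetical (the real logic ended up in `ChangelogRecordDeserializationHelper`, shown later in this thread), and the exception type is an assumption:

   ```
   // Hypothetical sketch: fail fast during restoration when the changelog record
   // carries consistency information but this (standby) instance has it disabled.
   public final class ConsistencyFlagCheckSketch {
       static void checkConsistencyFlag(final boolean consistencyEnabled,
                                        final boolean recordHasPositionHeader) {
           if (recordHasPositionHeader && !consistencyEnabled) {
               throw new IllegalStateException(
                   "Changelog record carries consistency information, but this "
                       + "instance has consistency disabled; enable it on all "
                       + "servers before relying on consistency guarantees.");
           }
       }
   }
   ```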





[GitHub] [kafka] vpapavas commented on a change in pull request #11513: feat: Write offset position information of records to changelog

vpapavas commented on a change in pull request #11513:
URL: https://github.com/apache/kafka/pull/11513#discussion_r755099262



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
##########
@@ -155,6 +169,11 @@ private void putInternal(final Bytes key,
                 context.timestamp(),
                 context.partition(),
                 context.topic()));
+        if (context.recordMetadata().isPresent() && consistencyEnabled) {

Review comment:
       We need to record the position here as well, since reads might be served directly from the cache.
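
   Roughly, the shape of the change (a sketch assuming the store's `position` field and the `recordMetadata()` accessor visible in the diff above; not the exact PR code):

   ```
   // Sketch: update the cached store's position on put, so that reads served
   // straight from the cache still see a current consistency vector.
   if (context.recordMetadata().isPresent() && consistencyEnabled) {
       final RecordMetadata meta = context.recordMetadata().get();
       position = position.withComponent(meta.topic(), meta.partition(), meta.offset());
   }
   ```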







[GitHub] [kafka] vvcephei commented on a change in pull request #11513: feat: Write and restore position to/from changelog

vvcephei commented on a change in pull request #11513:
URL: https://github.com/apache/kafka/pull/11513#discussion_r764271813



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRecordDeserializationHelper.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.internals.PositionSerde;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Changelog records without any headers are considered old format.
+ * New format changelog records will have a version in their headers.
+ * Version 0: This indicates that the changelog records have consistency information.
+ */
+public class ChangelogRecordDeserializationHelper {
+    public static final Logger log = LoggerFactory.getLogger(ChangelogRecordDeserializationHelper.class);
+    private static final byte[] V_0_CHANGELOG_VERSION_HEADER_VALUE = {(byte) 0};
+    public static final String CHANGELOG_VERSION_HEADER_KEY = "v";
+    public static final String CHANGELOG_POSITION_HEADER_KEY = "c";
+    public static final RecordHeader CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY = new RecordHeader(
+            CHANGELOG_VERSION_HEADER_KEY, V_0_CHANGELOG_VERSION_HEADER_VALUE);
+
+    public static Position applyChecksAndUpdatePosition(
+            final ConsumerRecord<byte[], byte[]> record,
+            final boolean consistencyEnabled,
+            final Position position
+    ) {
+        Position restoredPosition = Position.emptyPosition();
+        if (!consistencyEnabled) {

Review comment:
       Good point! @vpapavas , can you handle stuff like this in a follow-on PR? I'm doing a final pass to try and get this one merged.







[GitHub] [kafka] vvcephei commented on a change in pull request #11513: feat: Write and restore position to/from changelog

vvcephei commented on a change in pull request #11513:
URL: https://github.com/apache/kafka/pull/11513#discussion_r761242697



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
##########
@@ -246,4 +246,4 @@ public TaskType taskType() {
     public String changelogFor(final String storeName) {
         return stateManager().changelogFor(storeName);
     }
-}
+}

Review comment:
       It's just removing a newline at the end. It's harmless.







[GitHub] [kafka] patrickstuedi commented on a change in pull request #11513: feat: Write and restore position to/from changelog

patrickstuedi commented on a change in pull request #11513:
URL: https://github.com/apache/kafka/pull/11513#discussion_r763810277



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRecordDeserializationHelper.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.internals.PositionSerde;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Changelog records without any headers are considered old format.
+ * New format changelog records will have a version in their headers.
+ * Version 0: This indicates that the changelog records have consistency information.
+ */
+public class ChangelogRecordDeserializationHelper {
+    public static final Logger log = LoggerFactory.getLogger(ChangelogRecordDeserializationHelper.class);
+    private static final byte[] V_0_CHANGELOG_VERSION_HEADER_VALUE = {(byte) 0};
+    public static final String CHANGELOG_VERSION_HEADER_KEY = "v";
+    public static final String CHANGELOG_POSITION_HEADER_KEY = "c";
+    public static final RecordHeader CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY = new RecordHeader(
+            CHANGELOG_VERSION_HEADER_KEY, V_0_CHANGELOG_VERSION_HEADER_VALUE);
+
+    public static Position applyChecksAndUpdatePosition(
+            final ConsumerRecord<byte[], byte[]> record,
+            final boolean consistencyEnabled,
+            final Position position
+    ) {
+        Position restoredPosition = Position.emptyPosition();
+        if (!consistencyEnabled) {

Review comment:
       nit: creating `restoredPosition` is not required if `!consistencyEnabled`
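
   In other words, the suggested restructuring looks roughly like this (signature taken from the diff above, body elided):

   ```
   public static Position applyChecksAndUpdatePosition(
           final ConsumerRecord<byte[], byte[]> record,
           final boolean consistencyEnabled,
           final Position position
   ) {
       // Return before allocating anything when consistency is disabled.
       if (!consistencyEnabled) {
           return Position.emptyPosition();
       }
       final Position restoredPosition = Position.emptyPosition();
       // ... header checks and position update follow ...
       return restoredPosition;
   }
   ```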







[GitHub] [kafka] vvcephei commented on a change in pull request #11513: KAFKA-13506: Write and restore position to/from changelog

vvcephei commented on a change in pull request #11513:
URL: https://github.com/apache/kafka/pull/11513#discussion_r765066972



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreType;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.internals.Position;
+import org.apache.kafka.streams.state.internals.RocksDBStore;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStore;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
+import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+
+@Category({IntegrationTest.class})
+public class ConsistencyVectorIntegrationTest {
+
+    private static final int NUM_BROKERS = 1;
+    private static int port = 0;
+    private static final String INPUT_TOPIC_NAME = "input-topic";
+    private static final String TABLE_NAME = "source-table";
+
+    public final EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster(NUM_BROKERS);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    private final List<KafkaStreams> streamsToCleanup = new ArrayList<>();
+    private final MockTime mockTime = cluster.time;
+
+    @Before
+    public void before() throws InterruptedException, IOException {
+        cluster.start();
+        cluster.createTopic(INPUT_TOPIC_NAME, 2, 1);
+    }
+
+    @After
+    public void after() {
+        for (final KafkaStreams kafkaStreams : streamsToCleanup) {
+            kafkaStreams.close();
+        }
+        cluster.stop();
+    }
+
+    @Test
+    public void shouldHaveSamePositionBoundActiveAndStandBy() throws Exception {
+        final int batch1NumMessages = 100;
+        final int key = 1;
+        final Semaphore semaphore = new Semaphore(0);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        Objects.requireNonNull(TABLE_NAME, "name cannot be null");
+        final TestingRocksDbKeyValueBytesStoreSupplier supplier =
+                new TestingRocksDbKeyValueBytesStoreSupplier(TABLE_NAME);
+
+        builder.table(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()),
+                        Materialized.<Integer, Integer>as(supplier).withCachingDisabled())
+                .toStream()
+                .peek((k, v) -> semaphore.release());
+
+        final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration());
+        final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, streamsConfiguration());
+        final List<KafkaStreams> kafkaStreamsList = Arrays.asList(kafkaStreams1, kafkaStreams2);
+
+        startApplicationAndWaitUntilRunning(kafkaStreamsList, Duration.ofSeconds(60));
+
+        produceValueRange(key, 0, batch1NumMessages);
+
+        // Assert that all messages in the first batch were processed in a timely manner
+        assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
+
+        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = keyValueStore();
+
+        // Assert that both active and standby have the same position bound
+        TestUtils.waitForCondition(() -> {
+            final ReadOnlyKeyValueStore<Integer, Integer> store1 = getStore(TABLE_NAME, kafkaStreams1, true, queryableStoreType);
+            return store1.get(key) != null;
+        }, "store1 cannot find results for key");
+        TestUtils.waitForCondition(() -> {
+            final ReadOnlyKeyValueStore<Integer, Integer> store2 = getStore(TABLE_NAME, kafkaStreams2, true, queryableStoreType);
+            return store2.get(key) != null;
+        }, "store2 cannot find results for key");
+
+        final AtomicInteger count = new AtomicInteger();
+        for (final TestingRocksDBStore store : supplier.stores) {

Review comment:
       I think there will be a better way to write this test once we have the position tracking implemented end-to-end. For now, I think this test is sufficient.







[GitHub] [kafka] vvcephei commented on a change in pull request #11513: feat: Write and restore position to/from changelog

vvcephei commented on a change in pull request #11513:
URL: https://github.com/apache/kafka/pull/11513#discussion_r764278336



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRecordDeserializationHelper.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.internals.PositionSerde;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Changelog records without any headers are considered old format.
+ * New format changelog records will have a version in their headers.
+ * Version 0: This indicates that the changelog records have consistency information.
+ */
+public class ChangelogRecordDeserializationHelper {
+    public static final Logger log = LoggerFactory.getLogger(ChangelogRecordDeserializationHelper.class);
+    private static final byte[] V_0_CHANGELOG_VERSION_HEADER_VALUE = {(byte) 0};
+    public static final String CHANGELOG_VERSION_HEADER_KEY = "v";
+    public static final String CHANGELOG_POSITION_HEADER_KEY = "c";
+    public static final RecordHeader CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY = new RecordHeader(
+            CHANGELOG_VERSION_HEADER_KEY, V_0_CHANGELOG_VERSION_HEADER_VALUE);
+
+    public static Position applyChecksAndUpdatePosition(
+            final ConsumerRecord<byte[], byte[]> record,
+            final boolean consistencyEnabled,
+            final Position position
+    ) {
+        Position restoredPosition = Position.emptyPosition();
+        if (!consistencyEnabled) {
+            return Position.emptyPosition();

Review comment:
       Even though I think this is a bit off, I'm going to go ahead and merge it, so we can fix forward. The overall feature isn't fully implemented yet anyway, so this will have no negative effects if we release the branch right now.







[GitHub] [kafka] vpapavas commented on a change in pull request #11513: feat: Write offset position information of records to changelog

vpapavas commented on a change in pull request #11513:
URL: https://github.com/apache/kafka/pull/11513#discussion_r755285644



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
##########
@@ -246,4 +246,4 @@ public TaskType taskType() {
     public String changelogFor(final String storeName) {
         return stateManager().changelogFor(storeName);
     }
-}
+}

Review comment:
       I don't understand what this change is; the contents are identical







[GitHub] [kafka] vvcephei commented on a change in pull request #11513: KAFKA-13506: Write and restore position to/from changelog

vvcephei commented on a change in pull request #11513:
URL: https://github.com/apache/kafka/pull/11513#discussion_r765074233



##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
##########
@@ -103,18 +142,22 @@ public void shouldLogPuts() {
 
     @Test
     public void shouldLogRemoves() {
+        System.out.println("First call");

Review comment:
       ```suggestion
   ```







[GitHub] [kafka] vvcephei commented on a change in pull request #11513: KAFKA-13506: Write and restore position to/from changelog

vvcephei commented on a change in pull request #11513:
URL: https://github.com/apache/kafka/pull/11513#discussion_r765060632



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
##########
@@ -79,15 +94,21 @@ public void init(final StateStoreContext context, final StateStore root) {
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public void remove(final Windowed<Bytes> sessionKey) {
         wrapped().remove(sessionKey);
-        context.logChange(name(), SessionKeySchema.toBinary(sessionKey), null, context.timestamp());
+        context.logChange(name(), SessionKeySchema.toBinary(sessionKey), null, context.timestamp(), position);

Review comment:
       Yep, I think that's correct anyhow, since deleting a record from a store is also an update to its state, and the position should uniquely correspond to whether you see the delete or not.







[GitHub] [kafka] vvcephei commented on a change in pull request #11513: KAFKA-13506: Write and restore position to/from changelog

vvcephei commented on a change in pull request #11513:
URL: https://github.com/apache/kafka/pull/11513#discussion_r765067376



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
##########
@@ -120,7 +129,7 @@ public void setup() {
             timestampedIters.add(i, mock(KeyValueIterator.class));
         }
 
-        final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);
+        //final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);

Review comment:
       ```suggestion
   ```







[GitHub] [kafka] vpapavas commented on a change in pull request #11513: feat: Write offset position information of records to changelog

vpapavas commented on a change in pull request #11513:
URL: https://github.com/apache/kafka/pull/11513#discussion_r755097382



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -110,17 +120,26 @@ public RecordCollector recordCollector() {
     public void logChange(final String storeName,
                           final Bytes key,
                           final byte[] value,
-                          final long timestamp) {
+                          final long timestamp,
+                          final Optional<Position> position) {
         throwUnsupportedOperationExceptionIfStandby("logChange");
 
         final TopicPartition changelogPartition = stateManager().registeredChangelogPartitionFor(storeName);
 
-        // Sending null headers to changelog topics (KIP-244)
+        final Headers headers = new RecordHeaders();
+        if (!consistencyEnabled) {

Review comment:
       From now on, the records written to the changelog will always have headers. In the default case, the headers will just carry the record format version.
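
   A sketch of the resulting header layout, using the constants from `ChangelogRecordDeserializationHelper` shown elsewhere in this thread (the `PositionSerde` serialization call is an assumption, not the exact PR code):

   ```
   // Every changelog record now carries at least the version header; the
   // position header is added only when consistency is enabled.
   final Headers headers = new RecordHeaders();
   headers.add(ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY);
   if (consistencyEnabled) {
       headers.add(new RecordHeader(
           ChangelogRecordDeserializationHelper.CHANGELOG_POSITION_HEADER_KEY,
           PositionSerde.serialize(position).array()));
   }
   ```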







[GitHub] [kafka] vpapavas commented on a change in pull request #11513: feat: Write offset position to changelog

vpapavas commented on a change in pull request #11513:
URL: https://github.com/apache/kafka/pull/11513#discussion_r761135387



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
##########
@@ -216,6 +226,11 @@ public void put(final Bytes key,
             expiredRecordSensor.record(1.0d, ProcessorContextUtils.currentSystemTime(context));
             LOG.warn("Skipping record for expired segment.");
         } else {
+            final InternalProcessorContext internalContext = asInternalProcessorContext(context);

Review comment:
       Until the state store gets upgraded to use the `StateStoreContext`, we need to do this if we want to record the position on put.







[GitHub] [kafka] vvcephei commented on a change in pull request #11513: KAFKA-13506: Write and restore position to/from changelog

vvcephei commented on a change in pull request #11513:
URL: https://github.com/apache/kafka/pull/11513#discussion_r765090882



##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/GlobalStateStoreProviderTest.java
##########
@@ -110,6 +111,9 @@ public void before() {
             );
         expect(mockContext.taskId()).andStubReturn(new TaskId(0, 0));
         expect(mockContext.recordCollector()).andStubReturn(null);
+        final Map<String, Object> configValues = new HashMap<>();
+        configValues.put(InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED, false);

Review comment:
       I think these shouldn't be necessary anymore, since the default is `false`. Can we remove them in a follow-on PR?







[GitHub] [kafka] vvcephei commented on a change in pull request #11513: KAFKA-13506: Write and restore position to/from changelog

vvcephei commented on a change in pull request #11513:
URL: https://github.com/apache/kafka/pull/11513#discussion_r765074461



##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
##########
@@ -103,18 +142,22 @@ public void shouldLogPuts() {
 
     @Test
     public void shouldLogRemoves() {
+        System.out.println("First call");
         inner.remove(key1);
         EasyMock.expectLastCall();
 
         init();
+        System.out.println("Second call");

Review comment:
       ```suggestion
   ```

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
##########
@@ -103,18 +142,22 @@ public void shouldLogPuts() {
 
     @Test
     public void shouldLogRemoves() {
+        System.out.println("First call");
         inner.remove(key1);
         EasyMock.expectLastCall();
 
         init();
+        System.out.println("Second call");
         store.remove(key1);
 
         final Bytes binaryKey = SessionKeySchema.toBinary(key1);
 
         EasyMock.reset(context);
-        context.logChange(store.name(), binaryKey, null, 0L);
+        EasyMock.expect(context.recordMetadata()).andStubReturn(Optional.empty());
+        context.logChange(store.name(), binaryKey, null, 0L, Position.emptyPosition());
 
         EasyMock.replay(context);
+        System.out.println("Third call");

Review comment:
       ```suggestion
   ```






[GitHub] [kafka] vpapavas commented on a change in pull request #11513: feat: Write offset position to changelog

vpapavas commented on a change in pull request #11513:
URL: https://github.com/apache/kafka/pull/11513#discussion_r760099865



##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -947,6 +947,11 @@
         // Private API used to control the emit latency for left/outer join results (https://issues.apache.org/jira/browse/KAFKA-10847)
         public static final String EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX = "__emit.interval.ms.kstreams.outer.join.spurious.results.fix__";
 
+        // Private API used to control the usage of consistency offset vectors
+        public static final String IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED = "__iq.consistency.offset"

Review comment:
       Internal feature flag. All work regarding writing and reading the `Position` to/from the changelog is hidden behind this flag.
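
   For reference, a sketch of how such an internal flag can be read from the configs map with a safe default of `false` (the lookup and helper class shown are assumptions, not the exact PR code; the constant name appears elsewhere in this thread):

   ```
   import java.util.Map;
   import org.apache.kafka.streams.StreamsConfig.InternalConfig;

   // Hypothetical helper: resolve the internal feature flag, defaulting to false.
   public final class FeatureFlagSketch {
       public static boolean consistencyEnabled(final Map<String, Object> configs) {
           final Object raw = configs.get(InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED);
           return raw != null && Boolean.parseBoolean(String.valueOf(raw));
       }
   }
   ```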







[GitHub] [kafka] vpapavas commented on a change in pull request #11513: KAFKA-13506: Write and restore position to/from changelog

vpapavas commented on a change in pull request #11513:
URL: https://github.com/apache/kafka/pull/11513#discussion_r764724467



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRecordDeserializationHelper.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.internals.PositionSerde;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Changelog records without any headers are considered old format.
+ * New format changelog records will have a version in their headers.
+ * Version 0: This indicates that the changelog records have consistency information.
+ */
+public class ChangelogRecordDeserializationHelper {
+    public static final Logger log = LoggerFactory.getLogger(ChangelogRecordDeserializationHelper.class);
+    private static final byte[] V_0_CHANGELOG_VERSION_HEADER_VALUE = {(byte) 0};
+    public static final String CHANGELOG_VERSION_HEADER_KEY = "v";
+    public static final String CHANGELOG_POSITION_HEADER_KEY = "c";
+    public static final RecordHeader CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY = new RecordHeader(
+            CHANGELOG_VERSION_HEADER_KEY, V_0_CHANGELOG_VERSION_HEADER_VALUE);
+
+    public static Position applyChecksAndUpdatePosition(
+            final ConsumerRecord<byte[], byte[]> record,
+            final boolean consistencyEnabled,
+            final Position position
+    ) {
+        Position restoredPosition = Position.emptyPosition();
+        if (!consistencyEnabled) {
+            return Position.emptyPosition();

Review comment:
       Good catch, I totally missed this. Thank you!







[GitHub] [kafka] vpapavas commented on a change in pull request #11513: feat: Write offset position to changelog

vpapavas commented on a change in pull request #11513:
URL: https://github.com/apache/kafka/pull/11513#discussion_r761142526



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreType;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.internals.Position;
+import org.apache.kafka.streams.state.internals.RocksDBStore;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStore;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
+import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+
+@Category({IntegrationTest.class})
+public class ConsistencyVectorIntegrationTest {
+
+    private static final int NUM_BROKERS = 1;
+    private static int port = 0;
+    private static final String INPUT_TOPIC_NAME = "input-topic";
+    private static final String TABLE_NAME = "source-table";
+
+    public final EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster(NUM_BROKERS);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    private final List<KafkaStreams> streamsToCleanup = new ArrayList<>();
+    private final MockTime mockTime = cluster.time;
+
+    @Before
+    public void before() throws InterruptedException, IOException {
+        cluster.start();
+        cluster.createTopic(INPUT_TOPIC_NAME, 2, 1);
+    }
+
+    @After
+    public void after() {
+        for (final KafkaStreams kafkaStreams : streamsToCleanup) {
+            kafkaStreams.close();
+        }
+        cluster.stop();
+    }
+
+    @Test
+    public void shouldHaveSamePositionBoundActiveAndStandBy() throws Exception {
+        final int batch1NumMessages = 100;
+        final int key = 1;
+        final Semaphore semaphore = new Semaphore(0);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        Objects.requireNonNull(TABLE_NAME, "name cannot be null");
+        final TestingRocksDbKeyValueBytesStoreSupplier supplier =
+                new TestingRocksDbKeyValueBytesStoreSupplier(TABLE_NAME);
+
+        builder.table(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()),
+                        Materialized.<Integer, Integer>as(supplier).withCachingDisabled())
+                .toStream()
+                .peek((k, v) -> semaphore.release());
+
+        final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration());
+        final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, streamsConfiguration());
+        final List<KafkaStreams> kafkaStreamsList = Arrays.asList(kafkaStreams1, kafkaStreams2);
+
+        startApplicationAndWaitUntilRunning(kafkaStreamsList, Duration.ofSeconds(60));
+
+        produceValueRange(key, 0, batch1NumMessages);
+
+        // Assert that all messages in the first batch were processed in a timely manner
+        assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
+
+        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = keyValueStore();
+
+        // Assert that both active and standby have the same position bound
+        TestUtils.waitForCondition(() -> {
+            final ReadOnlyKeyValueStore<Integer, Integer> store1 = getStore(TABLE_NAME, kafkaStreams1, true, queryableStoreType);
+            return store1.get(key) != null;
+        }, "store1 cannot find results for key");
+        TestUtils.waitForCondition(() -> {
+            final ReadOnlyKeyValueStore<Integer, Integer> store2 = getStore(TABLE_NAME, kafkaStreams2, true, queryableStoreType);
+            return store2.get(key) != null;
+        }, "store2 cannot find results for key");
+
+        final AtomicInteger count = new AtomicInteger();
+        for (final TestingRocksDBStore store : supplier.stores) {
+            if (!store.getPosition().isUnbounded()) {
+                assertThat(store.getDbDir().toString().contains("/0_0/"), is(true));
+                assertThat(store.getPosition().getBound(INPUT_TOPIC_NAME), notNullValue());
+                assertThat(store.getPosition().getBound(INPUT_TOPIC_NAME), hasEntry(0, 99L));
+                count.incrementAndGet();
+            }
+        }
+        assertThat(count.get(), is(2));
+    }
+
+    public class TestingRocksDBStore extends RocksDBStore {

Review comment:
       This is why I needed the constructor of `RocksDBStore` to be public







[GitHub] [kafka] vvcephei commented on a change in pull request #11513: KAFKA-13506: Write and restore position to/from changelog

vvcephei commented on a change in pull request #11513:
URL: https://github.com/apache/kafka/pull/11513#discussion_r765093493



##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/InMemorySessionStoreTest.java
##########
@@ -91,29 +90,20 @@ public void shouldNotExpireFromOpenIterator() {
 
     @Test
     public void shouldMatchPositionAfterPut() {
-
-        final List<KeyValue<Windowed<String>, Long>> entries = new ArrayList<>();
-        entries.add(new KeyValue<>(new Windowed<String>("a", new SessionWindow(0, 0)), 1L));
-        entries.add(new KeyValue<>(new Windowed<String>("aa", new SessionWindow(0, 10)), 2L));
-        entries.add(new KeyValue<>(new Windowed<String>("a", new SessionWindow(10, 20)), 3L));
-
-        final MonotonicProcessorRecordContext recordContext = new MonotonicProcessorRecordContext("input", 0);
-        context.setRecordContext(recordContext);
-
-        final Position expected = Position.emptyPosition();
-        long offset = 0;
-        for (final KeyValue<Windowed<String>, Long> k : entries) {
-            sessionStore.put(k.key, k.value);
-            expected.withComponent("input", 0, offset);
-            offset++;
-        }
-
         final MeteredSessionStore<String, Long> meteredSessionStore = (MeteredSessionStore<String, Long>) sessionStore;
         final ChangeLoggingSessionBytesStore changeLoggingSessionBytesStore = (ChangeLoggingSessionBytesStore) meteredSessionStore.wrapped();
         final InMemorySessionStore inMemorySessionStore = (InMemorySessionStore) changeLoggingSessionBytesStore.wrapped();
 
+        context.setRecordContext(new ProcessorRecordContext(0, 1, 0, "", new RecordHeaders()));
+        sessionStore.put(new Windowed<String>("a", new SessionWindow(0, 0)), 1L);
+        context.setRecordContext(new ProcessorRecordContext(0, 2, 0, "", new RecordHeaders()));
+        sessionStore.put(new Windowed<String>("aa", new SessionWindow(0, 10)), 2L);
+        context.setRecordContext(new ProcessorRecordContext(0, 3, 0, "", new RecordHeaders()));
+        sessionStore.put(new Windowed<String>("a", new SessionWindow(10, 20)), 3L);
+
+        final Position expected = Position.fromMap(mkMap(mkEntry("", mkMap(mkEntry(0, 3L)))));
         final Position actual = inMemorySessionStore.getPosition();
-        assertThat(expected, is(actual));
+        assertEquals(expected, actual);

Review comment:
       Not a huge deal, but we tend to prefer `assertThat` to `assertEquals`.
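
   With `org.hamcrest.MatcherAssert.assertThat` and `org.hamcrest.Matchers.is` statically imported, the assertion above would read:

   ```
   assertThat(actual, is(expected));
   ```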







[GitHub] [kafka] vvcephei commented on a change in pull request #11513: feat: Write and restore position to/from changelog

vvcephei commented on a change in pull request #11513:
URL: https://github.com/apache/kafka/pull/11513#discussion_r764279791



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -110,17 +121,27 @@ public RecordCollector recordCollector() {
     public void logChange(final String storeName,
                           final Bytes key,
                           final byte[] value,
-                          final long timestamp) {
+                          final long timestamp,
+                          final Position position) {
         throwUnsupportedOperationExceptionIfStandby("logChange");
 
         final TopicPartition changelogPartition = stateManager().registeredChangelogPartitionFor(storeName);
 
-        // Sending null headers to changelog topics (KIP-244)
+        Headers headers = new RecordHeaders();

Review comment:
       nit: I'd make this final with no assignment, then assign it in both branches below.
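
   The nit in miniature (a self-contained sketch; the branch contents are placeholders, see the diff above for the real header-building logic):

   ```
   import org.apache.kafka.common.header.Headers;
   import org.apache.kafka.common.header.internals.RecordHeaders;

   // A blank final assigned exactly once in each branch, so the compiler
   // enforces a single assignment on every code path.
   public final class FinalHeadersSketch {
       static Headers buildHeaders(final boolean consistencyEnabled) {
           final Headers headers;
           if (consistencyEnabled) {
               headers = new RecordHeaders(); // version + position headers added here
           } else {
               headers = new RecordHeaders(); // version header only
           }
           return headers;
       }
   }
   ```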




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #11513: KAFKA-13506: Write and restore position to/from changelog

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11513:
URL: https://github.com/apache/kafka/pull/11513#discussion_r765062259



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -106,10 +111,13 @@
     private final RocksDBMetricsRecorder metricsRecorder;
 
     protected volatile boolean open = false;
+    // VisibleForTesting
+    protected Position position;
+
     private StateStoreContext context;
-    private Position position;
 
-    RocksDBStore(final String name,
+    // VisibleForTesting
+    public RocksDBStore(final String name,

Review comment:
       Update: I didn't wind up returning the position information in the framework PR, so the comment still applies. I think we'll revise this once we're able to get the position from the public API, but for now this is just fine.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vpapavas commented on a change in pull request #11513: KAFKA-13506: Write and restore position to/from changelog

Posted by GitBox <gi...@apache.org>.
vpapavas commented on a change in pull request #11513:
URL: https://github.com/apache/kafka/pull/11513#discussion_r764716888



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRecordDeserializationHelper.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.internals.PositionSerde;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Changelog records without any headers are considered old format.
+ * New format changelog records will have a version in their headers.
+ * Version 0: This indicates that the changelog records have consistency information.
+ */
+public class ChangelogRecordDeserializationHelper {
+    public static final Logger log = LoggerFactory.getLogger(ChangelogRecordDeserializationHelper.class);
+    private static final byte[] V_0_CHANGELOG_VERSION_HEADER_VALUE = {(byte) 0};
+    public static final String CHANGELOG_VERSION_HEADER_KEY = "v";
+    public static final String CHANGELOG_POSITION_HEADER_KEY = "c";
+    public static final RecordHeader CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY = new RecordHeader(
+            CHANGELOG_VERSION_HEADER_KEY, V_0_CHANGELOG_VERSION_HEADER_VALUE);
+
+    public static Position applyChecksAndUpdatePosition(
+            final ConsumerRecord<byte[], byte[]> record,
+            final boolean consistencyEnabled,
+            final Position position
+    ) {
+        Position restoredPosition = Position.emptyPosition();
+        if (!consistencyEnabled) {
+            return Position.emptyPosition();

Review comment:
       It should update the position, but `merge` creates a new instance on every invocation. So either we change `merge` so that it does not create a new copy (which is what I prefer, since the contract of `merge` is to combine two positions into the first), or I have to do the merge manually, which defeats the point of having the `merge` function in the first place.
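
    A minimal sketch of the two contracts under discussion (method shapes assumed, not the actual `Position` API):

    ```
    // copy-on-merge: every call allocates a new Position, so callers must reassign
    position = position.merge(restoredPosition);

    // in-place merge: the receiver absorbs the other position's components
    position.merge(restoredPosition);
    ```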




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vpapavas commented on a change in pull request #11513: feat: Write offset position to changelog

Posted by GitBox <gi...@apache.org>.
vpapavas commented on a change in pull request #11513:
URL: https://github.com/apache/kafka/pull/11513#discussion_r755096395



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRecordDeserializationHelper.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.header.internals.RecordHeader;
+
+/**
+ * Changelog records without any headers are considered old format.
+ * New format changelog records will have a version in their headers.
+ * Version 0: This indicates that the changelog records are under version control.
+ * Version 1: This indicates that the changelog records have consistency information.
+ */
+public class ChangelogRecordDeserializationHelper {
+    private static final byte[] V_0_CHANGELOG_VERSION_HEADER_VALUE = {(byte) 0};

Review comment:
       We want to have version control for changelog records. `V0` means that the changelog records will have headers from now on. `V_1` means that the headers contain consistency information.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vpapavas commented on a change in pull request #11513: feat: Write offset position information of records to changelog

Posted by GitBox <gi...@apache.org>.
vpapavas commented on a change in pull request #11513:
URL: https://github.com/apache/kafka/pull/11513#discussion_r755098773



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -110,17 +120,26 @@ public RecordCollector recordCollector() {
     public void logChange(final String storeName,
                           final Bytes key,
                           final byte[] value,
-                          final long timestamp) {
+                          final long timestamp,
+                          final Optional<Position> position) {
         throwUnsupportedOperationExceptionIfStandby("logChange");
 
         final TopicPartition changelogPartition = stateManager().registeredChangelogPartitionFor(storeName);
 
-        // Sending null headers to changelog topics (KIP-244)
+        final Headers headers = new RecordHeaders();
+        if (!consistencyEnabled) {
+            headers.add(ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_DEFAULT);
+        } else {
+            // Add the vector clock to the header part of every record

Review comment:
       We know this will lead to inefficiencies when there are multiple topics with long names. One solution is to map the topic names to bytes to save space. The other option is not to write the entire vector clock with every record. The latter approach is under design.
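
    As a rough, purely illustrative sizing (hypothetical `positionMap` shape, not the real serialization): the serialized vector repeats every topic name verbatim in each record's headers, so

    ```
    // hypothetical back-of-the-envelope sizing, not the real serde
    int approxHeaderBytes = 0;
    for (final Map.Entry<String, Map<Integer, Long>> topicEntry : positionMap.entrySet()) {
        approxHeaderBytes += topicEntry.getKey().getBytes(StandardCharsets.UTF_8).length // topic name, per record
                + topicEntry.getValue().size() * (Integer.BYTES + Long.BYTES);           // partition/offset pairs
    }
    ```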




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vpapavas commented on a change in pull request #11513: feat: Write and restore position to/from changelog

Posted by GitBox <gi...@apache.org>.
vpapavas commented on a change in pull request #11513:
URL: https://github.com/apache/kafka/pull/11513#discussion_r764012303



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreType;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.internals.Position;
+import org.apache.kafka.streams.state.internals.RocksDBStore;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStore;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
+import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+
+@Category({IntegrationTest.class})
+public class ConsistencyVectorIntegrationTest {
+
+    private static final int NUM_BROKERS = 1;
+    private static int port = 0;
+    private static final String INPUT_TOPIC_NAME = "input-topic";
+    private static final String TABLE_NAME = "source-table";
+
+    public final EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster(NUM_BROKERS);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    private final List<KafkaStreams> streamsToCleanup = new ArrayList<>();
+    private final MockTime mockTime = cluster.time;
+
+    @Before
+    public void before() throws InterruptedException, IOException {
+        cluster.start();
+        cluster.createTopic(INPUT_TOPIC_NAME, 2, 1);
+    }
+
+    @After
+    public void after() {
+        for (final KafkaStreams kafkaStreams : streamsToCleanup) {
+            kafkaStreams.close();
+        }
+        cluster.stop();
+    }
+
+    @Test
+    public void shouldHaveSamePositionBoundActiveAndStandBy() throws Exception {
+        final int batch1NumMessages = 100;

Review comment:
       I updated the test to check that it sees the last record of the batch in the standby store.
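
    For reference, a hedged sketch of that check, reusing the helpers from this test (the last record's value is assumed to be `batch1NumMessages - 1`):

    ```
    TestUtils.waitForCondition(() -> {
        final ReadOnlyKeyValueStore<Integer, Integer> standby =
                getStore(TABLE_NAME, kafkaStreams2, true, keyValueStore());
        final Integer latest = standby.get(key);
        return latest != null && latest == batch1NumMessages - 1;
    }, "standby has not caught up to the last record of the batch");
    ```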




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vpapavas commented on a change in pull request #11513: feat: Write offset position to changelog

Posted by GitBox <gi...@apache.org>.
vpapavas commented on a change in pull request #11513:
URL: https://github.com/apache/kafka/pull/11513#discussion_r760121526



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/Position.java
##########
@@ -27,6 +30,8 @@
 import java.util.function.BiConsumer;
 
 public class Position {
+    public static final String VECTOR_KEY = "c";

Review comment:
       This is the header key.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vpapavas commented on a change in pull request #11513: feat: Write offset position to changelog

Posted by GitBox <gi...@apache.org>.
vpapavas commented on a change in pull request #11513:
URL: https://github.com/apache/kafka/pull/11513#discussion_r760111821



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
##########
@@ -79,15 +94,21 @@ public void init(final StateStoreContext context, final StateStore root) {
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public void remove(final Windowed<Bytes> sessionKey) {
         wrapped().remove(sessionKey);
-        context.logChange(name(), SessionKeySchema.toBinary(sessionKey), null, context.timestamp());
+        context.logChange(name(), SessionKeySchema.toBinary(sessionKey), null, context.timestamp(), position);

Review comment:
       We need to log a position here since we don't want mixed records (some that have and some that don't have headers with consistency info) in the changelog.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vpapavas commented on a change in pull request #11513: feat: Write offset position information of records to changelog

Posted by GitBox <gi...@apache.org>.
vpapavas commented on a change in pull request #11513:
URL: https://github.com/apache/kafka/pull/11513#discussion_r752325491



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.processor.internals.Position;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreType;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.internals.RocksDBStore;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStore;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
+import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+
+@Category({IntegrationTest.class})
+public class ConsistencyVectorIntegrationTest {
+
+    private static final int NUM_BROKERS = 1;
+    private static int port = 0;
+    private static final String INPUT_TOPIC_NAME = "input-topic";
+    private static final String TABLE_NAME = "source-table";
+
+    public final EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster(NUM_BROKERS);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    private final List<KafkaStreams> streamsToCleanup = new ArrayList<>();
+    private final MockTime mockTime = cluster.time;
+
+    @Before
+    public void before() throws InterruptedException, IOException {
+        cluster.start();
+        cluster.createTopic(INPUT_TOPIC_NAME, 2, 1);
+    }
+
+    @After
+    public void after() {
+        for (final KafkaStreams kafkaStreams : streamsToCleanup) {
+            kafkaStreams.close();
+        }
+        cluster.stop();
+    }
+
+    @Test
+    public void shouldHaveSamePositionBoundActiveAndStandBy() throws Exception {
+        final int batch1NumMessages = 100;
+        final int key = 1;
+        final Semaphore semaphore = new Semaphore(0);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        Objects.requireNonNull(TABLE_NAME, "name cannot be null");
+        final TestingRocksDbKeyValueBytesStoreSupplier supplier =
+                new TestingRocksDbKeyValueBytesStoreSupplier(TABLE_NAME);
+
+        builder.table(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()),
+                        Materialized.<Integer, Integer>as(supplier).withCachingDisabled())
+                .toStream()
+                .peek((k, v) -> semaphore.release());
+
+        final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration());
+        final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, streamsConfiguration());
+        final List<KafkaStreams> kafkaStreamsList = Arrays.asList(kafkaStreams1, kafkaStreams2);
+
+        startApplicationAndWaitUntilRunning(kafkaStreamsList, Duration.ofSeconds(60));
+
+        produceValueRange(key, 0, batch1NumMessages);
+
+        // Assert that all messages in the first batch were processed in a timely manner
+        assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
+
+        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = keyValueStore();
+
+        // Assert that both active and standby have the same position bound
+        TestUtils.waitForCondition(() -> {
+            final ReadOnlyKeyValueStore<Integer, Integer> store1 = getStore(TABLE_NAME, kafkaStreams1, true, queryableStoreType);
+            return store1.get(key) != null;
+        }, "store1 cannot find results for key");
+        TestUtils.waitForCondition(() -> {
+            final ReadOnlyKeyValueStore<Integer, Integer> store2 = getStore(TABLE_NAME, kafkaStreams2, true, queryableStoreType);
+            return store2.get(key) != null;
+        }, "store2 cannot find results for key");
+
+        final AtomicInteger count = new AtomicInteger();
+        for (final TestingRocksDBStore store : supplier.stores) {
+            store.getPosition().ifPresent(p -> {
+                if (!p.isUnbounded()) {
+                    assertThat(store.getDbDir().toString().contains("/0_0/"), is(true));

Review comment:
       There must be a better way to test this, but I basically want to make sure that one store is the active's and the other is the standby's.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vpapavas commented on a change in pull request #11513: feat: Write offset position information of records to changelog

Posted by GitBox <gi...@apache.org>.
vpapavas commented on a change in pull request #11513:
URL: https://github.com/apache/kafka/pull/11513#discussion_r755096395



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRecordDeserializationHelper.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.header.internals.RecordHeader;
+
+/**
+ * Changelog records without any headers are considered old format.
+ * New format changelog records will have a version in their headers.
+ * Version 0: This indicates that the changelog records are under version control.
+ * Version 1: This indicates that the changelog records have consistency information.
+ */
+public class ChangelogRecordDeserializationHelper {
+    private static final byte[] V_0_CHANGELOG_VERSION_HEADER_VALUE = {(byte) 0};

Review comment:
       `V0` means that the changelog records will have headers from now on.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vpapavas commented on a change in pull request #11513: feat: Write offset position to changelog

Posted by GitBox <gi...@apache.org>.
vpapavas commented on a change in pull request #11513:
URL: https://github.com/apache/kafka/pull/11513#discussion_r760099865



##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -947,6 +947,11 @@
         // Private API used to control the emit latency for left/outer join results (https://issues.apache.org/jira/browse/KAFKA-10847)
         public static final String EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX = "__emit.interval.ms.kstreams.outer.join.spurious.results.fix__";
 
+        // Private API used to control the usage of consistency offset vectors
+        public static final String IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED = "__iq.consistency.offset"

Review comment:
       Internal feature flag. All work regarding consistency is hidden behind this flag. That's why the `Position` is optional in the stores.
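
    For context, a hedged sketch of how the flag would be read, assuming the `InternalConfig.getBoolean` helper used for the other private flags:

    ```
    // disabled by default, so the Position plumbing stays dormant unless explicitly enabled
    consistencyEnabled = InternalConfig.getBoolean(
            config.originals(),
            InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED,
            false);
    ```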




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vpapavas commented on a change in pull request #11513: feat: Write offset position to changelog

Posted by GitBox <gi...@apache.org>.
vpapavas commented on a change in pull request #11513:
URL: https://github.com/apache/kafka/pull/11513#discussion_r760111821



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
##########
@@ -79,15 +94,21 @@ public void init(final StateStoreContext context, final StateStore root) {
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public void remove(final Windowed<Bytes> sessionKey) {
         wrapped().remove(sessionKey);
-        context.logChange(name(), SessionKeySchema.toBinary(sessionKey), null, context.timestamp());
+        context.logChange(name(), SessionKeySchema.toBinary(sessionKey), null, context.timestamp(), position);

Review comment:
       We need to log a position here since we don't want mixed records (some that have and some that don't have headers with consistency info) in the changelog. In any case, the position is ignored during restoration for tombstones.
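
    A minimal sketch of the restore-side behavior described here (names are illustrative, not the actual restore callback):

    ```
    if (record.value() == null) {
        store.delete(record.key());   // apply the tombstone; its position header is ignored
    } else {
        store.put(record.key(), record.value());
        position = position.merge(restoredPosition); // consistency info applied only for real values
    }
    ```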




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vpapavas commented on a change in pull request #11513: feat: Write and restore position to/from changelog

Posted by GitBox <gi...@apache.org>.
vpapavas commented on a change in pull request #11513:
URL: https://github.com/apache/kafka/pull/11513#discussion_r762030366



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreType;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.internals.Position;
+import org.apache.kafka.streams.state.internals.RocksDBStore;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStore;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
+import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+
+@Category({IntegrationTest.class})
+public class ConsistencyVectorIntegrationTest {
+
+    private static final int NUM_BROKERS = 1;
+    private static int port = 0;
+    private static final String INPUT_TOPIC_NAME = "input-topic";
+    private static final String TABLE_NAME = "source-table";
+
+    public final EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster(NUM_BROKERS);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    private final List<KafkaStreams> streamsToCleanup = new ArrayList<>();
+    private final MockTime mockTime = cluster.time;
+
+    @Before
+    public void before() throws InterruptedException, IOException {
+        cluster.start();
+        cluster.createTopic(INPUT_TOPIC_NAME, 2, 1);
+    }
+
+    @After
+    public void after() {
+        for (final KafkaStreams kafkaStreams : streamsToCleanup) {
+            kafkaStreams.close();
+        }
+        cluster.stop();
+    }
+
+    @Test
+    public void shouldHaveSamePositionBoundActiveAndStandBy() throws Exception {
+        final int batch1NumMessages = 100;
+        final int key = 1;
+        final Semaphore semaphore = new Semaphore(0);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        Objects.requireNonNull(TABLE_NAME, "name cannot be null");
+        final TestingRocksDbKeyValueBytesStoreSupplier supplier =
+                new TestingRocksDbKeyValueBytesStoreSupplier(TABLE_NAME);
+
+        builder.table(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()),
+                        Materialized.<Integer, Integer>as(supplier).withCachingDisabled())
+                .toStream()
+                .peek((k, v) -> semaphore.release());
+
+        final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration());
+        final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, streamsConfiguration());
+        final List<KafkaStreams> kafkaStreamsList = Arrays.asList(kafkaStreams1, kafkaStreams2);
+
+        startApplicationAndWaitUntilRunning(kafkaStreamsList, Duration.ofSeconds(60));
+
+        produceValueRange(key, 0, batch1NumMessages);
+
+        // Assert that all messages in the first batch were processed in a timely manner
+        assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
+
+        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = keyValueStore();
+
+        // Assert that both active and standby have the same position bound
+        TestUtils.waitForCondition(() -> {
+            final ReadOnlyKeyValueStore<Integer, Integer> store1 = getStore(TABLE_NAME, kafkaStreams1, true, queryableStoreType);
+            return store1.get(key) != null;
+        }, "store1 cannot find results for key");
+        TestUtils.waitForCondition(() -> {
+            final ReadOnlyKeyValueStore<Integer, Integer> store2 = getStore(TABLE_NAME, kafkaStreams2, true, queryableStoreType);
+            return store2.get(key) != null;
+        }, "store2 cannot find results for key");
+
+        final AtomicInteger count = new AtomicInteger();
+        for (final TestingRocksDBStore store : supplier.stores) {

Review comment:
       I had written a comment here but it got lost. There must be a better way to test this: I want to make sure that, at the end, there are two stores for partition 0, one on the active and one on the standby, so that I can compare their positions.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vpapavas commented on a change in pull request #11513: feat: Write offset position to changelog

Posted by GitBox <gi...@apache.org>.
vpapavas commented on a change in pull request #11513:
URL: https://github.com/apache/kafka/pull/11513#discussion_r755096395



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRecordDeserializationHelper.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.header.internals.RecordHeader;
+
+/**
+ * Changelog records without any headers are considered old format.
+ * New format changelog records will have a version in their headers.
+ * Version 0: This indicates that the changelog records are under version control.
+ * Version 1: This indicates that the changelog records have consistency information.
+ */
+public class ChangelogRecordDeserializationHelper {
+    private static final byte[] V_0_CHANGELOG_VERSION_HEADER_VALUE = {(byte) 0};

Review comment:
       We want to have version control for changelog records. `V0` means that the changelog records will have headers from now on, but these headers are empty and unused. `V_1` means that the headers contain consistency information.
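
    A minimal sketch of that two-version scheme (constant names assumed for illustration):

    ```
    // V0: records carry headers, but the headers are empty and unused
    private static final byte[] V_0_CHANGELOG_VERSION_HEADER_VALUE = {(byte) 0};
    // V1: the headers carry the serialized consistency position
    private static final byte[] V_1_CHANGELOG_VERSION_HEADER_VALUE = {(byte) 1};
    ```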




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vpapavas commented on a change in pull request #11513: feat: Write offset position to changelog

Posted by GitBox <gi...@apache.org>.
vpapavas commented on a change in pull request #11513:
URL: https://github.com/apache/kafka/pull/11513#discussion_r761141392



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -312,6 +318,11 @@ public synchronized void put(final Bytes key,
     public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
         try (final WriteBatch batch = new WriteBatch()) {
             dbAccessor.prepareBatch(entries, batch);
+            // FIXME Will the recordMetadata be the offset of the last record in the batch?

Review comment:
       @vvcephei What the comment is basically asking: what is the recordMetadata when there is a batch of records?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vpapavas commented on a change in pull request #11513: feat: Write offset position to changelog

Posted by GitBox <gi...@apache.org>.
vpapavas commented on a change in pull request #11513:
URL: https://github.com/apache/kafka/pull/11513#discussion_r761138847



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
##########
@@ -76,6 +87,10 @@ public long approximateNumEntries() {
     public void put(final Bytes key,
                     final byte[] value) {
         wrapped().put(key, value);
+        if (context != null && context.recordMetadata().isPresent()) {

Review comment:
       @vvcephei When can the context be null? I copied this check from your POC, but it wasn't everywhere, so I am not sure where we need it. I guess the safest option is to add it everywhere.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vpapavas commented on a change in pull request #11513: feat: Write offset position to changelog

Posted by GitBox <gi...@apache.org>.
vpapavas commented on a change in pull request #11513:
URL: https://github.com/apache/kafka/pull/11513#discussion_r761140618



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -106,10 +111,13 @@
     private final RocksDBMetricsRecorder metricsRecorder;
 
     protected volatile boolean open = false;
+    // VisibleForTesting
+    protected Position position;
+
     private StateStoreContext context;
-    private Position position;
 
-    RocksDBStore(final String name,
+    // VisibleForTesting
+    public RocksDBStore(final String name,

Review comment:
       This is needed so that I can test the consistency in the integration test. I need access to the `Position` so that I can assert it is correct.
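
    A hedged sketch of the assertion this enables (`getPosition` and `Position.fromMap` are taken from the diffs above; the expected offset is illustrative):

    ```
    final Position actual = store.getPosition();
    // expect partition 0 of the input topic to have advanced to the last written offset
    assertThat(actual, is(Position.fromMap(
            mkMap(mkEntry(INPUT_TOPIC_NAME, mkMap(mkEntry(0, 99L)))))));
    ```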




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #11513: feat: Write and restore position to/from changelog

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11513:
URL: https://github.com/apache/kafka/pull/11513#discussion_r761510761



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -110,17 +121,28 @@ public RecordCollector recordCollector() {
     public void logChange(final String storeName,
                           final Bytes key,
                           final byte[] value,
-                          final long timestamp) {
+                          final long timestamp,
+                          final Optional<Position> position) {
         throwUnsupportedOperationExceptionIfStandby("logChange");
 
         final TopicPartition changelogPartition = stateManager().registeredChangelogPartitionFor(storeName);
 
-        // Sending null headers to changelog topics (KIP-244)
+        final Headers headers = new RecordHeaders();
+        if (!consistencyEnabled) {
+            headers.add(ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_DEFAULT);

Review comment:
       Following on the last comment (and the earlier one), I don't think we need the "default" version, just the "consistency" one.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreType;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.internals.Position;
+import org.apache.kafka.streams.state.internals.RocksDBStore;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStore;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
+import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+
+@Category({IntegrationTest.class})
+public class ConsistencyVectorIntegrationTest {
+
+    private static final int NUM_BROKERS = 1;
+    private static int port = 0;
+    private static final String INPUT_TOPIC_NAME = "input-topic";
+    private static final String TABLE_NAME = "source-table";
+
+    public final EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster(NUM_BROKERS);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    private final List<KafkaStreams> streamsToCleanup = new ArrayList<>();
+    private final MockTime mockTime = cluster.time;
+
+    @Before
+    public void before() throws InterruptedException, IOException {
+        cluster.start();
+        cluster.createTopic(INPUT_TOPIC_NAME, 2, 1);
+    }
+
+    @After
+    public void after() {
+        for (final KafkaStreams kafkaStreams : streamsToCleanup) {
+            kafkaStreams.close();
+        }
+        cluster.stop();
+    }
+
+    @Test
+    public void shouldHaveSamePositionBoundActiveAndStandBy() throws Exception {
+        final int batch1NumMessages = 100;

Review comment:
       It seems like this test is just as valid with one message as with 100. I'm concerned that we might have a flaky test if the standby has only restored like 50 of the messages when we do that last assertion. It looks like all the safety checks before that would pass, but there's nothing that guarantees we have read the entire batch into the standby before we assert the position.
   
   OTOH, if we just write a single record, then it's either there or it's not.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
##########
@@ -216,6 +226,15 @@ public void put(final Bytes key,
             expiredRecordSensor.record(1.0d, ProcessorContextUtils.currentSystemTime(context));
             LOG.warn("Skipping record for expired segment.");
         } else {
+            try {
+                final InternalProcessorContext internalContext = asInternalProcessorContext(context);

Review comment:
       This is to expose `recordMetadata`, right? We should actually be able to migrate this class to `StateStoreContext` now and not need this cast. (The cast would probably break users' unit tests).
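
    A hedged sketch of what that migration would buy, assuming the `StateStoreContext#recordMetadata` accessor from KIP-791:

    ```
    // no InternalProcessorContext cast needed; update the position straight from the metadata
    context.recordMetadata().ifPresent(meta ->
            position.withComponent(meta.topic(), meta.partition(), meta.offset()));
    ```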

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
##########
@@ -138,8 +153,12 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
         return wrapped().reverseAll();
     }
 
-    void log(final Bytes key,
-             final byte[] value) {
-        context.logChange(name(), key, value, context.timestamp());
+    @SuppressWarnings("unchecked")
+    void log(final Bytes key, final byte[] value) {
+        Optional<Position> optionalPosition = Optional.empty();
+        if (consistencyEnabled) {
+            optionalPosition = Optional.of(position);
+        }

Review comment:
       Since the processor context also checks whether the feature is enabled before writing headers, we can just skip this check and instead pass the position unconditionally.
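
    A minimal sketch of the simplification (signature taken from the diff above):

    ```
    void log(final Bytes key, final byte[] value) {
        // the context gates on the feature flag, so the position can be passed unconditionally
        context.logChange(name(), key, value, context.timestamp(), position);
    }
    ```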

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/Position.java
##########
@@ -27,6 +30,8 @@
 import java.util.function.BiConsumer;
 
 public class Position {
+    public static final String VECTOR_KEY = "c";

Review comment:
       We'd better put all the changelog headers constants together in the same file to avoid collisions if we add more headers later.
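
   Something like a single holder class would do (a sketch; the class name is illustrative, while the header keys "v" and "c" are the ones this PR already uses):

   ```
   // One home for every changelog header constant, so later headers can't collide.
   public final class ChangelogHeaders {
       public static final String VERSION_KEY = "v";   // changelog record format version
       public static final String POSITION_KEY = "c";  // serialized consistency Position
       private ChangelogHeaders() { }
   }
   ```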

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRecordDeserializationHelper.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.header.internals.RecordHeader;
+
+/**
+ * Changelog records without any headers are considered old format.
+ * New format changelog records will have a version in their headers.
+ * Version 0: This indicates that the changelog records are under version control.
+ * Version 1: This indicates that the changelog records have consistency information.
+ */
+public class ChangelogRecordDeserializationHelper {
+    private static final byte[] V_0_CHANGELOG_VERSION_HEADER_VALUE = {(byte) 0};

Review comment:
       I think we needed both versions before, when we were thinking about changing the key format of the changelog, but now I think we just need one version. Offhand, it seems like the algorithm should be:
   
   ```
   final Header versionHeader = record.headers().lastHeader(
       ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_KEY);
   if (versionHeader == null) {
       // old-format record: no version header, so no consistency information to restore
       return;
   } else {
       switch (versionHeader.value()[0]) {
           case 0:
               final Header vectorHeader = record.headers().lastHeader(Position.VECTOR_KEY);
               if (vectorHeader == null) {
                   throw new StreamsException("This should not happen. Consistency is enabled but the changelog " +
                       "contains records without consistency information.");
               }
               position.merge(Position.deserialize(ByteBuffer.wrap(vectorHeader.value())));
               break; // without the break, we would fall through to the default branch
           default:
               // log a warning because the changelog writer produced a record with a newer version than we understand
               // maybe we want to zero out the position, since we no longer know what position we're at
               // we probably don't want to throw an exception, since we are presumably in the middle of a rolling upgrade
               return;
       }
   }
   ```
   
   In other words, I don't think the consumer actually needs to check `consistencyEnabled` at all, and I don't think we currently need the `CHANGELOG_VERSION_HEADER_RECORD_DEFAULT` version that's here.
   
   But we do actually need to have some provision for what to do if we get a version that's larger than the ones we know how to handle.
   
   What do you think?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -106,10 +111,13 @@
     private final RocksDBMetricsRecorder metricsRecorder;
 
     protected volatile boolean open = false;
+    // VisibleForTesting
+    protected Position position;
+
     private StateStoreContext context;
-    private Position position;
 
-    RocksDBStore(final String name,
+    // VisibleForTesting
+    public RocksDBStore(final String name,

Review comment:
       I see. Once the framework PR is merged, we'll be able to get the position through the public API, and won't need this anymore. I think your PR will get merged first, so let's plan to circle back and revert some of this stuff after the public API is in.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
##########
@@ -384,12 +387,14 @@ public void localSessionStoreShouldNotAllowInitOrClose() {
     }
 
     @Test
-    public void shouldNotSendRecordHeadersToChangelogTopic() {
+    public void shouldSendV0RecordHeadersToChangelogTopic() {

Review comment:
       And we probably want to add a new test that we write the consistency header if the flag is enabled, right?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
##########
@@ -79,15 +91,34 @@ public void init(final StateStoreContext context, final StateStore root) {
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public void remove(final Windowed<Bytes> sessionKey) {
+        if (context.recordMetadata().isPresent()) {
+            final RecordMetadata meta = context.recordMetadata().get();
+            position = position.update(meta.topic(), meta.partition(), meta.offset());
+        }
         wrapped().remove(sessionKey);
-        context.logChange(name(), SessionKeySchema.toBinary(sessionKey), null, context.timestamp());
+        Optional<Position> optionalPosition = Optional.empty();
+        if (consistencyEnabled) {
+            optionalPosition = Optional.of(position);
+        }
+        context.logChange(name(), SessionKeySchema.toBinary(sessionKey), null, context.timestamp(), optionalPosition);
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public void put(final Windowed<Bytes> sessionKey, final byte[] aggregate) {
+        if (context.recordMetadata().isPresent()) {
+            final RecordMetadata meta = context.recordMetadata().get();
+            position = position.update(meta.topic(), meta.partition(), meta.offset());
+        }
         wrapped().put(sessionKey, aggregate);
-        context.logChange(name(), SessionKeySchema.toBinary(sessionKey), aggregate, context.timestamp());
+        Optional<Position> optionalPosition = Optional.empty();
+        if (consistencyEnabled) {
+            optionalPosition = Optional.of(position);
+        }
+        context.logChange(
+                name(), SessionKeySchema.toBinary(sessionKey), aggregate, context.timestamp(), optionalPosition);

Review comment:
       Yeah, I think these will all become just one-line changes.
   
   ```suggestion
           context.logChange(name(), SessionKeySchema.toBinary(sessionKey), aggregate, context.timestamp(), position);
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
##########
@@ -76,6 +87,10 @@ public long approximateNumEntries() {
     public void put(final Bytes key,
                     final byte[] value) {
         wrapped().put(key, value);
+        if (context != null && context.recordMetadata().isPresent()) {

Review comment:
       Yeah, I do think it's easier to just have it everywhere. In the "real" PR, I wrapped this logic up in a utility method: https://github.com/apache/kafka/pull/11557/files#diff-d0743c083f89b083921f29f722b02676d55e0d602fef06954e7301afc19d1df3R56-R64
   
   The only reason it might be null is in unit tests, but we do need to support that case, since our users will have their own unit tests of processors that use stores.
   
   I think it's obvious enough that you need to provide a context with record metadata if you want position tracking that I feel ok about just skipping the position update if the context is null or the record metadata is not present.
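
   The shape of that utility is roughly the following (a sketch; the helper in the linked PR may differ in name and signature):

   ```
   // Null-safe position update shared by all changelogging stores (sketch).
   static Position updatePosition(final Position position, final StateStoreContext context) {
       if (context != null && context.recordMetadata().isPresent()) {
           final RecordMetadata meta = context.recordMetadata().get();
           return position.update(meta.topic(), meta.partition(), meta.offset());
       }
       // No context or no record metadata (e.g., in users' unit tests): leave it as-is.
       return position;
   }
   ```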

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -947,6 +947,11 @@
         // Private API used to control the emit latency for left/outer join results (https://issues.apache.org/jira/browse/KAFKA-10847)
         public static final String EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX = "__emit.interval.ms.kstreams.outer.join.spurious.results.fix__";
 
+        // Private API used to control the usage of consistency offset vectors
+        public static final String IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED = "__iq.consistency.offset"

Review comment:
       Thanks! I totally forgot about this while writing the KIP because I was so focused on the API. I think we'll need a public feature flag so that people can disable the feature if they're running on older brokers that don't support headers.
   
   Let's keep this for now to keep the review cycle efficient on your PR. I'll amend the KIP, and we can follow up with a public config.
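
   For reference, a store would read the internal flag from the app configs along these lines (a sketch; the `getBoolean` helper is assumed):

   ```
   // Sketch: reading the internal flag, defaulting to off until the public config lands.
   consistencyEnabled = StreamsConfig.InternalConfig.getBoolean(
       context.appConfigs(),
       StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED,
       false);
   ```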

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreType;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.internals.Position;
+import org.apache.kafka.streams.state.internals.RocksDBStore;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStore;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
+import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+
+@Category({IntegrationTest.class})
+public class ConsistencyVectorIntegrationTest {
+
+    private static final int NUM_BROKERS = 1;
+    private static int port = 0;
+    private static final String INPUT_TOPIC_NAME = "input-topic";
+    private static final String TABLE_NAME = "source-table";
+
+    public final EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster(NUM_BROKERS);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    private final List<KafkaStreams> streamsToCleanup = new ArrayList<>();
+    private final MockTime mockTime = cluster.time;
+
+    @Before
+    public void before() throws InterruptedException, IOException {
+        cluster.start();
+        cluster.createTopic(INPUT_TOPIC_NAME, 2, 1);
+    }
+
+    @After
+    public void after() {
+        for (final KafkaStreams kafkaStreams : streamsToCleanup) {
+            kafkaStreams.close();
+        }
+        cluster.stop();
+    }
+
+    @Test
+    public void shouldHaveSamePositionBoundActiveAndStandBy() throws Exception {
+        final int batch1NumMessages = 100;

Review comment:
       If you want to write 100 messages, we could also get a valid check by giving each record a different key or value and then waiting until we see all the keys (or values) before we assert the position.
   
   Then again, once the framework PR is in place, we could instead just use IQv2 to repeatedly query both instances and verify that we eventually get the right position back for both. This is another thing we should plan to circle back and refactor once that work is merged.
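
   A sketch of the "distinct keys, then wait" variant, using only imports the test already has (`batch1NumMessages` and the store handle are assumed to be set up as in the test):

   ```
   // Give each record a distinct key, then block until the standby's store
   // has seen all of them before asserting the position (sketch).
   TestUtils.waitForCondition(
       () -> IntStream.range(0, batch1NumMessages)
               .allMatch(k -> stateStore.get(k) != null),
       60_000,
       "Standby did not finish restoring the batch"
   );
   ```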

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
##########
@@ -384,12 +387,14 @@ public void localSessionStoreShouldNotAllowInitOrClose() {
     }
 
     @Test
-    public void shouldNotSendRecordHeadersToChangelogTopic() {
+    public void shouldSendV0RecordHeadersToChangelogTopic() {

Review comment:
       We'll want to restore this test back to exactly the old version of this diff, to be sure we don't have a regression on older brokers.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -110,17 +120,26 @@ public RecordCollector recordCollector() {
     public void logChange(final String storeName,
                           final Bytes key,
                           final byte[] value,
-                          final long timestamp) {
+                          final long timestamp,
+                          final Optional<Position> position) {
         throwUnsupportedOperationExceptionIfStandby("logChange");
 
         final TopicPartition changelogPartition = stateManager().registeredChangelogPartitionFor(storeName);
 
-        // Sending null headers to changelog topics (KIP-244)
+        final Headers headers = new RecordHeaders();
+        if (!consistencyEnabled) {

Review comment:
       I see the value in setting this precedent, but unfortunately we can't do it, because we need to keep supporting older brokers (we support broker versions that don't allow record headers at all). Instead, if `!consistencyEnabled`, we should not add headers, i.e., we should continue to pass `null` as the headers.
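
   Concretely, the suggestion amounts to this shape for `logChange` (a sketch; the constant names are from this PR's helper class, and `PositionSerde.serialize` is assumed to return a `ByteBuffer`):

   ```
   final Headers headers;
   if (consistencyEnabled) {
       headers = new RecordHeaders();
       headers.add(ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY);
       headers.add(new RecordHeader(
           ChangelogRecordDeserializationHelper.CHANGELOG_POSITION_HEADER_KEY,
           PositionSerde.serialize(position).array()));
   } else {
       headers = null; // keep KIP-244 behavior: no headers at all, so older brokers are unaffected
   }
   ```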

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -312,6 +318,11 @@ public synchronized void put(final Bytes key,
     public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
         try (final WriteBatch batch = new WriteBatch()) {
             dbAccessor.prepareBatch(entries, batch);
+            // FIXME Will the recordMetadata be the offset of the last record in the batch?

Review comment:
       I think that in reality, we only call this method during restoration, in which case `context.recordMetadata` will be absent (and the position is updated in the `RecordBatchingStateRestoreCallback`).
   
   In theory, though, it doesn't really matter what is going on when some processor calls one or more state store methods. The processor is always at a specific position in its input topic-partition(s), and that's the position component that we should update.
   
   The only contract that the store needs to maintain is that it updates the position on every mutation.





[GitHub] [kafka] vvcephei merged pull request #11513: KAFKA-13506: Write and restore position to/from changelog

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #11513:
URL: https://github.com/apache/kafka/pull/11513


   





[GitHub] [kafka] vvcephei commented on a change in pull request #11513: feat: Write and restore position to/from changelog

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11513:
URL: https://github.com/apache/kafka/pull/11513#discussion_r764277280



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRecordDeserializationHelper.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.internals.PositionSerde;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Changelog records without any headers are considered old format.
+ * New format changelog records will have a version in their headers.
+ * Version 0: This indicates that the changelog records have consistency information.
+ */
+public class ChangelogRecordDeserializationHelper {
+    public static final Logger log = LoggerFactory.getLogger(ChangelogRecordDeserializationHelper.class);
+    private static final byte[] V_0_CHANGELOG_VERSION_HEADER_VALUE = {(byte) 0};
+    public static final String CHANGELOG_VERSION_HEADER_KEY = "v";
+    public static final String CHANGELOG_POSITION_HEADER_KEY = "c";
+    public static final RecordHeader CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY = new RecordHeader(
+            CHANGELOG_VERSION_HEADER_KEY, V_0_CHANGELOG_VERSION_HEADER_VALUE);
+
+    public static Position applyChecksAndUpdatePosition(
+            final ConsumerRecord<byte[], byte[]> record,
+            final boolean consistencyEnabled,
+            final Position position
+    ) {
+        Position restoredPosition = Position.emptyPosition();
+        if (!consistencyEnabled) {
+            return Position.emptyPosition();

Review comment:
       Shouldn't these be returning `position`? This method's contract is to "update" the position, not "replace" it, right?



