Posted to commits@kafka.apache.org by gu...@apache.org on 2021/09/13 21:17:36 UTC

[kafka] branch trunk updated: KAFKA-13249: Always update changelog offsets before writing the checkpoint file (#11283)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a03bda6  KAFKA-13249: Always update changelog offsets before writing the checkpoint file (#11283)
a03bda6 is described below

commit a03bda61e068d72823af47e5f25ffd12c3319541
Author: Oliver Hutchison <hu...@users.noreply.github.com>
AuthorDate: Tue Sep 14 07:15:22 2021 +1000

    KAFKA-13249: Always update changelog offsets before writing the checkpoint file (#11283)
    
    When using EOS, checkpointed offsets are not updated to the latest offsets from the changelog because the maybeWriteCheckpoint method is only ever called when commitNeeded=false. This change forces the update when enforceCheckpoint=true.
    
    I have also added a test which verifies that both the state store and the checkpoint file are completely up to date with the changelog after the app has shut down.
    
    Reviewers: Anna Sophie Blee-Goldman <ab...@apache.org>, Guozhang Wang <wa...@gmail.com>
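
For readers skimming the diff, here is a small, self-contained Java sketch of the checkpointing decision this patch changes. It is a hypothetical model, not the real Streams internals: only the commitNeeded/enforceCheckpoint condition and the "refresh offsets before writing" step mirror StreamTask.maybeWriteCheckpoint; every other name is illustrative, and the real method also writes non-enforced checkpoints once enough offsets have accumulated.

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical model of the fix: refresh the offsets that will be checkpointed
    // whenever a commit is needed OR a checkpoint is enforced (e.g. on shutdown).
    public class CheckpointSketch {
        // offsets of records processed so far (stand-in for checkpointableOffsets())
        private final Map<String, Long> latestOffsets = new HashMap<>();
        // offsets that end up in the .checkpoint file (stand-in for the state manager's view)
        private final Map<String, Long> offsetsToCheckpoint = new HashMap<>();
        private boolean commitNeeded = false;

        void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
            // Before KAFKA-13249 this refresh ran only when commitNeeded was true, so an
            // enforced checkpoint on EOS shutdown (where commitNeeded is already false
            // after the final commit) persisted stale offsets.
            if (commitNeeded || enforceCheckpoint) {
                offsetsToCheckpoint.putAll(latestOffsets);
            }
            if (enforceCheckpoint) {
                writeCheckpointFile(offsetsToCheckpoint);
            }
        }

        private void writeCheckpointFile(final Map<String, Long> offsets) {
            // The real code writes through OffsetCheckpoint; here we just print.
            System.out.println("checkpoint written: " + offsets);
        }

        public static void main(final String[] args) {
            final CheckpointSketch task = new CheckpointSketch();
            task.latestOffsets.put("store-changelog-0", 42L);
            task.commitNeeded = false;        // typical state right after the final EOS commit
            task.maybeWriteCheckpoint(true);  // enforced on shutdown; now picks up offset 42
        }
    }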
---
 .../streams/processor/internals/StreamTask.java    |   2 +-
 .../streams/integration/EosIntegrationTest.java    | 109 ++++++++++++++++++++-
 .../processor/internals/StreamTaskTest.java        |   6 +-
 3 files changed, 110 insertions(+), 7 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index d59ec66..7c85869 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -577,7 +577,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
     protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
         // commitNeeded indicates we may have processed some records since last commit
         // and hence we need to refresh checkpointable offsets regardless whether we should checkpoint or not
-        if (commitNeeded) {
+        if (commitNeeded || enforceCheckpoint) {
             stateMgr.updateChangelogOffsets(checkpointableOffsets());
         }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 0d5c187..08cfc4b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.IsolationLevel;
@@ -39,6 +40,8 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -46,7 +49,12 @@ import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
+import org.apache.kafka.streams.state.internals.RocksDBStore;
+import org.apache.kafka.streams.state.internals.RocksDbKeyValueBytesStoreSupplier;
 import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.MockInternalProcessorContext;
+import org.apache.kafka.test.MockKeyValueStore;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
 import org.junit.AfterClass;
@@ -63,6 +71,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.math.BigInteger;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -82,10 +92,12 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
 import static org.apache.kafka.test.StreamsTestUtils.startKafkaStreamsAndWaitForRunningState;
+import static org.apache.kafka.test.TestUtils.consumerConfig;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
@@ -135,12 +147,15 @@ public class EosIntegrationTest {
 
     private volatile boolean hasUnexpectedError = false;
 
+    private String stateTmpDir;
+
     @SuppressWarnings("deprecation")
     @Parameters(name = "{0}")
     public static Collection<String[]> data() {
-        return Arrays.asList(new String[][] {
-            {StreamsConfig.EXACTLY_ONCE},
-            {StreamsConfig.EXACTLY_ONCE_V2}
+        return Arrays.asList(new String[][]{
+                {StreamsConfig.AT_LEAST_ONCE},
+                {StreamsConfig.EXACTLY_ONCE},
+                {StreamsConfig.EXACTLY_ONCE_V2}
         });
     }
 
@@ -396,6 +411,8 @@ public class EosIntegrationTest {
 
     @Test
     public void shouldNotViolateEosIfOneTaskFails() throws Exception {
+        if (eosConfig.equals(StreamsConfig.AT_LEAST_ONCE)) return;
+
         // this test writes 10 + 5 + 5 records per partition (running with 2 partitions)
         // the app is supposed to copy all 40 records into the output topic
         //
@@ -495,6 +512,8 @@ public class EosIntegrationTest {
 
     @Test
     public void shouldNotViolateEosIfOneTaskFailsWithState() throws Exception {
+        if (eosConfig.equals(StreamsConfig.AT_LEAST_ONCE)) return;
+
         // this test updates a store with 10 + 5 + 5 records per partition (running with 2 partitions)
         // the app is supposed to emit all 40 update records into the output topic
         //
@@ -609,6 +628,8 @@ public class EosIntegrationTest {
 
     @Test
     public void shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances() throws Exception {
+        if (eosConfig.equals(StreamsConfig.AT_LEAST_ONCE)) return;
+
         // this test writes 10 + 5 + 5 + 10 records per partition (running with 2 partitions)
         // the app is supposed to copy all 60 records into the output topic
         //
@@ -765,6 +786,84 @@ public class EosIntegrationTest {
         }
     }
 
+    @Test
+    public void shouldWriteLatestOffsetsToCheckpointOnShutdown() throws Exception {
+        final List<KeyValue<Long, Long>> writtenData = prepareData(0L, 10, 0L, 1L);
+        final List<KeyValue<Long, Long>> expectedResult = computeExpectedResult(writtenData);
+
+        try (final KafkaStreams streams = getKafkaStreams("streams", true, "appDir", 1, eosConfig, MAX_POLL_INTERVAL_MS)) {
+
+            startKafkaStreamsAndWaitForRunningState(streams, MAX_WAIT_TIME_MS);
+
+            writeInputData(writtenData);
+
+            waitForCondition(
+                    () -> commitRequested.get() == 2, MAX_WAIT_TIME_MS,
+                    "StreamsTasks did not request commit.");
+
+            final List<KeyValue<Long, Long>> committedRecords = readResult(writtenData.size(), CONSUMER_GROUP_ID);
+
+            checkResultPerKey(
+                    committedRecords,
+                    expectedResult,
+                    "The committed records do not match what was expected");
+
+            verifyStateStore(
+                    streams,
+                    getMaxPerKey(expectedResult),
+                    "The state store contents do not match what was expected");
+        }
+
+        final Set<KeyValue<Long, Long>> expectedState = getMaxPerKey(expectedResult);
+        verifyStateIsInStoreAndOffsetsAreInCheckpoint(0, expectedState);
+        verifyStateIsInStoreAndOffsetsAreInCheckpoint(1, expectedState);
+
+        assertThat("Not all expected state values were found in the state stores", expectedState.isEmpty());
+    }
+
+    private void verifyStateIsInStoreAndOffsetsAreInCheckpoint(final int partition, final Set<KeyValue<Long, Long>> expectedState) throws IOException {
+        final String stateStoreDir = stateTmpDir + File.separator + "appDir" + File.separator + applicationId + File.separator + "0_" + partition + File.separator;
+
+        // Verify that the data in the state store on disk is fully up-to-date
+        final StateStoreContext context = new MockInternalProcessorContext(new Properties(), new TaskId(0, 0), new File(stateStoreDir));
+        final MockKeyValueStore stateStore = new MockKeyValueStore("store", false);
+        final RocksDBStore store = (RocksDBStore) new RocksDbKeyValueBytesStoreSupplier(storeName, false).get();
+        store.init(context, stateStore);
+
+        store.all().forEachRemaining(kv -> {
+            final KeyValue<Long, Long> kv2 = new KeyValue<>(new BigInteger(kv.key.get()).longValue(), new BigInteger(kv.value).longValue());
+            expectedState.remove(kv2);
+        });
+
+        // Verify that the checkpointed offsets match exactly with max offset of the records in the changelog
+        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateStoreDir + ".checkpoint"));
+        final Map<TopicPartition, Long> checkpointedOffsets = checkpoint.read();
+        checkpointedOffsets.forEach(this::verifyChangelogMaxRecordOffsetMatchesCheckpointedOffset);
+    }
+
+    private void verifyChangelogMaxRecordOffsetMatchesCheckpointedOffset(final TopicPartition tp, final long checkpointedOffset) {
+        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig(CLUSTER.bootstrapServers(), Serdes.ByteArray().deserializer().getClass(), Serdes.ByteArray().deserializer().getClass()));
+        final List<TopicPartition> partitions = Collections.singletonList(tp);
+        consumer.assign(partitions);
+        consumer.seekToEnd(partitions);
+        final long topicEndOffset = consumer.position(tp);
+
+        assertTrue("changelog topic end " + topicEndOffset + " is less than checkpointed offset " + checkpointedOffset,
+                topicEndOffset >= checkpointedOffset);
+
+        consumer.seekToBeginning(partitions);
+
+        Long maxRecordOffset = null;
+        while (consumer.position(tp) != topicEndOffset) {
+            final List<ConsumerRecord<String, String>> records = consumer.poll(Duration.ofMillis(0)).records(tp);
+            if (!records.isEmpty()) {
+                maxRecordOffset = records.get(records.size() - 1).offset();
+            }
+        }
+
+        assertEquals("Checkpointed offset does not match end of changelog", maxRecordOffset, (Long) checkpointedOffset);
+    }
+
     private List<KeyValue<Long, Long>> prepareData(final long fromInclusive,
                                                    final long toExclusive,
                                                    final Long... keys) {
@@ -875,6 +974,8 @@ public class EosIntegrationTest {
             } }, storeNames)
             .to(SINGLE_PARTITION_OUTPUT_TOPIC);
 
+        stateTmpDir = TestUtils.tempDirectory().getPath() + File.separator;
+
         final Properties properties = new Properties();
         // Set commit interval to a larger value to avoid affection of controlled stream commit,
         // but not too large as we need to have a relatively low transaction timeout such
@@ -890,7 +991,7 @@ public class EosIntegrationTest {
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), maxPollIntervalMs - 1);
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), maxPollIntervalMs);
         properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-        properties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath() + File.separator + appDir);
+        properties.put(StreamsConfig.STATE_DIR_CONFIG, stateTmpDir + appDir);
         properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, dummyHostName + ":2142");
 
         final Properties config = StreamsTestUtils.getStreamsConfig(
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 2c4df65..077d921 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -1659,7 +1659,8 @@ public class StreamTaskTest {
     public void shouldSkipCheckpointingSuspendedCreatedTask() {
         stateManager.checkpoint();
         EasyMock.expectLastCall().andThrow(new AssertionError("Should not have tried to checkpoint"));
-        EasyMock.replay(stateManager);
+        EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
+        EasyMock.replay(stateManager, recordCollector);
 
         task = createStatefulTask(createConfig("100"), true);
         task.suspend();
@@ -1672,7 +1673,8 @@ public class StreamTaskTest {
         EasyMock.expectLastCall().once();
         EasyMock.expect(stateManager.changelogOffsets())
                 .andReturn(singletonMap(partition1, 1L));
-        EasyMock.replay(stateManager);
+        EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
+        EasyMock.replay(stateManager, recordCollector);
 
         task = createStatefulTask(createConfig("100"), true);
         task.initializeIfNeeded();