Posted to commits@kafka.apache.org by gu...@apache.org on 2021/09/14 23:25:23 UTC

[kafka] branch 2.8 updated: KAFKA-13249: Always update changelog offsets before writing the checkpoint file (#11283)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new afaa12a  KAFKA-13249: Always update changelog offsets before writing the checkpoint file (#11283)
afaa12a is described below

commit afaa12a0968a952dfc601a7485a76399edd8c78f
Author: Oliver Hutchison <hu...@users.noreply.github.com>
AuthorDate: Tue Sep 14 07:15:22 2021 +1000

    KAFKA-13249: Always update changelog offsets before writing the checkpoint file (#11283)
    
    When using EOS, checkpointed offsets are not updated to the latest offsets from the changelog, because the offsets are only refreshed when commitNeeded=true and under EOS maybeWriteCheckpoint is only ever called when commitNeeded=false. This change forces the refresh when enforceCheckpoint=true.
    
    I have also added a test that verifies both the state store and the checkpoint file are fully up to date with the changelog after the app has shut down.
    
    Reviewers: Anna Sophie Blee-Goldman <ab...@apache.org>, Guozhang Wang <wa...@gmail.com>
---
 .../streams/processor/internals/StreamTask.java    |   2 +-
 .../streams/integration/EosIntegrationTest.java    | 103 ++++++++++++++++++++-
 .../processor/internals/StreamTaskTest.java        |   6 +-
 3 files changed, 107 insertions(+), 4 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 99d6ac6..144a41c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -565,7 +565,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
     protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
         // commitNeeded indicates we may have processed some records since last commit
         // and hence we need to refresh checkpointable offsets regardless whether we should checkpoint or not
-        if (commitNeeded) {
+        if (commitNeeded || enforceCheckpoint) {
             stateMgr.updateChangelogOffsets(checkpointableOffsets());
         }
 
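For readers skimming the one-line hunk above, here is a hedged sketch of how the guarded section reads after the patch. Only the lines shown in the hunk are verbatim; the trailing delegation to the base-class checkpoint logic is an assumption about the rest of the method body, not part of this commit.

    protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
        // Refresh the checkpointable offsets if records may have been processed since
        // the last commit (commitNeeded), OR if a checkpoint is being forced, e.g. on
        // shutdown. Before this patch the refresh was skipped whenever commitNeeded
        // was false, so an enforced checkpoint under EOS could persist stale
        // changelog offsets.
        if (commitNeeded || enforceCheckpoint) {
            stateMgr.updateChangelogOffsets(checkpointableOffsets());
        }

        // The remainder of the method (outside the hunk) performs the actual
        // checkpoint write; assumed here to delegate to the existing base-class logic.
        super.maybeWriteCheckpoint(enforceCheckpoint);
    }
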
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index c1bac52..eb530b0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.IsolationLevel;
@@ -39,6 +40,8 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -46,7 +49,12 @@ import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
+import org.apache.kafka.streams.state.internals.RocksDBStore;
+import org.apache.kafka.streams.state.internals.RocksDbKeyValueBytesStoreSupplier;
 import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.MockInternalProcessorContext;
+import org.apache.kafka.test.MockKeyValueStore;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
@@ -61,6 +69,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -80,10 +91,12 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
 import static org.apache.kafka.test.StreamsTestUtils.startKafkaStreamsAndWaitForRunningState;
+import static org.apache.kafka.test.TestUtils.consumerConfig;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
@@ -131,6 +144,8 @@ public class EosIntegrationTest {
         });
     }
 
+    private String stateTmpDir;
+
     @Parameter
     public String eosConfig;
 
@@ -381,6 +396,8 @@ public class EosIntegrationTest {
 
     @Test
     public void shouldNotViolateEosIfOneTaskFails() throws Exception {
+        if (eosConfig.equals(StreamsConfig.AT_LEAST_ONCE)) return;
+
         // this test writes 10 + 5 + 5 records per partition (running with 2 partitions)
         // the app is supposed to copy all 40 records into the output topic
         //
@@ -480,6 +497,8 @@ public class EosIntegrationTest {
 
     @Test
     public void shouldNotViolateEosIfOneTaskFailsWithState() throws Exception {
+        if (eosConfig.equals(StreamsConfig.AT_LEAST_ONCE)) return;
+
         // this test updates a store with 10 + 5 + 5 records per partition (running with 2 partitions)
         // the app is supposed to emit all 40 update records into the output topic
         //
@@ -594,6 +613,8 @@ public class EosIntegrationTest {
 
     @Test
     public void shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances() throws Exception {
+        if (eosConfig.equals(StreamsConfig.AT_LEAST_ONCE)) return;
+
         // this test writes 10 + 5 + 5 + 10 records per partition (running with 2 partitions)
         // the app is supposed to copy all 60 records into the output topic
         //
@@ -750,6 +771,84 @@ public class EosIntegrationTest {
         }
     }
 
+    @Test
+    public void shouldWriteLatestOffsetsToCheckpointOnShutdown() throws Exception {
+        final List<KeyValue<Long, Long>> writtenData = prepareData(0L, 10, 0L, 1L);
+        final List<KeyValue<Long, Long>> expectedResult = computeExpectedResult(writtenData);
+
+        try (final KafkaStreams streams = getKafkaStreams("streams", true, "appDir", 1, eosConfig, MAX_POLL_INTERVAL_MS)) {
+
+            startKafkaStreamsAndWaitForRunningState(streams, MAX_WAIT_TIME_MS);
+
+            writeInputData(writtenData);
+
+            waitForCondition(
+                    () -> commitRequested.get() == 2, MAX_WAIT_TIME_MS,
+                    "StreamsTasks did not request commit.");
+
+            final List<KeyValue<Long, Long>> committedRecords = readResult(writtenData.size(), CONSUMER_GROUP_ID);
+
+            checkResultPerKey(
+                    committedRecords,
+                    expectedResult,
+                    "The committed records do not match what is expected");
+
+            verifyStateStore(
+                    streams,
+                    getMaxPerKey(expectedResult),
+                    "The state store content does not match what is expected");
+        }
+
+        final Set<KeyValue<Long, Long>> expectedState = getMaxPerKey(expectedResult);
+        verifyStateIsInStoreAndOffsetsAreInCheckpoint(0, expectedState);
+        verifyStateIsInStoreAndOffsetsAreInCheckpoint(1, expectedState);
+
+        assertThat("Not all expected state values were found in the state stores", expectedState.isEmpty());
+    }
+
+    private void verifyStateIsInStoreAndOffsetsAreInCheckpoint(final int partition, final Set<KeyValue<Long, Long>> expectedState) throws IOException {
+        final String stateStoreDir = stateTmpDir + File.separator + "appDir" + File.separator + applicationId + File.separator + "0_" + partition + File.separator;
+
+        // Verify that the data in the state store on disk is fully up-to-date
+        final StateStoreContext context = new MockInternalProcessorContext(new Properties(), new TaskId(0, 0), new File(stateStoreDir));
+        final MockKeyValueStore stateStore = new MockKeyValueStore("store", false);
+        final RocksDBStore store = (RocksDBStore) new RocksDbKeyValueBytesStoreSupplier(storeName, false).get();
+        store.init(context, stateStore);
+
+        store.all().forEachRemaining(kv -> {
+            final KeyValue<Long, Long> kv2 = new KeyValue<>(new BigInteger(kv.key.get()).longValue(), new BigInteger(kv.value).longValue());
+            expectedState.remove(kv2);
+        });
+
+        // Verify that the checkpointed offsets match exactly with max offset of the records in the changelog
+        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateStoreDir + ".checkpoint"));
+        final Map<TopicPartition, Long> checkpointedOffsets = checkpoint.read();
+        checkpointedOffsets.forEach(this::verifyChangelogMaxRecordOffsetMatchesCheckpointedOffset);
+    }
+
+    private void verifyChangelogMaxRecordOffsetMatchesCheckpointedOffset(final TopicPartition tp, final long checkpointedOffset) {
+        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig(CLUSTER.bootstrapServers(), Serdes.ByteArray().deserializer().getClass(), Serdes.ByteArray().deserializer().getClass()));
+        final List<TopicPartition> partitions = Collections.singletonList(tp);
+        consumer.assign(partitions);
+        consumer.seekToEnd(partitions);
+        final long topicEndOffset = consumer.position(tp);
+
+        assertTrue("changelog topic end " + topicEndOffset + " is less than checkpointed offset " + checkpointedOffset,
+                topicEndOffset >= checkpointedOffset);
+
+        consumer.seekToBeginning(partitions);
+
+        Long maxRecordOffset = null;
+        while (consumer.position(tp) != topicEndOffset) {
+            final List<ConsumerRecord<String, String>> records = consumer.poll(Duration.ofMillis(0)).records(tp);
+            if (!records.isEmpty()) {
+                maxRecordOffset = records.get(records.size() - 1).offset();
+            }
+        }
+
+        assertEquals("Checkpointed offset does not match end of changelog", maxRecordOffset, (Long) checkpointedOffset);
+    }
+
     private List<KeyValue<Long, Long>> prepareData(final long fromInclusive,
                                                    final long toExclusive,
                                                    final Long... keys) {
@@ -860,6 +959,8 @@ public class EosIntegrationTest {
             } }, storeNames)
             .to(SINGLE_PARTITION_OUTPUT_TOPIC);
 
+        stateTmpDir = TestUtils.tempDirectory().getPath() + File.separator;
+
         final Properties properties = new Properties();
         properties.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
         properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numberOfStreamsThreads);
@@ -870,7 +971,7 @@ public class EosIntegrationTest {
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 5 * 1000 - 1);
         properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), maxPollIntervalMs);
         properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-        properties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath() + File.separator + appDir);
+        properties.put(StreamsConfig.STATE_DIR_CONFIG, stateTmpDir + appDir);
         properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, dummyHostName + ":2142");
 
         final Properties config = StreamsTestUtils.getStreamsConfig(
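As a standalone companion to the verification logic in the new test above, here is a minimal sketch of reading a task's checkpoint file with OffsetCheckpoint and sanity-checking it against the changelog end offsets. The path, broker address, and class name are hypothetical and not part of this commit; OffsetCheckpoint is an internals class, so this is only appropriate for test or diagnostic code.

    import java.io.File;
    import java.io.IOException;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    import org.apache.kafka.streams.state.internals.OffsetCheckpoint;

    public class CheckpointInspector {
        public static void main(final String[] args) throws IOException {
            // Hypothetical layout: <state.dir>/<application.id>/<taskId>/.checkpoint
            final File checkpointFile = new File("/tmp/kafka-streams/my-app/0_0/.checkpoint");
            final Map<TopicPartition, Long> checkpointed = new OffsetCheckpoint(checkpointFile).read();

            final Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                // The log end offset of each changelog partition is an upper bound on any
                // valid checkpointed offset. Under EOS the end offset can also sit past the
                // last data record because of transaction markers, which is why the test
                // above additionally polls the records to find the exact max record offset.
                final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(checkpointed.keySet());
                checkpointed.forEach((tp, offset) -> {
                    if (offset > endOffsets.get(tp)) {
                        throw new IllegalStateException("Checkpoint ahead of changelog for " + tp);
                    }
                    System.out.println(tp + ": checkpoint=" + offset + ", end=" + endOffsets.get(tp));
                });
            }
        }
    }
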
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index ec5d295..99dd334 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -1761,7 +1761,8 @@ public class StreamTaskTest {
     public void shouldSkipCheckpointingSuspendedCreatedTask() {
         stateManager.checkpoint();
         EasyMock.expectLastCall().andThrow(new AssertionError("Should not have tried to checkpoint"));
-        EasyMock.replay(stateManager);
+        EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
+        EasyMock.replay(stateManager, recordCollector);
 
         task = createStatefulTask(createConfig("100"), true);
         task.suspend();
@@ -1774,7 +1775,8 @@ public class StreamTaskTest {
         EasyMock.expectLastCall().once();
         EasyMock.expect(stateManager.changelogOffsets())
                 .andReturn(singletonMap(partition1, 1L));
-        EasyMock.replay(stateManager);
+        EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
+        EasyMock.replay(stateManager, recordCollector);
 
         task = createStatefulTask(createConfig("100"), true);
         task.initializeIfNeeded();