Posted to commits@kafka.apache.org by gu...@apache.org on 2021/09/13 21:17:36 UTC
[kafka] branch trunk updated: KAFKA-13249: Always update changelog offsets before writing the checkpoint file (#11283)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a03bda6 KAFKA-13249: Always update changelog offsets before writing the checkpoint file (#11283)
a03bda6 is described below
commit a03bda61e068d72823af47e5f25ffd12c3319541
Author: Oliver Hutchison <hu...@users.noreply.github.com>
AuthorDate: Tue Sep 14 07:15:22 2021 +1000
KAFKA-13249: Always update changelog offsets before writing the checkpoint file (#11283)
When using EOS, checkpointed offsets are not updated to the latest offsets from the changelog: maybeWriteCheckpoint only refreshes the checkpointable offsets when commitNeeded=true, yet under EOS it is only ever called when commitNeeded=false. This change forces the refresh when enforceCheckpoint=true.
I have also added a test which verifies that both the state store and the checkpoint file are completely up to date with the changelog after the app has shut down.
Reviewers: Anna Sophie Blee-Goldman <ab...@apache.org>, Guozhang Wang <wa...@gmail.com>
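For readers skimming the patch, the shape of the bug is small enough to show in
isolation. The sketch below is illustrative only: the class, fields, and offset
types are simplified stand-ins for the real StreamTask, which delegates to a
ProcessorStateManager as the diff below shows.

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    public class CheckpointGuardSketch {
        // Hypothetical stand-ins for StreamTask state.
        private boolean commitNeeded = false;                 // reset after an EOS commit
        private final Map<String, Long> changelogOffsets = new HashMap<>();
        private Map<String, Long> checkpointFile = new HashMap<>();

        // Stand-in for checkpointableOffsets(): the latest offsets to record.
        private Map<String, Long> checkpointableOffsets() {
            return Collections.singletonMap("store-changelog-0", 42L);
        }

        void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
            // Before KAFKA-13249 this guard was `if (commitNeeded)`, so a forced
            // checkpoint right after an EOS commit skipped the refresh below and
            // persisted stale offsets.
            if (commitNeeded || enforceCheckpoint) {
                changelogOffsets.putAll(checkpointableOffsets());
            }
            checkpointFile = new HashMap<>(changelogOffsets); // simplified "write"
        }

        public static void main(final String[] args) {
            final CheckpointGuardSketch task = new CheckpointGuardSketch();
            task.maybeWriteCheckpoint(true);          // e.g. forced on shutdown
            System.out.println(task.checkpointFile);  // {store-changelog-0=42} with the fix
        }
    }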
---
.../streams/processor/internals/StreamTask.java | 2 +-
.../streams/integration/EosIntegrationTest.java | 109 ++++++++++++++++++++-
.../processor/internals/StreamTaskTest.java | 6 +-
3 files changed, 110 insertions(+), 7 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index d59ec66..7c85869 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -577,7 +577,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
// commitNeeded indicates we may have processed some records since last commit
// and hence we need to refresh checkpointable offsets regardless of whether we should checkpoint or not
- if (commitNeeded) {
+ if (commitNeeded || enforceCheckpoint) {
stateMgr.updateChangelogOffsets(checkpointableOffsets());
}
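Note that the hunk above only widens the refresh guard; the decision to actually
write the checkpoint file is taken later in the same method and is not part of
this diff. Refreshing the offsets is cheap and idempotent, so running it whenever
enforceCheckpoint=true guarantees the forced checkpoint observes the latest
changelog positions. Hedging on details the hunk does not show, the method
presumably continues along these lines:

    // Sketch only; the real condition and bookkeeping may differ.
    if (enforceCheckpoint /* or enough state has changed since the last flush */) {
        stateMgr.flush();
        stateMgr.checkpoint();  // writes the .checkpoint file from the refreshed offsets
    }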
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 0d5c187..08cfc4b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.IsolationLevel;
@@ -39,6 +40,8 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -46,7 +49,12 @@ import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
+import org.apache.kafka.streams.state.internals.RocksDBStore;
+import org.apache.kafka.streams.state.internals.RocksDbKeyValueBytesStoreSupplier;
import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.MockInternalProcessorContext;
+import org.apache.kafka.test.MockKeyValueStore;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.AfterClass;
@@ -63,6 +71,8 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.math.BigInteger;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -82,10 +92,12 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
import static org.apache.kafka.test.StreamsTestUtils.startKafkaStreamsAndWaitForRunningState;
+import static org.apache.kafka.test.TestUtils.consumerConfig;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -135,12 +147,15 @@ public class EosIntegrationTest {
private volatile boolean hasUnexpectedError = false;
+ private String stateTmpDir;
+
@SuppressWarnings("deprecation")
@Parameters(name = "{0}")
public static Collection<String[]> data() {
- return Arrays.asList(new String[][] {
- {StreamsConfig.EXACTLY_ONCE},
- {StreamsConfig.EXACTLY_ONCE_V2}
+ return Arrays.asList(new String[][]{
+ {StreamsConfig.AT_LEAST_ONCE},
+ {StreamsConfig.EXACTLY_ONCE},
+ {StreamsConfig.EXACTLY_ONCE_V2}
});
}
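The hunk above adds AT_LEAST_ONCE to the parameter matrix so that the new
shutdown-checkpoint test further down also covers the non-EOS path, while the
EOS-specific tests below guard themselves with an early return. The same effect
could be achieved with JUnit 4's assumption API, which reports the test as
skipped rather than passed; this is a hedged alternative, not what the patch does:

    import org.junit.Assume;

    // at the top of each EOS-only test:
    Assume.assumeFalse("EOS-only test", eosConfig.equals(StreamsConfig.AT_LEAST_ONCE));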
@@ -396,6 +411,8 @@ public class EosIntegrationTest {
@Test
public void shouldNotViolateEosIfOneTaskFails() throws Exception {
+ if (eosConfig.equals(StreamsConfig.AT_LEAST_ONCE)) return;
+
// this test writes 10 + 5 + 5 records per partition (running with 2 partitions)
// the app is supposed to copy all 40 records into the output topic
//
@@ -495,6 +512,8 @@ public class EosIntegrationTest {
@Test
public void shouldNotViolateEosIfOneTaskFailsWithState() throws Exception {
+ if (eosConfig.equals(StreamsConfig.AT_LEAST_ONCE)) return;
+
// this test updates a store with 10 + 5 + 5 records per partition (running with 2 partitions)
// the app is supposed to emit all 40 update records into the output topic
//
@@ -609,6 +628,8 @@ public class EosIntegrationTest {
@Test
public void shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances() throws Exception {
+ if (eosConfig.equals(StreamsConfig.AT_LEAST_ONCE)) return;
+
// this test writes 10 + 5 + 5 + 10 records per partition (running with 2 partitions)
// the app is supposed to copy all 60 records into the output topic
//
@@ -765,6 +786,84 @@ public class EosIntegrationTest {
}
}
+ @Test
+ public void shouldWriteLatestOffsetsToCheckpointOnShutdown() throws Exception {
+ final List<KeyValue<Long, Long>> writtenData = prepareData(0L, 10, 0L, 1L);
+ final List<KeyValue<Long, Long>> expectedResult = computeExpectedResult(writtenData);
+
+ try (final KafkaStreams streams = getKafkaStreams("streams", true, "appDir", 1, eosConfig, MAX_POLL_INTERVAL_MS)) {
+
+ startKafkaStreamsAndWaitForRunningState(streams, MAX_WAIT_TIME_MS);
+
+ writeInputData(writtenData);
+
+ waitForCondition(
+ () -> commitRequested.get() == 2, MAX_WAIT_TIME_MS,
+ "SteamsTasks did not request commit.");
+
+ final List<KeyValue<Long, Long>> committedRecords = readResult(writtenData.size(), CONSUMER_GROUP_ID);
+
+ checkResultPerKey(
+ committedRecords,
+ expectedResult,
+ "The committed records do not match what expected");
+
+ verifyStateStore(
+ streams,
+ getMaxPerKey(expectedResult),
+ "The state store content do not match what expected");
+ }
+
+ final Set<KeyValue<Long, Long>> expectedState = getMaxPerKey(expectedResult);
+ verifyStateIsInStoreAndOffsetsAreInCheckpoint(0, expectedState);
+ verifyStateIsInStoreAndOffsetsAreInCheckpoint(1, expectedState);
+
+ assertThat("Not all expected state values were found in the state stores", expectedState.isEmpty());
+ }
+
+ private void verifyStateIsInStoreAndOffsetsAreInCheckpoint(final int partition, final Set<KeyValue<Long, Long>> expectedState) throws IOException {
+ final String stateStoreDir = stateTmpDir + File.separator + "appDir" + File.separator + applicationId + File.separator + "0_" + partition + File.separator;
+
+ // Verify that the data in the state store on disk is fully up-to-date
+ final StateStoreContext context = new MockInternalProcessorContext(new Properties(), new TaskId(0, 0), new File(stateStoreDir));
+ final MockKeyValueStore stateStore = new MockKeyValueStore("store", false);
+ final RocksDBStore store = (RocksDBStore) new RocksDbKeyValueBytesStoreSupplier(storeName, false).get();
+ store.init(context, stateStore);
+
+ store.all().forEachRemaining(kv -> {
+ final KeyValue<Long, Long> kv2 = new KeyValue<>(new BigInteger(kv.key.get()).longValue(), new BigInteger(kv.value).longValue());
+ expectedState.remove(kv2);
+ });
+
+ // Verify that the checkpointed offsets match exactly with max offset of the records in the changelog
+ final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateStoreDir + ".checkpoint"));
+ final Map<TopicPartition, Long> checkpointedOffsets = checkpoint.read();
+ checkpointedOffsets.forEach(this::verifyChangelogMaxRecordOffsetMatchesCheckpointedOffset);
+ }
+
+ private void verifyChangelogMaxRecordOffsetMatchesCheckpointedOffset(final TopicPartition tp, final long checkpointedOffset) {
+ final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig(CLUSTER.bootstrapServers(), Serdes.ByteArray().deserializer().getClass(), Serdes.ByteArray().deserializer().getClass()));
+ final List<TopicPartition> partitions = Collections.singletonList(tp);
+ consumer.assign(partitions);
+ consumer.seekToEnd(partitions);
+ final long topicEndOffset = consumer.position(tp);
+
+ assertTrue("changelog topic end " + topicEndOffset + " is less than checkpointed offset " + checkpointedOffset,
+ topicEndOffset >= checkpointedOffset);
+
+ consumer.seekToBeginning(partitions);
+
+ Long maxRecordOffset = null;
+ while (consumer.position(tp) != topicEndOffset) {
+ final List<ConsumerRecord<String, String>> records = consumer.poll(Duration.ofMillis(0)).records(tp);
+ if (!records.isEmpty()) {
+ maxRecordOffset = records.get(records.size() - 1).offset();
+ }
+ }
+
+ assertEquals("Checkpointed offset does not match end of changelog", maxRecordOffset, (Long) checkpointedOffset);
+ }
+
private List<KeyValue<Long, Long>> prepareData(final long fromInclusive,
final long toExclusive,
final Long... keys) {
@@ -875,6 +974,8 @@ public class EosIntegrationTest {
} }, storeNames)
.to(SINGLE_PARTITION_OUTPUT_TOPIC);
+ stateTmpDir = TestUtils.tempDirectory().getPath() + File.separator;
+
final Properties properties = new Properties();
// Set commit interval to a larger value to avoid affecting the controlled stream commit,
// but not too large as we need to have a relatively low transaction timeout such
@@ -890,7 +991,7 @@ public class EosIntegrationTest {
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), maxPollIntervalMs - 1);
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), maxPollIntervalMs);
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
- properties.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath() + File.separator + appDir);
+ properties.put(StreamsConfig.STATE_DIR_CONFIG, stateTmpDir + appDir);
properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, dummyHostName + ":2142");
final Properties config = StreamsTestUtils.getStreamsConfig(
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 2c4df65..077d921 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -1659,7 +1659,8 @@ public class StreamTaskTest {
public void shouldSkipCheckpointingSuspendedCreatedTask() {
stateManager.checkpoint();
EasyMock.expectLastCall().andThrow(new AssertionError("Should not have tried to checkpoint"));
- EasyMock.replay(stateManager);
+ EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
+ EasyMock.replay(stateManager, recordCollector);
task = createStatefulTask(createConfig("100"), true);
task.suspend();
@@ -1672,7 +1673,8 @@ public class StreamTaskTest {
EasyMock.expectLastCall().once();
EasyMock.expect(stateManager.changelogOffsets())
.andReturn(singletonMap(partition1, 1L));
- EasyMock.replay(stateManager);
+ EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
+ EasyMock.replay(stateManager, recordCollector);
task = createStatefulTask(createConfig("100"), true);
task.initializeIfNeeded();
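The two StreamTaskTest tweaks above follow directly from the widened guard:
maybeWriteCheckpoint(true) now calls checkpointableOffsets() even when
commitNeeded is false, and that call reads offsets from the record collector,
so the mocks must now expect recordCollector.offsets(). For orientation, a
hedged sketch of what checkpointableOffsets() plausibly looks like (it is not
part of this diff):

    private Map<TopicPartition, Long> checkpointableOffsets() {
        // Start from the offsets of records this task has produced...
        final Map<TopicPartition, Long> offsets = new HashMap<>(recordCollector.offsets());
        // ...and fill in consumed offsets for partitions the producer has not touched.
        consumedOffsets.forEach(offsets::putIfAbsent);
        return offsets;
    }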