Posted to commits@kafka.apache.org by ca...@apache.org on 2024/01/12 09:58:26 UTC

(kafka) branch 3.4 updated: KAFKA-16017: Checkpoint restored offsets instead of written offsets (#15177)

This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.4 by this push:
     new 028b841a4e6 KAFKA-16017: Checkpoint restored offsets instead of written offsets (#15177)
028b841a4e6 is described below

commit 028b841a4e665f00fd95f563d5fa80cf24873f5e
Author: Bruno Cadonna <ca...@apache.org>
AuthorDate: Fri Jan 12 09:52:20 2024 +0100

    KAFKA-16017: Checkpoint restored offsets instead of written offsets (#15177)
    
    Kafka Streams checkpoints the wrong offsets when a task is closed during
    restoration. If a TaskCorruptedException occurs under the exactly-once
    processing guarantee, the affected task is closed dirty, its state is
    wiped out, and the task is re-initialized. If the task is then closed
    cleanly during the subsequent restoration, it writes the offsets held in
    its record collector to the checkpoint file. Those are the offsets of the
    records the task wrote to the changelog topics; in other words, the task
    checkpoints the end offsets of its changelog topics. Consequently, when
    the task is initialized again on the same Streams client, the checkpoint
    file is read and the task assumes it is fully restored, although the
    records between the last offsets it restored before closing cleanly and
    the end offsets of the changelog topics are missing locally.
    
    The fix is to clear the offsets in the record collector on close.
    
    Cherry-pick of 19727f8
    
    Reviewer: Lucas Brutschy <lb...@confluent.io>
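
For reference, the new integration test added below verifies the fix by reading the
task's checkpoint file directly. A minimal standalone sketch of that check, using the
same OffsetCheckpoint helper as the test (the state directory, application id, and task
id values here are illustrative, not taken from the patch):

    import java.io.File;
    import java.io.IOException;
    import java.nio.file.Paths;
    import java.util.Map;

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.state.internals.OffsetCheckpoint;

    public class CheckpointInspector {

        public static void main(final String[] args) throws IOException {
            // Illustrative values; the test below derives these from its StreamsConfig.
            final String stateDir = "/tmp/kafka-streams";
            final String applicationId = "my-eos-app";
            final String taskId = "0_0";

            // Per-task checkpoint file layout: <state.dir>/<application.id>/<taskId>/.checkpoint
            final File checkpointFile =
                Paths.get(stateDir, applicationId, taskId, ".checkpoint").toFile();

            // OffsetCheckpoint#read() returns the changelog offsets last written to the file.
            // With this fix, a task closed cleanly while restoring checkpoints the offsets it
            // actually restored rather than the end offsets of the changelogs it wrote to.
            final Map<TopicPartition, Long> checkpointed = new OffsetCheckpoint(checkpointFile).read();
            checkpointed.forEach((partition, offset) -> System.out.println(partition + " -> " + offset));
        }
    }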
---
 checkstyle/import-control.xml                      |   1 +
 checkstyle/suppressions.xml                        |   2 +-
 .../processor/internals/RecordCollectorImpl.java   |   7 +-
 .../streams/integration/EosIntegrationTest.java    | 206 ++++++++++++++++++++-
 .../integration/utils/IntegrationTestUtils.java    |   1 -
 .../processor/internals/RecordCollectorTest.java   |  66 ++++++-
 6 files changed, 276 insertions(+), 7 deletions(-)
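
The core of the change is in RecordCollectorImpl: both the clean and the dirty close path
now funnel through a private close() helper that clears the collected offsets before
checking for asynchronous send errors, so offsets of records written to the changelog
topics can no longer leak into a checkpoint. A stripped-down illustration of that shape
(a simplified stand-in, not the actual RecordCollectorImpl source; the real hunks follow
in the diff below):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.common.TopicPartition;

    // Simplified stand-in for the record collector: both close paths share one helper.
    class SimplifiedRecordCollector {
        private final Map<TopicPartition, Long> offsets = new HashMap<>();
        private volatile RuntimeException sendException; // set by the async send callback

        void closeClean() {
            // the task manager has already committed or aborted any open transaction
            close();
        }

        void closeDirty(final boolean eosEnabled) {
            if (eosEnabled) {
                // a real collector would abort the in-flight transaction here
            }
            close();
        }

        private void close() {
            offsets.clear();     // the fix: drop written offsets so they are never checkpointed
            checkForException(); // still surface any asynchronous send error
        }

        private void checkForException() {
            final RuntimeException exception = sendException;
            if (exception != null) {
                throw exception;
            }
        }
    }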

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index e6284aaf070..e33c1a79b91 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -481,6 +481,7 @@
         <allow pkg="com.fasterxml.jackson" />
         <allow pkg="kafka.utils" />
         <allow pkg="org.apache.zookeeper" />
+        <allow pkg="org.apache.log4j" />
       </subpackage>
     </subpackage>
   </subpackage>
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 2be076e435e..7d864628679 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -221,7 +221,7 @@
 
     <!-- Streams tests -->
     <suppress checks="ClassFanOutComplexity"
-              files="(RecordCollectorTest|StreamsPartitionAssignorTest|StreamThreadTest|StreamTaskTest|TaskManagerTest|TopologyTestDriverTest).java"/>
+              files="(RecordCollectorTest|StreamsPartitionAssignorTest|StreamThreadTest|StreamTaskTest|TaskManagerTest|TopologyTestDriverTest|KafkaStreamsTest|EosIntegrationTest).java"/>
 
     <suppress checks="MethodLength"
               files="(EosIntegrationTest|EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|RocksDBWindowStoreTest).java"/>
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 51eec220833..435504dc1c8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -331,7 +331,7 @@ public class RecordCollectorImpl implements RecordCollector {
         // transaction during handleRevocation and thus there is no transaction in flight, or else none of the revoked
         // tasks had any data in the current transaction and therefore there is no need to commit or abort it.
 
-        checkForException();
+        close();
     }
 
     /**
@@ -347,6 +347,11 @@ public class RecordCollectorImpl implements RecordCollector {
             streamsProducer.abortTransaction();
         }
 
+        close();
+    }
+
+    private void close() {
+        offsets.clear();
         checkForException();
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 3ebcbdebef5..c47d5c45a16 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -26,6 +26,9 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
@@ -34,12 +37,18 @@ import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateRestoreListener;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.query.QueryResult;
 import org.apache.kafka.streams.query.RangeQuery;
@@ -69,6 +78,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Paths;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -81,14 +91,22 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.DEFAULT_TIMEOUT;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinRecordsReceived;
 import static org.apache.kafka.streams.query.StateQueryRequest.inStore;
 import static org.apache.kafka.test.TestUtils.consumerConfig;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
@@ -782,6 +800,139 @@ public class EosIntegrationTest {
         verifyOffsetsAreInCheckpoint(1);
     }
 
+    @Test
+    @SuppressWarnings("deprecation")
+    public void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring() throws Exception {
+        if (!eosConfig.equals(StreamsConfig.EXACTLY_ONCE) && !eosConfig.equals(StreamsConfig.EXACTLY_ONCE_V2)) {
+            return;
+        }
+        final Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
+        streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath());
+        streamsConfiguration.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 100);
+        final String stateStoreName = "stateStore";
+
+        purgeLocalStreamsState(streamsConfiguration);
+
+        final int startKey = 1;
+        final int endKey = 30001;
+        final List<KeyValue<Integer, Integer>> recordBatch1 = IntStream.range(startKey, endKey - 1000).mapToObj(i -> KeyValue.pair(i, 0)).collect(Collectors.toList());
+        IntegrationTestUtils.produceKeyValuesSynchronously(MULTI_PARTITION_INPUT_TOPIC,
+            recordBatch1,
+            TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+                IntegerSerializer.class,
+                IntegerSerializer.class),
+            CLUSTER.time);
+
+        final StoreBuilder<KeyValueStore<Integer, String>> stateStore = Stores.keyValueStoreBuilder(
+            Stores.persistentKeyValueStore(stateStoreName),
+            Serdes.Integer(),
+            Serdes.String()).withCachingEnabled();
+
+        final int partitionToVerify = 0;
+        final CountDownLatch latch = new CountDownLatch(1);
+        final AtomicBoolean throwException = new AtomicBoolean(false);
+        final TaskId task00 = new TaskId(0, partitionToVerify);
+        final AtomicLong restoredOffsetsForPartition0 = new AtomicLong(0);
+        final Topology topology = new Topology();
+        topology
+            .addSource("source", MULTI_PARTITION_INPUT_TOPIC)
+            .addProcessor("processor", () -> new Processor<Integer, String, Integer, String>() {
+                KeyValueStore<Integer, String> stateStore;
+                org.apache.kafka.streams.processor.api.ProcessorContext<Integer, String> context;
+
+                @Override
+                public void init(final org.apache.kafka.streams.processor.api.ProcessorContext<Integer, String> context) {
+                    Processor.super.init(context);
+                    this.context = context;
+                    stateStore = context.getStateStore(stateStoreName);
+                }
+
+                @Override
+                public void process(final Record<Integer, String> record) {
+                    context.recordMetadata().ifPresent(recordMetadata -> {
+                        if (recordMetadata.partition() == partitionToVerify) {
+                            if (throwException.compareAndSet(true, false)) {
+                                throw new TaskCorruptedException(Collections.singleton(task00));
+                            }
+                            stateStore.put(record.key(), record.value());
+                        } else {
+                            stateStore.put(record.key(), record.value());
+                        }
+                    });
+                }
+
+                @Override
+                public void close() {
+                    Processor.super.close();
+                }
+            }, "source")
+            .addStateStore(stateStore, "processor");
+
+        final KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsConfiguration);
+        kafkaStreams.setGlobalStateRestoreListener(new StateRestoreListener() {
+            @Override
+            public void onRestoreStart(final TopicPartition topicPartition,
+                                       final String storeName,
+                                       final long startingOffset,
+                                       final long endingOffset) {}
+            @Override
+            public void onBatchRestored(final TopicPartition topicPartition,
+                                        final String storeName,
+                                        final long batchEndOffset,
+                                        final long numRestored) {
+                if (topicPartition.partition() == 0) {
+                    restoredOffsetsForPartition0.set(batchEndOffset);
+                    if (batchEndOffset > 100) {
+                        latch.countDown();
+                    }
+                }
+            }
+            @Override
+            public void onRestoreEnd(final TopicPartition topicPartition,
+                                     final String storeName,
+                                     final long totalRestored) {}
+        });
+        startApplicationAndWaitUntilRunning(Collections.singletonList(kafkaStreams), Duration.ofSeconds(60));
+        ensureCommittedRecordsInTopicPartition(
+            applicationId + "-" + stateStoreName + "-changelog",
+            partitionToVerify,
+            2000,
+            IntegerDeserializer.class,
+            IntegerDeserializer.class
+        );
+        throwException.set(true);
+        final List<KeyValue<Integer, Integer>> recordBatch2 = IntStream.range(endKey - 1000, endKey).mapToObj(i -> KeyValue.pair(i, 0)).collect(Collectors.toList());
+        IntegrationTestUtils.produceKeyValuesSynchronously(MULTI_PARTITION_INPUT_TOPIC,
+            recordBatch2,
+            TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+                IntegerSerializer.class,
+                IntegerSerializer.class),
+            CLUSTER.time);
+        latch.await();
+        kafkaStreams.close();
+        waitForApplicationState(Collections.singletonList(kafkaStreams), KafkaStreams.State.NOT_RUNNING, Duration.ofSeconds(60));
+
+        final File checkpointFile = Paths.get(
+            streamsConfiguration.getProperty(StreamsConfig.STATE_DIR_CONFIG),
+            streamsConfiguration.getProperty(StreamsConfig.APPLICATION_ID_CONFIG),
+            task00.toString(),
+            ".checkpoint"
+        ).toFile();
+        assertTrue(checkpointFile.exists());
+        final Map<TopicPartition, Long> checkpoints = new OffsetCheckpoint(checkpointFile).read();
+        assertEquals(
+            Long.valueOf(restoredOffsetsForPartition0.get()),
+            new ArrayList<>(checkpoints.values()).get(0)
+        );
+    }
+
     private void verifyOffsetsAreInCheckpoint(final int partition) throws IOException {
         final String stateStoreDir = stateTmpDir + File.separator + "appDir" + File.separator + applicationId + File.separator + "0_" + partition + File.separator;
 
@@ -976,13 +1127,21 @@ public class EosIntegrationTest {
     private List<KeyValue<Long, Long>> readResult(final String topic,
                                                   final int numberOfRecords,
                                                   final String groupId) throws Exception {
+        return readResult(topic, numberOfRecords, LongDeserializer.class, LongDeserializer.class, groupId);
+    }
+
+    private <K, V> List<KeyValue<K, V>> readResult(final String topic,
+                                                   final int numberOfRecords,
+                                                   final Class<? extends Deserializer<K>> keyDeserializer,
+                                                   final Class<? extends Deserializer<V>> valueDeserializer,
+                                                   final String groupId) throws Exception {
         if (groupId != null) {
             return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
                 TestUtils.consumerConfig(
                     CLUSTER.bootstrapServers(),
                     groupId,
-                    LongDeserializer.class,
-                    LongDeserializer.class,
+                    keyDeserializer,
+                    valueDeserializer,
                     Utils.mkProperties(Collections.singletonMap(
                         ConsumerConfig.ISOLATION_LEVEL_CONFIG,
                         IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT)))),
@@ -993,12 +1152,53 @@ public class EosIntegrationTest {
 
         // read uncommitted
         return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-            TestUtils.consumerConfig(CLUSTER.bootstrapServers(), LongDeserializer.class, LongDeserializer.class),
+            TestUtils.consumerConfig(CLUSTER.bootstrapServers(), keyDeserializer, valueDeserializer),
             topic,
             numberOfRecords
         );
     }
 
+    private <K, V> void ensureCommittedRecordsInTopicPartition(final String topic,
+                                                               final int partition,
+                                                               final int numberOfRecords,
+                                                               final Class<? extends Deserializer<K>> keyDeserializer,
+                                                               final Class<? extends Deserializer<V>> valueDeserializer) throws Exception {
+        final long timeoutMs = 2 * DEFAULT_TIMEOUT;
+        final int maxTries = 10;
+        final long deadline = System.currentTimeMillis() + timeoutMs;
+        int tries = 0;
+        while (true) {
+            final List<ConsumerRecord<K, V>> consumerRecords = waitUntilMinRecordsReceived(
+                TestUtils.consumerConfig(
+                    CLUSTER.bootstrapServers(),
+                    CONSUMER_GROUP_ID,
+                    keyDeserializer,
+                    valueDeserializer,
+                    Utils.mkProperties(Collections.singletonMap(
+                        ConsumerConfig.ISOLATION_LEVEL_CONFIG,
+                        IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT))
+                    )
+                ),
+                topic,
+                numberOfRecords,
+                timeoutMs
+            );
+            ++tries;
+            if (consumerRecords.stream().anyMatch(record -> record.partition() == partition)) {
+                return;
+            }
+            if (tries >= maxTries) {
+                throw new AssertionError("No committed records in topic " + topic
+                    + ", partition " + partition + " after " + maxTries + " retries.");
+            }
+            final long now = System.currentTimeMillis();
+            if (now > deadline) {
+                throw new AssertionError("No committed records in topic " + topic
+                    + ", partition " + partition + " after " + timeoutMs + " ms.");
+            }
+        }
+    }
+
     private List<KeyValue<Long, Long>> computeExpectedResult(final List<KeyValue<Long, Long>> input) {
         final List<KeyValue<Long, Long>> expectedResult = new ArrayList<>(input.size());
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 65bfaaabde8..908f337ab99 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -1291,7 +1291,6 @@ public class IntegrationTestUtils {
                                                                  final int maxMessages) {
         final List<ConsumerRecord<K, V>> consumerRecords;
         consumer.subscribe(singletonList(topic));
-        System.out.println("Got assignment:" + consumer.assignment());
         final int pollIntervalMs = 100;
         consumerRecords = new ArrayList<>();
         int totalPollTimeMs = 0;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 9f7d39d25cf..e99d51bce54 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -55,11 +55,12 @@ import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockClientSupplier;
-
+import org.apache.log4j.Level;
 import java.util.UUID;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -90,6 +91,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
 import static java.util.Collections.emptyList;
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptySet;
@@ -788,6 +791,66 @@ public class RecordCollectorTest {
         verify(streamsProducer);
     }
 
+    @Test
+    public void shouldClearOffsetsOnCloseClean() {
+        shouldClearOffsetsOnClose(true);
+    }
+
+    @Test
+    public void shouldClearOffsetsOnCloseDirty() {
+        shouldClearOffsetsOnClose(false);
+    }
+
+    private void shouldClearOffsetsOnClose(final boolean clean) {
+        final StreamsProducer streamsProducer = Mockito.mock(StreamsProducer.class);
+        when(streamsProducer.eosEnabled()).thenReturn(true);
+        final long offset = 1234L;
+        final RecordMetadata metadata = new RecordMetadata(
+            new TopicPartition(topic, 0),
+            offset,
+            0,
+            0,
+            1,
+            1
+        );
+        when(streamsProducer.send(any(), any())).thenAnswer(invocation -> {
+            ((Callback) invocation.getArgument(1)).onCompletion(metadata, null);
+            return null;
+        });
+        final ProcessorTopology topology = Mockito.mock(ProcessorTopology.class);
+
+        final RecordCollector collector = new RecordCollectorImpl(
+            logContext,
+            taskId,
+            streamsProducer,
+            productionExceptionHandler,
+            streamsMetrics,
+            topology
+        );
+        collector.send(
+            topic + "-changelog",
+            "key",
+            "value",
+            new RecordHeaders(),
+            0,
+            0L,
+            new StringSerializer(),
+            new StringSerializer(),
+            null,
+            null
+        );
+
+        assertFalse(collector.offsets().isEmpty());
+
+        if (clean) {
+            collector.closeClean();
+        } else {
+            collector.closeDirty();
+        }
+
+        assertTrue(collector.offsets().isEmpty());
+    }
+
     @Test
     public void shouldNotAbortTxOnCloseCleanIfEosEnabled() {
         final StreamsProducer streamsProducer = mock(StreamsProducer.class);
@@ -1245,6 +1308,7 @@ public class RecordCollectorTest {
 
         try (final LogCaptureAppender logCaptureAppender =
                  LogCaptureAppender.createAndRegister(RecordCollectorImpl.class)) {
+            logCaptureAppender.setThreshold(Level.INFO);
 
             collector.send(topic, "3", "0", null, null, stringSerializer, stringSerializer, null, null, streamPartitioner);
             collector.flush();