You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2023/10/11 23:04:41 UTC

[kafka] branch trunk updated: KAFKA-15571: `StateRestoreListener#onRestoreSuspended` is never called because `DelegatingStateRestoreListener` doesn't implement `onRestoreSuspended` (#14519)

This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5dd155f350e KAFKA-15571: `StateRestoreListener#onRestoreSuspended` is never called because `DelegatingStateRestoreListener` doesn't implement `onRestoreSuspended` (#14519)
5dd155f350e is described below

commit 5dd155f350e6f9092bd7734dbf0600dc4af80bfe
Author: Levani Kokhreidze <le...@transferwise.com>
AuthorDate: Thu Oct 12 02:04:34 2023 +0300

    KAFKA-15571: `StateRestoreListener#onRestoreSuspended` is never called because `DelegatingStateRestoreListener` doesn't implement `onRestoreSuspended` (#14519)
    
    With https://issues.apache.org/jira/browse/KAFKA-10575, StateRestoreListener#onRestoreSuspended was added. However, local tests show that it is never called, because DelegatingStateRestoreListener was not updated to call the new method.
    
    Reviewers: Anna Sophie Blee-Goldman <so...@responsive.dev>, Bruno Cadonna <ca...@confluent.io>
---
 .../org/apache/kafka/streams/KafkaStreams.java     |  11 ++
 .../integration/RestoreIntegrationTest.java        | 207 ++++++++++++++++++++-
 2 files changed, 214 insertions(+), 4 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index c7b6c05d840..586e0c5f30e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -730,6 +730,17 @@ public class KafkaStreams implements AutoCloseable {
                 }
             }
         }
+
+        @Override
+        public void onRestoreSuspended(final TopicPartition topicPartition, final String storeName, final long totalRestored) {
+            try {
+                if (globalStateRestoreListener != null) { // forward only when a user listener has been registered
+                    globalStateRestoreListener.onRestoreSuspended(topicPartition, storeName, totalRestored);
+                }
+            } catch (final Exception fatalUserException) { // exceptions from user code go through throwOnFatalException
+                throwOnFatalException(fatalUserException, topicPartition, storeName);
+            }
+        }
     }
 
     /**
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index 9c47a330f04..ba73b08c45b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KafkaStreams.State;
 import org.apache.kafka.streams.KeyValue;
@@ -56,15 +57,15 @@ import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
 import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
-import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
 import org.hamcrest.CoreMatchers;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -73,9 +74,11 @@ import org.junit.jupiter.params.provider.MethodSource;
 import java.io.File;
 import java.io.IOException;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -83,9 +86,15 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static org.apache.kafka.streams.Topology.AutoOffsetReset.EARLIEST;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
@@ -99,6 +108,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 @Timeout(600)
 @Tag("integration")
 public class RestoreIntegrationTest {
+    private static final Duration RESTORATION_DELAY = Duration.ofSeconds(1);
+
     private static final int NUM_BROKERS = 1;
 
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
@@ -119,6 +130,8 @@ public class RestoreIntegrationTest {
     private final int numberOfKeys = 10000;
     private KafkaStreams kafkaStreams;
 
+    private final List<Properties> streamsConfigurations = new ArrayList<>();
+
     @BeforeEach
     public void createTopics(final TestInfo testInfo) throws InterruptedException {
         appId = safeUniqueTestName(RestoreIntegrationTest.class, testInfo);
@@ -127,7 +140,12 @@ public class RestoreIntegrationTest {
     }
 
     private Properties props(final boolean stateUpdaterEnabled) {
+        return props(mkObjectProperties(Collections.singletonMap(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled)));
+    }
+
+    private Properties props(final Properties extraProperties) {
         final Properties streamsConfiguration = new Properties();
+        // Shared test defaults; extraProperties is applied afterwards (putAll below) and overrides them on conflict.
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
@@ -136,15 +154,21 @@ public class RestoreIntegrationTest {
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        streamsConfiguration.put(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled);
+        streamsConfiguration.putAll(extraProperties);
+        // Track every config handed out so that shutdown() can purge the local state of each instance.
+        streamsConfigurations.add(streamsConfiguration);
+
         return streamsConfiguration;
     }
 
     @AfterEach
-    public void shutdown() {
+    public void shutdown() throws Exception {
         if (kafkaStreams != null) {
             kafkaStreams.close(Duration.ofSeconds(30));
         }
+        // Purge the local state of every config registered by props(...), then reset the list for the next test.
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfigurations);
+        streamsConfigurations.clear();
     }
 
     private static Stream<Boolean> parameters() {
@@ -490,6 +514,181 @@ public class RestoreIntegrationTest {
         assertThat(CloseCountingInMemoryStore.numStoresClosed(), CoreMatchers.equalTo(initialStoreCloseCount + 4));
     }
 
+    @Test
+    public void shouldInvokeUserDefinedGlobalStateRestoreListener() throws Exception {
+        final String inputTopic = "inputTopic";
+        final String outputTopic = "outputTopic";
+        CLUSTER.createTopic(inputTopic, 5, 1);
+        CLUSTER.createTopic(outputTopic, 5, 1);
+        // Two instances, each with its own state dir and group.instance.id; the restore consumer polls at most 10 records.
+        final Map<String, Object> kafkaStreams1Configuration = mkMap(
+            mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId + "-ks1").getPath()),
+            mkEntry(StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG), appId + "-ks1"),
+            mkEntry(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 10)
+        );
+        final Map<String, Object> kafkaStreams2Configuration = mkMap(
+            mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId + "-ks2").getPath()),
+            mkEntry(StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG), appId + "-ks2"),
+            mkEntry(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 10)
+        );
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.stream(inputTopic, Consumed.with(EARLIEST))
+               .groupByKey()
+               .reduce((oldVal, newVal) -> newVal)
+               .toStream()
+               .to(outputTopic);
+
+        final List<KeyValue<Integer, Integer>> sampleData = IntStream.range(0, 100)
+                                                                     .mapToObj(i -> new KeyValue<>(i, i))
+                                                                     .collect(Collectors.toList());
+
+        sendEvents(inputTopic, sampleData);
+
+        kafkaStreams = startKafkaStreams(builder, null, kafkaStreams1Configuration);
+
+        validateReceivedMessages(sampleData, outputTopic);
+
+        // Close kafkaStreams1 and wipe its local state, then start it again to force the restoration of the state.
+        kafkaStreams.close(Duration.ofMillis(IntegrationTestUtils.DEFAULT_TIMEOUT));
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfigurations);
+
+        final TestStateRestoreListener kafkaStreams1StateRestoreListener = new TestStateRestoreListener(RESTORATION_DELAY);
+        kafkaStreams = startKafkaStreams(builder, kafkaStreams1StateRestoreListener, kafkaStreams1Configuration);
+
+        assertTrue(kafkaStreams1StateRestoreListener.awaitUntilRestorationStarts());
+        assertTrue(kafkaStreams1StateRestoreListener.awaitUntilBatchRestoredIsCalled());
+
+        // Simulate a new instance joining in the middle of the restoration.
+        // When this happens, some of the partitions that kafkaStreams1 was restoring will be migrated to kafkaStreams2,
+        // and kafkaStreams1 must call StateRestoreListener#onRestoreSuspended.
+        final TestStateRestoreListener kafkaStreams2StateRestoreListener = new TestStateRestoreListener(RESTORATION_DELAY);
+        final KafkaStreams kafkaStreams2 = startKafkaStreams(builder, kafkaStreams2StateRestoreListener, kafkaStreams2Configuration);
+        try {
+            assertTrue(kafkaStreams1StateRestoreListener.awaitUntilRestorationSuspends());
+            assertTrue(kafkaStreams2StateRestoreListener.awaitUntilRestorationStarts());
+            assertTrue(kafkaStreams1StateRestoreListener.awaitUntilRestorationEnds());
+            assertTrue(kafkaStreams2StateRestoreListener.awaitUntilRestorationEnds());
+        } finally {
+            // kafkaStreams2 is not the kafkaStreams field closed by shutdown(), so close it here even if an assertion fails.
+            kafkaStreams2.close(Duration.ofMillis(IntegrationTestUtils.DEFAULT_TIMEOUT));
+        }
+    }
+
+    private void validateReceivedMessages(final List<KeyValue<Integer, Integer>> expectedRecords,
+                                          final String outputTopic) throws Exception {
+        // Consumer settings for reading back the Streams output: start from the earliest offset,
+        // use a group id derived from appId, and decode keys and values as Integers.
+        final Properties consumerConfig = mkObjectProperties(
+            mkMap(
+                mkEntry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+                mkEntry(ConsumerConfig.GROUP_ID_CONFIG, "group-" + appId),
+                mkEntry(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"),
+                mkEntry(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName()),
+                mkEntry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName())
+            )
+        );
+
+        // NOTE(review): presumably blocks until the final value per key matches expectedRecords -- confirm in IntegrationTestUtils.
+        IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
+            consumerConfig,
+            outputTopic,
+            expectedRecords
+        );
+    }
+
+    private KafkaStreams startKafkaStreams(final StreamsBuilder streamsBuilder,
+                                           final StateRestoreListener stateRestoreListener,
+                                           final Map<String, Object> extraConfiguration) {
+        final Properties streamsConfiguration = props(mkObjectProperties(extraConfiguration));
+        // Default to small restore batches, but let a value supplied via extraConfiguration take precedence.
+        streamsConfiguration.putIfAbsent(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 10);
+        // Named "streams" so it does not shadow the kafkaStreams field that shutdown() closes; a null listener is allowed.
+        final KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), streamsConfiguration);
+        streams.setGlobalStateRestoreListener(stateRestoreListener);
+        streams.start();
+
+        return streams;
+    }
+
+    private static final class TestStateRestoreListener implements StateRestoreListener {
+        private final Duration onBatchRestoredSleepDuration;
+        // Each latch is released by the first matching callback, regardless of partition or store.
+        private final CountDownLatch onRestoreStartLatch = new CountDownLatch(1);
+        private final CountDownLatch onRestoreEndLatch = new CountDownLatch(1);
+        private final CountDownLatch onRestoreSuspendedLatch = new CountDownLatch(1);
+        private final CountDownLatch onBatchRestoredLatch = new CountDownLatch(1);
+
+        TestStateRestoreListener(final Duration onBatchRestoredSleepDuration) {
+            this.onBatchRestoredSleepDuration = onBatchRestoredSleepDuration;
+        }
+        // The await* methods wait up to IntegrationTestUtils.DEFAULT_TIMEOUT ms and return false on timeout.
+        boolean awaitUntilRestorationStarts() throws InterruptedException {
+            return awaitLatchWithTimeout(onRestoreStartLatch);
+        }
+
+        boolean awaitUntilRestorationSuspends() throws InterruptedException {
+            return awaitLatchWithTimeout(onRestoreSuspendedLatch);
+        }
+
+        boolean awaitUntilRestorationEnds() throws InterruptedException {
+            return awaitLatchWithTimeout(onRestoreEndLatch);
+        }
+
+        boolean awaitUntilBatchRestoredIsCalled() throws InterruptedException {
+            return awaitLatchWithTimeout(onBatchRestoredLatch);
+        }
+
+        @Override
+        public void onRestoreStart(final TopicPartition topicPartition,
+                                   final String storeName,
+                                   final long startingOffset,
+                                   final long endingOffset) {
+            onRestoreStartLatch.countDown();
+        }
+        // Sleeps on each call before releasing the latch, presumably to slow down the restoring thread.
+        @Override
+        public void onBatchRestored(final TopicPartition topicPartition,
+                                    final String storeName,
+                                    final long batchEndOffset,
+                                    final long numRestored) {
+            Utils.sleep(onBatchRestoredSleepDuration.toMillis());
+            onBatchRestoredLatch.countDown();
+        }
+
+        @Override
+        public void onRestoreEnd(final TopicPartition topicPartition,
+                                 final String storeName,
+                                 final long totalRestored) {
+            onRestoreEndLatch.countDown();
+        }
+
+        @Override
+        public void onRestoreSuspended(final TopicPartition topicPartition,
+                                       final String storeName,
+                                       final long totalRestored) {
+            onRestoreSuspendedLatch.countDown();
+        }
+
+        private static boolean awaitLatchWithTimeout(final CountDownLatch latch) throws InterruptedException {
+            return latch.await(IntegrationTestUtils.DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    private void sendEvents(final String topic, final List<KeyValue<Integer, Integer>> events) {
+        // Producer for the embedded cluster with Integer key/value serializers and no extra overrides.
+        final Properties producerConfig = TestUtils.producerConfig(
+            CLUSTER.bootstrapServers(),
+            IntegerSerializer.class,
+            IntegerSerializer.class,
+            new Properties()
+        );
+        // NOTE(review): "Synchronously" suggests each send is awaited before returning -- confirm in IntegrationTestUtils.
+        IntegrationTestUtils.produceKeyValuesSynchronously(
+            topic, events, producerConfig,
+            CLUSTER.time);
+    }
+
     private static KeyValueBytesStoreSupplier getCloseCountingStore(final String name) {
         return new KeyValueBytesStoreSupplier() {
             @Override