You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2023/10/11 23:04:41 UTC
[kafka] branch trunk updated: KAFKA-15571: `StateRestoreListener#onRestoreSuspended` is never called because `DelegatingStateRestoreListener` doesn't implement `onRestoreSuspended` (#14519)
This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5dd155f350e KAFKA-15571: `StateRestoreListener#onRestoreSuspended` is never called because `DelegatingStateRestoreListener` doesn't implement `onRestoreSuspended` (#14519)
5dd155f350e is described below
commit 5dd155f350e6f9092bd7734dbf0600dc4af80bfe
Author: Levani Kokhreidze <le...@transferwise.com>
AuthorDate: Thu Oct 12 02:04:34 2023 +0300
KAFKA-15571: `StateRestoreListener#onRestoreSuspended` is never called because `DelegatingStateRestoreListener` doesn't implement `onRestoreSuspended` (#14519)
With https://issues.apache.org/jira/browse/KAFKA-10575, StateRestoreListener#onRestoreSuspended was added. But local tests show that it is never called, because DelegatingStateRestoreListener was not updated to call the new method.
Reviewers: Anna Sophie Blee-Goldman <so...@responsive.dev>, Bruno Cadonna <ca...@confluent.io>
---
.../org/apache/kafka/streams/KafkaStreams.java | 11 ++
.../integration/RestoreIntegrationTest.java | 207 ++++++++++++++++++++-
2 files changed, 214 insertions(+), 4 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index c7b6c05d840..586e0c5f30e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -730,6 +730,17 @@ public class KafkaStreams implements AutoCloseable {
}
}
}
+
+ @Override
+ public void onRestoreSuspended(final TopicPartition topicPartition, final String storeName, final long totalRestored) {
+ if (globalStateRestoreListener != null) {
+ try {
+ globalStateRestoreListener.onRestoreSuspended(topicPartition, storeName, totalRestored);
+ } catch (final Exception fatalUserException) {
+ throwOnFatalException(fatalUserException, topicPartition, storeName);
+ }
+ }
+ }
}
/**
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index 9c47a330f04..ba73b08c45b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.KeyValue;
@@ -56,15 +57,15 @@ import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
-import org.apache.kafka.test.TestUtils;
import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
@@ -73,9 +74,11 @@ import org.junit.jupiter.params.provider.MethodSource;
import java.io.File;
import java.io.IOException;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -83,9 +86,15 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import java.util.stream.Stream;
import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static org.apache.kafka.streams.Topology.AutoOffsetReset.EARLIEST;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
@@ -99,6 +108,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(600)
@Tag("integration")
public class RestoreIntegrationTest {
+ private static final Duration RESTORATION_DELAY = Duration.ofSeconds(1);
+
private static final int NUM_BROKERS = 1;
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
@@ -119,6 +130,8 @@ public class RestoreIntegrationTest {
private final int numberOfKeys = 10000;
private KafkaStreams kafkaStreams;
+ private final List<Properties> streamsConfigurations = new ArrayList<>();
+
@BeforeEach
public void createTopics(final TestInfo testInfo) throws InterruptedException {
appId = safeUniqueTestName(RestoreIntegrationTest.class, testInfo);
@@ -127,7 +140,12 @@ public class RestoreIntegrationTest {
}
private Properties props(final boolean stateUpdaterEnabled) {
+ return props(mkObjectProperties(mkMap(mkEntry(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled))));
+ }
+
+ private Properties props(final Properties extraProperties) {
final Properties streamsConfiguration = new Properties();
+
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
@@ -136,15 +154,21 @@ public class RestoreIntegrationTest {
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- streamsConfiguration.put(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled);
+ streamsConfiguration.putAll(extraProperties);
+
+ streamsConfigurations.add(streamsConfiguration);
+
return streamsConfiguration;
}
@AfterEach
- public void shutdown() {
+ public void shutdown() throws Exception {
if (kafkaStreams != null) {
kafkaStreams.close(Duration.ofSeconds(30));
}
+
+ IntegrationTestUtils.purgeLocalStreamsState(streamsConfigurations);
+ streamsConfigurations.clear();
}
private static Stream<Boolean> parameters() {
@@ -490,6 +514,181 @@ public class RestoreIntegrationTest {
assertThat(CloseCountingInMemoryStore.numStoresClosed(), CoreMatchers.equalTo(initialStoreCloseCount + 4));
}
+ @Test
+ public void shouldInvokeUserDefinedGlobalStateRestoreListener() throws Exception {
+ final String inputTopic = "inputTopic";
+ final String outputTopic = "outputTopic";
+ CLUSTER.createTopic(inputTopic, 5, 1);
+ CLUSTER.createTopic(outputTopic, 5, 1);
+
+ final Map<String, Object> kafkaStreams1Configuration = mkMap(
+ mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath() + "-ks1"),
+ mkEntry(StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG), appId + "-ks1"),
+ mkEntry(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 10)
+ );
+ final Map<String, Object> kafkaStreams2Configuration = mkMap(
+ mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(appId).getPath() + "-ks2"),
+ mkEntry(StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG), appId + "-ks2"),
+ mkEntry(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 10)
+ );
+
+ final StreamsBuilder builder = new StreamsBuilder();
+ builder.stream(inputTopic, Consumed.with(EARLIEST))
+ .groupByKey()
+ .reduce((oldVal, newVal) -> newVal)
+ .toStream()
+ .to(outputTopic);
+
+ final List<KeyValue<Integer, Integer>> sampleData = IntStream.range(0, 100)
+ .mapToObj(i -> new KeyValue<>(i, i))
+ .collect(Collectors.toList());
+
+ sendEvents(inputTopic, sampleData);
+
+ kafkaStreams = startKafkaStreams(builder, null, kafkaStreams1Configuration);
+
+ validateReceivedMessages(sampleData, outputTopic);
+
+ // Close kafkaStreams1 (with cleanup) and start it again to force the restoration of the state.
+ kafkaStreams.close(Duration.ofMillis(IntegrationTestUtils.DEFAULT_TIMEOUT));
+ IntegrationTestUtils.purgeLocalStreamsState(streamsConfigurations);
+
+ final TestStateRestoreListener kafkaStreams1StateRestoreListener = new TestStateRestoreListener(RESTORATION_DELAY);
+ kafkaStreams = startKafkaStreams(builder, kafkaStreams1StateRestoreListener, kafkaStreams1Configuration);
+
+ assertTrue(kafkaStreams1StateRestoreListener.awaitUntilRestorationStarts());
+ assertTrue(kafkaStreams1StateRestoreListener.awaitUntilBatchRestoredIsCalled());
+
+ // Simulate a new instance joining in the middle of the restoration.
+ // When this happens, some of the partitions that kafkaStreams1 was restoring will be migrated to kafkaStreams2,
+ // and kafkaStreams1 must call StateRestoreListener#onRestoreSuspended.
+ final TestStateRestoreListener kafkaStreams2StateRestoreListener = new TestStateRestoreListener(RESTORATION_DELAY);
+ final KafkaStreams kafkaStreams2 = startKafkaStreams(builder, kafkaStreams2StateRestoreListener, kafkaStreams2Configuration);
+ assertTrue(kafkaStreams1StateRestoreListener.awaitUntilRestorationSuspends());
+
+ assertTrue(kafkaStreams2StateRestoreListener.awaitUntilRestorationStarts());
+
+ assertTrue(kafkaStreams1StateRestoreListener.awaitUntilRestorationEnds());
+ assertTrue(kafkaStreams2StateRestoreListener.awaitUntilRestorationEnds());
+
+ // Cleanup
+ kafkaStreams2.close(Duration.ofMillis(IntegrationTestUtils.DEFAULT_TIMEOUT));
+ }
+
+ private void validateReceivedMessages(final List<KeyValue<Integer, Integer>> expectedRecords,
+ final String outputTopic) throws Exception {
+ final Properties consumerProperties = new Properties();
+ consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+ consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-" + appId);
+ consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ consumerProperties.setProperty(
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ IntegerDeserializer.class.getName()
+ );
+ consumerProperties.setProperty(
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ IntegerDeserializer.class.getName()
+ );
+
+ IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
+ consumerProperties,
+ outputTopic,
+ expectedRecords
+ );
+ }
+
+ private KafkaStreams startKafkaStreams(final StreamsBuilder streamsBuilder,
+ final StateRestoreListener stateRestoreListener,
+ final Map<String, Object> extraConfiguration) {
+ final Properties streamsConfiguration = props(mkObjectProperties(extraConfiguration));
+ streamsConfiguration.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 10);
+
+ final KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), streamsConfiguration);
+
+ kafkaStreams.setGlobalStateRestoreListener(stateRestoreListener);
+ kafkaStreams.start();
+
+ return kafkaStreams;
+ }
+
+ private static final class TestStateRestoreListener implements StateRestoreListener {
+ private final Duration onBatchRestoredSleepDuration;
+
+ private final CountDownLatch onRestoreStartLatch = new CountDownLatch(1);
+ private final CountDownLatch onRestoreEndLatch = new CountDownLatch(1);
+ private final CountDownLatch onRestoreSuspendedLatch = new CountDownLatch(1);
+ private final CountDownLatch onBatchRestoredLatch = new CountDownLatch(1);
+
+ TestStateRestoreListener(final Duration onBatchRestoredSleepDuration) {
+ this.onBatchRestoredSleepDuration = onBatchRestoredSleepDuration;
+ }
+
+ boolean awaitUntilRestorationStarts() throws InterruptedException {
+ return awaitLatchWithTimeout(onRestoreStartLatch);
+ }
+
+ boolean awaitUntilRestorationSuspends() throws InterruptedException {
+ return awaitLatchWithTimeout(onRestoreSuspendedLatch);
+ }
+
+ boolean awaitUntilRestorationEnds() throws InterruptedException {
+ return awaitLatchWithTimeout(onRestoreEndLatch);
+ }
+
+ public boolean awaitUntilBatchRestoredIsCalled() throws InterruptedException {
+ return awaitLatchWithTimeout(onBatchRestoredLatch);
+ }
+
+ @Override
+ public void onRestoreStart(final TopicPartition topicPartition,
+ final String storeName,
+ final long startingOffset,
+ final long endingOffset) {
+ onRestoreStartLatch.countDown();
+ }
+
+ @Override
+ public void onBatchRestored(final TopicPartition topicPartition,
+ final String storeName,
+ final long batchEndOffset,
+ final long numRestored) {
+ Utils.sleep(onBatchRestoredSleepDuration.toMillis());
+ onBatchRestoredLatch.countDown();
+ }
+
+ @Override
+ public void onRestoreEnd(final TopicPartition topicPartition,
+ final String storeName,
+ final long totalRestored) {
+ onRestoreEndLatch.countDown();
+ }
+
+ @Override
+ public void onRestoreSuspended(final TopicPartition topicPartition,
+ final String storeName,
+ final long totalRestored) {
+ onRestoreSuspendedLatch.countDown();
+ }
+
+ private static boolean awaitLatchWithTimeout(final CountDownLatch latch) throws InterruptedException {
+ return latch.await(IntegrationTestUtils.DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ private void sendEvents(final String topic, final List<KeyValue<Integer, Integer>> events) {
+ IntegrationTestUtils.produceKeyValuesSynchronously(
+ topic,
+ events,
+ TestUtils.producerConfig(
+ CLUSTER.bootstrapServers(),
+ IntegerSerializer.class,
+ IntegerSerializer.class,
+ new Properties()
+ ),
+ CLUSTER.time
+ );
+ }
+
private static KeyValueBytesStoreSupplier getCloseCountingStore(final String name) {
return new KeyValueBytesStoreSupplier() {
@Override