You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2023/02/07 19:33:20 UTC
[kafka] branch trunk updated: KAFKA-10575: Add onRestoreSuspended to StateRestoreListener (#13179)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 788793dee6f KAFKA-10575: Add onRestoreSuspended to StateRestoreListener (#13179)
788793dee6f is described below
commit 788793dee6fa5d7ba5cb7d756b72c7d043dc8789
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Tue Feb 7 11:33:09 2023 -0800
KAFKA-10575: Add onRestoreSuspended to StateRestoreListener (#13179)
1. Add the new API (default impl is empty) to StateRestoreListener.
2. Update related unit tests
Reviewers: Lucas Brutschy <lu...@users.noreply.github.com>, Matthias J. Sax <mj...@apache.org>
---
.../streams/processor/StateRestoreListener.java | 22 ++++
.../processor/internals/StoreChangelogReader.java | 15 +++
.../internals/StoreChangelogReaderTest.java | 133 +++++++++++++++++++--
.../kafka/test/MockStateRestoreListener.java | 10 ++
4 files changed, 173 insertions(+), 7 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java
index 6ba794f187b..006cc58cd43 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java
@@ -22,13 +22,16 @@ import org.apache.kafka.common.TopicPartition;
/**
* Class for listening to various states of the restoration process of a StateStore.
*
+ * <p>
* When calling {@link org.apache.kafka.streams.KafkaStreams#setGlobalStateRestoreListener(StateRestoreListener)}
* the passed instance is expected to be stateless since the {@code StateRestoreListener} is shared
* across all {@link org.apache.kafka.streams.processor.internals.StreamThread} instances.
*
+ * <p>
* Users desiring stateful operations will need to provide synchronization internally in
* the {@code StateRestoreListener} implementation.
*
+ * <p>
* Note that this listener is only registered at the per-client level and users can use the {@code storeName}
* parameter to define specific monitoring for different {@link StateStore}s. There is another
* {@link StateRestoreCallback} interface which is registered via the
@@ -37,6 +40,12 @@ import org.apache.kafka.common.TopicPartition;
* These two interfaces serve different restoration purposes and users should not try to implement both of them in a single
* class during state store registration.
*
+ * <p>
+ * Also note that the update process of standby tasks is not monitored via this interface, since a standby task does
+ * not actually <i>restore</i> state, but keeps updating its state from the changelogs written by the active task
+ * which does not ever finish.
+ *
+ * <p>
* Incremental updates are exposed so users can estimate how much progress has been made.
*/
public interface StateRestoreListener {
@@ -85,4 +94,17 @@ public interface StateRestoreListener {
final String storeName,
final long totalRestored);
+ /**
+ * Method called when restoring the {@link StateStore} is suspended due to the task being migrated out of the host.
+ * If the migrated task is recycled or re-assigned back to the current host, another
+ * {@link #onRestoreStart(TopicPartition, String, long, long)} would be called.
+ *
+ * @param topicPartition the {@link TopicPartition} containing the values to restore
+ * @param storeName the name of the store whose restoration is suspended
+ * @param totalRestored the total number of records restored for this TopicPartition before being suspended
+ */
+ default void onRestoreSuspended(final TopicPartition topicPartition,
+ final String storeName,
+ final long totalRestored) {
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 874f1993c19..be580f3575c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -986,8 +986,23 @@ public class StoreChangelogReader implements ChangelogReader {
for (final TopicPartition partition : revokedChangelogs) {
final ChangelogMetadata changelogMetadata = changelogs.remove(partition);
if (changelogMetadata != null) {
+ // if the changelog is still in REGISTERED, it means it has not initialized and started
+ // restoring yet, and hence we should not try to remove the changelog partition
if (!changelogMetadata.state().equals(ChangelogState.REGISTERED)) {
revokedInitializedChangelogs.add(partition);
+
+ // if the changelog is not in RESTORING, it means
+ // the corresponding onRestoreStart was not called; in this case
+ // we should not call onRestoreSuspended either
+ if (changelogMetadata.stateManager.taskType() == Task.TaskType.ACTIVE &&
+ changelogMetadata.state().equals(ChangelogState.RESTORING)) {
+ try {
+ final String storeName = changelogMetadata.storeMetadata.store().name();
+ stateRestoreListener.onRestoreSuspended(partition, storeName, changelogMetadata.totalRestored);
+ } catch (final Exception e) {
+ throw new StreamsException("State restore listener failed on restore paused", e);
+ }
+ }
}
changelogMetadata.clear();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index fbd0db99a0d..3ceaed80210 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -72,6 +72,7 @@ import static org.apache.kafka.streams.processor.internals.Task.TaskType.ACTIVE;
import static org.apache.kafka.streams.processor.internals.Task.TaskType.STANDBY;
import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_BATCH;
import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_END;
+import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_SUSPENDED;
import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_START;
import static org.easymock.EasyMock.anyLong;
import static org.easymock.EasyMock.anyObject;
@@ -198,10 +199,83 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
}
@Test
- public void shouldInitializeChangelogAndCheckForCompletion() {
+ public void shouldSupportUnregisterChangelogBeforeInitialization() {
+ final Map<TaskId, Task> mockTasks = mock(Map.class);
+ EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
+ EasyMock.expect(storeMetadata.offset()).andReturn(9L).anyTimes();
+ EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(tp, 10L));
+ EasyMock.replay(mockTasks, stateManager, storeMetadata, store);
+
+ adminClient.updateEndOffsets(Collections.singletonMap(tp, 100L));
+
+ final StoreChangelogReader changelogReader =
+ new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback);
+
+ changelogReader.register(tp, stateManager);
+
+ if (type == STANDBY) {
+ changelogReader.transitToUpdateStandby();
+ }
+
+ changelogReader.unregister(Collections.singleton(tp));
+
+ assertEquals(Collections.emptySet(), consumer.assignment());
+
+ assertNull(callback.restoreTopicPartition);
+ assertNull(callback.storeNameCalledStates.get(RESTORE_START));
+ assertNull(callback.storeNameCalledStates.get(RESTORE_SUSPENDED));
+ assertNull(callback.storeNameCalledStates.get(RESTORE_BATCH));
+ }
+
+ @Test
+ public void shouldSupportUnregisterChangelogBeforeCompletion() {
final Map<TaskId, Task> mockTasks = mock(Map.class);
EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
EasyMock.expect(storeMetadata.offset()).andReturn(9L).anyTimes();
+ EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(tp, 10L));
+ EasyMock.replay(mockTasks, stateManager, storeMetadata, store);
+
+ adminClient.updateEndOffsets(Collections.singletonMap(tp, 100L));
+
+ final StoreChangelogReader changelogReader =
+ new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback);
+
+ changelogReader.register(tp, stateManager);
+
+ if (type == STANDBY) {
+ changelogReader.transitToUpdateStandby();
+ }
+
+ changelogReader.restore(mockTasks);
+
+ assertEquals(0L, changelogReader.changelogMetadata(tp).totalRestored());
+ assertEquals(Collections.emptySet(), changelogReader.completedChangelogs());
+ assertEquals(10L, consumer.position(tp));
+ assertEquals(Collections.emptySet(), consumer.paused());
+ assertEquals(Collections.singleton(tp), consumer.assignment());
+
+ changelogReader.unregister(Collections.singleton(tp));
+
+ assertEquals(Collections.emptySet(), consumer.assignment());
+
+ if (type == ACTIVE) {
+ assertEquals(tp, callback.restoreTopicPartition);
+ assertEquals(storeName, callback.storeNameCalledStates.get(RESTORE_START));
+ assertEquals(storeName, callback.storeNameCalledStates.get(RESTORE_SUSPENDED));
+ } else {
+ assertNull(callback.restoreTopicPartition);
+ assertNull(callback.storeNameCalledStates.get(RESTORE_START));
+ assertNull(callback.storeNameCalledStates.get(RESTORE_SUSPENDED));
+ }
+ assertNull(callback.storeNameCalledStates.get(RESTORE_BATCH));
+ }
+
+ @Test
+ public void shouldSupportUnregisterChangelogAfterCompletion() {
+ final Map<TaskId, Task> mockTasks = mock(Map.class);
+ EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
+ EasyMock.expect(storeMetadata.offset()).andReturn(9L).anyTimes();
+ EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(tp, 10L));
EasyMock.replay(mockTasks, stateManager, storeMetadata, store);
adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));
@@ -209,20 +283,65 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
final StoreChangelogReader changelogReader =
new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback);
+ changelogReader.register(tp, stateManager);
+
+ if (type == STANDBY) {
+ changelogReader.transitToUpdateStandby();
+ }
+
+ changelogReader.restore(mockTasks);
+
+ assertEquals(0L, changelogReader.changelogMetadata(tp).totalRestored());
+ assertEquals(10L, consumer.position(tp));
+
+ assertEquals(Collections.singleton(tp), consumer.assignment());
+ if (type == ACTIVE) {
+ assertEquals(Collections.singleton(tp), changelogReader.completedChangelogs());
+ assertEquals(Collections.singleton(tp), consumer.paused());
+ } else {
+ assertEquals(Collections.emptySet(), changelogReader.completedChangelogs());
+ assertEquals(Collections.emptySet(), consumer.paused());
+ }
+
+ changelogReader.unregister(Collections.singleton(tp));
+
+ assertEquals(Collections.emptySet(), consumer.assignment());
+
+ if (type == ACTIVE) {
+ assertEquals(tp, callback.restoreTopicPartition);
+ assertEquals(storeName, callback.storeNameCalledStates.get(RESTORE_START));
+ assertEquals(storeName, callback.storeNameCalledStates.get(RESTORE_END));
+ assertNull(callback.storeNameCalledStates.get(RESTORE_SUSPENDED));
+ assertNull(callback.storeNameCalledStates.get(RESTORE_BATCH));
+ }
+ }
+
+ @Test
+ public void shouldInitializeChangelogAndCheckForCompletion() {
+ final Map<TaskId, Task> mockTasks = mock(Map.class);
+ EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
+ EasyMock.expect(storeMetadata.offset()).andReturn(9L).anyTimes();
+ EasyMock.replay(mockTasks, stateManager, storeMetadata, store);
+
+ adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));
+
+ final StoreChangelogReader changelogReader =
+ new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback);
+
changelogReader.register(tp, stateManager);
changelogReader.restore(mockTasks);
assertEquals(
- type == ACTIVE ?
- StoreChangelogReader.ChangelogState.COMPLETED :
- StoreChangelogReader.ChangelogState.RESTORING,
- changelogReader.changelogMetadata(tp).state()
+ type == ACTIVE ?
+ StoreChangelogReader.ChangelogState.COMPLETED :
+ StoreChangelogReader.ChangelogState.RESTORING,
+ changelogReader.changelogMetadata(tp).state()
);
assertEquals(type == ACTIVE ? 10L : null, changelogReader.changelogMetadata(tp).endOffset());
assertEquals(0L, changelogReader.changelogMetadata(tp).totalRestored());
assertEquals(
- type == ACTIVE ? Collections.singleton(tp) : Collections.emptySet(),
- changelogReader.completedChangelogs()
+ type == ACTIVE ? Collections.singleton(tp) : Collections.emptySet(),
+ changelogReader.completedChangelogs()
);
assertEquals(10L, consumer.position(tp));
assertEquals(Collections.singleton(tp), consumer.paused());
diff --git a/streams/src/test/java/org/apache/kafka/test/MockStateRestoreListener.java b/streams/src/test/java/org/apache/kafka/test/MockStateRestoreListener.java
index 10269699d04..6c423a48705 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockStateRestoreListener.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockStateRestoreListener.java
@@ -37,6 +37,7 @@ public class MockStateRestoreListener implements StateRestoreListener {
public static final String RESTORE_START = "restore_start";
public static final String RESTORE_BATCH = "restore_batch";
public static final String RESTORE_END = "restore_end";
+ public static final String RESTORE_SUSPENDED = "restore_suspended";
@Override
public void onRestoreStart(final TopicPartition topicPartition,
@@ -69,6 +70,15 @@ public class MockStateRestoreListener implements StateRestoreListener {
totalNumRestored = totalRestored;
}
+ @Override
+ public void onRestoreSuspended(final TopicPartition topicPartition,
+ final String storeName,
+ final long totalRestored) {
+ restoreTopicPartition = topicPartition;
+ storeNameCalledStates.put(RESTORE_SUSPENDED, storeName);
+ totalNumRestored = totalRestored;
+ }
+
@Override
public String toString() {
return "MockStateRestoreListener{" +