You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2023/02/07 19:33:20 UTC

[kafka] branch trunk updated: KAFKA-10575: Add onRestoreSuspended to StateRestoreListener (#13179)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 788793dee6f KAFKA-10575: Add onRestoreSuspended to StateRestoreListener (#13179)
788793dee6f is described below

commit 788793dee6fa5d7ba5cb7d756b72c7d043dc8789
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Tue Feb 7 11:33:09 2023 -0800

    KAFKA-10575: Add onRestoreSuspended to StateRestoreListener (#13179)
    
    1. Add the new API (default impl is empty) to StateRestoreListener.
    2. Update related unit tests
    
    Reviewers: Lucas Brutschy <lu...@users.noreply.github.com>, Matthias J. Sax <mj...@apache.org>
---
 .../streams/processor/StateRestoreListener.java    |  22 ++++
 .../processor/internals/StoreChangelogReader.java  |  15 +++
 .../internals/StoreChangelogReaderTest.java        | 133 +++++++++++++++++++--
 .../kafka/test/MockStateRestoreListener.java       |  10 ++
 4 files changed, 173 insertions(+), 7 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java
index 6ba794f187b..006cc58cd43 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java
@@ -22,13 +22,16 @@ import org.apache.kafka.common.TopicPartition;
 /**
  * Class for listening to various states of the restoration process of a StateStore.
  *
+ * <p>
  * When calling {@link org.apache.kafka.streams.KafkaStreams#setGlobalStateRestoreListener(StateRestoreListener)}
  * the passed instance is expected to be stateless since the {@code StateRestoreListener} is shared
  * across all {@link org.apache.kafka.streams.processor.internals.StreamThread} instances.
  *
+ * <p>
  * Users desiring stateful operations will need to provide synchronization internally in
  * the {@code StateRestorerListener} implementation.
  *
+ * <p>
  * Note that this listener is only registered at the per-client level and users can base on the {@code storeName}
  * parameter to define specific monitoring for different {@link StateStore}s. There is another
  * {@link StateRestoreCallback} interface which is registered via the
@@ -37,6 +40,12 @@ import org.apache.kafka.common.TopicPartition;
  * These two interfaces serve different restoration purposes and users should not try to implement both of them in a single
  * class during state store registration.
  *
+ * <p>
+ * Also note that the update process of standby tasks is not monitored via this interface, since a standby task does
+ * not actually <i>restore</i> state, but keeps updating its state from the changelogs written by the active task,
+ * a process that never finishes.
+ *
+ * <p>
  * Incremental updates are exposed so users can estimate how much progress has been made.
  */
 public interface StateRestoreListener {
@@ -85,4 +94,17 @@ public interface StateRestoreListener {
                       final String storeName,
                       final long totalRestored);
 
+    /**
+     * Method called when restoring the {@link StateStore} is suspended due to the task being migrated out of the host.
+     * If the migrated task is recycled or re-assigned back to the current host, another
+     * {@link #onRestoreStart(TopicPartition, String, long, long)} would be called.
+     *
+     * @param topicPartition the {@link TopicPartition} containing the values to restore
+     * @param storeName the name of the store whose restoration was suspended
+     * @param totalRestored the total number of records restored for this TopicPartition before being suspended
+     */
+    default void onRestoreSuspended(final TopicPartition topicPartition,
+                                    final String storeName,
+                                    final long totalRestored) {
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 874f1993c19..be580f3575c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -986,8 +986,23 @@ public class StoreChangelogReader implements ChangelogReader {
         for (final TopicPartition partition : revokedChangelogs) {
             final ChangelogMetadata changelogMetadata = changelogs.remove(partition);
             if (changelogMetadata != null) {
+                // if the changelog is still in REGISTERED, it means it has not been initialized or started
+                // restoring yet, and hence we should not try to remove the changelog partition
                 if (!changelogMetadata.state().equals(ChangelogState.REGISTERED)) {
                     revokedInitializedChangelogs.add(partition);
+
+                    // only notify for active tasks whose changelog is still in RESTORING: in any other
+                    // state the corresponding onRestoreStart was either never called or the restoration
+                    // has already completed, so we should not call onRestoreSuspended either
+                    if (changelogMetadata.stateManager.taskType() == Task.TaskType.ACTIVE &&
+                        changelogMetadata.state().equals(ChangelogState.RESTORING)) {
+                        try {
+                            final String storeName = changelogMetadata.storeMetadata.store().name();
+                            stateRestoreListener.onRestoreSuspended(partition, storeName, changelogMetadata.totalRestored);
+                        } catch (final Exception e) {
+                            throw new StreamsException("State restore listener failed on restore paused", e);
+                        }
+                    }
                 }
 
                 changelogMetadata.clear();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index fbd0db99a0d..3ceaed80210 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -72,6 +72,7 @@ import static org.apache.kafka.streams.processor.internals.Task.TaskType.ACTIVE;
 import static org.apache.kafka.streams.processor.internals.Task.TaskType.STANDBY;
 import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_BATCH;
 import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_END;
+import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_SUSPENDED;
 import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_START;
 import static org.easymock.EasyMock.anyLong;
 import static org.easymock.EasyMock.anyObject;
@@ -198,10 +199,83 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
     }
 
     @Test
-    public void shouldInitializeChangelogAndCheckForCompletion() {
+    public void shouldSupportUnregisterChangelogBeforeInitialization() {
+        final Map<TaskId, Task> mockTasks = mock(Map.class);
+        EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
+        EasyMock.expect(storeMetadata.offset()).andReturn(9L).anyTimes();
+        EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(tp, 10L));
+        EasyMock.replay(mockTasks, stateManager, storeMetadata, store);
+
+        adminClient.updateEndOffsets(Collections.singletonMap(tp, 100L));
+
+        final StoreChangelogReader changelogReader =
+            new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback);
+
+        changelogReader.register(tp, stateManager);
+
+        if (type == STANDBY) {
+            changelogReader.transitToUpdateStandby();
+        }
+
+        changelogReader.unregister(Collections.singleton(tp));
+
+        assertEquals(Collections.emptySet(), consumer.assignment());
+
+        assertNull(callback.restoreTopicPartition);
+        assertNull(callback.storeNameCalledStates.get(RESTORE_START));
+        assertNull(callback.storeNameCalledStates.get(RESTORE_SUSPENDED));
+        assertNull(callback.storeNameCalledStates.get(RESTORE_BATCH));
+    }
+
+    @Test
+    public void shouldSupportUnregisterChangelogBeforeCompletion() {
         final Map<TaskId, Task> mockTasks = mock(Map.class);
         EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
         EasyMock.expect(storeMetadata.offset()).andReturn(9L).anyTimes();
+        EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(tp, 10L));
+        EasyMock.replay(mockTasks, stateManager, storeMetadata, store);
+
+        adminClient.updateEndOffsets(Collections.singletonMap(tp, 100L));
+
+        final StoreChangelogReader changelogReader =
+            new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback);
+
+        changelogReader.register(tp, stateManager);
+
+        if (type == STANDBY) {
+            changelogReader.transitToUpdateStandby();
+        }
+
+        changelogReader.restore(mockTasks);
+
+        assertEquals(0L, changelogReader.changelogMetadata(tp).totalRestored());
+        assertEquals(Collections.emptySet(), changelogReader.completedChangelogs());
+        assertEquals(10L, consumer.position(tp));
+        assertEquals(Collections.emptySet(), consumer.paused());
+        assertEquals(Collections.singleton(tp), consumer.assignment());
+
+        changelogReader.unregister(Collections.singleton(tp));
+
+        assertEquals(Collections.emptySet(), consumer.assignment());
+
+        if (type == ACTIVE) {
+            assertEquals(tp, callback.restoreTopicPartition);
+            assertEquals(storeName, callback.storeNameCalledStates.get(RESTORE_START));
+            assertEquals(storeName, callback.storeNameCalledStates.get(RESTORE_SUSPENDED));
+        } else {
+            assertNull(callback.restoreTopicPartition);
+            assertNull(callback.storeNameCalledStates.get(RESTORE_START));
+            assertNull(callback.storeNameCalledStates.get(RESTORE_SUSPENDED));
+        }
+        assertNull(callback.storeNameCalledStates.get(RESTORE_BATCH));
+    }
+
+    @Test
+    public void shouldSupportUnregisterChangelogAfterCompletion() {
+        final Map<TaskId, Task> mockTasks = mock(Map.class);
+        EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
+        EasyMock.expect(storeMetadata.offset()).andReturn(9L).anyTimes();
+        EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(tp, 10L));
         EasyMock.replay(mockTasks, stateManager, storeMetadata, store);
 
         adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));
@@ -209,20 +283,65 @@ public class StoreChangelogReaderTest extends EasyMockSupport {
         final StoreChangelogReader changelogReader =
             new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback);
 
+        changelogReader.register(tp, stateManager);
+
+        if (type == STANDBY) {
+            changelogReader.transitToUpdateStandby();
+        }
+
+        changelogReader.restore(mockTasks);
+
+        assertEquals(0L, changelogReader.changelogMetadata(tp).totalRestored());
+        assertEquals(10L, consumer.position(tp));
+
+        assertEquals(Collections.singleton(tp), consumer.assignment());
+        if (type == ACTIVE) {
+            assertEquals(Collections.singleton(tp), changelogReader.completedChangelogs());
+            assertEquals(Collections.singleton(tp), consumer.paused());
+        } else {
+            assertEquals(Collections.emptySet(), changelogReader.completedChangelogs());
+            assertEquals(Collections.emptySet(), consumer.paused());
+        }
+
+        changelogReader.unregister(Collections.singleton(tp));
+
+        assertEquals(Collections.emptySet(), consumer.assignment());
+
+        if (type == ACTIVE) {
+            assertEquals(tp, callback.restoreTopicPartition);
+            assertEquals(storeName, callback.storeNameCalledStates.get(RESTORE_START));
+            assertEquals(storeName, callback.storeNameCalledStates.get(RESTORE_END));
+            assertNull(callback.storeNameCalledStates.get(RESTORE_SUSPENDED));
+            assertNull(callback.storeNameCalledStates.get(RESTORE_BATCH));
+        }
+    }
+
+    @Test
+    public void shouldInitializeChangelogAndCheckForCompletion() {
+        final Map<TaskId, Task> mockTasks = mock(Map.class);
+        EasyMock.expect(mockTasks.get(null)).andReturn(mock(Task.class)).anyTimes();
+        EasyMock.expect(storeMetadata.offset()).andReturn(9L).anyTimes();
+        EasyMock.replay(mockTasks, stateManager, storeMetadata, store);
+
+        adminClient.updateEndOffsets(Collections.singletonMap(tp, 10L));
+
+        final StoreChangelogReader changelogReader =
+                new StoreChangelogReader(time, config, logContext, adminClient, consumer, callback);
+
         changelogReader.register(tp, stateManager);
         changelogReader.restore(mockTasks);
 
         assertEquals(
-            type == ACTIVE ?
-                StoreChangelogReader.ChangelogState.COMPLETED :
-                StoreChangelogReader.ChangelogState.RESTORING,
-            changelogReader.changelogMetadata(tp).state()
+                type == ACTIVE ?
+                        StoreChangelogReader.ChangelogState.COMPLETED :
+                        StoreChangelogReader.ChangelogState.RESTORING,
+                changelogReader.changelogMetadata(tp).state()
         );
         assertEquals(type == ACTIVE ? 10L : null, changelogReader.changelogMetadata(tp).endOffset());
         assertEquals(0L, changelogReader.changelogMetadata(tp).totalRestored());
         assertEquals(
-            type == ACTIVE ? Collections.singleton(tp) : Collections.emptySet(),
-            changelogReader.completedChangelogs()
+                type == ACTIVE ? Collections.singleton(tp) : Collections.emptySet(),
+                changelogReader.completedChangelogs()
         );
         assertEquals(10L, consumer.position(tp));
         assertEquals(Collections.singleton(tp), consumer.paused());
diff --git a/streams/src/test/java/org/apache/kafka/test/MockStateRestoreListener.java b/streams/src/test/java/org/apache/kafka/test/MockStateRestoreListener.java
index 10269699d04..6c423a48705 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockStateRestoreListener.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockStateRestoreListener.java
@@ -37,6 +37,7 @@ public class MockStateRestoreListener implements StateRestoreListener {
     public static final String RESTORE_START = "restore_start";
     public static final String RESTORE_BATCH = "restore_batch";
     public static final String RESTORE_END = "restore_end";
+    public static final String RESTORE_SUSPENDED = "restore_suspended";
 
     @Override
     public void onRestoreStart(final TopicPartition topicPartition,
@@ -69,6 +70,15 @@ public class MockStateRestoreListener implements StateRestoreListener {
         totalNumRestored = totalRestored;
     }
 
+    @Override
+    public void onRestoreSuspended(final TopicPartition topicPartition,
+                                   final String storeName,
+                                   final long totalRestored) {
+        restoreTopicPartition = topicPartition;
+        storeNameCalledStates.put(RESTORE_SUSPENDED, storeName);
+        totalNumRestored = totalRestored;
+    }
+
     @Override
     public String toString() {
         return "MockStateRestoreListener{" +