You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ca...@apache.org on 2022/07/06 10:36:23 UTC

[kafka] branch trunk updated: KAFKA-10199: Remove call to Task#completeRestoration from state updater (#12379)

This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 00f395bb889 KAFKA-10199: Remove call to Task#completeRestoration from state updater (#12379)
00f395bb889 is described below

commit 00f395bb889252e3efb62cedfaa1209b58d5fd3c
Author: Bruno Cadonna <ca...@apache.org>
AuthorDate: Wed Jul 6 12:36:15 2022 +0200

    KAFKA-10199: Remove call to Task#completeRestoration from state updater (#12379)
    
    The call to Task#completeRestoration calls methods on the main consumer.
    The state updater thread should not access the main consumer since the
    main consumer is not thread-safe. Additionally, Task#completeRestoration
    changed the state of active tasks, but we decided to keep task life cycle
    management outside of the state updater.
    
    Task#completeRestoration should be called by the stream thread on
    restored active tasks returned by the state udpater.
    
    Reviewer: Guozhang Wang <gu...@apache.org>
---
 .../processor/internals/DefaultStateUpdater.java   | 13 ++-------
 .../internals/DefaultStateUpdaterTest.java         | 31 +++++-----------------
 2 files changed, 9 insertions(+), 35 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index 886a37b3140..3969a911f33 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -46,7 +46,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Consumer;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -60,16 +59,12 @@ public class DefaultStateUpdater implements StateUpdater {
 
         private final ChangelogReader changelogReader;
         private final AtomicBoolean isRunning = new AtomicBoolean(true);
-        private final Consumer<Set<TopicPartition>> offsetResetter;
         private final Map<TaskId, Task> updatingTasks = new ConcurrentHashMap<>();
         private final Logger log;
 
-        public StateUpdaterThread(final String name,
-                                  final ChangelogReader changelogReader,
-                                  final Consumer<Set<TopicPartition>> offsetResetter) {
+        public StateUpdaterThread(final String name, final ChangelogReader changelogReader) {
             super(name);
             this.changelogReader = changelogReader;
-            this.offsetResetter = offsetResetter;
 
             final String logPrefix = String.format("%s ", name);
             final LogContext logContext = new LogContext(logPrefix);
@@ -286,7 +281,6 @@ public class DefaultStateUpdater implements StateUpdater {
                                               final Set<TopicPartition> restoredChangelogs) {
             final Collection<TopicPartition> taskChangelogPartitions = task.changelogPartitions();
             if (restoredChangelogs.containsAll(taskChangelogPartitions)) {
-                task.completeRestoration(offsetResetter);
                 task.maybeCheckpoint(true);
                 addToRestoredTasks(task);
                 updatingTasks.remove(task.id());
@@ -332,7 +326,6 @@ public class DefaultStateUpdater implements StateUpdater {
 
     private final Time time;
     private final ChangelogReader changelogReader;
-    private final Consumer<Set<TopicPartition>> offsetResetter;
     private final Queue<TaskAndAction> tasksAndActions = new LinkedList<>();
     private final Lock tasksAndActionsLock = new ReentrantLock();
     private final Condition tasksAndActionsCondition = tasksAndActionsLock.newCondition();
@@ -350,17 +343,15 @@ public class DefaultStateUpdater implements StateUpdater {
 
     public DefaultStateUpdater(final StreamsConfig config,
                                final ChangelogReader changelogReader,
-                               final Consumer<Set<TopicPartition>> offsetResetter,
                                final Time time) {
         this.changelogReader = changelogReader;
-        this.offsetResetter = offsetResetter;
         this.time = time;
         this.commitIntervalMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
     }
 
     public void start() {
         if (stateUpdaterThread == null) {
-            stateUpdaterThread = new StateUpdaterThread("state-updater", changelogReader, offsetResetter);
+            stateUpdaterThread = new StateUpdaterThread("state-updater", changelogReader);
             stateUpdaterThread.start();
             shutdownGate = new CountDownLatch(1);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
index 5e2d90de71d..e5962274827 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
@@ -84,7 +84,7 @@ class DefaultStateUpdaterTest {
     private final StreamsConfig config = new StreamsConfig(configProps());
     private final ChangelogReader changelogReader = mock(ChangelogReader.class);
     private final java.util.function.Consumer<Set<TopicPartition>> offsetResetter = topicPartitions -> { };
-    private final DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, changelogReader, offsetResetter, time);
+    private final DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, changelogReader, time);
 
     @AfterEach
     public void tearDown() {
@@ -93,11 +93,11 @@ class DefaultStateUpdaterTest {
 
     private Properties configProps() {
         return mkObjectProperties(mkMap(
-                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
-                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"),
-                mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2),
-                mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL),
-                mkEntry(producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), COMMIT_INTERVAL)
+            mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
+            mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"),
+            mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2),
+            mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL),
+            mkEntry(producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), COMMIT_INTERVAL)
         ));
     }
 
@@ -202,7 +202,6 @@ class DefaultStateUpdaterTest {
         verifyRemovedTasks();
         verify(changelogReader, times(1)).enforceRestoreActive();
         verify(changelogReader, atLeast(3)).restore(anyMap());
-        verify(task).completeRestoration(offsetResetter);
         verify(changelogReader, never()).transitToUpdateStandby();
     }
 
@@ -234,9 +233,6 @@ class DefaultStateUpdaterTest {
         verifyRemovedTasks();
         verify(changelogReader, times(3)).enforceRestoreActive();
         verify(changelogReader, atLeast(4)).restore(anyMap());
-        verify(task3).completeRestoration(offsetResetter);
-        verify(task1).completeRestoration(offsetResetter);
-        verify(task2).completeRestoration(offsetResetter);
         verify(changelogReader, never()).transitToUpdateStandby();
     }
 
@@ -318,8 +314,6 @@ class DefaultStateUpdaterTest {
         verifyUpdatingStandbyTasks(task4, task3);
         verifyExceptionsAndFailedTasks();
         verifyRemovedTasks();
-        verify(task1).completeRestoration(offsetResetter);
-        verify(task2).completeRestoration(offsetResetter);
         verify(changelogReader, atLeast(3)).restore(anyMap());
         final InOrder orderVerifier = inOrder(changelogReader, task1, task2);
         orderVerifier.verify(changelogReader, times(2)).enforceRestoreActive();
@@ -343,7 +337,6 @@ class DefaultStateUpdaterTest {
 
         verifyRestoredActiveTasks(task1);
         verifyCheckpointTasks(true, task1);
-        verify(task1).completeRestoration(offsetResetter);
         verifyUpdatingStandbyTasks(task2);
         final InOrder orderVerifier = inOrder(changelogReader);
         orderVerifier.verify(changelogReader, times(1)).enforceRestoreActive();
@@ -352,7 +345,6 @@ class DefaultStateUpdaterTest {
         stateUpdater.add(task3);
 
         verifyRestoredActiveTasks(task1, task3);
-        verify(task3).completeRestoration(offsetResetter);
         orderVerifier.verify(changelogReader, times(1)).enforceRestoreActive();
         orderVerifier.verify(changelogReader, times(1)).transitToUpdateStandby();
     }
@@ -715,7 +707,7 @@ class DefaultStateUpdaterTest {
     public void shouldNotAutoCheckpointTasksIfIntervalNotElapsed() {
         // we need to use a non auto-ticking timer here to control how much time elapsed exactly
         final Time time = new MockTime();
-        final DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, changelogReader, offsetResetter, time);
+        final DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, changelogReader, time);
         try {
             final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
             final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
@@ -922,7 +914,6 @@ class DefaultStateUpdaterTest {
                 VERIFICATION_TIMEOUT,
                 "Did not get all restored active task within the given timeout!"
             );
-            assertTrue(restoredTasks.stream().allMatch(task -> task.state() == State.RESTORING));
         }
     }
 
@@ -956,10 +947,6 @@ class DefaultStateUpdaterTest {
                 VERIFICATION_TIMEOUT,
                 "Did not get all updating task within the given timeout!"
             );
-            assertTrue(updatingTasks.stream()
-                .allMatch(task -> task.isActive() && task.state() == State.RESTORING
-                    ||
-                    !task.isActive() && task.state() == State.RUNNING));
         }
     }
 
@@ -975,7 +962,6 @@ class DefaultStateUpdaterTest {
             VERIFICATION_TIMEOUT,
             "Did not see all standby task within the given timeout!"
         );
-        assertTrue(standbyTasks.stream().allMatch(task -> task.state() == State.RUNNING));
     }
 
     private void verifyRemovedTasks(final Task... tasks) throws Exception {
@@ -993,9 +979,6 @@ class DefaultStateUpdaterTest {
                 VERIFICATION_TIMEOUT,
                 "Did not get all removed task within the given timeout!"
             );
-            assertTrue(removedTasks.stream()
-                .allMatch(task -> task.isActive() && task.state() == State.RESTORING
-                    || !task.isActive() && task.state() == State.RUNNING));
         }
     }