You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/02/17 18:14:22 UTC

[GitHub] [kafka] cadonna commented on a change in pull request #11765: [WIP] State updater implementation

cadonna commented on a change in pull request #11765:
URL: https://github.com/apache/kafka/pull/11765#discussion_r809336171



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
##########
@@ -0,0 +1,390 @@
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.TaskCorruptedException;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public class DefaultStateUpdater implements StateUpdater {
+
+    /**
+     * Standard text stating that an error indicates a bug, with pointers to where it should be reported
+     * (the Kafka JIRA or the dev mailing list).
+     */
+    private static final String BUG_ERROR_MESSAGE = "This indicates a bug. " +
+        "Please report at https://issues.apache.org/jira/projects/KAFKA/issues or to the dev-mailing list (https://kafka.apache.org/contact).";
+
+    private class StateUpdaterThread extends Thread {
+
+        // Restores the changelogs of the updating tasks and reports which changelogs are completely read.
+        private final ChangelogReader changelogReader;
+        // Tasks currently being updated, keyed by task id; an active task is removed once its restoration completes.
+        private final ConcurrentMap<TaskId, Task> updatingTasks = new ConcurrentHashMap<>();
+        // Loop flag of run(); starts true and is checked before each iteration and before blocking.
+        private final AtomicBoolean isRunning = new AtomicBoolean(true);
+        // Passed to Task#completeRestoration when an active task has read all of its changelogs.
+        private final java.util.function.Consumer<Set<TopicPartition>> offsetResetter;
+
+        /**
+         * Creates the state updater thread.
+         *
+         * @param name            name of the underlying {@link Thread}
+         * @param changelogReader reader used to restore the changelogs of the updating tasks
+         * @param offsetResetter  callback handed to {@code Task#completeRestoration} when an active task
+         *                        has completely read its changelogs
+         */
+        public StateUpdaterThread(final String name,
+                                  final ChangelogReader changelogReader,
+                                  final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
+            super(name);
+            this.offsetResetter = offsetResetter;
+            this.changelogReader = changelogReader;
+        }
+
+        /**
+         * Main loop of the state updater thread.
+         *
+         * <p>While {@code isRunning} is set, each iteration applies the queued task actions, restores the
+         * updating tasks, and blocks if all changelogs have been completely read. An interrupt ends the
+         * loop without an error. {@code clear()} is always called when the thread exits, however it exits.
+         */
+        @Override
+        public void run() {
+            try {
+                while (isRunning.get()) {
+                    performActionsOnTasks();
+                    restoreTasks();
+                    waitIfAllChangelogsCompletelyRead();
+                }
+            } catch (final InterruptedException interrupted) {
+                // An interrupt terminates the thread; only the clean-up in the finally block follows.
+            } catch (final Throwable anyOtherError) {
+                // ToDo: log that the thread died unexpectedly
+            } finally {
+                clear();
+            }
+        }
+
+        /**
+         * Drains all queued task actions while holding {@code tasksAndActionsLock} and applies each of them.
+         *
+         * <p>NOTE(review): none of the visible calls here throws {@code InterruptedException}; confirm against
+         * {@code removeTask} before dropping the {@code throws} clause.
+         */
+        private void performActionsOnTasks() throws InterruptedException {
+            tasksAndActionsLock.lock();
+            try {
+                final List<TaskAndAction> pending = getTasksAndActions();
+                for (final TaskAndAction pendingAction : pending) {
+                    switch (pendingAction.action) {
+                        case ADD:
+                            addTask(pendingAction.task);
+                            break;
+                        case REMOVE:
+                            removeTask(pendingAction.task);
+                            break;
+                        case RECYCLE:
+                            // ToDo: recycling is not implemented yet; the drained action is ignored.
+                            break;
+                    }
+                }
+            } finally {
+                tasksAndActionsLock.unlock();
+            }
+        }
+
+        private void restoreTasks() throws InterruptedException {
+            try {
+                // ToDo: Prioritize restoration of active tasks over standby tasks
+                //                changelogReader.enforceRestoreActive();
+                changelogReader.restore(updatingTasks);
+            } catch (final TaskCorruptedException taskCorruptedException) {
+                handleTaskCorruptedException(taskCorruptedException);
+            } catch (final StreamsException streamsException) {
+                handleStreamsException(streamsException);
+            }
+            final Set<TopicPartition> completedChangelogs = changelogReader.completedChangelogs();
+            final List<Task> activeTasks = updatingTasks.values().stream().filter(Task::isActive).collect(Collectors.toList());
+            for (final Task task : activeTasks) {
+                endRestorationIfChangelogsCompletelyRead(task, completedChangelogs);
+            }
+        }
+
+        /**
+         * Completes the restoration of {@code task} if all of its changelog partitions are contained in
+         * {@code restoredChangelogs}. In that case the task is also handed to the restored tasks and stops
+         * being updated.
+         *
+         * <p>The work is wrapped in {@code catchException(..., task)}. Presumably this routes failures to
+         * per-task error handling (TODO confirm).
+         *
+         * @param task               an active task; the caller in this class only passes active tasks, hence the cast
+         * @param restoredChangelogs changelog partitions that have been completely read
+         */
+        private void endRestorationIfChangelogsCompletelyRead(final Task task,
+                                                              final Set<TopicPartition> restoredChangelogs) {
+            catchException(() -> {
+                if (!restoredChangelogs.containsAll(task.changelogPartitions())) {
+                    return null;
+                }
+                task.completeRestoration(offsetResetter);
+                addTaskToRestoredTasks((StreamTask) task);
+                updatingTasks.remove(task.id());
+                return null;
+            }, task);
+        }
+
+        /**
+         * Blocks the thread while there is nothing to do.
+         *
+         * <p>This applies when the thread is running, all changelogs have been completely read, and no task
+         * action is queued. After every wake-up the loop re-checks its condition. It returns once a task
+         * action is queued or {@code isRunning} is false. Re-checking {@code isRunning} ensures that a
+         * shutdown that signals {@code tasksAndActionsCondition} while the queue is empty lets the thread
+         * leave the wait instead of parking again. NOTE(review): confirm that the shutdown path signals the
+         * condition after clearing {@code isRunning}.
+         *
+         * @throws InterruptedException if the thread is interrupted while waiting
+         */
+        private void waitIfAllChangelogsCompletelyRead() throws InterruptedException {
+            if (isRunning.get() && changelogReader.allChangelogsCompleted()) {
+                tasksAndActionsLock.lock();
+                try {
+                    while (isRunning.get() && tasksAndActions.isEmpty()) {
+                        tasksAndActionsCondition.await();
+                    }
+                } finally {
+                    tasksAndActionsLock.unlock();
+                }
+            }
+        }
+
+        /**
+         * Moves every queued task action into a new list and empties the queue.
+         * Callers in this class hold {@code tasksAndActionsLock} while calling it.
+         *
+         * @return the drained task actions, in the queue's iteration order
+         */
+        private List<TaskAndAction> getTasksAndActions() {
+            final List<TaskAndAction> drained = new ArrayList<>(tasksAndActions.size());
+            drained.addAll(tasksAndActions);
+            tasksAndActions.clear();
+            return drained;
+        }
+
+        private void addTask(final Task task) {

Review comment:
       Actually, this was my original idea, but then I tried a couple of other approaches. If we agree on the invariant that a task is never in both `Tasks` and the state updater at the same time, I think we can return to my original idea, which is also what you propose here: keeping the state-transition logic out of the state updater.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org