Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/02/15 12:45:12 UTC

[GitHub] [kafka] cadonna opened a new pull request #11765: [WIP] State updater implementation

cadonna opened a new pull request #11765:
URL: https://github.com/apache/kafka/pull/11765


   WIP
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #11765: [WIP] State updater implementation

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #11765:
URL: https://github.com/apache/kafka/pull/11765#discussion_r809340005



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
##########
@@ -0,0 +1,390 @@
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.TaskCorruptedException;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public class DefaultStateUpdater implements StateUpdater {
+
+    private final static String BUG_ERROR_MESSAGE = "This indicates a bug. " +
+        "Please report at https://issues.apache.org/jira/projects/KAFKA/issues or to the dev-mailing list (https://kafka.apache.org/contact).";
+
+    private class StateUpdaterThread extends Thread {
+
+        private final ChangelogReader changelogReader;
+        private final ConcurrentMap<TaskId, Task> updatingTasks = new ConcurrentHashMap<>();
+        private final AtomicBoolean isRunning = new AtomicBoolean(true);
+        private final java.util.function.Consumer<Set<TopicPartition>> offsetResetter;
+
+        public StateUpdaterThread(final String name,
+                                  final ChangelogReader changelogReader,
+                                  final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
+            super(name);
+            this.changelogReader = changelogReader;
+            this.offsetResetter = offsetResetter;
+        }
+
+        @Override
+        public void run() {
+            try {
+                while (isRunning.get()) {
+                    try {
+                        performActionsOnTasks();
+                        restoreTasks();
+                        waitIfAllChangelogsCompletelyRead();
+                    } catch (final InterruptedException interruptedException) {
+                        return;
+                    }
+                }
+            } catch (final Throwable anyOtherError) {
+                // ToDo: log that the thread died unexpectedly
+            } finally {
+                clear();
+            }
+        }
+
+        private void performActionsOnTasks() throws InterruptedException {
+            tasksAndActionsLock.lock();
+            try {
+                for (final TaskAndAction taskAndAction : getTasksAndActions()) {
+                    final Task task = taskAndAction.task;
+                    final Action action = taskAndAction.action;
+                    switch (action) {
+                        case ADD:
+                            addTask(task);
+                            break;
+                        case REMOVE:
+                            removeTask(task);
+                            break;
+                        case RECYCLE:
+                            // ToDo: recycling tasks is not implemented yet
+                            break;
+                    }
+                }
+            } finally {
+                tasksAndActionsLock.unlock();
+            }
+        }
+
+        private void restoreTasks() throws InterruptedException {
+            try {
+                // ToDo: Prioritize restoration of active tasks over standby tasks
+                //                changelogReader.enforceRestoreActive();
+                changelogReader.restore(updatingTasks);
+            } catch (final TaskCorruptedException taskCorruptedException) {
+                handleTaskCorruptedException(taskCorruptedException);
+            } catch (final StreamsException streamsException) {
+                handleStreamsException(streamsException);
+            }
+            final Set<TopicPartition> completedChangelogs = changelogReader.completedChangelogs();
+            final List<Task> activeTasks = updatingTasks.values().stream().filter(Task::isActive).collect(Collectors.toList());
+            for (final Task task : activeTasks) {
+                endRestorationIfChangelogsCompletelyRead(task, completedChangelogs);
+            }
+        }
+
+        private void endRestorationIfChangelogsCompletelyRead(final Task task,
+                                                              final Set<TopicPartition> restoredChangelogs) {
+            catchException(() -> {
+                final Collection<TopicPartition> taskChangelogPartitions = task.changelogPartitions();
+                if (restoredChangelogs.containsAll(taskChangelogPartitions)) {
+                    task.completeRestoration(offsetResetter);
+                    addTaskToRestoredTasks((StreamTask) task);
+                    updatingTasks.remove(task.id());
+                }
+                return null;
+            }, task);
+        }
+
+        private void waitIfAllChangelogsCompletelyRead() throws InterruptedException {
+            if (isRunning.get() && changelogReader.allChangelogsCompleted()) {
+                tasksAndActionsLock.lock();
+                try {
+                    while (tasksAndActions.isEmpty()) {
+                        tasksAndActionsCondition.await();
+                    }
+                } finally {
+                    tasksAndActionsLock.unlock();
+                }
+            }
+        }
+
+        private List<TaskAndAction> getTasksAndActions() {
+            final List<TaskAndAction> tasksAndActionsToProcess = new ArrayList<>(tasksAndActions);
+            tasksAndActions.clear();
+            return tasksAndActionsToProcess;
+        }
+
+        private void addTask(final Task task) {
+            catchException(() -> {
+                final Task.State state = task.state();
+                switch (state) {
+
+                    case CREATED:
+                        task.initializeIfNeeded();
+                        // Todo: catch exceptions and clear task timeout
+                        if (isStateless(task)) {
+                            addTaskToRestoredTasks((StreamTask) task);
+                        } else {
+                            updatingTasks.put(task.id(), task);
+                        }
+                        break;
+
+                    case SUSPENDED:
+                        task.resume();
+                        break;
+
+                    default:
+                        throw new IllegalStateException("Illegal state " + state + " while adding to the state updater. "
+                            + BUG_ERROR_MESSAGE);
+                }
+                return null;
+            }, task);
+        }
+
+        private boolean isStateless(final Task task) {
+            return task.changelogPartitions().isEmpty() && task.isActive();
+        }
+
+        private void removeTask(final Task task) {
+            catchException(() -> {
+                final Collection<TopicPartition> changelogPartitions = task.changelogPartitions();
+                if (!changelogPartitions.isEmpty()) {
+                    updatingTasks.remove(task.id());
+                    task.stateManager().checkpoint();
+                    changelogReader.unregister(changelogPartitions);
+                }
+                if (task.isActive()) {
+                    removeTaskFromRestoredTasks((StreamTask) task);
+                }
+                task.suspend();
+                task.closeClean();
+                return null;
+            }, task);
+        }
+
+        private void catchException(final Supplier<Void> codeToCheck, final Task task) {
+            try {
+                codeToCheck.get();
+            } catch (final RuntimeException exception) {
+                exceptions.add(exception);
+                updatingTasks.remove(task.id());
+            }
+        }
+
+        private void handleTaskCorruptedException(final TaskCorruptedException taskCorruptedException) {
+            exceptions.add(taskCorruptedException);
+            final Set<TaskId> corruptedTaskIds = taskCorruptedException.corruptedTasks();
+            for (final TaskId taskId : corruptedTaskIds) {
+                updatingTasks.remove(taskId);
+            }
+        }
+
+        private void handleStreamsException(final StreamsException streamsException) {
+            if (streamsException.taskId().isPresent()) {
+                final Task task = updatingTasks.get(streamsException.taskId().get());
+                task.stateManager().checkpoint();

Review comment:
       I thought the idea was to checkpoint when we need to interrupt restoration, because we have actually updated the state. Otherwise, we would reread the changelog topic from the position at which restoration started. But now that I think about it, with EOS that might be a problem, right? I am probably missing something here.
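The reasoning in this comment (checkpoint when restoration is interrupted, so that a later restart does not reread the changelog from the position at which restoration began) can be illustrated with a toy model. Everything below is hypothetical: `ToyStore`, `restore()`, `checkpoint()` and `restartFromCheckpoint()` are not Kafka Streams APIs, the "changelog" is an in-memory list, and the model deliberately ignores the EOS concern raised above.

```java
import java.util.List;

// Hypothetical toy model, not Kafka Streams code: a store restored from an
// in-memory "changelog", with a checkpoint recording the next offset to read.
public class CheckpointOnInterruptSketch {

    static final class ToyStore {
        long nextOffsetToRestore = 0L; // in-memory restore position
        long checkpointedOffset = 0L;  // position persisted by checkpoint()
        long recordsRead = 0L;         // total changelog records read

        void checkpoint() {
            checkpointedOffset = nextOffsetToRestore;
        }

        // Simulates a restart: the in-memory position is lost and restoration
        // resumes from the last checkpointed position.
        void restartFromCheckpoint() {
            nextOffsetToRestore = checkpointedOffset;
        }

        // Reads at most maxRecords records, starting at the current position.
        void restore(final List<String> changelog, final int maxRecords) {
            int read = 0;
            while (nextOffsetToRestore < changelog.size() && read < maxRecords) {
                nextOffsetToRestore++;
                recordsRead++;
                read++;
            }
        }
    }

    static long totalRecordsRead(final boolean checkpointOnInterrupt) {
        final List<String> changelog = List.of("a", "b", "c", "d", "e", "f");
        final ToyStore store = new ToyStore();
        store.restore(changelog, 4); // restoration is interrupted after 4 records
        if (checkpointOnInterrupt) {
            store.checkpoint();
        }
        store.restartFromCheckpoint();
        store.restore(changelog, Integer.MAX_VALUE); // finish restoration
        return store.recordsRead;
    }

    public static void main(final String[] args) {
        System.out.println(totalRecordsRead(true));  // 6: resumes at offset 4
        System.out.println(totalRecordsRead(false)); // 10: rereads offsets 0-3
    }
}
```

With the checkpoint, the second pass reads only the two remaining records; without it, the four records already applied are read again.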







[GitHub] [kafka] cadonna commented on pull request #11765: [WIP] State updater implementation

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #11765:
URL: https://github.com/apache/kafka/pull/11765#issuecomment-1040246456


   @guozhangwang Here is my code so far for moving restoration to a separate thread.
   My idea is to give the `Tasks` container a reference to the state updater. The `Tasks` container uses this reference to retrieve all active tasks that are in restoration and all standby tasks. That means methods like `allTasks()` need to be modified to combine the results from the state updater with those from the other data structures in the `Tasks` container. An important invariant is that a task can never be in the state updater and in another data structure of the `Tasks` container at the same time; a task is either in one or the other. A typical lifecycle of an active task would then be: creation -> added to the state updater -> restoration is done -> removed from the state updater (by calling `getRestoredActiveTasks()`) -> added to the appropriate data structure in the `Tasks` container. Standby tasks are created, put into the state updater, and stay there. Tasks are recycled (i.e., converted from active to standby or vice versa) in the state updater.
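The ownership invariant and the active-task lifecycle described above can be sketched with two toy containers. This is a minimal illustration under stated assumptions, not the PR's code: `ToyTask`, `ToyStateUpdater`, `ToyTasks`, `completeRestoration()` and `moveRestoredActiveTasks()` are hypothetical names, restoration is simulated by a method call, and recycling is not modeled. Only `getRestoredActiveTasks()` mirrors a name from the comment, and here it is assumed to drain the restored tasks.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; names are illustrative, not the classes in this PR.
public class TaskOwnershipInvariantSketch {

    static final class ToyTask {
        final String id;
        final boolean active;
        ToyTask(final String id, final boolean active) { this.id = id; this.active = active; }
    }

    // Owns every task that is restoring (active) or being updated (standby).
    static final class ToyStateUpdater {
        private final Map<String, ToyTask> updatingTasks = new HashMap<>();
        private final List<ToyTask> restoredActiveTasks = new ArrayList<>();

        void add(final ToyTask task) { updatingTasks.put(task.id, task); }

        // Simulates the end of restoration; standby tasks stay in the updater.
        void completeRestoration(final String taskId) {
            final ToyTask task = updatingTasks.get(taskId);
            if (task != null && task.active) {
                updatingTasks.remove(taskId);
                restoredActiveTasks.add(task);
            }
        }

        // Returns the restored active tasks and removes them from the updater.
        List<ToyTask> getRestoredActiveTasks() {
            final List<ToyTask> restored = new ArrayList<>(restoredActiveTasks);
            restoredActiveTasks.clear();
            return restored;
        }

        Set<String> taskIds() {
            final Set<String> ids = new HashSet<>(updatingTasks.keySet());
            for (final ToyTask task : restoredActiveTasks) {
                ids.add(task.id);
            }
            return ids;
        }
    }

    // Holds a reference to the updater; running tasks live only here.
    static final class ToyTasks {
        private final ToyStateUpdater stateUpdater;
        private final Map<String, ToyTask> runningTasks = new HashMap<>();

        ToyTasks(final ToyStateUpdater stateUpdater) { this.stateUpdater = stateUpdater; }

        void addNewTask(final ToyTask task) { stateUpdater.add(task); }

        void moveRestoredActiveTasks() {
            for (final ToyTask task : stateUpdater.getRestoredActiveTasks()) {
                runningTasks.put(task.id, task);
            }
        }

        boolean isRunning(final String taskId) { return runningTasks.containsKey(taskId); }

        // Combines both sources and fails loudly if the invariant is violated.
        Set<String> allTaskIds() {
            final Set<String> ids = new HashSet<>(runningTasks.keySet());
            for (final String id : stateUpdater.taskIds()) {
                if (!ids.add(id)) {
                    throw new IllegalStateException("Task " + id + " is in both places");
                }
            }
            return ids;
        }
    }

    public static void main(final String[] args) {
        final ToyStateUpdater updater = new ToyStateUpdater();
        final ToyTasks tasks = new ToyTasks(updater);
        tasks.addNewTask(new ToyTask("0_0", true));  // active
        tasks.addNewTask(new ToyTask("1_0", false)); // standby
        updater.completeRestoration("0_0");
        updater.completeRestoration("1_0");          // no-op: standbys stay
        tasks.moveRestoredActiveTasks();
        System.out.println(tasks.isRunning("0_0") + " " + tasks.isRunning("1_0") + " " + tasks.allTaskIds().size());
        // prints "true false 2"
    }
}
```

Because a task is removed from the updater in the same step that hands it over, `allTaskIds()` can simply union both sources, and the duplicate check only fires if the invariant is broken.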
   
   ToDos:
   - The unit tests of the state updater are failing.
   - The code to recycle tasks is not yet in the state updater.
   - The modifications to the `Tasks` container are still missing.
   - Integration of the state updater into normal processing.
   
   The first two points can be done in this PR. The third point should be done in a separate PR, since this one is already quite big. The last point also belongs in a separate PR.
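The `StateUpdaterThread` loop in the diff quoted in this thread applies queued actions, restores, and then blocks on `tasksAndActionsCondition` while `tasksAndActions` is empty. The enqueueing side is not part of the quoted hunk, so the following is only a minimal sketch of how such a handoff is commonly written with `java.util.concurrent.locks`. `ToyUpdater`, `enqueue()`, `runOnce()` and `demo()` are hypothetical names, actions are plain strings, and restoration is left out.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of a lock/condition handoff; not the PR's API.
public class ActionQueueHandoffSketch {

    static final class ToyUpdater {
        private final Lock lock = new ReentrantLock();
        private final Condition actionsAvailable = lock.newCondition();
        private final Queue<String> actions = new LinkedList<>();
        private final List<String> processed = new ArrayList<>();

        // Producer side (e.g. the stream thread): enqueue under the lock, then signal.
        void enqueue(final String action) {
            lock.lock();
            try {
                actions.add(action);
                actionsAvailable.signalAll();
            } finally {
                lock.unlock();
            }
        }

        // Consumer side: take everything queued so far while holding the lock.
        private List<String> drain() {
            lock.lock();
            try {
                final List<String> drained = new ArrayList<>(actions);
                actions.clear();
                return drained;
            } finally {
                lock.unlock();
            }
        }

        // Consumer side: block until an action is queued; the loop guards
        // against spurious wake-ups.
        private void awaitActions() throws InterruptedException {
            lock.lock();
            try {
                while (actions.isEmpty()) {
                    actionsAvailable.await();
                }
            } finally {
                lock.unlock();
            }
        }

        // Simplified single iteration: wait for work, then process it outside the lock.
        void runOnce() throws InterruptedException {
            awaitActions();
            processed.addAll(drain());
        }

        List<String> processed() { return new ArrayList<>(processed); }
    }

    // Runs one producer/consumer round trip and returns what the updater processed.
    static List<String> demo() {
        final ToyUpdater updater = new ToyUpdater();
        final Thread updaterThread = new Thread(() -> {
            try {
                updater.runOnce();
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "toy-state-updater");
        updaterThread.start();
        updater.enqueue("ADD 0_0");
        try {
            updaterThread.join(5_000);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // Only read the result once the updater thread has finished.
        return updaterThread.isAlive() ? List.<String>of() : updater.processed();
    }

    public static void main(final String[] args) {
        System.out.println(demo()); // [ADD 0_0]
    }
}
```

One design difference from the quoted `performActionsOnTasks()`: the sketch drains the queue under the lock and processes the actions after releasing it, so a producer never waits on a slow task operation, whereas the PR calls `addTask()`/`removeTask()` while holding `tasksAndActionsLock`.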





[GitHub] [kafka] cadonna commented on a change in pull request #11765: [WIP] State updater implementation

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #11765:
URL: https://github.com/apache/kafka/pull/11765#discussion_r809336171



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
##########
@@ -0,0 +1,390 @@
+        private void addTask(final Task task) {

Review comment:
       Actually, this was my original thought, but then I tried a couple of other things. If we agree on the invariant that a task is never in `Tasks` and in the state updater at the same time, I think we can go back to my original thought, which is what you propose here: keeping the state transition logic out of the state updater.







[GitHub] [kafka] guozhangwang commented on a change in pull request #11765: [WIP] State updater implementation

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11765:
URL: https://github.com/apache/kafka/pull/11765#discussion_r807190659



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
##########
@@ -0,0 +1,390 @@
+        private void addTask(final Task task) {

Review comment:
       I would like to suggest that we keep the state transition logic inside a single class, i.e. the `Stream/StandbyTask` itself, while here we only act based on the state but never change its state. WDYT?
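The split proposed here (the task owns and validates its own state transitions; the updater only branches on the state and calls task methods) can be sketched as follows. This is a hypothetical illustration: the `State` enum and the transition table are simplified and invented for the example, not copied from Kafka Streams, and `ToyTask`/`addTask()` only loosely mirror the names in the diff.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: states and allowed transitions are simplified and
// illustrative, not Kafka Streams' actual Task.State rules.
public class TaskOwnsTransitionsSketch {

    enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

    // The task is the single place where its state is changed and validated.
    static final class ToyTask {
        private static final Map<State, Set<State>> ALLOWED = Map.of(
            State.CREATED, EnumSet.of(State.RESTORING, State.CLOSED),
            State.RESTORING, EnumSet.of(State.RUNNING, State.SUSPENDED, State.CLOSED),
            State.RUNNING, EnumSet.of(State.SUSPENDED),
            State.SUSPENDED, EnumSet.of(State.RESTORING, State.CLOSED),
            State.CLOSED, EnumSet.noneOf(State.class));

        private State state = State.CREATED;

        State state() {
            return state;
        }

        private void transitionTo(final State next) {
            if (!ALLOWED.get(state).contains(next)) {
                throw new IllegalStateException("Invalid transition " + state + " -> " + next);
            }
            state = next;
        }

        void initializeIfNeeded() {
            if (state == State.CREATED) {
                transitionTo(State.RESTORING);
            }
        }

        void suspend() {
            transitionTo(State.SUSPENDED);
        }

        void resume() {
            if (state == State.SUSPENDED) {
                transitionTo(State.RESTORING);
            }
        }
    }

    // Updater-side logic: reads task.state() and calls task methods,
    // but never assigns the state itself.
    static String addTask(final ToyTask task) {
        switch (task.state()) {
            case CREATED:
                task.initializeIfNeeded();
                return "initialized";
            case SUSPENDED:
                task.resume();
                return "resumed";
            default:
                throw new IllegalStateException("Illegal state " + task.state() + " while adding to the state updater");
        }
    }

    public static void main(final String[] args) {
        final ToyTask task = new ToyTask();
        System.out.println(addTask(task) + " " + task.state()); // initialized RESTORING
        task.suspend();
        System.out.println(addTask(task) + " " + task.state()); // resumed RESTORING
    }
}
```

With this split, an invalid transition is rejected in one place regardless of which component triggers it, and the updater's `switch` stays a pure dispatch on the current state.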

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
##########
@@ -0,0 +1,390 @@
+        private boolean isStateless(final Task task) {
+            return task.changelogPartitions().isEmpty() && task.isActive();
+        }
+
+        private void removeTask(final Task task) {
+            catchException(() -> {
+                final Collection<TopicPartition> changelogPartitions = task.changelogPartitions();
+                if (!changelogPartitions.isEmpty()) {
+                    updatingTasks.remove(task.id());
+                    task.stateManager().checkpoint();
+                    changelogReader.unregister(changelogPartitions);
+                }
+                if (task.isActive()) {
+                    removeTaskFromRestoredTasks((StreamTask) task);
+                }
+                task.suspend();
+                task.closeClean();
+                return null;
+            }, task);
+        }
+
+        private void catchException(final Supplier<Void> codeToCheck, final Task task) {
+            try {
+                codeToCheck.get();
+            } catch (final RuntimeException exception) {
+                exceptions.add(exception);
+                updatingTasks.remove(task.id());
+            }
+        }
+
+        private void handleTaskCorruptedException(final TaskCorruptedException taskCorruptedException) {
+            exceptions.add(taskCorruptedException);
+            final Set<TaskId> corruptedTaskIds = taskCorruptedException.corruptedTasks();
+            for (final TaskId taskId : corruptedTaskIds) {
+                updatingTasks.remove(taskId);
+            }
+        }
+
+        private void handleStreamsException(final StreamsException streamsException) {
+            if (streamsException.taskId().isPresent()) {
+                final Task task = updatingTasks.get(streamsException.taskId().get());
+                task.stateManager().checkpoint();

Review comment:
       Why do we want to checkpoint when handling a streams exception here?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
##########
@@ -0,0 +1,390 @@
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.TaskCorruptedException;
+import org.apache.kafka.streams.processor.TaskId;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+public class DefaultStateUpdater implements StateUpdater {
+
+    private final static String BUG_ERROR_MESSAGE = "This indicates a bug. " +
+        "Please report at https://issues.apache.org/jira/projects/KAFKA/issues or to the dev-mailing list (https://kafka.apache.org/contact).";
+
+    private class StateUpdaterThread extends Thread {
+
+        private final ChangelogReader changelogReader;
+        private final ConcurrentMap<TaskId, Task> updatingTasks = new ConcurrentHashMap<>();
+        private final AtomicBoolean isRunning = new AtomicBoolean(true);
+        private final java.util.function.Consumer<Set<TopicPartition>> offsetResetter;
+
+        public StateUpdaterThread(final String name,
+                                  final ChangelogReader changelogReader,
+                                  final java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
+            super(name);
+            this.changelogReader = changelogReader;
+            this.offsetResetter = offsetResetter;
+        }
+
+        @Override
+        public void run() {
+            try {
+                while (isRunning.get()) {
+                    try {
+                        performActionsOnTasks();
+                        restoreTasks();
+                        waitIfAllChangelogsCompletelyRead();
+                    } catch (final InterruptedException interruptedException) {
+                        return;
+                    }
+                }
+            } catch (final Throwable anyOtherError) {
+                // ToDo: log that the thread died unexpectedly
+            } finally {
+                clear();
+            }
+        }
+
+        private void performActionsOnTasks() throws InterruptedException {
+            tasksAndActionsLock.lock();
+            try {
+                for (final TaskAndAction taskAndAction : getTasksAndActions()) {
+                    final Task task = taskAndAction.task;
+                    final Action action = taskAndAction.action;
+                    switch (action) {
+                        case ADD:
+                            addTask(task);
+                            break;
+                        case REMOVE:
+                            removeTask(task);
+                            break;
+                        case RECYCLE:
+                            ;
+                            break;
+
+                    }
+                }
+            } finally {
+                tasksAndActionsLock.unlock();
+            }
+        }
+
+        private void restoreTasks() throws InterruptedException {
+            try {
+                // ToDo: Prioritize restoration of active tasks over standby tasks
+                //                changelogReader.enforceRestoreActive();
+                changelogReader.restore(updatingTasks);
+            } catch (final TaskCorruptedException taskCorruptedException) {
+                handleTaskCorruptedException(taskCorruptedException);
+            } catch (final StreamsException streamsException) {
+                handleStreamsException(streamsException);
+            }
+            final Set<TopicPartition> completedChangelogs = changelogReader.completedChangelogs();
+            final List<Task> activeTasks = updatingTasks.values().stream().filter(Task::isActive).collect(Collectors.toList());
+            for (final Task task : activeTasks) {
+                endRestorationIfChangelogsCompletelyRead(task, completedChangelogs);
+            }
+        }
+
+        private void endRestorationIfChangelogsCompletelyRead(final Task task,
+                                                              final Set<TopicPartition> restoredChangelogs) {
+            catchException(() -> {
+                final Collection<TopicPartition> taskChangelogPartitions = task.changelogPartitions();
+                if (restoredChangelogs.containsAll(taskChangelogPartitions)) {
+                    task.completeRestoration(offsetResetter);
+                    addTaskToRestoredTasks((StreamTask) task);
+                    updatingTasks.remove(task.id());
+                }
+                return null;
+            }, task);
+        }
+
+        private void waitIfAllChangelogsCompletelyRead() throws InterruptedException {
+            if (isRunning.get() && changelogReader.allChangelogsCompleted()) {
+                tasksAndActionsLock.lock();
+                try {
+                    while (tasksAndActions.isEmpty()) {
+                        tasksAndActionsCondition.await();
+                    }
+                } finally {
+                    tasksAndActionsLock.unlock();
+                }
+            }
+        }
+
+        private List<TaskAndAction> getTasksAndActions() {
+            final List<TaskAndAction> tasksAndActionsToProcess = new ArrayList<>(tasksAndActions);
+            tasksAndActions.clear();
+            return tasksAndActionsToProcess;
+        }
+
+        private void addTask(final Task task) {
+            catchException(() -> {
+                final Task.State state = task.state();
+                switch (state) {
+
+                    case CREATED:
+                        task.initializeIfNeeded();
+                        // Todo: catch exceptions and clear task timeout
+                        if (isStateless(task)) {
+                            addTaskToRestoredTasks((StreamTask) task);
+                        } else {
+                            updatingTasks.put(task.id(), task);
+                        }
+                        break;
+
+                    case SUSPENDED:
+                        task.resume();
+                        break;
+
+                    default:
+                        throw new IllegalStateException("Illegal state " + state + " while adding to the state updater. "
+                            + BUG_ERROR_MESSAGE);
+                }
+                return null;
+            }, task);
+        }
+
+        private boolean isStateless(final Task task) {
+            return task.changelogPartitions().isEmpty() && task.isActive();
+        }
+
+        private void removeTask(final Task task) {
+            catchException(() -> {
+                final Collection<TopicPartition> changelogPartitions = task.changelogPartitions();
+                if (!changelogPartitions.isEmpty()) {
+                    updatingTasks.remove(task.id());
+                    task.stateManager().checkpoint();
+                    changelogReader.unregister(changelogPartitions);
+                }
+                if (task.isActive()) {
+                    removeTaskFromRestoredTasks((StreamTask) task);
+                }
+                task.suspend();
+                task.closeClean();
+                return null;
+            }, task);
+        }
+
+        private void catchException(final Supplier<Void> codeToCheck, final Task task) {
+            try {
+                codeToCheck.get();
+            } catch (final RuntimeException exception) {
+                exceptions.add(exception);
+                updatingTasks.remove(task.id());
+            }
+        }
+
+        private void handleTaskCorruptedException(final TaskCorruptedException taskCorruptedException) {
+            exceptions.add(taskCorruptedException);
+            final Set<TaskId> corruptedTaskIds = taskCorruptedException.corruptedTasks();
+            for (final TaskId taskId : corruptedTaskIds) {
+                updatingTasks.remove(taskId);
+            }
+        }
+
+        private void handleStreamsException(final StreamsException streamsException) {
+            if (streamsException.taskId().isPresent()) {
+                final Task task = updatingTasks.get(streamsException.taskId().get());
+                task.stateManager().checkpoint();
+                exceptions.add(streamsException);
+                updatingTasks.remove(task.id());
+            } else {
+                exceptions.add(streamsException);
+            }
+        }
+
+        private void addTaskToRestoredTasks(final StreamTask task) {
+            restoredActiveTasksLock.lock();
+            try {
+                restoredActiveTasks.add(task);
+                restoredActiveTasksCondition.signalAll();
+            } finally {
+                restoredActiveTasksLock.unlock();
+            }
+        }
+
+        private void removeTaskFromRestoredTasks(final StreamTask task) {
+            restoredActiveTasksLock.lock();
+            try {
+                restoredActiveTasks.remove(task);
+            } finally {
+                restoredActiveTasksLock.unlock();
+            }
+        }
+
+        public Collection<Task> getAllUpdatingTasks() {
+            return updatingTasks.values();
+        }
+
+        private void clear() {
+            tasksAndActionsLock.lock();
+            restoredActiveTasksLock.lock();
+            try {
+                tasksAndActions.clear();
+                restoredActiveTasks.clear();
+            } finally {
+                tasksAndActionsLock.unlock();
+                restoredActiveTasksLock.unlock();
+            }
+            changelogReader.clear();
+            updatingTasks.clear();
+        }
+    }
+
+    enum Action {
+        ADD,
+        REMOVE,
+        RECYCLE
+    }
+
+    private static class TaskAndAction {
+        public final Task task;
+        public final Action action;
+
+        public TaskAndAction(final Task task, final Action action) {
+            this.task = task;
+            this.action = action;
+        }
+    }
+
+    private final Time time;
+    private final Queue<TaskAndAction> tasksAndActions = new LinkedList<>();
+    private final Lock tasksAndActionsLock = new ReentrantLock();
+    private final Condition tasksAndActionsCondition = tasksAndActionsLock.newCondition();
+    private final Queue<StreamTask> restoredActiveTasks = new LinkedList<>();
+    private final Lock restoredActiveTasksLock = new ReentrantLock();
+    private final Condition restoredActiveTasksCondition = restoredActiveTasksLock.newCondition();
+    private final BlockingQueue<RuntimeException> exceptions = new LinkedBlockingQueue<>();

Review comment:
       What do you think about merging `restoredActiveTasks` and `exceptions` into a single queue that reports both completed and failed tasks? We could extend `TaskAndAction` with a `PROCESS` action for tasks whose restoration has completed and an `ERROR` action for exceptions.
   
   This would have two benefits: 1) the main thread and the updater would communicate through just two queues rather than three data structures; 2) `getRestoredActiveTasks` could be simplified to drain the next entries from the queue without a timer, consolidated with `getExceptions`.
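
   Below is a minimal sketch of the proposed single output queue. The `UpdaterOutput` and `Event` types are hypothetical, and String task ids stand in for the real `StreamTask`. The updater thread enqueues either a restored task (`PROCESS`) or an exception (`ERROR`). The main thread drains whatever is currently available, without a timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: tasks are represented by String ids, not by the real Task/StreamTask classes.
final class UpdaterOutput {

    // Mirrors the suggested extension of Action with PROCESS and ERROR.
    enum Kind { PROCESS, ERROR }

    static final class Event {
        final Kind kind;
        final String taskId;               // set for PROCESS events, null otherwise
        final RuntimeException exception;  // set for ERROR events, null otherwise

        private Event(final Kind kind, final String taskId, final RuntimeException exception) {
            this.kind = kind;
            this.taskId = taskId;
            this.exception = exception;
        }

        static Event restored(final String taskId) {
            return new Event(Kind.PROCESS, taskId, null);
        }

        static Event failed(final RuntimeException exception) {
            return new Event(Kind.ERROR, null, exception);
        }
    }

    // One queue shared by the updater thread (producer) and the main thread (consumer).
    private final BlockingQueue<Event> output = new LinkedBlockingQueue<>();

    // Called by the state updater thread when a task finished restoration.
    void reportRestored(final String taskId) {
        output.add(Event.restored(taskId));
    }

    // Called by the state updater thread when something failed.
    void reportFailure(final RuntimeException exception) {
        output.add(Event.failed(exception));
    }

    // Called by the main thread: takes everything currently queued, in FIFO order, without blocking.
    List<Event> drain() {
        final List<Event> events = new ArrayList<>();
        output.drainTo(events);
        return events;
    }
}
```

   Because `drain()` never blocks, the main thread would simply call it once per poll-loop iteration.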







[GitHub] [kafka] cadonna commented on a change in pull request #11765: [WIP] State updater implementation

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #11765:
URL: https://github.com/apache/kafka/pull/11765#discussion_r809347698




Review comment:
       Originally, I thought the same, but then I realized that there is not always a 1:1 relationship between exceptions and tasks. For example, a `TaskCorruptedException` may apply to multiple tasks, and other exceptions (I do not remember which) do not apply to any task.
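
   Below is a minimal sketch of how an exception could be reported together with the zero, one, or many tasks it affects. A single per-task queue entry cannot express that directly. `ExceptionAndTaskIds` is a hypothetical holder that is not part of this PR, and task ids are plain Strings.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: pairs an exception with the (possibly empty) set of task ids it affects.
final class ExceptionAndTaskIds {
    private final RuntimeException exception;
    private final Set<String> taskIds;

    ExceptionAndTaskIds(final RuntimeException exception, final Set<String> taskIds) {
        this.exception = exception;
        // Defensive, read-only copy so the main thread cannot mutate the updater's view.
        this.taskIds = Collections.unmodifiableSet(new HashSet<>(taskIds));
    }

    // An exception that cannot be attributed to any task.
    static ExceptionAndTaskIds withoutTasks(final RuntimeException exception) {
        return new ExceptionAndTaskIds(exception, Collections.emptySet());
    }

    RuntimeException exception() {
        return exception;
    }

    Set<String> taskIds() {
        return taskIds;
    }

    boolean affectsAnyTask() {
        return !taskIds.isEmpty();
    }
}
```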
   
   What is the problem with the timer on `getRestoredActiveTasks()`? I thought it would let us avoid a kind of busy wait in the poll loop, during which we would call `mainConsumer.poll()` very frequently.
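
   Below is a minimal sketch of the timed wait described above. `RestoredTasks` is a hypothetical class modeled on `restoredActiveTasksLock` and `restoredActiveTasksCondition` from the diff, and task ids are Strings. The main thread blocks for at most a given timeout instead of spinning. The updater thread signals the condition whenever it adds a restored task.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of a timed getRestoredActiveTasks(); not the PR's actual code.
final class RestoredTasks {
    private final Queue<String> restored = new LinkedList<>();
    private final Lock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();

    // Updater thread: publish a restored task and wake up a waiting main thread.
    void add(final String taskId) {
        lock.lock();
        try {
            restored.add(taskId);
            notEmpty.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Main thread: wait at most `timeout` for at least one restored task, then drain all of them.
    // Returns an empty list if the timeout elapses first.
    List<String> drain(final Duration timeout) throws InterruptedException {
        long remainingNanos = timeout.toNanos();
        lock.lock();
        try {
            while (restored.isEmpty() && remainingNanos > 0) {
                // awaitNanos returns an estimate of the remaining time (<= 0 once the deadline passed),
                // so re-checking in a loop tolerates spurious wake-ups without extending the timeout.
                remainingNanos = notEmpty.awaitNanos(remainingNanos);
            }
            final List<String> result = new ArrayList<>(restored);
            restored.clear();
            return result;
        } finally {
            lock.unlock();
        }
    }
}
```

   With a timeout in the order of the poll interval, an idle main thread would sleep on the condition rather than repeatedly calling `mainConsumer.poll()`.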
   







[GitHub] [kafka] cadonna commented on pull request #11765: [WIP] State updater implementation

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #11765:
URL: https://github.com/apache/kafka/pull/11765#issuecomment-1040246456


   @guozhangwang Here is my code so far for moving restoration to a separate thread.
   My idea is to give the `Tasks` container a reference to the state updater. The `Tasks` container uses this reference to retrieve all active tasks that are in restoration and all standby tasks. That means methods like `allTasks()` need to be modified to combine the results from the state updater with those from the other data structures in the `Tasks` container. An important invariant is that a task cannot exist in the state updater and in another data structure of the `Tasks` container at the same time; a task is in one or the other.
   
   A typical lifecycle of an active task would then be: creation -> added to the state updater -> restoration done -> removed from the state updater (by calling `getRestoredActiveTasks()`) -> added to the appropriate data structure in the `Tasks` container. Standby tasks are created, added to the state updater, and stay there. Tasks are recycled (i.e., converted from active to standby or vice versa) within the state updater.
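   
   Below is a minimal sketch of the ownership invariant. `SimpleStateUpdater`, `InMemoryStateUpdater` and `SimpleTasks` are simplified, hypothetical types with String task ids; the real `StateUpdater` and `Tasks` classes differ. `allTasks()` merges both owners and fails fast if a task shows up in both. Restored active tasks move from the updater to the container when they are drained.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified stand-in for the state updater; tasks are String ids.
interface SimpleStateUpdater {
    Collection<String> getAllTasks();               // every task the updater currently owns
    Collection<String> drainRestoredActiveTasks();  // removes and returns restored active tasks
}

// Minimal in-memory updater used for illustration only.
final class InMemoryStateUpdater implements SimpleStateUpdater {
    private final Set<String> updating = new HashSet<>();
    private final Set<String> restored = new HashSet<>();

    void add(final String taskId) {
        updating.add(taskId);
    }

    void markRestored(final String taskId) {
        if (updating.remove(taskId)) {
            restored.add(taskId);
        }
    }

    @Override
    public Collection<String> getAllTasks() {
        // Restored tasks are still owned by the updater until they are drained.
        final Set<String> all = new HashSet<>(updating);
        all.addAll(restored);
        return all;
    }

    @Override
    public Collection<String> drainRestoredActiveTasks() {
        final Set<String> drained = new HashSet<>(restored);
        restored.clear();
        return drained;
    }
}

// Hypothetical, simplified Tasks container holding a reference to the updater.
final class SimpleTasks {
    private final SimpleStateUpdater stateUpdater;
    private final Set<String> runningTasks = new HashSet<>();  // tasks owned by the container itself

    SimpleTasks(final SimpleStateUpdater stateUpdater) {
        this.stateUpdater = stateUpdater;
    }

    // Move restored active tasks out of the updater into the container.
    void collectRestoredActiveTasks() {
        for (final String taskId : stateUpdater.drainRestoredActiveTasks()) {
            if (!runningTasks.add(taskId)) {
                throw new IllegalStateException("Task " + taskId + " is already owned by the Tasks container");
            }
        }
    }

    // Combines both owners; by the invariant the two sets must be disjoint.
    Set<String> allTasks() {
        final Set<String> all = new HashSet<>(runningTasks);
        for (final String taskId : stateUpdater.getAllTasks()) {
            if (!all.add(taskId)) {
                throw new IllegalStateException("Task " + taskId + " is in both the state updater and the Tasks container");
            }
        }
        return all;
    }
}
```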
   
   ToDos:
   - The unit tests of the state updater are failing.
   - The code to recycle tasks is not yet in the state updater.
   - The modifications to the `Tasks` container are still missing.
   - Integration of the state updater into normal processing.
   
   The first two points can be done in this PR. The third point should be done in a separate PR since this one is already quite big. The last point will also be a separate PR.

