You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/09/19 01:53:04 UTC

[GitHub] [kafka] guozhangwang opened a new pull request, #12659: [DO NOT MERGE] KAFKA-10199: Integrate Topology Pause/Resume with StateUpdater

guozhangwang opened a new pull request, #12659:
URL: https://github.com/apache/kafka/pull/12659

   When a topology is paused / resumed, we also need to pause / resume its corresponding tasks inside state updater.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] guozhangwang commented on a diff in pull request #12659: [DO NOT MERGE] KAFKA-10199: Integrate Topology Pause/Resume with StateUpdater

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12659:
URL: https://github.com/apache/kafka/pull/12659#discussion_r974503648


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -785,6 +787,22 @@ private void addTasksToStateUpdater() {
         }
     }
 
+    private void pauseTasksInStateUpdater() {
+        for (final Task task : stateUpdater.getUpdatingTasks()) {

Review Comment:
   I'm a bit concerned about perf here since this is called for each iteration, and it will construct a list of read-only tasks hence generating a lot of young gen for GCs, but other than the current approach I have not got to a better solution.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] guozhangwang commented on pull request #12659: [DO NOT MERGE] KAFKA-10199: Integrate Topology Pause/Resume with StateUpdater

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on PR #12659:
URL: https://github.com/apache/kafka/pull/12659#issuecomment-1250464383

   cc @cadonna please let me know if you like the current approach i.e. exposing updating / paused tasks from state updater, which may be stale with newly paused / resumed tasks (and they will be handled in the next iteration then). If it lgty I will add unit tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] guozhangwang commented on pull request #12659: KAFKA-10199: Integrate Topology Pause/Resume with StateUpdater

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on PR #12659:
URL: https://github.com/apache/kafka/pull/12659#issuecomment-1259877050

   After discussed with @cadonna offline, we decide to first try with the simpler approach which let the state updater thread periodically check the source-of-truth topology metadata directly and pause / resume / checkpoint tasks. If pause / resume become a popular feature and the asynchronous procedure's delay becomes confusing to users, we would switch to a smaller-latency event-driven mechanism.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] guozhangwang commented on a diff in pull request #12659: [DO NOT MERGE] KAFKA-10199: Integrate Topology Pause/Resume with StateUpdater

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12659:
URL: https://github.com/apache/kafka/pull/12659#discussion_r976844948


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -785,6 +787,22 @@ private void addTasksToStateUpdater() {
         }
     }
 
+    private void pauseTasksInStateUpdater() {
+        for (final Task task : stateUpdater.getUpdatingTasks()) {

Review Comment:
   Thanks @cadonna that aligns with my alternative idea. I was concerned if exposing the topology metadata inside state updater would be too intrusive, but maybe that's a better trade-off for perf.
   
   I will follow that route in this PR and complete test coverage then.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #12659: KAFKA-10199: Integrate Topology Pause/Resume with StateUpdater

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12659:
URL: https://github.com/apache/kafka/pull/12659#discussion_r982161213


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java:
##########
@@ -170,6 +144,17 @@ public int hashCode() {
      */
     Set<Task> getTasks();
 
+    /**
+     * Gets all tasks that are currently being restored inside the state updater.
+     *
+     * Tasks that have just being added into the state updater via {@link StateUpdater#add(Task)}
+     * or have restored completely or removed will not be returned; similarly tasks that have just being
+     * removed via {@link StateUpdater#remove(TaskId)} maybe returned still.
+     *
+     * @return set of all updating tasks inside the state updater
+     */
+    Set<Task> getUpdatingTasks();

Review Comment:
   Could you please also remove `getActiveTasks()` from the interface?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] guozhangwang commented on a diff in pull request #12659: [DO NOT MERGE] KAFKA-10199: Integrate Topology Pause/Resume with StateUpdater

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12659:
URL: https://github.com/apache/kafka/pull/12659#discussion_r974686659


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -785,6 +787,22 @@ private void addTasksToStateUpdater() {
         }
     }
 
+    private void pauseTasksInStateUpdater() {
+        for (final Task task : stateUpdater.getUpdatingTasks()) {

Review Comment:
   Alternative idea is to add a `pauseTasks(topologyMetadata)` / `resumeTasks(topologyMetadata)` to replace the `pauseTask/resumeTask` by task ids, but that would a bit intrusive as introducing topologyMetadata to the state updater..



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #12659: KAFKA-10199: Integrate Topology Pause/Resume with StateUpdater

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12659:
URL: https://github.com/apache/kafka/pull/12659#discussion_r977283462


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -785,6 +787,22 @@ private void addTasksToStateUpdater() {
         }
     }
 
+    private void pauseTasksInStateUpdater() {
+        for (final Task task : stateUpdater.getUpdatingTasks()) {

Review Comment:
   Haha, when I wrote my reply I did not see your alternative idea because the page was not refreshed. That means, we came to similar ideas which is a good sign. 🙂  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #12659: [DO NOT MERGE] KAFKA-10199: Integrate Topology Pause/Resume with StateUpdater

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12659:
URL: https://github.com/apache/kafka/pull/12659#discussion_r976201996


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -785,6 +787,22 @@ private void addTasksToStateUpdater() {
         }
     }
 
+    private void pauseTasksInStateUpdater() {
+        for (final Task task : stateUpdater.getUpdatingTasks()) {

Review Comment:
   Actually, read-only tasks are currently only generated in `getTasks()` which is not called by `getUpdatingTasks()`. However, if we decide to expose `getUpdatingTasks()` we should also generate read-only tasks there.   
   
   We could pause topologies instead of tasks in the state updater. Instead of `pause(task)` and `resume(task)`, we would then have `pause(topology)` and `resume(topology)` in the state updater interface. Within the default updater we could store the paused topologies and process tasks according to whether they are part of a paused topology or not.
   
   WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] guozhangwang commented on a diff in pull request #12659: [DO NOT MERGE] KAFKA-10199: Integrate Topology Pause/Resume with StateUpdater

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12659:
URL: https://github.com/apache/kafka/pull/12659#discussion_r974503648


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -785,6 +787,22 @@ private void addTasksToStateUpdater() {
         }
     }
 
+    private void pauseTasksInStateUpdater() {
+        for (final Task task : stateUpdater.getUpdatingTasks()) {

Review Comment:
   I'm a bit concerned about perf here since this is called for each iteration, and to be safer we'd need to wrap each as read-only task as we did for `getTasks`, hence generating a lot of young gen for GCs, but other than the current approach I have not got to a better solution.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #12659: KAFKA-10199: Integrate Topology Pause/Resume with StateUpdater

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12659:
URL: https://github.com/apache/kafka/pull/12659#discussion_r979883060


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -500,23 +500,29 @@ public void remove(final TaskId taskId) {
         }
     }
 
-    @Override
-    public void pause(final TaskId taskId) {
+    public void pause(final TopologyMetadata topologyMetadata) {
         tasksAndActionsLock.lock();
         try {
-            tasksAndActions.add(TaskAndAction.createPauseTask(taskId));
-            tasksAndActionsCondition.signalAll();
+            for (final Task task : getUpdatingTasks()) {
+                if (topologyMetadata.isPaused(task.id().topologyName())) {
+                    tasksAndActions.add(TaskAndAction.createPauseTask(task.id()));
+                    tasksAndActionsCondition.signalAll();
+                }
+            }

Review Comment:
   @guozhangwang I think, we misunderstood each other. Sorry if I was not clear enough.
   My proposal was to store the names of the paused topologies in the state updater thread, like:
   
   ```
   public class DefaultStateUpdater implements StateUpdater {
   ...
       private class StateUpdaterThread extends Thread {
       ...
           private Set<String> pausedTopologies = ConcurrentHashMap.newKeySet();
   
           private Set<String> getPausedTopologies() {
               return Collections.unmodifiableSet(pausedTopologies);
           }
   ```
   
   The `StateUpdater` (and `DefaultStateUpdater`) would then have the following method:
   
   ```
   public class DefaultStateUpdater implements StateUpdater {
   ...
       @Override
       public syncPausedTopologies(final Set<String> pausedTopologies) {
           final Set<String> stateUpdaterPausedTopologies = getPausedTopologies();
           final Set<String> pausedTopologiesToAdd = pausedTopologies.removeAll(stateUpdaterPausedTopologies);
           final Set<String> pausedTopologiesToRemove = stateUpdaterPausedTopologies.removeAll(pausedTopologies);
           if (!pausedTopologiesToAdd.isEmpty()) {  
               tasksAndActions.addPausedTopology(TaskAndAction.createAddPausedTopology(pausedTopologiesToAdd));
           }
           if (!pausedTopologiesToRemove.isEmpty()) {  
               tasksAndActions.removePausedTopology(TaskAndAction.createRemovePausedTopology(pausedTopologiesToRemove));
           }
       }
   ``` 
   
   In this way, we would not need to iterate over the tasks in the state updater in regular intervals but only if the set of paused topologies changed. 
   When we add a task to the state updater, the state updater thread needs to check if the topology of the task is paused or not.
   
   ```
   public class DefaultStateUpdater implements StateUpdater {
   ...
       private class StateUpdaterThread extends Thread {
       ...
           private void addTask(final Task task) {
                if (isStateless(task)) {
                   addToRestoredTasks((StreamTask) task);
                   log.info("Stateless active task " + task.id() + " was added to the restored tasks of the state updater");
               } else {
                   if (pausedTopologies.contains(task.topologyName())) {
                       // add to paused tasks
                   } else {
                      // add to updating tasks 
                   }
               }
   ``` 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] guozhangwang commented on a diff in pull request #12659: KAFKA-10199: Integrate Topology Pause/Resume with StateUpdater

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12659:
URL: https://github.com/apache/kafka/pull/12659#discussion_r980338228


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java:
##########
@@ -170,6 +174,27 @@ public int hashCode() {
      */
     Set<Task> getTasks();
 
+    /**
+     * Gets all tasks that are currently being restored inside the state updater.
+     *
+     * Tasks that have just being added into the state updater via {@link StateUpdater#add(Task)}
+     * or have restored completely or removed will not be returned; similarly tasks that have just being
+     * removed via {@link StateUpdater#remove(TaskId)} maybe returned still.
+     *
+     * @return set of all updating tasks inside the state updater
+     */
+    Set<Task> getUpdatingTasks();
+
+    /**
+     * Gets all tasks that are paused from restoring inside the state updater.
+     *
+     * Tasks that have just being paused in the state updater via {@link StateUpdater#pause(TaskId)}
+     * or have restored completely or removed will not be returned.
+     *
+     * @return set of all tasks paused inside the state updater
+     */
+    Set<Task> getPausedTasks();
+

Review Comment:
   Cool, I think I'm convinced, I will remove those APIs then.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] guozhangwang commented on pull request #12659: KAFKA-10199: Integrate Topology Pause/Resume with StateUpdater

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on PR #12659:
URL: https://github.com/apache/kafka/pull/12659#issuecomment-1254220185

   @cadonna ready for another quick look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] guozhangwang commented on a diff in pull request #12659: KAFKA-10199: Integrate Topology Pause/Resume with StateUpdater

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12659:
URL: https://github.com/apache/kafka/pull/12659#discussion_r978255189


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -500,23 +500,29 @@ public void remove(final TaskId taskId) {
         }
     }
 
-    @Override
-    public void pause(final TaskId taskId) {
+    public void pause(final TopologyMetadata topologyMetadata) {
         tasksAndActionsLock.lock();
         try {
-            tasksAndActions.add(TaskAndAction.createPauseTask(taskId));
-            tasksAndActionsCondition.signalAll();
+            for (final Task task : getUpdatingTasks()) {
+                if (topologyMetadata.isPaused(task.id().topologyName())) {
+                    tasksAndActions.add(TaskAndAction.createPauseTask(task.id()));
+                    tasksAndActionsCondition.signalAll();
+                }
+            }

Review Comment:
   Hi @cadonna I tried out this idea but it turns out more complicated than expected. The main issue is that if we store the paused topology outside the restore thread, and only let the caller thread to modify it, the restore thread still need to access it while handling add-task; and vice versa if we store the paused topologies inside the restore thread and let that thread to modify it upon handling pause/resume-task, the caller thread still need to access it to determine if a new action needs to be enqueued. In either case we'd construct two communication channels between the two.
   
   In the end I chose a different route, which is directly pass in the `topologyMetadata` to StateUpdater, and then periodically check that topology's paused metadata to pause / resume tasks. As a result we would also remove pause/resume-task actions as a queue item. Right now I just piggy-backed it with checkpointing but I'm okay with doing it separately, periodically. After some pondering I feel this is actually simpler than passing pause/resume actions via channels.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] guozhangwang commented on a diff in pull request #12659: KAFKA-10199: Integrate Topology Pause/Resume with StateUpdater

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12659:
URL: https://github.com/apache/kafka/pull/12659#discussion_r978255351


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java:
##########
@@ -170,6 +174,27 @@ public int hashCode() {
      */
     Set<Task> getTasks();
 
+    /**
+     * Gets all tasks that are currently being restored inside the state updater.
+     *
+     * Tasks that have just being added into the state updater via {@link StateUpdater#add(Task)}
+     * or have restored completely or removed will not be returned; similarly tasks that have just being
+     * removed via {@link StateUpdater#remove(TaskId)} maybe returned still.
+     *
+     * @return set of all updating tasks inside the state updater
+     */
+    Set<Task> getUpdatingTasks();
+
+    /**
+     * Gets all tasks that are paused from restoring inside the state updater.
+     *
+     * Tasks that have just being paused in the state updater via {@link StateUpdater#pause(TaskId)}
+     * or have restored completely or removed will not be returned.
+     *
+     * @return set of all tasks paused inside the state updater
+     */
+    Set<Task> getPausedTasks();
+

Review Comment:
   I thought about this a bit, and I think it's still valuable for 1) testing purposes, and 2) metrics in the future.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #12659: KAFKA-10199: Integrate Topology Pause/Resume with StateUpdater

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12659:
URL: https://github.com/apache/kafka/pull/12659#discussion_r977372667


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -500,23 +500,29 @@ public void remove(final TaskId taskId) {
         }
     }
 
-    @Override
-    public void pause(final TaskId taskId) {
+    public void pause(final TopologyMetadata topologyMetadata) {
         tasksAndActionsLock.lock();
         try {
-            tasksAndActions.add(TaskAndAction.createPauseTask(taskId));
-            tasksAndActionsCondition.signalAll();
+            for (final Task task : getUpdatingTasks()) {
+                if (topologyMetadata.isPaused(task.id().topologyName())) {
+                    tasksAndActions.add(TaskAndAction.createPauseTask(task.id()));
+                    tasksAndActionsCondition.signalAll();
+                }
+            }

Review Comment:
   Wouldn't it be better to store paused topologies within the state updater thread? 
   
   Consider the following scenario: 
   1. The state updater managed task A from topology X. 
   2. Topology X is paused.
   3. Task A is revoked from and task B from topology X is assigned.
   
   When the topology is paused, we would pass the topology name through an input queue event (like add or remove) to the state updater thread. Once the state updater thread processes the input queue event with the topology name to pause, it stores the name of the paused topology and pauses all tasks (i.e., task A) of the paused topology. Once task B is assigned and added to the state updater, the paused topologies are consulted and since topology X is paused task B will be paused directly. When a topology is resumed, the corresponding tasks are resumed  and the name of the resumed topology is removed from the state updater thread.
   If we expose the names of the paused topology from the state updater thread to the default state updater, we can check in `pause(topology)` if the state updater thread already knows about the paused topology and avoid creating an event in the input queue for already-known paused topologies. 
   All this would avoid to iterate over the tasks each time `checkStateUpdater` is called. We would only do a check of the paused topologies when a task is added, and loop over the task if a topology was just paused/resumed.
   
   WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #12659: KAFKA-10199: Integrate Topology Pause/Resume with StateUpdater

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12659:
URL: https://github.com/apache/kafka/pull/12659#discussion_r977373789


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java:
##########
@@ -170,6 +174,27 @@ public int hashCode() {
      */
     Set<Task> getTasks();
 
+    /**
+     * Gets all tasks that are currently being restored inside the state updater.
+     *
+     * Tasks that have just being added into the state updater via {@link StateUpdater#add(Task)}
+     * or have restored completely or removed will not be returned; similarly tasks that have just being
+     * removed via {@link StateUpdater#remove(TaskId)} maybe returned still.
+     *
+     * @return set of all updating tasks inside the state updater
+     */
+    Set<Task> getUpdatingTasks();
+
+    /**
+     * Gets all tasks that are paused from restoring inside the state updater.
+     *
+     * Tasks that have just being paused in the state updater via {@link StateUpdater#pause(TaskId)}
+     * or have restored completely or removed will not be returned.
+     *
+     * @return set of all tasks paused inside the state updater
+     */
+    Set<Task> getPausedTasks();
+

Review Comment:
   We do not need to expose those anymore, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #12659: KAFKA-10199: Integrate Topology Pause/Resume with StateUpdater

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12659:
URL: https://github.com/apache/kafka/pull/12659#discussion_r979837654


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java:
##########
@@ -170,6 +174,27 @@ public int hashCode() {
      */
     Set<Task> getTasks();
 
+    /**
+     * Gets all tasks that are currently being restored inside the state updater.
+     *
+     * Tasks that have just being added into the state updater via {@link StateUpdater#add(Task)}
+     * or have restored completely or removed will not be returned; similarly tasks that have just being
+     * removed via {@link StateUpdater#remove(TaskId)} maybe returned still.
+     *
+     * @return set of all updating tasks inside the state updater
+     */
+    Set<Task> getUpdatingTasks();
+
+    /**
+     * Gets all tasks that are paused from restoring inside the state updater.
+     *
+     * Tasks that have just being paused in the state updater via {@link StateUpdater#pause(TaskId)}
+     * or have restored completely or removed will not be returned.
+     *
+     * @return set of all tasks paused inside the state updater
+     */
+    Set<Task> getPausedTasks();
+

Review Comment:
   I do not think we those for testing purposes since you usually test the implementation of an interface and not the interface itself. For exmple, on `DefaultStateUpdater` that is an implementation of this interface those APIs are exposed for testing purposes. I am also not convinced about metrics since those should be recorded inside the state updater implementation. I propose to remove those APIs and add them later if needed. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #12659: KAFKA-10199: Integrate Topology Pause/Resume with StateUpdater

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12659:
URL: https://github.com/apache/kafka/pull/12659#discussion_r979883060


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -500,23 +500,29 @@ public void remove(final TaskId taskId) {
         }
     }
 
-    @Override
-    public void pause(final TaskId taskId) {
+    public void pause(final TopologyMetadata topologyMetadata) {
         tasksAndActionsLock.lock();
         try {
-            tasksAndActions.add(TaskAndAction.createPauseTask(taskId));
-            tasksAndActionsCondition.signalAll();
+            for (final Task task : getUpdatingTasks()) {
+                if (topologyMetadata.isPaused(task.id().topologyName())) {
+                    tasksAndActions.add(TaskAndAction.createPauseTask(task.id()));
+                    tasksAndActionsCondition.signalAll();
+                }
+            }

Review Comment:
   @guozhangwang I think, we misunderstood each other. Sorry if I was not clear enough.
   My proposal was to store the names of the paused topologies in the state updater thread, like:
   
   ```
   public class DefaultStateUpdater implements StateUpdater {
   ...
       private class StateUpdaterThread extends Thread {
       ...
           private Set<String> pausedTopologies = ConcurrentHashMap.newKeySet();
   
           private Set<String> getPausedTopologies() {
               return Collections.unmodifiableSet(pausedTopologies);
           }
   ```
   
   The `StateUpdater` (and `DefaultStateUpdater`) then has the following method that is called in `taskManager.checkStateUpdater()`:
   
   ```
   public class DefaultStateUpdater implements StateUpdater {
   ...
       @Override
       public syncPausedTopologies(final Set<String> pausedTopologies) {
           final Set<String> stateUpdaterPausedTopologies = getPausedTopologies();
           final Set<String> pausedTopologiesToAdd = pausedTopologies.removeAll(stateUpdaterPausedTopologies);
           final Set<String> pausedTopologiesToRemove = stateUpdaterPausedTopologies.removeAll(pausedTopologies);
           if (!pausedTopologiesToAdd.isEmpty()) {  
               tasksAndActions.addPausedTopology(TaskAndAction.createAddPausedTopology(pausedTopologiesToAdd));
           }
           if (!pausedTopologiesToRemove.isEmpty()) {  
               tasksAndActions.removePausedTopology(TaskAndAction.createRemovePausedTopology(pausedTopologiesToRemove));
           }
       }
   ``` 
   
   In this way, we would not need to iterate over the tasks in the state updater in regular intervals but only if the set of paused topologies changed. 
   
   When we add a task to the state updater, the state updater thread needs to check if the topology of the task is paused or not.
   
   ```
   public class DefaultStateUpdater implements StateUpdater {
   ...
       private class StateUpdaterThread extends Thread {
       ...
           private void addTask(final Task task) {
                if (isStateless(task)) {
                   addToRestoredTasks((StreamTask) task);
                   log.info("Stateless active task " + task.id() + " was added to the restored tasks of the state updater");
               } else {
                   if (pausedTopologies.contains(task.topologyName())) {
                       // add to paused tasks
                   } else {
                      // add to updating tasks 
                   }
               }
   ``` 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #12659: KAFKA-10199: Integrate Topology Pause/Resume with StateUpdater

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12659:
URL: https://github.com/apache/kafka/pull/12659#discussion_r977372667


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -500,23 +500,29 @@ public void remove(final TaskId taskId) {
         }
     }
 
-    @Override
-    public void pause(final TaskId taskId) {
+    public void pause(final TopologyMetadata topologyMetadata) {
         tasksAndActionsLock.lock();
         try {
-            tasksAndActions.add(TaskAndAction.createPauseTask(taskId));
-            tasksAndActionsCondition.signalAll();
+            for (final Task task : getUpdatingTasks()) {
+                if (topologyMetadata.isPaused(task.id().topologyName())) {
+                    tasksAndActions.add(TaskAndAction.createPauseTask(task.id()));
+                    tasksAndActionsCondition.signalAll();
+                }
+            }

Review Comment:
   Wouldn't it be better to store paused topologies within the state updater thread? 
   
   Consider the following scenario: 
   1. The state updater managed task A from topology X. 
   2. Topology X is paused.
   3. Task A is revoked from and task B from topology X is assigned.
   
   When the topology is paused, we would pass the topology name through an input queue event (like add or remove) to the state updater thread. Once the state updater thread processes the input queue event with the topology name to pause, it stores the name of the paused topology and pauses all tasks (i.e., task A) of the paused topology. Once task B is assigned and added to the state updater, the paused topologies are consulted and since topology X is paused task B will be paused directly. When a topology is resumed, the corresponding tasks are resumed  and the name of the resumed topology is removed from the state updater thread.
   If we expose the names of the paused topology from the state updater thread to the default state updater, we can check in `pause(topology)` if the state updater thread already knows about the paused topology and avoid creating an event in the input queue for already-known paused topologies. 
   All this would avoid to iterate over the tasks each time `checkStateUpdater` is called. We would only do a check of the paused topologies when a task is added, and loop over the task if a topology was just paused.
   
   WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #12659: KAFKA-10199: Integrate Topology Pause/Resume with StateUpdater

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12659:
URL: https://github.com/apache/kafka/pull/12659#discussion_r977372667


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -500,23 +500,29 @@ public void remove(final TaskId taskId) {
         }
     }
 
-    @Override
-    public void pause(final TaskId taskId) {
+    public void pause(final TopologyMetadata topologyMetadata) {
         tasksAndActionsLock.lock();
         try {
-            tasksAndActions.add(TaskAndAction.createPauseTask(taskId));
-            tasksAndActionsCondition.signalAll();
+            for (final Task task : getUpdatingTasks()) {
+                if (topologyMetadata.isPaused(task.id().topologyName())) {
+                    tasksAndActions.add(TaskAndAction.createPauseTask(task.id()));
+                    tasksAndActionsCondition.signalAll();
+                }
+            }

Review Comment:
   Wouldn't it be better to store paused topologies within the state updater thread? 
   
   Consider the following scenario: 
   1. The state updater managed task A from topology X. 
   2. Topology X is paused.
   3. Task A is revoked from and task B from topology X is assigned.
   When the topology is paused, we would pass the topology name through an input queue event (like add or remove) to the state updater thread. Once the state updater thread processes the input queue event with the topology name to pause, it stores the name of the paused topology and pauses all tasks (i.e., task A) of the paused topology. Once task B is assigned and added to the state updater, the paused topologies are consulted and since topology X is paused task B will be paused directly. When a topology is resumed, the corresponding tasks are resumed  and the name of the resumed topology is removed from the state updater thread.
   If we expose the names of the paused topology from the state updater thread to the default state updater, we can check in `pause(topology)` if the state updater thread already knows about the paused topology and avoid creating an event in the input queue for already-known paused topologies. 
   All this would avoid to iterate over the tasks each time `checkStateUpdater` is called. We would only do a check of the paused topologies when a task is added, and loop over the task if a topology was just paused.
   
   WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] guozhangwang commented on a diff in pull request #12659: KAFKA-10199: Integrate Topology Pause/Resume with StateUpdater

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12659:
URL: https://github.com/apache/kafka/pull/12659#discussion_r980337970


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -500,23 +500,29 @@ public void remove(final TaskId taskId) {
         }
     }
 
-    @Override
-    public void pause(final TaskId taskId) {
+    public void pause(final TopologyMetadata topologyMetadata) {
         tasksAndActionsLock.lock();
         try {
-            tasksAndActions.add(TaskAndAction.createPauseTask(taskId));
-            tasksAndActionsCondition.signalAll();
+            for (final Task task : getUpdatingTasks()) {
+                if (topologyMetadata.isPaused(task.id().topologyName())) {
+                    tasksAndActions.add(TaskAndAction.createPauseTask(task.id()));
+                    tasksAndActionsCondition.signalAll();
+                }
+            }

Review Comment:
   Hi @cadonna I actually did exactly this with a concurrent hash map keysets :) The main issue is because the `pausedTopologies` would be modified by the main thread, while it would be checked upon `add` by the restore thread, there's a race condition complexity here since the main thread has a sequence of 1) check the paused topology concurrent set, 2) create pause/resume tasks into the queue, 3) update the paused topology concurrent set. And with restore thread checking the paused topology set concurrently, we could have many racing complexities unless we synchronize the whole section of 1/2/3 steps above. It's definitely doable, but it's introducing a lot of edge cases handling when getting an action from the queue.
   
   Introducing the whole `topology metadata` into the state updater on the other hand could delay the syncing of paused topologies as we only check it infrequently, but we are asynchronously updating the paused topologies anyways, and I feel its less complex.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] guozhangwang merged pull request #12659: KAFKA-10199: Integrate Topology Pause/Resume with StateUpdater

Posted by GitBox <gi...@apache.org>.
guozhangwang merged PR #12659:
URL: https://github.com/apache/kafka/pull/12659


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org