Posted to jira@kafka.apache.org by "lucasbru (via GitHub)" <gi...@apache.org> on 2023/02/07 13:41:25 UTC

[GitHub] [kafka] lucasbru commented on a diff in pull request #13025: KAFKA-14299: Fix pause and resume with state updater

lucasbru commented on code in PR #13025:
URL: https://github.com/apache/kafka/pull/13025#discussion_r1097298086


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1534,7 +1540,13 @@ Set<TaskId> standbyTaskIds() {
     Map<TaskId, Task> allTasks() {
         // not bothering with an unmodifiable map, since the tasks themselves are mutable, but
         // if any outside code modifies the map or the tasks, it would be a severe transgression.
-        return tasks.allTasksPerId();
+        if (stateUpdater != null) {
+            final Map<TaskId, Task> ret = stateUpdater.getTasks().stream().collect(Collectors.toMap(Task::id, x -> x));
+            ret.putAll(tasks.allTasksPerId());
+            return ret;
+        } else {
+            return tasks.allTasksPerId();
+        }

Review Comment:
   Done



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1534,7 +1540,13 @@ Set<TaskId> standbyTaskIds() {
     Map<TaskId, Task> allTasks() {
         // not bothering with an unmodifiable map, since the tasks themselves are mutable, but
         // if any outside code modifies the map or the tasks, it would be a severe transgression.
-        return tasks.allTasksPerId();
+        if (stateUpdater != null) {
+            final Map<TaskId, Task> ret = stateUpdater.getTasks().stream().collect(Collectors.toMap(Task::id, x -> x));
+            ret.putAll(tasks.allTasksPerId());

Review Comment:
   Yes, once you merge the other one we can rebase this one.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java:
##########
@@ -190,7 +190,7 @@ public void clearTaskTimeout() {
 
     @Override
     public boolean commitNeeded() {
-        throw new UnsupportedOperationException("This task is read-only");
+        return task.commitNeeded();

Review Comment:
   `StreamThread.maybeCommit` uses `TaskManager.allTasks`, which previously excluded tasks in the state updater but now includes them. However, `commitNeeded` should be `false` for all of those tasks, right? I was scratching my head here a bit, because `StreamThread.maybeCommit` processed `RESTORING` tasks in the old code path, but not in the state updater code path.
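
   To illustrate the point above, here is a minimal sketch of a read-only wrapper that delegates a pure query such as `commitNeeded` while still rejecting mutating calls. Every name in it (`MiniTask`, `RestoringTask`, `ReadOnlyView`, `tasksNeedingCommit`) is hypothetical and heavily reduced; it is not the real `Task`, `ReadOnlyTask`, or `StreamThread` code, and it simply assumes that a task that is still restoring has nothing to commit.

```java
import java.util.ArrayList;
import java.util.List;

public class ReadOnlyWrapperSketch {

    // Hypothetical, reduced stand-in for a task; NOT the Kafka Streams Task interface.
    interface MiniTask {
        String id();
        boolean commitNeeded(); // pure query, no side effects
        void process();         // mutating operation
    }

    // Assumption for this sketch: a restoring task has processed nothing,
    // so it never reports that a commit is needed.
    static final class RestoringTask implements MiniTask {
        private final String id;

        RestoringTask(final String id) {
            this.id = id;
        }

        @Override
        public String id() {
            return id;
        }

        @Override
        public boolean commitNeeded() {
            return false;
        }

        @Override
        public void process() {
            // no-op in this sketch
        }
    }

    // Read-only view: queries are delegated, mutations are rejected.
    static final class ReadOnlyView implements MiniTask {
        private final MiniTask task;

        ReadOnlyView(final MiniTask task) {
            this.task = task;
        }

        @Override
        public String id() {
            return task.id();
        }

        @Override
        public boolean commitNeeded() {
            return task.commitNeeded();
        }

        @Override
        public void process() {
            throw new UnsupportedOperationException("This task is read-only");
        }
    }

    // Hypothetical commit check: iterates over all tasks, including read-only views,
    // and collects the ids of those that report a pending commit.
    static List<String> tasksNeedingCommit(final List<MiniTask> allTasks) {
        final List<String> result = new ArrayList<>();
        for (final MiniTask task : allTasks) {
            if (task.commitNeeded()) {
                result.add(task.id());
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        final List<MiniTask> tasks = new ArrayList<>();
        tasks.add(new ReadOnlyView(new RestoringTask("0_0")));
        System.out.println(tasksNeedingCommit(tasks));
    }
}
```

   With delegation in place, a caller that iterates over every task can ask the question safely; under the assumption above, the wrapped restoring tasks simply drop out of the commit set instead of throwing.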



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -258,21 +269,27 @@ private List<TaskAndAction> getTasksAndActions() {
         }
 
         private void addTask(final Task task) {
+            final TaskId taskId = task.id();
             if (isStateless(task)) {
                 addToRestoredTasks((StreamTask) task);
-                log.info("Stateless active task " + task.id() + " was added to the restored tasks of the state updater");
+                log.info("Stateless active task " + taskId + " was added to the restored tasks of the state updater");
+            } else if (topologyMetadata.isPaused(taskId.topologyName())) {
+                pausedTasks.put(taskId, task);

Review Comment:
   w.r.t. Bruno's comment: done



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -258,21 +269,27 @@ private List<TaskAndAction> getTasksAndActions() {
         }
 
         private void addTask(final Task task) {
+            final TaskId taskId = task.id();
             if (isStateless(task)) {
                 addToRestoredTasks((StreamTask) task);
-                log.info("Stateless active task " + task.id() + " was added to the restored tasks of the state updater");
+                log.info("Stateless active task " + taskId + " was added to the restored tasks of the state updater");
+            } else if (topologyMetadata.isPaused(taskId.topologyName())) {
+                pausedTasks.put(taskId, task);

Review Comment:
   The `PauseResumeIntegrationTest` tests precisely this: that _no_ progress is made when a task is added in a paused state. I wasn't sure how important this property is for KSQL, but I suppose it is important enough that Jim wrote a unit test for it. I also think the property makes sense; making a little progress on a paused task is unintuitive IMHO.
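
   The property can be sketched as follows. This is a simplified model under stated assumptions, not the actual `DefaultStateUpdater`: `MiniTask`, `MiniUpdater`, `restoreOnce`, and `resumeTasks` are hypothetical names, and a plain set of paused topology names stands in for `topologyMetadata.isPaused(...)`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

public class PausedTaskSketch {

    // Hypothetical, reduced task: only tracks how many records were restored.
    static final class MiniTask {
        final String id;
        final String topologyName;
        long restoredRecords = 0;

        MiniTask(final String id, final String topologyName) {
            this.id = id;
            this.topologyName = topologyName;
        }
    }

    // Hypothetical, single-threaded model of an updater with a paused-task map.
    static final class MiniUpdater {
        private final Set<String> pausedTopologies; // stand-in for the pause metadata
        final Map<String, MiniTask> updatingTasks = new HashMap<>();
        final Map<String, MiniTask> pausedTasks = new HashMap<>();

        MiniUpdater(final Set<String> pausedTopologies) {
            this.pausedTopologies = pausedTopologies;
        }

        // A task whose topology is paused goes straight to pausedTasks,
        // so it is never picked up by a restoration pass.
        void addTask(final MiniTask task) {
            if (pausedTopologies.contains(task.topologyName)) {
                pausedTasks.put(task.id, task);
            } else {
                updatingTasks.put(task.id, task);
            }
        }

        // One restoration pass: only updating tasks make progress.
        void restoreOnce() {
            for (final MiniTask task : updatingTasks.values()) {
                task.restoredRecords++;
            }
        }

        // Move tasks whose topology is no longer paused back to the updating set.
        void resumeTasks() {
            final Iterator<MiniTask> it = pausedTasks.values().iterator();
            while (it.hasNext()) {
                final MiniTask task = it.next();
                if (!pausedTopologies.contains(task.topologyName)) {
                    it.remove();
                    updatingTasks.put(task.id, task);
                }
            }
        }
    }

    public static void main(final String[] args) {
        final Set<String> paused = new HashSet<>();
        paused.add("topology-a");
        final MiniUpdater updater = new MiniUpdater(paused);
        final MiniTask task = new MiniTask("0_0", "topology-a");

        updater.addTask(task);
        updater.restoreOnce();
        System.out.println("restored while paused: " + task.restoredRecords);

        paused.remove("topology-a");
        updater.resumeTasks();
        updater.restoreOnce();
        System.out.println("restored after resume: " + task.restoredRecords);
    }
}
```

   Checking the pause flag at the moment the task is added, rather than on the next restoration pass, is what guarantees zero progress for a task that arrives already paused.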



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -411,6 +422,7 @@ private void checkAllUpdatingTaskStates(final long now) {
     private final Condition restoredActiveTasksCondition = restoredActiveTasksLock.newCondition();
     private final BlockingQueue<ExceptionAndTasks> exceptionsAndFailedTasks = new LinkedBlockingQueue<>();
     private final BlockingQueue<Task> removedTasks = new LinkedBlockingQueue<>();
+    private final AtomicBoolean needsResumeCheck = new AtomicBoolean(false);

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org