Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/07/06 10:33:30 UTC

[GitHub] [kafka] cadonna opened a new pull request, #12384: KAFKA-10199: Add methods to add and remove tasks to task manager

cadonna opened a new pull request, #12384:
URL: https://github.com/apache/kafka/pull/12384

   To integrate the state updater into the current code, we need the
   ability to add tasks to and remove tasks from the task manager.
   This functionality is needed to ensure that a task is managed
   either by the task manager or by the state updater, but never by
   both at the same time.
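
The ownership rule described above can be illustrated with a minimal sketch. It is not the code in this PR: `TaskOwnershipSketch`, `SketchTask`, `TaskRegistry`, and `handOver` are hypothetical names, and a plain map stands in for both the task manager and the state updater. The assumption is only that handing a task over means removing it from one owner before adding it to the other.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: none of these names are Kafka Streams classes.
public class TaskOwnershipSketch {

    // Stand-in for a task; only the ID matters for this illustration.
    static final class SketchTask {
        final String id;

        SketchTask(final String id) {
            this.id = id;
        }
    }

    // Minimal registry with add/remove, standing in for either owner.
    static final class TaskRegistry {
        private final Map<String, SketchTask> tasksPerId = new HashMap<>();

        void addTask(final SketchTask task) {
            // Reject a task that this owner already manages.
            if (tasksPerId.putIfAbsent(task.id, task) != null) {
                throw new IllegalStateException("Task already owned: " + task.id);
            }
        }

        SketchTask removeTask(final String taskId) {
            final SketchTask removed = tasksPerId.remove(taskId);
            if (removed == null) {
                throw new IllegalStateException("Task unknown: " + taskId);
            }
            return removed;
        }

        boolean owns(final String taskId) {
            return tasksPerId.containsKey(taskId);
        }
    }

    // Remove from the current owner first, then add to the new owner,
    // so the task is never registered with both at once.
    static void handOver(final String taskId, final TaskRegistry from, final TaskRegistry to) {
        to.addTask(from.removeTask(taskId));
    }

    public static void main(final String[] args) {
        final TaskRegistry taskManager = new TaskRegistry();
        final TaskRegistry stateUpdater = new TaskRegistry();

        taskManager.addTask(new SketchTask("0_1"));
        handOver("0_1", taskManager, stateUpdater);

        System.out.println(taskManager.owns("0_1"));  // prints "false"
        System.out.println(stateUpdater.owns("0_1")); // prints "true"
    }
}
```

Removing before adding keeps the invariant simple to reason about: at no point do both registries contain the same task ID.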
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #12384: KAFKA-10199: Add methods to add and remove tasks to task manager

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12384:
URL: https://github.com/apache/kafka/pull/12384#discussion_r914709611


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -798,12 +798,12 @@ private void closeTaskDirty(final Task task) {
         } catch (final RuntimeException swallow) {
             log.error("Error suspending dirty task {} ", task.id(), swallow);
         }
-        tasks.removeTaskBeforeClosing(task.id());
+        tasks.removeTask(task.id());

Review Comment:
   Renamed `removeTaskBeforeClosing` to `removeTask`, which is a more appropriate name.





[GitHub] [kafka] cadonna commented on a diff in pull request #12384: KAFKA-10199: Add methods to add and remove tasks to task manager

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12384:
URL: https://github.com/apache/kafka/pull/12384#discussion_r914709320


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java:
##########
@@ -82,7 +82,7 @@ int process(final int maxNumRecords, final Time time) {
                 }
             } catch (final Throwable t) {
                 taskExecutionMetadata.registerTaskError(task, t, now);
-                tasks.removeTaskFromCuccessfullyProcessedBeforeClosing(lastProcessed);
+                tasks.removeTaskFromSuccessfullyProcessedBeforeClosing(lastProcessed);

Review Comment:
   Just a typo fix in the method name: `Cuccessfully` becomes `Successfully`.





[GitHub] [kafka] cadonna closed pull request #12384: KAFKA-10199: Add methods to add and remove tasks to task manager

Posted by GitBox <gi...@apache.org>.
cadonna closed pull request #12384: KAFKA-10199: Add methods to add and remove tasks to task manager
URL: https://github.com/apache/kafka/pull/12384




[GitHub] [kafka] cadonna commented on a diff in pull request #12384: KAFKA-10199: Add methods to add and remove tasks to task manager

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12384:
URL: https://github.com/apache/kafka/pull/12384#discussion_r914709967


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##########
@@ -283,24 +280,6 @@ Collection<Task> notPausedTasks() {
             .collect(Collectors.toList());
     }
 
-    Set<TaskId> activeTaskIds() {
-        return readOnlyActiveTaskIds;
-    }
-
-    Set<TaskId> standbyTaskIds() {
-        return readOnlyStandbyTaskIds;
-    }
-
-    // TODO: change return type to `StreamTask`
-    Map<TaskId, Task> activeTaskMap() {
-        return readOnlyActiveTasksPerId;
-    }
-
-    // TODO: change return type to `StandbyTask`
-    Map<TaskId, Task> standbyTaskMap() {
-        return readOnlyStandbyTasksPerId;
-    }
-

Review Comment:
   These methods are not used anywhere, so they are removed.





[GitHub] [kafka] cadonna commented on a diff in pull request #12384: KAFKA-10199: Add methods to add and remove tasks to task manager

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12384:
URL: https://github.com/apache/kafka/pull/12384#discussion_r914710195


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##########
@@ -237,14 +237,6 @@ Task activeTasksForInputPartition(final TopicPartition partition) {
         return activeTasksPerPartition.get(partition);
     }
 
-    // TODO: change return type to `StandbyTask`
-    Task standbyTask(final TaskId taskId) {
-        if (!standbyTasksPerId.containsKey(taskId)) {
-            throw new IllegalStateException("Standby task unknown: " + taskId);
-        }
-        return standbyTasksPerId.get(taskId);
-    }
-

Review Comment:
   This method is not used anywhere, so it is removed.


