Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/08/28 02:50:23 UTC

[GitHub] [kafka] ableegoldman commented on a change in pull request #11076: KAFKA-12486: Enforce Rebalance when a TaskCorruptedException is throw…

ableegoldman commented on a change in pull request #11076:
URL: https://github.com/apache/kafka/pull/11076#discussion_r697793854



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -590,7 +590,14 @@ boolean runLoop() {
                 log.warn("Detected the states of tasks " + e.corruptedTasks() + " are corrupted. " +
                          "Will close the task as dirty and re-create and bootstrap from scratch.", e);
                 try {
+                    // check if any active task got corrupted. We will trigger a rebalance in that case.
+                    // once the task corruptions have been handled
+                    Set<Task> corruptedActiveTasks = taskManager.anyActiveTasksCorrupted(e.corruptedTasks());
                     taskManager.handleCorruption(e.corruptedTasks());
+                    if (!corruptedActiveTasks.isEmpty()) {

Review comment:
       Note that we actually only clear the corrupted state in EOS applications (don't ask me whether I agree with this 😒 ). So we should add a `&& eosEnabled` to the condition for rebalancing. 
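
       A minimal sketch of the condition suggested above. The class and method names (`RebalanceCondition`, `shouldRebalance`) and the use of plain strings for task IDs are illustrative assumptions, not the actual `StreamThread` code:

```java
import java.util.Set;

// Hypothetical stand-in for the check inside StreamThread#runLoop; not the real Kafka code.
final class RebalanceCondition {

    // Request a rebalance only when at least one active task was corrupted
    // AND exactly-once semantics (EOS) is enabled.
    static boolean shouldRebalance(Set<String> corruptedActiveTasks, boolean eosEnabled) {
        return !corruptedActiveTasks.isEmpty() && eosEnabled;
    }
}
```

       With this shape, a corrupted active task in a non-EOS application would not cause a rebalance.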

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -590,7 +590,14 @@ boolean runLoop() {
                 log.warn("Detected the states of tasks " + e.corruptedTasks() + " are corrupted. " +
                          "Will close the task as dirty and re-create and bootstrap from scratch.", e);
                 try {
+                    // check if any active task got corrupted. We will trigger a rebalance in that case.
+                    // once the task corruptions have been handled
+                    Set<Task> corruptedActiveTasks = taskManager.anyActiveTasksCorrupted(e.corruptedTasks());

Review comment:
       nit: instead of computing the actual set of corrupted active tasks just to check whether it's empty, how about we make `TaskManager#handleCorruption` return a boolean that tells the StreamThread whether or not to trigger a rebalance? For one thing, we already sort out the active tasks from the standbys during that method, so we can figure it out from that rather than recomputing it in the thread. For another, it's probably a good idea to keep this logic consolidated in one place, in particular within the TaskManager, so we can easily expand on this condition, e.g. to further optimize based on task metadata and so on. Thoughts?
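
       Roughly what this could look like, as a simplified sketch only. `SketchTaskManager`, its fields, and the string task IDs are hypothetical stand-ins and do not reflect the real `TaskManager` internals; the `eosEnabled` check is carried over from the other comment on this PR:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, heavily simplified stand-in for TaskManager; not the real Kafka class.
final class SketchTaskManager {
    private final Set<String> activeTaskIds;
    private final boolean eosEnabled;

    SketchTaskManager(Set<String> activeTaskIds, boolean eosEnabled) {
        this.activeTaskIds = new HashSet<>(activeTaskIds);
        this.eosEnabled = eosEnabled;
    }

    /**
     * Handles the corrupted tasks and reports whether the calling thread
     * should trigger a rebalance afterwards.
     */
    boolean handleCorruption(Set<String> corruptedTaskIds) {
        // Separate corrupted active tasks from corrupted standbys.
        Set<String> corruptedActive = new HashSet<>();
        for (String taskId : corruptedTaskIds) {
            if (activeTaskIds.contains(taskId)) {
                corruptedActive.add(taskId);
            }
        }

        // (Closing the tasks as dirty and reviving them is elided in this sketch.)

        // The rebalance decision lives here, so it can be extended in one place.
        return eosEnabled && !corruptedActive.isEmpty();
    }
}
```

       The thread would then only need something like `if (taskManager.handleCorruption(e.corruptedTasks())) { /* trigger rebalance */ }`, without knowing how the decision is made.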



