Posted to jira@kafka.apache.org by "mjsax (via GitHub)" <gi...@apache.org> on 2023/04/28 16:54:43 UTC

[GitHub] [kafka] mjsax commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

mjsax commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r832815852


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -262,6 +262,42 @@ private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, fina
         }
     }
 
+    private void commitActiveTasks(final Set<Task> activeTasksNeedCommit, final AtomicReference<RuntimeException> activeTasksCommitException) {

Review Comment:
   Can't we reuse the existing `commitTasksAndMaybeUpdateCommittableOffsets()`?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -342,7 +342,19 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 
         maybeThrowTaskExceptions(taskCloseExceptions);
 
-        createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        rebalanceInProgress = true;
+        final Collection<Task> newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        // If there are any transactions in flight and there are newly created active tasks, commit the tasks
+        // to avoid potential long restoration times.
+        if (processingMode == EXACTLY_ONCE_V2 && threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+            log.info("New active tasks were added and there is an inflight transaction. Attempting to commit tasks.");
+            final int numCommitted = commitTasksAndMaybeUpdateCommittableOffsets(tasks.allTasks(), new HashMap<>());

Review Comment:
   We pass in the second parameter to be able to handle timeout exceptions correctly (cf. other places where we call `commitTasksAndMaybeUpdateCommittableOffsets`) -- here, we don't catch `TimeoutException`. What is the impact? It seems the exception would bubble up into the consumer and crash us?
   
   Given that we kinda need to commit, it seems there are two things we could do: either add a loop here and retry the commit until we exceed `task.timeout.ms` (not my personally preferred solution), or move the whole commit logic into the restore part (not sure if this is easily possible) -- i.e., each time we enter the restore code, we check if there is an open TX and commit. Not sure how this would align with the new state-updater code though.
   
   cc @cadonna @lucasbru -- can you comment?
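
The first option in the comment above (retry the commit in a loop until the task timeout is exceeded) could look roughly like the following. This is only a minimal sketch and not code from the PR: `CommitRetrySketch`, `commitWithRetry`, and `CommitTimeoutException` are hypothetical names, the exception is a stand-in for Kafka's `TimeoutException` so that the example compiles on its own, and the `IntSupplier` stands in for the call to `commitTasksAndMaybeUpdateCommittableOffsets`.

```java
import java.util.function.IntSupplier;
import java.util.function.LongSupplier;

// Hypothetical sketch; none of these names exist in Kafka Streams.
class CommitRetrySketch {

    // Stand-in for Kafka's TimeoutException so that the sketch is self-contained.
    static class CommitTimeoutException extends RuntimeException {
        CommitTimeoutException(final String message) {
            super(message);
        }
    }

    // Retries the commit until it succeeds or the deadline derived from timeoutMs has passed.
    // The clock is injected so that the deadline logic can be tested without sleeping.
    static int commitWithRetry(final IntSupplier commit, final long timeoutMs, final LongSupplier clockMs) {
        final long deadlineMs = clockMs.getAsLong() + timeoutMs;
        while (true) {
            try {
                return commit.getAsInt();
            } catch (final CommitTimeoutException e) {
                // Give up and rethrow once the deadline is reached; otherwise try again.
                if (clockMs.getAsLong() >= deadlineMs) {
                    throw e;
                }
            }
        }
    }

    public static void main(final String[] args) {
        final int[] attempts = {0};
        // Simulated commit: times out twice, then reports three committed tasks.
        final int committed = commitWithRetry(() -> {
            if (attempts[0]++ < 2) {
                throw new CommitTimeoutException("commit timed out");
            }
            return 3;
        }, 1_000L, System::currentTimeMillis);
        System.out.println("committed=" + committed + " attempts=" + attempts[0]);
    }
}
```

A loop like this blocks the calling thread for up to the full timeout and has no backoff between attempts, which may be one reason to prefer moving the commit into the restore path instead.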



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -342,7 +342,19 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 
         maybeThrowTaskExceptions(taskCloseExceptions);
 
-        createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        rebalanceInProgress = true;
+        final Collection<Task> newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        // If there are any transactions in flight and there are newly created active tasks, commit the tasks
+        // to avoid potential long restoration times.
+        if (processingMode == EXACTLY_ONCE_V2 && threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {

Review Comment:
   Why is this limited to EOS_v2? From my understanding, EOS_v1 would have the same problem?
   
   It also seems to be a potential issue only when we get stateful tasks assigned? Not sure if we can limit the last check to `!newActiveStatefulTasks.isEmpty()`?
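
To make the two questions above concrete, here is a minimal sketch of a guard that applies to both EOS variants and only fires when at least one newly assigned active task is stateful. Everything in it is an assumption for illustration: `CommitGuardSketch`, `SketchTask`, and the enum constants are stand-ins rather than the real Kafka Streams types, and `transactionInFlight` is passed in as a plain boolean because how it would be determined under EOS_v1 is left open here.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

// Hypothetical sketch; the types below are stand-ins, not Kafka Streams classes.
class CommitGuardSketch {

    enum ProcessingMode { AT_LEAST_ONCE, EXACTLY_ONCE_V1, EXACTLY_ONCE_V2 }

    // Minimal stand-in for a task; "stateful" means the task has state to restore.
    static class SketchTask {
        final String id;
        final boolean stateful;

        SketchTask(final String id, final boolean stateful) {
            this.id = id;
            this.stateful = stateful;
        }
    }

    static boolean eosEnabled(final ProcessingMode mode) {
        return mode == ProcessingMode.EXACTLY_ONCE_V1 || mode == ProcessingMode.EXACTLY_ONCE_V2;
    }

    // Commit only if EOS is on, a transaction is open, and some new active task needs restoration.
    static boolean shouldCommitBeforeRestore(final ProcessingMode mode,
                                             final boolean transactionInFlight,
                                             final Collection<SketchTask> newActiveTasks) {
        return eosEnabled(mode)
            && transactionInFlight
            && newActiveTasks.stream().anyMatch(task -> task.stateful);
    }

    public static void main(final String[] args) {
        final Collection<SketchTask> mixed =
            Arrays.asList(new SketchTask("0_0", false), new SketchTask("1_0", true));
        final Collection<SketchTask> statelessOnly =
            Collections.singletonList(new SketchTask("0_1", false));

        System.out.println(shouldCommitBeforeRestore(ProcessingMode.EXACTLY_ONCE_V1, true, mixed));
        System.out.println(shouldCommitBeforeRestore(ProcessingMode.EXACTLY_ONCE_V2, true, statelessOnly));
    }
}
```

Returning a boolean from the guard also fits the later remark that only an "empty or not" answer is needed, not the full collection of new tasks.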



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -342,7 +342,19 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 
         maybeThrowTaskExceptions(taskCloseExceptions);
 
-        createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        rebalanceInProgress = true;
+        final Collection<Task> newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);

Review Comment:
   Seems we only need the count (or a boolean indicating whether the collection is empty), not the full collection. Can we simplify this?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -342,7 +342,19 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 
         maybeThrowTaskExceptions(taskCloseExceptions);
 
-        createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        rebalanceInProgress = true;

Review Comment:
   If we set `rebalanceInProgress = true`, it seems the later call to `commitTasksAndMaybeUpdateCommittableOffsets` would exit early and return `-1`? That does not seem right?
   
   I cannot remember the full purpose of the `rebalanceInProgress` flag, and given cooperative rebalancing and the processing of records during a rebalance, I am wondering if the semantics actually changed?
   
   Edit: I just saw that there is a discussion about this below.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -342,7 +342,18 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 
         maybeThrowTaskExceptions(taskCloseExceptions);
 
-        createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        final Collection<Task> newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        // If there are any transactions in flight and there are newly created active tasks, commit the tasks
+        // to avoid potential long restoration times.
+        if (processingMode == EXACTLY_ONCE_V2 && threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+            log.info("New active tasks were added and there is an inflight transaction. Attempting to commit tasks.");
+            final int numCommitted = commitTasksAndMaybeUpdateCommittableOffsets(newActiveTasks, new HashMap<>());
+            if (numCommitted == -1) {

Review Comment:
   Seems we should not mess with the control flow here. We might end up with spaghetti code. I would advocate moving the commit logic into the restore code path if possible, as mentioned further above -- this way, the existing control flow is not changed and we avoid all these complications.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -493,13 +542,22 @@ boolean tryToCompleteRestoration(final long now, final java.util.function.Consum
                     // we found a restoring task that isn't done restoring, which is evidence that
                     // not all tasks are running
                     allRunning = false;
+                    restoringTasks.add(task);
                 }
             }
         }
 
         if (allRunning) {
             // we can call resume multiple times since it is idempotent.
             mainConsumer.resume(mainConsumer.assignment());
+        } else {
+            // There are still some tasks in RESTORING phase.
+            final AtomicReference<RuntimeException> activeTasksCommitException = new AtomicReference<>(null);
+            commitActiveTasks(restoringTasks, activeTasksCommitException);

Review Comment:
   Why do we commit `restoringTasks`?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -342,7 +342,18 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 
         maybeThrowTaskExceptions(taskCloseExceptions);
 
-        createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        final Collection<Task> newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        // If there are any transactions in flight and there are newly created active tasks, commit the tasks
+        // to avoid potential long restoration times.
+        if (processingMode == EXACTLY_ONCE_V2 && threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+            log.info("New active tasks were added and there is an inflight transaction. Attempting to commit tasks.");

Review Comment:
   Is INFO the right log level? Seems DEBUG might be more appropriate?


