Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/10/25 17:04:10 UTC

[GitHub] [kafka] vamossagar12 opened a new pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

vamossagar12 opened a new pull request #11433:
URL: https://github.com/apache/kafka/pull/11433


   …tion times in EOS
   
   The PR aims to avoid transaction timeouts caused by long restoration times of new tasks. Specifically, during the assignment of active tasks, if there are any open transactions that need to be committed, this change ensures they are committed during assignment.
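The ordering this PR aims for can be sketched in plain Java. This is a minimal sketch under a simplified model: `CommitBeforeRestoreSketch`, `process`, `commitTransaction`, `restore`, `onAssignment`, and the event strings are all hypothetical and are not Kafka Streams APIs. It only shows the idea: if a transaction is open when new active tasks arrive, commit it before starting a potentially long restoration, so the open transaction is not left running past the producer's `transaction.timeout.ms`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical model of "commit before restoring new tasks"; not Kafka Streams code.
class CommitBeforeRestoreSketch {

    // Records what happened, in order, so the ordering can be inspected.
    static final List<String> events = new ArrayList<>();

    static boolean transactionOpen = false;

    static void process(String record) {
        transactionOpen = true;              // processing opens (or extends) a transaction
        events.add("process:" + record);
    }

    static void commitTransaction() {
        if (transactionOpen) {
            events.add("commit");
            transactionOpen = false;
        }
    }

    static void restore(String taskId) {
        events.add("restore:" + taskId);     // in reality this may take longer than the transaction timeout
    }

    // On a new assignment, commit pending work before restoring newly assigned tasks.
    static void onAssignment(Set<String> newActiveTasks) {
        if (!newActiveTasks.isEmpty()) {
            commitTransaction();
        }
        for (String taskId : newActiveTasks) {
            restore(taskId);
        }
    }

    public static void main(String[] args) {
        process("a");
        onAssignment(Set.of("1_0"));
        System.out.println(events); // [process:a, commit, restore:1_0]
    }
}
```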


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #11433:
URL: https://github.com/apache/kafka/pull/11433#issuecomment-1035711101


   @ableegoldman, do you think this one is in good shape?





[GitHub] [kafka] vamossagar12 commented on pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #11433:
URL: https://github.com/apache/kafka/pull/11433#issuecomment-1065000470


   @showuon, I made the changes. Let me know if they make sense now.





[GitHub] [kafka] showuon commented on a change in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r833872517



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -262,6 +262,42 @@ private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, fina
         }
     }
 
+    private void commitActiveTasks(final Set<Task> activeTasksNeedCommit, final AtomicReference<RuntimeException> activeTasksCommitException) {
+
+        final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask = new HashMap<>();
+        prepareCommitAndAddOffsetsToMap(activeTasksNeedCommit, consumedOffsetsPerTask);
+
+        final Set<Task> dirtyTasks = new HashSet<>();
+        try {
+            taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
+        } catch (final TaskCorruptedException e) {
+            log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}",
+                    e.corruptedTasks());
+
+            // If we hit a TaskCorruptedException it must be EOS, just handle the cleanup for those corrupted tasks right here
+            dirtyTasks.addAll(tasks.tasks(e.corruptedTasks()));
+            closeDirtyAndRevive(dirtyTasks, true);
+        } catch (final RuntimeException e) {
+            log.error("Exception caught while committing active tasks: " + consumedOffsetsPerTask.keySet(), e);
+            activeTasksCommitException.compareAndSet(null, e);
+            dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
+        }
+
+        // for non-revoking active tasks, we should not enforce checkpoint
+        // as it's EOS enabled in which case no checkpoint should be written while
+        // the task is in RUNNING state
+        for (final Task task : activeTasksNeedCommit) {
+            if (!dirtyTasks.contains(task)) {
+                try {
+                    task.postCommit(false);

Review comment:
       Honestly, I'm not quite sure whether we should checkpoint here or not. When entering this phase, the task might be in the `RESTORING` or `RUNNING` state. We can checkpoint in the `RESTORING` state, but in the `RUNNING` state we might not need to. So I was thinking of something like this:
   ```
   task.postCommit(task.state() != State.RUNNING);
   ```
   But when I checked the discussion thread in JIRA again, @ableegoldman suggested that we checkpoint after committing. So I'm wondering whether we should just force it to `true` here.
   https://issues.apache.org/jira/browse/KAFKA-13295?focusedCommentId=17429067&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17429067
   
   cc @guozhangwang @ableegoldman 
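A note on the state check above: the diff compares states as enum constants (`State.RESTORING`), so a string comparison such as `task.state().equals("RUNNING")` would always be `false` and the flag would always be `true`. A minimal sketch, using a stand-in `State` enum whose constants are chosen for illustration (not copied from Kafka's task state type):

```java
// Stand-in task state enum; constants chosen for illustration only.
class CheckpointFlagSketch {

    enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

    // Buggy form: an enum constant never equals a String, so this always returns true.
    static boolean enforceCheckpointBuggy(State state) {
        return !state.equals("RUNNING");
    }

    // Intended form: compare enum constants by identity.
    static boolean enforceCheckpoint(State state) {
        return state != State.RUNNING;
    }

    public static void main(String[] args) {
        for (State s : State.values()) {
            System.out.println(s + " buggy=" + enforceCheckpointBuggy(s) + " fixed=" + enforceCheckpoint(s));
        }
    }
}
```

Only the `RUNNING` row differs between the two columns, which is exactly the case the flag is meant to distinguish.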







[GitHub] [kafka] vamossagar12 commented on pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #11433:
URL: https://github.com/apache/kafka/pull/11433#issuecomment-1053493098


   @showuon Sure, no worries! Let me know whenever you get the chance.





[GitHub] [kafka] vamossagar12 commented on a change in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r739152945



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -292,12 +327,20 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
         final Set<Task> tasksToRecycle = new TreeSet<>(byId);
         final Set<Task> tasksToCloseClean = new TreeSet<>(byId);
         final Set<Task> tasksToCloseDirty = new TreeSet<>(byId);
+        boolean commitAssignedActiveTasks = false;
+        final Set<Task> activeTasksNeedCommit = new HashSet<>();
 
         // first rectify all existing tasks
         for (final Task task : tasks.allTasks()) {
             if (activeTasks.containsKey(task.id()) && task.isActive()) {
                 tasks.updateInputPartitionsAndResume(task, activeTasks.get(task.id()));
                 activeTasksToCreate.remove(task.id());
+                if (task.state() == State.RESTORING) {

Review comment:
       Thanks for the info. I will go through the `StoreChangelogReader` class to understand the relevant logic. 
   However, I am slightly confused, as your explanation mentions a "first pass" in a couple of different places :D. So, for now, if we find that `activeTasksToCreate` is non-empty, we can set `commitAssignedActiveTasks` to true.
   
   But for the long term, we also need to check whether some tasks are still in RESTORING at the end of `TaskManager#tryToCompleteRestoration`. Is that right? That method is called from `runOnce` in `StreamThread`. So are you suggesting that we also add a check there, verifying both that some active tasks need a commit and that some tasks are still restoring, in addition to the check added in `handleAssignment`?
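The longer-term condition being asked about (after a restoration attempt, commit if some active task has pending work and some task is still restoring) can be sketched as a small predicate. This is a hypothetical model: `CommitAfterRestorationSketch`, `TaskView`, and `shouldCommitAfterRestorationAttempt` are illustrative names, not methods in `TaskManager`.

```java
import java.util.List;

// Hypothetical model of the "commit if still restoring" check; names are illustrative.
class CommitAfterRestorationSketch {

    enum State { RESTORING, RUNNING }

    static final class TaskView {
        final String id;
        final State state;
        final boolean commitNeeded;   // e.g. the task has processed but uncommitted records

        TaskView(String id, State state, boolean commitNeeded) {
            this.id = id;
            this.state = state;
            this.commitNeeded = commitNeeded;
        }
    }

    // Commit only when some task has pending work AND restoration is still ongoing,
    // i.e. the thread will keep restoring while a transaction may be open.
    static boolean shouldCommitAfterRestorationAttempt(List<TaskView> activeTasks) {
        boolean anyNeedsCommit = false;
        boolean anyStillRestoring = false;
        for (TaskView t : activeTasks) {
            anyNeedsCommit |= t.commitNeeded;
            anyStillRestoring |= t.state == State.RESTORING;
        }
        return anyNeedsCommit && anyStillRestoring;
    }

    public static void main(String[] args) {
        List<TaskView> tasks = List.of(
                new TaskView("0_0", State.RUNNING, true),
                new TaskView("1_0", State.RESTORING, false));
        System.out.println(shouldCommitAfterRestorationAttempt(tasks)); // true
    }
}
```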







[GitHub] [kafka] mjsax commented on pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #11433:
URL: https://github.com/apache/kafka/pull/11433#issuecomment-1075878024


   Just stumbled across this PR. Should we try to get it into the 3.2 release? It seems to be a valuable bug fix.





[GitHub] [kafka] vamossagar12 commented on a change in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r737354845



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -280,6 +328,13 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                      "\tExisting standby tasks: {}",
                  activeTasks.keySet(), standbyTasks.keySet(), activeTaskIds(), standbyTaskIds());
 
+        final AtomicReference<RuntimeException> activeTasksCommitException = new AtomicReference<>(null);
+        commitActiveTasks(activeTasks, activeTasksCommitException);

Review comment:
       Yeah, that makes sense. I made the relevant changes.







[GitHub] [kafka] showuon commented on pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #11433:
URL: https://github.com/apache/kafka/pull/11433#issuecomment-1042525944


   @vamossagar12, thanks for the PR. I'll take a look next week.





[GitHub] [kafka] ableegoldman commented on a change in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r737016054



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -280,6 +328,13 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                      "\tExisting standby tasks: {}",
                  activeTasks.keySet(), standbyTasks.keySet(), activeTaskIds(), standbyTaskIds());
 
+        final AtomicReference<RuntimeException> activeTasksCommitException = new AtomicReference<>(null);
+        commitActiveTasks(activeTasks, activeTasksCommitException);

Review comment:
       I recommend moving this to after the `// first rectify all existing tasks` and `// close and recycle those tasks` sections (and the exception handling section that comes after that), to make sure we first clean up/clear out any tasks that are going to be closed anyways. Then you should be able to simplify `commitActiveTasks`, for example you only need to loop through the `activeTasks` argument since this now covers all still-assigned active tasks. 
   
   Actually, maybe it would make sense to avoid doing all this in a standalone method, and instead try to do some of this bookkeeping of which/whether any tasks need to be committed while we iterate through all the `tasks` in the `// first rectify all existing tasks` section. Does that make sense?
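The suggested ordering (do the commit bookkeeping while rectifying existing tasks, close tasks that are no longer assigned first, then commit only the still-assigned active tasks) can be sketched as follows. This is a hypothetical simplification: `AssignmentOrderingSketch`, its `handleAssignment` signature, and the map of "task id to has uncommitted work" are illustrative and are not the real `TaskManager#handleAssignment`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical simplification of the suggested ordering; not the real handleAssignment.
class AssignmentOrderingSketch {

    static final List<String> events = new ArrayList<>();

    // existingActive: task id -> whether that task has uncommitted work
    static void handleAssignment(Map<String, Boolean> existingActive, Set<String> assigned) {
        Set<String> toClose = new TreeSet<>();
        Set<String> needCommit = new TreeSet<>();

        // Rectify existing tasks and do the commit bookkeeping in the same pass.
        for (Map.Entry<String, Boolean> e : existingActive.entrySet()) {
            if (!assigned.contains(e.getKey())) {
                toClose.add(e.getKey());
            } else if (e.getValue()) {
                needCommit.add(e.getKey());
            }
        }
        // First close the tasks that are no longer assigned...
        for (String id : toClose) {
            events.add("close:" + id);
        }
        // ...then commit only the still-assigned active tasks.
        if (!needCommit.isEmpty()) {
            events.add("commit:" + needCommit);
        }
    }

    public static void main(String[] args) {
        Map<String, Boolean> existing = new TreeMap<>(Map.of("0_0", true, "0_1", true, "0_2", false));
        handleAssignment(existing, Set.of("0_0", "0_2"));
        System.out.println(events); // [close:0_1, commit:[0_0]]
    }
}
```

Folding the bookkeeping into the existing loop avoids a second pass over all tasks, and committing last means no effort is spent committing tasks that are about to be closed anyway.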







[GitHub] [kafka] showuon commented on a change in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r820068525



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -343,6 +382,16 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
                 }
             }
 
+            // We commit active tasks only if there are any active tasks which need restoring.
+            if (!activeTasksNeedCommit.isEmpty()) {
+                final AtomicReference<RuntimeException> activeTasksCommitException = new AtomicReference<>(null);
+                commitActiveTasks(activeTasksNeedCommit, activeTasksCommitException);
+
+                if (activeTasksCommitException.get() != null) {
+                    throw activeTasksCommitException.get();
+                }
+

Review comment:
       nit: extra line
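The `activeTasksCommitException` handling in this hunk follows a "record the first failure, rethrow once at the end" pattern: `compareAndSet(null, e)` stores an exception only if none has been stored yet. A minimal standalone sketch of that pattern; `FirstExceptionSketch`, `commitAll`, and the simulated `"bad"` task ids are hypothetical.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Illustrates "keep the first failure, continue, rethrow at the end" with AtomicReference.
class FirstExceptionSketch {

    static void commitAll(List<String> taskIds, AtomicReference<RuntimeException> firstError) {
        for (String id : taskIds) {
            try {
                // Simulated commit: ids starting with "bad" fail.
                if (id.startsWith("bad")) {
                    throw new IllegalStateException("commit failed for " + id);
                }
            } catch (RuntimeException e) {
                // Only stored if no earlier failure was recorded; later ones are dropped.
                firstError.compareAndSet(null, e);
            }
        }
    }

    static void commitAndRethrow(List<String> taskIds) {
        AtomicReference<RuntimeException> firstError = new AtomicReference<>(null);
        commitAll(taskIds, firstError);
        RuntimeException error = firstError.get();
        if (error != null) {
            throw error;   // surfaces the first failure after every task was attempted
        }
    }

    public static void main(String[] args) {
        try {
            commitAndRethrow(List.of("0_0", "bad_1", "bad_2"));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // commit failed for bad_1
        }
    }
}
```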

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -265,6 +265,41 @@ private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, fina
         }
     }
 
+    private void commitActiveTasks(final Set<Task> activeTasksNeedCommit, final AtomicReference<RuntimeException> activeTasksCommitException) {
+
+        final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask = new HashMap<>();
+        prepareCommitAndAddOffsetsToMap(activeTasksNeedCommit, consumedOffsetsPerTask);
+
+        final Set<Task> dirtyTasks = new HashSet<>();
+        try {
+            commitOffsetsOrTransaction(consumedOffsetsPerTask);
+        } catch (final TaskCorruptedException e) {
+            log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}",
+                    e.corruptedTasks());
+
+            // If we hit a TaskCorruptedException it must be EOS, just handle the cleanup for those corrupted tasks right here
+            dirtyTasks.addAll(tasks.tasks(e.corruptedTasks()));
+            closeDirtyAndRevive(dirtyTasks, true);
+        } catch (final RuntimeException e) {
+            log.error("Exception caught while committing active tasks " + activeTasksNeedCommit, e);

Review comment:
       Should we output the `consumedOffsetsPerTask.keySet()` instead of `activeTasksNeedCommit` here?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -265,6 +265,41 @@ private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, fina
         }
     }
 
+    private void commitActiveTasks(final Set<Task> activeTasksNeedCommit, final AtomicReference<RuntimeException> activeTasksCommitException) {
+
+        final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask = new HashMap<>();
+        prepareCommitAndAddOffsetsToMap(activeTasksNeedCommit, consumedOffsetsPerTask);
+
+        final Set<Task> dirtyTasks = new HashSet<>();
+        try {
+            commitOffsetsOrTransaction(consumedOffsetsPerTask);
+        } catch (final TaskCorruptedException e) {
+            log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}",
+                    e.corruptedTasks());
+
+            // If we hit a TaskCorruptedException it must be EOS, just handle the cleanup for those corrupted tasks right here
+            dirtyTasks.addAll(tasks.tasks(e.corruptedTasks()));
+            closeDirtyAndRevive(dirtyTasks, true);
+        } catch (final RuntimeException e) {
+            log.error("Exception caught while committing active tasks " + activeTasksNeedCommit, e);

Review comment:
       nit:
   `log.error("Exception caught while committing active tasks: {}", activeTasksNeedCommit, e);`

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -265,6 +265,41 @@ private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, fina
         }
     }
 
+    private void commitActiveTasks(final Set<Task> activeTasksNeedCommit, final AtomicReference<RuntimeException> activeTasksCommitException) {
+
+        final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask = new HashMap<>();
+        prepareCommitAndAddOffsetsToMap(activeTasksNeedCommit, consumedOffsetsPerTask);
+
+        final Set<Task> dirtyTasks = new HashSet<>();
+        try {
+            commitOffsetsOrTransaction(consumedOffsetsPerTask);
+        } catch (final TaskCorruptedException e) {
+            log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}",
+                    e.corruptedTasks());
+
+            // If we hit a TaskCorruptedException it must be EOS, just handle the cleanup for those corrupted tasks right here
+            dirtyTasks.addAll(tasks.tasks(e.corruptedTasks()));
+            closeDirtyAndRevive(dirtyTasks, true);
+        } catch (final RuntimeException e) {
+            log.error("Exception caught while committing active tasks " + activeTasksNeedCommit, e);
+            activeTasksCommitException.compareAndSet(null, e);
+            dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
+        }
+
+        // we enforce checkpointing upon suspending a task: if it is resumed later we just proceed normally, if it is
+        // going to be closed we would checkpoint by then

Review comment:
       I don't think the comment here is correct. We don't `suspend` any task in `handleAssignment`, right? Based on the comments [here](https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L596-L599), it looks like we don't expect to force a checkpoint in the RUNNING state. So I'm not sure we should force a checkpoint for all tasks here. Thoughts?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -265,6 +265,41 @@ private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, fina
         }
     }
 
+    private void commitActiveTasks(final Set<Task> activeTasksNeedCommit, final AtomicReference<RuntimeException> activeTasksCommitException) {
+
+        final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask = new HashMap<>();
+        prepareCommitAndAddOffsetsToMap(activeTasksNeedCommit, consumedOffsetsPerTask);
+
+        final Set<Task> dirtyTasks = new HashSet<>();
+        try {
+            commitOffsetsOrTransaction(consumedOffsetsPerTask);
+        } catch (final TaskCorruptedException e) {
+            log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}",
+                    e.corruptedTasks());
+
+            // If we hit a TaskCorruptedException it must be EOS, just handle the cleanup for those corrupted tasks right here
+            dirtyTasks.addAll(tasks.tasks(e.corruptedTasks()));
+            closeDirtyAndRevive(dirtyTasks, true);
+        } catch (final RuntimeException e) {
+            log.error("Exception caught while committing active tasks " + activeTasksNeedCommit, e);
+            activeTasksCommitException.compareAndSet(null, e);
+            dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
+        }
+
+        // we enforce checkpointing upon suspending a task: if it is resumed later we just proceed normally, if it is
+        // going to be closed we would checkpoint by then
+        for (final Task task : activeTasksNeedCommit) {
+            if (!dirtyTasks.contains(task)) {
+                try {
+                    task.postCommit(true);
+                } catch (final RuntimeException e) {
+                    log.error("Exception caught while post-committing task " + task.id(), e);

Review comment:
       nit:
   `log.error("Exception caught while post-committing task: {}", task.id(), e);`

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -265,6 +265,41 @@ private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, fina
         }
     }
 
+    private void commitActiveTasks(final Set<Task> activeTasksNeedCommit, final AtomicReference<RuntimeException> activeTasksCommitException) {
+
+        final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask = new HashMap<>();
+        prepareCommitAndAddOffsetsToMap(activeTasksNeedCommit, consumedOffsetsPerTask);
+
+        final Set<Task> dirtyTasks = new HashSet<>();
+        try {
+            commitOffsetsOrTransaction(consumedOffsetsPerTask);

Review comment:
       Looks like the code has been refactored in this PR: https://github.com/apache/kafka/pull/11738
   We have to call it via `taskExecutor`







[GitHub] [kafka] showuon commented on a change in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r820070930



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -265,6 +265,41 @@ private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, fina
         }
     }
 
+    private void commitActiveTasks(final Set<Task> activeTasksNeedCommit, final AtomicReference<RuntimeException> activeTasksCommitException) {
+
+        final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask = new HashMap<>();
+        prepareCommitAndAddOffsetsToMap(activeTasksNeedCommit, consumedOffsetsPerTask);
+
+        final Set<Task> dirtyTasks = new HashSet<>();
+        try {
+            commitOffsetsOrTransaction(consumedOffsetsPerTask);

Review comment:
       Looks like the code has been refactored in this PR: https://github.com/apache/kafka/pull/11738
   We have to call it via `taskExecutor` now. Please rebase the branch. Thanks.







[GitHub] [kafka] vamossagar12 commented on a change in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r824603721



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -265,6 +265,41 @@ private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, fina
         }
     }
 
+    private void commitActiveTasks(final Set<Task> activeTasksNeedCommit, final AtomicReference<RuntimeException> activeTasksCommitException) {
+
+        final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask = new HashMap<>();
+        prepareCommitAndAddOffsetsToMap(activeTasksNeedCommit, consumedOffsetsPerTask);
+
+        final Set<Task> dirtyTasks = new HashSet<>();
+        try {
+            commitOffsetsOrTransaction(consumedOffsetsPerTask);
+        } catch (final TaskCorruptedException e) {
+            log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}",
+                    e.corruptedTasks());
+
+            // If we hit a TaskCorruptedException it must be EOS, just handle the cleanup for those corrupted tasks right here
+            dirtyTasks.addAll(tasks.tasks(e.corruptedTasks()));
+            closeDirtyAndRevive(dirtyTasks, true);
+        } catch (final RuntimeException e) {
+            log.error("Exception caught while committing active tasks " + activeTasksNeedCommit, e);
+            activeTasksCommitException.compareAndSet(null, e);
+            dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
+        }
+
+        // we enforce checkpointing upon suspending a task: if it is resumed later we just proceed normally, if it is
+        // going to be closed we would checkpoint by then

Review comment:
       Yeah, OK, that makes sense. I changed it so that it does not checkpoint in this case.







[GitHub] [kafka] ableegoldman commented on a change in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r738872895



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -292,12 +327,20 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
         final Set<Task> tasksToRecycle = new TreeSet<>(byId);
         final Set<Task> tasksToCloseClean = new TreeSet<>(byId);
         final Set<Task> tasksToCloseDirty = new TreeSet<>(byId);
+        boolean commitAssignedActiveTasks = false;
+        final Set<Task> activeTasksNeedCommit = new HashSet<>();
 
         // first rectify all existing tasks
         for (final Task task : tasks.allTasks()) {
             if (activeTasks.containsKey(task.id()) && task.isActive()) {
                 tasks.updateInputPartitionsAndResume(task, activeTasks.get(task.id()));
                 activeTasksToCreate.remove(task.id());
+                if (task.state() == State.RESTORING) {

Review comment:
       Note that we're currently going through only the _existing_ tasks -- but we want to commit only if there are _new_ tasks which will need restoring. 
   
   Unfortunately due to the task lifecycle, specifically that all tasks pass through the `RESTORING` phase before going into `RUNNING`, it's actually nontrivial to figure out if we're going to need to actually spend any time restoring new tasks. As a first pass, for now (so we can get some kind of fix into 3.1), we can just set `commitAssignedActiveTasks = true` if there are any newly added active tasks at all. 
   
   I think that's fine for a first pass, but if you're interested in how we could check whether any of the new active tasks actually need restoring, check out the `StoreChangelogReader` class. It always does at least one initial pass to confirm that any new active tasks are caught up, so one idea would be to check whether any changelogs still need to be restored from after that initial pass. Actually, you may not even need to worry about this "first pass": you can just blindly do the commit if there are any active tasks that need to be committed and some tasks are still in RESTORING at the end of `TaskManager#tryToCompleteRestoration`. Does that make sense?
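The "first pass" heuristic described above (commit whenever the assignment adds any new active task, because every new task passes through `RESTORING` and it is hard to know up front whether restoration will take long) reduces to a set difference. A hypothetical sketch; `NewActiveTasksHeuristicSketch` and its method are illustrative, with `activeTasksToCreate` mirroring the name used in the diff.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the first-pass heuristic: commit if any active task is newly assigned.
class NewActiveTasksHeuristicSketch {

    static boolean commitAssignedActiveTasks(Set<String> assignedActive, Set<String> existingActive) {
        Set<String> activeTasksToCreate = new HashSet<>(assignedActive);
        // What remains are tasks this thread must create (and therefore restore).
        activeTasksToCreate.removeAll(existingActive);
        return !activeTasksToCreate.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(commitAssignedActiveTasks(Set.of("0_0", "1_0"), Set.of("0_0"))); // true
        System.out.println(commitAssignedActiveTasks(Set.of("0_0"), Set.of("0_0", "0_1"))); // false
    }
}
```

The trade-off, as noted in the comment, is that this may commit even when the new tasks turn out to be caught up and restoration finishes immediately.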







[GitHub] [kafka] showuon commented on a change in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r833872517



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -262,6 +262,42 @@ private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, fina
         }
     }
 
+    private void commitActiveTasks(final Set<Task> activeTasksNeedCommit, final AtomicReference<RuntimeException> activeTasksCommitException) {
+
+        final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask = new HashMap<>();
+        prepareCommitAndAddOffsetsToMap(activeTasksNeedCommit, consumedOffsetsPerTask);
+
+        final Set<Task> dirtyTasks = new HashSet<>();
+        try {
+            taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
+        } catch (final TaskCorruptedException e) {
+            log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}",
+                    e.corruptedTasks());
+
+            // If we hit a TaskCorruptedException it must be EOS, just handle the cleanup for those corrupted tasks right here
+            dirtyTasks.addAll(tasks.tasks(e.corruptedTasks()));
+            closeDirtyAndRevive(dirtyTasks, true);
+        } catch (final RuntimeException e) {
+            log.error("Exception caught while committing active tasks: " + consumedOffsetsPerTask.keySet(), e);
+            activeTasksCommitException.compareAndSet(null, e);
+            dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
+        }
+
+        // for non-revoking active tasks, we should not enforce checkpoint
+        // as it's EOS enabled in which case no checkpoint should be written while
+        // the task is in RUNNING state
+        for (final Task task : activeTasksNeedCommit) {
+            if (!dirtyTasks.contains(task)) {
+                try {
+                    task.postCommit(false);

Review comment:
       Honestly, I'm not quite sure whether we should checkpoint here or not. When entering this phase, the task might be in the `RESTORING` state, the `RUNNING` state, or possibly another state. We can checkpoint in the `RESTORING` state, but in the `RUNNING` state we might not need to. So I was thinking of something like this:
   ```
   task.postCommit(task.state() != State.RUNNING);
   ```
   But when I checked the discussion thread in JIRA again, @ableegoldman suggested that we checkpoint after committing. So I'm wondering whether we should just force it to `true` here.
   https://issues.apache.org/jira/browse/KAFKA-13295?focusedCommentId=17429067&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17429067
   
   cc @guozhangwang @ableegoldman 
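For readers following the error handling in `commitActiveTasks()` above, here is a minimal, self-contained Java sketch of the same control flow. It is a simplified model, not the Kafka Streams code: `SketchTask`, `CorruptedException` and the `committer` callback are hypothetical stand-ins for `Task`, `TaskCorruptedException` and `taskExecutor.commitOffsetsOrTransaction()`, and the close-and-revive step for corrupted tasks is omitted. It shows the three outcomes: a clean commit, a corruption that marks only the affected tasks dirty, and any other `RuntimeException` that marks every task dirty while `compareAndSet(null, e)` keeps only the first error. `postCommit(false)` is then called only for tasks that are not dirty, as in the diff.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical, simplified model of the commitActiveTasks() control flow.
// None of these types are Kafka Streams classes.
final class CommitFlowSketch {

    // Stand-in for a Task: records whether postCommit() ran and with which flag.
    static final class SketchTask {
        final String id;
        boolean postCommitCalled = false;
        boolean checkpointEnforced = false;

        SketchTask(final String id) {
            this.id = id;
        }

        void postCommit(final boolean enforceCheckpoint) {
            postCommitCalled = true;
            checkpointEnforced = enforceCheckpoint;
        }
    }

    // Stand-in for TaskCorruptedException: carries the ids of the corrupted tasks.
    static final class CorruptedException extends RuntimeException {
        final Set<String> corruptedIds;

        CorruptedException(final Set<String> corruptedIds) {
            super("corrupted tasks: " + corruptedIds);
            this.corruptedIds = corruptedIds;
        }
    }

    // Commits all tasks in one call and returns the tasks treated as dirty.
    static Set<SketchTask> commitActiveTasks(final Set<SketchTask> needCommit,
                                             final Consumer<Set<SketchTask>> committer,
                                             final AtomicReference<RuntimeException> firstError) {
        final Set<SketchTask> dirty = new HashSet<>();
        try {
            committer.accept(needCommit);
        } catch (final CorruptedException e) {
            // Only the corrupted subset is dirty (the real code also closes and revives them).
            for (final SketchTask task : needCommit) {
                if (e.corruptedIds.contains(task.id)) {
                    dirty.add(task);
                }
            }
        } catch (final RuntimeException e) {
            // Record only the first failure; later ones leave it untouched.
            firstError.compareAndSet(null, e);
            dirty.addAll(needCommit);
        }
        // Run postCommit only for tasks whose commit did not fail, without forcing a checkpoint.
        for (final SketchTask task : needCommit) {
            if (!dirty.contains(task)) {
                task.postCommit(false);
            }
        }
        return dirty;
    }
}
```

Catching the specific corruption type before the generic `RuntimeException` is what lets the two failure modes be handled differently: a corruption only affects the listed tasks, while an unknown failure conservatively taints the whole batch.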







[GitHub] [kafka] vamossagar12 commented on a change in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r786180105



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -292,12 +327,20 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
         final Set<Task> tasksToRecycle = new TreeSet<>(byId);
         final Set<Task> tasksToCloseClean = new TreeSet<>(byId);
         final Set<Task> tasksToCloseDirty = new TreeSet<>(byId);
+        boolean commitAssignedActiveTasks = false;
+        final Set<Task> activeTasksNeedCommit = new HashSet<>();
 
         // first rectify all existing tasks
         for (final Task task : tasks.allTasks()) {
             if (activeTasks.containsKey(task.id()) && task.isActive()) {
                 tasks.updateInputPartitionsAndResume(task, activeTasks.get(task.id()));
                 activeTasksToCreate.remove(task.id());
+                if (task.state() == State.RESTORING) {

Review comment:
       hey @ableegoldman, can you please take a look at this one whenever you get the chance?







[GitHub] [kafka] showuon commented on pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #11433:
URL: https://github.com/apache/kafka/pull/11433#issuecomment-1050463207


   Sorry, I'll review the PR to upgrade to log4j 2 first. That has higher priority. I'll check your PR after that. Thanks.





[GitHub] [kafka] showuon commented on pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #11433:
URL: https://github.com/apache/kafka/pull/11433#issuecomment-1075887120


   Oh, sorry, totally forgot about this PR. I'll take a look again this week. Thanks.





[GitHub] [kafka] vamossagar12 commented on a change in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r745870914



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -292,12 +327,20 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
         final Set<Task> tasksToRecycle = new TreeSet<>(byId);
         final Set<Task> tasksToCloseClean = new TreeSet<>(byId);
         final Set<Task> tasksToCloseDirty = new TreeSet<>(byId);
+        boolean commitAssignedActiveTasks = false;
+        final Set<Task> activeTasksNeedCommit = new HashSet<>();
 
         // first rectify all existing tasks
         for (final Task task : tasks.allTasks()) {
             if (activeTasks.containsKey(task.id()) && task.isActive()) {
                 tasks.updateInputPartitionsAndResume(task, activeTasks.get(task.id()));
                 activeTasksToCreate.remove(task.id());
+                if (task.state() == State.RESTORING) {

Review comment:
       @ableegoldman, could you please help me out with the question above? I feel we can close this PR off, as the changes seem minimal.







[GitHub] [kafka] vamossagar12 commented on a change in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r833912049



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -262,6 +262,42 @@ private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, fina
         }
     }
 
+    private void commitActiveTasks(final Set<Task> activeTasksNeedCommit, final AtomicReference<RuntimeException> activeTasksCommitException) {
+
+        final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask = new HashMap<>();
+        prepareCommitAndAddOffsetsToMap(activeTasksNeedCommit, consumedOffsetsPerTask);
+
+        final Set<Task> dirtyTasks = new HashSet<>();
+        try {
+            taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
+        } catch (final TaskCorruptedException e) {
+            log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}",
+                    e.corruptedTasks());
+
+            // If we hit a TaskCorruptedException it must be EOS, just handle the cleanup for those corrupted tasks right here
+            dirtyTasks.addAll(tasks.tasks(e.corruptedTasks()));
+            closeDirtyAndRevive(dirtyTasks, true);
+        } catch (final RuntimeException e) {
+            log.error("Exception caught while committing active tasks: " + consumedOffsetsPerTask.keySet(), e);
+            activeTasksCommitException.compareAndSet(null, e);
+            dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
+        }
+
+        // for non-revoking active tasks, we should not enforce checkpoint
+        // as EOS is enabled, in which case no checkpoint should be written while
+        // the task is in RUNNING state
+        for (final Task task : activeTasksNeedCommit) {
+            if (!dirtyTasks.contains(task)) {
+                try {
+                    task.postCommit(false);

Review comment:
       Thanks @showuon. Well, actually this was `true` before, and based on your last review you had suggested setting it to `false`; that's why I toggled it. Yeah, going by that JIRA, it could be left as `true`.
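The two options under discussion for the `enforceCheckpoint` argument of `postCommit` can be compared in a small sketch. `TaskState` is a hypothetical enum modeled on the Kafka Streams task states; it is not the real `Task.State`. One detail worth noting: the state has to be compared as an enum, because `state.equals("RUNNING")` compares an enum constant to a `String` and can never be `true`, so a check written that way would force a checkpoint in every state.

```java
// Hypothetical comparison of the enforceCheckpoint options discussed for postCommit().
// TaskState is modeled on the Kafka Streams task states; it is not the real Task.State.
final class CheckpointPolicySketch {

    enum TaskState { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

    // Option from the review comment: skip the forced checkpoint while RUNNING,
    // because under EOS no checkpoint should be written in that state.
    static boolean enforceUnlessRunning(final TaskState state) {
        return state != TaskState.RUNNING;
    }

    // Option from the JIRA discussion: always checkpoint after committing.
    static boolean alwaysEnforce(final TaskState state) {
        return true;
    }

    // Pitfall: an enum constant never equals a String, so this is true for every state.
    static boolean stringComparisonEnforce(final TaskState state) {
        return !state.equals("RUNNING");
    }
}
```

With `enforceUnlessRunning`, a task that is still `RESTORING` when the new assignment arrives gets a checkpoint, while a `RUNNING` task does not; with `alwaysEnforce`, both do.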







[GitHub] [kafka] vamossagar12 commented on a change in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r833912049



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -262,6 +262,42 @@ private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, fina
         }
     }
 
+    private void commitActiveTasks(final Set<Task> activeTasksNeedCommit, final AtomicReference<RuntimeException> activeTasksCommitException) {
+
+        final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask = new HashMap<>();
+        prepareCommitAndAddOffsetsToMap(activeTasksNeedCommit, consumedOffsetsPerTask);
+
+        final Set<Task> dirtyTasks = new HashSet<>();
+        try {
+            taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
+        } catch (final TaskCorruptedException e) {
+            log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}",
+                    e.corruptedTasks());
+
+            // If we hit a TaskCorruptedException it must be EOS, just handle the cleanup for those corrupted tasks right here
+            dirtyTasks.addAll(tasks.tasks(e.corruptedTasks()));
+            closeDirtyAndRevive(dirtyTasks, true);
+        } catch (final RuntimeException e) {
+            log.error("Exception caught while committing active tasks: " + consumedOffsetsPerTask.keySet(), e);
+            activeTasksCommitException.compareAndSet(null, e);
+            dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
+        }
+
+        // for non-revoking active tasks, we should not enforce checkpoint
+        // as EOS is enabled, in which case no checkpoint should be written while
+        // the task is in RUNNING state
+        for (final Task task : activeTasksNeedCommit) {
+            if (!dirtyTasks.contains(task)) {
+                try {
+                    task.postCommit(false);

Review comment:
       Thanks @showuon. I think it was `true` before, but I toggled it. Yeah, going by that JIRA, it could be left as `true`.







[GitHub] [kafka] vamossagar12 commented on a change in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r774050815



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -292,12 +327,20 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
         final Set<Task> tasksToRecycle = new TreeSet<>(byId);
         final Set<Task> tasksToCloseClean = new TreeSet<>(byId);
         final Set<Task> tasksToCloseDirty = new TreeSet<>(byId);
+        boolean commitAssignedActiveTasks = false;
+        final Set<Task> activeTasksNeedCommit = new HashSet<>();
 
         // first rectify all existing tasks
         for (final Task task : tasks.allTasks()) {
             if (activeTasks.containsKey(task.id()) && task.isActive()) {
                 tasks.updateInputPartitionsAndResume(task, activeTasks.get(task.id()));
                 activeTasksToCreate.remove(task.id());
+                if (task.state() == State.RESTORING) {

Review comment:
       hey @ableegoldman, I re-looked at your comments above and think I have understood them now. I have added the logic and also modified a test case. Let me know if that makes sense. Thanks!
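A rough sketch of the rectification loop from the `handleAssignment()` hunk above may help when reviewing it. The hunk stops at the `task.state() == State.RESTORING` check, so what that branch does is an assumption here: the sketch just records the task in a set, in the spirit of the `activeTasksNeedCommit` set declared a few lines earlier. `AssignedTask`, `collectTasksNeedingCommit` and the plain `String` task ids are hypothetical simplifications, not Kafka Streams types.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the "rectify existing tasks" loop in handleAssignment().
// AssignedTask and collectTasksNeedingCommit are illustrative, not Kafka Streams APIs.
final class AssignmentSketch {

    enum State { CREATED, RESTORING, RUNNING }

    static final class AssignedTask {
        final String id;
        final boolean active;
        final State state;

        AssignedTask(final String id, final boolean active, final State state) {
            this.id = id;
            this.active = active;
            this.state = state;
        }
    }

    // activeAssignment maps task id -> assigned input partitions (plain strings here).
    // Kept tasks are removed from activeTasksToCreate; kept tasks that are still
    // RESTORING are returned (an assumption about what the real branch does).
    static Set<AssignedTask> collectTasksNeedingCommit(final List<AssignedTask> existingTasks,
                                                       final Map<String, Set<String>> activeAssignment,
                                                       final Set<String> activeTasksToCreate) {
        final Set<AssignedTask> needCommit = new HashSet<>();
        for (final AssignedTask task : existingTasks) {
            if (activeAssignment.containsKey(task.id) && task.active) {
                // Already exists as an active task, so it does not need to be created.
                activeTasksToCreate.remove(task.id);
                if (task.state == State.RESTORING) {
                    needCommit.add(task);
                }
            }
        }
        return needCommit;
    }
}
```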







[GitHub] [kafka] vamossagar12 commented on a change in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r760369366



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -292,12 +327,20 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
         final Set<Task> tasksToRecycle = new TreeSet<>(byId);
         final Set<Task> tasksToCloseClean = new TreeSet<>(byId);
         final Set<Task> tasksToCloseDirty = new TreeSet<>(byId);
+        boolean commitAssignedActiveTasks = false;
+        final Set<Task> activeTasksNeedCommit = new HashSet<>();
 
         // first rectify all existing tasks
         for (final Task task : tasks.allTasks()) {
             if (activeTasks.containsKey(task.id()) && task.isActive()) {
                 tasks.updateInputPartitionsAndResume(task, activeTasks.get(task.id()));
                 activeTasksToCreate.remove(task.id());
+                if (task.state() == State.RESTORING) {

Review comment:
       hi @ableegoldman, bumping this thread again.







[GitHub] [kafka] vamossagar12 commented on pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on pull request #11433:
URL: https://github.com/apache/kafka/pull/11433#issuecomment-951125230


   @ableegoldman / @guozhangwang please review this whenever you get the chance.





[GitHub] [kafka] vamossagar12 commented on a change in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

Posted by GitBox <gi...@apache.org>.
vamossagar12 commented on a change in pull request #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r779716364



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -292,12 +327,20 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
         final Set<Task> tasksToRecycle = new TreeSet<>(byId);
         final Set<Task> tasksToCloseClean = new TreeSet<>(byId);
         final Set<Task> tasksToCloseDirty = new TreeSet<>(byId);
+        boolean commitAssignedActiveTasks = false;
+        final Set<Task> activeTasksNeedCommit = new HashSet<>();
 
         // first rectify all existing tasks
         for (final Task task : tasks.allTasks()) {
             if (activeTasks.containsKey(task.id()) && task.isActive()) {
                 tasks.updateInputPartitionsAndResume(task, activeTasks.get(task.id()));
                 activeTasksToCreate.remove(task.id());
+                if (task.state() == State.RESTORING) {

Review comment:
       hey @ableegoldman, let me know if you get a chance to look at this one now!



