You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/30 23:57:32 UTC

[GitHub] [kafka] ableegoldman commented on a change in pull request #10444: HOTFIX: move rebalanceInProgress check to skip commit during handleCorrupted

ableegoldman commented on a change in pull request #10444:
URL: https://github.com/apache/kafka/pull/10444#discussion_r604505465



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1013,28 +1016,34 @@ void addRecordsToTasks(final ConsumerRecords<byte[], byte[]> records) {
      */
     int commit(final Collection<Task> tasksToCommit) {
         int committed = 0;
-        if (rebalanceInProgress) {
-            committed = -1;
-        } else {
-            final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
-            try {
-                committed = commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(tasksToCommit, consumedOffsetsAndMetadataPerTask);
-            } catch (final TimeoutException timeoutException) {
-                consumedOffsetsAndMetadataPerTask
-                    .keySet()
-                    .forEach(t -> t.maybeInitTaskTimeoutOrThrow(time.milliseconds(), timeoutException));
-            }
+
+        final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
+        try {
+            committed = commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(tasksToCommit, consumedOffsetsAndMetadataPerTask);
+        } catch (final TimeoutException timeoutException) {
+            consumedOffsetsAndMetadataPerTask
+                .keySet()
+                .forEach(t -> t.maybeInitTaskTimeoutOrThrow(time.milliseconds(), timeoutException));
         }
+
         return committed;
     }
 
     /**
+     * @throws TaskMigratedException if committing offsets failed (non-EOS)
+     *                               or if the task producer got fenced (EOS)
+     * @throws TimeoutException if committing offsets failed due to TimeoutException (non-EOS)
+     * @throws TaskCorruptedException if committing offsets failed due to TimeoutException (EOS)
      * @param consumedOffsetsAndMetadataPerTask an empty map that will be filled in with the prepared offsets
+     * @return number of committed offsets, or -1 if we are in the middle of a rebalance and cannot commit
      */
     private int commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(final Collection<Task> tasksToCommit,
                                                                     final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask) {
-        int committed = 0;
+        if (rebalanceInProgress) {
+            return -1;
+        }

Review comment:
       This is the only logical change, I moved this check from `TaskManager#commit` to this method. And added a bunch of comments/docs so we don't forget this again




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org