Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/11/22 21:45:40 UTC

[GitHub] [kafka] C0urante opened a new pull request #11526: KAFKA-13472: Correct last-committed offsets tracking for sink tasks after partial revocation

C0urante opened a new pull request #11526:
URL: https://github.com/apache/kafka/pull/11526


   [Jira](https://issues.apache.org/jira/browse/KAFKA-13472)
   
   The `WorkerSinkTask.lastCommittedOffsets` field is now added to (via `Map::putAll`) after a successful offset commit, instead of being completely overwritten. In order to prevent this collection from growing indefinitely, elements are removed from it after topic partitions are revoked from the task's consumer.
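To illustrate the difference between merging and overwriting, here is a minimal sketch rather than the actual `WorkerSinkTask` code. The class name, method names, and the use of `String` keys in place of `TopicPartition` are all assumptions made for illustration; the scenario assumes a commit that covers only a subset of the tracked partitions.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the offset bookkeeping; not the real WorkerSinkTask.
class LastCommittedOffsetsSketch {
    // Partition name -> last committed offset (String stands in for TopicPartition).
    final Map<String, Long> lastCommittedOffsets = new HashMap<>();

    // Merge the newly committed offsets instead of replacing the whole map,
    // so entries for partitions absent from this commit are retained.
    void onCommitSuccess(Map<String, Long> committed) {
        lastCommittedOffsets.putAll(committed);
    }

    // Drop entries for revoked partitions so the map cannot grow indefinitely.
    void onPartitionsRevoked(Collection<String> revoked) {
        revoked.forEach(lastCommittedOffsets::remove);
    }

    public static void main(String[] args) {
        LastCommittedOffsetsSketch task = new LastCommittedOffsetsSketch();
        task.onCommitSuccess(Map.of("topic-0", 10L, "topic-1", 20L));
        // A commit covering only topic-1; an overwrite would lose topic-0 here.
        task.onCommitSuccess(Map.of("topic-1", 25L));
        task.onPartitionsRevoked(List.of("topic-1"));
        System.out.println(task.lastCommittedOffsets);
    }
}
```

In this sketch, the entry for `topic-0` survives the subset commit and is the only one left after `topic-1` is revoked.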
   
   Two test cases are added to `WorkerSinkTaskTest`:
   
   - A basic test to verify the "rewind for redelivery" behavior when a task throws an exception from `SinkTask::preCommit`; surprisingly, no existing test cases appear to cover this scenario
   - A more sophisticated test to verify this same behavior, but with a few rounds of cooperative consumer rebalancing beforehand that expose a bug in the current logic for the `WorkerSinkTask` class
   
   The `VerifiableSinkTask` class is also updated to only flush the requested topic partitions in its `flush` method. This is technically unrelated to the issue addressed by this PR and can be moved to a separate PR if necessary. It is included here because this bug was identified while debugging failed system tests, and this part of the test logic was originally suspected of causing the failure.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #11526: KAFKA-13472: Correct last-committed offsets tracking for sink tasks after partial revocation

Posted by GitBox <gi...@apache.org>.
C0urante commented on pull request #11526:
URL: https://github.com/apache/kafka/pull/11526#issuecomment-981983768


   Thanks @kkonstantine, appreciate the turnaround.





[GitHub] [kafka] C0urante commented on pull request #11526: KAFKA-13472: Correct last-committed offsets tracking for sink tasks after partial revocation

Posted by GitBox <gi...@apache.org>.
C0urante commented on pull request #11526:
URL: https://github.com/apache/kafka/pull/11526#issuecomment-977168354


   Thanks @kkonstantine. My description may have been a little misleading. To clarify, none of this was caught by automated testing (unit, integration, or system); it was discovered by re-reading the code base while investigating an unrelated failure. I mentioned system tests because the fix to the `VerifiableSinkTask` class initially seemed like it might be useful and, after I discovered that it was not the cause of the failure, still seemed worth keeping in case a future test relies on that part of its behavior being correct. It is not related to the bug addressed by this PR, though, so it felt worth calling out in case we want to withhold that change from this blocker PR during code freeze.





[GitHub] [kafka] C0urante commented on a change in pull request #11526: KAFKA-13472: Correct last-committed offsets tracking for sink tasks after partial revocation

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11526:
URL: https://github.com/apache/kafka/pull/11526#discussion_r755504329



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -654,6 +655,7 @@ private void closePartitions(Collection<TopicPartition> topicPartitions, boolean
             topicPartitions.forEach(currentOffsets::remove);
         }
         updatePartitionCount();
+        topicPartitions.forEach(lastCommittedOffsets::remove);

Review comment:
       Ah, neat trick. Ran unit+integration tests locally after making this change and they all passed; looks like the operation is supported.







[GitHub] [kafka] C0urante commented on pull request #11526: KAFKA-13472: Correct last-committed offsets tracking for sink tasks after partial revocation

Posted by GitBox <gi...@apache.org>.
C0urante commented on pull request #11526:
URL: https://github.com/apache/kafka/pull/11526#issuecomment-976645713


   @kkonstantine would you mind taking a look at this 3.1 blocker? There was a small issue in https://github.com/apache/kafka/pull/10563 that causes a regression in some edge cases.





[GitHub] [kafka] C0urante commented on a change in pull request #11526: KAFKA-13472: Correct last-committed offsets tracking for sink tasks after partial revocation

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11526:
URL: https://github.com/apache/kafka/pull/11526#discussion_r755504329



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -654,6 +655,7 @@ private void closePartitions(Collection<TopicPartition> topicPartitions, boolean
             topicPartitions.forEach(currentOffsets::remove);
         }
         updatePartitionCount();
+        topicPartitions.forEach(lastCommittedOffsets::remove);

Review comment:
       Ah, neat trick. Ran unit+integration tests locally after making this change and they all passed; looks like the operation is supported, and yes, it does appear applicable.







[GitHub] [kafka] kkonstantine merged pull request #11526: KAFKA-13472: Correct last-committed offsets tracking for sink tasks after partial revocation

Posted by GitBox <gi...@apache.org>.
kkonstantine merged pull request #11526:
URL: https://github.com/apache/kafka/pull/11526


   





[GitHub] [kafka] kkonstantine commented on a change in pull request #11526: KAFKA-13472: Correct last-committed offsets tracking for sink tasks after partial revocation

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on a change in pull request #11526:
URL: https://github.com/apache/kafka/pull/11526#discussion_r755431755



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -654,6 +655,7 @@ private void closePartitions(Collection<TopicPartition> topicPartitions, boolean
             topicPartitions.forEach(currentOffsets::remove);
         }
         updatePartitionCount();
+        topicPartitions.forEach(lastCommittedOffsets::remove);

Review comment:
       Is `removeAll` supported by the key set backing the hashmap here? If so, is it applicable?
   (same question above for `currentOffsets`)
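
For reference, the key set returned by `java.util.HashMap.keySet()` is a view that supports removal (including `removeAll`) but not addition, so removing keys through it also removes the corresponding mappings. A minimal sketch follows; the class name, helper method, and `String` keys standing in for `TopicPartition` are hypothetical, and it assumes the map in question is a plain `HashMap`.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper; illustrates bulk key removal via the key-set view.
class KeySetRemoveAllSketch {
    // Removes every listed key from the map; returns true if the map changed.
    static <K, V> boolean removeKeys(Map<K, V> map, Collection<?> keys) {
        return map.keySet().removeAll(keys);
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("topic-0", 10L);
        offsets.put("topic-1", 20L);
        offsets.put("topic-2", 30L);

        // Equivalent in effect to keys.forEach(offsets::remove).
        boolean changed = removeKeys(offsets, List.of("topic-1", "topic-2"));
        System.out.println(changed + " " + offsets);
    }
}
```

Both forms leave the map in the same state; `removeAll` on the key set expresses the bulk removal as a single call.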



