Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/08/27 03:10:34 UTC

[GitHub] [kafka] jordanbull opened a new pull request, #12566: [Connect] Only clear origOffsets when clearing messageBatch

jordanbull opened a new pull request, #12566:
URL: https://github.com/apache/kafka/pull/12566

   Only clear origOffsets when clearing the messageBatch. Prior to this change, origOffsets was cleared at the start of every convertMessages call, which then adds all newly polled messages to origOffsets. When SinkTask.put throws a RetriableException, polling is paused and the existing messageBatch is kept for redelivery. The next convertMessages call therefore cleared all offset tracking even though those messages were still being retried, so their offsets were never committed, even after put eventually succeeded.
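To make the failure mode concrete, here is a minimal, self-contained sketch. It is a hypothetical model, not `WorkerSinkTask`. The class `OffsetTrackingSketch`, its `Rec` type, the `deliver` method and the `committable` map are invented for illustration. Partitions are plain strings, and there is no real consumer or `SinkTask`. A failed `deliver` stands in for `SinkTask.put` throwing a `RetriableException`, and an empty `convertMessages` call stands in for a poll while consumption is paused.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the offset bookkeeping; NOT the real WorkerSinkTask.
class OffsetTrackingSketch {

    // A polled record: partition name plus the offset it was read at.
    static final class Rec {
        final String partition;
        final long offset;

        Rec(String partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    private final boolean clearOnlyWithBatch; // true = behaviour after the fix
    private final List<Rec> messageBatch = new ArrayList<>();
    private final Map<String, Long> origOffsets = new HashMap<>();
    private final Map<String, Long> committable = new HashMap<>();

    OffsetTrackingSketch(boolean clearOnlyWithBatch) {
        this.clearOnlyWithBatch = clearOnlyWithBatch;
    }

    // Plays the role of convertMessages: append newly polled records to the batch.
    void convertMessages(List<Rec> polled) {
        if (!clearOnlyWithBatch) {
            origOffsets.clear(); // before the fix: cleared on every call, even mid-retry
        }
        for (Rec r : polled) {
            messageBatch.add(r);
            origOffsets.put(r.partition, r.offset + 1); // next offset to consume
        }
    }

    // Plays the role of delivery: a failed put keeps the batch for the next attempt.
    void deliver(boolean putSucceeds) {
        if (!putSucceeds) {
            return; // stands in for SinkTask.put throwing RetriableException
        }
        committable.putAll(origOffsets);
        messageBatch.clear();
        if (clearOnlyWithBatch) {
            origOffsets.clear(); // after the fix: cleared together with messageBatch
        }
    }

    Map<String, Long> committable() {
        return committable;
    }

    // One record fails once, polling is "paused" (empty poll), then the retry succeeds.
    static Map<String, Long> runScenario(boolean fixed) {
        OffsetTrackingSketch task = new OffsetTrackingSketch(fixed);
        task.convertMessages(List.of(new Rec("p0", 5)));
        task.deliver(false);
        task.convertMessages(List.of()); // paused consumer returns no records
        task.deliver(true);
        return task.committable();
    }

    public static void main(String[] args) {
        System.out.println("before fix: " + runScenario(false)); // before fix: {}
        System.out.println("after fix:  " + runScenario(true));  // after fix:  {p0=6}
    }
}
```

In the model, the only difference between the two runs is *when* `origOffsets` is cleared. Tying the clear to the point where the batch is actually flushed keeps the offsets of retried records alive until they can be committed.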
   
   Reported in [KAFKA-13927](https://issues.apache.org/jira/browse/KAFKA-13927)
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante merged pull request #12566: KAFKA-13927: Only clear origOffsets when clearing messageBatch

Posted by GitBox <gi...@apache.org>.
C0urante merged PR #12566:
URL: https://github.com/apache/kafka/pull/12566


[GitHub] [kafka] jordanbull commented on a diff in pull request #12566: KAFKA-13927: Only clear origOffsets when clearing messageBatch

Posted by GitBox <gi...@apache.org>.
jordanbull commented on code in PR #12566:
URL: https://github.com/apache/kafka/pull/12566#discussion_r974545797


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java:
##########
@@ -435,6 +453,13 @@ public void testPollRedelivery() throws Exception {
         assertTaskMetricValue("batch-size-max", 1.0);
         assertTaskMetricValue("batch-size-avg", 0.5);
 
+        assertEquals(workerCurrentOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "currentOffsets"));

Review Comment:
   I've also added an assertion on the offsets passed to `preCommit` above. Happy to remove this line.



[GitHub] [kafka] jordanbull commented on a diff in pull request #12566: KAFKA-13927: Only clear origOffsets when clearing messageBatch

Posted by GitBox <gi...@apache.org>.
jordanbull commented on code in PR #12566:
URL: https://github.com/apache/kafka/pull/12566#discussion_r974544940


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java:
##########
@@ -763,6 +763,9 @@ private void onPartitionsRemoved(Collection<TopicPartition> partitions, boolean
                 return;
 
             try {
+                for (TopicPartition partition: partitions) {
+                    origOffsets.remove(partition);
+                }

Review Comment:
   Makes sense



[GitHub] [kafka] C0urante commented on a diff in pull request #12566: KAFKA-13927: Only clear origOffsets when clearing messageBatch

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12566:
URL: https://github.com/apache/kafka/pull/12566#discussion_r974231345


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java:
##########
@@ -435,6 +453,13 @@ public void testPollRedelivery() throws Exception {
         assertTaskMetricValue("batch-size-max", 1.0);
         assertTaskMetricValue("batch-size-avg", 0.5);
 
+        assertEquals(workerCurrentOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "currentOffsets"));

Review Comment:
   Do we have to probe internal fields to verify this change? Couldn't we examine the offsets given to `SinkTask::preCommit` or `Consumer::commitAsync` instead?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java:
##########
@@ -763,6 +763,9 @@ private void onPartitionsRemoved(Collection<TopicPartition> partitions, boolean
                 return;
 
             try {
+                for (TopicPartition partition: partitions) {
+                    origOffsets.remove(partition);
+                }

Review Comment:
   Any reason not to put this in `closePartitions`, where we also clear out entries from `currentOffsets`?
   Also, this can be simplified:
   ```java
   origOffsets.keySet().removeAll(partitions);
   ```
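As a hedged illustration of the simplification suggested above: `Map.keySet()` returns a view backed by the map, and removing elements from that view removes the corresponding entries from the map. The one-liner therefore behaves like the loop in the diff. The sketch uses `String` keys in place of `TopicPartition`, and the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical demo: removing keys through Map.keySet() writes through to the map.
class KeySetRemoveAllSketch {

    // Loop form, as in the original diff (partition names are plain strings here).
    static void removeWithLoop(Map<String, Long> origOffsets, List<String> partitions) {
        for (String partition : partitions) {
            origOffsets.remove(partition);
        }
    }

    // One-line form suggested in review: keySet() is a live view backed by the map.
    static void removeWithKeySet(Map<String, Long> origOffsets, List<String> partitions) {
        origOffsets.keySet().removeAll(partitions);
    }

    static Map<String, Long> sample() {
        Map<String, Long> m = new HashMap<>();
        m.put("topic-0", 10L);
        m.put("topic-1", 20L);
        m.put("topic-2", 30L);
        return m;
    }

    public static void main(String[] args) {
        // "topic-9" is not in the map; both forms simply ignore it.
        List<String> revoked = List.of("topic-0", "topic-2", "topic-9");
        Map<String, Long> a = sample();
        Map<String, Long> b = sample();
        removeWithLoop(a, revoked);
        removeWithKeySet(b, revoked);
        System.out.println(a.equals(b) + " " + b); // true {topic-1=20}
    }
}
```

`HashMap`'s key set inherits `AbstractSet.removeAll`. Depending on the relative sizes, that method may call `contains` on the argument, which is a linear scan for a `List`. This is unlikely to matter for the handful of partitions revoked in a rebalance.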



[GitHub] [kafka] C0urante commented on a diff in pull request #12566: KAFKA-13927: Only clear origOffsets when clearing messageBatch

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12566:
URL: https://github.com/apache/kafka/pull/12566#discussion_r975650210


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java:
##########
@@ -435,6 +453,13 @@ public void testPollRedelivery() throws Exception {
         assertTaskMetricValue("batch-size-max", 1.0);
         assertTaskMetricValue("batch-size-avg", 0.5);
 
+        assertEquals(workerCurrentOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "currentOffsets"));

Review Comment:
   Yeah, I think the assertions you added above cover this case nicely 👍 


