Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/09/27 20:37:46 UTC

[GitHub] [kafka] guozhangwang opened a new pull request #11362: KAFKA-13319: Do not commit empty offsets on producer

guozhangwang opened a new pull request #11362:
URL: https://github.com/apache/kafka/pull/11362


   We observed on the broker side that TxnOffsetCommit requests with empty topics were being received. After checking the source code, I found one place in Streams that unnecessarily sends empty offsets. This PR cleans up the Streams-layer logic a bit so that it no longer sends empty offsets, and also guards against empty offsets at the producer layer.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on pull request #11362: KAFKA-13319: Do not commit empty offsets on producer

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #11362:
URL: https://github.com/apache/kafka/pull/11362#issuecomment-929476485


   @guozhangwang Oh, I didn't realize you were completing this under KAFKA-13319. I was hoping that we could add logic to the producer as well: when the offset map passed to `sendOffsetsToTransaction` is empty, we can short-circuit and return instead of going through the AddOffsetsToTxn and TxnOffsetCommit flow.
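To make the suggested short-circuit concrete, here is a minimal sketch under simplifying assumptions: offsets are keyed by a plain "topic-partition" string rather than `TopicPartition`/`OffsetAndMetadata`, the group metadata argument is omitted, and the hypothetical class `TxnOffsetSenderSketch` only records which requests the real flow would send instead of talking to a transaction coordinator. It is not the actual `KafkaProducer` implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical, heavily simplified model of the producer's transactional
// offset path. Nothing is sent to a broker: the class only records the
// names of the requests the real flow would issue, in order.
class TxnOffsetSenderSketch {
    final List<String> sentRequests = new ArrayList<>();

    void sendOffsetsToTransaction(Map<String, Long> offsets) {
        // Short-circuit: with nothing to commit, skip the whole
        // AddOffsetsToTxn -> TxnOffsetCommit round trip.
        if (offsets.isEmpty()) {
            return;
        }
        sentRequests.add("AddOffsetsToTxn");
        sentRequests.add("TxnOffsetCommit");
    }

    public static void main(String[] args) {
        TxnOffsetSenderSketch producer = new TxnOffsetSenderSketch();
        producer.sendOffsetsToTransaction(Collections.emptyMap());
        System.out.println(producer.sentRequests); // []
        producer.sendOffsetsToTransaction(Map.of("input-0", 42L));
        System.out.println(producer.sentRequests); // [AddOffsetsToTxn, TxnOffsetCommit]
    }
}
```

In the diff under review, the new guard sits after the existing `throwIfInvalidGroupMetadata`, `throwIfNoTransactionManager` and `throwIfProducerClosed` checks, so presumably an empty map still fails fast on a closed or non-transactional producer rather than silently succeeding.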



[GitHub] [kafka] hachikuji commented on a change in pull request #11362: KAFKA-13319: Do not commit empty offsets on producer

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11362:
URL: https://github.com/apache/kafka/pull/11362#discussion_r717817262



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##########
@@ -704,11 +704,14 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs
         throwIfInvalidGroupMetadata(groupMetadata);
         throwIfNoTransactionManager();
         throwIfProducerClosed();
-        long start = time.nanoseconds();
-        TransactionalRequestResult result = transactionManager.sendOffsetsToTransaction(offsets, groupMetadata);
-        sender.wakeup();
-        result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
-        producerMetrics.recordSendOffsets(time.nanoseconds() - start);
+
+        if (offsets.isEmpty()) {

Review comment:
       This check should be inverted.
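The review point can be illustrated with a hypothetical pair of methods. The hunk above is truncated, so this assumes the removed send path (timing, `transactionManager.sendOffsetsToTransaction`, `sender.wakeup()`, `result.await` and the metrics call) was moved inside the new `if` block. With the condition as written, only empty maps would reach the coordinator and real offsets would be dropped; negating the condition restores the intended behavior. The class and method names are illustrative only.

```java
import java.util.Map;

// Hypothetical illustration of the inverted guard; each method returns
// whether the (omitted) send path would run for the given offsets.
class InvertedGuardSketch {
    // First revision: the send path sits under `if (offsets.isEmpty())`,
    // so only EMPTY maps are sent and real offsets are silently skipped.
    static boolean buggySends(Map<String, Long> offsets) {
        if (offsets.isEmpty()) {
            return true;  // would send AddOffsetsToTxn + TxnOffsetCommit
        }
        return false;     // non-empty offsets never committed
    }

    // Corrected guard (same shape, negated condition): only go through the
    // request flow when there is something to commit.
    static boolean fixedSends(Map<String, Long> offsets) {
        if (!offsets.isEmpty()) {
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = Map.of("input-0", 42L);
        System.out.println(buggySends(offsets)); // false
        System.out.println(fixedSends(offsets)); // true
    }
}
```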





[GitHub] [kafka] guozhangwang commented on pull request #11362: KAFKA-13319: Do not commit empty offsets on producer

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #11362:
URL: https://github.com/apache/kafka/pull/11362#issuecomment-938839860


   @hachikuji @showuon could you take another look?



[GitHub] [kafka] guozhangwang merged pull request #11362: KAFKA-13319: Do not commit empty offsets on producer

Posted by GitBox <gi...@apache.org>.
guozhangwang merged pull request #11362:
URL: https://github.com/apache/kafka/pull/11362


   



[GitHub] [kafka] guozhangwang commented on a change in pull request #11362: KAFKA-13319: Do not commit empty offsets on producer

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11362:
URL: https://github.com/apache/kafka/pull/11362#discussion_r718738299



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##########
@@ -704,11 +704,14 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs
         throwIfInvalidGroupMetadata(groupMetadata);
         throwIfNoTransactionManager();
         throwIfProducerClosed();
-        long start = time.nanoseconds();
-        TransactionalRequestResult result = transactionManager.sendOffsetsToTransaction(offsets, groupMetadata);
-        sender.wakeup();
-        result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
-        producerMetrics.recordSendOffsets(time.nanoseconds() - start);
+
+        if (offsets.isEmpty()) {

Review comment:
       Ah, my bad.
   
   I was trying to fix this on both the caller (Streams) side and the callee (producer) side, but the callee part was added at the last minute, and obviously I needed some coffee by then.





[GitHub] [kafka] guozhangwang commented on a change in pull request #11362: KAFKA-13319: Do not commit empty offsets on producer

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11362:
URL: https://github.com/apache/kafka/pull/11362#discussion_r725149597



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
##########
@@ -735,7 +735,6 @@ public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() {
         topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
         expectLastCall().anyTimes();
         expectRestoreToBeCompleted(consumer, changeLogReader);
-        consumer.commitSync(eq(emptyMap()));

Review comment:
       SG! Will update.





[GitHub] [kafka] hachikuji edited a comment on pull request #11362: KAFKA-13319: Do not commit empty offsets on producer

Posted by GitBox <gi...@apache.org>.
hachikuji edited a comment on pull request #11362:
URL: https://github.com/apache/kafka/pull/11362#issuecomment-929476485


   @guozhangwang Oh, I didn't realize you were completing this under KAFKA-13319. I looked again at the producer logic and left a comment. That probably explains why all the tests are failing.



[GitHub] [kafka] guozhangwang commented on a change in pull request #11362: KAFKA-13319: Do not commit empty offsets on producer

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11362:
URL: https://github.com/apache/kafka/pull/11362#discussion_r717027686



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -189,14 +189,13 @@ void handleCorruption(final Set<TaskId> corruptedTasks) {
 
         // We need to commit before closing the corrupted active tasks since this will force the ongoing txn to abort
         try {
-            commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(tasks()

Review comment:
       No logical change here, just renamings.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
##########
@@ -735,7 +735,6 @@ public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() {
         topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
         expectLastCall().anyTimes();
         expectRestoreToBeCompleted(consumer, changeLogReader);
-        consumer.commitSync(eq(emptyMap()));

Review comment:
       The mocks in these unit tests are changed because, originally, we would send empty offsets.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1084,26 +1083,27 @@ int commit(final Collection<Task> tasksToCommit) {
      *                               or if the task producer got fenced (EOS)
      * @throws TimeoutException if committing offsets failed due to TimeoutException (non-EOS)
      * @throws TaskCorruptedException if committing offsets failed due to TimeoutException (EOS)
-     * @param consumedOffsetsAndMetadataPerTask an empty map that will be filled in with the prepared offsets
+     * @param consumedOffsetsAndMetadata an empty map that will be filled in with the prepared offsets
      * @return number of committed offsets, or -1 if we are in the middle of a rebalance and cannot commit
      */
-    private int commitAndFillInConsumedOffsetsAndMetadataPerTaskMap(final Collection<Task> tasksToCommit,
-                                                                    final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask) {
+    private int commitTasksAndMaybeUpdateCommittableOffsets(final Collection<Task> tasksToCommit,
+                                                            final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadata) {
         if (rebalanceInProgress) {
             return -1;
         }
 
         int committed = 0;
         for (final Task task : tasksToCommit) {
+            // we need to call commitNeeded first since we need to update committable offsets
             if (task.commitNeeded()) {
                 final Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata = task.prepareCommit();
-                if (task.isActive()) {

Review comment:
       We do not need to check `task.isActive()`, since for standby tasks `prepareCommit()` always returns an empty offset map. Instead, we should just make sure the returned map is not empty before adding it to the map of offsets to commit (for active tasks, it may also be empty depending on their state).
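The resulting loop shape can be sketched as follows, assuming a hypothetical `SketchTask` interface with only `id()`, `commitNeeded()` and `prepareCommit()` (the real Streams `Task` interface is much larger, and offsets are keyed by a plain string here). Filtering on `!offsets.isEmpty()` covers both standby tasks and active tasks whose offsets happen to be empty with a single check, so no `isActive()` branch is needed. The sketch deliberately omits the rebalance check and the `committed` counter of the real method.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical minimal task abstraction (NOT the real Streams Task interface).
interface SketchTask {
    String id();
    boolean commitNeeded();
    // Offsets this task wants committed: always empty for standby tasks,
    // possibly empty for active tasks depending on their state.
    Map<String, Long> prepareCommit();
}

// Hypothetical fixed-answer task used only for the demo below.
class FixedTask implements SketchTask {
    private final String id;
    private final boolean commitNeeded;
    private final Map<String, Long> offsets;

    FixedTask(String id, boolean commitNeeded, Map<String, Long> offsets) {
        this.id = id;
        this.commitNeeded = commitNeeded;
        this.offsets = offsets;
    }

    public String id() { return id; }
    public boolean commitNeeded() { return commitNeeded; }
    public Map<String, Long> prepareCommit() { return offsets; }
}

class CommitLoopSketch {
    // Gathers per-task offsets, keeping only non-empty maps.
    static Map<String, Map<String, Long>> collectOffsets(List<SketchTask> tasksToCommit) {
        Map<String, Map<String, Long>> offsetsPerTask = new HashMap<>();
        for (SketchTask task : tasksToCommit) {
            // commitNeeded() is consulted first, as in the diff above.
            if (task.commitNeeded()) {
                Map<String, Long> offsets = task.prepareCommit();
                if (!offsets.isEmpty()) {
                    offsetsPerTask.put(task.id(), offsets);
                }
            }
        }
        return offsetsPerTask;
    }

    public static void main(String[] args) {
        List<SketchTask> tasks = List.of(
            new FixedTask("0_0", true, Map.of("input-0", 10L)), // active task with progress
            new FixedTask("0_1", true, Map.of()),               // standby: always empty
            new FixedTask("0_2", false, Map.of("input-2", 5L))  // no commit needed
        );
        System.out.println(collectOffsets(tasks).keySet()); // [0_0]
    }
}
```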





[GitHub] [kafka] showuon commented on a change in pull request #11362: KAFKA-13319: Do not commit empty offsets on producer

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11362:
URL: https://github.com/apache/kafka/pull/11362#discussion_r719115383



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
##########
@@ -735,7 +735,6 @@ public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() {
         topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString());
         expectLastCall().anyTimes();
         expectRestoreToBeCompleted(consumer, changeLogReader);
-        consumer.commitSync(eq(emptyMap()));

Review comment:
       Since we won't commit empty offsets now, I think we should add a test for this improvement. So, instead of removing this line, we should verify that `commitSync(emptyMap())` is not called during this test.
   
   ```java
   consumer.commitSync(eq(emptyMap()));
   expectLastCall().andStubThrow(new AssertionError("should not invoke commitSync when offset map is empty"));
   ```
   What do you think?
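The same expectation can also be expressed without a mocking library. This minimal sketch assumes a hypothetical `FailOnEmptyCommitConsumer` fake that throws the same `AssertionError` as the suggested `andStubThrow` stub whenever `commitSync` receives an empty map, plus a hypothetical caller `maybeCommit` that guards the call; neither is part of the Kafka code base.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical hand-rolled fake standing in for the mocked consumer: it fails
// fast on an empty commit, mirroring andStubThrow(new AssertionError(...)).
class FailOnEmptyCommitConsumer {
    final List<Map<String, Long>> commits = new ArrayList<>();

    void commitSync(Map<String, Long> offsets) {
        if (offsets.isEmpty()) {
            throw new AssertionError("should not invoke commitSync when offset map is empty");
        }
        commits.add(offsets);
    }
}

class EmptyCommitTestSketch {
    // Hypothetical caller under test: commits only when there is something to commit.
    static void maybeCommit(FailOnEmptyCommitConsumer consumer, Map<String, Long> offsets) {
        if (!offsets.isEmpty()) {
            consumer.commitSync(offsets);
        }
    }

    public static void main(String[] args) {
        FailOnEmptyCommitConsumer consumer = new FailOnEmptyCommitConsumer();
        maybeCommit(consumer, Map.of());               // must not throw
        maybeCommit(consumer, Map.of("input-0", 42L)); // one real commit
        System.out.println(consumer.commits.size());   // 1
    }
}
```

A fake like this trades EasyMock's call verification for a plain assertion inside the collaborator; the EasyMock stub in the comment above achieves the same effect within the existing test setup.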



