You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "erikvanoosten (via GitHub)" <gi...@apache.org> on 2023/05/06 06:40:16 UTC

[GitHub] [kafka] erikvanoosten opened a new pull request, #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

erikvanoosten opened a new pull request, #13678:
URL: https://github.com/apache/kafka/pull/13678

   The contract for commitSync() guarantees that the callbacks for all prior async commits will be invoked before it (successfully?) returns. Prior to this change the contract could be violated if an empty offsets map were passed in to commitSync().
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] erikvanoosten commented on pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "erikvanoosten (via GitHub)" <gi...@apache.org>.
erikvanoosten commented on PR #13678:
URL: https://github.com/apache/kafka/pull/13678#issuecomment-1537071678

   This PR is a copy of https://github.com/apache/kafka/pull/9111 so all credits go to @thomaslee.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1191400253


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -186,6 +187,7 @@ public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig,
         this.completedOffsetCommits = new ConcurrentLinkedQueue<>();
         this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix);
         this.interceptors = interceptors;
+        this.inFlightAsyncCommits = new AtomicInteger();
         this.pendingAsyncCommits = new AtomicInteger();

Review Comment:
   Thanks @dajac - Do you mean the code isn't changing the counter in the commit callback? I don't know the exact reason, but my guess is, the commit isn't sent until a coordinator is ready and it is therefore called "pending".  If the coordinator exist, then there's no point to increment and decrement the counter, but it will be done anyway, i.e. there will (most likely) be a response.
   
   I'm in supportive of the consolidating suggestions.  For me, it is mentally difficult to manage multiple different states correctly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] erikvanoosten commented on pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "erikvanoosten (via GitHub)" <gi...@apache.org>.
erikvanoosten commented on PR #13678:
URL: https://github.com/apache/kafka/pull/13678#issuecomment-1564684675

   > @philipnee I will try to review it this week. Thanks!
   
   Hi @dajac, did you already get the chance to look at this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] erikvanoosten commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "erikvanoosten (via GitHub)" <gi...@apache.org>.
erikvanoosten commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1214314299


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -105,6 +105,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     private final boolean autoCommitEnabled;
     private final int autoCommitIntervalMs;
     private final ConsumerInterceptors<?, ?> interceptors;
+    // package private for testing
+    final AtomicInteger inFlightAsyncCommits;
     private final AtomicInteger pendingAsyncCommits;

Review Comment:
   @dajac I took the description of pendingAyncCommits from your comment https://github.com/apache/kafka/pull/13678#discussion_r1191927989, I hope I did that correctly.
   
   Changed in: https://github.com/apache/kafka/pull/13678/commits/2bfedf0ec27f9d386d9cddb2ba24d53e533ab51f



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1197985175


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -600,6 +602,30 @@ public void testCoordinatorNotAvailable() {
         assertTrue(coordinator.coordinatorUnknown());
     }
 
+    @Test
+    public void testSyncCommitWithoutOffsetsAndPendingAsyncCommit() {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        TopicPartition tp = new TopicPartition("foo", 0);
+        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(tp, new OffsetAndMetadata(123));
+
+        final AtomicBoolean committed = new AtomicBoolean();

Review Comment:
   does this need to be atomic? there shouldn't be concurrent access to the var. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] erikvanoosten commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "erikvanoosten (via GitHub)" <gi...@apache.org>.
erikvanoosten commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1221305987


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -105,6 +105,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     private final boolean autoCommitEnabled;
     private final int autoCommitIntervalMs;
     private final ConsumerInterceptors<?, ?> interceptors;
+    // track number of async commits for which callback must be called
+    // package private for testing
+    final AtomicInteger inFlightAsyncCommits;
+    // track number of inflight coordinator lookups

Review Comment:
   Of course! Done in https://github.com/apache/kafka/pull/13678/commits/e4ba315ecbf0878706de1e1b620911896a994bed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1214154053


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -105,6 +105,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     private final boolean autoCommitEnabled;
     private final int autoCommitIntervalMs;
     private final ConsumerInterceptors<?, ?> interceptors;
+    // package private for testing
+    final AtomicInteger inFlightAsyncCommits;
     private final AtomicInteger pendingAsyncCommits;

Review Comment:
   Could we add a comment for these two variables? I think that we were all confused by them while looking at this PR so adding clarifying comments would make sense here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1191400253


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -186,6 +187,7 @@ public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig,
         this.completedOffsetCommits = new ConcurrentLinkedQueue<>();
         this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix);
         this.interceptors = interceptors;
+        this.inFlightAsyncCommits = new AtomicInteger();
         this.pendingAsyncCommits = new AtomicInteger();

Review Comment:
   Thanks @dajac - Do you mean the code isn't changing the counter in the commit callback? I don't know the exact reason, but my guess is, the commit isn't sent until a coordinator is ready and it is therefore called "pending".  If the coordinator is connected, then there's no point to increment and decrement the counter, but it will be done anyway, i.e. there will (most likely) be a response.
   
   I'm in supportive of the consolidating suggestions.  For me, it is mentally difficult to manage multiple different states correctly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] erikvanoosten commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "erikvanoosten (via GitHub)" <gi...@apache.org>.
erikvanoosten commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1196205881


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1223,6 +1233,26 @@ public void maybeAutoCommitOffsetsAsync(long now) {
         }
     }
 
+    private boolean waitForPendingAsyncCommits(Timer timer) {

Review Comment:
   @philipnee it seems there is still a gap in our mutual understanding. I really would like this problem to be solved though. It seems to most promising route towards using Kafka correctly from a completely async/concurrent runtime. Shall we organize a online meeting, or perhaps you can make the changes you proposed yourself?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1197989343


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1164,8 +1172,12 @@ public void onFailure(RuntimeException e) {
     public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, Timer timer) {
         invokeCompletedOffsetCommitCallbacks();
 
-        if (offsets.isEmpty())
-            return true;
+        if (offsets.isEmpty()) {
+            // The KafkaConsumer API guarantees that the callbacks for all commitAsync()
+            // calls will be invoked when commitSync() returns. This should be true even

Review Comment:
   WDYT: `We guarantee that the callbacks for all commitAsync() calls will be invoked when commitSync() returns, even if the user tries to commit empty offsets`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on PR #13678:
URL: https://github.com/apache/kafka/pull/13678#issuecomment-1557511754

   @dajac - Would you have some time to review and help Erik to merge this? I've done a pass and I think it's okay.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] erikvanoosten commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "erikvanoosten (via GitHub)" <gi...@apache.org>.
erikvanoosten commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1200843891


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -186,6 +187,7 @@ public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig,
         this.completedOffsetCommits = new ConcurrentLinkedQueue<>();
         this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix);
         this.interceptors = interceptors;
+        this.inFlightAsyncCommits = new AtomicInteger();
         this.pendingAsyncCommits = new AtomicInteger();

Review Comment:
   Starting from commit https://github.com/apache/kafka/pull/13678/commits/5f8bfc42e9791a97c01cecef334fccc88dd547f2 this is much clearer because only a single method updates `inFlightAsyncCommits`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] erikvanoosten commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "erikvanoosten (via GitHub)" <gi...@apache.org>.
erikvanoosten commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1200842817


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -600,6 +601,30 @@ public void testCoordinatorNotAvailable() {
         assertTrue(coordinator.coordinatorUnknown());
     }
 
+    @Test
+    public void testSyncCommitWithoutOffsetsAndPendingAsyncCommit() {

Review Comment:
   Sorry, somehow I missed this comment. Yes, I agree your suggestion is much better. Applied in https://github.com/apache/kafka/pull/13678/commits/13bccc7afe6422f48f87bda031083dde50802649



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1221045241


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -105,6 +105,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     private final boolean autoCommitEnabled;
     private final int autoCommitIntervalMs;
     private final ConsumerInterceptors<?, ?> interceptors;
+    // track number of async commits for which callback must be called
+    // package private for testing
+    final AtomicInteger inFlightAsyncCommits;
+    // track number of inflight coordinator lookups

Review Comment:
   nit: Could we say `track the number of pending async commits waiting on the coordinator lookup to complete`.?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] erikvanoosten commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "erikvanoosten (via GitHub)" <gi...@apache.org>.
erikvanoosten commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1197494892


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1223,6 +1233,26 @@ public void maybeAutoCommitOffsetsAsync(long now) {
         }
     }
 
+    private boolean waitForPendingAsyncCommits(Timer timer) {

Review Comment:
   > The code appears to check `inFlightAsyncCommits` only in the case where the incoming `offsets` map is empty. Wouldn't we want to always check that?
   
   @kirktrue I think it was changed this way because when offsets is not empty, method `invokeCompletedOffsetCommitCallbacks();`is called which already does what we need.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] erikvanoosten commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "erikvanoosten (via GitHub)" <gi...@apache.org>.
erikvanoosten commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1198570974


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -600,6 +602,30 @@ public void testCoordinatorNotAvailable() {
         assertTrue(coordinator.coordinatorUnknown());
     }
 
+    @Test
+    public void testSyncCommitWithoutOffsetsAndPendingAsyncCommit() {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        TopicPartition tp = new TopicPartition("foo", 0);
+        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(tp, new OffsetAndMetadata(123));
+
+        final AtomicBoolean committed = new AtomicBoolean();

Review Comment:
   Probably because of this:
   <img width="539" alt="afbeelding" src="https://github.com/apache/kafka/assets/630600/847654a4-8b22-49c8-9cb0-534b7586e4be">
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on PR #13678:
URL: https://github.com/apache/kafka/pull/13678#issuecomment-1557527598

   @philipnee I will try to review it this week. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] kirktrue commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "kirktrue (via GitHub)" <gi...@apache.org>.
kirktrue commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1190210077


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1132,6 +1136,7 @@ public void onSuccess(Void value) {
                 if (interceptors != null)
                     interceptors.onCommit(offsets);
                 completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, null));
+                inFlightAsyncCommits.decrementAndGet();

Review Comment:
   Can we wrap the `decrementAndGet` calls in a `try`/`finally` block in case something weird happens in the code that comes before it?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1145,6 +1150,7 @@ public void onFailure(RuntimeException e) {
                 if (commitException instanceof FencedInstanceIdException) {
                     asyncCommitFenced.set(true);
                 }
+                inFlightAsyncCommits.decrementAndGet();

Review Comment:
   I'm not sure if we need to wrap the `decrementAndGet` call in a `try`/`finally` block here, too?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] erikvanoosten commented on pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "erikvanoosten (via GitHub)" <gi...@apache.org>.
erikvanoosten commented on PR #13678:
URL: https://github.com/apache/kafka/pull/13678#issuecomment-1542480034

   > Hey @erikvanoosten, Thanks for the PR! Could you add checks for inflightCommits count gets set to 0 in a few of the callback testing function like `testAsyncCommitCallbacksInvokedPriorToSyncCommitCompletion` ?
   
   Sure, I'll try. I am new in this code base and these changes are not from me, but I'll try nonetheless.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] erikvanoosten commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "erikvanoosten (via GitHub)" <gi...@apache.org>.
erikvanoosten commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1196205881


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1223,6 +1233,26 @@ public void maybeAutoCommitOffsetsAsync(long now) {
         }
     }
 
+    private boolean waitForPendingAsyncCommits(Timer timer) {

Review Comment:
   @philipnee it seems there is still a gap in our mutual understanding. I really would like this problem to be solved though. It seems the most promising route towards using Kafka correctly from a completely async/concurrent runtime. Shall we organize an online meeting, or perhaps you can make the changes you proposed yourself?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on PR #13678:
URL: https://github.com/apache/kafka/pull/13678#issuecomment-1580998335

   @erikvanoosten It seems that you don't have an account for jira so I can't assign the ticket to you. You should request one.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on PR #13678:
URL: https://github.com/apache/kafka/pull/13678#issuecomment-1574056729

   I'm leaving it for David to press that button!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on PR #13678:
URL: https://github.com/apache/kafka/pull/13678#issuecomment-1568784867

   @erikvanoosten I am sorry but I haven't had the time to get to it yet. Will do!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on PR #13678:
URL: https://github.com/apache/kafka/pull/13678#issuecomment-1574015844

   The failures seem unrelated
   ```
   Build / JDK 8 and Scala 2.12 / testDescribeReportOverriddenConfigs(String).quorum=kraft – kafka.admin.TopicCommandIntegrationTest
   10s
   Build / JDK 8 and Scala 2.12 / testDelayedConfigurationOperations() – org.apache.kafka.controller.QuorumControllerTest
   <1s
   Build / JDK 11 and Scala 2.13 / testSyncTopicConfigs() – org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest
   1m 36s
   Build / JDK 11 and Scala 2.13 / testBalancePartitionLeaders() – org.apache.kafka.controller.QuorumControllerTest
   12s
   Build / JDK 11 and Scala 2.13 / testBalancePartitionLeaders() – org.apache.kafka.controller.QuorumControllerTest
   13s
   Fixed 68
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1190166602


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1223,6 +1233,26 @@ public void maybeAutoCommitOffsetsAsync(long now) {
         }
     }
 
+    private boolean waitForPendingAsyncCommits(Timer timer) {
+        if (inFlightAsyncCommits.get() == 0) {
+            return true;
+        }
+
+        do {
+            ensureCoordinatorReady(timer);

Review Comment:
   scratch this off, see the comment above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1191400253


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -186,6 +187,7 @@ public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig,
         this.completedOffsetCommits = new ConcurrentLinkedQueue<>();
         this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix);
         this.interceptors = interceptors;
+        this.inFlightAsyncCommits = new AtomicInteger();
         this.pendingAsyncCommits = new AtomicInteger();

Review Comment:
   Thanks @dajac - Do you mean the code isn't changing the counter in the commit callback? I don't know the exact reason, but my guess is, the commit isn't sent until a coordinator is ready and it is therefore called "pending".  If the coordinator is connected, then there's no point to increment and decrement the counter, because there will be a response anyway.
   
   I'm in supportive of the consolidating suggestions.  For me, it is mentally difficult to manage multiple different states correctly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] erikvanoosten commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "erikvanoosten (via GitHub)" <gi...@apache.org>.
erikvanoosten commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1190791004


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1223,6 +1233,26 @@ public void maybeAutoCommitOffsetsAsync(long now) {
         }
     }
 
+    private boolean waitForPendingAsyncCommits(Timer timer) {

Review Comment:
   Renamed the method in 1fa48f53f0e6f5a2a9821075fa053e01cba6b0b2.
   
   Also, see this comment: https://github.com/apache/kafka/pull/13678#discussion_r1190730213



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1196830860


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1075,6 +1078,7 @@ void invokeCompletedOffsetCommitCallbacks() {
     public RequestFuture<Void> commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
         invokeCompletedOffsetCommitCallbacks();
 
+        inFlightAsyncCommits.incrementAndGet();

Review Comment:
   should we move this to the top of `sendOffsetCommitRequest`? Technically if we don't have a coordinator (see the else block), then there's really no inflight request. Which means, should we also remove line 1142 as it is failed coordinator request handling.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1196816858


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -668,6 +695,7 @@ public void onFailure(RuntimeException e, RequestFuture<Object> future) {
         coordinator.markCoordinatorUnknown("test cause");
         consumerClient.pollNoWakeup();
         assertTrue(asyncCallbackInvoked.get());
+        assertEquals(coordinator.inFlightAsyncCommits.get(), 0);

Review Comment:
   I was wondering if we could add this to the AfterEach, but it seems like `testCommitAsyncWithUserAssignedType` would fail.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1223,6 +1233,26 @@ public void maybeAutoCommitOffsetsAsync(long now) {
         }
     }
 
+    private boolean waitForPendingAsyncCommits(Timer timer) {

Review Comment:
   Hey @erikvanoosten - I took a closer look at your PR. I think your current approach is fine. 



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -600,6 +601,30 @@ public void testCoordinatorNotAvailable() {
         assertTrue(coordinator.coordinatorUnknown());
     }
 
+    @Test
+    public void testSyncCommitWithoutOffsetsAndPendingAsyncCommit() {

Review Comment:
   maybe `testEnsureCompletingAsyncCommitsWhenSyncCommitWithoutOffsets` ? I thought the original naming sounds a bit like firing sync commit without offset and async commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] erikvanoosten commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "erikvanoosten (via GitHub)" <gi...@apache.org>.
erikvanoosten commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1198573071


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1164,8 +1172,12 @@ public void onFailure(RuntimeException e) {
     public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, Timer timer) {
         invokeCompletedOffsetCommitCallbacks();
 
-        if (offsets.isEmpty())
-            return true;
+        if (offsets.isEmpty()) {
+            // The KafkaConsumer API guarantees that the callbacks for all commitAsync()
+            // calls will be invoked when commitSync() returns. This should be true even

Review Comment:
   Sure: https://github.com/apache/kafka/pull/13678/commits/5f8bfc42e9791a97c01cecef334fccc88dd547f2



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1190132986


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1223,6 +1233,26 @@ public void maybeAutoCommitOffsetsAsync(long now) {
         }
     }
 
+    private boolean waitForPendingAsyncCommits(Timer timer) {

Review Comment:
   or maybe 
   ```
   if (offset.isEmpty() && inflightCommit.get() == 0) return true;
   ... do the rest
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] erikvanoosten commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "erikvanoosten (via GitHub)" <gi...@apache.org>.
erikvanoosten commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1192944903


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1223,6 +1233,26 @@ public void maybeAutoCommitOffsetsAsync(long now) {
         }
     }
 
+    private boolean waitForPendingAsyncCommits(Timer timer) {

Review Comment:
   > so I think you just need to try to send the async commit after this check.
   
   I thought that the async commits would already have been send and that we only need to somehow communicate with the broker to see if they have been completed (and if so, call the callbacks). Is that assumption wrong?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1190765038


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -186,6 +187,7 @@ public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig,
         this.completedOffsetCommits = new ConcurrentLinkedQueue<>();
         this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix);
         this.interceptors = interceptors;
+        this.inFlightAsyncCommits = new AtomicInteger();
         this.pendingAsyncCommits = new AtomicInteger();

Review Comment:
   @philipnee @kirktrue @erikvanoosten I was looking at how we use `pendingAsyncCommits` in `close()`. My understanding is that we use it to wait in close until all the in-flight commit requests are done to ensure that their callbacks are called. Knowing this, isn't it weird that we decrement `pendingAsyncCommits` when we resolve the coordinator and not when the response of the offset commit is received? I may be missing something but this is not clear to me.
   
   I was looking into this because I was hoping that we could consolidate `inFlightAsyncCommits` and `pendingAsyncCommits`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] erikvanoosten commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "erikvanoosten (via GitHub)" <gi...@apache.org>.
erikvanoosten commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1190789869


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1223,6 +1233,26 @@ public void maybeAutoCommitOffsetsAsync(long now) {
         }
     }
 
+    private boolean waitForPendingAsyncCommits(Timer timer) {
+        if (inFlightAsyncCommits.get() == 0) {
+            return true;
+        }
+
+        do {
+            ensureCoordinatorReady(timer);

Review Comment:
   See this comment: https://github.com/apache/kafka/pull/13678#discussion_r1190730213



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] erikvanoosten commented on pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "erikvanoosten (via GitHub)" <gi...@apache.org>.
erikvanoosten commented on PR #13678:
URL: https://github.com/apache/kafka/pull/13678#issuecomment-1580081660

   Commits were squashed. No further changes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] erikvanoosten commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "erikvanoosten (via GitHub)" <gi...@apache.org>.
erikvanoosten commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1214314299


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -105,6 +105,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     private final boolean autoCommitEnabled;
     private final int autoCommitIntervalMs;
     private final ConsumerInterceptors<?, ?> interceptors;
+    // package private for testing
+    final AtomicInteger inFlightAsyncCommits;
     private final AtomicInteger pendingAsyncCommits;

Review Comment:
   I took the description of pendingAyncCommits from your comment https://github.com/apache/kafka/pull/13678#discussion_r1191927989, I hope I did that correctly.
   
   Changed in: https://github.com/apache/kafka/pull/13678/commits/2bfedf0ec27f9d386d9cddb2ba24d53e533ab51f



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] erikvanoosten commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "erikvanoosten (via GitHub)" <gi...@apache.org>.
erikvanoosten commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1191595197


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1223,6 +1233,26 @@ public void maybeAutoCommitOffsetsAsync(long now) {
         }
     }
 
+    private boolean waitForPendingAsyncCommits(Timer timer) {

Review Comment:
   I am struggling, so many words I do not know the meaning of. Because of that I do not understand your suggestion. What is a coordinator, what does it mean to be connected? Why do you want to send async commits from the commitSync method?
   
   > could you elaborate logic will spread around and understandability will suffer a lot
   For example, `sendOffsetCommitRequest` does nothing when it finds an empty offsets (it returns early). So that method has to be changed. I stopped there because I do not understand what this method does (definitely more than sending requests 😄). At first I though we may need to check the inflighAsyncCommits there, but perhaps we only have to remove the empty check on top of `sendOffsetCommitRequest`.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1223,6 +1233,26 @@ public void maybeAutoCommitOffsetsAsync(long now) {
         }
     }
 
+    private boolean waitForPendingAsyncCommits(Timer timer) {

Review Comment:
   I am struggling, so many words I do not know the meaning of. Because of that I do not understand your suggestion. What is a coordinator, what does it mean to be connected? Why do you want to send async commits from the commitSync method?
   
   > could you elaborate logic will spread around and understandability will suffer a lot
   
   For example, `sendOffsetCommitRequest` does nothing when it finds an empty offsets (it returns early). So that method has to be changed. I stopped there because I do not understand what this method does (definitely more than sending requests 😄). At first I though we may need to check the inflighAsyncCommits there, but perhaps we only have to remove the empty check on top of `sendOffsetCommitRequest`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] erikvanoosten commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "erikvanoosten (via GitHub)" <gi...@apache.org>.
erikvanoosten commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1192944987


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1223,6 +1233,26 @@ public void maybeAutoCommitOffsetsAsync(long now) {
         }
     }
 
+    private boolean waitForPendingAsyncCommits(Timer timer) {

Review Comment:
   > The main concern me and David have is managing 2 atomic int, because at certain point one might forget to update one of it and causes some weird bug.
   
   Yes, I share that concern. I do not have a good solution though.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on PR #13678:
URL: https://github.com/apache/kafka/pull/13678#issuecomment-1542502382

   @showuon - do you have a cycle to take a look at this issue?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on PR #13678:
URL: https://github.com/apache/kafka/pull/13678#issuecomment-1543506521

   Another small weirdness is that when `commitAsync` is called with an empty map, its callback is executed before the ones which are still in-flight. This beaks the contract of the method: `Corresponding commit callbacks are also invoked in the same order.`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1197989343


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1164,8 +1172,12 @@ public void onFailure(RuntimeException e) {
     public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, Timer timer) {
         invokeCompletedOffsetCommitCallbacks();
 
-        if (offsets.isEmpty())
-            return true;
+        if (offsets.isEmpty()) {
+            // The KafkaConsumer API guarantees that the callbacks for all commitAsync()
+            // calls will be invoked when commitSync() returns. This should be true even

Review Comment:
   WDYT: `We guarantee that the callbacks for all commitAsync() will be invoked when commitSync() completes, even if the user tries to commit empty offsets`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] erikvanoosten commented on pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "erikvanoosten (via GitHub)" <gi...@apache.org>.
erikvanoosten commented on PR #13678:
URL: https://github.com/apache/kafka/pull/13678#issuecomment-1537107938

   The failing tests are not related to this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] erikvanoosten commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "erikvanoosten (via GitHub)" <gi...@apache.org>.
erikvanoosten commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1190793655


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1132,6 +1136,7 @@ public void onSuccess(Void value) {
                 if (interceptors != null)
                     interceptors.onCommit(offsets);
                 completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, null));
+                inFlightAsyncCommits.decrementAndGet();

Review Comment:
   Yes, who knows what `interceptors.onCommit` will do. Fixed in 1fa48f53f0e6f5a2a9821075fa053e01cba6b0b2.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] erikvanoosten commented on pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "erikvanoosten (via GitHub)" <gi...@apache.org>.
erikvanoosten commented on PR #13678:
URL: https://github.com/apache/kafka/pull/13678#issuecomment-1541455680

   @philipnee Did you get a chance to look at this PR already?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1190104779


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1223,6 +1233,26 @@ public void maybeAutoCommitOffsetsAsync(long now) {
         }
     }
 
+    private boolean waitForPendingAsyncCommits(Timer timer) {

Review Comment:
   can we name this: `invokePendingAsyncCommits` ?
   so that we could do:
   ```
   if (!invokePendingAsyncCommits(timer) { return false; }
   equestFuture<Void> future = sendOffsetCommitRequest(...)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] erikvanoosten commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "erikvanoosten (via GitHub)" <gi...@apache.org>.
erikvanoosten commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1190794344


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1145,6 +1150,7 @@ public void onFailure(RuntimeException e) {
                 if (commitException instanceof FencedInstanceIdException) {
                     asyncCommitFenced.set(true);
                 }
+                inFlightAsyncCommits.decrementAndGet();

Review Comment:
   I think it is not needed here. There is nothing that could fail with an exception.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] erikvanoosten commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "erikvanoosten (via GitHub)" <gi...@apache.org>.
erikvanoosten commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1197499238


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -668,6 +695,7 @@ public void onFailure(RuntimeException e, RequestFuture<Object> future) {
         coordinator.markCoordinatorUnknown("test cause");
         consumerClient.pollNoWakeup();
         assertTrue(asyncCallbackInvoked.get());
+        assertEquals(coordinator.inFlightAsyncCommits.get(), 0);

Review Comment:
   You mean add assertion `assertEquals(coordinator.inFlightAsyncCommits.get(), 0);` in the AfterEach? Hmm, how could that fail that test? Test `testCommitAsyncWithUserAssignedType` already ends with that assertion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] erikvanoosten commented on pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "erikvanoosten (via GitHub)" <gi...@apache.org>.
erikvanoosten commented on PR #13678:
URL: https://github.com/apache/kafka/pull/13678#issuecomment-1574996923

   Okay! @dajac if you're fine with https://github.com/apache/kafka/pull/13678/commits/2bfedf0ec27f9d386d9cddb2ba24d53e533ab51f, this can be merged! 🥳 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac merged pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac merged PR #13678:
URL: https://github.com/apache/kafka/pull/13678


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] kirktrue commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "kirktrue (via GitHub)" <gi...@apache.org>.
kirktrue commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1190226530


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1223,6 +1233,26 @@ public void maybeAutoCommitOffsetsAsync(long now) {
         }
     }
 
+    private boolean waitForPendingAsyncCommits(Timer timer) {

Review Comment:
   The code appears to check `inFlightAsyncCommits` only in the case where the incoming `offsets` map is empty. Wouldn't we want to always check that?
   
   @philipnee's suggestion makes sense to me:
   
   ```java
   if (offsets.isEmpty() && inflightCommit.get() == 0)
       return true;
   ```
   
   `sendOffsetCommitRequest` already handles the case where the incoming map is empty, so perhaps that could be left as is?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] erikvanoosten commented on pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "erikvanoosten (via GitHub)" <gi...@apache.org>.
erikvanoosten commented on PR #13678:
URL: https://github.com/apache/kafka/pull/13678#issuecomment-1556197144

   @philipnee Shall we merge?
   
   The failing tests are not related to this PR.
   I can confirm that the last changes are still performing as expected for my use-case (tested locally).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1191927989


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -186,6 +187,7 @@ public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig,
         this.completedOffsetCommits = new ConcurrentLinkedQueue<>();
         this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix);
         this.interceptors = interceptors;
+        this.inFlightAsyncCommits = new AtomicInteger();
         this.pendingAsyncCommits = new AtomicInteger();

Review Comment:
   Okay. I understand this a bit better now. `pendingAsyncCommits` is really only meant to track inflight coordinator lookups. Then, in `close`, we wait on those to complete before calling `close` of the super class which waits on all the inflight requests to complete.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on PR #13678:
URL: https://github.com/apache/kafka/pull/13678#issuecomment-1542476279

   Hey @erikvanoosten, Thanks for the PR!  Could you add checks for inflightCommits count gets set to 0 in a few of the callback testing function like `testAsyncCommitCallbacksInvokedPriorToSyncCommitCompletion` ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] erikvanoosten commented on pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "erikvanoosten (via GitHub)" <gi...@apache.org>.
erikvanoosten commented on PR #13678:
URL: https://github.com/apache/kafka/pull/13678#issuecomment-1543538068

   > Hey @erikvanoosten, Thanks for the PR! Could you add checks for inflightCommits count gets set to 0 in a few of the callback testing function like `testAsyncCommitCallbacksInvokedPriorToSyncCommitCompletion` ?
   
   I looked for all invocations of commitAsync and added additional asserts in 1fa48f53f0e6f5a2a9821075fa053e01cba6b0b2.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1191665112


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1223,6 +1233,26 @@ public void maybeAutoCommitOffsetsAsync(long now) {
         }
     }
 
+    private boolean waitForPendingAsyncCommits(Timer timer) {

Review Comment:
   the sendOffsetCommitRequest method basically 1. if the offset is empty, complete the future and return immediately, 2. if there's no coordinator `checkAndGetCoordinator` then complete the future with exception (like completeExceptionally) and return, otherwise 3. create the request data and send it `client.send(coordinator, builder).compose(new OffsetCommitResponseHandler(offsets, generation));` 
   
   What I'm suggesting is not to change that method, but to change the `commitOffsetsSync`.  My suggestions are:
   
   Currently, it returns immediately if the offset is empty. But we don't want that, because we also want to check if there's any inflightAsyncCommits. 
   
   Now, if the we can't return immediately, we will need to send these commits, and the requirement is to `coordinatorUnknownAndUnreadySync`.  This check is already in place, so I think you just need to try to send the async commit after this check.
   
   So there's not very much code change there :) 
   
   The main concern me and David have is managing 2 atomic int, because at certain point one might forget to update one of it and causes some weird bug.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1191387973


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1223,6 +1233,26 @@ public void maybeAutoCommitOffsetsAsync(long now) {
         }
     }
 
+    private boolean waitForPendingAsyncCommits(Timer timer) {

Review Comment:
   could you elaborate `logic will spread around and understandability will suffer a lot` : I was suggesting to first check if there's any inflight commits, then wait for the coordinator to be connected.  After the connection is established, you can try to send the async commits.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1190097766


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1223,6 +1233,26 @@ public void maybeAutoCommitOffsetsAsync(long now) {
         }
     }
 
+    private boolean waitForPendingAsyncCommits(Timer timer) {
+        if (inFlightAsyncCommits.get() == 0) {
+            return true;
+        }
+
+        do {
+            ensureCoordinatorReady(timer);

Review Comment:
   commitOffsetSync is already attempting to connect to the coordinator, so it might make more sense to remove this, and move the` if (offset.isEmpty()) {...} `  below `coordinatorUnknownAndUnreadySync`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] erikvanoosten commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "erikvanoosten (via GitHub)" <gi...@apache.org>.
erikvanoosten commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1190730213


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1223,6 +1233,26 @@ public void maybeAutoCommitOffsetsAsync(long now) {
         }
     }
 
+    private boolean waitForPendingAsyncCommits(Timer timer) {

Review Comment:
   I tried to apply @philipnee's suggestion but it will be very tricky; logic will spread around and understandability will suffer a lot. Method commitSync must either return false, or wait until the pending async commits have been handled. I discovered that the test covers this aspect well. When we use the if statement and then use the rest of the code as is, it will return true (because we gave it an empty offsets) even though there are still inflight async commits. To fix this, more logic is needed. How to do this exactly is beyond me at the moment. I would prefer to not change this aspect.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] philipnee commented on pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on PR #13678:
URL: https://github.com/apache/kafka/pull/13678#issuecomment-1551778364

   @erikvanoosten - Thanks for following up on this PR, I think we are really closed here.  Also apologize about the misleading comment.  I left a few comments above.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] erikvanoosten commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "erikvanoosten (via GitHub)" <gi...@apache.org>.
erikvanoosten commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1197512217


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1075,6 +1078,7 @@ void invokeCompletedOffsetCommitCallbacks() {
     public RequestFuture<Void> commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
         invokeCompletedOffsetCommitCallbacks();
 
+        inFlightAsyncCommits.incrementAndGet();

Review Comment:
   @philipnee I rebased the branch on trunk so the line numbers changed a bit. Instead of line 1142, Is it now line 1118 (in the onFailure block inside `commitOffsetsAsync`)?
   
   I made changes based on your idea. I think we should keep the increment in `doCommitOffsetsAsync` because that is also where the decrement is. `inFlightAsyncCommits` is now only updated in that method. WDYT?
   
   Also, instead of using a try/finally, I moved the decrement to the top of the callback method.
   
   This is committed in https://github.com/apache/kafka/pull/13678/commits/f8ce359de5e0a7921bfac26da39feb2af11a9f09.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] erikvanoosten commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "erikvanoosten (via GitHub)" <gi...@apache.org>.
erikvanoosten commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1197512217


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1075,6 +1078,7 @@ void invokeCompletedOffsetCommitCallbacks() {
     public RequestFuture<Void> commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
         invokeCompletedOffsetCommitCallbacks();
 
+        inFlightAsyncCommits.incrementAndGet();

Review Comment:
   @philipnee I rebased the branch on trunk so the line numbers changed a bit. Instead of line 1142, Is it now line 1118 (in the onFailure block inside `commitOffsetsAsync`)?
   
   I made changes based on your idea. I think we should keep the increment in `doCommitOffsetsAsync` because that is also where the decrement is. WDYT?
   
   Also, instead of using a try/finally, I moved the decrement to the top of the callback method.
   
   This is committed in https://github.com/apache/kafka/pull/13678/commits/f8ce359de5e0a7921bfac26da39feb2af11a9f09.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] erikvanoosten commented on a diff in pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "erikvanoosten (via GitHub)" <gi...@apache.org>.
erikvanoosten commented on code in PR #13678:
URL: https://github.com/apache/kafka/pull/13678#discussion_r1197514992


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -668,6 +695,7 @@ public void onFailure(RuntimeException e, RequestFuture<Object> future) {
         coordinator.markCoordinatorUnknown("test cause");
         consumerClient.pollNoWakeup();
         assertTrue(asyncCallbackInvoked.get());
+        assertEquals(coordinator.inFlightAsyncCommits.get(), 0);

Review Comment:
   I see that you tested this in combination with the comment from next thread. And indeed, I see the same thing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] erikvanoosten commented on pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "erikvanoosten (via GitHub)" <gi...@apache.org>.
erikvanoosten commented on PR #13678:
URL: https://github.com/apache/kafka/pull/13678#issuecomment-1574055215

   What is the process now? Who will press the merge button?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] erikvanoosten commented on pull request #13678: KAFKA-10337: await async commits in commitSync even if no offsets given

Posted by "erikvanoosten (via GitHub)" <gi...@apache.org>.
erikvanoosten commented on PR #13678:
URL: https://github.com/apache/kafka/pull/13678#issuecomment-1581020841

   > @erikvanoosten It seems that you don't have an account for jira so I can't assign the ticket to you. You should request one.
   
   I do have an account. My Jira userid is erikvanoosten.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org