Posted to jira@kafka.apache.org by "machi1990 (via GitHub)" <gi...@apache.org> on 2023/05/03 11:54:13 UTC

[GitHub] [kafka] machi1990 opened a new pull request, #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

machi1990 opened a new pull request, #13665:
URL: https://github.com/apache/kafka/pull/13665

   Opening as a WIP because I still need to add more tests. I'll promote it once that's done.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] machi1990 commented on a diff in pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

Posted by "machi1990 (via GitHub)" <gi...@apache.org>.
machi1990 commented on code in PR #13665:
URL: https://github.com/apache/kafka/pull/13665#discussion_r1198622895


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1007,16 +1047,33 @@ public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(final Set<To
             if (pendingCommittedOffsetRequest != null) {
                 future = pendingCommittedOffsetRequest.response;
             } else {
-                future = sendOffsetFetchRequest(partitions);
-                pendingCommittedOffsetRequest = new PendingCommittedOffsetRequest(partitions, generationForOffsetRequest, future);
+                future = sendOffsetFetchRequest(nonCachedPartitions);
+                pendingCommittedOffsetRequest = new PendingCommittedOffsetRequest(nonCachedPartitions, generationForOffsetRequest, future);
             }
             client.poll(future, timer);
 
             if (future.isDone()) {
                 pendingCommittedOffsetRequest = null;
 
                 if (future.succeeded()) {
-                    return future.value();
+                    Map<TopicPartition, OffsetAndMetadata> freshOffsets = future.value();
+
+                    // update cache for assigned partitions that are not cached yet
+                    for (TopicPartition nonCachedAssignedPartition: nonCachedAssignedPartitions) {
+                        if (!this.subscriptions.isAssigned(nonCachedAssignedPartition)) {
+                            // it is possible that the topic is no longer assigned when the response is received,
+                            // in this case we do not update the cache with the fresh value
+                            continue;
+                        }
+
+                        OffsetAndMetadata offset = freshOffsets.get(nonCachedAssignedPartition);
+                        if (offset != null) { // it is possible that the offset and metadata were not fetched
+                            this.partitionOffsetsCache.put(nonCachedAssignedPartition, offset);

Review Comment:
   Thank you for this. I get the gist of what you are saying and it makes sense. I'll have another look with that understanding, or come back with additional questions if I need more clarification. 
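To make the flow in the hunk above concrete, here is a minimal, self-contained sketch of a cache-first committed-offset lookup. It is a simplified model, not the PR's code: partitions are plain `String`s and offsets are `Long`s instead of `TopicPartition`/`OffsetAndMetadata`, and `CommittedOffsetLookup`, `isAssigned` and `remoteFetch` are hypothetical stand-ins for the coordinator, `subscriptions.isAssigned(..)` and `sendOffsetFetchRequest(..)`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical model of a cache-first committed-offset lookup.
// Strings stand in for TopicPartition and Longs for OffsetAndMetadata.
final class CommittedOffsetLookup {
    private final Map<String, Long> cache = new HashMap<>();

    Map<String, Long> cache() {
        return cache;
    }

    Map<String, Long> fetchCommitted(Set<String> requested,
                                     Predicate<String> isAssigned,
                                     Function<Set<String>, Map<String, Long>> remoteFetch) {
        Map<String, Long> result = new HashMap<>();
        Set<String> nonCached = new HashSet<>();

        // Serve cached partitions locally; collect the rest for a single remote call.
        for (String partition : requested) {
            Long cached = cache.get(partition);
            if (cached != null) {
                result.put(partition, cached);
            } else {
                nonCached.add(partition);
            }
        }
        if (nonCached.isEmpty()) {
            return result; // nothing left to fetch remotely
        }

        Map<String, Long> fresh = remoteFetch.apply(nonCached);
        for (String partition : nonCached) {
            Long offset = fresh.get(partition);
            if (offset == null) {
                continue; // no committed offset was returned for this partition
            }
            result.put(partition, offset);
            // The assignment may have changed while the request was in flight,
            // so only cache partitions that are still assigned now.
            if (isAssigned.test(partition)) {
                cache.put(partition, offset);
            }
        }
        return result;
    }
}
```

Checking the assignment when the response arrives, rather than when the request is sent, is what keeps a partition revoked mid-request out of the cache in this model.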



[GitHub] [kafka] machi1990 commented on a diff in pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

Posted by "machi1990 (via GitHub)" <gi...@apache.org>.
machi1990 commented on code in PR #13665:
URL: https://github.com/apache/kafka/pull/13665#discussion_r1230620093


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -3114,6 +3137,70 @@ public void testFetchCommittedOffsets() {
         assertEquals(new OffsetAndMetadata(offset, leaderEpoch, metadata), fetchedOffsets.get(t1p));
     }
 
+    @Test
+    public void testPopulatingOffsetCacheForAssignedPartition() {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        Map<TopicPartition, OffsetAndMetadata> committedOffsetsCache = coordinator.committedOffsetsCache();
+        // committedOffsetsCache should be empty
+        assertEquals(committedOffsetsCache.size(), 0);
+
+        long offset = 500L;
+        String metadata = "blahblah";
+        Optional<Integer> leaderEpoch = Optional.of(15);
+        OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, leaderEpoch,
+                metadata, Errors.NONE);
+
+        client.prepareResponse(offsetFetchResponse(Errors.NONE, singletonMap(t1p, data)));
+        subscriptions.assignFromUser(singleton(t1p));
+        Map<TopicPartition, OffsetAndMetadata> fetchedOffsets = coordinator.fetchCommittedOffsets(singleton(t1p),
+                time.timer(Long.MAX_VALUE));
+
+        assertNotNull(fetchedOffsets);
+        OffsetAndMetadata expected = new OffsetAndMetadata(offset, leaderEpoch, metadata);
+        assertEquals(expected, fetchedOffsets.get(t1p));
+
+        // check committedOffsetsCache is populated
+        assertEquals(committedOffsetsCache.size(), 1);
+        assertEquals(expected, committedOffsetsCache.get(t1p));
+    }
+
+    @Test
+    public void testReturningCachedOffsetForAssignedPartition() {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        Map<TopicPartition, OffsetAndMetadata> committedOffsetsCache = coordinator.committedOffsetsCache();
+
+        long offset = 500L;
+        String metadata = "blahblah";
+        Optional<Integer> leaderEpoch = Optional.of(15);
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offset, leaderEpoch, metadata);
+        committedOffsetsCache.put(t1p, offsetAndMetadata);

Review Comment:
   Yes, this test case actually already covers most of what the one above tests! I'll merge the two cases. 



[GitHub] [kafka] showuon commented on pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on PR #13665:
URL: https://github.com/apache/kafka/pull/13665#issuecomment-1596379448

   Re-triggering CI build


[GitHub] [kafka] machi1990 commented on a diff in pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

Posted by "machi1990 (via GitHub)" <gi...@apache.org>.
machi1990 commented on code in PR #13665:
URL: https://github.com/apache/kafka/pull/13665#discussion_r1198690005


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1007,16 +1047,33 @@ public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(final Set<To
             if (pendingCommittedOffsetRequest != null) {
                 future = pendingCommittedOffsetRequest.response;
             } else {
-                future = sendOffsetFetchRequest(partitions);
-                pendingCommittedOffsetRequest = new PendingCommittedOffsetRequest(partitions, generationForOffsetRequest, future);
+                future = sendOffsetFetchRequest(nonCachedPartitions);
+                pendingCommittedOffsetRequest = new PendingCommittedOffsetRequest(nonCachedPartitions, generationForOffsetRequest, future);
             }
             client.poll(future, timer);
 
             if (future.isDone()) {
                 pendingCommittedOffsetRequest = null;
 
                 if (future.succeeded()) {
-                    return future.value();
+                    Map<TopicPartition, OffsetAndMetadata> freshOffsets = future.value();
+
+                    // update cache for assigned partitions that are not cached yet
+                    for (TopicPartition nonCachedAssignedPartition: nonCachedAssignedPartitions) {
+                        if (!this.subscriptions.isAssigned(nonCachedAssignedPartition)) {
+                            // it is possible that the topic is no longer assigned when the response is received,
+                            // in this case we do not update the cache with the fresh value
+                            continue;
+                        }
+
+                        OffsetAndMetadata offset = freshOffsets.get(nonCachedAssignedPartition);
+                        if (offset != null) { // it is possible that the offset and metadata were not fetched
+                            this.partitionOffsetsCache.put(nonCachedAssignedPartition, offset);

Review Comment:
   Hi @showuon, I forgot to call your attention to this line, which already updates the cache:
   https://github.com/apache/kafka/blob/401fb417bf60864e6d380f979d268d895c5ad727/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1422
   Is that what you had in mind?



[GitHub] [kafka] showuon commented on pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on PR #13665:
URL: https://github.com/apache/kafka/pull/13665#issuecomment-1600007864

   > offsets commits are happening somewhere else and that's via the producer.sendOffsetsToTransaction(..) e.g
   
   Oh, the EOS case! I didn't consider it, sorry! 
   Hmm... if the EOS case has to be considered, the original cache mechanism will not work: the offset commit does not go through the consumer, so the consumer has no idea which offsets have been committed. 
   I think we should close this PR and the JIRA ticket as "invalid" and add a comment to the JIRA ticket. WDYT?
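The EOS concern can be reproduced with a toy model. Everything below is hypothetical and much simpler than Kafka: `GroupOffsetStore` stands in for the group coordinator's committed offsets, `CachingConsumer` refreshes its cache only on its own commits, and `EosStalenessDemo.commitViaTransaction` models a commit that reaches the coordinator through another client (as with `producer.sendOffsetsToTransaction(..)`) and so never touches the consumer's cache.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the group coordinator's committed-offset storage.
final class GroupOffsetStore {
    private final Map<String, Long> committed = new HashMap<>();

    void commit(String partition, long offset) {
        committed.put(partition, offset);
    }

    Long fetch(String partition) {
        return committed.get(partition);
    }
}

// Hypothetical consumer that caches only the offsets it commits itself.
final class CachingConsumer {
    private final GroupOffsetStore store;
    private final Map<String, Long> cache = new HashMap<>();

    CachingConsumer(GroupOffsetStore store) {
        this.store = store;
    }

    void commitSync(String partition, long offset) {
        store.commit(partition, offset);
        cache.put(partition, offset); // cache is refreshed only on this consumer's commits
    }

    Long committed(String partition) {
        Long cached = cache.get(partition);
        return cached != null ? cached : store.fetch(partition);
    }
}

final class EosStalenessDemo {
    // Models a commit that bypasses the consumer, e.g. a transactional offset commit.
    static void commitViaTransaction(GroupOffsetStore store, String partition, long offset) {
        store.commit(partition, offset);
    }
}
```

In this model the consumer keeps reporting 10 while the store holds 20: a consumer-side cache can only be trusted if every commit for its partitions goes through that same consumer.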


[GitHub] [kafka] machi1990 commented on pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

Posted by "machi1990 (via GitHub)" <gi...@apache.org>.
machi1990 commented on PR #13665:
URL: https://github.com/apache/kafka/pull/13665#issuecomment-1592934743

   Thanks @showuon for your review on this. I've addressed the comments.


[GitHub] [kafka] machi1990 commented on a diff in pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

Posted by "machi1990 (via GitHub)" <gi...@apache.org>.
machi1990 commented on code in PR #13665:
URL: https://github.com/apache/kafka/pull/13665#discussion_r1198620869


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -123,6 +124,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
     // hold onto request&future for committed offset requests to enable async calls.
     private PendingCommittedOffsetRequest pendingCommittedOffsetRequest = null;
+    // holds the offset metadata for assigned partitions to reduce remote calls thus speeding up fetching partition metadata
+    private final Map<TopicPartition, OffsetAndMetadata> partitionOffsetsCache;

Review Comment:
   good suggestion, I'll do so.



[GitHub] [kafka] machi1990 commented on a diff in pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

Posted by "machi1990 (via GitHub)" <gi...@apache.org>.
machi1990 commented on code in PR #13665:
URL: https://github.com/apache/kafka/pull/13665#discussion_r1198624271


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -346,6 +355,10 @@ private Exception invokePartitionsRevoked(final SortedSet<TopicPartition> revoke
             final long startMs = time.milliseconds();
             listener.onPartitionsRevoked(revokedPartitions);
             sensors.revokeCallbackSensor.record(time.milliseconds() - startMs);
+            // remove the offset metadata cache for revoked partitions
+            for (TopicPartition revokedPartition: revokedPartitions) {
+                this.partitionOffsetsCache.remove(revokedPartition);
+            }

Review Comment:
   If I understood correctly, this part is not needed at all?



[GitHub] [kafka] machi1990 commented on pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

Posted by "machi1990 (via GitHub)" <gi...@apache.org>.
machi1990 commented on PR #13665:
URL: https://github.com/apache/kafka/pull/13665#issuecomment-1596813632

   > @machi1990 , looks like this change breaks some tests. Could you take a look?
   > 
   > https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13665/12
   
   Thank you @showuon. I am re-running the whole test suite on my machine and will report back what I find. In the meantime, let me know if you would prefer me to mark this PR as a draft; I am okay either way.


[GitHub] [kafka] machi1990 commented on pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

Posted by "machi1990 (via GitHub)" <gi...@apache.org>.
machi1990 commented on PR #13665:
URL: https://github.com/apache/kafka/pull/13665#issuecomment-1592771368

   > @machi1990 , thanks for the update, left some more comments.
   
   Thanks for the thorough review @showuon, I've addressed all the comments. Please have another look when you have some time, thanks. 


[GitHub] [kafka] machi1990 commented on pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

Posted by "machi1990 (via GitHub)" <gi...@apache.org>.
machi1990 commented on PR #13665:
URL: https://github.com/apache/kafka/pull/13665#issuecomment-1542514663

   @ableegoldman @showuon could you have a look at this draft PR when you have some time? Thanks


[GitHub] [kafka] machi1990 commented on a diff in pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

Posted by "machi1990 (via GitHub)" <gi...@apache.org>.
machi1990 commented on code in PR #13665:
URL: https://github.com/apache/kafka/pull/13665#discussion_r1230786333


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -351,6 +359,10 @@ private Exception invokePartitionsRevoked(final SortedSet<TopicPartition> revoke
             final long startMs = time.milliseconds();
             listener.onPartitionsRevoked(revokedPartitions);
             sensors.revokeCallbackSensor.record(time.milliseconds() - startMs);
+            // remove the offset metadata cache for revoked partitions
+            for (TopicPartition revokedPartition: revokedPartitions) {
+                this.committedTopicPartitionOffsetsCache.remove(revokedPartition);
+            }

Review Comment:
   done
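The revocation hook above removes cache entries one partition at a time. A hypothetical equivalent using the map's key-set view is sketched below; `OffsetCacheEvictor` and `evictRevoked` are illustrative names (not coordinator methods), and partitions are modeled as `String`s rather than `TopicPartition`s.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: drop cached committed offsets for partitions this member no longer owns.
final class OffsetCacheEvictor {
    private final Map<String, Long> cache = new HashMap<>();

    void put(String partition, long offset) {
        cache.put(partition, offset);
    }

    boolean contains(String partition) {
        return cache.containsKey(partition);
    }

    int size() {
        return cache.size();
    }

    // Removing keys through the key-set view also removes the corresponding map entries.
    void evictRevoked(Collection<String> revoked) {
        cache.keySet().removeAll(revoked);
    }
}
```

Evicting on revocation keeps the cache limited to owned partitions, so if a partition is assigned back later, its pre-revocation entry is not served.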



[GitHub] [kafka] machi1990 commented on a diff in pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

Posted by "machi1990 (via GitHub)" <gi...@apache.org>.
machi1990 commented on code in PR #13665:
URL: https://github.com/apache/kafka/pull/13665#discussion_r1230936981


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -3114,6 +3149,54 @@ public void testFetchCommittedOffsets() {
         assertEquals(new OffsetAndMetadata(offset, leaderEpoch, metadata), fetchedOffsets.get(t1p));
     }
 
+    @Test
+    public void testPopulatingOffsetCacheForAssignedPartition() {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        Map<TopicPartition, OffsetAndMetadata> committedOffsetsCache = coordinator.committedOffsetsCache();
+        // committedOffsetsCache should be empty
+        assertTrue(committedOffsetsCache.isEmpty());
+
+        long offset = 500L;
+        String metadata = "blahblah";
+        Optional<Integer> leaderEpoch = Optional.of(15);
+        OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, leaderEpoch,
+                metadata, Errors.NONE);
+
+        client.prepareResponse(offsetFetchResponse(Errors.NONE, singletonMap(t1p, data)));
+        subscriptions.assignFromUser(singleton(t1p));
+        Map<TopicPartition, OffsetAndMetadata> fetchedOffsets = coordinator.fetchCommittedOffsets(singleton(t1p),
+                time.timer(Long.MAX_VALUE));
+
+        assertNotNull(fetchedOffsets);
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offset, leaderEpoch, metadata);
+        assertEquals(offsetAndMetadata, fetchedOffsets.get(t1p));
+
+        // check committedOffsetsCache is populated
+        assertEquals( 1, committedOffsetsCache.size());
+        assertEquals(offsetAndMetadata, committedOffsetsCache.get(t1p));
+
+        // fetch again with t1p + t2p, but will send fetch for t2p since t1p is in cache
+        long offsetPartition2 = 50L;
+        String metadataPartition2 = "foobar";
+        Optional<Integer> leaderEpochPartition2 = Optional.of(19909);
+        data = new OffsetFetchResponse.PartitionData(offsetPartition2, leaderEpochPartition2,
+                metadataPartition2, Errors.NONE);
+        client.prepareResponse(offsetFetchResponse(Errors.NONE, singletonMap(t2p, data)));
+
+        fetchedOffsets = coordinator.fetchCommittedOffsets(new HashSet<>(Arrays.asList(t1p, t2p)), time.timer(Long.MAX_VALUE));
+
+        assertNotNull(fetchedOffsets);
+
+        assertEquals(2, fetchedOffsets.size()); // tp1 and tp2 should be returned with tp1 coming from cache
+        assertEquals( 1, committedOffsetsCache.size()); // cache size is still 1 since only tp1 is an owned partition

Review Comment:
   That was a typo from the IDE. Good catch, it should be corrected now! Thanks



[GitHub] [kafka] machi1990 commented on pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

Posted by "machi1990 (via GitHub)" <gi...@apache.org>.
machi1990 commented on PR #13665:
URL: https://github.com/apache/kafka/pull/13665#issuecomment-1597260242

   @showuon I looked into this and, after several local runs, managed to rule out some flaky tests. These are all the failures I've seen locally:
   ```
           kafka.api.TransactionsBounceTest.testWithGroupId()
           kafka.api.TransactionsBounceTest.testWithGroupMetadata()
           kafka.api.TransactionsTest.testSendOffsetsWithGroupId(String)[1]
           kafka.api.TransactionsTest.testSendOffsetsWithGroupId(String)[2]
           kafka.api.TransactionsTest.testSendOffsetsWithGroupMetadata(String)[1]
           kafka.api.TransactionsTest.testSendOffsetsWithGroupMetadata(String)[2]
           kafka.server.ClientQuotasRequestTest.testAlterClientQuotasBadIp()[1]
           kafka.server.ClientQuotasRequestTest.testAlterClientQuotasBadIp()[2]
           kafka.server.ClientQuotasRequestTest.testAlterClientQuotasBadIp()[3]
           kafka.server.DynamicConfigChangeUnitTest.testIpHandlerUnresolvableAddress()
           kafka.zk.ZkMigrationIntegrationTest.testNewAndChangedTopicsInDualWrite(ClusterInstance)[1]
           kafka.admin.ConfigCommandTest.shouldFailIfInvalidHost()
   ```
   Of these, only `TransactionsTest` and `TransactionsBounceTest` are the ones I've identified as related to this PR.
   I've started investigating them, and so far my conclusion is that the failures are caused by reading stale cache values: the cache entry is stored only once, when fetching the offset in [1]. The tests call neither `commitSync` nor `commitAsync`, which means the cache is never updated in [2] after being initially set in [1]. 
   
   I was thinking of dropping the cache update when fetching committed offsets (i.e. in [1]) and only updating the cache during offset commit [2].
   
   1. https://github.com/apache/kafka/blob/76d25c94e2c8723eec31a3df64c752bc66c79b34/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1075
   2. https://github.com/apache/kafka/blob/76d25c94e2c8723eec31a3df64c752bc66c79b34/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1456
   
   That would align with the comment you raised in https://github.com/apache/kafka/pull/13665#discussion_r1197621674 
   Let me know what you think. 
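The two cache-population policies discussed above can be compared with a small hypothetical model (not the PR's code). `OffsetPolicyModel` is an illustrative name; `cacheOnFetch = true` mirrors caching fetched offsets as in [1], `false` mirrors the proposal to cache only the consumer's own commits as in [2], and the shared `remote` map stands in for offsets committed by any client.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model comparing when the committed-offset cache gets populated.
final class OffsetPolicyModel {
    private final boolean cacheOnFetch;
    private final Map<String, Long> remote; // offsets as stored by the group coordinator
    private final Map<String, Long> cache = new HashMap<>();

    OffsetPolicyModel(boolean cacheOnFetch, Map<String, Long> remote) {
        this.cacheOnFetch = cacheOnFetch;
        this.remote = remote;
    }

    // Policy [2]: the consumer's own commits always refresh the cache.
    void commit(String partition, long offset) {
        remote.put(partition, offset);
        cache.put(partition, offset);
    }

    Long committed(String partition) {
        Long cached = cache.get(partition);
        if (cached != null) {
            return cached;
        }
        Long fresh = remote.get(partition);
        // Policy [1]: also remember the fetched value, which can later go stale.
        if (cacheOnFetch && fresh != null) {
            cache.put(partition, fresh);
        }
        return fresh;
    }
}
```

In this model the commit-only policy avoids the stale read only until the consumer commits the partition itself; after that, a commit made elsewhere is masked again.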


[GitHub] [kafka] machi1990 commented on a diff in pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

Posted by "machi1990 (via GitHub)" <gi...@apache.org>.
machi1990 commented on code in PR #13665:
URL: https://github.com/apache/kafka/pull/13665#discussion_r1198621580


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -222,6 +225,7 @@ public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig,
         }
 
         this.metadata.requestUpdate();
+        this.partitionOffsetsCache = new ConcurrentHashMap<>();

Review Comment:
   I'll double-check this and move to a simple HashMap if possible. Thanks for calling it out!



[GitHub] [kafka] machi1990 commented on pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

Posted by "machi1990 (via GitHub)" <gi...@apache.org>.
machi1990 commented on PR #13665:
URL: https://github.com/apache/kafka/pull/13665#issuecomment-1554132616

   Thank you for the review @showuon. I'll address the comments by pushing code changes shortly. 


[GitHub] [kafka] showuon commented on a diff in pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on code in PR #13665:
URL: https://github.com/apache/kafka/pull/13665#discussion_r1230929414


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -3114,6 +3149,54 @@ public void testFetchCommittedOffsets() {
         assertEquals(new OffsetAndMetadata(offset, leaderEpoch, metadata), fetchedOffsets.get(t1p));
     }
 
+    @Test
+    public void testPopulatingOffsetCacheForAssignedPartition() {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        Map<TopicPartition, OffsetAndMetadata> committedOffsetsCache = coordinator.committedOffsetsCache();
+        // committedOffsetsCache should be empty
+        assertTrue(committedOffsetsCache.isEmpty());
+
+        long offset = 500L;
+        String metadata = "blahblah";
+        Optional<Integer> leaderEpoch = Optional.of(15);
+        OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, leaderEpoch,
+                metadata, Errors.NONE);
+
+        client.prepareResponse(offsetFetchResponse(Errors.NONE, singletonMap(t1p, data)));
+        subscriptions.assignFromUser(singleton(t1p));
+        Map<TopicPartition, OffsetAndMetadata> fetchedOffsets = coordinator.fetchCommittedOffsets(singleton(t1p),
+                time.timer(Long.MAX_VALUE));
+
+        assertNotNull(fetchedOffsets);
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offset, leaderEpoch, metadata);
+        assertEquals(offsetAndMetadata, fetchedOffsets.get(t1p));
+
+        // check committedOffsetsCache is populated
+        assertEquals( 1, committedOffsetsCache.size());
+        assertEquals(offsetAndMetadata, committedOffsetsCache.get(t1p));
+
+        // fetch again with t1p + t2p, but will send fetch for t2p since t1p is in cache
+        long offsetPartition2 = 50L;
+        String metadataPartition2 = "foobar";
+        Optional<Integer> leaderEpochPartition2 = Optional.of(19909);
+        data = new OffsetFetchResponse.PartitionData(offsetPartition2, leaderEpochPartition2,
+                metadataPartition2, Errors.NONE);
+        client.prepareResponse(offsetFetchResponse(Errors.NONE, singletonMap(t2p, data)));
+
+        fetchedOffsets = coordinator.fetchCommittedOffsets(new HashSet<>(Arrays.asList(t1p, t2p)), time.timer(Long.MAX_VALUE));
+
+        assertNotNull(fetchedOffsets);
+
+        assertEquals(2, fetchedOffsets.size()); // tp1 and tp2 should be returned with tp1 coming from cache
+        assertEquals( 1, committedOffsetsCache.size()); // cache size is still 1 since only tp1 is an owned partition

Review Comment:
   nit: additional space in front of `1`



[GitHub] [kafka] showuon commented on pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on PR #13665:
URL: https://github.com/apache/kafka/pull/13665#issuecomment-1598673535

   @machi1990 , could you explain more about this:
   > The test neither calls commitSync, nor commitAsync which means that the cache is never updated in [2] after initially set in [1].
   
   If it doesn't commit anything, then the expected committed value should be 0, right? And if the expected values are all greater than 0, something must be committing offsets somewhere, right? It could be auto-commit on the consumer side (maybe?).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] showuon commented on pull request #13665: MINOR: Split ConsumerCoordinator#testCommitOffsetMetadata onto two test cases testing commitSync and commitAsync

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on PR #13665:
URL: https://github.com/apache/kafka/pull/13665#issuecomment-1605261735

   Failed tests are unrelated:
   ```
       Build / JDK 11 and Scala 2.13 / kafka.zk.ZkMigrationIntegrationTest.[1] Type=ZK, Name=testDualWriteScram, MetadataVersion=3.5-IV2, Security=PLAINTEXT
       Build / JDK 11 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
       Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.testSyncTopicConfigs()
       Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplication()
       Build / JDK 8 and Scala 2.12 / kafka.zk.ZkMigrationIntegrationTest.[1] Type=ZK, Name=testNewAndChangedTopicsInDualWrite, MetadataVersion=3.4-IV0, Security=PLAINTEXT
       Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.integration.EosIntegrationTest.shouldWriteLatestOffsetsToCheckpointOnShutdown[at_least_once]
       Build / JDK 17 and Scala 2.13 / org.apache.kafka.common.network.SslTransportLayerTest.[1] tlsProtocol=TLSv1.2, useInlinePem=false
       Build / JDK 17 and Scala 2.13 / org.apache.kafka.common.network.SslTransportLayerTest.[3] tlsProtocol=TLSv1.3, useInlinePem=false
       Build / JDK 17 and Scala 2.13 / org.apache.kafka.common.network.SslVersionsTransportLayerTest.tlsServerProtocol = [TLSv1.3, TLSv1.2], tlsClientProtocol = [TLSv1.2, TLSv1.3]
       Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.testRestartReplication()
       Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testOneWayReplicationWithAutoOffsetSync()
       Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testSourceConnector
       Build / JDK 17 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
       Build / JDK 17 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testBalancePartitionLeaders()
   ```



[GitHub] [kafka] showuon merged pull request #13665: MINOR: Split ConsumerCoordinator#testCommitOffsetMetadata onto two test cases testing commitSync and commitAsync

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon merged PR #13665:
URL: https://github.com/apache/kafka/pull/13665



[GitHub] [kafka] machi1990 commented on a diff in pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

Posted by "machi1990 (via GitHub)" <gi...@apache.org>.
machi1990 commented on code in PR #13665:
URL: https://github.com/apache/kafka/pull/13665#discussion_r1230661381


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -351,6 +359,10 @@ private Exception invokePartitionsRevoked(final SortedSet<TopicPartition> revoke
             final long startMs = time.milliseconds();
             listener.onPartitionsRevoked(revokedPartitions);
             sensors.revokeCallbackSensor.record(time.milliseconds() - startMs);
+            // remove the offset metadata cache for revoked partitions
+            for (TopicPartition revokedPartition: revokedPartitions) {
+                this.committedTopicPartitionOffsetsCache.remove(revokedPartition);
+            }

Review Comment:
   You are correct, we are missing this verification. I'll add them!




[GitHub] [kafka] showuon commented on pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on PR #13665:
URL: https://github.com/apache/kafka/pull/13665#issuecomment-1547418289

   I'll try to have a look this week. Thanks.



[GitHub] [kafka] machi1990 commented on a diff in pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

Posted by "machi1990 (via GitHub)" <gi...@apache.org>.
machi1990 commented on code in PR #13665:
URL: https://github.com/apache/kafka/pull/13665#discussion_r1198690005


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1007,16 +1047,33 @@ public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(final Set<To
             if (pendingCommittedOffsetRequest != null) {
                 future = pendingCommittedOffsetRequest.response;
             } else {
-                future = sendOffsetFetchRequest(partitions);
-                pendingCommittedOffsetRequest = new PendingCommittedOffsetRequest(partitions, generationForOffsetRequest, future);
+                future = sendOffsetFetchRequest(nonCachedPartitions);
+                pendingCommittedOffsetRequest = new PendingCommittedOffsetRequest(nonCachedPartitions, generationForOffsetRequest, future);
             }
             client.poll(future, timer);
 
             if (future.isDone()) {
                 pendingCommittedOffsetRequest = null;
 
                 if (future.succeeded()) {
-                    return future.value();
+                    Map<TopicPartition, OffsetAndMetadata> freshOffsets = future.value();
+
+                    // update cache for assigned partitions that are not cached yet
+                    for (TopicPartition nonCachedAssignedPartition: nonCachedAssignedPartitions) {
+                        if (!this.subscriptions.isAssigned(nonCachedAssignedPartition)) {
+                            // it is possible that the topic is no longer assigned when the response is received,
+                            // in this case we do not update the cache with the fresh value
+                            continue;
+                        }
+
+                        OffsetAndMetadata offset = freshOffsets.get(nonCachedAssignedPartition);
+                        if (offset != null) { // it is possible that the offset and metadata were not fetched
+                            this.partitionOffsetsCache.put(nonCachedAssignedPartition, offset);

Review Comment:
   Hi @showuon, I forgot to draw your attention to this line https://github.com/apache/kafka/blob/401fb417bf60864e6d380f979d268d895c5ad727/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1422, which already updates the cache. Is that what you had in mind?
   
   My thinking is that, because of that, the data cached during fetch won't go stale quickly. Let me know if my reasoning is correct. Thanks




[GitHub] [kafka] machi1990 commented on a diff in pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

Posted by "machi1990 (via GitHub)" <gi...@apache.org>.
machi1990 commented on code in PR #13665:
URL: https://github.com/apache/kafka/pull/13665#discussion_r1230614601


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -2521,11 +2521,34 @@ public void testCommitOffsetMetadata() {
 
         AtomicBoolean success = new AtomicBoolean(false);
 
-        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p, new OffsetAndMetadata(100L, "hello"));
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(100L, "hello");
+        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p, offsetAndMetadata);
         coordinator.commitOffsetsAsync(offsets, callback(offsets, success));
         coordinator.invokeCompletedOffsetCommitCallbacks();
         assertTrue(success.get());
         assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
+        Map<TopicPartition, OffsetAndMetadata> cache = coordinator.committedOffsetsCache();
+        assertEquals(cache.size(), 1);
+        assertEquals(cache.get(t1p), offsetAndMetadata);

Review Comment:
   Thanks, I like the suggested improvements. I'll adjust! 




[GitHub] [kafka] showuon commented on pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on PR #13665:
URL: https://github.com/apache/kafka/pull/13665#issuecomment-1596875101

   No need to mark as "draft". No worries! :)



[GitHub] [kafka] machi1990 commented on pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

Posted by "machi1990 (via GitHub)" <gi...@apache.org>.
machi1990 commented on PR #13665:
URL: https://github.com/apache/kafka/pull/13665#issuecomment-1598653771

   > @showuon I was looking into this and, after several local runs, I managed to eliminate some flaky tests and came up with the list of failures that are only caused by this change. The failures that I've seen locally are:
   > 
   > ```
   >         kafka.api.TransactionsBounceTest.testWithGroupId()
   >         kafka.api.TransactionsBounceTest.testWithGroupMetadata()
   >         kafka.api.TransactionsTest.testSendOffsetsWithGroupId(String)[1]
   >         kafka.api.TransactionsTest.testSendOffsetsWithGroupId(String)[2]
   >         kafka.api.TransactionsTest.testSendOffsetsWithGroupMetadata(String)[1]
   >         kafka.api.TransactionsTest.testSendOffsetsWithGroupMetadata(String)[2]
   >         kafka.server.ClientQuotasRequestTest.testAlterClientQuotasBadIp()[1]
   >         kafka.server.ClientQuotasRequestTest.testAlterClientQuotasBadIp()[2]
   >         kafka.server.ClientQuotasRequestTest.testAlterClientQuotasBadIp()[3]
   >         kafka.server.DynamicConfigChangeUnitTest.testIpHandlerUnresolvableAddress()
   >         kafka.zk.ZkMigrationIntegrationTest.testNewAndChangedTopicsInDualWrite(ClusterInstance)[1]
   >         kafka.admin.ConfigCommandTest.shouldFailIfInvalidHost()
   > ```
   > 
   > Of these, only `TransactionsTest` and `TransactionsBounceTest` are the ones that I've identified as related to this PR. I've started to investigate them, and so far my conclusion is that the failures there are caused by reading stale cache values, because the cache entry is stored only once, when fetching the offset in [1]. The tests call neither `commitSync` nor `commitAsync`, which means that the cache is never updated in [2] after being initially set in [1].
   > 
   > I was thinking of dropping the cache update when fetching committed offsets, i.e. in [1], and only updating the cache during offset commit [2].
   > 
   > 1. https://github.com/apache/kafka/blob/76d25c94e2c8723eec31a3df64c752bc66c79b34/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1075
   > 2. https://github.com/apache/kafka/blob/76d25c94e2c8723eec31a3df64c752bc66c79b34/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1456
   > 
   > That would align with the comment you raised in [#13665 (comment)](https://github.com/apache/kafka/pull/13665#discussion_r1197621674). Let me know what you think.
   
   I've pushed this change in https://github.com/apache/kafka/pull/13665/commits/9539c559a782aba8ce95c9b8b48831c6879821d2 
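   The change described above (serve cached partitions locally, request only the rest, and stop writing fetched results into the cache) can be sketched roughly as follows. This is a simplified, hypothetical model, not the actual `ConsumerCoordinator` code: partitions are plain strings such as `"t1-0"`, `Long` offsets stand in for `OffsetAndMetadata`, and `remoteFetch` stands in for the OffsetFetch round trip to the group coordinator.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical, simplified model: String partitions ("t1-0") and Long offsets
// stand in for TopicPartition and OffsetAndMetadata.
class CommittedOffsetLookup {
    // Written only when this consumer's own commits succeed (not shown here).
    final Map<String, Long> committedCache = new HashMap<>();
    // Stand-in for the OffsetFetch request to the group coordinator.
    private final Function<Set<String>, Map<String, Long>> remoteFetch;

    CommittedOffsetLookup(Function<Set<String>, Map<String, Long>> remoteFetch) {
        this.remoteFetch = remoteFetch;
    }

    Map<String, Long> fetchCommitted(Set<String> partitions) {
        Map<String, Long> result = new HashMap<>();
        Set<String> nonCached = new HashSet<>();
        for (String tp : partitions) {
            Long cached = committedCache.get(tp);
            if (cached != null) {
                result.put(tp, cached); // answered locally, no remote call
            } else {
                nonCached.add(tp);
            }
        }
        if (!nonCached.isEmpty()) {
            // Only the missing partitions are requested. The response is returned
            // to the caller but deliberately not written into the cache.
            result.putAll(remoteFetch.apply(nonCached));
        }
        return result;
    }
}
```

   Because the fetch path never writes to the cache, a value can only enter it through a commit, which avoids the fetch-once-then-never-refresh pattern behind the stale reads.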



[GitHub] [kafka] showuon commented on a diff in pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on code in PR #13665:
URL: https://github.com/apache/kafka/pull/13665#discussion_r1197604865


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -123,6 +124,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
     // hold onto request&future for committed offset requests to enable async calls.
     private PendingCommittedOffsetRequest pendingCommittedOffsetRequest = null;
+    // holds the offset metadata for assigned partitions to reduce remote calls thus speeding up fetching partition metadata
+    private final Map<TopicPartition, OffsetAndMetadata> partitionOffsetsCache;

Review Comment:
   The term `offset` might be confusing because we have both `offsets` fetching and `offsets` committing. Please add `committed` to the variable name. Thanks.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -234,6 +238,11 @@ SubscriptionState subscriptionState() {
         return this.subscriptions;
     }
 
+    // package private for testing
+    Map<TopicPartition, OffsetAndMetadata> offsetsCache() {

Review Comment:
   method name: committedOffsetsCache



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1007,16 +1047,33 @@ public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(final Set<To
             if (pendingCommittedOffsetRequest != null) {
                 future = pendingCommittedOffsetRequest.response;
             } else {
-                future = sendOffsetFetchRequest(partitions);
-                pendingCommittedOffsetRequest = new PendingCommittedOffsetRequest(partitions, generationForOffsetRequest, future);
+                future = sendOffsetFetchRequest(nonCachedPartitions);
+                pendingCommittedOffsetRequest = new PendingCommittedOffsetRequest(nonCachedPartitions, generationForOffsetRequest, future);
             }
             client.poll(future, timer);
 
             if (future.isDone()) {
                 pendingCommittedOffsetRequest = null;
 
                 if (future.succeeded()) {
-                    return future.value();
+                    Map<TopicPartition, OffsetAndMetadata> freshOffsets = future.value();
+
+                    // update cache for assigned partitions that are not cached yet
+                    for (TopicPartition nonCachedAssignedPartition: nonCachedAssignedPartitions) {
+                        if (!this.subscriptions.isAssigned(nonCachedAssignedPartition)) {
+                            // it is possible that the topic is no longer assigned when the response is received,
+                            // in this case we do not update the cache with the fresh value
+                            continue;
+                        }
+
+                        OffsetAndMetadata offset = freshOffsets.get(nonCachedAssignedPartition);
+                        if (offset != null) { // it is possible that the offset and metadata were not fetched
+                            this.partitionOffsetsCache.put(nonCachedAssignedPartition, offset);

Review Comment:
   Some high-level comments here.
   You populate the committed offsets cache when fetching offsets. Currently, we fetch committed offsets when the consumer proactively asks for them (which doesn't happen frequently), and when the consumer is first assigned a partition, so that it knows where to start fetching. After the consumer starts, it periodically commits offsets. That means that if we cache the fetched data, it will become outdated as soon as the consumer commits offsets.
   
   On the other hand, if we cache at the place where we committed offsets (i.e. `OffsetCommitResponseHandler`), we should be able to get the latest committed offsets for each partition whenever the consumer commits it. Does that make sense?
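   A rough sketch of the commit-side alternative, under simplifying assumptions: the class and method names are hypothetical, partitions are strings and offsets are `Long`s, and `onCommitResponse` plays the role that `OffsetCommitResponseHandler` would play in the coordinator, receiving the offsets that were sent and the partitions the response reported as failed.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch: the cache is refreshed from successful commit responses
// rather than from offset fetches.
class CommitSideCache {
    private final Map<String, Long> committed = new HashMap<>();

    // Invoked when an OffsetCommit response arrives. Only partitions that the
    // response did not report as failed are written to the cache.
    void onCommitResponse(Map<String, Long> sentOffsets, Set<String> failedPartitions) {
        for (Map.Entry<String, Long> entry : sentOffsets.entrySet()) {
            if (!failedPartitions.contains(entry.getKey())) {
                committed.put(entry.getKey(), entry.getValue());
            }
        }
    }

    Optional<Long> cached(String partition) {
        return Optional.ofNullable(committed.get(partition));
    }
}
```

   Each periodic commit made through this consumer overwrites the entry, so the cached value follows the consumer's own commits. Commits made by other clients never pass through this path and are not reflected.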



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -222,6 +225,7 @@ public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig,
         }
 
         this.metadata.requestUpdate();
+        this.partitionOffsetsCache = new ConcurrentHashMap<>();

Review Comment:
   In the consumer there are currently two threads: the heartbeat thread and the main consumer thread. From my understanding, the heartbeat thread doesn't commit offsets. If so, using a `HashMap` should be fine. Could you help confirm this? Check `AbstractCoordinator#HeartbeatThread`.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -346,6 +355,10 @@ private Exception invokePartitionsRevoked(final SortedSet<TopicPartition> revoke
             final long startMs = time.milliseconds();
             listener.onPartitionsRevoked(revokedPartitions);
             sensors.revokeCallbackSensor.record(time.milliseconds() - startMs);
+            // remove the offset metadata cache for revoked partitions
+            for (TopicPartition revokedPartition: revokedPartitions) {
+                this.partitionOffsetsCache.remove(revokedPartition);
+            }

Review Comment:
   `invokePartitionsRevoked` isn't called after partition assignment in each rebalance under the eager consumer protocol (check `ConsumerCoordinator#onJoinComplete`).
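   Since the revocation callback is not reached after assignment under the eager protocol, one option is to prune the cache whenever a new assignment is installed, in addition to evicting in the revoke/lost callbacks. The sketch below is hypothetical (class and method names are illustrative, partitions are plain strings) and only shows the pruning idea, not where the real coordinator would call it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: keep cached committed offsets scoped to the current assignment.
class AssignmentScopedCache {
    final Map<String, Long> committed = new HashMap<>();

    // Called after a new assignment is installed (e.g. at the end of a rebalance).
    // The keySet view is backed by the map, so retainAll also removes the entries.
    void onAssignment(Set<String> newAssignment) {
        committed.keySet().retainAll(newAssignment);
    }

    // Called from the revoked / lost callbacks when they do fire.
    void onPartitionsRemoved(Set<String> partitions) {
        committed.keySet().removeAll(partitions);
    }
}
```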




[GitHub] [kafka] machi1990 commented on a diff in pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

Posted by "machi1990 (via GitHub)" <gi...@apache.org>.
machi1990 commented on code in PR #13665:
URL: https://github.com/apache/kafka/pull/13665#discussion_r1198621058


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -234,6 +238,11 @@ SubscriptionState subscriptionState() {
         return this.subscriptions;
     }
 
+    // package private for testing
+    Map<TopicPartition, OffsetAndMetadata> offsetsCache() {

Review Comment:
   thanks, I'll do so




[GitHub] [kafka] machi1990 commented on pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

Posted by "machi1990 (via GitHub)" <gi...@apache.org>.
machi1990 commented on PR #13665:
URL: https://github.com/apache/kafka/pull/13665#issuecomment-1598817722

   > @machi1990 , could you explain more about this:
   > 
   > > The test neither calls commitSync, nor commitAsync which means that the cache is never updated in [2] after initially set in [1].
   > 
   > If it don't commit anything, then the expected committed value should be 0, right? And if the expected value are all greater than 0, there should be somewhere doing offset commit, right? It could be auto commit in the consumer side. (maybe?)
   
   Sorry @showuon, I should have clarified: there is no manual sync/async commit of offsets.
   The offsets are all greater than 0. The consumer doesn't auto-commit, as can be seen in its initialization here: https://github.com/apache/kafka/blob/9539c559a782aba8ce95c9b8b48831c6879821d2/core/src/test/scala/integration/kafka/api/TransactionsTest.scala#L774
   The offset commits happen elsewhere, via `producer.sendOffsetsToTransaction(..)`, e.g. https://github.com/apache/kafka/blob/9539c559a782aba8ce95c9b8b48831c6879821d2/core/src/test/scala/integration/kafka/api/TransactionsTest.scala#L247
   
   I didn't think of this earlier, and it changes a few things: I might need to revisit the caching logic for committed offsets, e.g. re-use `consumer#position(tp)`.
   
   I am keen to know what you think? 
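   The failure mode can be reproduced with a small, hypothetical simulation (not Kafka code): a consumer-side cache filled on the first fetch, and a commit that reaches the coordinator through another client, standing in for `producer.sendOffsetsToTransaction(..)`, without passing through the consumer.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simulation of a consumer-side committed-offset cache going stale.
class StaleCacheDemo {
    // Stand-in for the offsets stored by the group coordinator.
    final Map<String, Long> coordinatorStore = new HashMap<>();
    // Consumer-side cache, filled the first time a partition's offset is fetched.
    final Map<String, Long> consumerCache = new HashMap<>();

    // Cache-on-fetch semantics: only the first lookup reaches the coordinator.
    Long committed(String partition) {
        return consumerCache.computeIfAbsent(partition, coordinatorStore::get);
    }

    // A commit made by the consumer itself updates both the store and the cache.
    void consumerCommit(String partition, long offset) {
        coordinatorStore.put(partition, offset);
        consumerCache.put(partition, offset);
    }

    // A commit made by another client (e.g. a transactional producer) only
    // reaches the coordinator, so the consumer's cache is not refreshed.
    void externalCommit(String partition, long offset) {
        coordinatorStore.put(partition, offset);
    }
}
```

   Only commits that go through the consumer keep such a cache accurate, so a test that commits exclusively through the producer keeps reading the first value it fetched.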



[GitHub] [kafka] machi1990 commented on pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

Posted by "machi1990 (via GitHub)" <gi...@apache.org>.
machi1990 commented on PR #13665:
URL: https://github.com/apache/kafka/pull/13665#issuecomment-1592956695

   > LGTM! Just one minor comment. After addressing it, I'll check the CI build result tomorrow. Thank you for the patch!
   
   Thank you so much for the review and help on this, @showuon.



[GitHub] [kafka] showuon commented on pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on PR #13665:
URL: https://github.com/apache/kafka/pull/13665#issuecomment-1596774700

   @machi1990 , looks like this change breaks some tests. Could you take a look?



[GitHub] [kafka] machi1990 commented on pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

Posted by "machi1990 (via GitHub)" <gi...@apache.org>.
machi1990 commented on PR #13665:
URL: https://github.com/apache/kafka/pull/13665#issuecomment-1567839961

   > Overall LGTM. Could we add tests to verify that the committed offsets cache is updated when the consumer commits some offsets? Also, you could change to "non-draft" state when you're ready. Thanks.
   
   Thank you @showuon for the review. I was away on a public holiday yesterday; I am catching up today and I'll look at adding more tests tomorrow. Once that is done, I'll promote the PR, mark it ready for review, and ping you then. Cheers!



[GitHub] [kafka] machi1990 commented on pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

Posted by "machi1990 (via GitHub)" <gi...@apache.org>.
machi1990 commented on PR #13665:
URL: https://github.com/apache/kafka/pull/13665#issuecomment-1586703079

   Hey @showuon the PR should be ready for another round of review. Thank you.



[GitHub] [kafka] showuon commented on a diff in pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on code in PR #13665:
URL: https://github.com/apache/kafka/pull/13665#discussion_r1230345113


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -127,6 +127,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
     // hold onto request&future for committed offset requests to enable async calls.
     private PendingCommittedOffsetRequest pendingCommittedOffsetRequest = null;
+    // holds the offset metadata for assigned partitions to reduce remote calls thus speeding up fetching partition metadata
+    private final Map<TopicPartition, OffsetAndMetadata> committedTopicPartitionOffsetsCache;

Review Comment:
   nit: the comment above should mention this is the `committed offset metadata`



##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -922,13 +922,9 @@ public void testCommitsFetchedDuringAssign() {
 
         // fetch offset for two topics
         Map<TopicPartition, Long> offsets = new HashMap<>();
-        offsets.put(tp0, offset1);
-        client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE), coordinator);
-        assertEquals(offset1, consumer.committed(Collections.singleton(tp0)).get(tp0).offset());
-
-        offsets.remove(tp0);
         offsets.put(tp1, offset2);

Review Comment:
   Could we add a comment above about why we only need to respond with `tp1, offset2`? Something like: the offset for `tp0` has already been cached by the previous committed offset fetch.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -2521,11 +2521,34 @@ public void testCommitOffsetMetadata() {
 
         AtomicBoolean success = new AtomicBoolean(false);
 
-        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p, new OffsetAndMetadata(100L, "hello"));
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(100L, "hello");
+        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p, offsetAndMetadata);
         coordinator.commitOffsetsAsync(offsets, callback(offsets, success));
         coordinator.invokeCompletedOffsetCommitCallbacks();
         assertTrue(success.get());
         assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
+        Map<TopicPartition, OffsetAndMetadata> cache = coordinator.committedOffsetsCache();
+        assertEquals(cache.size(), 1);
+        assertEquals(cache.get(t1p), offsetAndMetadata);
+    }
+
+    @Test
+    public void testCommitOffsetMetadataSync() {

Review Comment:
   Thanks for adding the sync test



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -2521,11 +2521,34 @@ public void testCommitOffsetMetadata() {
 
         AtomicBoolean success = new AtomicBoolean(false);
 
-        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p, new OffsetAndMetadata(100L, "hello"));
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(100L, "hello");
+        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p, offsetAndMetadata);
         coordinator.commitOffsetsAsync(offsets, callback(offsets, success));
         coordinator.invokeCompletedOffsetCommitCallbacks();
         assertTrue(success.get());
         assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
+        Map<TopicPartition, OffsetAndMetadata> cache = coordinator.committedOffsetsCache();
+        assertEquals(cache.size(), 1);
+        assertEquals(cache.get(t1p), offsetAndMetadata);

Review Comment:
   The `assertEquals` method signature is `assertEquals(int expected, int actual)`. Putting the parameters in the correct order produces a sensible error message if the assertion fails.
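   To illustrate why the order matters: the first argument is reported as the expected value in the failure message, so swapped arguments produce a misleading message. The helper below is a hypothetical stand-in whose message format roughly mirrors JUnit 5's; with the arguments in the suggested order, the assertions would read `assertEquals(1, cache.size())` and `assertEquals(offsetAndMetadata, cache.get(t1p))`.

```java
import java.util.Objects;

// Hypothetical stand-in for a JUnit-style assertEquals, used only to show how
// argument order affects the failure message.
class AssertOrderDemo {
    static String failureMessage(Object expected, Object actual) {
        return "expected: <" + expected + "> but was: <" + actual + ">";
    }

    // Fails with a message that labels the FIRST argument as the expected value.
    static void assertEqualsLike(Object expected, Object actual) {
        if (!Objects.equals(expected, actual)) {
            throw new AssertionError(failureMessage(expected, actual));
        }
    }
}
```

   Called as `assertEqualsLike(cache.size(), 1)` with an empty cache, the message would claim that 0 was expected and 1 was observed, which is the reverse of what happened.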



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -2521,11 +2521,34 @@ public void testCommitOffsetMetadata() {
 
         AtomicBoolean success = new AtomicBoolean(false);
 
-        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p, new OffsetAndMetadata(100L, "hello"));
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(100L, "hello");
+        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p, offsetAndMetadata);
         coordinator.commitOffsetsAsync(offsets, callback(offsets, success));
         coordinator.invokeCompletedOffsetCommitCallbacks();
         assertTrue(success.get());
         assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
+        Map<TopicPartition, OffsetAndMetadata> cache = coordinator.committedOffsetsCache();
+        assertEquals(cache.size(), 1);
+        assertEquals(cache.get(t1p), offsetAndMetadata);

Review Comment:
   Also, could we assert that the cache is empty before we call `commitOffsetsAsync`? i.e.
   ```
   assertTrue(cache.isEmpty());
   coordinator.commitOffsetsAsync(...)
   ...
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -351,6 +359,10 @@ private Exception invokePartitionsRevoked(final SortedSet<TopicPartition> revoke
             final long startMs = time.milliseconds();
             listener.onPartitionsRevoked(revokedPartitions);
             sensors.revokeCallbackSensor.record(time.milliseconds() - startMs);
+            // remove the offset metadata cache for revoked partitions
+            for (TopicPartition revokedPartition: revokedPartitions) {
+                this.committedTopicPartitionOffsetsCache.remove(revokedPartition);
+            }

Review Comment:
   The revoke and lost partition cases should also be tested, to verify we indeed remove them from cache.
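   A minimal sketch of the two requested test cases, using a hypothetical simplified coordinator whose method names mirror the revoke and lost callbacks and whose partitions are plain strings. A real test in `ConsumerCoordinatorTest` would presumably drive an actual rebalance and inspect `committedOffsetsCache()` instead.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical simplified coordinator that evicts cached committed offsets
// from both the revoked and the lost callbacks.
class EvictingCoordinator {
    final Map<String, Long> committedCache = new HashMap<>();

    void invokePartitionsRevoked(Set<String> revoked) {
        committedCache.keySet().removeAll(revoked);
    }

    void invokePartitionsLost(Set<String> lost) {
        committedCache.keySet().removeAll(lost);
    }
}

// Sketch of the two checks: the affected partition is evicted, the other stays cached.
class EvictionTests {
    private static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    private static EvictingCoordinator coordinatorWithTwoCachedPartitions() {
        EvictingCoordinator coordinator = new EvictingCoordinator();
        coordinator.committedCache.put("t1-0", 100L);
        coordinator.committedCache.put("t1-1", 200L);
        return coordinator;
    }

    static void testRevokedPartitionIsEvicted() {
        EvictingCoordinator coordinator = coordinatorWithTwoCachedPartitions();
        coordinator.invokePartitionsRevoked(new HashSet<>(Arrays.asList("t1-0")));
        check(!coordinator.committedCache.containsKey("t1-0"), "revoked partition still cached");
        check(Long.valueOf(200L).equals(coordinator.committedCache.get("t1-1")), "owned partition was evicted");
    }

    static void testLostPartitionIsEvicted() {
        EvictingCoordinator coordinator = coordinatorWithTwoCachedPartitions();
        coordinator.invokePartitionsLost(new HashSet<>(Arrays.asList("t1-1")));
        check(!coordinator.committedCache.containsKey("t1-1"), "lost partition still cached");
        check(Long.valueOf(100L).equals(coordinator.committedCache.get("t1-0")), "owned partition was evicted");
    }
}
```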



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -3114,6 +3137,70 @@ public void testFetchCommittedOffsets() {
         assertEquals(new OffsetAndMetadata(offset, leaderEpoch, metadata), fetchedOffsets.get(t1p));
     }
 
+    @Test
+    public void testPopulatingOffsetCacheForAssignedPartition() {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        Map<TopicPartition, OffsetAndMetadata> committedOffsetsCache = coordinator.committedOffsetsCache();
+        // committedOffsetsCache should be empty
+        assertEquals(committedOffsetsCache.size(), 0);
+
+        long offset = 500L;
+        String metadata = "blahblah";
+        Optional<Integer> leaderEpoch = Optional.of(15);
+        OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, leaderEpoch,
+                metadata, Errors.NONE);
+
+        client.prepareResponse(offsetFetchResponse(Errors.NONE, singletonMap(t1p, data)));
+        subscriptions.assignFromUser(singleton(t1p));
+        Map<TopicPartition, OffsetAndMetadata> fetchedOffsets = coordinator.fetchCommittedOffsets(singleton(t1p),
+                time.timer(Long.MAX_VALUE));
+
+        assertNotNull(fetchedOffsets);
+        OffsetAndMetadata expected = new OffsetAndMetadata(offset, leaderEpoch, metadata);
+        assertEquals(expected, fetchedOffsets.get(t1p));
+
+        // check committedOffsetsCache is populated
+        assertEquals(committedOffsetsCache.size(), 1);
+        assertEquals(expected, committedOffsetsCache.get(t1p));
+    }
+
+    @Test
+    public void testReturningCachedOffsetForAssignedPartition() {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        Map<TopicPartition, OffsetAndMetadata> committedOffsetsCache = coordinator.committedOffsetsCache();
+
+        long offset = 500L;
+        String metadata = "blahblah";
+        Optional<Integer> leaderEpoch = Optional.of(15);
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offset, leaderEpoch, metadata);
+        committedOffsetsCache.put(t1p, offsetAndMetadata);
+

Review Comment:
   Extra blank line here.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -3114,6 +3137,70 @@ public void testFetchCommittedOffsets() {
         assertEquals(new OffsetAndMetadata(offset, leaderEpoch, metadata), fetchedOffsets.get(t1p));
     }
 
+    @Test
+    public void testPopulatingOffsetCacheForAssignedPartition() {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        Map<TopicPartition, OffsetAndMetadata> committedOffsetsCache = coordinator.committedOffsetsCache();
+        // committedOffsetsCache should be empty
+        assertEquals(committedOffsetsCache.size(), 0);
+
+        long offset = 500L;
+        String metadata = "blahblah";
+        Optional<Integer> leaderEpoch = Optional.of(15);
+        OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, leaderEpoch,
+                metadata, Errors.NONE);
+
+        client.prepareResponse(offsetFetchResponse(Errors.NONE, singletonMap(t1p, data)));
+        subscriptions.assignFromUser(singleton(t1p));
+        Map<TopicPartition, OffsetAndMetadata> fetchedOffsets = coordinator.fetchCommittedOffsets(singleton(t1p),
+                time.timer(Long.MAX_VALUE));
+
+        assertNotNull(fetchedOffsets);
+        OffsetAndMetadata expected = new OffsetAndMetadata(offset, leaderEpoch, metadata);
+        assertEquals(expected, fetchedOffsets.get(t1p));
+
+        // check committedOffsetsCache is populated
+        assertEquals(committedOffsetsCache.size(), 1);
+        assertEquals(expected, committedOffsetsCache.get(t1p));
+    }
+
+    @Test
+    public void testReturningCachedOffsetForAssignedPartition() {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        Map<TopicPartition, OffsetAndMetadata> committedOffsetsCache = coordinator.committedOffsetsCache();
+
+        long offset = 500L;
+        String metadata = "blahblah";
+        Optional<Integer> leaderEpoch = Optional.of(15);
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offset, leaderEpoch, metadata);
+        committedOffsetsCache.put(t1p, offsetAndMetadata);

Review Comment:
   I'm thinking we can merge this test with the above `testPopulatingOffsetCacheForAssignedPartition` test, so that we don't have to initialize all the data again. Something like this:
   ```
   // check committedOffsetsCache is populated
   assertEquals(committedOffsetsCache.size(), 1);
   assertEquals(expected, committedOffsetsCache.get(t1p));

   client.prepareResponse(offsetFetchResponse(Errors.NONE, singletonMap(t2p, data)));
   // fetch again with t1p + t2p; only t2p is sent in the fetch request since t1p is in the cache
   fetchedOffsets = coordinator.fetchCommittedOffsets(new HashSet<>(Arrays.asList(t1p, t2p)),
           time.timer(Long.MAX_VALUE));

   assertNotNull(fetchedOffsets);
   // return 2 results
   assertEquals(fetchedOffsets.size(), 2);
   // the cache size is still 1
   assertEquals(committedOffsetsCache.size(), 1);
   // ...
   ```
   WDYT?
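   The merged scenario can be modelled outside the coordinator as a minimal sketch. It assumes that cached partitions are served locally, only the remaining partitions go to the broker, and only owned (assigned) partitions are written back to the cache, which is the behaviour the assertions above check. `CachingOffsetFetcherSketch` and the `remoteFetch` function (standing in for the OffsetFetch round trip) are hypothetical names, not Kafka API, and partitions and offsets are simplified to strings and longs.

   ```java
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;
   import java.util.function.Function;

   // Hypothetical sketch of the partial-cache-hit flow: cache hits are returned
   // directly, misses are fetched remotely, and only owned partitions are cached.
   class CachingOffsetFetcherSketch {
       private final Map<String, Long> cache = new HashMap<>();
       private final Set<String> ownedPartitions;
       // Stands in for the OffsetFetch request to the group coordinator.
       private final Function<Set<String>, Map<String, Long>> remoteFetch;

       CachingOffsetFetcherSketch(Set<String> ownedPartitions,
                                  Function<Set<String>, Map<String, Long>> remoteFetch) {
           this.ownedPartitions = ownedPartitions;
           this.remoteFetch = remoteFetch;
       }

       Map<String, Long> fetchCommittedOffsets(Set<String> partitions) {
           Map<String, Long> result = new HashMap<>();
           Set<String> nonCached = new HashSet<>();
           for (String p : partitions) {
               Long cached = cache.get(p);
               if (cached != null) {
                   result.put(p, cached);   // served from the cache, no request needed
               } else {
                   nonCached.add(p);        // must be fetched from the coordinator
               }
           }
           if (!nonCached.isEmpty()) {
               Map<String, Long> fetched = remoteFetch.apply(nonCached);
               for (Map.Entry<String, Long> e : fetched.entrySet()) {
                   result.put(e.getKey(), e.getValue());
                   // Only owned partitions are cached; others may change under us.
                   if (ownedPartitions.contains(e.getKey())) {
                       cache.put(e.getKey(), e.getValue());
                   }
               }
           }
           return result;
       }

       int cacheSize() {
           return cache.size();
       }
   }
   ```

   Recording the partitions passed to `remoteFetch` lets a test assert that the second call only requests t2p, mirroring "only t2p is sent in the fetch request" in the suggestion above.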



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -2521,11 +2521,34 @@ public void testCommitOffsetMetadata() {
 
         AtomicBoolean success = new AtomicBoolean(false);
 
-        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p, new OffsetAndMetadata(100L, "hello"));
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(100L, "hello");
+        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p, offsetAndMetadata);
         coordinator.commitOffsetsAsync(offsets, callback(offsets, success));
         coordinator.invokeCompletedOffsetCommitCallbacks();
         assertTrue(success.get());
         assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
+        Map<TopicPartition, OffsetAndMetadata> cache = coordinator.committedOffsetsCache();
+        assertEquals(cache.size(), 1);
+        assertEquals(cache.get(t1p), offsetAndMetadata);

Review Comment:
   The same comment applies to the tests below.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] machi1990 commented on a diff in pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

Posted by "machi1990 (via GitHub)" <gi...@apache.org>.
machi1990 commented on code in PR #13665:
URL: https://github.com/apache/kafka/pull/13665#discussion_r1230916909


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -2521,11 +2524,36 @@ public void testCommitOffsetMetadata() {
 
         AtomicBoolean success = new AtomicBoolean(false);
 
-        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p, new OffsetAndMetadata(100L, "hello"));
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(100L, "hello");
+        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p, offsetAndMetadata);
+        Map<TopicPartition, OffsetAndMetadata> cache = coordinator.committedOffsetsCache();
+        assertTrue(cache.isEmpty());
         coordinator.commitOffsetsAsync(offsets, callback(offsets, success));
         coordinator.invokeCompletedOffsetCommitCallbacks();
         assertTrue(success.get());
         assertEquals(coordinator.inFlightAsyncCommits.get(), 0);

Review Comment:
   You are right. I should have dealt with this earlier as well! I'll push a change to fix it.





[GitHub] [kafka] machi1990 commented on pull request #13665: MINOR: Split ConsumerCoordinator#testCommitOffsetMetadata onto two test cases testing commitSync and commitAsync

Posted by "machi1990 (via GitHub)" <gi...@apache.org>.
machi1990 commented on PR #13665:
URL: https://github.com/apache/kafka/pull/13665#issuecomment-1600404343

   > > offsets commits are happening somewhere else and that's via the producer.sendOffsetsToTransaction(..) e.g
   > 
   > Oh, EOS case! I didn't consider it, sorry! Hmm... if there is EOS case to consider, the original cache mechanism will not work since the offset commit is not via consumer, the consumer has no idea which offset has committed. I think we should close this PR and JIRA ticket as "invalid" and add comment into the JIRA ticket. WDYT?
   
   Thanks @showuon, I've marked the JIRA as invalid.
   As for the PR, I've repurposed it to keep only the parts of this change that are still worth keeping.
   Please have a look when you can.
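   The reason the cache cannot work under EOS can be shown with a small model. This is a minimal sketch under stated assumptions: the group coordinator's committed offsets are the source of truth, and a transactional producer (via `producer.sendOffsetsToTransaction(..)`, as quoted above) updates them without the consumer observing the commit. `StaleCacheUnderEosSketch` and its methods are hypothetical, and the model ignores that transactional offsets only become visible once the transaction commits.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical model of why a consumer-side committed-offsets cache goes stale
   // when offsets are committed outside the consumer (e.g. in an EOS transaction).
   // Simplification: transactional offsets are treated as visible immediately.
   class StaleCacheUnderEosSketch {
       final Map<String, Long> coordinatorOffsets = new HashMap<>(); // source of truth
       final Map<String, Long> consumerCache = new HashMap<>();      // consumer-side cache

       // Commit through the consumer: the consumer sees it and can update its cache.
       void commitViaConsumer(String partition, long offset) {
           coordinatorOffsets.put(partition, offset);
           consumerCache.put(partition, offset);
       }

       // Commit through a transactional producer: the coordinator is updated,
       // but the consumer never learns about it, so its cache is not touched.
       void commitViaProducerTransaction(String partition, long offset) {
           coordinatorOffsets.put(partition, offset);
       }

       // What a cache-first committed() lookup would return.
       Long cachedCommitted(String partition) {
           return consumerCache.getOrDefault(partition, coordinatorOffsets.get(partition));
       }
   }
   ```

   After a consumer commit followed by a producer-side commit for the same partition, the cache-first lookup still returns the older offset, which is exactly the inconsistency that made the original approach invalid.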




[GitHub] [kafka] machi1990 commented on pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

Posted by "machi1990 (via GitHub)" <gi...@apache.org>.
machi1990 commented on PR #13665:
URL: https://github.com/apache/kafka/pull/13665#issuecomment-1547431713

   > I'll try to have a look this week. Thanks.
   
   Thanks @showuon, I appreciate it.




[GitHub] [kafka] machi1990 commented on a diff in pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

Posted by "machi1990 (via GitHub)" <gi...@apache.org>.
machi1990 commented on code in PR #13665:
URL: https://github.com/apache/kafka/pull/13665#discussion_r1198624271


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -346,6 +355,10 @@ private Exception invokePartitionsRevoked(final SortedSet<TopicPartition> revoke
             final long startMs = time.milliseconds();
             listener.onPartitionsRevoked(revokedPartitions);
             sensors.revokeCallbackSensor.record(time.milliseconds() - startMs);
+            // remove the offset metadata cache for revoked partitions
+            for (TopicPartition revokedPartition: revokedPartitions) {
+                this.partitionOffsetsCache.remove(revokedPartition);
+            }

Review Comment:
   ~~If I understood correctly, this part is not needed at all?~~ 
   
   > invokePartitionsRevoked won't happen in eager consumer protocol after partition assignment in each rebalance (check ConsumerCoordinator#onJoinComplete).
   
   This is correct. This is only called when the cooperative protocol is used.
   I'll look into achieving the same for the eager consumer protocol.





[GitHub] [kafka] machi1990 commented on a diff in pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

Posted by "machi1990 (via GitHub)" <gi...@apache.org>.
machi1990 commented on code in PR #13665:
URL: https://github.com/apache/kafka/pull/13665#discussion_r1198712020


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -346,6 +355,10 @@ private Exception invokePartitionsRevoked(final SortedSet<TopicPartition> revoke
             final long startMs = time.milliseconds();
             listener.onPartitionsRevoked(revokedPartitions);
             sensors.revokeCallbackSensor.record(time.milliseconds() - startMs);
+            // remove the offset metadata cache for revoked partitions
+            for (TopicPartition revokedPartition: revokedPartitions) {
+                this.partitionOffsetsCache.remove(revokedPartition);
+            }

Review Comment:
   > invokePartitionsRevoked won't happen in eager consumer protocol after partition assignment in each rebalance (check ConsumerCoordinator#onJoinComplete).
   
   > This is correct. This is only called when the cooperative protocol is used.
   
   A correction to what I said earlier: my statement above is only true for the logic in ConsumerCoordinator#onJoinComplete.
   The eager consumer protocol requires a consumer to always revoke its owned partitions during a rebalance, according to this javadoc: https://github.com/apache/kafka/blob/401fb417bf60864e6d380f979d268d895c5ad727/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java#L250-L251
   That revocation happens here: https://github.com/apache/kafka/blob/401fb417bf60864e6d380f979d268d895c5ad727/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L849-L871
   
   Do you think we are good in that regard? @showuon 
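   The distinction between the two protocols can be sketched as a function deciding which owned partitions, and therefore which cache entries, get revoked. This is a minimal sketch, assuming partitions modelled as plain strings; `RevocationSketch` and `partitionsToRevoke` are hypothetical names, not Kafka API. Under eager, everything owned is revoked before rejoining; under cooperative, only partitions missing from the new assignment are revoked after the join completes.

   ```java
   import java.util.HashSet;
   import java.util.Set;

   // Hypothetical sketch of which partitions are revoked (and whose cached
   // offsets would be invalidated) under each rebalance protocol.
   class RevocationSketch {
       enum Protocol { EAGER, COOPERATIVE }

       static Set<String> partitionsToRevoke(Protocol protocol,
                                             Set<String> owned,
                                             Set<String> newAssignment) {
           if (protocol == Protocol.EAGER) {
               // Eager: all owned partitions are revoked before rejoining the group;
               // the new assignment is not known yet and does not matter.
               return new HashSet<>(owned);
           }
           // Cooperative: after the join completes, only partitions that are
           // no longer part of the new assignment are revoked.
           Set<String> revoked = new HashSet<>(owned);
           revoked.removeAll(newAssignment);
           return revoked;
       }
   }
   ```

   In both cases the cache entries for the returned partitions would be dropped, so hooking the invalidation into the revocation path should cover the eager protocol as well as the cooperative one.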





[GitHub] [kafka] showuon commented on a diff in pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on code in PR #13665:
URL: https://github.com/apache/kafka/pull/13665#discussion_r1230868853


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -2521,11 +2524,36 @@ public void testCommitOffsetMetadata() {
 
         AtomicBoolean success = new AtomicBoolean(false);
 
-        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p, new OffsetAndMetadata(100L, "hello"));
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(100L, "hello");
+        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p, offsetAndMetadata);
+        Map<TopicPartition, OffsetAndMetadata> cache = coordinator.committedOffsetsCache();
+        assertTrue(cache.isEmpty());
         coordinator.commitOffsetsAsync(offsets, callback(offsets, success));
         coordinator.invokeCompletedOffsetCommitCallbacks();
         assertTrue(success.get());
         assertEquals(coordinator.inFlightAsyncCommits.get(), 0);

Review Comment:
   I know this is not your change, but please also update it. Thanks.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -3114,6 +3142,54 @@ public void testFetchCommittedOffsets() {
         assertEquals(new OffsetAndMetadata(offset, leaderEpoch, metadata), fetchedOffsets.get(t1p));
     }
 
+    @Test
+    public void testPopulatingOffsetCacheForAssignedPartition() {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        Map<TopicPartition, OffsetAndMetadata> committedOffsetsCache = coordinator.committedOffsetsCache();
+        // committedOffsetsCache should be empty
+        assertTrue(committedOffsetsCache.isEmpty());
+
+        long offset = 500L;
+        String metadata = "blahblah";
+        Optional<Integer> leaderEpoch = Optional.of(15);
+        OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, leaderEpoch,
+                metadata, Errors.NONE);
+
+        client.prepareResponse(offsetFetchResponse(Errors.NONE, singletonMap(t1p, data)));
+        subscriptions.assignFromUser(singleton(t1p));
+        Map<TopicPartition, OffsetAndMetadata> fetchedOffsets = coordinator.fetchCommittedOffsets(singleton(t1p),
+                time.timer(Long.MAX_VALUE));
+
+        assertNotNull(fetchedOffsets);
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offset, leaderEpoch, metadata);
+        assertEquals(offsetAndMetadata, fetchedOffsets.get(t1p));
+
+        // check committedOffsetsCache is populated
+        assertEquals(committedOffsetsCache.size(), 1);

Review Comment:
   `assertEquals(1, committedOffsetsCache.size());` (the expected value should be the first argument)



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -3114,6 +3142,54 @@ public void testFetchCommittedOffsets() {
         assertEquals(new OffsetAndMetadata(offset, leaderEpoch, metadata), fetchedOffsets.get(t1p));
     }
 
+    @Test
+    public void testPopulatingOffsetCacheForAssignedPartition() {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        Map<TopicPartition, OffsetAndMetadata> committedOffsetsCache = coordinator.committedOffsetsCache();
+        // committedOffsetsCache should be empty
+        assertTrue(committedOffsetsCache.isEmpty());
+
+        long offset = 500L;
+        String metadata = "blahblah";
+        Optional<Integer> leaderEpoch = Optional.of(15);
+        OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, leaderEpoch,
+                metadata, Errors.NONE);
+
+        client.prepareResponse(offsetFetchResponse(Errors.NONE, singletonMap(t1p, data)));
+        subscriptions.assignFromUser(singleton(t1p));
+        Map<TopicPartition, OffsetAndMetadata> fetchedOffsets = coordinator.fetchCommittedOffsets(singleton(t1p),
+                time.timer(Long.MAX_VALUE));
+
+        assertNotNull(fetchedOffsets);
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offset, leaderEpoch, metadata);
+        assertEquals(offsetAndMetadata, fetchedOffsets.get(t1p));
+
+        // check committedOffsetsCache is populated
+        assertEquals(committedOffsetsCache.size(), 1);
+        assertEquals(offsetAndMetadata, committedOffsetsCache.get(t1p));
+
+        // fetch again with t1p + t2p, but will send fetch for t2p since t1p is in cache
+        long offsetPartition2 = 50L;
+        String metadataPartition2 = "foobar";
+        Optional<Integer> leaderEpochPartition2 = Optional.of(19909);
+        data = new OffsetFetchResponse.PartitionData(offsetPartition2, leaderEpochPartition2,
+                metadataPartition2, Errors.NONE);
+        client.prepareResponse(offsetFetchResponse(Errors.NONE, singletonMap(t2p, data)));
+
+        fetchedOffsets = coordinator.fetchCommittedOffsets(new HashSet<>(Arrays.asList(t1p, t2p)), time.timer(Long.MAX_VALUE));
+
+        assertNotNull(fetchedOffsets);
+
+        assertEquals(fetchedOffsets.size(), 2); // tp1 and tp2 should be returned with tp1 coming from cache
+        assertEquals(committedOffsetsCache.size(), 1); // cache size is still 1 since only tp1 is an owned partition

Review Comment:
   Parameter order in the `assertEquals` calls: it should be (expected, actual).



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -3818,13 +3899,16 @@ private void gracefulCloseTest(ConsumerCoordinator coordinator, boolean shouldLe
             return commitRequest.data().groupId().equals(groupId);
         }, new OffsetCommitResponse(new OffsetCommitResponseData()));
 
+        // add t1p to the committed offset metadata cache, we'll then check that the cache is invalidated after revocation which happens during close
+        coordinator.committedOffsetsCache().put(t1p, new OffsetAndMetadata(1L));

Review Comment:
   I know you've added a test to verify the revoke case, but it only covers the instance-close case. Could we add one more revoke test in `testRejoinGroup` for the usual revoke case? Thanks.





[GitHub] [kafka] machi1990 commented on a diff in pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

Posted by "machi1990 (via GitHub)" <gi...@apache.org>.
machi1990 commented on code in PR #13665:
URL: https://github.com/apache/kafka/pull/13665#discussion_r1230918544


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -3114,6 +3142,54 @@ public void testFetchCommittedOffsets() {
         assertEquals(new OffsetAndMetadata(offset, leaderEpoch, metadata), fetchedOffsets.get(t1p));
     }
 
+    @Test
+    public void testPopulatingOffsetCacheForAssignedPartition() {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        Map<TopicPartition, OffsetAndMetadata> committedOffsetsCache = coordinator.committedOffsetsCache();
+        // committedOffsetsCache should be empty
+        assertTrue(committedOffsetsCache.isEmpty());
+
+        long offset = 500L;
+        String metadata = "blahblah";
+        Optional<Integer> leaderEpoch = Optional.of(15);
+        OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, leaderEpoch,
+                metadata, Errors.NONE);
+
+        client.prepareResponse(offsetFetchResponse(Errors.NONE, singletonMap(t1p, data)));
+        subscriptions.assignFromUser(singleton(t1p));
+        Map<TopicPartition, OffsetAndMetadata> fetchedOffsets = coordinator.fetchCommittedOffsets(singleton(t1p),
+                time.timer(Long.MAX_VALUE));
+
+        assertNotNull(fetchedOffsets);
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offset, leaderEpoch, metadata);
+        assertEquals(offsetAndMetadata, fetchedOffsets.get(t1p));
+
+        // check committedOffsetsCache is populated
+        assertEquals(committedOffsetsCache.size(), 1);

Review Comment:
   done



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -3114,6 +3142,54 @@ public void testFetchCommittedOffsets() {
         assertEquals(new OffsetAndMetadata(offset, leaderEpoch, metadata), fetchedOffsets.get(t1p));
     }
 
+    @Test
+    public void testPopulatingOffsetCacheForAssignedPartition() {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        Map<TopicPartition, OffsetAndMetadata> committedOffsetsCache = coordinator.committedOffsetsCache();
+        // committedOffsetsCache should be empty
+        assertTrue(committedOffsetsCache.isEmpty());
+
+        long offset = 500L;
+        String metadata = "blahblah";
+        Optional<Integer> leaderEpoch = Optional.of(15);
+        OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, leaderEpoch,
+                metadata, Errors.NONE);
+
+        client.prepareResponse(offsetFetchResponse(Errors.NONE, singletonMap(t1p, data)));
+        subscriptions.assignFromUser(singleton(t1p));
+        Map<TopicPartition, OffsetAndMetadata> fetchedOffsets = coordinator.fetchCommittedOffsets(singleton(t1p),
+                time.timer(Long.MAX_VALUE));
+
+        assertNotNull(fetchedOffsets);
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offset, leaderEpoch, metadata);
+        assertEquals(offsetAndMetadata, fetchedOffsets.get(t1p));
+
+        // check committedOffsetsCache is populated
+        assertEquals(committedOffsetsCache.size(), 1);
+        assertEquals(offsetAndMetadata, committedOffsetsCache.get(t1p));
+
+        // fetch again with t1p + t2p, but will send fetch for t2p since t1p is in cache
+        long offsetPartition2 = 50L;
+        String metadataPartition2 = "foobar";
+        Optional<Integer> leaderEpochPartition2 = Optional.of(19909);
+        data = new OffsetFetchResponse.PartitionData(offsetPartition2, leaderEpochPartition2,
+                metadataPartition2, Errors.NONE);
+        client.prepareResponse(offsetFetchResponse(Errors.NONE, singletonMap(t2p, data)));
+
+        fetchedOffsets = coordinator.fetchCommittedOffsets(new HashSet<>(Arrays.asList(t1p, t2p)), time.timer(Long.MAX_VALUE));
+
+        assertNotNull(fetchedOffsets);
+
+        assertEquals(fetchedOffsets.size(), 2); // tp1 and tp2 should be returned with tp1 coming from cache
+        assertEquals(committedOffsetsCache.size(), 1); // cache size is still 1 since only tp1 is an owned partition

Review Comment:
   done





[GitHub] [kafka] machi1990 commented on a diff in pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

Posted by "machi1990 (via GitHub)" <gi...@apache.org>.
machi1990 commented on code in PR #13665:
URL: https://github.com/apache/kafka/pull/13665#discussion_r1230614177


##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -922,13 +922,9 @@ public void testCommitsFetchedDuringAssign() {
 
         // fetch offset for two topics
         Map<TopicPartition, Long> offsets = new HashMap<>();
-        offsets.put(tp0, offset1);
-        client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE), coordinator);
-        assertEquals(offset1, consumer.committed(Collections.singleton(tp0)).get(tp0).offset());
-
-        offsets.remove(tp0);
         offsets.put(tp1, offset2);

Review Comment:
   good suggestion!





[GitHub] [kafka] machi1990 commented on a diff in pull request #13665: KAFKA-12485: Speed up Consumer#committed by returning cached offsets for owned partitions

Posted by "machi1990 (via GitHub)" <gi...@apache.org>.
machi1990 commented on code in PR #13665:
URL: https://github.com/apache/kafka/pull/13665#discussion_r1230920423


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -3818,13 +3899,16 @@ private void gracefulCloseTest(ConsumerCoordinator coordinator, boolean shouldLe
             return commitRequest.data().groupId().equals(groupId);
         }, new OffsetCommitResponse(new OffsetCommitResponseData()));
 
+        // add t1p to the committed offset metadata cache, we'll then check that the cache is invalidated after revocation which happens during close
+        coordinator.committedOffsetsCache().put(t1p, new OffsetAndMetadata(1L));

Review Comment:
   Thanks for pointing this out. I was thinking along the same lines, but I thought the `close()` case would cover everything. I ended up taking your suggestion and added a verification in `testRejoinGroup`.


