Posted to jira@kafka.apache.org by "dajac (via GitHub)" <gi...@apache.org> on 2023/06/22 09:16:15 UTC

[GitHub] [kafka] dajac opened a new pull request, #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

dajac opened a new pull request, #13901:
URL: https://github.com/apache/kafka/pull/13901

   This patch adds (1) the logic to propagate a new MetadataImage to the running coordinators; and (2) the logic to ensure that all the consumer groups subscribed to topics that have changed refresh their subscription metadata on the next heartbeat. It also ensures that freshly loaded consumer groups refresh their subscription metadata on the next heartbeat.
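As a rough illustration of part (1), the sketch below fans a new image out to the coordinators that have finished loading, while a coordinator that loads later receives the latest image through `onLoaded`, mirroring the first steps of `testOnNewMetadataImage` further down in this thread. Everything here is hypothetical: `Image`, `Coordinator`, `SketchRuntime` and `RecordingCoordinator` are stand-ins, not Kafka's `CoordinatorRuntime` or `MetadataImage`, and the real `onNewMetadataImage` callback also receives the `MetadataDelta`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: these types only mimic the shape of the callbacks
// discussed in the PR; they are not Kafka's CoordinatorRuntime classes.
class ImagePropagationSketch {
    // Stand-in for MetadataImage; a version number is enough for illustration.
    record Image(long version) {
        static final Image EMPTY = new Image(0L);
    }

    interface Coordinator {
        void onLoaded(Image image);
        void onNewMetadataImage(Image image);
    }

    static class SketchRuntime {
        private Image currentImage = Image.EMPTY;
        // Only coordinators whose partition finished loading are tracked.
        private final Map<Integer, Coordinator> loaded = new LinkedHashMap<>();

        // Loading completed: the coordinator starts from the latest image.
        void completeLoad(int partition, Coordinator coordinator) {
            loaded.put(partition, coordinator);
            coordinator.onLoaded(currentImage);
        }

        // New image published: remember it and fan it out to loaded coordinators.
        void onNewMetadataImage(Image newImage) {
            currentImage = newImage;
            for (Coordinator coordinator : loaded.values()) {
                coordinator.onNewMetadataImage(newImage);
            }
        }
    }

    // Records every callback so the behaviour can be inspected.
    static class RecordingCoordinator implements Coordinator {
        final List<String> events = new ArrayList<>();

        @Override
        public void onLoaded(Image image) {
            events.add("loaded:" + image.version());
        }

        @Override
        public void onNewMetadataImage(Image image) {
            events.add("new:" + image.version());
        }
    }

    public static void main(String[] args) {
        SketchRuntime runtime = new SketchRuntime();
        RecordingCoordinator c0 = new RecordingCoordinator();
        RecordingCoordinator c1 = new RecordingCoordinator();

        runtime.completeLoad(0, c0);               // c0 starts from the empty image
        runtime.onNewMetadataImage(new Image(1L)); // only c0 is loaded, so only c0 is notified
        runtime.completeLoad(1, c1);               // c1 starts directly from image 1

        System.out.println(c0.events); // [loaded:0, new:1]
        System.out.println(c1.events); // [loaded:1]
    }
}
```

The runtime keeps the latest image so that late loaders never miss an update that was published before they finished loading.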
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1245154086


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -506,32 +555,54 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
             .setClientHost(clientHost)
             .build();
 
+        boolean updatedMemberSubscriptions = false;
         if (!updatedMember.equals(member)) {
             records.add(newMemberSubscriptionRecord(groupId, updatedMember));
 
             if (!updatedMember.subscribedTopicNames().equals(member.subscribedTopicNames())) {
                 log.info("[GroupId " + groupId + "] Member " + memberId + " updated its subscribed topics to: " +
                     updatedMember.subscribedTopicNames());
+                updatedMemberSubscriptions = true;
+            }
 
-                subscriptionMetadata = group.computeSubscriptionMetadata(
-                    member,
-                    updatedMember,
-                    topicsImage
-                );
-
-                if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
-                    log.info("[GroupId " + groupId + "] Computed new subscription metadata: "
-                        + subscriptionMetadata + ".");
-                    records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata));
-                }
+            if (!updatedMember.subscribedTopicRegex().equals(member.subscribedTopicRegex())) {
+                log.info("[GroupId " + groupId + "] Member " + memberId + " updated its subscribed regex to: " +
+                    updatedMember.subscribedTopicRegex());
+                updatedMemberSubscriptions = true;
+            }
+        }
 
-                groupEpoch += 1;
-                records.add(newGroupEpochRecord(groupId, groupEpoch));
+        long currentTimeMs = time.milliseconds();
+        boolean maybeUpdateMetadata = updatedMemberSubscriptions || group.refreshMetadataNeeded(currentTimeMs);
+        boolean updatedSubscriptionMetadata = false;
+        if (maybeUpdateMetadata) {
+            subscriptionMetadata = group.computeSubscriptionMetadata(
+                member,
+                updatedMember,
+                metadataImage.topics()
+            );
 
-                log.info("[GroupId " + groupId + "] Bumped group epoch to " + groupEpoch + ".");
+            if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+                log.info("[GroupId " + groupId + "] Computed new subscription metadata: "
+                    + subscriptionMetadata + ".");
+                records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata));
+                updatedSubscriptionMetadata = true;
             }
         }
 
+        if (updatedMemberSubscriptions || updatedSubscriptionMetadata) {
+            groupEpoch += 1;
+            records.add(newGroupEpochRecord(groupId, groupEpoch));
+            log.info("[GroupId " + groupId + "] Bumped group epoch to " + groupEpoch + ".");
+        }
+
+        if (maybeUpdateMetadata) {

Review Comment:
   Reworked this part. Let me know if it looks better.
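The reworked heartbeat logic above boils down to two flags: the subscription metadata is recomputed only if the member changed its subscription (topic names or regex) or the group's refresh is due, and the group epoch is bumped only if the subscription or the recomputed metadata actually changed. A minimal sketch of just that decision, assuming the subscription metadata can be reduced to a topic-name-to-partition-count map; `HeartbeatEpochSketch`, `Outcome` and `evaluate` are hypothetical names, and record generation, logging and assignment are left out.

```java
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch of the flag logic only; records, logging and the
// assignment are left out, and metadata is reduced to topic -> partitions.
class HeartbeatEpochSketch {
    record Outcome(boolean metadataRecomputed, boolean metadataChanged, boolean bumpEpoch) {}

    static Outcome evaluate(
        boolean updatedMemberSubscriptions,
        boolean refreshMetadataNeeded,
        Map<String, Integer> storedMetadata,
        Supplier<Map<String, Integer>> computeMetadata
    ) {
        // Recompute only if the member changed its subscription or a refresh is due.
        boolean maybeUpdateMetadata = updatedMemberSubscriptions || refreshMetadataNeeded;
        boolean updatedSubscriptionMetadata = false;
        if (maybeUpdateMetadata) {
            updatedSubscriptionMetadata = !computeMetadata.get().equals(storedMetadata);
        }
        // Bump the epoch only when the subscription or its metadata changed.
        boolean bumpEpoch = updatedMemberSubscriptions || updatedSubscriptionMetadata;
        return new Outcome(maybeUpdateMetadata, updatedSubscriptionMetadata, bumpEpoch);
    }

    public static void main(String[] args) {
        Map<String, Integer> stored = Map.of("foo", 3);
        // Refresh due and the image now has 6 partitions for foo: epoch bumped.
        System.out.println(evaluate(false, true, stored, () -> Map.of("foo", 6)));
        // Refresh due but nothing changed: no epoch bump.
        System.out.println(evaluate(false, true, stored, () -> Map.of("foo", 3)));
        // No subscription change and no refresh due: metadata is not even recomputed.
        System.out.println(evaluate(false, false, stored, () -> {
            throw new IllegalStateException("should not be called");
        }));
    }
}
```

Separating "maybe recompute" from "bump the epoch" means a periodic refresh that finds nothing new costs one comparison and does not trigger a new assignment.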





[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1244497307


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -874,4 +1021,34 @@ public void replay(
             consumerGroup.updateMember(newMember);
         }
     }
+
+    /**
+     * A new metadata image is available.
+     *
+     * @param newImage  The new metadata image.
+     * @param delta     The delta image.
+     */
+    public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) {
+        metadataImage = newImage;
+
+        // Notify all the groups subscribed to the created, updated or

Review Comment:
   As mentioned before, I got confused and thought this was actually adding topics. If this is just a call to update the existing groups with these topics, this is fine.





[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1247154202


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##########
@@ -825,4 +828,60 @@ public void testClose() throws Exception {
         assertFutureThrows(write1, NotCoordinatorException.class);
         assertFutureThrows(write2, NotCoordinatorException.class);
     }
+
+    @Test
+    public void testOnNewMetadataImage() {
+        TopicPartition tp0 = new TopicPartition("__consumer_offsets", 0);
+        TopicPartition tp1 = new TopicPartition("__consumer_offsets", 1);
+
+        MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class);
+        MockPartitionWriter writer = mock(MockPartitionWriter.class);
+        MockCoordinatorBuilderSupplier supplier = mock(MockCoordinatorBuilderSupplier.class);
+        MockCoordinatorBuilder builder = mock(MockCoordinatorBuilder.class);
+
+        CoordinatorRuntime<MockCoordinator, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinator, String>()
+                .withLoader(loader)
+                .withEventProcessor(new MockEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorBuilderSupplier(supplier)
+                .build();
+
+        MockCoordinator coordinator0 = mock(MockCoordinator.class);
+        MockCoordinator coordinator1 = mock(MockCoordinator.class);
+
+        when(supplier.get()).thenReturn(builder);
+        when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+        when(builder.withLogContext(any())).thenReturn(builder);
+        when(builder.build())
+            .thenReturn(coordinator0)
+            .thenReturn(coordinator1);
+
+        CompletableFuture<Void> future0 = new CompletableFuture<>();
+        when(loader.load(tp0, coordinator0)).thenReturn(future0);
+
+        CompletableFuture<Void> future1 = new CompletableFuture<>();
+        when(loader.load(tp1, coordinator1)).thenReturn(future1);
+
+        runtime.scheduleLoadOperation(tp0, 0);
+        runtime.scheduleLoadOperation(tp1, 0);
+
+        // Coordinator 0 is loaded. It should get the current image
+        // that is the empty one.
+        future0.complete(null);
+        verify(coordinator0).onLoaded(MetadataImage.EMPTY);
+
+        // Publish a new image.
+        MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+        MetadataImage newImage = delta.apply(MetadataProvenance.EMPTY);
+        runtime.onNewMetadataImage(newImage, delta);
+
+        // Coordinator 0 should be notified about it.
+        verify(coordinator0).onNewMetadataImage(newImage, delta);

Review Comment:
   Do we care to also show that we update the metadata image on the next heartbeat? Maybe we don't necessarily need to.





[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1246997020


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##########
@@ -537,8 +538,35 @@ public void testUpdateSubscriptionMetadata() {
             consumerGroup.computeSubscriptionMetadata(
                 null,
                 null,
-                image
+                image.topics()
             )
         );
     }
+
+    @Test
+    public void testMetadataRefreshDeadline() {

Review Comment:
   Should this also test `requestMetadataRefresh`?





[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248086301


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##########
@@ -825,4 +828,60 @@ public void testClose() throws Exception {
         assertFutureThrows(write1, NotCoordinatorException.class);
         assertFutureThrows(write2, NotCoordinatorException.class);
     }
+
+    @Test
+    public void testOnNewMetadataImage() {
+        TopicPartition tp0 = new TopicPartition("__consumer_offsets", 0);
+        TopicPartition tp1 = new TopicPartition("__consumer_offsets", 1);
+
+        MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class);
+        MockPartitionWriter writer = mock(MockPartitionWriter.class);
+        MockCoordinatorBuilderSupplier supplier = mock(MockCoordinatorBuilderSupplier.class);
+        MockCoordinatorBuilder builder = mock(MockCoordinatorBuilder.class);
+
+        CoordinatorRuntime<MockCoordinator, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinator, String>()
+                .withLoader(loader)
+                .withEventProcessor(new MockEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorBuilderSupplier(supplier)
+                .build();
+
+        MockCoordinator coordinator0 = mock(MockCoordinator.class);
+        MockCoordinator coordinator1 = mock(MockCoordinator.class);
+
+        when(supplier.get()).thenReturn(builder);
+        when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+        when(builder.withLogContext(any())).thenReturn(builder);
+        when(builder.build())
+            .thenReturn(coordinator0)
+            .thenReturn(coordinator1);
+
+        CompletableFuture<Void> future0 = new CompletableFuture<>();
+        when(loader.load(tp0, coordinator0)).thenReturn(future0);
+
+        CompletableFuture<Void> future1 = new CompletableFuture<>();
+        when(loader.load(tp1, coordinator1)).thenReturn(future1);
+
+        runtime.scheduleLoadOperation(tp0, 0);
+        runtime.scheduleLoadOperation(tp1, 0);
+
+        // Coordinator 0 is loaded. It should get the current image
+        // that is the empty one.
+        future0.complete(null);
+        verify(coordinator0).onLoaded(MetadataImage.EMPTY);
+
+        // Publish a new image.
+        MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+        MetadataImage newImage = delta.apply(MetadataProvenance.EMPTY);
+        runtime.onNewMetadataImage(newImage, delta);
+
+        // Coordinator 0 should be notified about it.
+        verify(coordinator0).onNewMetadataImage(newImage, delta);

Review Comment:
   This part is handled in the `GroupMetadataManager#onNewMetadataImage`.





[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1244482511


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -709,14 +780,16 @@ public void replay(
         String groupId = key.groupId();
         String memberId = key.memberId();
 
+        ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId, value != null);
+        Set<String> oldSubscribedTopicNames = new HashSet<>(consumerGroup.subscribedTopicNames());
+
         if (value != null) {
-            ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId, true);
             ConsumerGroupMember oldMember = consumerGroup.getOrMaybeCreateMember(memberId, true);
             consumerGroup.updateMember(new ConsumerGroupMember.Builder(oldMember)
                 .updateWith(value)
                 .build());
+            updateGroupsByTopics(groupId, oldSubscribedTopicNames, consumerGroup.subscribedTopicNames());

Review Comment:
   Could this and line 803 be outside the if/else? I'm just curious why we moved `consumerGroup` and `oldSubscribedTopicNames` out.
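The `updateGroupsByTopics(groupId, oldSubscribedTopicNames, consumerGroup.subscribedTopicNames())` call suggests a reverse index from topic names to the groups subscribed to them, maintained by diffing the old and new subscribed sets. Here is a minimal sketch under that assumption; `GroupsByTopicsSketch` and `groupsSubscribedTo` are hypothetical and the real method may differ. For such a diff to work, the old set has to be a snapshot taken before the member is updated, which is presumably why the diff copies it with `new HashSet<>(...)` up front.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of a topic -> groups index; not Kafka's code.
class GroupsByTopicsSketch {
    // topic name -> ids of the groups subscribed to it
    private final Map<String, Set<String>> groupsByTopics = new HashMap<>();

    // Applies the difference between the old and new subscribed topic sets.
    void updateGroupsByTopics(String groupId, Set<String> oldTopicNames, Set<String> newTopicNames) {
        // Topics no longer subscribed: remove the group, and the entry once empty.
        for (String topicName : oldTopicNames) {
            if (!newTopicNames.contains(topicName)) {
                Set<String> groupIds = groupsByTopics.get(topicName);
                if (groupIds != null) {
                    groupIds.remove(groupId);
                    if (groupIds.isEmpty()) {
                        groupsByTopics.remove(topicName);
                    }
                }
            }
        }
        // Topics newly subscribed: register the group.
        for (String topicName : newTopicNames) {
            if (!oldTopicNames.contains(topicName)) {
                groupsByTopics.computeIfAbsent(topicName, t -> new HashSet<>()).add(groupId);
            }
        }
    }

    Set<String> groupsSubscribedTo(String topicName) {
        return groupsByTopics.getOrDefault(topicName, Set.of());
    }

    public static void main(String[] args) {
        GroupsByTopicsSketch index = new GroupsByTopicsSketch();
        index.updateGroupsByTopics("g1", Set.of(), Set.of("foo", "bar"));
        index.updateGroupsByTopics("g2", Set.of(), Set.of("foo"));
        // g1 drops "bar" from its subscription.
        index.updateGroupsByTopics("g1", Set.of("foo", "bar"), Set.of("foo"));

        System.out.println(new TreeSet<>(index.groupsSubscribedTo("foo"))); // [g1, g2]
        System.out.println(index.groupsSubscribedTo("bar")); // []
    }
}
```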





[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1244469064


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -179,26 +209,45 @@ GroupMetadataManager build() {
     private final int consumerGroupHeartbeatIntervalMs;
 
     /**
-     * The topics metadata (or image).
+     * The metadata refresh interval.
      */
-    private TopicsImage topicsImage;
+    private final int consumerGroupMetadataRefreshIntervalMs;
+
+    /**
+     * The metadata image.
+     *
+     * Package private for testing.

Review Comment:
   This is just private, right?





[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248058020


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##########
@@ -825,4 +828,60 @@ public void testClose() throws Exception {
         assertFutureThrows(write1, NotCoordinatorException.class);
         assertFutureThrows(write2, NotCoordinatorException.class);
     }
+
+    @Test
+    public void testOnNewMetadataImage() {
+        TopicPartition tp0 = new TopicPartition("__consumer_offsets", 0);
+        TopicPartition tp1 = new TopicPartition("__consumer_offsets", 1);
+
+        MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class);
+        MockPartitionWriter writer = mock(MockPartitionWriter.class);
+        MockCoordinatorBuilderSupplier supplier = mock(MockCoordinatorBuilderSupplier.class);
+        MockCoordinatorBuilder builder = mock(MockCoordinatorBuilder.class);
+
+        CoordinatorRuntime<MockCoordinator, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinator, String>()
+                .withLoader(loader)
+                .withEventProcessor(new MockEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorBuilderSupplier(supplier)
+                .build();
+
+        MockCoordinator coordinator0 = mock(MockCoordinator.class);
+        MockCoordinator coordinator1 = mock(MockCoordinator.class);
+
+        when(supplier.get()).thenReturn(builder);
+        when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+        when(builder.withLogContext(any())).thenReturn(builder);
+        when(builder.build())
+            .thenReturn(coordinator0)
+            .thenReturn(coordinator1);
+
+        CompletableFuture<Void> future0 = new CompletableFuture<>();
+        when(loader.load(tp0, coordinator0)).thenReturn(future0);
+
+        CompletableFuture<Void> future1 = new CompletableFuture<>();
+        when(loader.load(tp1, coordinator1)).thenReturn(future1);
+
+        runtime.scheduleLoadOperation(tp0, 0);
+        runtime.scheduleLoadOperation(tp1, 0);
+
+        // Coordinator 0 is loaded. It should get the current image
+        // that is the empty one.
+        future0.complete(null);
+        verify(coordinator0).onLoaded(MetadataImage.EMPTY);
+
+        // Publish a new image.
+        MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+        MetadataImage newImage = delta.apply(MetadataProvenance.EMPTY);
+        runtime.onNewMetadataImage(newImage, delta);
+
+        // Coordinator 0 should be notified about it.
+        verify(coordinator0).onNewMetadataImage(newImage, delta);

Review Comment:
   Yeah. I think I understand that the metadata image is updated, but I wasn't sure if we had anything ensuring that the new metadata image will also trigger the refresh of the subscription metadata. (Apologies if this was already covered in a previous PR.)





[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1247163837


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -172,19 +186,21 @@ public List<Record> build(TopicsImage topicsImage) {
             });
 
             // Add subscription metadata.
-            Map<String, TopicMetadata> subscriptionMetadata = new HashMap<>();
-            members.forEach((memberId, member) -> {
-                member.subscribedTopicNames().forEach(topicName -> {
-                    TopicImage topicImage = topicsImage.getTopic(topicName);
-                    if (topicImage != null) {
-                        subscriptionMetadata.put(topicName, new TopicMetadata(
-                            topicImage.id(),
-                            topicImage.name(),
-                            topicImage.partitions().size()
-                        ));
-                    }
+            if (subscriptionMetadata == null) {

Review Comment:
   If subscriptionMetadata is not null, that means we just want to take what we are already given and not generate it based on the subscribedNames and the image?
   Is this done to optimize or are there cases where we want to have something different than what we would have generated?
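For reference, the fallback branch (no explicit `subscriptionMetadata` given) rebuilds the metadata the way the removed lines did: for every topic a member subscribes to that exists in the image, record its id, name and partition count. A minimal sketch, with `TopicImage`, string ids and a name-keyed map as hypothetical stand-ins for Kafka's `TopicsImage` and `Uuid`. Passing an explicit map instead lets a test seed metadata that disagrees with the image; `testSubscriptionMetadataRefreshedAfterGroupIsLoaded` in this thread stores 3 partitions for `foo` while the image has 6.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; ids are plain strings instead of Kafka's Uuid, and
// the topics image is reduced to a map keyed by topic name.
class SubscriptionMetadataSketch {
    record TopicMetadata(String id, String name, int numPartitions) {}

    record TopicImage(String id, String name, int numPartitions) {}

    // Union of the members' subscribed topics, resolved against the image.
    static Map<String, TopicMetadata> compute(
        Collection<Set<String>> memberSubscriptions,
        Map<String, TopicImage> topicsByName
    ) {
        Map<String, TopicMetadata> result = new HashMap<>();
        for (Set<String> subscribedTopicNames : memberSubscriptions) {
            for (String topicName : subscribedTopicNames) {
                TopicImage topic = topicsByName.get(topicName);
                // Topics absent from the image are skipped, as in the removed helper code.
                if (topic != null) {
                    result.put(topicName, new TopicMetadata(topic.id(), topic.name(), topic.numPartitions()));
                }
            }
        }
        return Collections.unmodifiableMap(result);
    }

    public static void main(String[] args) {
        Map<String, TopicImage> image = Map.of("foo", new TopicImage("foo-id", "foo", 6));
        Map<String, TopicMetadata> metadata = compute(List.of(Set.of("foo", "bar")), image);
        System.out.println(metadata); // {foo=TopicMetadata[id=foo-id, name=foo, numPartitions=6]}
    }
}
```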





[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248095763


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##########
@@ -564,9 +564,9 @@ public void testMetadataRefreshDeadline() {
         assertTrue(group.hasMetadataExpired(time.milliseconds()));
 
         // Set the refresh time deadline with a higher group epoch.
-        group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch() + 1);

Review Comment:
   Fixed.





[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1244508147


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -423,6 +456,47 @@ public Map<String, TopicMetadata> computeSubscriptionMetadata(
         return Collections.unmodifiableMap(newSubscriptionMetadata);
     }
 
+    /**
+     * Updates the next metadata refresh time.
+     *
+     * @param nextTimeMs The next time in milliseconds.
+     * @param groupEpoch The associated group epoch.
+     */
+    public void setNextMetadataRefreshTime(
+        long nextTimeMs,
+        int groupEpoch
+    ) {
+        this.nextMetadataRefreshTime = new TimeAndEpoch(nextTimeMs, groupEpoch);
+    }
+
+    /**
+     * Resets the next metadata refresh.
+     */
+    public void resetNextMetadataRefreshTime() {
+        this.nextMetadataRefreshTime = TimeAndEpoch.EMPTY;
+    }
+
+    /**
+     * Checks if a metadata refresh is required. A refresh is required in two cases:
+     * 1) The next update time is smaller or equals to the current time;
+     * 2) The group epoch associated with the next update time is smaller than

Review Comment:
   This is also the case when we reset `nextMetadataRefreshTime`, right?





[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1244497307


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -874,4 +1021,34 @@ public void replay(
             consumerGroup.updateMember(newMember);
         }
     }
+
+    /**
+     * A new metadata image is available.
+     *
+     * @param newImage  The new metadata image.
+     * @param delta     The delta image.
+     */
+    public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) {
+        metadataImage = newImage;
+
+        // Notify all the groups subscribed to the created, updated or

Review Comment:
   As mentioned before, I got confused and thought this was actually adding topics. If this is just a call to update the existing groups with these topics, this is fine. (Since new topics won't be part of groups yet)
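Following the reading in the comment above (the call only flags groups and never adds topics to them), `onNewMetadataImage` can be sketched as: take the names of the topics touched by the delta, look up the groups subscribed to each of them, and request a metadata refresh on each such group so that its next heartbeat recomputes the subscription metadata. The `changedTopicNames` parameter is a hypothetical stand-in for what the real method would extract from the `MetadataDelta`, and `Group` only models the refresh flag.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; not Kafka's GroupMetadataManager.
class MetadataImageNotificationSketch {
    // Only models whether a refresh was requested for the group.
    static class Group {
        boolean refreshRequested = false;

        void requestMetadataRefresh() {
            refreshRequested = true;
        }
    }

    final Map<String, Group> groups = new HashMap<>();
    // topic name -> ids of the groups subscribed to it
    final Map<String, Set<String>> groupsByTopics = new HashMap<>();

    void subscribe(String groupId, String topicName) {
        groups.computeIfAbsent(groupId, id -> new Group());
        groupsByTopics.computeIfAbsent(topicName, t -> new HashSet<>()).add(groupId);
    }

    // changedTopicNames stands in for the names of the topics changed by the
    // delta; groups are only flagged, nothing is added to them.
    void onNewMetadataImage(Set<String> changedTopicNames) {
        for (String topicName : changedTopicNames) {
            for (String groupId : groupsByTopics.getOrDefault(topicName, Set.of())) {
                Group group = groups.get(groupId);
                if (group != null) {
                    group.requestMetadataRefresh();
                }
            }
        }
    }

    public static void main(String[] args) {
        MetadataImageNotificationSketch manager = new MetadataImageNotificationSketch();
        manager.subscribe("g1", "foo");
        manager.subscribe("g2", "bar");

        manager.onNewMetadataImage(Set.of("foo"));

        System.out.println(manager.groups.get("g1").refreshRequested); // true
        System.out.println(manager.groups.get("g2").refreshRequested); // false
    }
}
```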





[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1245212300


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -119,6 +131,18 @@ public String toString() {
      */
     private final TimelineHashMap<Uuid, TimelineHashMap<Integer, Integer>> currentPartitionEpoch;
 
+    /**
+     * The next metadata refresh time. It consists of a timestamp in milliseconds together with
+     * the group epoch at the time of setting it. The metadata refresh time is considered as a
+     * soft state (read that it is not stored in a timeline data structure). It is like this
+     * because it is not persisted to the log. The group epoch is here to ensure that the
+     * next metadata refresh time is invalidated if the group epoch does not correspond to
+     * the current group epoch. This can happen if the next metadata refresh time is updated
+     * after having refreshed the metadata but the write operation failed. In this case, the
+     * time is not automatically rollback.

Review Comment:
   It is actually the other way around. The refresh time is updated immediately, but it is not rolled back if the write fails. This is why I included the group epoch.
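Putting the field comment and this explanation together, a refresh is due when the deadline has passed or when the deadline carries a group epoch the group never reached, which is what happens when the deadline was set eagerly but the write bumping the epoch failed. The sketch below assumes that reading. The epoch comparison is inferred from `testMetadataRefreshDeadline` (a deadline set with `groupEpoch() + 1` counts as expired), `DeadlineAndEpoch.EMPTY = (0, 0)` is an assumed reset value that also makes a reset deadline count as expired, and `setGroupEpoch` is a hypothetical hook standing in for replaying the epoch record.

```java
// Hypothetical sketch; names mirror the tests in this thread, but this is
// not Kafka's ConsumerGroup class.
class RefreshDeadlineSketch {
    // Soft state: kept in memory only, never written to the log.
    record DeadlineAndEpoch(long deadlineMs, int epoch) {
        // Assumed reset value: a deadline of 0 is always in the past.
        static final DeadlineAndEpoch EMPTY = new DeadlineAndEpoch(0L, 0);
    }

    private int groupEpoch;
    private DeadlineAndEpoch metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;

    RefreshDeadlineSketch(int groupEpoch) {
        this.groupEpoch = groupEpoch;
    }

    // Models replaying a successfully written group epoch record.
    void setGroupEpoch(int groupEpoch) {
        this.groupEpoch = groupEpoch;
    }

    // Set immediately during the heartbeat, tagged with the epoch being written.
    void setMetadataRefreshDeadline(long deadlineMs, int epoch) {
        metadataRefreshDeadline = new DeadlineAndEpoch(deadlineMs, epoch);
    }

    // Forces a refresh on the next heartbeat.
    void requestMetadataRefresh() {
        metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;
    }

    // Expired if the deadline has passed, or if it was tagged with an epoch
    // the group has not reached (e.g. the write carrying it failed).
    boolean hasMetadataExpired(long currentTimeMs) {
        return currentTimeMs >= metadataRefreshDeadline.deadlineMs()
            || groupEpoch < metadataRefreshDeadline.epoch();
    }

    public static void main(String[] args) {
        long now = 1_000L;
        RefreshDeadlineSketch group = new RefreshDeadlineSketch(10);
        System.out.println(group.hasMetadataExpired(now)); // true: nothing set yet

        // The heartbeat refreshes the metadata and tries to write epoch 11.
        group.setMetadataRefreshDeadline(now + 1_000L, 11);
        System.out.println(group.hasMetadataExpired(now)); // true: epoch 11 not applied

        group.setGroupEpoch(11); // the write succeeded
        System.out.println(group.hasMetadataExpired(now)); // false
        System.out.println(group.hasMetadataExpired(now + 1_000L)); // true: deadline reached

        group.requestMetadataRefresh();
        System.out.println(group.hasMetadataExpired(now)); // true
    }
}
```

Because the initial value is the reset value, a freshly loaded group in this sketch refreshes on its first heartbeat, matching what the PR description asks for.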





[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248071005


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -1932,6 +1961,435 @@ public void testPartitionAssignorExceptionOnRegularHeartbeat() {
                     .setTopicPartitions(Collections.emptyList())));
     }
 
+    @Test
+    public void testSubscriptionMetadataRefreshedAfterGroupIsLoaded() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        // Create a context with one consumer group containing one member.
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withConsumerGroupMetadataRefreshIntervalMs(5 * 60 * 1000)
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                    .build())
+                .withAssignment(memberId, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                .withAssignmentEpoch(10)
+                .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() {
+                    {
+                        // foo only has 3 partitions stored in the metadata but foo has
+                        // 6 partitions the metadata image.
+                        put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3));
+                    }
+                }))
+            .build();
+
+        // The metadata refresh flag should be true.
+        ConsumerGroup consumerGroup = context.groupMetadataManager
+            .getOrMaybeCreateConsumerGroup(groupId, false);
+        assertTrue(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
+
+        // Prepare the assignment result.
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            Collections.singletonMap(memberId, new MemberAssignment(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )))
+        ));
+
+        // Heartbeat.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat(

Review Comment:
   I see we test the heartbeat logic here now.
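
The test expects the heartbeat to notice that `foo` has 6 partitions in the metadata image but only 3 in the stored subscription metadata, to write a new subscription metadata record, and to bump the group epoch from 10 to 11. Below is a minimal sketch of that comparison. It assumes the image is reduced to a map from topic name to partition count. `SubscriptionMetadataSketch`, `TopicInfo`, `compute`, and `refresh` are hypothetical names for illustration, not the coordinator's actual API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

final class SubscriptionMetadataSketch {
    // Hypothetical, simplified stand-in for TopicMetadata: a name and a partition count.
    record TopicInfo(String name, int numPartitions) {}

    // Result of a refresh: the recomputed metadata, the resulting epoch, and whether it changed.
    record RefreshResult(Map<String, TopicInfo> metadata, int groupEpoch, boolean changed) {}

    // Builds subscription metadata from the (simplified) image. Subscribed topics that
    // do not exist in the image, such as "bar" in the test, are left out.
    static Map<String, TopicInfo> compute(List<String> subscribedTopics, Map<String, Integer> image) {
        Map<String, TopicInfo> result = new HashMap<>();
        for (String topic : subscribedTopics) {
            Integer partitions = image.get(topic);
            if (partitions != null) {
                result.put(topic, new TopicInfo(topic, partitions));
            }
        }
        return Collections.unmodifiableMap(result);
    }

    // Compares the recomputed metadata with the stored metadata. A difference bumps
    // the group epoch, which is what triggers a new target assignment.
    static RefreshResult refresh(
        Map<String, TopicInfo> stored,
        List<String> subscribedTopics,
        Map<String, Integer> image,
        int groupEpoch
    ) {
        Map<String, TopicInfo> fresh = compute(subscribedTopics, image);
        boolean changed = !Objects.equals(stored, fresh);
        return new RefreshResult(fresh, changed ? groupEpoch + 1 : groupEpoch, changed);
    }
}
```

With the test's inputs (stored `foo` at 3 partitions, image `foo` at 6, subscriptions `foo` and `bar`, epoch 10), `refresh` returns epoch 11 and metadata that contains only `foo` with 6 partitions. `bar` is absent because it does not exist in the image, which matches the single-topic subscription metadata record in the expected records.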





[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248086803


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##########
@@ -568,5 +568,20 @@ public void testMetadataRefreshDeadline() {
         assertTrue(group.hasMetadataExpired(time.milliseconds()));
         assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs);
         assertEquals(group.groupEpoch() + 1, group.metadataRefreshDeadline().epoch);
+
+        // Set the refresh deadline.
+        group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch());

Review Comment:
   I don't understand your comment. Could you elaborate?





[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248148380


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##########
@@ -568,5 +568,20 @@ public void testMetadataRefreshDeadline() {
         assertTrue(group.hasMetadataExpired(time.milliseconds()));
         assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs);
         assertEquals(group.groupEpoch() + 1, group.metadataRefreshDeadline().epoch);
+
+        // Set the refresh deadline.
+        group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch());

Review Comment:
   OK, so this is the behavior we want, then? I guess I was just having trouble seeing when we would update to a lower epoch.
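
Reading the two metadata refresh tests in this PR, the deadline appears to carry the group epoch that the refresh is expected to produce. If the write that bumps the epoch is rolled back, the group epoch ends up below the deadline's epoch, and the next heartbeat refreshes again. Below is a minimal sketch of that check. It assumes expiry is `nowMs >= deadlineMs || groupEpoch < deadline.epoch`. `RefreshDeadlineSketch` and `requestMetadataRefresh` are hypothetical; only the other method names mirror those used in the tests.

```java
final class RefreshDeadlineSketch {
    // A refresh deadline paired with the group epoch it was computed for.
    record TimeAndEpoch(long deadlineMs, int epoch) {
        // A zero deadline is always in the past, so it forces a refresh.
        static final TimeAndEpoch EMPTY = new TimeAndEpoch(0L, 0);
    }

    private int groupEpoch;
    private TimeAndEpoch deadline = TimeAndEpoch.EMPTY;

    RefreshDeadlineSketch(int groupEpoch) {
        this.groupEpoch = groupEpoch;
    }

    int groupEpoch() {
        return groupEpoch;
    }

    // Stands in for replaying (or rolling back) a group epoch record.
    void setGroupEpoch(int epoch) {
        this.groupEpoch = epoch;
    }

    TimeAndEpoch metadataRefreshDeadline() {
        return deadline;
    }

    void setMetadataRefreshDeadline(long deadlineMs, int epoch) {
        this.deadline = new TimeAndEpoch(deadlineMs, epoch);
    }

    // Hypothetical: forces a refresh on the next heartbeat.
    void requestMetadataRefresh() {
        this.deadline = TimeAndEpoch.EMPTY;
    }

    // Expired when the deadline has passed, or when the group epoch is behind the
    // epoch the deadline was set for (the write that bumped the epoch was rolled back).
    boolean hasMetadataExpired(long nowMs) {
        return nowMs >= deadline.deadlineMs() || groupEpoch < deadline.epoch();
    }
}
```

In this sketch, setting the deadline with the current group epoch, as the test does, removes the epoch-based trigger and leaves only the time-based check.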





[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248060783


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##########
@@ -568,5 +568,20 @@ public void testMetadataRefreshDeadline() {
         assertTrue(group.hasMetadataExpired(time.milliseconds()));
         assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs);
         assertEquals(group.groupEpoch() + 1, group.metadataRefreshDeadline().epoch);
+
+        // Set the refresh deadline.
+        group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch());

Review Comment:
   We can just go back an epoch?





[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1244485648


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -727,6 +800,80 @@ public void replay(
                     + " but did not receive ConsumerGroupTargetAssignmentMetadataValue tombstone.");
             }
             consumerGroup.removeMember(memberId);
+            updateGroupsByTopics(groupId, oldSubscribedTopicNames, consumerGroup.subscribedTopicNames());
+        }
+    }
+
+    /**
+     * @return The set of groups subscribed to the topic.
+     */
+    public Set<String> groupsSubscribedToTopic(String topicName) {
+        Set<String> groups = groupsByTopics.get(topicName);
+        return groups != null ? groups : Collections.emptySet();
+    }
+
+    /**
+     * Subscribes a group to a topic.
+     *
+     * @param groupId   The group id.
+     * @param topicName The topic name.
+     */
+    private void subscribeGroupToTopic(
+        String groupId,
+        String topicName
+    ) {
+        groupsByTopics
+            .computeIfAbsent(topicName, __ -> new TimelineHashSet<>(snapshotRegistry, 1))
+            .add(groupId);
+    }
+
+    /**
+     * Unsubscribes a group from a topic.
+     *
+     * @param groupId   The group id.
+     * @param topicName The topic name.
+     */
+    private void unsubscribeGroupFromTopic(

Review Comment:
   Should the return type be `groupIds` here?
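
For context on the return type, the index maps a topic name to the ids of the groups that have at least one member subscribed to it, so `groupsSubscribedToTopic` yields a set of group ids. Below is a minimal sketch of the index and of the diff-based update that `updateGroupsByTopics` appears to perform. It assumes plain `HashMap`/`HashSet` in place of the snapshot-aware timeline collections in the diff, and it assumes that empty entries are dropped. `GroupsByTopicsSketch` is a hypothetical name.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class GroupsByTopicsSketch {
    // Topic name -> ids of the groups with at least one member subscribed to it.
    private final Map<String, Set<String>> groupsByTopics = new HashMap<>();

    Set<String> groupsSubscribedToTopic(String topicName) {
        Set<String> groups = groupsByTopics.get(topicName);
        return groups != null ? Collections.unmodifiableSet(groups) : Collections.emptySet();
    }

    private void subscribeGroupToTopic(String groupId, String topicName) {
        groupsByTopics.computeIfAbsent(topicName, __ -> new HashSet<>()).add(groupId);
    }

    private void unsubscribeGroupFromTopic(String groupId, String topicName) {
        Set<String> groups = groupsByTopics.get(topicName);
        if (groups != null) {
            groups.remove(groupId);
            // Drop empty entries so the index does not keep topics nobody subscribes to.
            if (groups.isEmpty()) {
                groupsByTopics.remove(topicName);
            }
        }
    }

    // Applies the difference between the group's old and new subscribed topic names,
    // where each set is the union of the subscriptions of all members of the group.
    void updateGroupsByTopics(String groupId, Set<String> oldTopics, Set<String> newTopics) {
        for (String topic : oldTopics) {
            if (!newTopics.contains(topic)) {
                unsubscribeGroupFromTopic(groupId, topic);
            }
        }
        for (String topic : newTopics) {
            if (!oldTopics.contains(topic)) {
                subscribeGroupToTopic(groupId, topic);
            }
        }
    }
}
```

Because the update works on the group-level union of subscriptions, a topic leaves a group's entry only once no member of that group subscribes to it. This is the behavior `testGroupIdsByTopics` checks.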





[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1244495117


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -874,4 +1021,34 @@ public void replay(
             consumerGroup.updateMember(newMember);
         }
     }
+
+    /**
+     * A new metadata image is available.
+     *
+     * @param newImage  The new metadata image.
+     * @param delta     The delta image.
+     */
+    public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) {
+        metadataImage = newImage;
+
+        // Notify all the groups subscribed to the created, updated or

Review Comment:
   Are created topics exposed through a different method in `topicsDelta`? Shouldn't we also take `createdTopicIds` and add them? Or does `changedTopics` already account for them? Are there other changes besides topic creation that we can get?
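
One way to sidestep the created-versus-changed question is to collect every topic name the delta touches (created, changed, or deleted) before looking up the subscribed groups. Below is a minimal sketch. It assumes a hypothetical `TopicChanges` record that already exposes those three sets of names. It does not model the real `TopicsDelta` API, and whether `changedTopics` also covers created topics is exactly the open question in this thread.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class AffectedGroupsSketch {
    // Hypothetical summary of a metadata delta, keyed by topic name.
    record TopicChanges(Set<String> created, Set<String> changed, Set<String> deleted) {}

    // Returns the ids of all groups subscribed to at least one touched topic.
    static Set<String> affectedGroups(TopicChanges changes, Map<String, Set<String>> groupsByTopics) {
        // Union first, so each topic is looked up once.
        Set<String> touched = new HashSet<>();
        touched.addAll(changes.created());
        touched.addAll(changes.changed());
        touched.addAll(changes.deleted());

        Set<String> groups = new HashSet<>();
        for (String topic : touched) {
            groups.addAll(groupsByTopics.getOrDefault(topic, Set.of()));
        }
        return groups;
    }
}
```

Because the result is a set, a group subscribed to several touched topics is flagged only once.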





[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1245213037


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -423,6 +456,47 @@ public Map<String, TopicMetadata> computeSubscriptionMetadata(
         return Collections.unmodifiableMap(newSubscriptionMetadata);
     }
 
+    /**
+     * Updates the next metadata refresh time.
+     *
+     * @param nextTimeMs The next time in milliseconds.
+     * @param groupEpoch The associated group epoch.
+     */
+    public void setNextMetadataRefreshTime(
+        long nextTimeMs,
+        int groupEpoch
+    ) {
+        this.nextMetadataRefreshTime = new TimeAndEpoch(nextTimeMs, groupEpoch);
+    }
+
+    /**
+     * Resets the next metadata refresh.

Review Comment:
   Right. The metadata is refreshed immediately on the next heartbeat.





[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1247803617


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##########
@@ -825,4 +828,60 @@ public void testClose() throws Exception {
         assertFutureThrows(write1, NotCoordinatorException.class);
         assertFutureThrows(write2, NotCoordinatorException.class);
     }
+
+    @Test
+    public void testOnNewMetadataImage() {
+        TopicPartition tp0 = new TopicPartition("__consumer_offsets", 0);
+        TopicPartition tp1 = new TopicPartition("__consumer_offsets", 1);
+
+        MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class);
+        MockPartitionWriter writer = mock(MockPartitionWriter.class);
+        MockCoordinatorBuilderSupplier supplier = mock(MockCoordinatorBuilderSupplier.class);
+        MockCoordinatorBuilder builder = mock(MockCoordinatorBuilder.class);
+
+        CoordinatorRuntime<MockCoordinator, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinator, String>()
+                .withLoader(loader)
+                .withEventProcessor(new MockEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorBuilderSupplier(supplier)
+                .build();
+
+        MockCoordinator coordinator0 = mock(MockCoordinator.class);
+        MockCoordinator coordinator1 = mock(MockCoordinator.class);
+
+        when(supplier.get()).thenReturn(builder);
+        when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+        when(builder.withLogContext(any())).thenReturn(builder);
+        when(builder.build())
+            .thenReturn(coordinator0)
+            .thenReturn(coordinator1);
+
+        CompletableFuture<Void> future0 = new CompletableFuture<>();
+        when(loader.load(tp0, coordinator0)).thenReturn(future0);
+
+        CompletableFuture<Void> future1 = new CompletableFuture<>();
+        when(loader.load(tp1, coordinator1)).thenReturn(future1);
+
+        runtime.scheduleLoadOperation(tp0, 0);
+        runtime.scheduleLoadOperation(tp1, 0);
+
+        // Coordinator 0 is loaded. It should get the current image
+        // that is the empty one.
+        future0.complete(null);
+        verify(coordinator0).onLoaded(MetadataImage.EMPTY);
+
+        // Publish a new image.
+        MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+        MetadataImage newImage = delta.apply(MetadataProvenance.EMPTY);
+        runtime.onNewMetadataImage(newImage, delta);
+
+        // Coordinator 0 should be notified about it.
+        verify(coordinator0).onNewMetadataImage(newImage, delta);

Review Comment:
   We cannot test this here because the runtime is not aware of the concrete implementation of the state machine. I also want to ensure that we are on the same page. The metadata image is updated when `onNewMetadataImage` is called but the subscription metadata is refreshed on the next heartbeat.
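
The split described above can be sketched as follows: the image is swapped eagerly in `onNewMetadataImage`, and the subscription metadata is recomputed lazily on the next heartbeat. This is a simplified model. It assumes a boolean refresh flag in place of the time/epoch deadline, and a map from topic name to partition count in place of `MetadataImage`. `LazyRefreshSketch` and its members are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

final class LazyRefreshSketch {
    // Hypothetical minimal group state: stored partition counts plus a refresh flag.
    static final class Group {
        Map<String, Integer> subscriptionMetadata = new HashMap<>();
        final Set<String> subscribedTopics;
        // Freshly loaded groups refresh on their first heartbeat.
        boolean refreshRequested = true;

        Group(Set<String> subscribedTopics) {
            this.subscribedTopics = subscribedTopics;
        }
    }

    private Map<String, Integer> image = Map.of(); // topic name -> partition count
    private final Map<String, Group> groups = new HashMap<>();

    void addGroup(String groupId, Group group) {
        groups.put(groupId, group);
    }

    // Eager part: swap the image and flag the affected groups. Nothing is recomputed here.
    void onNewMetadataImage(Map<String, Integer> newImage, Set<String> affectedGroupIds) {
        image = Map.copyOf(newImage);
        for (String groupId : affectedGroupIds) {
            Group group = groups.get(groupId);
            if (group != null) {
                group.refreshRequested = true;
            }
        }
    }

    // Lazy part: the next heartbeat recomputes the subscription metadata from the latest image.
    void heartbeat(String groupId) {
        Group group = groups.get(groupId);
        if (group == null || !group.refreshRequested) {
            return;
        }
        Map<String, Integer> fresh = new HashMap<>();
        for (String topic : group.subscribedTopics) {
            Integer partitions = image.get(topic);
            if (partitions != null) {
                fresh.put(topic, partitions);
            }
        }
        group.subscriptionMetadata = fresh;
        group.refreshRequested = false;
    }
}
```

Between the new image and the next heartbeat, the group still reports its old subscription metadata. This gap is why the runtime test can only verify that `onNewMetadataImage` is forwarded, not that the metadata changed.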





[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1247155039


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##########
@@ -825,4 +828,60 @@ public void testClose() throws Exception {
         assertFutureThrows(write1, NotCoordinatorException.class);
         assertFutureThrows(write2, NotCoordinatorException.class);
     }
+
+    @Test
+    public void testOnNewMetadataImage() {
+        TopicPartition tp0 = new TopicPartition("__consumer_offsets", 0);
+        TopicPartition tp1 = new TopicPartition("__consumer_offsets", 1);
+
+        MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class);
+        MockPartitionWriter writer = mock(MockPartitionWriter.class);
+        MockCoordinatorBuilderSupplier supplier = mock(MockCoordinatorBuilderSupplier.class);
+        MockCoordinatorBuilder builder = mock(MockCoordinatorBuilder.class);
+
+        CoordinatorRuntime<MockCoordinator, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinator, String>()
+                .withLoader(loader)
+                .withEventProcessor(new MockEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorBuilderSupplier(supplier)
+                .build();
+
+        MockCoordinator coordinator0 = mock(MockCoordinator.class);
+        MockCoordinator coordinator1 = mock(MockCoordinator.class);
+
+        when(supplier.get()).thenReturn(builder);
+        when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+        when(builder.withLogContext(any())).thenReturn(builder);
+        when(builder.build())
+            .thenReturn(coordinator0)
+            .thenReturn(coordinator1);
+
+        CompletableFuture<Void> future0 = new CompletableFuture<>();
+        when(loader.load(tp0, coordinator0)).thenReturn(future0);
+
+        CompletableFuture<Void> future1 = new CompletableFuture<>();
+        when(loader.load(tp1, coordinator1)).thenReturn(future1);
+
+        runtime.scheduleLoadOperation(tp0, 0);
+        runtime.scheduleLoadOperation(tp1, 0);
+
+        // Coordinator 0 is loaded. It should get the current image
+        // that is the empty one.
+        future0.complete(null);
+        verify(coordinator0).onLoaded(MetadataImage.EMPTY);
+
+        // Publish a new image.
+        MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+        MetadataImage newImage = delta.apply(MetadataProvenance.EMPTY);
+        runtime.onNewMetadataImage(newImage, delta);
+
+        // Coordinator 0 should be notified about it.
+        verify(coordinator0).onNewMetadataImage(newImage, delta);

Review Comment:
   Do we also want to test the logic inside `onNewMetadataImage`?





[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248087394


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -1932,6 +1961,435 @@ public void testPartitionAssignorExceptionOnRegularHeartbeat() {
                     .setTopicPartitions(Collections.emptyList())));
     }
 
+    @Test
+    public void testSubscriptionMetadataRefreshedAfterGroupIsLoaded() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        // Create a context with one consumer group containing one member.
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withConsumerGroupMetadataRefreshIntervalMs(5 * 60 * 1000)
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                    .build())
+                .withAssignment(memberId, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                .withAssignmentEpoch(10)
+                .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() {
+                    {
+                        // foo only has 3 partitions stored in the metadata but foo has
+                        // 6 partitions in the metadata image.
+                        put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3));
+                    }
+                }))
+            .build();
+
+        // The metadata refresh flag should be true.
+        ConsumerGroup consumerGroup = context.groupMetadataManager
+            .getOrMaybeCreateConsumerGroup(groupId, false);
+        assertTrue(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
+
+        // Prepare the assignment result.
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            Collections.singletonMap(memberId, new MemberAssignment(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )))
+        ));
+
+        // Heartbeat.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(10));
+
+        // The member gets partitions 3, 4 and 5 assigned.
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(11)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setAssignedTopicPartitions(Arrays.asList(
+                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(Arrays.asList(0, 1, 2, 3, 4, 5))
+                    ))),
+            result.response()
+        );
+
+        ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(memberId)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setTargetMemberEpoch(11)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+            .build();
+
+        List<Record> expectedRecords = Arrays.asList(
+            RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
+                {
+                    put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6));
+                }
+            }),
+            RecordHelpers.newGroupEpochRecord(groupId, 11),
+            RecordHelpers.newTargetAssignmentRecord(groupId, memberId, mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )),
+            RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11),
+            RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember)
+        );
+
+        assertRecordsEquals(expectedRecords, result.records());
+
+        // Check next refresh time.
+        assertFalse(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
+        assertEquals(context.time.milliseconds() + 5 * 60 * 1000, consumerGroup.metadataRefreshDeadline().deadlineMs);
+        assertEquals(11, consumerGroup.metadataRefreshDeadline().epoch);
+    }
+
+    @Test
+    public void testSubscriptionMetadataRefreshedAgainAfterWriteFailure() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        // Create a context with one consumer group containing one member.
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withConsumerGroupMetadataRefreshIntervalMs(5 * 60 * 1000)
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                    .build())
+                .withAssignment(memberId, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                .withAssignmentEpoch(10)
+                .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() {
+                    {
+                        // foo only has 3 partitions stored in the metadata but foo has
+                        // 6 partitions in the metadata image.
+                        put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3));
+                    }
+                }))
+            .build();
+
+        // The metadata refresh flag should be true.
+        ConsumerGroup consumerGroup = context.groupMetadataManager
+            .getOrMaybeCreateConsumerGroup(groupId, false);
+        assertTrue(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
+
+        // Prepare the assignment result.
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            Collections.singletonMap(memberId, new MemberAssignment(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )))
+        ));
+
+        // Heartbeat.
+        context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(10));
+
+        // The metadata refresh flag is set to a future time.
+        assertFalse(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
+        assertEquals(context.time.milliseconds() + 5 * 60 * 1000, consumerGroup.metadataRefreshDeadline().deadlineMs);
+        assertEquals(11, consumerGroup.metadataRefreshDeadline().epoch);
+
+        // Roll back the uncommitted changes. This does not roll back the metadata flag
+        // because it is not using a timeline data structure.
+        context.rollback();
+
+        // However, the next heartbeat should detect the divergence based on the epoch and trigger
+        // a metadata refresh.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(10));
+
+
+        // The member gets partitions 3, 4 and 5 assigned.
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(11)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setAssignedTopicPartitions(Arrays.asList(
+                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(Arrays.asList(0, 1, 2, 3, 4, 5))
+                    ))),
+            result.response()
+        );
+
+        ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(memberId)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setTargetMemberEpoch(11)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+            .build();
+
+        List<Record> expectedRecords = Arrays.asList(
+            RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
+                {
+                    put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6));
+                }
+            }),
+            RecordHelpers.newGroupEpochRecord(groupId, 11),
+            RecordHelpers.newTargetAssignmentRecord(groupId, memberId, mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )),
+            RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11),
+            RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember)
+        );
+
+        assertRecordsEquals(expectedRecords, result.records());
+
+        // Check next refresh time.
+        assertFalse(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
+        assertEquals(context.time.milliseconds() + 5 * 60 * 1000, consumerGroup.metadataRefreshDeadline().deadlineMs);
+        assertEquals(11, consumerGroup.metadataRefreshDeadline().epoch);
+    }
+
+    @Test
+    public void testGroupIdsByTopics() {
+        String groupId1 = "group1";
+        String groupId2 = "group2";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .build();
+
+        assertEquals(Collections.emptySet(), context.groupMetadataManager.groupsSubscribedToTopic("foo"));
+        assertEquals(Collections.emptySet(), context.groupMetadataManager.groupsSubscribedToTopic("bar"));
+        assertEquals(Collections.emptySet(), context.groupMetadataManager.groupsSubscribedToTopic("zar"));
+
+        // M1 in group 1 subscribes to foo and bar.
+        context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId1,
+            new ConsumerGroupMember.Builder("group1-m1")
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .build()));
+
+        assertEquals(mkSet(groupId1), context.groupMetadataManager.groupsSubscribedToTopic("foo"));
+        assertEquals(mkSet(groupId1), context.groupMetadataManager.groupsSubscribedToTopic("bar"));
+        assertEquals(Collections.emptySet(), context.groupMetadataManager.groupsSubscribedToTopic("zar"));
+
+        // M1 in group 2 subscribes to foo, bar and zar.
+        context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId2,
+            new ConsumerGroupMember.Builder("group2-m1")
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar"))
+                .build()));
+
+        assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("foo"));
+        assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("bar"));
+        assertEquals(mkSet(groupId2), context.groupMetadataManager.groupsSubscribedToTopic("zar"));
+
+        // M2 in group 1 subscribes to bar and zar.
+        context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId1,
+            new ConsumerGroupMember.Builder("group1-m2")
+                .setSubscribedTopicNames(Arrays.asList("bar", "zar"))
+                .build()));
+
+        assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("foo"));
+        assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("bar"));
+        assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("zar"));
+
+        // M2 in group 2 subscribes to foo and bar.
+        context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId2,
+            new ConsumerGroupMember.Builder("group2-m2")
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .build()));
+
+        assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("foo"));
+        assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("bar"));
+        assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("zar"));
+
+        // M1 in group 1 is removed.
+        context.replay(RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId1, "group1-m1"));
+        context.replay(RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId1, "group1-m1"));
+
+        assertEquals(mkSet(groupId2), context.groupMetadataManager.groupsSubscribedToTopic("foo"));
+        assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("bar"));
+        assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("zar"));
+
+        // M1 in group 2 subscribes to nothing.
+        context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId2,
+            new ConsumerGroupMember.Builder("group2-m1")
+                .setSubscribedTopicNames(Collections.emptyList())
+                .build()));
+
+        assertEquals(mkSet(groupId2), context.groupMetadataManager.groupsSubscribedToTopic("foo"));
+        assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("bar"));
+        assertEquals(mkSet(groupId1), context.groupMetadataManager.groupsSubscribedToTopic("zar"));
+
+        // M2 in group 2 subscribes to foo.

Review Comment:
   right.



[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248087985


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -1932,6 +1961,435 @@ public void testPartitionAssignorExceptionOnRegularHeartbeat() {
                     .setTopicPartitions(Collections.emptyList())));
     }
 
+    @Test
+    public void testSubscriptionMetadataRefreshedAfterGroupIsLoaded() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        // Create a context with one consumer group containing one member.
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withConsumerGroupMetadataRefreshIntervalMs(5 * 60 * 1000)
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                    .build())
+                .withAssignment(memberId, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                .withAssignmentEpoch(10)
+                .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() {
+                    {
+                        // foo only has 3 partitions stored in the metadata but foo has
+                        // 6 partitions in the metadata image.
+                        put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3));
+                    }
+                }))
+            .build();
+
+        // The metadata refresh flag should be true.
+        ConsumerGroup consumerGroup = context.groupMetadataManager
+            .getOrMaybeCreateConsumerGroup(groupId, false);
+        assertTrue(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
+
+        // Prepare the assignment result.
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            Collections.singletonMap(memberId, new MemberAssignment(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )))
+        ));
+
+        // Heartbeat.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat(

Review Comment:
   Correct. I don't get what you mean though?
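   The test seems to check the following behaviour. On the first heartbeat after loading, the subscription metadata is rebuilt from the current image for the subscribed topics that exist there. The group epoch is bumped only if the result differs from the stored copy. In this test, `foo` goes from 3 to 6 partitions, `bar` is absent from the image, and the epoch moves from 10 to 11.

   Below is a minimal sketch of that logic under simplifying assumptions. Topics are reduced to name → partition count, and `SubscriptionMetadataRefresh` and its methods are a hypothetical helper, not the `GroupMetadataManager` code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

final class SubscriptionMetadataRefresh {
    /**
     * Hypothetical helper: builds the subscription metadata (topic name -> partition count)
     * from the current image, keeping only the subscribed topics that exist in the image.
     */
    static Map<String, Integer> computeSubscriptionMetadata(
        Set<String> subscribedTopicNames,
        Map<String, Integer> partitionsByTopicInImage
    ) {
        Map<String, Integer> metadata = new HashMap<>();
        for (String topicName : subscribedTopicNames) {
            Integer numPartitions = partitionsByTopicInImage.get(topicName);
            // Topics missing from the image (like "bar" in the test) are skipped.
            if (numPartitions != null) {
                metadata.put(topicName, numPartitions);
            }
        }
        return metadata;
    }

    /** Next group epoch: bumped only when the refreshed metadata diverges from the stored copy. */
    static int nextGroupEpoch(int groupEpoch, Map<String, Integer> stored, Map<String, Integer> refreshed) {
        return stored.equals(refreshed) ? groupEpoch : groupEpoch + 1;
    }
}
```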



[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1244492766


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -874,4 +1021,34 @@ public void replay(
             consumerGroup.updateMember(newMember);
         }
     }
+
+    /**
+     * A new metadata image is available.
+     *
+     * @param newImage  The new metadata image.
+     * @param delta     The delta image.
+     */
+    public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) {
+        metadataImage = newImage;
+
+        // Notify all the groups subscribed to the created, updated or
+        // deleted topics.
+        Set<String> allGroupIds = new HashSet<>();
+        delta.topicsDelta().changedTopics().forEach((topicId, topicDelta) -> {
+            String topicName = topicDelta.name();
+            Set<String> groupIds = groupsByTopics.get(topicName);
+            if (groupIds != null) allGroupIds.addAll(groupIds);
+        });
+        delta.topicsDelta().deletedTopicIds().forEach(topicId -> {
+            TopicImage topicImage = delta.image().topics().getTopic(topicId);
+            Set<String> groupIds = groupsByTopics.get(topicImage.name());
+            if (groupIds != null) allGroupIds.addAll(groupIds);

Review Comment:
   We don't want to add all the deleted topics, do we?
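   The intent of this loop can be shown with a small self-contained sketch. Only group ids are collected, and only for groups subscribed to a created, updated or deleted topic.

   A deleted topic is no longer in the new image, so its name has to be resolved through the image the delta was applied to. That appears to be why the snippet reads it from `delta.image()`. Types are reduced to plain maps keyed by string ids, and `AffectedGroups.groupsToRefresh` is a hypothetical name.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class AffectedGroups {
    /**
     * Hypothetical helper returning the ids of the groups whose subscription metadata
     * should be refreshed after a metadata delta.
     *
     * @param groupsByTopics     topic name -> ids of the groups subscribed to it
     * @param changedTopicNames  names of the created or updated topics
     * @param deletedTopicIds    ids of the deleted topics
     * @param previousTopicNames topic id -> topic name in the image the delta was applied to
     */
    static Set<String> groupsToRefresh(
        Map<String, Set<String>> groupsByTopics,
        Set<String> changedTopicNames,
        Set<String> deletedTopicIds,
        Map<String, String> previousTopicNames
    ) {
        Set<String> allGroupIds = new HashSet<>();
        for (String topicName : changedTopicNames) {
            allGroupIds.addAll(groupsByTopics.getOrDefault(topicName, Collections.emptySet()));
        }
        for (String topicId : deletedTopicIds) {
            // The deleted topic is gone from the new image; resolve its name in the previous one.
            String topicName = previousTopicNames.get(topicId);
            if (topicName != null) {
                allGroupIds.addAll(groupsByTopics.getOrDefault(topicName, Collections.emptySet()));
            }
        }
        return allGroupIds;
    }
}
```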



[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1244506236


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -119,6 +131,18 @@ public String toString() {
      */
     private final TimelineHashMap<Uuid, TimelineHashMap<Integer, Integer>> currentPartitionEpoch;
 
+    /**
+     * The next metadata refresh time. It consists of a timestamp in milliseconds together with
+     * the group epoch at the time of setting it. The metadata refresh time is considered as a
+     * soft state (read that it is not stored in a timeline data structure). It is like this
+     * because it is not persisted to the log. The group epoch is here to ensure that the
+     * next metadata refresh time is invalidated if the group epoch does not correspond to
+     * the current group epoch. This can happen if the next metadata refresh time is updated
+     * after having refreshed the metadata but the write operation failed. In this case, the
+     * time is not automatically rollback.

Review Comment:
   nit: automatically rolled back.
   
   Also, in this case, do we not update the refresh time until the epoch bump and the write succeed? And will we keep refreshing the metadata in the meantime?
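   The invalidation described in this javadoc can be sketched as follows. The deadline records the epoch the group is expected to reach once the refresh is written. In the quoted tests that epoch is 11, while the group is still at 10.

   If the write is rolled back, the group epoch never reaches that value. The mismatch then marks the metadata as expired again. Once the write succeeds, only the timestamp matters.

   `RefreshDeadline`, `GroupRefreshState`, and the exact comparisons (`>=`, `!=`) are assumptions for illustration, not the `ConsumerGroup` implementation.

```java
/** Hypothetical deadline: a timestamp plus the group epoch it was set for. */
final class RefreshDeadline {
    final long deadlineMs;
    final int epoch;

    RefreshDeadline(long deadlineMs, int epoch) {
        this.deadlineMs = deadlineMs;
        this.epoch = epoch;
    }
}

final class GroupRefreshState {
    // Stands in for the group epoch, which would live in a timeline (rollback-able) structure.
    int groupEpoch;
    // Soft state: not persisted and not rolled back when a write fails.
    RefreshDeadline deadline = new RefreshDeadline(0L, 0);

    GroupRefreshState(int groupEpoch) {
        this.groupEpoch = groupEpoch;
    }

    void setMetadataRefreshDeadline(long deadlineMs, int epoch) {
        deadline = new RefreshDeadline(deadlineMs, epoch);
    }

    boolean hasMetadataExpired(long nowMs) {
        // Expired when the time has passed, or when the deadline was set for an epoch
        // the group did not reach (e.g. because the write carrying the bump failed).
        return nowMs >= deadline.deadlineMs || deadline.epoch != groupEpoch;
    }
}
```

   In this model, every heartbeat that follows a failed write sees the epoch mismatch and refreshes again. That is the scenario `testSubscriptionMetadataRefreshedAgainAfterWriteFailure` exercises.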



[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248086692


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -1932,6 +1961,435 @@ public void testPartitionAssignorExceptionOnRegularHeartbeat() {
                     .setTopicPartitions(Collections.emptyList())));
     }
 
+    @Test
+    public void testSubscriptionMetadataRefreshedAfterGroupIsLoaded() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        // Create a context with one consumer group containing one member.
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withConsumerGroupMetadataRefreshIntervalMs(5 * 60 * 1000)
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                    .build())
+                .withAssignment(memberId, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                .withAssignmentEpoch(10)
+                .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() {
+                    {
+                        // foo only has 3 partitions stored in the metadata but foo has
+                        // 6 partitions in the metadata image.
+                        put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3));
+                    }
+                }))
+            .build();
+
+        // The metadata refresh flag should be true.
+        ConsumerGroup consumerGroup = context.groupMetadataManager
+            .getOrMaybeCreateConsumerGroup(groupId, false);
+        assertTrue(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
+
+        // Prepare the assignment result.
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            Collections.singletonMap(memberId, new MemberAssignment(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )))
+        ));
+
+        // Heartbeat.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(10));
+
+        // The member gets partitions 3, 4 and 5 assigned.
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(11)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setAssignedTopicPartitions(Arrays.asList(
+                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(Arrays.asList(0, 1, 2, 3, 4, 5))
+                    ))),
+            result.response()
+        );
+
+        ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(memberId)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setTargetMemberEpoch(11)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+            .build();
+
+        List<Record> expectedRecords = Arrays.asList(
+            RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
+                {
+                    put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6));
+                }
+            }),
+            RecordHelpers.newGroupEpochRecord(groupId, 11),
+            RecordHelpers.newTargetAssignmentRecord(groupId, memberId, mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )),
+            RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11),
+            RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember)
+        );
+
+        assertRecordsEquals(expectedRecords, result.records());
+
+        // Check next refresh time.
+        assertFalse(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
+        assertEquals(context.time.milliseconds() + 5 * 60 * 1000, consumerGroup.metadataRefreshDeadline().deadlineMs);
+        assertEquals(11, consumerGroup.metadataRefreshDeadline().epoch);
+    }
+
+    @Test
+    public void testSubscriptionMetadataRefreshedAgainAfterWriteFailure() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        // Create a context with one consumer group containing one member.
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withConsumerGroupMetadataRefreshIntervalMs(5 * 60 * 1000)
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                    .build())
+                .withAssignment(memberId, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                .withAssignmentEpoch(10)
+                .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() {
+                    {
+                        // foo only has 3 partitions stored in the metadata but foo has
+                        // 6 partitions in the metadata image.
+                        put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3));
+                    }
+                }))
+            .build();
+
+        // The metadata refresh flag should be true.
+        ConsumerGroup consumerGroup = context.groupMetadataManager
+            .getOrMaybeCreateConsumerGroup(groupId, false);
+        assertTrue(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
+
+        // Prepare the assignment result.
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            Collections.singletonMap(memberId, new MemberAssignment(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )))
+        ));
+
+        // Heartbeat.
+        context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(10));
+
+        // The metadata refresh flag is set to a future time.
+        assertFalse(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
+        assertEquals(context.time.milliseconds() + 5 * 60 * 1000, consumerGroup.metadataRefreshDeadline().deadlineMs);
+        assertEquals(11, consumerGroup.metadataRefreshDeadline().epoch);
+
+        // Rollback the uncommitted changes. This does not rollback the metadata flag
+        // because it is not using a timeline data structure.
+        context.rollback();
+
+        // However, the next heartbeat should detect the divergence based on the epoch and trigger
+        // a metadata refresh.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(10));
+
+
+        // The member gets partitions 3, 4 and 5 assigned.
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(11)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setAssignedTopicPartitions(Arrays.asList(
+                        new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(Arrays.asList(0, 1, 2, 3, 4, 5))
+                    ))),
+            result.response()
+        );
+
+        ConsumerGroupMember expectedMember = new ConsumerGroupMember.Builder(memberId)
+            .setMemberEpoch(11)
+            .setPreviousMemberEpoch(10)
+            .setTargetMemberEpoch(11)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+            .build();
+
+        List<Record> expectedRecords = Arrays.asList(
+            RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new HashMap<String, TopicMetadata>() {
+                {
+                    put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 6));
+                }
+            }),
+            RecordHelpers.newGroupEpochRecord(groupId, 11),
+            RecordHelpers.newTargetAssignmentRecord(groupId, memberId, mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )),
+            RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11),
+            RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember)
+        );
+
+        assertRecordsEquals(expectedRecords, result.records());
+
+        // Check next refresh time.
+        assertFalse(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
+        assertEquals(context.time.milliseconds() + 5 * 60 * 1000, consumerGroup.metadataRefreshDeadline().deadlineMs);
+        assertEquals(11, consumerGroup.metadataRefreshDeadline().epoch);
+    }
+
+    @Test
+    public void testGroupIdsByTopics() {
+        String groupId1 = "group1";
+        String groupId2 = "group2";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .build();
+
+        assertEquals(Collections.emptySet(), context.groupMetadataManager.groupsSubscribedToTopic("foo"));
+        assertEquals(Collections.emptySet(), context.groupMetadataManager.groupsSubscribedToTopic("bar"));
+        assertEquals(Collections.emptySet(), context.groupMetadataManager.groupsSubscribedToTopic("zar"));
+
+        // M1 in group 1 subscribes to foo and bar.
+        context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId1,
+            new ConsumerGroupMember.Builder("group1-m1")
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .build()));
+
+        assertEquals(mkSet(groupId1), context.groupMetadataManager.groupsSubscribedToTopic("foo"));
+        assertEquals(mkSet(groupId1), context.groupMetadataManager.groupsSubscribedToTopic("bar"));
+        assertEquals(Collections.emptySet(), context.groupMetadataManager.groupsSubscribedToTopic("zar"));
+
+        // M1 in group 2 subscribes to foo, bar and zar.
+        context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId2,
+            new ConsumerGroupMember.Builder("group2-m1")
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar"))
+                .build()));
+
+        assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("foo"));
+        assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("bar"));
+        assertEquals(mkSet(groupId2), context.groupMetadataManager.groupsSubscribedToTopic("zar"));
+
+        // M2 in group 1 subscribes to bar and zar.
+        context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId1,
+            new ConsumerGroupMember.Builder("group1-m2")
+                .setSubscribedTopicNames(Arrays.asList("bar", "zar"))
+                .build()));
+
+        assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("foo"));
+        assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("bar"));
+        assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("zar"));
+
+        // M2 in group 2 subscribes to foo and bar.
+        context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId2,
+            new ConsumerGroupMember.Builder("group2-m2")
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .build()));
+
+        assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("foo"));
+        assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("bar"));
+        assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("zar"));
+
+        // M1 in group 1 is removed.
+        context.replay(RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId1, "group1-m1"));
+        context.replay(RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId1, "group1-m1"));
+
+        assertEquals(mkSet(groupId2), context.groupMetadataManager.groupsSubscribedToTopic("foo"));
+        assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("bar"));
+        assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("zar"));
+
+        // M1 in group 2 subscribes to nothing.
+        context.replay(RecordHelpers.newMemberSubscriptionRecord(groupId2,
+            new ConsumerGroupMember.Builder("group2-m1")
+                .setSubscribedTopicNames(Collections.emptyList())
+                .build()));
+
+        assertEquals(mkSet(groupId2), context.groupMetadataManager.groupsSubscribedToTopic("foo"));
+        assertEquals(mkSet(groupId1, groupId2), context.groupMetadataManager.groupsSubscribedToTopic("bar"));
+        assertEquals(mkSet(groupId1), context.groupMetadataManager.groupsSubscribedToTopic("zar"));
+
+        // M2 in group 2 subscribes to foo.

Review Comment:
   This is effectively just removing bar from the subscription, right?
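   The assertions above imply that a group's subscribed topics are the union of its members' subscriptions. Under that assumption, this step does remove `bar` from group 2: after `group2-m1` emptied its subscription, `group2-m2` was the only member of group 2 still on `bar`.

   A minimal reference-counting sketch of that union follows. `GroupSubscriptions` is hypothetical and not the `ConsumerGroup` implementation.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

final class GroupSubscriptions {
    // topic name -> number of members of this group subscribed to it
    private final Map<String, Integer> memberCountByTopic = new HashMap<>();

    /** Applies one member's subscription change; pass an empty set for a new or removed member. */
    void updateMember(Set<String> oldTopics, Set<String> newTopics) {
        for (String topic : oldTopics) {
            // Decrement, dropping the entry once no member references the topic.
            memberCountByTopic.computeIfPresent(topic, (t, count) -> count > 1 ? count - 1 : null);
        }
        for (String topic : newTopics) {
            memberCountByTopic.merge(topic, 1, Integer::sum);
        }
    }

    /** The group-level subscription: every topic with at least one subscribed member. */
    Set<String> subscribedTopicNames() {
        return Collections.unmodifiableSet(memberCountByTopic.keySet());
    }
}
```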



[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248091492


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##########
@@ -568,5 +568,20 @@ public void testMetadataRefreshDeadline() {
         assertTrue(group.hasMetadataExpired(time.milliseconds()));
         assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs);
         assertEquals(group.groupEpoch() + 1, group.metadataRefreshDeadline().epoch);
+
+        // Set the refresh deadline.
+        group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch());

Review Comment:
   Btw, I just fixed the previous test case at L566.



[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248094484


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##########
@@ -564,9 +564,9 @@ public void testMetadataRefreshDeadline() {
         assertTrue(group.hasMetadataExpired(time.milliseconds()));
 
         // Set the refresh time deadline with a higher group epoch.
-        group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch() + 1);

Review Comment:
   Why did we remove +1000? Wouldn't a deadline set to the current time be expired anyway (regardless of epoch)?



[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248097012


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -1932,6 +1961,435 @@ public void testPartitionAssignorExceptionOnRegularHeartbeat() {
                     .setTopicPartitions(Collections.emptyList())));
     }
 
+    @Test
+    public void testSubscriptionMetadataRefreshedAfterGroupIsLoaded() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        // Create a context with one consumer group containing one member.
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withConsumerGroupMetadataRefreshIntervalMs(5 * 60 * 1000)
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                    .build())
+                .withAssignment(memberId, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                .withAssignmentEpoch(10)
+                .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() {
+                    {
+                        // foo only has 3 partitions stored in the metadata but foo has
+                        // 6 partitions in the metadata image.
+                        put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3));
+                    }
+                }))
+            .build();
+
+        // The metadata refresh flag should be true.
+        ConsumerGroup consumerGroup = context.groupMetadataManager
+            .getOrMaybeCreateConsumerGroup(groupId, false);
+        assertTrue(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
+
+        // Prepare the assignment result.
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            Collections.singletonMap(memberId, new MemberAssignment(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )))
+        ));
+
+        // Heartbeat.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat(

Review Comment:
   gotcha.



[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1245156156


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -709,14 +780,16 @@ public void replay(
         String groupId = key.groupId();
         String memberId = key.memberId();
 
+        ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId, value != null);
+        Set<String> oldSubscribedTopicNames = new HashSet<>(consumerGroup.subscribedTopicNames());
+
         if (value != null) {
-            ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId, true);
             ConsumerGroupMember oldMember = consumerGroup.getOrMaybeCreateMember(memberId, true);
             consumerGroup.updateMember(new ConsumerGroupMember.Builder(oldMember)
                 .updateWith(value)
                 .build());
+            updateGroupsByTopics(groupId, oldSubscribedTopicNames, consumerGroup.subscribedTopicNames());

Review Comment:
   Good call. Moved it.
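A minimal sketch of the bookkeeping that `updateGroupsByTopics(groupId, oldSubscribedTopicNames, consumerGroup.subscribedTopicNames())` performs during replay, assuming it maintains a topic-to-groups reverse index: the group is removed from topics it no longer subscribes to and added to newly subscribed ones. The class name, the `isIndexed`/`indexedTopicCount` helpers and the plain `HashMap` are illustrative assumptions; the real manager keeps this index in a timeline structure.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: reverse index from topic name to the ids of the groups
// subscribed to it. A plain HashMap stands in for the real timeline structure.
class GroupsByTopicsSketch {
    private final Map<String, Set<String>> groupsByTopics = new HashMap<>();

    // Called after a member record is replayed, with the group's subscribed
    // topic names before and after the change.
    void updateGroupsByTopics(String groupId, Set<String> oldTopics, Set<String> newTopics) {
        // Unsubscribed topics: remove the group, dropping entries that become empty.
        for (String topic : oldTopics) {
            if (!newTopics.contains(topic)) {
                Set<String> groups = groupsByTopics.get(topic);
                if (groups != null) {
                    groups.remove(groupId);
                    if (groups.isEmpty()) {
                        groupsByTopics.remove(topic);
                    }
                }
            }
        }
        // Newly subscribed topics: add the group, creating entries on demand.
        for (String topic : newTopics) {
            if (!oldTopics.contains(topic)) {
                groupsByTopics.computeIfAbsent(topic, t -> new HashSet<>()).add(groupId);
            }
        }
    }

    boolean isIndexed(String topic, String groupId) {
        Set<String> groups = groupsByTopics.get(topic);
        return groups != null && groups.contains(groupId);
    }

    int indexedTopicCount() {
        return groupsByTopics.size();
    }
}
```

Dropping empty entries is a choice made in this sketch so the index does not keep topics that no group subscribes to anymore.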





[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1245682672


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -728,6 +794,81 @@ public void replay(
             }
             consumerGroup.removeMember(memberId);
         }
+
+        updateGroupsByTopics(groupId, oldSubscribedTopicNames, consumerGroup.subscribedTopicNames());

Review Comment:
   How does `groupsByTopics` (and `groups`) know that the changes made here are already committed (and won't be reverted)?
   
   I think I'm confused because in API handling (i.e. the consumer group heartbeat), once we modify the timeline data structures, we generate records to commit the offset in the timeline, but here we do it in reverse.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -423,6 +456,47 @@ public Map<String, TopicMetadata> computeSubscriptionMetadata(
         return Collections.unmodifiableMap(newSubscriptionMetadata);
     }
 
+    /**
+     * Updates the metadata refresh deadline.
+     *
+     * @param deadlineMs The next time in milliseconds.
+     * @param groupEpoch The associated group epoch.
+     */
+    public void setMetadataRefreshDeadline(
+        long deadlineMs,
+        int groupEpoch
+    ) {
+        this.metadataRefreshDeadline = new DeadlineAndEpoch(deadlineMs, groupEpoch);
+    }
+
+    /**
+     * Requests a metadata refresh.
+     */
+    public void requestMetadataRefresh() {
+        this.metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;
+    }
+
+    /**
+     * Checks if a metadata refresh is required. A refresh is required in two cases:
+     * 1) The deadline is smaller or equals to the current time;

Review Comment:
   nit: "or equal to"



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -423,6 +456,47 @@ public Map<String, TopicMetadata> computeSubscriptionMetadata(
         return Collections.unmodifiableMap(newSubscriptionMetadata);
     }
 
+    /**
+     * Updates the next metadata refresh time.
+     *
+     * @param nextTimeMs The next time in milliseconds.
+     * @param groupEpoch The associated group epoch.
+     */
+    public void setNextMetadataRefreshTime(
+        long nextTimeMs,
+        int groupEpoch
+    ) {
+        this.nextMetadataRefreshTime = new TimeAndEpoch(nextTimeMs, groupEpoch);
+    }
+
+    /**
+     * Resets the next metadata refresh.
+     */
+    public void resetNextMetadataRefreshTime() {
+        this.nextMetadataRefreshTime = TimeAndEpoch.EMPTY;
+    }
+
+    /**
+     * Checks if a metadata refresh is required. A refresh is required in two cases:
+     * 1) The next update time is smaller or equals to the current time;
+     * 2) The group epoch associated with the next update time is smaller than

Review Comment:
   Shouldn't it be "associated with the next update time is larger than"?
   
   The "current group epoch" is `groupEpoch()`, right?
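To make the two conditions concrete, here is a hypothetical sketch of the check, assuming condition 2 reads "larger than" as suggested above: the epoch stored with the deadline exceeds the current group epoch, meaning the write that would have bumped the epoch never landed. `DeadlineAndEpoch`, `setMetadataRefreshDeadline` and `requestMetadataRefresh` mirror names from the diff, but the class and method bodies are assumptions, not the PR's code.

```java
// Hypothetical sketch of the two expiry conditions discussed in this thread.
class MetadataRefreshSketch {
    // Pairs a refresh deadline with the group epoch it was set for.
    static final class DeadlineAndEpoch {
        // Zero deadline: always expired, so it doubles as "refresh requested".
        static final DeadlineAndEpoch EMPTY = new DeadlineAndEpoch(0L, 0);
        final long deadlineMs;
        final int epoch;

        DeadlineAndEpoch(long deadlineMs, int epoch) {
            this.deadlineMs = deadlineMs;
            this.epoch = epoch;
        }
    }

    private final int groupEpoch;
    private DeadlineAndEpoch metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;

    MetadataRefreshSketch(int groupEpoch) {
        this.groupEpoch = groupEpoch;
    }

    void setMetadataRefreshDeadline(long deadlineMs, int epoch) {
        metadataRefreshDeadline = new DeadlineAndEpoch(deadlineMs, epoch);
    }

    void requestMetadataRefresh() {
        metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;
    }

    // Expired when 1) the deadline is smaller than or equal to the current time,
    // or 2) the epoch stored with the deadline is larger than the current group
    // epoch, i.e. the write carrying the new epoch was lost.
    boolean hasMetadataExpired(long currentTimeMs) {
        return currentTimeMs >= metadataRefreshDeadline.deadlineMs
            || groupEpoch < metadataRefreshDeadline.epoch;
    }
}
```

Because `EMPTY` carries a zero deadline, `requestMetadataRefresh()` forces a refresh regardless of the stored epoch.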



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -728,6 +794,81 @@ public void replay(
             }
             consumerGroup.removeMember(memberId);
         }
+
+        updateGroupsByTopics(groupId, oldSubscribedTopicNames, consumerGroup.subscribedTopicNames());
+    }
+
+    /**
+     * @return The set of groups subscribed to the topic.
+     */
+    public Set<String> groupsSubscribedToTopic(String topicName) {
+        Set<String> groups = groupsByTopics.get(topicName);

Review Comment:
   Any reason we don't use `getOrDefault()`?
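For comparison, a sketch of the `getOrDefault()` variant. The class name is hypothetical and a plain `HashMap` stands in for the real field (an assumption). Returning the immutable `Collections.emptySet()` as the default also keeps callers from inserting into an empty set that the index does not own.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the lookup under review.
class GroupsLookupSketch {
    final Map<String, Set<String>> groupsByTopics = new HashMap<>();

    // getOrDefault replaces the explicit null check with a single call and
    // returns an immutable empty set for topics no group subscribes to.
    Set<String> groupsSubscribedToTopic(String topicName) {
        return groupsByTopics.getOrDefault(topicName, Collections.emptySet());
    }
}
```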





[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "jeffkbkim (via GitHub)" <gi...@apache.org>.
jeffkbkim commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1245712559


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -141,7 +141,7 @@ public static class TimeAndEpoch {
      * after having refreshed the metadata but the write operation failed. In this case, the
      * time is not automatically rolled back.
      */
-    private TimeAndEpoch nextMetadataRefreshTime = TimeAndEpoch.EMPTY;

Review Comment:
   I noticed this in several places.
   
   Can we rework the comments in `hasMetadataExpired()` and in the test `testNextMetadataRefreshTime()`, along with the name, to use "deadline"?





[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1246646144


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -728,6 +794,81 @@ public void replay(
             }
             consumerGroup.removeMember(memberId);
         }
+
+        updateGroupsByTopics(groupId, oldSubscribedTopicNames, consumerGroup.subscribedTopicNames());

Review Comment:
   `groupsByTopics` is a timeline data structure, and we update it based on the ConsumerGroupMemberMetadataKey/Value records. If a record can't be written, the state is reverted to the last written offset, which reverts all those pending changes.
   
   > I think I'm confused because in API handling (i.e. the consumer group heartbeat), once we modify the timeline data structures, we generate records to commit the offset in the timeline, but here we do it in reverse.
   
   In the api handling, we never update the timeline data structures. We only generate records and they are updated when the records are applied.
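The write path described here (apply the record to the in-memory state first, then roll back to the last committed offset if the write fails) can be illustrated with a simplified undo-log model. This is a hypothetical sketch, not Kafka's `SnapshotRegistry`/`TimelineHashMap` API; `replay`, `commit` and `revertToLastCommitted` are illustrative names.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical undo-log model of "replay first, revert if the write fails".
class TimelineSketch {
    private final Map<String, String> state = new HashMap<>();
    // (key, previous value) pairs for changes not yet committed; newest first.
    private final Deque<String[]> undoLog = new ArrayDeque<>();

    // Replaying a record mutates the in-memory state immediately.
    void replay(String key, String value) {
        undoLog.push(new String[] {key, state.get(key)});
        if (value == null) {
            state.remove(key);
        } else {
            state.put(key, value);
        }
    }

    // The write reached the log: pending changes become permanent.
    void commit() {
        undoLog.clear();
    }

    // The write failed: undo every change since the last commit, newest first.
    void revertToLastCommitted() {
        while (!undoLog.isEmpty()) {
            String[] entry = undoLog.pop();
            if (entry[1] == null) {
                state.remove(entry[0]);
            } else {
                state.put(entry[0], entry[1]);
            }
        }
    }

    String get(String key) {
        return state.get(key);
    }
}
```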





[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248223476


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##########
@@ -568,5 +568,20 @@ public void testMetadataRefreshDeadline() {
         assertTrue(group.hasMetadataExpired(time.milliseconds()));
         assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs);
         assertEquals(group.groupEpoch() + 1, group.metadataRefreshDeadline().epoch);
+
+        // Set the refresh deadline.
+        group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch());

Review Comment:
   I think so. The epoch value is not so important here. The important part is that it should be reset regardless.





[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248088555


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##########
@@ -825,4 +828,60 @@ public void testClose() throws Exception {
         assertFutureThrows(write1, NotCoordinatorException.class);
         assertFutureThrows(write2, NotCoordinatorException.class);
     }
+
+    @Test
+    public void testOnNewMetadataImage() {
+        TopicPartition tp0 = new TopicPartition("__consumer_offsets", 0);
+        TopicPartition tp1 = new TopicPartition("__consumer_offsets", 1);
+
+        MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class);
+        MockPartitionWriter writer = mock(MockPartitionWriter.class);
+        MockCoordinatorBuilderSupplier supplier = mock(MockCoordinatorBuilderSupplier.class);
+        MockCoordinatorBuilder builder = mock(MockCoordinatorBuilder.class);
+
+        CoordinatorRuntime<MockCoordinator, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinator, String>()
+                .withLoader(loader)
+                .withEventProcessor(new MockEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorBuilderSupplier(supplier)
+                .build();
+
+        MockCoordinator coordinator0 = mock(MockCoordinator.class);
+        MockCoordinator coordinator1 = mock(MockCoordinator.class);
+
+        when(supplier.get()).thenReturn(builder);
+        when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+        when(builder.withLogContext(any())).thenReturn(builder);
+        when(builder.build())
+            .thenReturn(coordinator0)
+            .thenReturn(coordinator1);
+
+        CompletableFuture<Void> future0 = new CompletableFuture<>();
+        when(loader.load(tp0, coordinator0)).thenReturn(future0);
+
+        CompletableFuture<Void> future1 = new CompletableFuture<>();
+        when(loader.load(tp1, coordinator1)).thenReturn(future1);
+
+        runtime.scheduleLoadOperation(tp0, 0);
+        runtime.scheduleLoadOperation(tp1, 0);
+
+        // Coordinator 0 is loaded. It should get the current image
+        // that is the empty one.
+        future0.complete(null);
+        verify(coordinator0).onLoaded(MetadataImage.EMPTY);
+
+        // Publish a new image.
+        MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+        MetadataImage newImage = delta.apply(MetadataProvenance.EMPTY);
+        runtime.onNewMetadataImage(newImage, delta);
+
+        // Coordinator 0 should be notified about it.
+        verify(coordinator0).onNewMetadataImage(newImage, delta);

Review Comment:
   https://github.com/apache/kafka/pull/13901/files/40fcd86ff81782e84d2d2835ac106b83fdfb32a9#diff-00f0f81cf13e66781777d94f7d2e68a581663385c37e98792507f2294c91bb09R1025





[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248086301


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##########
@@ -825,4 +828,60 @@ public void testClose() throws Exception {
         assertFutureThrows(write1, NotCoordinatorException.class);
         assertFutureThrows(write2, NotCoordinatorException.class);
     }
+
+    @Test
+    public void testOnNewMetadataImage() {
+        TopicPartition tp0 = new TopicPartition("__consumer_offsets", 0);
+        TopicPartition tp1 = new TopicPartition("__consumer_offsets", 1);
+
+        MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class);
+        MockPartitionWriter writer = mock(MockPartitionWriter.class);
+        MockCoordinatorBuilderSupplier supplier = mock(MockCoordinatorBuilderSupplier.class);
+        MockCoordinatorBuilder builder = mock(MockCoordinatorBuilder.class);
+
+        CoordinatorRuntime<MockCoordinator, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinator, String>()
+                .withLoader(loader)
+                .withEventProcessor(new MockEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorBuilderSupplier(supplier)
+                .build();
+
+        MockCoordinator coordinator0 = mock(MockCoordinator.class);
+        MockCoordinator coordinator1 = mock(MockCoordinator.class);
+
+        when(supplier.get()).thenReturn(builder);
+        when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+        when(builder.withLogContext(any())).thenReturn(builder);
+        when(builder.build())
+            .thenReturn(coordinator0)
+            .thenReturn(coordinator1);
+
+        CompletableFuture<Void> future0 = new CompletableFuture<>();
+        when(loader.load(tp0, coordinator0)).thenReturn(future0);
+
+        CompletableFuture<Void> future1 = new CompletableFuture<>();
+        when(loader.load(tp1, coordinator1)).thenReturn(future1);
+
+        runtime.scheduleLoadOperation(tp0, 0);
+        runtime.scheduleLoadOperation(tp1, 0);
+
+        // Coordinator 0 is loaded. It should get the current image
+        // that is the empty one.
+        future0.complete(null);
+        verify(coordinator0).onLoaded(MetadataImage.EMPTY);
+
+        // Publish a new image.
+        MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+        MetadataImage newImage = delta.apply(MetadataProvenance.EMPTY);
+        runtime.onNewMetadataImage(newImage, delta);
+
+        // Coordinator 0 should be notified about it.
+        verify(coordinator0).onNewMetadataImage(newImage, delta);

Review Comment:
   This part is handled in the `GroupMetadataManager#onNewMetadataImage` and it is tested separately.
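A hypothetical model of the runtime-level propagation that `testOnNewMetadataImage` exercises: a coordinator receives the current image when it finishes loading, and only loaded coordinators are notified of later images. Images are plain strings here, and handing a later-loaded coordinator the then-current image is an assumption extrapolated from coordinator 0 receiving the empty image; the group-level handling stays in `GroupMetadataManager#onNewMetadataImage`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model; the real runtime passes MetadataImage/MetadataDelta.
class RuntimeSketch {
    static final class Coordinator {
        final List<String> seenImages = new ArrayList<>();

        void onLoaded(String image) {
            seenImages.add("loaded:" + image);
        }

        void onNewMetadataImage(String image) {
            seenImages.add("new:" + image);
        }
    }

    private String currentImage = "EMPTY";
    // Only coordinators whose partition has finished loading are registered.
    private final Map<Integer, Coordinator> loaded = new LinkedHashMap<>();

    // Loading completed: register the coordinator and hand it the latest image.
    void completeLoad(int partition, Coordinator coordinator) {
        loaded.put(partition, coordinator);
        coordinator.onLoaded(currentImage);
    }

    // Remember the image for future loads and forward it to loaded coordinators.
    void onNewMetadataImage(String newImage) {
        currentImage = newImage;
        for (Coordinator coordinator : loaded.values()) {
            coordinator.onNewMetadataImage(newImage);
        }
    }
}
```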





[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1244497307


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -874,4 +1021,34 @@ public void replay(
             consumerGroup.updateMember(newMember);
         }
     }
+
+    /**
+     * A new metadata image is available.
+     *
+     * @param newImage  The new metadata image.
+     * @param delta     The delta image.
+     */
+    public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) {
+        metadataImage = newImage;
+
+        // Notify all the groups subscribed to the created, updated or

Review Comment:
   I took a longer look at the code -- and assuming changedTopics includes the newly created topics, this should be fine.





[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1245214287


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -423,6 +456,47 @@ public Map<String, TopicMetadata> computeSubscriptionMetadata(
         return Collections.unmodifiableMap(newSubscriptionMetadata);
     }
 
+    /**
+     * Updates the next metadata refresh time.
+     *
+     * @param nextTimeMs The next time in milliseconds.
+     * @param groupEpoch The associated group epoch.
+     */
+    public void setNextMetadataRefreshTime(
+        long nextTimeMs,
+        int groupEpoch
+    ) {
+        this.nextMetadataRefreshTime = new TimeAndEpoch(nextTimeMs, groupEpoch);
+    }
+
+    /**
+     * Resets the next metadata refresh.
+     */
+    public void resetNextMetadataRefreshTime() {
+        this.nextMetadataRefreshTime = TimeAndEpoch.EMPTY;
+    }
+
+    /**
+     * Checks if a metadata refresh is required. A refresh is required in two cases:
+     * 1) The next update time is smaller or equals to the current time;
+     * 2) The group epoch associated with the next update time is smaller than

Review Comment:
   Right. In this case, the time is set to zero so it is always smaller than the current time.





[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1245226991


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -141,7 +141,7 @@ public static class TimeAndEpoch {
      * after having refreshed the metadata but the write operation failed. In this case, the
      * time is not automatically rolled back.
      */
-    private TimeAndEpoch nextMetadataRefreshTime = TimeAndEpoch.EMPTY;

Review Comment:
   I renamed this one. It seems that `deadline` is more appropriate than `nextTime`.





[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1244500812


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -874,4 +1021,34 @@ public void replay(
             consumerGroup.updateMember(newMember);
         }
     }
+
+    /**
+     * A new metadata image is available.
+     *
+     * @param newImage  The new metadata image.
+     * @param delta     The delta image.
+     */
+    public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) {
+        metadataImage = newImage;
+
+        // Notify all the groups subscribed to the created, updated or

Review Comment:
   I guess my question, then, is: what is the flow for updating the groups with the image? Does this just happen on the next heartbeat, since we set `metadataImage` to the new image?
   
   We don't really do any notifying besides resetting the refresh timer.
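A sketch of the flow as described in this thread: `onNewMetadataImage` only flags the groups subscribed to changed or deleted topics (created topics are assumed to count as changed), and the subscription metadata is recomputed on each flagged group's next heartbeat. The classes, the `Set<String>` of topic names standing in for `MetadataDelta`, and the far-future initial deadline are illustrative assumptions.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of "notify by resetting the refresh deadline".
class MetadataImageFlowSketch {
    static final class Group {
        // Starts far in the future for illustration, so only flagged groups expire.
        long refreshDeadlineMs = Long.MAX_VALUE;

        // A zero deadline is always expired, forcing a refresh on the next heartbeat.
        void requestMetadataRefresh() {
            refreshDeadlineMs = 0L;
        }

        boolean hasMetadataExpired(long nowMs) {
            return nowMs >= refreshDeadlineMs;
        }
    }

    final Map<String, Group> groups = new HashMap<>();
    final Map<String, Set<String>> groupsByTopics = new HashMap<>();

    // Nothing is recomputed here: affected groups are only flagged.
    void onNewMetadataImage(Set<String> changedOrDeletedTopics) {
        Set<String> toRefresh = new HashSet<>();
        for (String topic : changedOrDeletedTopics) {
            toRefresh.addAll(groupsByTopics.getOrDefault(topic, Set.of()));
        }
        for (String groupId : toRefresh) {
            Group group = groups.get(groupId);
            if (group != null) {
                group.requestMetadataRefresh();
            }
        }
    }
}
```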





[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1244508147


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -423,6 +456,47 @@ public Map<String, TopicMetadata> computeSubscriptionMetadata(
         return Collections.unmodifiableMap(newSubscriptionMetadata);
     }
 
+    /**
+     * Updates the next metadata refresh time.
+     *
+     * @param nextTimeMs The next time in milliseconds.
+     * @param groupEpoch The associated group epoch.
+     */
+    public void setNextMetadataRefreshTime(
+        long nextTimeMs,
+        int groupEpoch
+    ) {
+        this.nextMetadataRefreshTime = new TimeAndEpoch(nextTimeMs, groupEpoch);
+    }
+
+    /**
+     * Resets the next metadata refresh.
+     */
+    public void resetNextMetadataRefreshTime() {
+        this.nextMetadataRefreshTime = TimeAndEpoch.EMPTY;
+    }
+
+    /**
+     * Checks if a metadata refresh is required. A refresh is required in two cases:
+     * 1) The next update time is smaller or equals to the current time;
+     * 2) The group epoch associated with the next update time is smaller than

Review Comment:
   This is also the case when we reset the refreshMetadata time, right?





[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248092025


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -1932,6 +1961,435 @@ public void testPartitionAssignorExceptionOnRegularHeartbeat() {
                     .setTopicPartitions(Collections.emptyList())));
     }
 
+    @Test
+    public void testSubscriptionMetadataRefreshedAfterGroupIsLoaded() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        // Create a context with one consumer group containing one member.
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withConsumerGroupMetadataRefreshIntervalMs(5 * 60 * 1000)
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                    .build())
+                .withAssignment(memberId, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                .withAssignmentEpoch(10)
+                .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() {
+                    {
+                        // foo only has 3 partitions stored in the metadata but foo has
+                        // 6 partitions in the metadata image.
+                        put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3));
+                    }
+                }))
+            .build();
+
+        // The metadata refresh flag should be true.
+        ConsumerGroup consumerGroup = context.groupMetadataManager
+            .getOrMaybeCreateConsumerGroup(groupId, false);
+        assertTrue(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
+
+        // Prepare the assignment result.
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            Collections.singletonMap(memberId, new MemberAssignment(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )))
+        ));
+
+        // Heartbeat.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat(

Review Comment:
   Sorry, I was referring to how, when I asked about the heartbeat in the other test, I hadn't seen this one yet.





[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248096864


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##########
@@ -568,5 +568,20 @@ public void testMetadataRefreshDeadline() {
         assertTrue(group.hasMetadataExpired(time.milliseconds()));
         assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs);
         assertEquals(group.groupEpoch() + 1, group.metadataRefreshDeadline().epoch);
+
+        // Set the refresh deadline.
+        group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch());

Review Comment:
   This could happen, for instance, if the write is lost, in which case the new epoch would no longer be known.





[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248133399


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##########
@@ -568,5 +568,20 @@ public void testMetadataRefreshDeadline() {
         assertTrue(group.hasMetadataExpired(time.milliseconds()));
         assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs);
         assertEquals(group.groupEpoch() + 1, group.metadataRefreshDeadline().epoch);
+
+        // Set the refresh deadline.
+        group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch());

Review Comment:
   Yeah, you're right. I actually use the current group epoch here because `hasMetadataExpired` would return true with epoch + 1.





[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248282166


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##########
@@ -568,5 +568,20 @@ public void testMetadataRefreshDeadline() {
         assertTrue(group.hasMetadataExpired(time.milliseconds()));
         assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs);
         assertEquals(group.groupEpoch() + 1, group.metadataRefreshDeadline().epoch);
+
+        // Set the refresh deadline.
+        group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch());

Review Comment:
   Hmm, does "reset" mean we should refresh? I guess my point was that if we lower the epoch, we may delay the reset. I guess, worst case, we just have to wait for the refresh interval, though.





[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1244507539


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -423,6 +456,47 @@ public Map<String, TopicMetadata> computeSubscriptionMetadata(
         return Collections.unmodifiableMap(newSubscriptionMetadata);
     }
 
+    /**
+     * Updates the next metadata refresh time.
+     *
+     * @param nextTimeMs The next time in milliseconds.
+     * @param groupEpoch The associated group epoch.
+     */
+    public void setNextMetadataRefreshTime(
+        long nextTimeMs,
+        int groupEpoch
+    ) {
+        this.nextMetadataRefreshTime = new TimeAndEpoch(nextTimeMs, groupEpoch);
+    }
+
+    /**
+     * Resets the next metadata refresh.

Review Comment:
   This means we should update immediately?
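One plausible reading of "reset" is that it sets the next refresh time to 0. Since 0 is always in the past, the next heartbeat would then recompute the metadata. The sketch below assumes that reading. `GroupRefreshState` and its methods are hypothetical stand-ins for the `setNextMetadataRefreshTime` / reset pair in the diff.

```java
/** Hypothetical holder for a group's next metadata refresh time. */
final class GroupRefreshState {
    private long nextRefreshMs;
    private int epoch;

    void setNextRefresh(long nextTimeMs, int groupEpoch) {
        this.nextRefreshMs = nextTimeMs;
        this.epoch = groupEpoch;
    }

    /** Assumed meaning of "reset": a refresh time of 0 is always due. */
    void reset() {
        setNextRefresh(0L, 0);
    }

    boolean refreshNeeded(long nowMs, int currentGroupEpoch) {
        return nowMs >= nextRefreshMs || currentGroupEpoch < epoch;
    }

    public static void main(String[] args) {
        GroupRefreshState state = new GroupRefreshState();
        state.setNextRefresh(60_000L, 3);
        System.out.println(state.refreshNeeded(1_000L, 3)); // false: not due yet
        state.reset();
        System.out.println(state.refreshNeeded(1_000L, 3)); // true: next heartbeat refreshes
    }
}
```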





[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1244475836


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -506,32 +555,54 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
             .setClientHost(clientHost)
             .build();
 
+        boolean updatedMemberSubscriptions = false;
         if (!updatedMember.equals(member)) {
             records.add(newMemberSubscriptionRecord(groupId, updatedMember));
 
             if (!updatedMember.subscribedTopicNames().equals(member.subscribedTopicNames())) {
                 log.info("[GroupId " + groupId + "] Member " + memberId + " updated its subscribed topics to: " +
                     updatedMember.subscribedTopicNames());
+                updatedMemberSubscriptions = true;
+            }
 
-                subscriptionMetadata = group.computeSubscriptionMetadata(
-                    member,
-                    updatedMember,
-                    topicsImage
-                );
-
-                if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
-                    log.info("[GroupId " + groupId + "] Computed new subscription metadata: "
-                        + subscriptionMetadata + ".");
-                    records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata));
-                }
+            if (!updatedMember.subscribedTopicRegex().equals(member.subscribedTopicRegex())) {
+                log.info("[GroupId " + groupId + "] Member " + memberId + " updated its subscribed regex to: " +
+                    updatedMember.subscribedTopicRegex());
+                updatedMemberSubscriptions = true;
+            }
+        }
 
-                groupEpoch += 1;
-                records.add(newGroupEpochRecord(groupId, groupEpoch));
+        long currentTimeMs = time.milliseconds();
+        boolean maybeUpdateMetadata = updatedMemberSubscriptions || group.refreshMetadataNeeded(currentTimeMs);
+        boolean updatedSubscriptionMetadata = false;
+        if (maybeUpdateMetadata) {
+            subscriptionMetadata = group.computeSubscriptionMetadata(
+                member,
+                updatedMember,
+                metadataImage.topics()
+            );
 
-                log.info("[GroupId " + groupId + "] Bumped group epoch to " + groupEpoch + ".");
+            if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+                log.info("[GroupId " + groupId + "] Computed new subscription metadata: "
+                    + subscriptionMetadata + ".");
+                records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata));
+                updatedSubscriptionMetadata = true;
             }
         }
 
+        if (updatedMemberSubscriptions || updatedSubscriptionMetadata) {
+            groupEpoch += 1;
+            records.add(newGroupEpochRecord(groupId, groupEpoch));
+            log.info("[GroupId " + groupId + "] Bumped group epoch to " + groupEpoch + ".");
+        }
+
+        if (maybeUpdateMetadata) {

Review Comment:
   While I am able to follow the logic and understand it is probably the best way to avoid duplicate code, I do wonder if there is a less confusing way to express this without the similar booleans. It may not be possible, but maybe we can add some comments.
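One way to address the readability concern is to give each boolean a single, commented purpose. The sketch below is a hypothetical, simplified model of the decisions in the heartbeat hunk, not the PR's code. Subscription metadata is reduced to a topic-to-partition-count map, and `HeartbeatDecision` is an invented name. The trailing `if (maybeUpdateMetadata)` block is cut off in this excerpt, so it is not modeled. The sketch relies on `&&` short-circuiting so that the metadata is only recomputed when needed.

```java
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical, simplified model of the heartbeat's metadata and epoch decisions. */
final class HeartbeatDecision {
    final boolean recomputeMetadata;
    final boolean bumpEpoch;

    private HeartbeatDecision(boolean recomputeMetadata, boolean bumpEpoch) {
        this.recomputeMetadata = recomputeMetadata;
        this.bumpEpoch = bumpEpoch;
    }

    static HeartbeatDecision decide(
        boolean subscriptionsChanged,
        boolean refreshDeadlineExpired,
        Map<String, Integer> currentMetadata,
        Supplier<Map<String, Integer>> recompute
    ) {
        // 1. Recompute only if the member changed what it subscribes to,
        //    or if the group's refresh deadline has expired.
        boolean recomputeMetadata = subscriptionsChanged || refreshDeadlineExpired;

        // 2. The metadata changed only if it was recomputed and differs.
        //    '&&' short-circuits, so recompute.get() is not called otherwise.
        boolean metadataChanged = recomputeMetadata && !recompute.get().equals(currentMetadata);

        // 3. Bump the group epoch if either the subscriptions or the metadata changed.
        boolean bumpEpoch = subscriptionsChanged || metadataChanged;

        return new HeartbeatDecision(recomputeMetadata, bumpEpoch);
    }

    public static void main(String[] args) {
        Map<String, Integer> current = Map.of("foo", 3);

        HeartbeatDecision unchanged = decide(false, true, current, () -> Map.of("foo", 3));
        HeartbeatDecision grown = decide(false, true, current, () -> Map.of("foo", 4));
        HeartbeatDecision idle = decide(false, false, current, () -> {
            throw new IllegalStateException("should not recompute");
        });

        System.out.println(unchanged.recomputeMetadata + " " + unchanged.bumpEpoch); // true false
        System.out.println(grown.recomputeMetadata + " " + grown.bumpEpoch);         // true true
        System.out.println(idle.recomputeMetadata + " " + idle.bumpEpoch);           // false false
    }
}
```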





[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1245148023


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -88,10 +93,12 @@ public class GroupMetadataManager {
     public static class Builder {
         private LogContext logContext = null;
         private SnapshotRegistry snapshotRegistry = null;
+        private Time time = null;
         private List<PartitionAssignor> assignors = null;
-        private TopicsImage topicsImage = null;

Review Comment:
   We actually need other features from the MetadataImage (e.g. metadata version).





[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1245209953


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -874,4 +1021,34 @@ public void replay(
             consumerGroup.updateMember(newMember);
         }
     }
+
+    /**
+     * A new metadata image is available.
+     *
+     * @param newImage  The new metadata image.
+     * @param delta     The delta image.
+     */
+    public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) {
+        metadataImage = newImage;
+
+        // Notify all the groups subscribed to the created, updated or

Review Comment:
   > Is created topics a different method in topicsDelta? Shouldn't we have createdTopicIds and we add them? Or is changedTopics accounting for that? Are there other changes besides topic creation we can have?
   
   My understanding is that created topics are included in the changed topics as well.
   
   > I guess my question then is what is the flow for updating the groups with the image? This will just happen on the next heartbeat since we set metadataImage to new image?
   
   Right. The idea is to flag all the groups subscribed to the changed topics and to let them update themselves on the next heartbeat. I did it this way because, later on, we can also rely on this mechanism to refresh regex-based subscriptions every X minutes.
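The approach described here, where groups are flagged now and refresh on their next heartbeat, can be sketched as follows. All names are hypothetical, and the refresh itself is replaced by a counter. A deadline of 0 is assumed to mean "refresh on the next heartbeat". The point is only that publishing an image touches the affected groups' deadlines, while the recomputation itself happens lazily.

```java
import java.util.*;

/** Hypothetical group that recomputes its subscription metadata lazily. */
final class LazyGroup {
    private long refreshDeadlineMs = 0L; // 0 is always due: refresh on the next heartbeat
    int refreshCount = 0;                // stand-in for actually recomputing metadata

    void requestMetadataRefresh() {
        refreshDeadlineMs = 0L;
    }

    void onHeartbeat(long nowMs, long refreshIntervalMs) {
        if (nowMs >= refreshDeadlineMs) {
            refreshCount++;
            refreshDeadlineMs = nowMs + refreshIntervalMs;
        }
    }
}

final class LazyRefreshCoordinator {
    static final long REFRESH_INTERVAL_MS = 60_000L;

    final Map<String, LazyGroup> groups = new HashMap<>();
    final Map<String, Set<String>> groupsByTopic = new HashMap<>();

    void addGroup(String groupId, Set<String> subscribedTopics) {
        groups.put(groupId, new LazyGroup());
        for (String topic : subscribedTopics) {
            groupsByTopic.computeIfAbsent(topic, __ -> new HashSet<>()).add(groupId);
        }
    }

    /** On a new image: only flag the subscribed groups; nothing is recomputed yet. */
    void onTopicsChanged(Set<String> changedTopicNames) {
        for (String topic : changedTopicNames) {
            for (String groupId : groupsByTopic.getOrDefault(topic, Collections.emptySet())) {
                LazyGroup group = groups.get(groupId);
                if (group != null) group.requestMetadataRefresh();
            }
        }
    }

    void heartbeat(String groupId, long nowMs) {
        groups.get(groupId).onHeartbeat(nowMs, REFRESH_INTERVAL_MS);
    }

    public static void main(String[] args) {
        LazyRefreshCoordinator c = new LazyRefreshCoordinator();
        c.addGroup("g1", Set.of("foo"));
        c.addGroup("g2", Set.of("bar"));
        c.heartbeat("g1", 0L);           // fresh groups refresh once
        c.heartbeat("g2", 0L);
        c.heartbeat("g1", 100L);         // within the interval: no refresh
        c.onTopicsChanged(Set.of("foo")); // only g1 is flagged
        c.heartbeat("g1", 200L);
        c.heartbeat("g2", 200L);
        System.out.println(c.groups.get("g1").refreshCount + " " + c.groups.get("g2").refreshCount); // 2 1
    }
}
```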





[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248092632


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##########
@@ -568,5 +568,20 @@ public void testMetadataRefreshDeadline() {
         assertTrue(group.hasMetadataExpired(time.milliseconds()));
         assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs);
         assertEquals(group.groupEpoch() + 1, group.metadataRefreshDeadline().epoch);
+
+        // Set the refresh deadline.
+        group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch());

Review Comment:
   When we set the refresh deadline, there is no check on the group epoch. Maybe it's fine that we went back an epoch, but I'm not sure in which scenario this would happen.





[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248102033


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##########
@@ -568,5 +568,20 @@ public void testMetadataRefreshDeadline() {
         assertTrue(group.hasMetadataExpired(time.milliseconds()));
         assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs);
         assertEquals(group.groupEpoch() + 1, group.metadataRefreshDeadline().epoch);
+
+        // Set the refresh deadline.
+        group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch());

Review Comment:
   I thought that if the write is lost, we still have the higher epoch in the DeadlineAndEpoch, and that would signal us to keep trying to refresh. However, in this case, you are saying we would go back an epoch and decide not to try to refresh anymore. Is that correct?
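The sketch below makes the lost-write scenario concrete under three assumptions. First, the refresh produced new metadata and therefore bumps the group epoch. Second, the epoch reverts if the write fails, while the in-memory deadline does not. Third, the same expiry rule as in the test above applies. `SimulatedGroup` and its flags are hypothetical. Tagging the deadline with the epoch the write would produce lets a failed write trigger a retry on the next heartbeat. Tagging it with the current epoch delays the retry until the deadline.

```java
/** Hypothetical group used to replay the lost-write scenario. */
final class SimulatedGroup {
    int groupEpoch;    // reverts to its old value if the write fails
    long deadlineMs;   // in-memory only: NOT reverted when the write fails
    int deadlineEpoch; // in-memory only: NOT reverted when the write fails

    SimulatedGroup(int groupEpoch) {
        this.groupEpoch = groupEpoch;
    }

    boolean hasMetadataExpired(long nowMs) {
        return nowMs >= deadlineMs || groupEpoch < deadlineEpoch;
    }

    /** Assumes the refreshed metadata differs, so a successful write bumps the epoch. */
    void refreshAndWrite(long nowMs, long intervalMs, boolean writeSucceeds, boolean tagWithNextEpoch) {
        int nextEpoch = groupEpoch + 1;
        deadlineMs = nowMs + intervalMs;
        deadlineEpoch = tagWithNextEpoch ? nextEpoch : groupEpoch;
        if (writeSucceeds) {
            groupEpoch = nextEpoch; // otherwise the epoch bump is lost with the write
        }
    }

    public static void main(String[] args) {
        SimulatedGroup tagged = new SimulatedGroup(5);
        tagged.refreshAndWrite(0L, 60_000L, false, true);
        System.out.println(tagged.hasMetadataExpired(1L));   // true: retry on next heartbeat

        SimulatedGroup untagged = new SimulatedGroup(5);
        untagged.refreshAndWrite(0L, 60_000L, false, false);
        System.out.println(untagged.hasMetadataExpired(1L)); // false: waits for the deadline

        SimulatedGroup written = new SimulatedGroup(5);
        written.refreshAndWrite(0L, 60_000L, true, true);
        System.out.println(written.hasMetadataExpired(1L));  // false: nothing to retry
    }
}
```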





[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1244496874


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -874,4 +1021,34 @@ public void replay(
             consumerGroup.updateMember(newMember);
         }
     }
+
+    /**
+     * A new metadata image is available.
+     *
+     * @param newImage  The new metadata image.
+     * @param delta     The delta image.
+     */
+    public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) {
+        metadataImage = newImage;
+
+        // Notify all the groups subscribed to the created, updated or
+        // deleted topics.
+        Set<String> allGroupIds = new HashSet<>();
+        delta.topicsDelta().changedTopics().forEach((topicId, topicDelta) -> {
+            String topicName = topicDelta.name();
+            Set<String> groupIds = groupsByTopics.get(topicName);
+            if (groupIds != null) allGroupIds.addAll(groupIds);
+        });
+        delta.topicsDelta().deletedTopicIds().forEach(topicId -> {
+            TopicImage topicImage = delta.image().topics().getTopic(topicId);
+            Set<String> groupIds = groupsByTopics.get(topicImage.name());
+            if (groupIds != null) allGroupIds.addAll(groupIds);

Review Comment:
   Oh, I misunderstood -- this is to add to the list of groups to notify. 👍
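The collection step in this hunk can be modeled with plain maps. In the hypothetical sketch below, topic ids are strings and the images are reduced to id-to-name maps. It illustrates one subtle point: a deleted topic no longer exists in the new image, so its name has to be resolved against the previous image. That appears to be what `delta.image()` provides in the diff. In the PR, changed topics are keyed by id and the name comes from the topic delta; the sketch takes names directly for brevity.

```java
import java.util.*;

/** Hypothetical model of collecting the groups affected by a topics delta. */
final class AffectedGroups {
    static Set<String> collect(
        Collection<String> changedTopicNames,       // created or updated topics, by name
        Collection<String> deletedTopicIds,         // deleted topics, by id
        Map<String, String> previousTopicNamesById, // id -> name in the image before the delta
        Map<String, Set<String>> groupsByTopic      // topic name -> subscribed group ids
    ) {
        Set<String> groupIds = new HashSet<>();
        for (String topicName : changedTopicNames) {
            groupIds.addAll(groupsByTopic.getOrDefault(topicName, Collections.emptySet()));
        }
        for (String topicId : deletedTopicIds) {
            // The topic is gone from the new image, so look its name up in the old one.
            String topicName = previousTopicNamesById.get(topicId);
            if (topicName != null) {
                groupIds.addAll(groupsByTopic.getOrDefault(topicName, Collections.emptySet()));
            }
        }
        return groupIds;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> groupsByTopic = Map.of(
            "foo", Set.of("g1"),
            "bar", Set.of("g2", "g3"),
            "baz", Set.of("g4"));
        Map<String, String> previousNames = Map.of("id-foo", "foo", "id-bar", "bar", "id-baz", "baz");

        Set<String> affected = collect(List.of("foo"), List.of("id-bar"), previousNames, groupsByTopic);
        System.out.println(new TreeSet<>(affected)); // [g1, g2, g3]
    }
}
```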





[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1244495117


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -874,4 +1021,34 @@ public void replay(
             consumerGroup.updateMember(newMember);
         }
     }
+
+    /**
+     * A new metadata image is available.
+     *
+     * @param newImage  The new metadata image.
+     * @param delta     The delta image.
+     */
+    public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) {
+        metadataImage = newImage;
+
+        // Notify all the groups subscribed to the created, updated or

Review Comment:
   Is created topics a different method in topicsDelta? Shouldn't we have `createdTopicIds` and we add them?





[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1249388374


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##########
@@ -568,5 +568,20 @@ public void testMetadataRefreshDeadline() {
         assertTrue(group.hasMetadataExpired(time.milliseconds()));
         assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs);
         assertEquals(group.groupEpoch() + 1, group.metadataRefreshDeadline().epoch);
+
+        // Set the refresh deadline.
+        group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch());

Review Comment:
   I have reworked the test to reduce the confusion. Let me know what you think.





[GitHub] [kafka] dajac merged pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac merged PR #13901:
URL: https://github.com/apache/kafka/pull/13901




[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1246649219


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -728,6 +794,81 @@ public void replay(
             }
             consumerGroup.removeMember(memberId);
         }
+
+        updateGroupsByTopics(groupId, oldSubscribedTopicNames, consumerGroup.subscribedTopicNames());
+    }
+
+    /**
+     * @return The set of groups subscribed to the topic.
+     */
+    public Set<String> groupsSubscribedToTopic(String topicName) {
+        Set<String> groups = groupsByTopics.get(topicName);

Review Comment:
   Because it requires the default value to be a TimelineHashSet, and I did not want to create one every time. I also considered creating a default empty one to use here, but I was not really happy with that either.
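The sketch below shows the trade-off, using `HashMap` and `TreeSet` as hypothetical stand-ins for `TimelineHashMap` and `TimelineHashSet`. When the map's value type is a concrete set class, `getOrDefault(topic, Collections.emptySet())` does not type-check. A null check that falls back to the shared `Collections.emptySet()` avoids the compile error, and it also avoids allocating a new set on every lookup.

```java
import java.util.*;

/** Hypothetical index; HashMap/TreeSet stand in for TimelineHashMap/TimelineHashSet. */
final class TopicIndex {
    // The value type is a concrete set class, so
    // groupsByTopics.getOrDefault(topic, Collections.emptySet()) would not compile.
    private final Map<String, TreeSet<String>> groupsByTopics = new HashMap<>();

    void subscribe(String groupId, String topic) {
        groupsByTopics.computeIfAbsent(topic, __ -> new TreeSet<>()).add(groupId);
    }

    /** Read-only lookup: unknown topics neither allocate nor add map entries. */
    Set<String> groupsSubscribedToTopic(String topic) {
        Set<String> groups = groupsByTopics.get(topic);
        return groups != null ? groups : Collections.emptySet();
    }

    int indexedTopicCount() {
        return groupsByTopics.size();
    }

    public static void main(String[] args) {
        TopicIndex index = new TopicIndex();
        index.subscribe("g1", "foo");
        System.out.println(index.groupsSubscribedToTopic("foo")); // [g1]
        System.out.println(index.groupsSubscribedToTopic("bar")); // []
        System.out.println(index.indexedTopicCount());            // 1
    }
}
```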





[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1247156849


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -728,6 +794,81 @@ public void replay(
             }
             consumerGroup.removeMember(memberId);
         }
+
+        updateGroupsByTopics(groupId, oldSubscribedTopicNames, consumerGroup.subscribedTopicNames());
+    }
+
+    /**
+     * @return The set of groups subscribed to the topic.
+     */
+    public Set<String> groupsSubscribedToTopic(String topicName) {
+        Set<String> groups = groupsByTopics.get(topicName);
+        return groups != null ? groups : Collections.emptySet();
+    }
+
+    /**
+     * Subscribes a group to a topic.
+     *
+     * @param groupId   The group id.
+     * @param topicName The topic name.
+     */
+    private void subscribeGroupToTopic(
+        String groupId,
+        String topicName
+    ) {
+        groupsByTopics
+            .computeIfAbsent(topicName, __ -> new TimelineHashSet<>(snapshotRegistry, 1))
+            .add(groupId);
+    }
+
+    /**
+     * Unsubscribes a group from a topic.
+     *
+     * @param groupId   The group id.
+     * @param topicName The topic name.
+     */
+    private void unsubscribeGroupFromTopic(
+        String groupId,
+        String topicName
+    ) {
+        groupsByTopics.computeIfPresent(topicName, (__, groupIds) -> {
+            groupIds.remove(groupId);
+            return groupIds.isEmpty() ? null : groupIds;
+        });
+    }
+
+    /**
+     * Updates the group by topics mapping.
+     *
+     * @param groupId               The group id.
+     * @param oldSubscribedTopics   The old group subscriptions.
+     * @param newSubscribedTopics   The new group subscriptions.
+     */
+    private void updateGroupsByTopics(

Review Comment:
   Do we have any tests for this and the code above?
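A test of this bookkeeping could look like the sketch below. It uses a plain `HashMap` instead of the snapshot-aware timeline collections. `GroupsByTopicsIndex` and its diff-based `update` are a hypothetical reconstruction of `subscribeGroupToTopic`, `unsubscribeGroupFromTopic`, and `updateGroupsByTopics`; the body of the last one is cut off in this excerpt.

```java
import java.util.*;

/** Hypothetical reconstruction of the groups-by-topics bookkeeping. */
final class GroupsByTopicsIndex {
    // Stand-in for the TimelineHashMap<String, TimelineHashSet<String>> used in the PR.
    final Map<String, Set<String>> groupsByTopics = new HashMap<>();

    void subscribe(String groupId, String topic) {
        groupsByTopics.computeIfAbsent(topic, __ -> new HashSet<>()).add(groupId);
    }

    void unsubscribe(String groupId, String topic) {
        // Returning null from the remapping function removes the entry once it is empty.
        groupsByTopics.computeIfPresent(topic, (__, groupIds) -> {
            groupIds.remove(groupId);
            return groupIds.isEmpty() ? null : groupIds;
        });
    }

    /** Applies only the difference between the old and the new subscriptions. */
    void update(String groupId, Set<String> oldTopics, Set<String> newTopics) {
        for (String topic : oldTopics) {
            if (!newTopics.contains(topic)) unsubscribe(groupId, topic);
        }
        for (String topic : newTopics) {
            if (!oldTopics.contains(topic)) subscribe(groupId, topic);
        }
    }

    public static void main(String[] args) {
        GroupsByTopicsIndex index = new GroupsByTopicsIndex();
        index.update("g1", Set.of(), Set.of("foo", "bar"));
        index.update("g2", Set.of(), Set.of("bar"));
        index.update("g1", Set.of("foo", "bar"), Set.of("bar", "baz"));
        // Expected contents: foo dropped (no subscribers left), bar -> {g1, g2}, baz -> {g1}.
        System.out.println(new TreeMap<>(index.groupsByTopics));
    }
}
```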





[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1244477507


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -88,10 +93,12 @@ public class GroupMetadataManager {
     public static class Builder {
         private LogContext logContext = null;
         private SnapshotRegistry snapshotRegistry = null;
+        private Time time = null;
         private List<PartitionAssignor> assignors = null;
-        private TopicsImage topicsImage = null;

Review Comment:
   Did we change this to MetadataImage because it was easier to pass in that class? Or do we need features in the metadata image that were not in the topic image?





[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1244485648


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -727,6 +800,80 @@ public void replay(
                     + " but did not receive ConsumerGroupTargetAssignmentMetadataValue tombstone.");
             }
             consumerGroup.removeMember(memberId);
+            updateGroupsByTopics(groupId, oldSubscribedTopicNames, consumerGroup.subscribedTopicNames());
+        }
+    }
+
+    /**
+     * @return The set of groups subscribed to the topic.
+     */
+    public Set<String> groupsSubscribedToTopic(String topicName) {
+        Set<String> groups = groupsByTopics.get(topicName);
+        return groups != null ? groups : Collections.emptySet();
+    }
+
+    /**
+     * Subscribes a group to a topic.
+     *
+     * @param groupId   The group id.
+     * @param topicName The topic name.
+     */
+    private void subscribeGroupToTopic(
+        String groupId,
+        String topicName
+    ) {
+        groupsByTopics
+            .computeIfAbsent(topicName, __ -> new TimelineHashSet<>(snapshotRegistry, 1))
+            .add(groupId);
+    }
+
+    /**
+     * Unsubscribes a group from a topic.
+     *
+     * @param groupId   The group id.
+     * @param topicName The topic name.
+     */
+    private void unsubscribeGroupFromTopic(

Review Comment:
   Should the return type be boolean here?
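If a caller ever needs to know whether anything changed, a boolean-returning variant could look like the hypothetical sketch below, again using a plain `HashMap` instead of the timeline collections. It avoids `computeIfPresent` because a lambda cannot assign to a captured local variable. The diff's void version does not need to report the result.

```java
import java.util.*;

/** Hypothetical variant of unsubscribeGroupFromTopic that reports whether it did anything. */
final class TopicSubscriptions {
    private final Map<String, Set<String>> groupsByTopics = new HashMap<>();

    void subscribe(String groupId, String topicName) {
        groupsByTopics.computeIfAbsent(topicName, __ -> new HashSet<>()).add(groupId);
    }

    /** @return true if the group was subscribed to the topic and has now been removed. */
    boolean unsubscribe(String groupId, String topicName) {
        Set<String> groupIds = groupsByTopics.get(topicName);
        if (groupIds == null || !groupIds.remove(groupId)) {
            return false; // the group was not subscribed: nothing changed
        }
        if (groupIds.isEmpty()) {
            groupsByTopics.remove(topicName); // drop empty entries, like the diff does
        }
        return true;
    }

    boolean isEmpty() {
        return groupsByTopics.isEmpty();
    }

    public static void main(String[] args) {
        TopicSubscriptions subs = new TopicSubscriptions();
        subs.subscribe("g1", "foo");
        System.out.println(subs.unsubscribe("g1", "foo")); // true
        System.out.println(subs.unsubscribe("g1", "foo")); // false
        System.out.println(subs.isEmpty());                // true
    }
}
```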





[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1245157302


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -874,4 +1021,34 @@ public void replay(
             consumerGroup.updateMember(newMember);
         }
     }
+
+    /**
+     * A new metadata image is available.
+     *
+     * @param newImage  The new metadata image.
+     * @param delta     The delta image.
+     */
+    public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) {
+        metadataImage = newImage;
+
+        // Notify all the groups subscribed to the created, updated or
+        // deleted topics.
+        Set<String> allGroupIds = new HashSet<>();
+        delta.topicsDelta().changedTopics().forEach((topicId, topicDelta) -> {
+            String topicName = topicDelta.name();
+            Set<String> groupIds = groupsByTopics.get(topicName);
+            if (groupIds != null) allGroupIds.addAll(groupIds);
+        });
+        delta.topicsDelta().deletedTopicIds().forEach(topicId -> {
+            TopicImage topicImage = delta.image().topics().getTopic(topicId);
+            Set<String> groupIds = groupsByTopics.get(topicImage.name());
+            if (groupIds != null) allGroupIds.addAll(groupIds);

Review Comment:
   Correct. I also simplified this code.





[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1247805182


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -172,19 +186,21 @@ public List<Record> build(TopicsImage topicsImage) {
             });
 
             // Add subscription metadata.
-            Map<String, TopicMetadata> subscriptionMetadata = new HashMap<>();
-            members.forEach((memberId, member) -> {
-                member.subscribedTopicNames().forEach(topicName -> {
-                    TopicImage topicImage = topicsImage.getTopic(topicName);
-                    if (topicImage != null) {
-                        subscriptionMetadata.put(topicName, new TopicMetadata(
-                            topicImage.id(),
-                            topicImage.name(),
-                            topicImage.partitions().size()
-                        ));
-                    }
+            if (subscriptionMetadata == null) {

Review Comment:
   Most of the tests are just fine with the auto-generated subscription metadata. However, the new ones need specific subscription metadata to verify the check. This is why I extended this builder to either accept the subscription metadata to use or auto-generate it.
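A minimal sketch of a builder with that optional override follows, under assumed names. `GroupRecordsBuilder` and `buildSubscriptionMetadata` are hypothetical, and subscription metadata is reduced to a topic-to-partition-count map. The real test builder emits records rather than a map. When no metadata is supplied, the sketch derives it from the members' subscriptions and skips topics that do not exist, much like the code removed in this hunk.

```java
import java.util.*;

/** Hypothetical test builder: explicit subscription metadata wins, otherwise it is derived. */
final class GroupRecordsBuilder {
    private final Map<String, Set<String>> subscribedTopicsByMember = new HashMap<>();
    private Map<String, Integer> subscriptionMetadata = null; // null means "auto-generate"

    GroupRecordsBuilder withMember(String memberId, Set<String> subscribedTopics) {
        subscribedTopicsByMember.put(memberId, subscribedTopics);
        return this;
    }

    GroupRecordsBuilder withSubscriptionMetadata(Map<String, Integer> metadata) {
        this.subscriptionMetadata = metadata;
        return this;
    }

    /** @param partitionsByTopic topic name -> partition count, standing in for the topics image */
    Map<String, Integer> buildSubscriptionMetadata(Map<String, Integer> partitionsByTopic) {
        if (subscriptionMetadata != null) {
            return subscriptionMetadata; // e.g. deliberately stale metadata for a refresh test
        }
        Map<String, Integer> generated = new TreeMap<>();
        for (Set<String> topics : subscribedTopicsByMember.values()) {
            for (String topic : topics) {
                Integer partitions = partitionsByTopic.get(topic);
                if (partitions != null) { // skip topics that do not exist
                    generated.put(topic, partitions);
                }
            }
        }
        return generated;
    }

    public static void main(String[] args) {
        Map<String, Integer> image = Map.of("foo", 3, "bar", 2);

        Map<String, Integer> auto = new GroupRecordsBuilder()
            .withMember("m1", Set.of("foo", "missing"))
            .buildSubscriptionMetadata(image);
        System.out.println(auto); // {foo=3}

        Map<String, Integer> explicit = new GroupRecordsBuilder()
            .withMember("m1", Set.of("foo"))
            .withSubscriptionMetadata(Map.of("foo", 1))
            .buildSubscriptionMetadata(image);
        System.out.println(explicit); // {foo=1}
    }
}
```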



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##########
@@ -825,4 +828,60 @@ public void testClose() throws Exception {
         assertFutureThrows(write1, NotCoordinatorException.class);
         assertFutureThrows(write2, NotCoordinatorException.class);
     }
+
+    @Test
+    public void testOnNewMetadataImage() {
+        TopicPartition tp0 = new TopicPartition("__consumer_offsets", 0);
+        TopicPartition tp1 = new TopicPartition("__consumer_offsets", 1);
+
+        MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class);
+        MockPartitionWriter writer = mock(MockPartitionWriter.class);
+        MockCoordinatorBuilderSupplier supplier = mock(MockCoordinatorBuilderSupplier.class);
+        MockCoordinatorBuilder builder = mock(MockCoordinatorBuilder.class);
+
+        CoordinatorRuntime<MockCoordinator, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinator, String>()
+                .withLoader(loader)
+                .withEventProcessor(new MockEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorBuilderSupplier(supplier)
+                .build();
+
+        MockCoordinator coordinator0 = mock(MockCoordinator.class);
+        MockCoordinator coordinator1 = mock(MockCoordinator.class);
+
+        when(supplier.get()).thenReturn(builder);
+        when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+        when(builder.withLogContext(any())).thenReturn(builder);
+        when(builder.build())
+            .thenReturn(coordinator0)
+            .thenReturn(coordinator1);
+
+        CompletableFuture<Void> future0 = new CompletableFuture<>();
+        when(loader.load(tp0, coordinator0)).thenReturn(future0);
+
+        CompletableFuture<Void> future1 = new CompletableFuture<>();
+        when(loader.load(tp1, coordinator1)).thenReturn(future1);
+
+        runtime.scheduleLoadOperation(tp0, 0);
+        runtime.scheduleLoadOperation(tp1, 0);
+
+        // Coordinator 0 is loaded. It should get the current image
+        // that is the empty one.
+        future0.complete(null);
+        verify(coordinator0).onLoaded(MetadataImage.EMPTY);
+
+        // Publish a new image.
+        MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+        MetadataImage newImage = delta.apply(MetadataProvenance.EMPTY);
+        runtime.onNewMetadataImage(newImage, delta);
+
+        // Coordinator 0 should be notified about it.
+        verify(coordinator0).onNewMetadataImage(newImage, delta);

Review Comment:
   We cannot test this here because the runtime is not aware of the concrete implementation of the state machine. I also want to ensure that we are on the same page. The metadata image is updated when `onNewMetadataImage` is called but the subscription metadata is refreshed on the next heartbeat.





[GitHub] [kafka] dajac commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "dajac (via GitHub)" <gi...@apache.org>.
dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248095382


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##########
@@ -564,9 +564,9 @@ public void testMetadataRefreshDeadline() {
         assertTrue(group.hasMetadataExpired(time.milliseconds()));
 
         // Set the refresh time deadline with a higher group epoch.
-        group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch() + 1);

Review Comment:
   Ah right, I did it wrong here. Let me revert.





[GitHub] [kafka] jolshan commented on a diff in pull request #13901: KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image

Posted by "jolshan (via GitHub)" <gi...@apache.org>.
jolshan commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248100591


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##########
@@ -825,4 +828,60 @@ public void testClose() throws Exception {
         assertFutureThrows(write1, NotCoordinatorException.class);
         assertFutureThrows(write2, NotCoordinatorException.class);
     }
+
+    @Test
+    public void testOnNewMetadataImage() {
+        TopicPartition tp0 = new TopicPartition("__consumer_offsets", 0);
+        TopicPartition tp1 = new TopicPartition("__consumer_offsets", 1);
+
+        MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class);
+        MockPartitionWriter writer = mock(MockPartitionWriter.class);
+        MockCoordinatorBuilderSupplier supplier = mock(MockCoordinatorBuilderSupplier.class);
+        MockCoordinatorBuilder builder = mock(MockCoordinatorBuilder.class);
+
+        CoordinatorRuntime<MockCoordinator, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinator, String>()
+                .withLoader(loader)
+                .withEventProcessor(new MockEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorBuilderSupplier(supplier)
+                .build();
+
+        MockCoordinator coordinator0 = mock(MockCoordinator.class);
+        MockCoordinator coordinator1 = mock(MockCoordinator.class);
+
+        when(supplier.get()).thenReturn(builder);
+        when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+        when(builder.withLogContext(any())).thenReturn(builder);
+        when(builder.build())
+            .thenReturn(coordinator0)
+            .thenReturn(coordinator1);
+
+        CompletableFuture<Void> future0 = new CompletableFuture<>();
+        when(loader.load(tp0, coordinator0)).thenReturn(future0);
+
+        CompletableFuture<Void> future1 = new CompletableFuture<>();
+        when(loader.load(tp1, coordinator1)).thenReturn(future1);
+
+        runtime.scheduleLoadOperation(tp0, 0);
+        runtime.scheduleLoadOperation(tp1, 0);
+
+        // Coordinator 0 is loaded. It should get the current image
+        // that is the empty one.
+        future0.complete(null);
+        verify(coordinator0).onLoaded(MetadataImage.EMPTY);
+
+        // Publish a new image.
+        MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
+        MetadataImage newImage = delta.apply(MetadataProvenance.EMPTY);
+        runtime.onNewMetadataImage(newImage, delta);
+
+        // Coordinator 0 should be notified about it.
+        verify(coordinator0).onNewMetadataImage(newImage, delta);

Review Comment:
   Sorry, I found it after posting. I should have looked a little longer. 😅
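The test excerpt checks two behaviors. First, a coordinator that finishes loading receives the runtime's current image through `onLoaded`. Second, an already-loaded coordinator is notified through `onNewMetadataImage` when a new image is published. The excerpt is cut off before any assertions on coordinator 1. What follows is therefore only a hypothetical Java sketch, assuming that a coordinator still loading is not notified directly and instead picks up the latest image when its load completes. `Image`, `Coordinator`, `completeLoad` and `ImagePropagationSketch` are illustrative stand-ins, not Kafka's `MetadataImage` or `CoordinatorRuntime`. The delta argument is omitted.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of image propagation; not the real CoordinatorRuntime.
public class ImagePropagationSketch {

    // Stand-in for a metadata image: just an immutable version number.
    record Image(long version) {
        static final Image EMPTY = new Image(0L);
    }

    // Stand-in for the coordinator callbacks exercised by the test.
    interface Coordinator {
        void onLoaded(Image image);
        void onNewMetadataImage(Image image);
    }

    private Image currentImage = Image.EMPTY;
    private final Map<Integer, Coordinator> loaded = new LinkedHashMap<>();

    // A partition finished loading: register it and hand it the latest image.
    synchronized void completeLoad(int partition, Coordinator coordinator) {
        loaded.put(partition, coordinator);
        coordinator.onLoaded(currentImage);
    }

    // Remember the image for future loads, then notify every loaded coordinator.
    synchronized void onNewMetadataImage(Image image) {
        currentImage = image;
        loaded.values().forEach(c -> c.onNewMetadataImage(image));
    }

    // Test helper: a coordinator that records each callback it receives.
    static Coordinator recording(String name, List<String> log) {
        return new Coordinator() {
            @Override public void onLoaded(Image image) {
                log.add(name + " loaded v" + image.version());
            }
            @Override public void onNewMetadataImage(Image image) {
                log.add(name + " new v" + image.version());
            }
        };
    }

    public static void main(String[] args) {
        ImagePropagationSketch runtime = new ImagePropagationSketch();
        List<String> log = new ArrayList<>();
        runtime.completeLoad(0, recording("c0", log));  // c0 sees the empty image
        runtime.onNewMetadataImage(new Image(1L));      // only c0 is loaded yet
        runtime.completeLoad(1, recording("c1", log));  // c1 sees v1 on load
        System.out.println(log); // [c0 loaded v0, c0 new v1, c1 loaded v1]
    }
}
```

In this sketch a single lock keeps "publish image" and "complete load" from interleaving, so a coordinator finishing its load cannot miss an image. The real runtime may achieve the same ordering differently, for example by routing these calls through its event processor.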


