You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/01/18 16:53:08 UTC

[GitHub] [kafka] dajac opened a new pull request #11688: KAFKA-13435; Group won't consume partitions added after static member restart

dajac opened a new pull request #11688:
URL: https://github.com/apache/kafka/pull/11688


   WIP/Require a KIP.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #11688: KAFKA-13435; Static membership protocol should let the leader skip assignment (KIP-814)

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #11688:
URL: https://github.com/apache/kafka/pull/11688#issuecomment-1033635997


   @hachikuji @showuon Thanks for reviewing. I have updated the PR based on your comments. I have also bumped to version of the JoinGroup API to v9. I thought that we could lean on v8 which is new in 3.2 but it seems safer to bump it again just in case someone would run with trunk.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #11688: KAFKA-13435; Static membership protocol should let the leader skip assignment (KIP-814)

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11688:
URL: https://github.com/apache/kafka/pull/11688#discussion_r801269210



##########
File path: core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
##########
@@ -1791,4 +1793,33 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertTrue(records2.count() == 1 && records2.records(tp).asScala.head.offset == 1,
       "Expected consumer2 to consume one message from offset 1, which is the committed offset of consumer1")
   }
+  @Test
+  def testStaticConsumerDetectsNewPartitionCreatedAfterRestart(): Unit = {

Review comment:
       I didn't see any consumer restart in the test. Are you trying to say `afterNewConsumerCreated`?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
##########
@@ -211,7 +211,15 @@ protected void onJoinComplete(int generation, String memberId, String protocol,
     }
 
     @Override
-    protected Map<String, ByteBuffer> performAssignment(String leaderId, String protocol, List<JoinGroupResponseMember> allMemberMetadata) {
+    protected Map<String, ByteBuffer> performAssignment(String leaderId,
+                                                        String protocol,
+                                                        List<JoinGroupResponseMember> allMemberMetadata,
+                                                        Boolean skipAssignment) {
+        // Connect does not support static membership so skipping the
+        // assignment should never happen in practice.
+        if (skipAssignment)
+            return Collections.emptyMap();

Review comment:
       Agree! At least we should log a warning here.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -655,11 +667,8 @@ private void maybeUpdateGroupSubscription(String assignorName,
 
         maybeUpdateGroupSubscription(assignorName, assignments, allSubscribedTopics);
 
-        assignmentSnapshot = metadataSnapshot;

Review comment:
       I think the reason why we need to set `assignmentSnapShot` here is because after assignment (ex: custom assignor), it might be possible that there are extra topics to be added, and we need to request new metadata and add that in to `metadataSnapshot` in `maybeUpdateGroupSubscription()` method, right?
   
   So, I think you could do like this:
   ```java
   if (skipAssignment) {
        // set the metadataSnapshot here for skipAssignment case
        assignmentSnapshot = metadataSnapshot;
        return Collections.emptyMap();
   }
   ...
   
   maybeUpdateGroupSubscription(assignorName, assignments, allSubscribedTopics);
   // still keep this line here
   assignmentSnapshot = metadataSnapshot;
   ```
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac merged pull request #11688: KAFKA-13435; Static membership protocol should let the leader skip assignment (KIP-814)

Posted by GitBox <gi...@apache.org>.
dajac merged pull request #11688:
URL: https://github.com/apache/kafka/pull/11688


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] artemlivshits commented on a change in pull request #11688: KAFKA-13435; Static membership protocol should let the leader skip assignment (KIP-814)

Posted by GitBox <gi...@apache.org>.
artemlivshits commented on a change in pull request #11688:
URL: https://github.com/apache/kafka/pull/11688#discussion_r804264986



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
##########
@@ -211,7 +211,13 @@ protected void onJoinComplete(int generation, String memberId, String protocol,
     }
 
     @Override
-    protected Map<String, ByteBuffer> performAssignment(String leaderId, String protocol, List<JoinGroupResponseMember> allMemberMetadata) {
+    protected Map<String, ByteBuffer> onLeaderElected(String leaderId,
+                                                      String protocol,
+                                                      List<JoinGroupResponseMember> allMemberMetadata,
+                                                      boolean skipAssignment) {
+        if (skipAssignment)
+            throw new IllegalStateException("Can't skip assignment because Connect does not support static membership.");

Review comment:
       Ok, maybe I misunderstood the intent of `skipAssignment`, my understanding was that was intended as an optimization on the client (so that the static members wouldn't do assignment on taking leadership - previous version of this optimization was that we didn't even tell the leader that it's a leader to avoid doing assignment, which resulted in the problem partially fixed by this change).
   
   If it has other side effects, let's add a comment that explains that.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #11688: KAFKA-13435; Static membership protocol should let the leader skip assignment (KIP-814)

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11688:
URL: https://github.com/apache/kafka/pull/11688#discussion_r802450152



##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
##########
@@ -386,15 +386,39 @@ public void testPerformAssignmentShouldValidateCooperativeAssignment() {
             if (protocol == COOPERATIVE) {
                 // in cooperative protocol, we should throw exception when validating cooperative assignment
                 Exception e = assertThrows(IllegalStateException.class,
-                    () -> coordinator.performAssignment("1", partitionAssignor.name(), metadata));
+                    () -> coordinator.performAssignment("1", partitionAssignor.name(), metadata, false));
                 assertTrue(e.getMessage().contains("Assignor supporting the COOPERATIVE protocol violates its requirements"));
             } else {
                 // in eager protocol, we should not validate assignment
-                coordinator.performAssignment("1", partitionAssignor.name(), metadata);
+                coordinator.performAssignment("1", partitionAssignor.name(), metadata, false);
             }
         }
     }
 
+    @Test
+    public void testPerformAssignmentShouldSkipAssignment() {
+        SubscriptionState mockSubscriptionState = Mockito.mock(SubscriptionState.class);
+
+        Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, singletonList(topic1));
+
+        List<JoinGroupResponseData.JoinGroupResponseMember> metadata = new ArrayList<>();
+        for (Map.Entry<String, List<String>> subscriptionEntry : memberSubscriptions.entrySet()) {
+            ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription(subscriptionEntry.getValue());
+            ByteBuffer buf = ConsumerProtocol.serializeSubscription(subscription);
+            metadata.add(new JoinGroupResponseData.JoinGroupResponseMember()
+                .setMemberId(subscriptionEntry.getKey())
+                .setMetadata(buf.array()));
+        }
+
+        // `partitionAssignor.prepare` is not called therefore calling `partitionAssignor.assign` will throw

Review comment:
       That method is part of `MockPartitionAssignor` which is extensively used in tests in this suite. I do agree with you than using a mock is more explicit here. Let me update that.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #11688: KAFKA-13435; Static membership protocol should let the leader skip assignment (KIP-814)

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11688:
URL: https://github.com/apache/kafka/pull/11688#discussion_r802733093



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -648,10 +648,14 @@ private void maybeUpdateGroupSubscription(String assignorName,
         updateGroupSubscription(allSubscribedTopics);
 
         isLeader = true;
-        assignmentSnapshot = metadataSnapshot;
 
-        if (skipAssignment)
+
+        if (skipAssignment) {
+            log.info("Skipped assignment for returning static leader at generation {}. The static leader " +
+                "will collect its existing assignment.", generation().generationId);

Review comment:
       I would not mention the sync group request. How about `The static leader will continue with its existing assignment`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] artemlivshits commented on a change in pull request #11688: KAFKA-13435; Static membership protocol should let the leader skip assignment (KIP-814)

Posted by GitBox <gi...@apache.org>.
artemlivshits commented on a change in pull request #11688:
URL: https://github.com/apache/kafka/pull/11688#discussion_r805024350



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
##########
@@ -211,7 +211,13 @@ protected void onJoinComplete(int generation, String memberId, String protocol,
     }
 
     @Override
-    protected Map<String, ByteBuffer> performAssignment(String leaderId, String protocol, List<JoinGroupResponseMember> allMemberMetadata) {
+    protected Map<String, ByteBuffer> onLeaderElected(String leaderId,
+                                                      String protocol,
+                                                      List<JoinGroupResponseMember> allMemberMetadata,
+                                                      boolean skipAssignment) {
+        if (skipAssignment)
+            throw new IllegalStateException("Can't skip assignment because Connect does not support static membership.");

Review comment:
       Sounds good.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #11688: KAFKA-13435; Static membership protocol should let the leader skip assignment (KIP-814)

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11688:
URL: https://github.com/apache/kafka/pull/11688#discussion_r804630897



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
##########
@@ -211,7 +211,13 @@ protected void onJoinComplete(int generation, String memberId, String protocol,
     }
 
     @Override
-    protected Map<String, ByteBuffer> performAssignment(String leaderId, String protocol, List<JoinGroupResponseMember> allMemberMetadata) {
+    protected Map<String, ByteBuffer> onLeaderElected(String leaderId,
+                                                      String protocol,
+                                                      List<JoinGroupResponseMember> allMemberMetadata,
+                                                      boolean skipAssignment) {
+        if (skipAssignment)
+            throw new IllegalStateException("Can't skip assignment because Connect does not support static membership.");

Review comment:
       The fundamental issue is that the assignor is not idempotent nor free of any side effects (e.g. custom ones) so running it and ignoring the new assignment would lead to a weird state for the group. This is briefly discussed in the KIP, see rejected alternatives. I have updated the comment in the GroupCoordinator to stress out this point. Does this work for you?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] artemlivshits commented on a change in pull request #11688: KAFKA-13435; Static membership protocol should let the leader skip assignment (KIP-814)

Posted by GitBox <gi...@apache.org>.
artemlivshits commented on a change in pull request #11688:
URL: https://github.com/apache/kafka/pull/11688#discussion_r803251907



##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -1306,23 +1326,46 @@ class GroupCoordinator(val brokerId: Int,
                 protocolType = group.protocolType,
                 protocolName = group.protocolName,
                 leaderId = currentLeader,
+                skipAssignment = false,
                 error = error
               ))
+            } else if (supportSkippingAssignment) {
+              // Starting from version 9 of the JoinGroup API, static members are able to
+              // skip running the assignor based on the `SkipAssignment` field. We leverage
+              // this to tell the leader that it is the leader of the group but by skipping
+              // running the assignor while the group is in stable state.

Review comment:
       Can you mention somewhere in the comments that this approach still doesn't fully handle a condition if metadata has changed while the leader was down?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
##########
@@ -211,7 +211,13 @@ protected void onJoinComplete(int generation, String memberId, String protocol,
     }
 
     @Override
-    protected Map<String, ByteBuffer> performAssignment(String leaderId, String protocol, List<JoinGroupResponseMember> allMemberMetadata) {
+    protected Map<String, ByteBuffer> onLeaderElected(String leaderId,
+                                                      String protocol,
+                                                      List<JoinGroupResponseMember> allMemberMetadata,
+                                                      boolean skipAssignment) {
+        if (skipAssignment)
+            throw new IllegalStateException("Can't skip assignment because Connect does not support static membership.");

Review comment:
       Would it be safer to just log an error and ignore?  That would make a difference if we say have a bug and return skipAssignment=true by mistake.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #11688: KAFKA-13435; Static membership protocol should let the leader skip assignment (KIP-814)

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11688:
URL: https://github.com/apache/kafka/pull/11688#discussion_r803838565



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
##########
@@ -211,7 +211,13 @@ protected void onJoinComplete(int generation, String memberId, String protocol,
     }
 
     @Override
-    protected Map<String, ByteBuffer> performAssignment(String leaderId, String protocol, List<JoinGroupResponseMember> allMemberMetadata) {
+    protected Map<String, ByteBuffer> onLeaderElected(String leaderId,
+                                                      String protocol,
+                                                      List<JoinGroupResponseMember> allMemberMetadata,
+                                                      boolean skipAssignment) {
+        if (skipAssignment)
+            throw new IllegalStateException("Can't skip assignment because Connect does not support static membership.");

Review comment:
       That is definitely debatable. The assertion should help us to catch such bug early on as system tests would fail instead of potentially releasing a buggy version.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #11688: KAFKA-13435; Static membership protocol should let the leader skip assignment (KIP-814)

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11688:
URL: https://github.com/apache/kafka/pull/11688#discussion_r803165736



##########
File path: core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
##########
@@ -1791,4 +1793,34 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertTrue(records2.count() == 1 && records2.records(tp).asScala.head.offset == 1,
       "Expected consumer2 to consume one message from offset 1, which is the committed offset of consumer1")
   }
+
+  @Test
+  def testStaticConsumerDetectsNewPartitionCreatedAfterRestart(): Unit = {
+    val foo = "foo"
+    val foo0 = new TopicPartition(foo, 0)
+    val foo1 = new TopicPartition(foo, 1)
+
+    val admin = createAdminClient()
+    admin.createTopics(Seq(new NewTopic(foo, 1, 1.asInstanceOf[Short])).asJava).all.get

Review comment:
       nit: I think `1.toShort` works?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #11688: KAFKA-13435; Static membership protocol should let the leader skip assignment (KIP-814)

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11688:
URL: https://github.com/apache/kafka/pull/11688#discussion_r802484108



##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
##########
@@ -1550,6 +1573,34 @@ public void testMetadataChangeTriggersRebalance() {
         assertTrue(coordinator.rejoinNeededOrPending());
     }
 
+    @Test
+    public void testStaticLeaderRejoinsGroupAndCanTriggersRebalance() {
+        // ensure metadata is up-to-date for leader
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        client.updateMetadata(metadataResponse);
+
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        // the leader is responsible for picking up metadata changes and forcing a group rebalance.
+        // note that `partitionAssignor.prepare` is not called therefore calling `partitionAssignor.assign`
+        // will throw a IllegalStateException. this indirectly verifies that `assign` is correctly skipped.
+        Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, singletonList(topic1));

Review comment:
       Sure. Let me add another test.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #11688: KAFKA-13435; Static membership protocol should let the leader skip assignment (KIP-814)

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11688:
URL: https://github.com/apache/kafka/pull/11688#discussion_r803835704



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -699,11 +702,15 @@ public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> fut
         return sendSyncGroupRequest(requestBuilder);
     }
 
-    private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
+    private RequestFuture<ByteBuffer> onLeaderElected(JoinGroupResponse joinResponse) {
         try {
             // perform the leader synchronization and send back the assignment for the group
-            Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.data().leader(), joinResponse.data().protocolName(),
-                    joinResponse.data().members());
+            Map<String, ByteBuffer> groupAssignment = onLeaderElected(
+                joinResponse.data().leader(),
+                joinResponse.data().protocolName(),
+                joinResponse.data().members(),
+                joinResponse.data().skipAssignment()
+            );

Review comment:
       Given that we already have unit tests verifying this for the two implementations, the extra validation seems unnecessary to me. I can do it though if you believe that there is value in it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #11688: KAFKA-13435; Static membership protocol should let the leader skip assignment (KIP-814)

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11688:
URL: https://github.com/apache/kafka/pull/11688#discussion_r802514285



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -655,11 +667,8 @@ private void maybeUpdateGroupSubscription(String assignorName,
 
         maybeUpdateGroupSubscription(assignorName, assignments, allSubscribedTopics);
 
-        assignmentSnapshot = metadataSnapshot;

Review comment:
       Yeah, it seems that you are right. I updated the PR as you suggested.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #11688: KAFKA-13435; Static membership protocol should let the leader skip assignment (KIP-814)

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11688:
URL: https://github.com/apache/kafka/pull/11688#discussion_r802633329



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -648,10 +648,14 @@ private void maybeUpdateGroupSubscription(String assignorName,
         updateGroupSubscription(allSubscribedTopics);
 
         isLeader = true;
-        assignmentSnapshot = metadataSnapshot;
 
-        if (skipAssignment)
+

Review comment:
       nit: extra empty line here

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
##########
@@ -1601,6 +1604,36 @@ public void testStaticLeaderRejoinsGroupAndCanTriggersRebalance() {
         assertTrue(coordinator.rejoinNeededOrPending());
     }
 
+    @Test
+    public void testStaticLeaderRejoinsGroupAndCanDetectMetadataChangesForOtherMembers() {

Review comment:
       nice new test added.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -648,10 +648,14 @@ private void maybeUpdateGroupSubscription(String assignorName,
         updateGroupSubscription(allSubscribedTopics);
 
         isLeader = true;
-        assignmentSnapshot = metadataSnapshot;
 
-        if (skipAssignment)
+
+        if (skipAssignment) {
+            log.info("Skipped assignment for returning static leader at generation {}. The static leader " +
+                "will collect its existing assignment.", generation().generationId);

Review comment:
       I'm not sure if we need to put the 2nd sentence. Maybe the 1st one is enough? Or maybe change the 2nd one with:
   `The static leader will collect its existing assignment with empty assignment syncGroup request.` 
   
   WDYT?
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #11688: KAFKA-13435; Static membership protocol should let the leader skip assignment (KIP-814)

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11688:
URL: https://github.com/apache/kafka/pull/11688#discussion_r803159171



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -699,11 +702,15 @@ public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> fut
         return sendSyncGroupRequest(requestBuilder);
     }
 
-    private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
+    private RequestFuture<ByteBuffer> onLeaderElected(JoinGroupResponse joinResponse) {
         try {
             // perform the leader synchronization and send back the assignment for the group
-            Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.data().leader(), joinResponse.data().protocolName(),
-                    joinResponse.data().members());
+            Map<String, ByteBuffer> groupAssignment = onLeaderElected(
+                joinResponse.data().leader(),
+                joinResponse.data().protocolName(),
+                joinResponse.data().members(),
+                joinResponse.data().skipAssignment()
+            );

Review comment:
       Do you think it is worthwhile validating that `groupAssignment` is empty when `skipAssignment` is set?

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
##########
@@ -1691,6 +1717,64 @@ public void testMetadataChangeTriggersRebalance() {
         assertTrue(coordinator.rejoinNeededOrPending());
     }
 
+    @Test
+    public void testStaticLeaderRejoinsGroupAndCanTriggersRebalance() {
+        // ensure metadata is up-to-date for leader
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        client.updateMetadata(metadataResponse);
+
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        // the leader is responsible for picking up metadata changes and forcing a group rebalance.
+        // note that `MockPartitionAssignor.prepare` is not called therefore calling `MockPartitionAssignor.assign`
+        // will throw a IllegalStateException. this indirectly verifies that `assign` is correctly skipped.
+        Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, singletonList(topic1));
+        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, true, Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+
+        coordinator.poll(time.timer(Long.MAX_VALUE));
+
+        assertFalse(coordinator.rejoinNeededOrPending());
+
+        // a new partition is added to the topic
+        metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWith(1, singletonMap(topic1, 2)), false, time.milliseconds());
+        coordinator.maybeUpdateSubscriptionMetadata();
+
+        // we should detect the change and ask for reassignment
+        assertTrue(coordinator.rejoinNeededOrPending());
+    }
+
+    @Test
+    public void testStaticLeaderRejoinsGroupAndCanDetectMetadataChangesForOtherMembers() {
+        // ensure metadata is up-to-date for leader
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        client.updateMetadata(metadataResponse);
+
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        // the leader is responsible for picking up metadata changes and forcing a group rebalance.
+        // note that `MockPartitionAssignor.prepare` is not called therefore calling `MockPartitionAssignor.assign`
+        // will throw a IllegalStateException. this indirectly verifies that `assign` is correctly skipped.
+        Map<String, List<String>> memberSubscriptions = new HashMap<>();
+        memberSubscriptions.put(consumerId, singletonList(topic1));
+        memberSubscriptions.put(consumerId2, singletonList(topic2));
+        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, true, Errors.NONE));
+        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
+
+        coordinator.poll(time.timer(Long.MAX_VALUE));
+
+        assertFalse(coordinator.rejoinNeededOrPending());
+

Review comment:
       Could we add an assertion for `SubscriptionState.metadataTopics`?

##########
File path: clients/src/main/resources/common/message/JoinGroupResponse.json
##########
@@ -49,6 +51,8 @@
       "about": "The group protocol selected by the coordinator." },
     { "name": "Leader", "type": "string", "versions": "0+",
       "about": "The leader of the group." },
+    { "name": "SkipAssignment", "type": "bool", "versions": "9+", "default": "false",
+      "about": "True is the leader must skip running the assignment." },

Review comment:
       nit: is -> if

##########
File path: core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
##########
@@ -1791,4 +1793,34 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertTrue(records2.count() == 1 && records2.records(tp).asScala.head.offset == 1,
       "Expected consumer2 to consume one message from offset 1, which is the committed offset of consumer1")
   }
+
+  @Test
+  def testStaticConsumerDetectsNewPartitionCreatedAfterRestart(): Unit = {
+    val foo = "foo"
+    val foo0 = new TopicPartition(foo, 0)
+    val foo1 = new TopicPartition(foo, 1)
+
+    val admin = createAdminClient()
+    admin.createTopics(Seq(new NewTopic(foo, 1, 1.asInstanceOf[Short])).asJava).all.get

Review comment:
       nit: I think 1.toShort works?

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -645,6 +651,15 @@ private void maybeUpdateGroupSubscription(String assignorName,
 
         isLeader = true;
 
+        if (skipAssignment) {
+            log.info("Skipped assignment for returning static leader at generation {}. The static leader " +
+                "will continue with its existing assignment.", generation().generationId);
+            assignmentSnapshot = metadataSnapshot;
+            return Collections.emptyMap();
+        }
+
+        Map<String, ByteBuffer> groupAssignment = new HashMap<>();

Review comment:
       nit: seemed reasonable at its original location




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #11688: KAFKA-13435; Static membership protocol should let the leader skip assignment (KIP-814)

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11688:
URL: https://github.com/apache/kafka/pull/11688#discussion_r804247861



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
##########
@@ -211,7 +211,13 @@ protected void onJoinComplete(int generation, String memberId, String protocol,
     }
 
     @Override
-    protected Map<String, ByteBuffer> performAssignment(String leaderId, String protocol, List<JoinGroupResponseMember> allMemberMetadata) {
+    protected Map<String, ByteBuffer> onLeaderElected(String leaderId,
+                                                      String protocol,
+                                                      List<JoinGroupResponseMember> allMemberMetadata,
+                                                      boolean skipAssignment) {
+        if (skipAssignment)
+            throw new IllegalStateException("Can't skip assignment because Connect does not support static membership.");

Review comment:
       Hmm, is it safe to ignore? What is the behavior if the client provides the new assignment anyway when `skipAssignment` is set? It might be worse to let connect generate a new assignment and let the coordinator ignore it. I think I prefer to fail fast.
   
   For what it's worth, I don't think we have a great support for test-only assertions today. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #11688: KAFKA-13435; Static membership protocol should let the leader skip assignment (KIP-814)

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11688:
URL: https://github.com/apache/kafka/pull/11688#discussion_r799847085



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -696,8 +698,12 @@ public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> fut
     private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
         try {
             // perform the leader synchronization and send back the assignment for the group
-            Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.data().leader(), joinResponse.data().protocolName(),
-                    joinResponse.data().members());
+            Map<String, ByteBuffer> groupAssignment = performAssignment(
+                joinResponse.data().leader(),
+                joinResponse.data().protocolName(),
+                joinResponse.data().members(),
+                joinResponse.data().skipAssignment()

Review comment:
       I think a comment about this would be helpful. An obvious question is why do we still call `performAssignment` when `skipAssignment` is set. It's useful to remember that we still need to propagate the leader and member state to the coordinator implementation.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
##########
@@ -211,7 +211,15 @@ protected void onJoinComplete(int generation, String memberId, String protocol,
     }
 
     @Override
-    protected Map<String, ByteBuffer> performAssignment(String leaderId, String protocol, List<JoinGroupResponseMember> allMemberMetadata) {
+    protected Map<String, ByteBuffer> performAssignment(String leaderId,
+                                                        String protocol,
+                                                        List<JoinGroupResponseMember> allMemberMetadata,
+                                                        Boolean skipAssignment) {
+        // Connect does not support static membership so skipping the
+        // assignment should never happen in practice.
+        if (skipAssignment)
+            return Collections.emptyMap();

Review comment:
       Would it make sense to raise an exception instead?

##########
File path: core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
##########
@@ -1791,4 +1793,33 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertTrue(records2.count() == 1 && records2.records(tp).asScala.head.offset == 1,
       "Expected consumer2 to consume one message from offset 1, which is the committed offset of consumer1")
   }
+  @Test

Review comment:
       nit: add newline

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
##########
@@ -1550,6 +1573,34 @@ public void testMetadataChangeTriggersRebalance() {
         assertTrue(coordinator.rejoinNeededOrPending());
     }
 
+    @Test
+    public void testStaticLeaderRejoinsGroupAndCanTriggersRebalance() {
+        // ensure metadata is up-to-date for leader
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        client.updateMetadata(metadataResponse);
+
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        // the leader is responsible for picking up metadata changes and forcing a group rebalance.
+        // note that `partitionAssignor.prepare` is not called therefore calling `partitionAssignor.assign`
+        // will throw a IllegalStateException. this indirectly verifies that `assign` is correctly skipped.
+        Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, singletonList(topic1));

Review comment:
       Could we have a case where the other consumers are subscribed to a topic that this consumer is not also subscribed to?

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -642,6 +648,12 @@ private void maybeUpdateGroupSubscription(String assignorName,
         updateGroupSubscription(allSubscribedTopics);
 
         isLeader = true;
+        assignmentSnapshot = metadataSnapshot;
+
+        if (skipAssignment)
+            return Collections.emptyMap();

Review comment:
       Can we add some logging for this case?

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -198,11 +198,13 @@ public AbstractCoordinator(GroupRebalanceConfig rebalanceConfig,
      * @param leaderId The id of the leader (which is this member)
      * @param protocol The protocol selected by the coordinator
      * @param allMemberMetadata Metadata from all members of the group
+     * @param skipAssignment True if leader must skip running the assignor
      * @return A map from each member to their state assignment
      */
     protected abstract Map<String, ByteBuffer> performAssignment(String leaderId,
                                                                  String protocol,
-                                                                 List<JoinGroupResponseData.JoinGroupResponseMember> allMemberMetadata);
+                                                                 List<JoinGroupResponseData.JoinGroupResponseMember> allMemberMetadata,
+                                                                 Boolean skipAssignment);

Review comment:
       nit: could this be `boolean`? Seems we don't need null.

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
##########
@@ -386,15 +386,39 @@ public void testPerformAssignmentShouldValidateCooperativeAssignment() {
             if (protocol == COOPERATIVE) {
                 // in cooperative protocol, we should throw exception when validating cooperative assignment
                 Exception e = assertThrows(IllegalStateException.class,
-                    () -> coordinator.performAssignment("1", partitionAssignor.name(), metadata));
+                    () -> coordinator.performAssignment("1", partitionAssignor.name(), metadata, false));
                 assertTrue(e.getMessage().contains("Assignor supporting the COOPERATIVE protocol violates its requirements"));
             } else {
                 // in eager protocol, we should not validate assignment
-                coordinator.performAssignment("1", partitionAssignor.name(), metadata);
+                coordinator.performAssignment("1", partitionAssignor.name(), metadata, false);
             }
         }
     }
 
+    @Test
+    public void testPerformAssignmentShouldSkipAssignment() {
+        SubscriptionState mockSubscriptionState = Mockito.mock(SubscriptionState.class);
+
+        Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, singletonList(topic1));
+
+        List<JoinGroupResponseData.JoinGroupResponseMember> metadata = new ArrayList<>();
+        for (Map.Entry<String, List<String>> subscriptionEntry : memberSubscriptions.entrySet()) {
+            ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription(subscriptionEntry.getValue());
+            ByteBuffer buf = ConsumerProtocol.serializeSubscription(subscription);
+            metadata.add(new JoinGroupResponseData.JoinGroupResponseMember()
+                .setMemberId(subscriptionEntry.getKey())
+                .setMetadata(buf.array()));
+        }
+
+        // `partitionAssignor.prepare` is not called therefore calling `partitionAssignor.assign` will throw

Review comment:
       Where is `partitionAssignor.prepare` defined? I wonder if it would be more direct to install a mock assignor and then verify no interactions.

##########
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##########
@@ -1306,23 +1326,48 @@ class GroupCoordinator(val brokerId: Int,
                 protocolType = group.protocolType,
                 protocolName = group.protocolName,
                 leaderId = currentLeader,
+                skipAssignment = false,
                 error = error
               ))
             } else {
-              group.maybeInvokeJoinCallback(member, JoinGroupResult(
-                members = List.empty,
-                memberId = newMemberId,
-                generationId = group.generationId,
-                protocolType = group.protocolType,
-                protocolName = group.protocolName,
-                // We want to avoid current leader performing trivial assignment while the group
-                // is in stable stage, because the new assignment in leader's next sync call
-                // won't be broadcast by a stable group. This could be guaranteed by
-                // always returning the old leader id so that the current leader won't assume itself
-                // as a leader based on the returned message, since the new member.id won't match
-                // returned leader id, therefore no assignment will be performed.
-                leaderId = currentLeader,
-                error = Errors.NONE))
+              if (supportSkippingAssignment) {

Review comment:
       nit: `else if`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #11688: KAFKA-13435; Static membership protocol should let the leader skip assignment (KIP-814)

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #11688:
URL: https://github.com/apache/kafka/pull/11688#discussion_r804248978



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -699,11 +702,15 @@ public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> fut
         return sendSyncGroupRequest(requestBuilder);
     }
 
-    private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
+    private RequestFuture<ByteBuffer> onLeaderElected(JoinGroupResponse joinResponse) {
         try {
             // perform the leader synchronization and send back the assignment for the group
-            Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.data().leader(), joinResponse.data().protocolName(),
-                    joinResponse.data().members());
+            Map<String, ByteBuffer> groupAssignment = onLeaderElected(
+                joinResponse.data().leader(),
+                joinResponse.data().protocolName(),
+                joinResponse.data().members(),
+                joinResponse.data().skipAssignment()
+            );

Review comment:
       Fair enough.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #11688: KAFKA-13435; Static membership protocol should let the leader skip assignment (KIP-814)

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11688:
URL: https://github.com/apache/kafka/pull/11688#discussion_r802417366



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -696,8 +698,12 @@ public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> fut
     private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
         try {
             // perform the leader synchronization and send back the assignment for the group
-            Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.data().leader(), joinResponse.data().protocolName(),
-                    joinResponse.data().members());
+            Map<String, ByteBuffer> groupAssignment = performAssignment(
+                joinResponse.data().leader(),
+                joinResponse.data().protocolName(),
+                joinResponse.data().members(),
+                joinResponse.data().skipAssignment()

Review comment:
       That makes sense. I have renamed the method to avoid the confusion and updated its javadoc.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] artemlivshits commented on a change in pull request #11688: KAFKA-13435; Static membership protocol should let the leader skip assignment (KIP-814)

Posted by GitBox <gi...@apache.org>.
artemlivshits commented on a change in pull request #11688:
URL: https://github.com/apache/kafka/pull/11688#discussion_r803873160



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
##########
@@ -211,7 +211,13 @@ protected void onJoinComplete(int generation, String memberId, String protocol,
     }
 
     @Override
-    protected Map<String, ByteBuffer> performAssignment(String leaderId, String protocol, List<JoinGroupResponseMember> allMemberMetadata) {
+    protected Map<String, ByteBuffer> onLeaderElected(String leaderId,
+                                                      String protocol,
+                                                      List<JoinGroupResponseMember> allMemberMetadata,
+                                                      boolean skipAssignment) {
+        if (skipAssignment)
+            throw new IllegalStateException("Can't skip assignment because Connect does not support static membership.");

Review comment:
       We could add an assertion for sure so that in tests we catch it.  It's just I'm not sure that we need this to fail in production.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #11688: KAFKA-13435; Static membership protocol should let the leader skip assignment (KIP-814)

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #11688:
URL: https://github.com/apache/kafka/pull/11688#discussion_r802515290



##########
File path: core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
##########
@@ -1791,4 +1793,33 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertTrue(records2.count() == 1 && records2.records(tp).asScala.head.offset == 1,
       "Expected consumer2 to consume one message from offset 1, which is the committed offset of consumer1")
   }
+  @Test
+  def testStaticConsumerDetectsNewPartitionCreatedAfterRestart(): Unit = {

Review comment:
       Well.. We can't really restart a consumer, right? The only way is to recreate the consumer. What I meant here is that the static member is restarted.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #11688: KAFKA-13435; Static membership protocol should let the leader skip assignment (KIP-814)

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11688:
URL: https://github.com/apache/kafka/pull/11688#discussion_r802624600



##########
File path: core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
##########
@@ -1791,4 +1793,33 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertTrue(records2.count() == 1 && records2.records(tp).asScala.head.offset == 1,
       "Expected consumer2 to consume one message from offset 1, which is the committed offset of consumer1")
   }
+  @Test
+  def testStaticConsumerDetectsNewPartitionCreatedAfterRestart(): Unit = {

Review comment:
       Make sense. Thanks.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org