Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/05 08:31:01 UTC

[GitHub] [kafka] dajac commented on a change in pull request #10965: KAFKA-13029; Set appropriate fields for FindCoordinatorRequest based on version

dajac commented on a change in pull request #10965:
URL: https://github.com/apache/kafka/pull/10965#discussion_r663739212



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -1532,9 +1525,10 @@ String coordinatorKey() {
         }
 
         @Override
-        public void handleResponse(AbstractResponse response) {
+        public void handleResponse(AbstractResponse response, short requestVersion) {
             FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) response;
             CoordinatorType coordinatorType = CoordinatorType.forId(builder.data().keyType());
+            boolean batchFindCoordinator = requestVersion >= FindCoordinatorRequest.MIN_BATCHED_VERSION;

Review comment:
       The above suggestion would also allow us to avoid passing the `requestVersion` down here.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -815,29 +813,22 @@ public void handle(SyncGroupResponse syncResponse,
      */
     private RequestFuture<Void> sendFindCoordinatorRequest(Node node) {
         // initiate the group metadata request
-        log.debug("Sending FindCoordinator request to broker {} with batch={}", node, batchFindCoordinator);
+        log.debug("Sending FindCoordinator request to broker {}", node);
         FindCoordinatorRequestData data = new FindCoordinatorRequestData()
                 .setKeyType(CoordinatorType.GROUP.id());
-        if (batchFindCoordinator) {
-            data.setCoordinatorKeys(Collections.singletonList(this.rebalanceConfig.groupId));
-        } else {
-            data.setKey(this.rebalanceConfig.groupId);
-        }
+        data.setKey(this.rebalanceConfig.groupId);
         FindCoordinatorRequest.Builder requestBuilder = new FindCoordinatorRequest.Builder(data);
         return client.send(node, requestBuilder)
-                .compose(new FindCoordinatorResponseHandler(batchFindCoordinator));
+                .compose(new FindCoordinatorResponseHandler());
     }
 
     private class FindCoordinatorResponseHandler extends RequestFutureAdapter<ClientResponse, Void> {
-        private boolean batch;
-        FindCoordinatorResponseHandler(boolean batch) {
-            this.batch = batch;
-        }
 
         @Override
         public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
             log.debug("Received FindCoordinator response {}", resp);
 
+            boolean batch = resp.requestHeader().apiVersion() >= FindCoordinatorRequest.MIN_BATCHED_VERSION;

Review comment:
       How about adding a `coordinators` method to `FindCoordinatorResponse` that returns the list of coordinators (`data.coordinators()`) if it is not empty, and otherwise returns a single-element list containing a `Coordinator` built from the top-level fields? That would remove all the `batch` checks below.
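
       To make the idea concrete, here is a rough sketch. It uses hypothetical stand-in classes (`CoordinatorsSketch`, `ResponseData`, and a simplified `Coordinator`) rather than the real generated `FindCoordinatorResponseData` types, and leaving the key as `null` in the fallback case is an assumption, since the unbatched response does not echo the key back.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustrative only: these classes are hypothetical stand-ins, not Kafka's generated types.
final class CoordinatorsSketch {

    /** Stand-in for one per-key entry of a batched response. */
    static final class Coordinator {
        final String key;
        final int nodeId;
        final String host;
        final int port;
        final short errorCode;

        Coordinator(String key, int nodeId, String host, int port, short errorCode) {
            this.key = key;
            this.nodeId = nodeId;
            this.host = host;
            this.port = port;
            this.errorCode = errorCode;
        }
    }

    /** Stand-in for the response payload. */
    static final class ResponseData {
        // Top-level fields, populated by unbatched (older) responses.
        int nodeId;
        String host = "";
        int port;
        short errorCode;
        // Per-key entries, populated by batched responses only.
        final List<Coordinator> coordinators = new ArrayList<>();
    }

    /**
     * Returns the per-key entries when present; otherwise wraps the
     * top-level fields in a single-element list so callers never need
     * to branch on whether the request was batched.
     */
    static List<Coordinator> coordinators(ResponseData data) {
        if (!data.coordinators.isEmpty()) {
            return data.coordinators;
        }
        // Assumption: the key is unknown here, so it is left as null.
        return Collections.singletonList(
            new Coordinator(null, data.nodeId, data.host, data.port, data.errorCode));
    }

    public static void main(String[] args) {
        ResponseData legacy = new ResponseData();
        legacy.nodeId = 1;
        legacy.host = "broker-1";
        legacy.port = 9092;

        // The caller iterates the same way for batched and unbatched responses.
        for (Coordinator c : coordinators(legacy)) {
            System.out.println(c.nodeId + " " + c.host + ":" + c.port);
        }
    }
}
```

       With something along these lines, the handlers could loop over `coordinators()` unconditionally instead of checking `batch` at each step.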

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -3236,6 +3230,54 @@ public void testDeleteConsumerGroupsWithOlderBroker() throws Exception {
         }
     }
 
+    @Test
+    public void testDeleteMultipleConsumerGroupsWithOlderBroker() throws Exception {
+        final List<String> groupIds = asList("group1", "group2");
+        ApiVersion findCoordinatorV3 = new ApiVersion()
+                .setApiKey(ApiKeys.FIND_COORDINATOR.id)
+                .setMinVersion((short) 0)
+                .setMaxVersion((short) 3);
+        ApiVersion describeGroups = new ApiVersion()
+                .setApiKey(ApiKeys.DESCRIBE_GROUPS.id)
+                .setMinVersion((short) 0)
+                .setMaxVersion(ApiKeys.DELETE_GROUPS.latestVersion());
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
+            env.kafkaClient().setNodeApiVersions(
+                    NodeApiVersions.create(Arrays.asList(findCoordinatorV3, describeGroups)));
+
+            // dummy response for MockClient to handle the UnsupportedVersionException correctly to switch from batched to un-batched

Review comment:
       nit: `dummy` -> `Dummy`?

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -3236,6 +3230,54 @@ public void testDeleteConsumerGroupsWithOlderBroker() throws Exception {
         }
     }
 
+    @Test
+    public void testDeleteMultipleConsumerGroupsWithOlderBroker() throws Exception {
+        final List<String> groupIds = asList("group1", "group2");
+        ApiVersion findCoordinatorV3 = new ApiVersion()
+                .setApiKey(ApiKeys.FIND_COORDINATOR.id)
+                .setMinVersion((short) 0)
+                .setMaxVersion((short) 3);
+        ApiVersion describeGroups = new ApiVersion()
+                .setApiKey(ApiKeys.DESCRIBE_GROUPS.id)
+                .setMinVersion((short) 0)
+                .setMaxVersion(ApiKeys.DELETE_GROUPS.latestVersion());
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
+            env.kafkaClient().setNodeApiVersions(
+                    NodeApiVersions.create(Arrays.asList(findCoordinatorV3, describeGroups)));
+
+            // dummy response for MockClient to handle the UnsupportedVersionException correctly to switch from batched to un-batched
+            env.kafkaClient().prepareResponse(null);
+            //Retriable FindCoordinatorResponse errors should be retried

Review comment:
       nit: Should we add a space before `Retriable`?



