Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/03 10:33:37 UTC

[GitHub] [kafka] rajinisivaram opened a new pull request #10965: KAFKA-13029; Set appropriate fields for FindCoordinatorRequest based on version

rajinisivaram opened a new pull request #10965:
URL: https://github.com/apache/kafka/pull/10965


   KIP-699 added support for batching in FindCoordinatorRequest using a new protocol version that changes the wire format for both batched and unbatched requests. Clients were updated to try the new format first and switch irreversibly to the old format if the new format is not supported on any broker. During a rolling upgrade (or a downgrade), it is possible that a broker does not support the new format at one point while other brokers do at a later point. Clients then end up in a bad state until they are restarted, because they use the new version with the old format. This PR changes FindCoordinatorRequest to set the request data based on the actual version when a single group is used, which is always the case for the consumer coordinator and the transaction coordinator. For the admin API, we still switch to unbatched mode on failure, but the data is set based on the actual version, so we never fail even if brokers are upgraded or downgraded.
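
   As a rough illustration (names are illustrative, not the exact code in this PR), populating the request for a single key based on the negotiated version looks roughly like this:
   ```java
   // Illustrative sketch only: fill the unbatched `key` field or the batched
   // `coordinatorKeys` field depending on the version negotiated with the broker.
   import java.util.Collections;
   import java.util.List;

   public class FindCoordinatorDataSketch {
       static final short MIN_BATCHED_VERSION = 4;

       String key;                                              // pre-v4 (unbatched) field
       List<String> coordinatorKeys = Collections.emptyList();  // v4+ (batched) field

       void setSingleKey(short version, String coordinatorKey) {
           if (version >= MIN_BATCHED_VERSION)
               coordinatorKeys = Collections.singletonList(coordinatorKey);
           else
               key = coordinatorKey;
       }
   }
   ```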
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


[GitHub] [kafka] chia7712 commented on a change in pull request #10965: KAFKA-13029; Set appropriate fields for FindCoordinatorRequest based on version

Posted by GitBox <gi...@apache.org>.
chia7712 commented on a change in pull request #10965:
URL: https://github.com/apache/kafka/pull/10965#discussion_r663507496



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -1145,11 +1142,7 @@ private void lookupCoordinator(FindCoordinatorRequest.CoordinatorType type, Stri
 
         FindCoordinatorRequestData data = new FindCoordinatorRequestData()
                 .setKeyType(type.id());
-        if (batchFindCoordinator) {
-            data.setCoordinatorKeys(Collections.singletonList(coordinatorKey));
-        } else {
-            data.setKey(coordinatorKey);
-        }
+        data.setKey(coordinatorKey);

Review comment:
       ditto (code style)

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java
##########
@@ -98,7 +98,8 @@ public ApiRequestScope lookupScope(CoordinatorKey key) {
                 key.idValue + "' cannot be represented in a request."));
         }
         FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse;
-        if (batch) {
+        // Handle response based on whether batched version is used by checking `coordinators()` in the response data
+        if (!response.data().coordinators().isEmpty()) {

Review comment:
       Both the consumer coordinator and the transaction coordinator parse the response according to the request version. By contrast, this code checks `coordinators`. Could we unify the behavior? Could the consumer coordinator and the transaction coordinator check `coordinators` instead? Checking `coordinators` seems to simplify the code, since we would not need to pass the request version to each handler.
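       For illustration, a sketch of that version-free check (the classes below are stand-ins, not the generated Kafka classes):
   ```java
   import java.util.List;

   // Stand-ins for the generated response data classes, for illustration only.
   class CoordinatorSketch { int nodeId; String host; int port; }

   class ResponseDataSketch {
       List<CoordinatorSketch> coordinators; // only populated by batched (v4+) responses
       int nodeId; String host; int port;    // top-level fields used by pre-v4 responses
   }

   class UnifiedCheckSketch {
       void handle(ResponseDataSketch data) {
           if (!data.coordinators.isEmpty()) {
               CoordinatorSketch c = data.coordinators.get(0); // batched response
               connect(c.host, c.port);
           } else {
               connect(data.host, data.port);                  // unbatched response
           }
       }

       void connect(String host, int port) { /* establish the coordinator connection */ }
   }
   ```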

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -815,29 +813,22 @@ public void handle(SyncGroupResponse syncResponse,
      */
     private RequestFuture<Void> sendFindCoordinatorRequest(Node node) {
         // initiate the group metadata request
-        log.debug("Sending FindCoordinator request to broker {} with batch={}", node, batchFindCoordinator);
+        log.debug("Sending FindCoordinator request to broker {}", node);
         FindCoordinatorRequestData data = new FindCoordinatorRequestData()
                 .setKeyType(CoordinatorType.GROUP.id());
-        if (batchFindCoordinator) {
-            data.setCoordinatorKeys(Collections.singletonList(this.rebalanceConfig.groupId));
-        } else {
-            data.setKey(this.rebalanceConfig.groupId);
-        }
+        data.setKey(this.rebalanceConfig.groupId);

Review comment:
       This line can be merged, for example:
   ```java
           FindCoordinatorRequestData data = new FindCoordinatorRequestData()
                   .setKeyType(CoordinatorType.GROUP.id())
                   .setKey(this.rebalanceConfig.groupId);
   ```




[GitHub] [kafka] rajinisivaram merged pull request #10965: KAFKA-13029; Set appropriate fields for FindCoordinatorRequest based on version

Posted by GitBox <gi...@apache.org>.
rajinisivaram merged pull request #10965:
URL: https://github.com/apache/kafka/pull/10965


   


[GitHub] [kafka] rajinisivaram commented on pull request #10965: KAFKA-13029; Set appropriate fields for FindCoordinatorRequest based on version

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on pull request #10965:
URL: https://github.com/apache/kafka/pull/10965#issuecomment-874004039


   @chia7712 @dajac Thanks for the reviews, addressed comments.


[GitHub] [kafka] showuon commented on a change in pull request #10965: KAFKA-13029; Set appropriate fields for FindCoordinatorRequest based on version

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10965:
URL: https://github.com/apache/kafka/pull/10965#discussion_r663463655



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -1327,7 +1317,7 @@ boolean isEndTxn() {
 
         abstract AbstractRequest.Builder<?> requestBuilder();
 
-        abstract void handleResponse(AbstractResponse responseBody);
+        abstract void handleResponse(AbstractResponse responseBody, short requestVersion);

Review comment:
       Maybe we can add a comment or javadoc here to describe why `requestVersion` is necessary.
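       For example, something along these lines (wording is only a suggestion):
   ```java
   import org.apache.kafka.common.requests.AbstractResponse;

   // Stand-in abstract class for illustration; in the PR this method lives on the
   // transaction manager's request handler shown in the diff above.
   abstract class RequestHandlerSketch {
       /**
        * @param responseBody   the parsed broker response
        * @param requestVersion the version the request was actually sent with; pre-v4
        *                       FindCoordinator responses carry the coordinator in top-level
        *                       fields, while v4+ uses a list, so the handler needs the version
        */
       abstract void handleResponse(AbstractResponse responseBody, short requestVersion);
   }
   ```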

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
##########
@@ -43,9 +46,18 @@ public FindCoordinatorRequest build(short version) {
                 throw new UnsupportedVersionException("Cannot create a v" + version + " FindCoordinator request " +
                         "because we require features supported only in 2 or later.");
             }
-            if (version < 4 && !data.coordinatorKeys().isEmpty()) {
-                throw new NoBatchedFindCoordinatorsException("Cannot create a v" + version + " FindCoordinator request " +
+            int batchedKeys = data.coordinatorKeys().size();
+            if (version < MIN_BATCHED_VERSION) {
+                if (batchedKeys > 1)
+                    throw new NoBatchedFindCoordinatorsException("Cannot create a v" + version + " FindCoordinator request " +
                         "because we require features supported only in 4 or later.");

Review comment:
       nit: `"because we require features supported only in" + MIN_BATCHED_VERSION + "or later.");`

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
##########
@@ -26,9 +26,12 @@
 import org.apache.kafka.common.protocol.Errors;
 
 import java.nio.ByteBuffer;
+import java.util.Collections;
 
 public class FindCoordinatorRequest extends AbstractRequest {
 
+    public static final short MIN_BATCHED_VERSION = 4;

Review comment:
       Nice refactor!

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java
##########
@@ -98,7 +98,7 @@ public ApiRequestScope lookupScope(CoordinatorKey key) {
                 key.idValue + "' cannot be represented in a request."));
         }
         FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse;
-        if (batch) {
+        if (!response.data().coordinators().isEmpty()) {

Review comment:
       Could we add a comment here to explain which case this is (e.g. the batched response case)?




[GitHub] [kafka] dajac commented on a change in pull request #10965: KAFKA-13029; Set appropriate fields for FindCoordinatorRequest based on version

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #10965:
URL: https://github.com/apache/kafka/pull/10965#discussion_r663739212



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -1532,9 +1525,10 @@ String coordinatorKey() {
         }
 
         @Override
-        public void handleResponse(AbstractResponse response) {
+        public void handleResponse(AbstractResponse response, short requestVersion) {
             FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) response;
             CoordinatorType coordinatorType = CoordinatorType.forId(builder.data().keyType());
+            boolean batchFindCoordinator = requestVersion >= FindCoordinatorRequest.MIN_BATCHED_VERSION;

Review comment:
       The above suggestion would also allow us to avoid having to pass the `requestVersion` down here.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##########
@@ -815,29 +813,22 @@ public void handle(SyncGroupResponse syncResponse,
      */
     private RequestFuture<Void> sendFindCoordinatorRequest(Node node) {
         // initiate the group metadata request
-        log.debug("Sending FindCoordinator request to broker {} with batch={}", node, batchFindCoordinator);
+        log.debug("Sending FindCoordinator request to broker {}", node);
         FindCoordinatorRequestData data = new FindCoordinatorRequestData()
                 .setKeyType(CoordinatorType.GROUP.id());
-        if (batchFindCoordinator) {
-            data.setCoordinatorKeys(Collections.singletonList(this.rebalanceConfig.groupId));
-        } else {
-            data.setKey(this.rebalanceConfig.groupId);
-        }
+        data.setKey(this.rebalanceConfig.groupId);
         FindCoordinatorRequest.Builder requestBuilder = new FindCoordinatorRequest.Builder(data);
         return client.send(node, requestBuilder)
-                .compose(new FindCoordinatorResponseHandler(batchFindCoordinator));
+                .compose(new FindCoordinatorResponseHandler());
     }
 
     private class FindCoordinatorResponseHandler extends RequestFutureAdapter<ClientResponse, Void> {
-        private boolean batch;
-        FindCoordinatorResponseHandler(boolean batch) {
-            this.batch = batch;
-        }
 
         @Override
         public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
             log.debug("Received FindCoordinator response {}", resp);
 
+            boolean batch = resp.requestHeader().apiVersion() >= FindCoordinatorRequest.MIN_BATCHED_VERSION;

Review comment:
       How about adding a `coordinators` method to `FindCoordinatorResponse` that returns the list of coordinators (`data.coordinators()`) if it is not empty, or otherwise a list containing a single `Coordinator` built from the top-level information? That would remove all the `batch` checks below.
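       For illustration, such an accessor could look roughly like this (names are hypothetical, not the final implementation):
   ```java
   import java.util.Collections;
   import java.util.List;

   // Hypothetical normalizing accessor: callers always iterate a list of coordinators,
   // whether the broker answered with the batched (v4+) format or the older one.
   class CoordinatorInfo { String key; int nodeId; String host; int port; short errorCode; }

   class FindCoordinatorResponseSketch {
       List<CoordinatorInfo> batchedCoordinators;                       // v4+ list field
       String key; int nodeId; String host; int port; short errorCode;  // pre-v4 top-level fields

       List<CoordinatorInfo> coordinators() {
           if (!batchedCoordinators.isEmpty())
               return batchedCoordinators;       // batched response: return the list as-is
           CoordinatorInfo c = new CoordinatorInfo();
           c.key = key;
           c.nodeId = nodeId;
           c.host = host;
           c.port = port;
           c.errorCode = errorCode;
           return Collections.singletonList(c);  // unbatched: wrap the top-level fields
       }
   }
   ```
       With an accessor like that, the per-version branches below could collapse into a single loop over `coordinators()`.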

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -3236,6 +3230,54 @@ public void testDeleteConsumerGroupsWithOlderBroker() throws Exception {
         }
     }
 
+    @Test
+    public void testDeleteMultipleConsumerGroupsWithOlderBroker() throws Exception {
+        final List<String> groupIds = asList("group1", "group2");
+        ApiVersion findCoordinatorV3 = new ApiVersion()
+                .setApiKey(ApiKeys.FIND_COORDINATOR.id)
+                .setMinVersion((short) 0)
+                .setMaxVersion((short) 3);
+        ApiVersion describeGroups = new ApiVersion()
+                .setApiKey(ApiKeys.DESCRIBE_GROUPS.id)
+                .setMinVersion((short) 0)
+                .setMaxVersion(ApiKeys.DELETE_GROUPS.latestVersion());
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
+            env.kafkaClient().setNodeApiVersions(
+                    NodeApiVersions.create(Arrays.asList(findCoordinatorV3, describeGroups)));
+
+            // dummy response for MockClient to handle the UnsupportedVersionException correctly to switch from batched to un-batched

Review comment:
       nit: `dummy` -> `Dummy`?

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -3236,6 +3230,54 @@ public void testDeleteConsumerGroupsWithOlderBroker() throws Exception {
         }
     }
 
+    @Test
+    public void testDeleteMultipleConsumerGroupsWithOlderBroker() throws Exception {
+        final List<String> groupIds = asList("group1", "group2");
+        ApiVersion findCoordinatorV3 = new ApiVersion()
+                .setApiKey(ApiKeys.FIND_COORDINATOR.id)
+                .setMinVersion((short) 0)
+                .setMaxVersion((short) 3);
+        ApiVersion describeGroups = new ApiVersion()
+                .setApiKey(ApiKeys.DESCRIBE_GROUPS.id)
+                .setMinVersion((short) 0)
+                .setMaxVersion(ApiKeys.DELETE_GROUPS.latestVersion());
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
+            env.kafkaClient().setNodeApiVersions(
+                    NodeApiVersions.create(Arrays.asList(findCoordinatorV3, describeGroups)));
+
+            // dummy response for MockClient to handle the UnsupportedVersionException correctly to switch from batched to un-batched
+            env.kafkaClient().prepareResponse(null);
+            //Retriable FindCoordinatorResponse errors should be retried

Review comment:
       nit: Should we add a space before `Retriable`?




[GitHub] [kafka] rajinisivaram commented on pull request #10965: KAFKA-13029; Set appropriate fields for FindCoordinatorRequest based on version

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on pull request #10965:
URL: https://github.com/apache/kafka/pull/10965#issuecomment-874111712


   @showuon @chia7712 @dajac Thanks for the reviews. The test failure (RaftClusterTest.testCreateClusterAndCreateAndManyTopics) is not related. Merging to trunk.


[GitHub] [kafka] rajinisivaram commented on pull request #10965: KAFKA-13029; Set appropriate fields for FindCoordinatorRequest based on version

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on pull request #10965:
URL: https://github.com/apache/kafka/pull/10965#issuecomment-873568935


   @showuon Thanks for the review, addressed comments.

