Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/04 08:08:35 UTC

[GitHub] [kafka] showuon commented on a change in pull request #10965: KAFKA-13029; Set appropriate fields for FindCoordinatorRequest based on version

showuon commented on a change in pull request #10965:
URL: https://github.com/apache/kafka/pull/10965#discussion_r663463655



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
##########
@@ -1327,7 +1317,7 @@ boolean isEndTxn() {
 
         abstract AbstractRequest.Builder<?> requestBuilder();
 
-        abstract void handleResponse(AbstractResponse responseBody);
+        abstract void handleResponse(AbstractResponse responseBody, short requestVersion);

Review comment:
       Maybe we can add a comment or Javadoc here to describe why `requestVersion` is necessary.

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
##########
@@ -43,9 +46,18 @@ public FindCoordinatorRequest build(short version) {
                 throw new UnsupportedVersionException("Cannot create a v" + version + " FindCoordinator request " +
                         "because we require features supported only in 2 or later.");
             }
-            if (version < 4 && !data.coordinatorKeys().isEmpty()) {
-                throw new NoBatchedFindCoordinatorsException("Cannot create a v" + version + " FindCoordinator request " +
+            int batchedKeys = data.coordinatorKeys().size();
+            if (version < MIN_BATCHED_VERSION) {
+                if (batchedKeys > 1)
+                    throw new NoBatchedFindCoordinatorsException("Cannot create a v" + version + " FindCoordinator request " +
                         "because we require features supported only in 4 or later.");

Review comment:
       nit: `"because we require features supported only in " + MIN_BATCHED_VERSION + " or later.");` (note the spaces around the constant, so the message does not read "in4or later")
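
       A minimal sketch of how the message could be built from the constant. The class and method names (`BatchedVersionMessage`, `unsupportedMessage`) are hypothetical and exist only for illustration; only `MIN_BATCHED_VERSION` and the message wording come from the diff above.

```java
public class BatchedVersionMessage {
    // Mirrors the constant introduced in FindCoordinatorRequest in this PR.
    public static final short MIN_BATCHED_VERSION = 4;

    // Hypothetical helper: builds the exception message from the constant,
    // so the text stays correct if the minimum batched version ever changes.
    public static String unsupportedMessage(short version) {
        return "Cannot create a v" + version + " FindCoordinator request " +
                "because we require features supported only in " +
                MIN_BATCHED_VERSION + " or later.";
    }

    public static void main(String[] args) {
        System.out.println(unsupportedMessage((short) 3));
    }
}
```

       The leading and trailing spaces inside the string literals matter here: without them the version number runs directly into the neighbouring words.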

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
##########
@@ -26,9 +26,12 @@
 import org.apache.kafka.common.protocol.Errors;
 
 import java.nio.ByteBuffer;
+import java.util.Collections;
 
 public class FindCoordinatorRequest extends AbstractRequest {
 
+    public static final short MIN_BATCHED_VERSION = 4;

Review comment:
       Nice refactor!

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/CoordinatorStrategy.java
##########
@@ -98,7 +98,7 @@ public ApiRequestScope lookupScope(CoordinatorKey key) {
                 key.idValue + "' cannot be represented in a request."));
         }
         FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse;
-        if (batch) {
+        if (!response.data().coordinators().isEmpty()) {

Review comment:
       Could we add a comment here to explain which case this is (e.g. the batch response case)?
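
       Something along these lines, as a rough sketch only. `ResponseData` below is a hypothetical stand-in, not the real `FindCoordinatorResponseData`, and the sketch assumes that a non-empty `coordinators()` list is what marks the batch response case, as the condition in the diff suggests.

```java
import java.util.Collections;
import java.util.List;

public class CoordinatorResponseShape {
    // Hypothetical stand-in for the response data; the real class has more fields.
    public static final class ResponseData {
        private final List<String> coordinators;

        public ResponseData(List<String> coordinators) {
            this.coordinators = coordinators;
        }

        public List<String> coordinators() {
            return coordinators;
        }
    }

    public static String describe(ResponseData data) {
        if (!data.coordinators().isEmpty()) {
            // Batch response case: results are carried in the coordinators list.
            return "batched";
        }
        // Non-batch response case: the coordinators list is empty.
        return "single";
    }

    public static void main(String[] args) {
        System.out.println(describe(new ResponseData(Collections.singletonList("group-a"))));
        System.out.println(describe(new ResponseData(Collections.<String>emptyList())));
    }
}
```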



