Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/01 02:37:50 UTC

[GitHub] [kafka] hachikuji commented on a change in pull request #9601: KAFKA-10729: Bump remaining RPC's to use tagged fields.

hachikuji commented on a change in pull request #9601:
URL: https://github.com/apache/kafka/pull/9601#discussion_r533033927



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
##########
@@ -105,8 +105,8 @@ public int hashCode() {
 
         public final WriteTxnMarkersRequestData data;
 
-        public Builder(final List<TxnMarkerEntry> markers) {
-            super(ApiKeys.WRITE_TXN_MARKERS);
+        public Builder(final List<TxnMarkerEntry> markers, short latestAllowedVersion) {
+            super(ApiKeys.WRITE_TXN_MARKERS, ApiKeys.WRITE_TXN_MARKERS.oldestVersion(), latestAllowedVersion);

Review comment:
       I think this is probably ok, but it is a little inconsistent with how we handle the versions for other inter-broker RPCs. Since we rely on the IBP, we always set the version explicitly in the caller, which means there is exactly one allowable version for the builder to use. See for example `LeaderAndIsrRequest.Builder`.
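For illustration, the two builder styles contrasted in this comment can be sketched as follows. This is a minimal, self-contained stand-in, not Kafka's actual `AbstractRequest.Builder`; the class name `VersionedBuilderSketch` and its members are hypothetical. The range form lets the builder accept any version between an oldest and a latest bound, while the pinned form (the style the comment attributes to `LeaderAndIsrRequest.Builder`) takes a single version chosen by the caller, for example one derived from the IBP.

```java
// Hypothetical stand-in for a request builder; names are illustrative only
// and do not come from the Kafka codebase.
final class VersionedBuilderSketch {
    final short oldestAllowedVersion;
    final short latestAllowedVersion;

    // Range form: any version in [oldest, latest] is acceptable to the builder.
    VersionedBuilderSketch(short oldestAllowedVersion, short latestAllowedVersion) {
        if (oldestAllowedVersion > latestAllowedVersion) {
            throw new IllegalArgumentException("oldest version must not exceed latest version");
        }
        this.oldestAllowedVersion = oldestAllowedVersion;
        this.latestAllowedVersion = latestAllowedVersion;
    }

    // Pinned form: the caller has already decided the version, so the
    // builder has exactly one allowable version.
    VersionedBuilderSketch(short version) {
        this(version, version);
    }

    // True when the builder has no freedom to choose a version.
    boolean isPinned() {
        return oldestAllowedVersion == latestAllowedVersion;
    }
}
```

With the pinned form, the version decision lives entirely in the caller, which is what makes the inter-broker builders consistent with one another.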

##########
File path: core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
##########
@@ -84,19 +85,24 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness {
   }
 
   // Custom header serialization so that protocol assumptions are not forced
-  private def requestHeaderBytes(apiKey: Short, apiVersion: Short, clientId: String = "", correlationId: Int = -1): Array[Byte] = {
+  def requestHeaderBytes(apiKey: Short, apiVersion: Short, clientId: String = "", correlationId: Int = -1): Array[Byte] = {
+    // Check for flex versions, some tests here verify that an invalid apiKey is detected properly, so if -1 is used,
+    // assume the request is not using flex versions.
+    val flexVersion = if (apiKey >= 0) ApiKeys.forId(apiKey).requestHeaderVersion(apiVersion) >= 2 else false
     val size = {
       2 /* apiKey */ +
         2 /* version id */ +
         4 /* correlation id */ +
-        Type.NULLABLE_STRING.sizeOf(clientId) /* client id */
+        Type.NULLABLE_STRING.sizeOf(clientId)  /* client id */ +
+        (if (flexVersion) ByteUtils.sizeOfUnsignedVarint(0) else 0)

Review comment:
       nit: maybe add a comment that this field is for the number of tagged fields?
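To make the extra byte concrete, here is a rough sketch of the size calculation in the hunk above, written without any Kafka dependency. The class `HeaderSizeSketch` and its methods are hypothetical; `sizeOfUnsignedVarint` is a local reimplementation written for this example (it is assumed to agree with `ByteUtils.sizeOfUnsignedVarint`), and the client id is assumed to be encoded as a nullable string with a 2-byte length prefix, matching the `Type.NULLABLE_STRING` call in the diff.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper mirroring the size arithmetic in the test; not Kafka code.
final class HeaderSizeSketch {
    // Number of bytes needed to encode value as an unsigned varint
    // (7 payload bits per byte).
    static int sizeOfUnsignedVarint(int value) {
        int bytes = 1;
        while ((value & 0xffffff80) != 0) {
            bytes++;
            value >>>= 7;
        }
        return bytes;
    }

    // Assumed nullable-string layout: 2-byte length prefix plus UTF-8 bytes.
    static int sizeOfNullableString(String s) {
        return 2 + (s == null ? 0 : s.getBytes(StandardCharsets.UTF_8).length);
    }

    static int headerSize(String clientId, boolean flexibleVersion) {
        return 2 /* apiKey */
            + 2 /* version id */
            + 4 /* correlation id */
            + sizeOfNullableString(clientId) /* client id */
            // Flexible headers end with the number of tagged fields, encoded
            // as an unsigned varint; zero tagged fields takes one byte.
            + (flexibleVersion ? sizeOfUnsignedVarint(0) : 0);
    }
}
```

Under these assumptions, an empty client id gives a 10-byte header for non-flexible versions and an 11-byte header for flexible ones.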

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
##########
@@ -286,6 +286,7 @@ public static byte requiredMagicForVersion(short produceRequestVersion) {
             case 6:
             case 7:
             case 8:
+            case 9:

Review comment:
       I wonder if we may as well make this the default case. Not sure we're getting much by forcing ourselves to update this logic after each bump. Maybe the range validation is still useful, but that could be done by using `oldestVersion` and `latestVersion`.
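One possible shape for this suggestion, as a hedged sketch rather than the actual `ProduceRequest` code: validate the range up front, then let the newest magic value be the `default` branch so a version bump needs no change here. The class `RequiredMagicSketch`, the constants `OLDEST_VERSION` and `LATEST_VERSION` (stand-ins for whatever `oldestVersion` and `latestVersion` return), and the version-to-magic mapping for versions 0 through 2 are all assumptions made for this example.

```java
// Hypothetical sketch; constants and the low-version mapping are assumed.
final class RequiredMagicSketch {
    static final short OLDEST_VERSION = 0; // assumed oldest supported version
    static final short LATEST_VERSION = 9; // version 9 is the case added in the diff

    static byte requiredMagicForVersion(short produceRequestVersion) {
        // Range validation replaces the exhaustive case list.
        if (produceRequestVersion < OLDEST_VERSION || produceRequestVersion > LATEST_VERSION) {
            throw new IllegalArgumentException(
                "Produce request version " + produceRequestVersion + " is not in the supported range");
        }
        switch (produceRequestVersion) {
            case 0:
            case 1:
                return 0; // assumed: oldest message format
            case 2:
                return 1; // assumed: intermediate message format
            default:
                return 2; // every later version uses the newest format
        }
    }
}
```

With this layout, only `LATEST_VERSION` moves when the RPC is bumped, and out-of-range versions are still rejected.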



