You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/11/16 19:52:29 UTC

[GitHub] [kafka] gardnervickers opened a new pull request #9601: MINOR: Enable flexible versioning for ListOffsetRequest/ListOffsetResponse.

gardnervickers opened a new pull request #9601:
URL: https://github.com/apache/kafka/pull/9601


   This patch enables flexible versioning for ListOffsets req/response, as well as introducing a new
   IBP version allowing the replica fetchers to use this new ListOffsets version.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9601: KAFKA-10729: Bump remaining RPC's to use tagged fields.

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9601:
URL: https://github.com/apache/kafka/pull/9601#discussion_r533034552



##########
File path: core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
##########
@@ -84,19 +85,24 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness {
   }
 
   // Custom header serialization so that protocol assumptions are not forced
-  private def requestHeaderBytes(apiKey: Short, apiVersion: Short, clientId: String = "", correlationId: Int = -1): Array[Byte] = {
+  def requestHeaderBytes(apiKey: Short, apiVersion: Short, clientId: String = "", correlationId: Int = -1): Array[Byte] = {
+    // Check for flex versions, some tests here verify that an invalid apiKey is detected properly, so if -1 is used,
+    // assume the request is not using flex versions.
+    val flexVersion = if (apiKey >= 0) ApiKeys.forId(apiKey).requestHeaderVersion(apiVersion) >= 2 else false
     val size = {
       2 /* apiKey */ +
         2 /* version id */ +
         4 /* correlation id */ +
-        Type.NULLABLE_STRING.sizeOf(clientId) /* client id */
+        Type.NULLABLE_STRING.sizeOf(clientId)  /* client id */ +
+        (if (flexVersion) ByteUtils.sizeOfUnsignedVarint(0) else 0)

Review comment:
       nit: maybe add a comment that this field is for the number of tagged fields?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] gardnervickers commented on a change in pull request #9601: KAFKA-10729: Bump remaining RPC's to use tagged fields.

Posted by GitBox <gi...@apache.org>.
gardnervickers commented on a change in pull request #9601:
URL: https://github.com/apache/kafka/pull/9601#discussion_r532249601



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsResponse.java
##########
@@ -38,10 +38,6 @@
 
     private final AlterReplicaLogDirsResponseData data;
 
-    public AlterReplicaLogDirsResponse(Struct struct) {

Review comment:
       Having this override seemed a bit error prone, and was causing failures for the `NetworkClientTest`. I opted to remove it entirely in favor of forcing the caller to specify the response version. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #9601: MINOR: Enable flexible versioning for ListOffsetRequest/ListOffsetResponse.

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9601:
URL: https://github.com/apache/kafka/pull/9601#discussion_r524636462



##########
File path: core/src/main/scala/kafka/api/ApiVersion.scala
##########
@@ -424,6 +426,13 @@ case object KAFKA_2_7_IV2 extends DefaultApiVersion {
   val id: Int = 30
 }
 
+case object KAFKA_2_7_IV3 extends DefaultApiVersion {

Review comment:
       2.7 has been branched. It should be 2.8, right?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9601: KAFKA-10729: Bump remaining RPC's to use tagged fields.

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9601:
URL: https://github.com/apache/kafka/pull/9601#discussion_r533033927



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
##########
@@ -105,8 +105,8 @@ public int hashCode() {
 
         public final WriteTxnMarkersRequestData data;
 
-        public Builder(final List<TxnMarkerEntry> markers) {
-            super(ApiKeys.WRITE_TXN_MARKERS);
+        public Builder(final List<TxnMarkerEntry> markers, short latestAllowedVersion) {
+            super(ApiKeys.WRITE_TXN_MARKERS, ApiKeys.WRITE_TXN_MARKERS.oldestVersion(), latestAllowedVersion);

Review comment:
       I think this is probably ok, but it is a little inconsistent with how we handle the versions for other inter-broker RPCs. Since we rely on the IBP, we always set the version explicitly in the caller, which means there is exactly one allowable version for the builder to use. See for example `LeaderAndIsrRequest.Builder`.

##########
File path: core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
##########
@@ -84,19 +85,24 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness {
   }
 
   // Custom header serialization so that protocol assumptions are not forced
-  private def requestHeaderBytes(apiKey: Short, apiVersion: Short, clientId: String = "", correlationId: Int = -1): Array[Byte] = {
+  def requestHeaderBytes(apiKey: Short, apiVersion: Short, clientId: String = "", correlationId: Int = -1): Array[Byte] = {
+    // Check for flex versions, some tests here verify that an invalid apiKey is detected properly, so if -1 is used,
+    // assume the request is not using flex versions.
+    val flexVersion = if (apiKey >= 0) ApiKeys.forId(apiKey).requestHeaderVersion(apiVersion) >= 2 else false
     val size = {
       2 /* apiKey */ +
         2 /* version id */ +
         4 /* correlation id */ +
-        Type.NULLABLE_STRING.sizeOf(clientId) /* client id */
+        Type.NULLABLE_STRING.sizeOf(clientId)  /* client id */ +
+        (if (flexVersion) ByteUtils.sizeOfUnsignedVarint(0) else 0)

Review comment:
       nit: maybe add a comment that this is field is for the number of tagged fields?

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
##########
@@ -286,6 +286,7 @@ public static byte requiredMagicForVersion(short produceRequestVersion) {
             case 6:
             case 7:
             case 8:
+            case 9:

Review comment:
       I wonder if we may as well make this the default case. Not sure we're getting much by forcing ourselves to update this logic after each bump. Maybe the range validation is still useful, but that could be done by using `oldestVersion` and `latestVersion`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] gardnervickers commented on a change in pull request #9601: MINOR: Enable flexible versioning for ListOffsetRequest/ListOffsetResponse.

Posted by GitBox <gi...@apache.org>.
gardnervickers commented on a change in pull request #9601:
URL: https://github.com/apache/kafka/pull/9601#discussion_r524820388



##########
File path: core/src/main/scala/kafka/api/ApiVersion.scala
##########
@@ -424,6 +426,13 @@ case object KAFKA_2_7_IV2 extends DefaultApiVersion {
   val id: Int = 30
 }
 
+case object KAFKA_2_7_IV3 extends DefaultApiVersion {

Review comment:
       Thanks, that makes more sense. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] gardnervickers commented on a change in pull request #9601: KAFKA-10729: Bump remaining RPC's to use tagged fields.

Posted by GitBox <gi...@apache.org>.
gardnervickers commented on a change in pull request #9601:
URL: https://github.com/apache/kafka/pull/9601#discussion_r532249311



##########
File path: clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
##########
@@ -613,12 +613,12 @@ private void sendResponse(ResponseHeader respHeader, Struct response) {
         selector.completeReceive(new NetworkReceive(node.idString(), buffer));
     }
 
-    private void sendThrottledProduceResponse(int correlationId, int throttleMs) {
+    private void sendThrottledProduceResponse(int correlationId, int throttleMs, short version) {

Review comment:
       This is to fix `testThrottlingNotEnabledForConnectionToOlderBroker`. The test forces ApiVersionsResponse to version 5, but relied on the fact that nothing really changed between version 5 and version 8 for PRODUCE responses. With flex versions we need to ensure the response matches the ApiVersions response




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9601: KAFKA-10729: Bump remaining RPC's to use tagged fields.

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9601:
URL: https://github.com/apache/kafka/pull/9601#discussion_r527045689



##########
File path: core/src/main/scala/kafka/api/ApiVersion.scala
##########
@@ -108,7 +108,9 @@ object ApiVersion {
     // Bup Fetch protocol for Raft protocol (KIP-595)
     KAFKA_2_7_IV1,
     // Introduced AlterIsr (KIP-497)
-    KAFKA_2_7_IV2
+    KAFKA_2_7_IV2,
+    // Flexible versioning on ListOffsets

Review comment:
       For what it's worth, `WriteTxnMarkers` and `OffsetsForLeaderEpoch` are also inter-broker APIs.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] gardnervickers edited a comment on pull request #9601: KAFKA-10729: Bump remaining RPC's to use tagged fields.

Posted by GitBox <gi...@apache.org>.
gardnervickers edited a comment on pull request #9601:
URL: https://github.com/apache/kafka/pull/9601#issuecomment-729887376


   OffsetForLeaderEpoch and Produce are not yet generated RPCs, but will be once #9401 and #9547 are merged. I've removed the taggedFields bump for these RPC's. We can bump them once their respective PR's are merged. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #9601: KAFKA-10729: Bump remaining RPC's to use tagged fields.

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #9601:
URL: https://github.com/apache/kafka/pull/9601#issuecomment-730287841


   @gardnervickers https://github.com/apache/kafka/pull/9401 and https://github.com/apache/kafka/pull/9547 have been merged. You can bring them back in this PR if you like.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji merged pull request #9601: KAFKA-10729: Bump remaining RPC's to use tagged fields.

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #9601:
URL: https://github.com/apache/kafka/pull/9601


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] gardnervickers commented on pull request #9601: KAFKA-10729: Bump remaining RPC's to use tagged fields.

Posted by GitBox <gi...@apache.org>.
gardnervickers commented on pull request #9601:
URL: https://github.com/apache/kafka/pull/9601#issuecomment-729887376


   OffsetForLeaderEpoch and Produce are not yet versioned RPCs, but will be once #9401 and #9547 are merged. I've removed the taggedFields bump for these RPC's. We can bump them once their respective PR's are merged. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] gardnervickers removed a comment on pull request #9601: KAFKA-10729: Bump remaining RPC's to use tagged fields.

Posted by GitBox <gi...@apache.org>.
gardnervickers removed a comment on pull request #9601:
URL: https://github.com/apache/kafka/pull/9601#issuecomment-729796082


   I realize now that we have several requests which have not been converted to use the generated RPC's. It appears we need to do that to support tagged fields. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] gardnervickers commented on pull request #9601: KAFKA-10729: Bump remaining RPC's to use tagged fields.

Posted by GitBox <gi...@apache.org>.
gardnervickers commented on pull request #9601:
URL: https://github.com/apache/kafka/pull/9601#issuecomment-729796082


   I realize now that we have several requests which have not been converted to use the generated RPC's. It appears we need to do that to support tagged fields. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] gardnervickers commented on a change in pull request #9601: KAFKA-10729: Bump remaining RPC's to use tagged fields.

Posted by GitBox <gi...@apache.org>.
gardnervickers commented on a change in pull request #9601:
URL: https://github.com/apache/kafka/pull/9601#discussion_r532249943



##########
File path: core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
##########
@@ -437,8 +437,8 @@ class ReplicaFetcherThreadTest {
     thread.doWork()
     assertEquals(2, mockNetwork.epochFetchCount)
     assertEquals(1, mockNetwork.fetchCount)
-    assertEquals("OffsetsForLeaderEpochRequest version.",
-      3, mockNetwork.lastUsedOffsetForLeaderEpochVersion)
+    assertTrue("OffsetsForLeaderEpochRequest version.",
+      mockNetwork.lastUsedOffsetForLeaderEpochVersion >= 3)

Review comment:
       Admittedly I'm not sure why we check the version of the last `OffsetForLeaderEpoch` response is 3 here. I decided to widen the check a bit so this won't break for future versions. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9601: KAFKA-10729: Bump remaining RPC's to use tagged fields.

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9601:
URL: https://github.com/apache/kafka/pull/9601#discussion_r533719708



##########
File path: core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
##########
@@ -437,8 +437,8 @@ class ReplicaFetcherThreadTest {
     thread.doWork()
     assertEquals(2, mockNetwork.epochFetchCount)
     assertEquals(1, mockNetwork.fetchCount)
-    assertEquals("OffsetsForLeaderEpochRequest version.",
-      3, mockNetwork.lastUsedOffsetForLeaderEpochVersion)
+    assertTrue("OffsetsForLeaderEpochRequest version.",
+      mockNetwork.lastUsedOffsetForLeaderEpochVersion >= 3)

Review comment:
       I think this was originally using `1` in order to ensure that we were using a version which included the epoch in the response. Since then it looks like it has been updated blindly every time we've bumped the protocol. I'm ok leaving this as is, but we could probably also get rid of it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org