You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "dajac (via GitHub)" <gi...@apache.org> on 2023/03/14 16:29:37 UTC

[GitHub] [kafka] dajac commented on a diff in pull request #13323: KAFKA-14617: Add ReplicaState to FetchRequest

dajac commented on code in PR #13323:
URL: https://github.com/apache/kafka/pull/13323#discussion_r1135754940


##########
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java:
##########
@@ -130,10 +131,33 @@ private static Optional<Integer> optionalEpoch(int rawEpochValue) {
         }
     }
 
+    public static class SimpleBuilder extends AbstractRequest.Builder<FetchRequest> {
+        private final FetchRequestData fetchRequestData;
+        public SimpleBuilder(FetchRequestData fetchRequestData) {
+            super(ApiKeys.FETCH);
+            this.fetchRequestData = fetchRequestData;
+        }
+
+        @Override
+        public FetchRequest build(short version) {
+            int replicaId = FetchRequest.replicaId(fetchRequestData);
+            long replicaEpoch = fetchRequestData.replicaState().replicaEpoch();
+            if (version < 15) {
+                fetchRequestData.setReplicaId(replicaId);
+                fetchRequestData.setReplicaState(new ReplicaState());
+            } else {
+                fetchRequestData.setReplicaState(new ReplicaState().setReplicaId(replicaId).setReplicaEpoch(replicaEpoch));
+                fetchRequestData.setReplicaId(-1);
+            }

Review Comment:
   My understanding is that we always use the new format everywhere so we should only care about downgrading, no? If we get a replica id >= 0, we could even consider throwing an UnsupportedVersionException for instance.



##########
core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala:
##########
@@ -44,7 +45,10 @@ object KafkaNetworkChannel {
       case fetchRequest: FetchRequestData =>
         // Since we already have the request, we go through a simplified builder
         new AbstractRequest.Builder[FetchRequest](ApiKeys.FETCH) {
-          override def build(version: Short): FetchRequest = new FetchRequest(fetchRequest, version)
+          override def build(version: Short): FetchRequest = {
+            val builder = new SimpleBuilder(fetchRequest)
+            new FetchRequest(builder.build(version).data(), version)
+          }
           override def toString: String = fetchRequest.toString
         }

Review Comment:
   You can replace all of this by `new FetchRequest.SimpleBuilder(fetchRequest)`.



##########
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java:
##########
@@ -130,10 +131,33 @@ private static Optional<Integer> optionalEpoch(int rawEpochValue) {
         }
     }
 
+    public static class SimpleBuilder extends AbstractRequest.Builder<FetchRequest> {

Review Comment:
   nit: Could we put a comment saying that this is only used by the KafkaRaftClient?



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -983,16 +987,16 @@ private CompletableFuture<FetchResponseData> handleFetchRequest(
                 Errors error = Errors.forException(cause);
                 if (error != Errors.REQUEST_TIMED_OUT) {
                     logger.debug("Failed to handle fetch from {} at {} due to {}",
-                        request.replicaId(), fetchPartition.fetchOffset(), error);
+                        FetchRequest.replicaId(request), fetchPartition.fetchOffset(), error);
                     return buildEmptyFetchResponse(error, Optional.empty());
                 }
             }
 
             // FIXME: `completionTimeMs`, which can be null
             logger.trace("Completing delayed fetch from {} starting at offset {} at {}",
-                request.replicaId(), fetchPartition.fetchOffset(), completionTimeMs);
+                FetchRequest.replicaId(request), fetchPartition.fetchOffset(), completionTimeMs);
 
-            return tryCompleteFetchRequest(request.replicaId(), fetchPartition, time.milliseconds());
+            return tryCompleteFetchRequest(FetchRequest.replicaId(request), fetchPartition, time.milliseconds());

Review Comment:
   nit: Would it make sense to pull `FetchRequest.replicaId(request)` into a variable instead of calling it everywhere?



##########
core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala:
##########
@@ -116,4 +119,18 @@ class RemoteLeaderEndPointTest {
         assertThrows(classOf[UnknownLeaderEpochException], () => endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch + 1))
         assertThrows(classOf[UnknownLeaderEpochException], () => endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch + 1))
     }
+
+    @Test
+    def testBrokerEpochSupplier(): Unit = {
+        val tp = new TopicPartition("topic1", 0)
+        val topicId1 = Uuid.randomUuid()
+        val log: UnifiedLog = mock(classOf[UnifiedLog])
+        val partitionMap = Map(
+            tp -> PartitionFetchState(Some(topicId1), 150, None, 0, None, state = Fetching, lastFetchedEpoch = None))
+        when(replicaManager.localLogOrException(tp)).thenReturn(log)
+        when(log.logStartOffset).thenReturn(1)
+        val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = endPoint.buildFetch(partitionMap)
+        assertTrue(partitionsWithError.isEmpty)
+        assertEquals(1L, fetchRequestOpt.get.fetchRequest.build(15).replicaEpoch())

Review Comment:
   nit: 1) Should we test all versions? 2) Could we bump the replica epoch after this point, build a new fetch request, and assert it again?



##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##########
@@ -1436,6 +1441,38 @@ public void testInvalidFetchRequest() throws Exception {
         context.assertSentFetchPartitionResponse(Errors.INVALID_REQUEST, epoch, OptionalInt.of(localId));
     }
 
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.FETCH)
+    public void testFetchRequestVersionHandling(short version) throws Exception {

Review Comment:
   I need to take a deeper look into this one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org