You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/07/01 04:07:27 UTC

[kafka] branch trunk updated: KAFKA-14036; Set local time in `ControllerApis` when `handle` returns (#12372)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6bf5bfc298 KAFKA-14036; Set local time in `ControllerApis` when `handle` returns (#12372)
6bf5bfc298 is described below

commit 6bf5bfc2982158c3a1bfff4a0f65ea901ea84e7a
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Thu Jun 30 21:07:21 2022 -0700

    KAFKA-14036; Set local time in `ControllerApis` when `handle` returns (#12372)
    
    In `ControllerApis`, we are missing the logic to set the local processing end time after `handle` returns. As a consequence, the remote time ends up being reported as the local time in the request-level metrics. The patch adds the same logic we have in `KafkaApis` to set `apiLocalCompleteTimeNanos`.
    
    Reviewers: José Armando García Sancio <js...@gmail.com>
---
 .../main/scala/kafka/server/ControllerApis.scala   |  5 +++
 .../unit/kafka/server/ControllerApisTest.scala     | 47 +++++++++++++++++++---
 2 files changed, 46 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
index 85ae7d70b6..74bc4dd406 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -117,6 +117,11 @@ class ControllerApis(val requestChannel: RequestChannel,
           s"with context ${request.context}", t)
         requestHelper.handleError(request, t)
       }
+    } finally {
+      // Only record local completion time if it is unset.
+      if (request.apiLocalCompleteTimeNanos < 0) {
+        request.apiLocalCompleteTimeNanos = time.nanoseconds
+      }
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 0c6f979874..86a0c85470 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -17,11 +17,6 @@
 
 package kafka.server
 
-import java.net.InetAddress
-import java.util
-import java.util.Collections.singletonList
-import java.util.{Collections, Properties}
-import java.util.concurrent.{CompletableFuture, ExecutionException}
 import kafka.network.RequestChannel
 import kafka.raft.RaftManager
 import kafka.server.QuotaFactory.QuotaManagers
@@ -44,7 +39,7 @@ import org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState
 import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource, AlterConfigsResourceCollection, AlterableConfig, AlterableConfigCollection}
 import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse
-import org.apache.kafka.common.message.{CreateTopicsRequestData, _}
+import org.apache.kafka.common.message._
 import org.apache.kafka.common.network.{ClientInformation, ListenerName}
 import org.apache.kafka.common.protocol.Errors._
 import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
@@ -64,6 +59,11 @@ import org.mockito.ArgumentMatchers._
 import org.mockito.Mockito._
 import org.mockito.{ArgumentCaptor, ArgumentMatchers}
 
+import java.net.InetAddress
+import java.util
+import java.util.Collections.singletonList
+import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit}
+import java.util.{Collections, Properties}
 import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
 import scala.reflect.ClassTag
@@ -174,6 +174,41 @@ class ControllerApisTest {
     )
   }
 
+  @Test
+  def testFetchLocalTimeComputedCorrectly(): Unit = {
+    val localTimeDurationMs = 5
+    val initialTimeNanos = time.nanoseconds()
+    val initialTimeMs = time.milliseconds()
+
+    when(
+      raftManager.handleRequest(
+        any(classOf[RequestHeader]),
+        any(classOf[ApiMessage]),
+        any(classOf[Long])
+      )
+    ).thenAnswer { _ =>
+      time.sleep(localTimeDurationMs)
+      new CompletableFuture[ApiMessage]()
+    }
+
+    // Local time should be updated when `ControllerApis.handle` returns
+    val fetchRequestData = new FetchRequestData()
+    val request = buildRequest(new FetchRequest(fetchRequestData, ApiKeys.FETCH.latestVersion))
+    createControllerApis(None, new MockController.Builder().build())
+      .handle(request, RequestLocal.NoCaching)
+
+    verify(raftManager).handleRequest(
+      ArgumentMatchers.eq(request.header),
+      ArgumentMatchers.eq(fetchRequestData),
+      ArgumentMatchers.eq(initialTimeMs)
+    )
+
+    assertEquals(localTimeDurationMs, TimeUnit.MILLISECONDS.convert(
+      request.apiLocalCompleteTimeNanos - initialTimeNanos,
+      TimeUnit.NANOSECONDS
+    ))
+  }
+
   @Test
   def testUnauthorizedFetchSnapshot(): Unit = {
     assertThrows(classOf[ClusterAuthorizationException], () => createControllerApis(