You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/07/01 04:07:27 UTC
[kafka] branch trunk updated: KAFKA-14036; Set local time in `ControllerApis` when `handle` returns (#12372)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6bf5bfc298 KAFKA-14036; Set local time in `ControllerApis` when `handle` returns (#12372)
6bf5bfc298 is described below
commit 6bf5bfc2982158c3a1bfff4a0f65ea901ea84e7a
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Thu Jun 30 21:07:21 2022 -0700
KAFKA-14036; Set local time in `ControllerApis` when `handle` returns (#12372)
In `ControllerApis`, we are missing the logic to set the local processing end time after `handle` returns. As a consequence of this, the remote time ends up reported as the local time in the request level metrics. The patch adds the same logic we have in `KafkaApis` to set `apiLocalCompleteTimeNanos`.
Reviewers: José Armando García Sancio <js...@gmail.com>
---
.../main/scala/kafka/server/ControllerApis.scala | 5 +++
.../unit/kafka/server/ControllerApisTest.scala | 47 +++++++++++++++++++---
2 files changed, 46 insertions(+), 6 deletions(-)
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
index 85ae7d70b6..74bc4dd406 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -117,6 +117,11 @@ class ControllerApis(val requestChannel: RequestChannel,
s"with context ${request.context}", t)
requestHelper.handleError(request, t)
}
+ } finally {
+ // Only record local completion time if it is unset.
+ if (request.apiLocalCompleteTimeNanos < 0) {
+ request.apiLocalCompleteTimeNanos = time.nanoseconds
+ }
}
}
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 0c6f979874..86a0c85470 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -17,11 +17,6 @@
package kafka.server
-import java.net.InetAddress
-import java.util
-import java.util.Collections.singletonList
-import java.util.{Collections, Properties}
-import java.util.concurrent.{CompletableFuture, ExecutionException}
import kafka.network.RequestChannel
import kafka.raft.RaftManager
import kafka.server.QuotaFactory.QuotaManagers
@@ -44,7 +39,7 @@ import org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState
import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource, AlterConfigsResourceCollection, AlterableConfig, AlterableConfigCollection}
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse
-import org.apache.kafka.common.message.{CreateTopicsRequestData, _}
+import org.apache.kafka.common.message._
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
import org.apache.kafka.common.protocol.Errors._
import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
@@ -64,6 +59,11 @@ import org.mockito.ArgumentMatchers._
import org.mockito.Mockito._
import org.mockito.{ArgumentCaptor, ArgumentMatchers}
+import java.net.InetAddress
+import java.util
+import java.util.Collections.singletonList
+import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit}
+import java.util.{Collections, Properties}
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag
@@ -174,6 +174,41 @@ class ControllerApisTest {
)
}
+ @Test
+ def testFetchLocalTimeComputedCorrectly(): Unit = {
+ val localTimeDurationMs = 5
+ val initialTimeNanos = time.nanoseconds()
+ val initialTimeMs = time.milliseconds()
+
+ when(
+ raftManager.handleRequest(
+ any(classOf[RequestHeader]),
+ any(classOf[ApiMessage]),
+ any(classOf[Long])
+ )
+ ).thenAnswer { _ =>
+ time.sleep(localTimeDurationMs)
+ new CompletableFuture[ApiMessage]()
+ }
+
+ // Local time should be updated when `ControllerApis.handle` returns
+ val fetchRequestData = new FetchRequestData()
+ val request = buildRequest(new FetchRequest(fetchRequestData, ApiKeys.FETCH.latestVersion))
+ createControllerApis(None, new MockController.Builder().build())
+ .handle(request, RequestLocal.NoCaching)
+
+ verify(raftManager).handleRequest(
+ ArgumentMatchers.eq(request.header),
+ ArgumentMatchers.eq(fetchRequestData),
+ ArgumentMatchers.eq(initialTimeMs)
+ )
+
+ assertEquals(localTimeDurationMs, TimeUnit.MILLISECONDS.convert(
+ request.apiLocalCompleteTimeNanos - initialTimeNanos,
+ TimeUnit.NANOSECONDS
+ ))
+ }
+
@Test
def testUnauthorizedFetchSnapshot(): Unit = {
assertThrows(classOf[ClusterAuthorizationException], () => createControllerApis(