You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/07/06 20:21:42 UTC

[kafka] branch trunk updated: KAFKA-14032; Dequeue time for forwarded requests is unset (#12360)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6495a0768ca KAFKA-14032; Dequeue time for forwarded requests is unset (#12360)
6495a0768ca is described below

commit 6495a0768cae1086d4b1dfa466967dfe178c1553
Author: YU <yu...@live.com>
AuthorDate: Thu Jul 7 04:21:28 2022 +0800

    KAFKA-14032; Dequeue time for forwarded requests is unset (#12360)
    
    When building a forwarded request, we need to override the dequeue time of the underlying request to match the same value as the envelope. Otherwise, the field is left unset, which causes inaccurate reporting.
    
    Reviewers; Jason Gustafson <ja...@confluent.io>
---
 core/src/main/scala/kafka/server/EnvelopeUtils.scala      |  5 ++++-
 core/src/test/scala/unit/kafka/server/KafkaApisTest.scala | 10 +++++++---
 core/src/test/scala/unit/kafka/utils/TestUtils.scala      |  5 ++++-
 3 files changed, 15 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/kafka/server/EnvelopeUtils.scala b/core/src/main/scala/kafka/server/EnvelopeUtils.scala
index a162ae5fe80..97c532ebb45 100644
--- a/core/src/main/scala/kafka/server/EnvelopeUtils.scala
+++ b/core/src/main/scala/kafka/server/EnvelopeUtils.scala
@@ -84,7 +84,7 @@ object EnvelopeUtils {
     requestChannelMetrics: RequestChannel.Metrics
   ): RequestChannel.Request = {
     try {
-      new RequestChannel.Request(
+      val forwardedRequest = new RequestChannel.Request(
         processor = envelope.processor,
         context = forwardedContext,
         startTimeNanos = envelope.startTimeNanos,
@@ -93,6 +93,9 @@ object EnvelopeUtils {
         requestChannelMetrics,
         Some(envelope)
       )
+      // set the dequeue time of forwardedRequest as the value of envelope request
+      forwardedRequest.requestDequeueTimeNanos = envelope.requestDequeueTimeNanos
+      forwardedRequest
     } catch {
       case e: InvalidRequestException =>
         // We use UNSUPPORTED_VERSION if the embedded request cannot be parsed.
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index cc34cabe05a..d176f369f8d 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -307,8 +307,10 @@ class KafkaApisTest {
         Seq(new AlterConfigsRequest.ConfigEntry("foo", "bar")).asJava))
     val alterConfigsRequest = new AlterConfigsRequest.Builder(configs.asJava, false).build(requestHeader.apiVersion)
 
+    val startTimeNanos = time.nanoseconds()
+    val queueDurationNanos = 5 * 1000 * 1000
     val request = TestUtils.buildEnvelopeRequest(
-      alterConfigsRequest, kafkaPrincipalSerde, requestChannelMetrics, time.nanoseconds())
+      alterConfigsRequest, kafkaPrincipalSerde, requestChannelMetrics, startTimeNanos, startTimeNanos + queueDurationNanos)
 
     val capturedResponse: ArgumentCaptor[AlterConfigsResponse] = ArgumentCaptor.forClass(classOf[AlterConfigsResponse])
     val capturedRequest: ArgumentCaptor[RequestChannel.Request] = ArgumentCaptor.forClass(classOf[RequestChannel.Request])
@@ -321,6 +323,8 @@ class KafkaApisTest {
       any()
     )
     assertEquals(Some(request), capturedRequest.getValue.envelope)
+    // the dequeue time of forwarded request should equals to envelop request
+    assertEquals(request.requestDequeueTimeNanos, capturedRequest.getValue.requestDequeueTimeNanos)
     val innerResponse = capturedResponse.getValue
     val responseMap = innerResponse.data.responses().asScala.map { resourceResponse =>
       resourceResponse.resourceName() -> Errors.forCode(resourceResponse.errorCode)
@@ -397,7 +401,7 @@ class KafkaApisTest {
       .build(requestHeader.apiVersion)
 
     val request = TestUtils.buildEnvelopeRequest(
-      alterConfigsRequest, kafkaPrincipalSerde, requestChannelMetrics, time.nanoseconds(), fromPrivilegedListener)
+      alterConfigsRequest, kafkaPrincipalSerde, requestChannelMetrics, time.nanoseconds(), fromPrivilegedListener = fromPrivilegedListener)
 
     val capturedResponse: ArgumentCaptor[AbstractResponse] = ArgumentCaptor.forClass(classOf[AbstractResponse])
     createKafkaApis(authorizer = Some(authorizer), enableForwarding = true).handle(request, RequestLocal.withThreadConfinedCaching)
@@ -1614,7 +1618,7 @@ class KafkaApisTest {
 
       assertEquals(1, response.data.responses.size)
       val topicProduceResponse = response.data.responses.asScala.head
-      assertEquals(1, topicProduceResponse.partitionResponses.size)   
+      assertEquals(1, topicProduceResponse.partitionResponses.size)
       val partitionProduceResponse = topicProduceResponse.partitionResponses.asScala.head
       assertEquals(Errors.INVALID_PRODUCER_EPOCH, Errors.forCode(partitionProduceResponse.errorCode))
     }
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 05610413e98..01a888c1667 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -2182,6 +2182,7 @@ object TestUtils extends Logging {
     principalSerde: KafkaPrincipalSerde,
     requestChannelMetrics: RequestChannel.Metrics,
     startTimeNanos: Long,
+    dequeueTimeNanos: Long = -1,
     fromPrivilegedListener: Boolean = true
   ): RequestChannel.Request = {
     val clientId = "id"
@@ -2203,7 +2204,7 @@ object TestUtils extends Logging {
       KafkaPrincipal.ANONYMOUS, listenerName, SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY,
       fromPrivilegedListener, Optional.of(principalSerde))
 
-    new RequestChannel.Request(
+    val envelopRequest = new RequestChannel.Request(
       processor = 1,
       context = envelopeContext,
       startTimeNanos = startTimeNanos,
@@ -2212,6 +2213,8 @@ object TestUtils extends Logging {
       metrics = requestChannelMetrics,
       envelope = None
     )
+    envelopRequest.requestDequeueTimeNanos = dequeueTimeNanos
+    envelopRequest
   }
 
   def verifyNoUnexpectedThreads(context: String): Unit = {