You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2022/08/24 14:01:58 UTC

[kafka] branch trunk updated: KAFKA-14178 Don't record queue time for deferred events (#12551)

This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5eff8592cc0 KAFKA-14178 Don't record queue time for deferred events (#12551)
5eff8592cc0 is described below

commit 5eff8592cc0cb57475c92bce6e80264d71a9bfbe
Author: David Arthur <mu...@gmail.com>
AuthorDate: Wed Aug 24 10:01:48 2022 -0400

    KAFKA-14178 Don't record queue time for deferred events (#12551)
---
 .../org/apache/kafka/controller/QuorumController.java  | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)

diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 3fee25841ba..dcb9ab77fa6 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -709,13 +709,19 @@ public final class QuorumController implements Controller {
         private final CompletableFuture<T> future;
         private final ControllerWriteOperation<T> op;
         private final long eventCreatedTimeNs = time.nanoseconds();
+        private final boolean deferred;
         private OptionalLong startProcessingTimeNs = OptionalLong.empty();
         private ControllerResultAndOffset<T> resultAndOffset;
 
         ControllerWriteEvent(String name, ControllerWriteOperation<T> op) {
+            this(name, op, false);
+        }
+
+        ControllerWriteEvent(String name, ControllerWriteOperation<T> op, boolean deferred) {
             this.name = name;
             this.future = new CompletableFuture<T>();
             this.op = op;
+            this.deferred = deferred;
             this.resultAndOffset = null;
         }
 
@@ -726,7 +732,11 @@ public final class QuorumController implements Controller {
         @Override
         public void run() throws Exception {
             long now = time.nanoseconds();
-            controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - eventCreatedTimeNs));
+            if (!deferred) {
+                // We exclude deferred events from the event queue time metric to prevent
+                // incorrectly including the deferral time in the queue time.
+                controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - eventCreatedTimeNs));
+            }
             int controllerEpoch = curClaimEpoch;
             if (!isActiveController()) {
                 throw newNotControllerException();
@@ -1163,7 +1173,7 @@ public final class QuorumController implements Controller {
 
     private <T> void scheduleDeferredWriteEvent(String name, long deadlineNs,
                                                 ControllerWriteOperation<T> op) {
-        ControllerWriteEvent<T> event = new ControllerWriteEvent<>(name, op);
+        ControllerWriteEvent<T> event = new ControllerWriteEvent<>(name, op, true);
         queue.scheduleDeferred(name, new EarliestDeadlineFunction(deadlineNs), event);
         event.future.exceptionally(e -> {
             if (e instanceof UnknownServerException && e.getCause() != null &&
@@ -1236,7 +1246,7 @@ public final class QuorumController implements Controller {
                 // generated by a ControllerWriteEvent have been applied.
 
                 return result;
-            });
+            }, true);
 
             long delayNs = time.nanoseconds();
             if (imbalancedScheduled == ImbalanceSchedule.DEFERRED) {
@@ -1281,7 +1291,7 @@ public final class QuorumController implements Controller {
                     Arrays.asList(new ApiMessageAndVersion(new NoOpRecord(), (short) 0)),
                     null
                 );
-            });
+            }, true);
 
             long delayNs = time.nanoseconds() + maxIdleIntervalNs.getAsLong();
             queue.scheduleDeferred(WRITE_NO_OP_RECORD, new EarliestDeadlineFunction(delayNs), event);