You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2022/08/24 14:01:58 UTC
[kafka] branch trunk updated: KAFKA-14178 Don't record queue time for deferred events (#12551)
This is an automated email from the ASF dual-hosted git repository.
davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5eff8592cc0 KAFKA-14178 Don't record queue time for deferred events (#12551)
5eff8592cc0 is described below
commit 5eff8592cc0cb57475c92bce6e80264d71a9bfbe
Author: David Arthur <mu...@gmail.com>
AuthorDate: Wed Aug 24 10:01:48 2022 -0400
KAFKA-14178 Don't record queue time for deferred events (#12551)
---
.../org/apache/kafka/controller/QuorumController.java | 18 ++++++++++++++----
1 file changed, 14 insertions(+), 4 deletions(-)
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 3fee25841ba..dcb9ab77fa6 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -709,13 +709,19 @@ public final class QuorumController implements Controller {
private final CompletableFuture<T> future;
private final ControllerWriteOperation<T> op;
private final long eventCreatedTimeNs = time.nanoseconds();
+ private final boolean deferred;
private OptionalLong startProcessingTimeNs = OptionalLong.empty();
private ControllerResultAndOffset<T> resultAndOffset;
ControllerWriteEvent(String name, ControllerWriteOperation<T> op) {
+ this(name, op, false);
+ }
+
+ ControllerWriteEvent(String name, ControllerWriteOperation<T> op, boolean deferred) {
this.name = name;
this.future = new CompletableFuture<T>();
this.op = op;
+ this.deferred = deferred;
this.resultAndOffset = null;
}
@@ -726,7 +732,11 @@ public final class QuorumController implements Controller {
@Override
public void run() throws Exception {
long now = time.nanoseconds();
- controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - eventCreatedTimeNs));
+ if (!deferred) {
+ // We exclude deferred events from the event queue time metric to prevent
+ // incorrectly including the deferral time in the queue time.
+ controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - eventCreatedTimeNs));
+ }
int controllerEpoch = curClaimEpoch;
if (!isActiveController()) {
throw newNotControllerException();
@@ -1163,7 +1173,7 @@ public final class QuorumController implements Controller {
private <T> void scheduleDeferredWriteEvent(String name, long deadlineNs,
ControllerWriteOperation<T> op) {
- ControllerWriteEvent<T> event = new ControllerWriteEvent<>(name, op);
+ ControllerWriteEvent<T> event = new ControllerWriteEvent<>(name, op, true);
queue.scheduleDeferred(name, new EarliestDeadlineFunction(deadlineNs), event);
event.future.exceptionally(e -> {
if (e instanceof UnknownServerException && e.getCause() != null &&
@@ -1236,7 +1246,7 @@ public final class QuorumController implements Controller {
// generated by a ControllerWriteEvent have been applied.
return result;
- });
+ }, true);
long delayNs = time.nanoseconds();
if (imbalancedScheduled == ImbalanceSchedule.DEFERRED) {
@@ -1281,7 +1291,7 @@ public final class QuorumController implements Controller {
Arrays.asList(new ApiMessageAndVersion(new NoOpRecord(), (short) 0)),
null
);
- });
+ }, true);
long delayNs = time.nanoseconds() + maxIdleIntervalNs.getAsLong();
queue.scheduleDeferred(WRITE_NO_OP_RECORD, new EarliestDeadlineFunction(delayNs), event);